2#include <ossia/dataflow/exec_pool.hpp>
3#include <ossia/dataflow/graph_node.hpp>
4#include <ossia/detail/fmt.hpp>
// Type-erased callable taking an ossia::graph_node& and returning void.
// An instance of this type is what executor::set_task_executor() stores.
// The second template argument is sizeof(void*) * 4 -- presumably the inline
// storage size in bytes used by smallfun::function; confirm against smallfun.
13using task_function = smallfun::function<void(ossia::graph_node&),
sizeof(
void*) * 4>;
// One schedulable unit of the task graph. Derives from ossia::pool_task and
// carries a pointer to the ossia::graph_node it stands for, plus the
// dependency bookkeeping (dependency count, remaining-dependency counter and
// the ids of the tasks that come after it).
// NOTE(review): this listing omits several lines of the class (e.g. the body
// of task(ossia::graph_node&) and the declarations of m_taskId and m_exec,
// which are used below), so only what is visible is documented.
24class task :
public ossia::pool_task
// Tasks are not copyable.
28 task(
const task&) =
delete;
// Move construction: the scalar fields are copied, the atomic counter is
// snapshotted with load(), and the successor list is moved. The source's
// successor list is then cleared explicitly.
29 task(task&& other) noexcept
31 , m_taskId{other.m_taskId}
32 , m_dependencies{other.m_dependencies}
33 , m_remaining_dependencies{other.m_remaining_dependencies.load()}
34 , m_node{other.m_node}
35 , m_exec{other.m_exec}
36 , m_precedes{std::move(other.m_precedes)}
38 other.m_precedes.clear();
// Copy assignment is disabled as well.
40 task& operator=(
const task&) =
delete;
// Move assignment: same field-by-field transfer as the move constructor.
// The first statement assigns the pool_task base sub-object from `other`;
// since `other` is a named reference, this is a copy-assignment of the base
// (presumably intended -- confirm).
// NOTE(review): no self-move guard is visible in this listing; with
// `this == &other` the final clear() would empty m_precedes.
41 task& operator=(task&& other)
noexcept
43 static_cast<pool_task&
>(*this) = other;
44 m_taskId = other.m_taskId;
45 m_dependencies = other.m_dependencies;
46 m_remaining_dependencies = other.m_remaining_dependencies.load();
47 m_node = other.m_node;
48 m_exec = other.m_exec;
49 m_precedes = std::move(other.m_precedes);
50 other.m_precedes.clear();
// Constructs a task for `node` (constructor body not visible here).
54 task(ossia::graph_node& node)
// Records that `other` comes after this task: other's id is appended to this
// task's successor list and other's dependency count is incremented.
// Relies on other.m_taskId already being set (taskflow::emplace assigns it).
59 void precede(task& other)
61 m_precedes.push_back(other.m_taskId);
62 other.m_dependencies++;
// taskflow and executor read and write the members below directly.
66 friend class taskflow;
67 friend class executor;
// Number of tasks that were declared to precede this one (see precede()).
70 int m_dependencies{0};
// Working counter: executor::run() resets it from m_dependencies and
// executor::process_done() decrements it with fetch_sub.
71 std::atomic_int m_remaining_dependencies{};
// Graph node associated with this task.
73 ossia::graph_node* m_node{};
// Ids of the tasks that follow this one; executor::process_done() uses them
// as indices into taskflow::m_tasks.
75 ossia::small_pod_vector<int, 4> m_precedes;
// Removes every task from the flow.
81 void clear() { m_tasks.clear(); }
// Reserves storage for `sz` tasks in the underlying std::vector, so that up
// to `sz` emplace() calls do not reallocate it.
83 void reserve(std::size_t sz) { m_tasks.reserve(sz); }
// Appends a new task built from `node` and gives it an id equal to its index
// in m_tasks (the size of the vector before insertion).
// The return statement is not visible in this listing; presumably a pointer
// to the new element is returned. Since the tasks live in a std::vector, a
// reallocation caused by a later emplace() would invalidate pointers handed
// out earlier -- callers that keep them should call reserve() first.
85 task* emplace(ossia::graph_node& node)
87 const int taskId = m_tasks.size();
88 auto& last = m_tasks.emplace_back(node);
89 last.m_taskId = taskId;
// executor accesses m_tasks directly.
94 friend class executor;
// All tasks of the flow; a task's id is its index in this vector.
96 std::vector<task> m_tasks;
// Defaulted default constructor.
104 executor() =
default;
// Stores the callable `f` in m_func, replacing any previously stored one.
106 void set_task_executor(task_function f) { m_func = std::move(f); }
// Runs every task of `tf` through the shared ossia::task_pool.
// Visible steps:
//  1. check for an empty flow (the branch body is not visible here;
//     presumably an early return);
//  2. set the batch's `remaining` counter to the number of tasks;
//  3. for each task, install run_task as its `execute` callback and reset
//     its remaining-dependency counter from m_dependencies;
//  4. visit the tasks that have no dependencies (the action taken is not
//     visible in this listing; presumably they are enqueued);
//  5. call pool.corun_until(m_batch).
108 void run(taskflow& tf)
111 if(tf.m_tasks.empty())
114 auto& pool = ossia::task_pool::instance();
// Relaxed stores are used for the counters set up before tasks are started.
116 m_batch.remaining.store(
117 static_cast<int>(tf.m_tasks.size()), std::memory_order_relaxed);
119 for(
auto& task : tf.m_tasks)
121 task.execute = &executor::run_task;
123 task.m_remaining_dependencies.store(
124 task.m_dependencies, std::memory_order_relaxed);
// Second pass: only tasks with zero dependencies are considered here.
127 for(
auto& task : tf.m_tasks)
129 if(task.m_dependencies == 0)
133 pool.corun_until(m_batch);
// Static trampoline installed as pool_task::execute by run(): casts the
// pool_task pointer back to the derived task and forwards to the execute()
// member of the executor stored in the task (m_exec).
138 static void run_task(ossia::pool_task* pt)
noexcept
140 auto* t =
static_cast<task*
>(pt);
141 t->m_exec->execute(*t);
// Hands task `t` to the task pool. When the task's node reports
// not_threadable(), it is submitted with submit_owner_only(); the
// alternative path is not visible in this listing.
144 void enqueue(task& t)
146 auto& pool = ossia::task_pool::instance();
147 if(t.m_node->not_threadable())
148 pool.submit_owner_only(&t);
// Executes task `t`. Declared noexcept: a std::exception raised while
// running is caught and reported on stderr together with the node's label.
// A second, message-less report is also visible below -- presumably for a
// catch-all handler, whose clause is not visible in this listing. The
// guarded statements themselves are not visible either.
153 void execute(task& t)
noexcept
// Handler for exceptions derived from std::exception.
159 catch(
const std::exception& e)
162 stderr,
"ossia: error executing node '{}': {}\n", t.m_node->label(),
168 stderr,
"ossia: error executing node '{}'\n", t.m_node->label());
// Bookkeeping after task `t`: for every successor id stored in t.m_precedes,
// the successor is looked up in m_tf->m_tasks and its remaining-dependency
// counter is decremented atomically (acq_rel). The value the fetch_sub
// result is compared against, and what happens on a match, are not visible
// in this listing (presumably the successor is enqueued once its last
// dependency is done). Finally pool.finish(m_batch) is called.
174 void process_done(task& t)
noexcept
176 auto& pool = ossia::task_pool::instance();
177 for(
int taskId : t.m_precedes)
179 auto& next = m_tf->m_tasks[taskId];
182 if(next.m_remaining_dependencies.fetch_sub(1, std::memory_order_acq_rel)
188 pool.finish(m_batch);
// Callable stored by set_task_executor().
191 task_function m_func;
// Batch object shared with the task pool: run() initialises its `remaining`
// counter and passes it to corun_until(); process_done() passes it to finish().
193 ossia::task_batch m_batch;
197#include <ossia/dataflow/graph/graph_static.hpp>
198#include <ossia/detail/hash_map.hpp>
201struct custom_parallel_exec;
// Update policy that rebuilds an ossia::taskflow mirroring the node graph so
// that custom_parallel_exec can run it with ossia::executor.
// NOTE(review): this listing omits several lines (braces, the name of the
// function whose parameters start at original line 216, branch conditions),
// so the comments below only describe what is visible.
202template <
typename Impl>
203struct custom_parallel_update
// Optional logger and per-node benchmark map; both are dereferenced when
// selecting the per-node callable below.
206 std::shared_ptr<ossia::logger_type>
logger;
207 std::shared_ptr<bench_map> perf_map;
// Constructor taking the graph and its setup options (body not visible).
209 template <
typename Graph_T>
210 custom_parallel_update(Graph_T& g,
const ossia::graph_setup_options& opt)
// Parameter list of the graph-rebuilding function -- presumably
// update_graph(), which operator() calls with three matching arguments.
216 ossia::node_map& nodes,
const std::vector<graph_node*>& topo_order,
217 ossia::graph_t& graph)
// Storage is reserved up front; the task pointers returned by emplace() are
// kept in flow_nodes, so the vector must not reallocate afterwards.
// NOTE(review): the reservation uses nodes.size() while one task is created
// per entry of topo_order -- presumably both have the same size; confirm.
222 flow_graph.reserve(nodes.size());
// Variant 1: node callable with logging and benchmarking. Each node gets an
// empty entry in perf_map and a task in the flow.
228 executor.set_task_executor(
229 node_exec_logger_bench{cur_state, *perf_map, *
logger});
230 for(
auto node : topo_order)
232 (*perf_map)[node] = std::nullopt;
233 flow_nodes[node] = flow_graph.emplace(*node);
// Variant 2: node callable with logging only.
238 executor.set_task_executor(node_exec_logger{cur_state, *
logger});
239 for(
auto node : topo_order)
241 flow_nodes[node] = flow_graph.emplace(*node);
// Variant 3: plain node callable.
247 executor.set_task_executor(node_exec{cur_state});
248 for(
auto node : topo_order)
250 flow_nodes[node] = flow_graph.emplace(*node);
// Translate every edge of `graph` into a task dependency. n1 is the node at
// the edge's source and n2 the node at its target; the task of n2 is made to
// precede the task of n1.
// NOTE(review): the target-precedes-source direction is what the code does;
// whether the edges of this graph are stored reversed is not visible here --
// confirm. The flow_nodes[...] lookups also assume both nodes already have a
// task (presumably operator[] would insert a null pointer otherwise).
254 for(
auto [ei, ei_end] = boost::edges(graph); ei != ei_end; ++ei)
257 auto& n1 = graph[edge.m_source];
258 auto& n2 = graph[edge.m_target];
260 auto& sender = flow_nodes[n2.get()];
261 auto& receiver = flow_nodes[n1.get()];
262 sender->precede(*receiver);
// Call operator: rebuilds the flow from the graph's node map, its list of
// all nodes and impl.m_sub_graph. `devices` is not used in the visible lines.
266 template <
typename Graph_T,
typename DevicesT>
267 void operator()(Graph_T& g,
const DevicesT& devices)
270 update_graph(g.m_nodes, g.m_all_nodes, impl.m_sub_graph);
// custom_parallel_exec uses flow_graph and executor directly.
274 friend struct custom_parallel_exec;
// Execution state handed to the node callables built above.
277 execution_state* cur_state{};
// Task graph, the executor that runs it, and the node -> task lookup table.
279 ossia::taskflow flow_graph;
280 ossia::executor executor;
281 ossia::hash_map<graph_node*, ossia::task*> flow_nodes;
// Execution policy paired with custom_parallel_update: running a tick means
// running the task flow that the update policy built.
284struct custom_parallel_exec
// Constructor taking a graph reference; the parameter is unnamed, so it is
// not used.
286 template <
typename Graph_T>
287 custom_parallel_exec(Graph_T&)
// Execution entry point (its name line is not visible in this listing;
// presumably operator()). In the visible lines only `self` is used: the
// executor owned by the update policy runs the policy's flow graph.
291 template <
typename Graph_T,
typename Impl>
293 Graph_T& g, custom_parallel_update<Impl>& self, ossia::execution_state& e,
294 const std::vector<ossia::graph_node*>&)
297 self.executor.run(self.flow_graph);
// graph_static instantiated with custom_parallel_update (wrapping
// tc_update<fast_tc>) as its update policy and custom_parallel_exec as its
// execution policy.
301using custom_parallel_tc_graph
302 = graph_static<custom_parallel_update<tc_update<fast_tc>>, custom_parallel_exec>;
spdlog::logger & logger() noexcept
Returns the logger to which errors are written. The default is stderr.
Definition context.cpp:118