#include <ossia/dataflow/graph_node.hpp>
#include <ossia/detail/disable_fpe.hpp>
#include <ossia/detail/fmt.hpp>
#include <ossia/detail/lockfree_queue.hpp>
#include <ossia/detail/thread.hpp>

#include <boost/container/static_vector.hpp>

#include <blockingconcurrentqueue.h>
#include <concurrentqueue.h>
#include <smallfun.hpp>

#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <string>
#include <thread>
#include <vector>

#define DISABLE_DONE_TASKS
24using task_function = smallfun::function<void(ossia::graph_node&),
sizeof(
void*) * 4>;
32 task(
const task&) =
delete;
33 task(task&& other) noexcept
34 : m_taskId{other.m_taskId}
35 , m_dependencies{other.m_dependencies}
36 , m_remaining_dependencies{other.m_remaining_dependencies.load()}
37 , m_node{other.m_node}
38 , m_precedes{std::move(other.m_precedes)}
39#if defined(CHECK_FOLLOWS)
40 , m_follows{std::move(other.m_follows)}
43 other.m_precedes.clear();
44#if defined(CHECK_FOLLOWS)
45 other.m_follows.clear();
48 task& operator=(
const task&) =
delete;
49 task& operator=(task&& other)
noexcept
51 m_taskId = other.m_taskId;
52 m_dependencies = other.m_dependencies;
53 m_remaining_dependencies = other.m_remaining_dependencies.load();
54 m_node = other.m_node;
55 m_precedes = std::move(other.m_precedes);
56 other.m_precedes.clear();
57#if defined(CHECK_FOLLOWS)
58 m_follows = std::move(other.m_follows);
59 other.m_follows.clear();
64 task(ossia::graph_node& node)
69 void precede(task& other)
71 m_precedes.push_back(other.m_taskId);
72#if defined(CHECK_FOLLOWS)
73 other.m_follows.push_back(m_taskId);
75 other.m_dependencies++;
79 friend class taskflow;
80 friend class executor;
83 int m_dependencies{0};
84 std::atomic_int m_remaining_dependencies{};
85 std::atomic_bool m_executed{};
87 ossia::graph_node* m_node{};
88 ossia::small_pod_vector<int, 4> m_precedes;
89#if defined(CHECK_FOLLOWS)
90 ossia::small_pod_vector<int, 4> m_follows;
97 void clear() { m_tasks.clear(); }
99 void reserve(std::size_t sz) { m_tasks.reserve(sz); }
101 task* emplace(ossia::graph_node& node)
103 const int taskId = m_tasks.size();
104 auto& last = m_tasks.emplace_back(node);
105 last.m_taskId = taskId;
110 friend class executor;
112 std::vector<task> m_tasks;
118 explicit executor(
int nthreads)
121 m_threads.resize(nthreads);
123 for(
auto& t : m_threads)
125 t = std::thread{[
this, k = k++] {
126 while(!m_startFlag.test())
127 std::this_thread::yield();
129 ossia::set_thread_name(m_threads[k],
"ossia exec " + std::to_string(k));
130 ossia::set_thread_realtime(m_threads[k], 95);
131 ossia::set_thread_pinned(ossia::thread_type::AudioTask, k);
133 ossia::disable_fpe();
137 if(m_tasks.wait_dequeue_timed(t, 100))
145 m_startFlag.test_and_set();
151 for(
auto& t : m_threads)
157 void set_task_executor(task_function f) { m_func = std::move(f); }
159 void enqueue_task(task& task)
161 if(task.m_node->not_threadable())
164 m_not_threadsafe_tasks.enqueue(&task);
169 m_tasks.enqueue(&task);
173 void run(taskflow& tf)
176 if(tf.m_tasks.empty())
181 m_toDoTasks = tf.m_tasks.size();
182 m_doneTasks.store(0, std::memory_order_relaxed);
184 for(
auto& task : tf.m_tasks)
186 task.m_remaining_dependencies.store(
187 task.m_dependencies, std::memory_order_relaxed);
188 task.m_executed.store(
false, std::memory_order_relaxed);
189#if defined(CHECK_EXEC_COUNTS)
190 m_checkVec[task.m_taskId] = 0;
194 std::atomic_thread_fence(std::memory_order_seq_cst);
195#if defined(DISABLE_DONE_TASKS)
196 thread_local ossia::small_pod_vector<ossia::task*, 8> toCleanup;
198 for(
auto& task : tf.m_tasks)
200 if(task.m_dependencies == 0)
202#if defined(DISABLE_DONE_TASKS)
203 if(task.m_node->enabled())
206#if defined(CHECK_EXEC_COUNTS)
207 m_checkVec[task.m_taskId]++;
208 if(m_checkVec[task.m_taskId] != 1)
211 stderr,
"!!! task {} enqueued {}\n", task.m_taskId,
212 m_checkVec[task.m_taskId]);
215 assert(task.m_dependencies == 0);
217 std::atomic_thread_fence(std::memory_order_release);
220#if defined(DISABLE_DONE_TASKS)
223 toCleanup.push_back(&task);
229#if defined(DISABLE_DONE_TASKS)
230 for(
auto& task : toCleanup)
237 while(m_doneTasks.load(std::memory_order_relaxed) != m_toDoTasks)
240 if(m_not_threadsafe_tasks.wait_dequeue_timed(t, 1))
245 if(m_tasks.wait_dequeue_timed(t, 1))
251 std::atomic_thread_fence(std::memory_order_seq_cst);
255 void process_done(ossia::task& task)
257 if(task.m_executed.exchange(
true))
259 assert(this->m_doneTasks != m_tf->m_tasks.size());
261#if defined(DISABLE_DONE_TASKS)
262 ossia::small_pod_vector<ossia::task*, 8> toCleanup;
264 for(
int taskId : task.m_precedes)
266 auto& nextTask = m_tf->m_tasks[taskId];
267 assert(!nextTask.m_executed);
269 std::atomic_int& remaining = nextTask.m_remaining_dependencies;
270 assert(remaining > 0);
271 const int rem = remaining.fetch_sub(1, std::memory_order_relaxed) - 1;
275#if defined(DISABLE_DONE_TASKS)
276 if(nextTask.m_node->enabled())
279#if defined(CHECK_EXEC_COUNTS)
280 m_checkVec[nextTask.m_taskId]++;
281 if(m_checkVec[nextTask.m_taskId] != 1)
284 stderr,
"!!! task {} enqueued {}\n", nextTask.m_taskId,
285 m_checkVec[nextTask.m_taskId]);
289 std::atomic_thread_fence(std::memory_order_release);
290 enqueue_task(nextTask);
292#if defined(DISABLE_DONE_TASKS)
295 toCleanup.push_back(&nextTask);
301#if defined(DISABLE_DONE_TASKS)
302 for(
auto& clean : toCleanup)
304 process_done(*clean);
308 this->m_doneTasks.fetch_add(1, std::memory_order_relaxed);
311 void execute(task& task)
313 std::atomic_thread_fence(std::memory_order_acquire);
316 assert(!task.m_executed);
317#if defined(CHECK_FOLLOWS)
318 for(
auto& prev : task.m_follows)
320 auto& t = m_tf->m_tasks[prev];
321 assert(t.m_executed);
325#if defined(CHECK_EXEC_COUNTS)
326 assert(m_checkVec[task.m_taskId] == 1);
328 m_func(*task.m_node);
330#if defined(CHECK_EXEC_COUNTS)
331 assert(m_checkVec[task.m_taskId] == 1);
336 fmt::print(stderr,
"error !\n");
338 std::atomic_thread_fence(std::memory_order_release);
341#if defined(CHECK_EXEC_COUNTS)
342 assert(m_checkVec[task.m_taskId] == 1);
345 std::atomic_thread_fence(std::memory_order_release);
348 task_function m_func;
350 std::atomic_bool m_running{};
352 ossia::small_vector<std::thread, 8> m_threads;
353 std::atomic_flag m_startFlag = ATOMIC_FLAG_INIT;
356 std::atomic_size_t m_doneTasks = 0;
357 std::size_t m_toDoTasks = 0;
359 moodycamel::BlockingConcurrentQueue<task*> m_tasks;
360 moodycamel::BlockingConcurrentQueue<task*> m_not_threadsafe_tasks;
362#if defined(CHECK_EXEC_COUNTS)
363 std::array<std::atomic_int, 5000> m_checkVec;
368#include <ossia/dataflow/graph/graph_static.hpp>
369#include <ossia/detail/hash_map.hpp>
372struct custom_parallel_exec;
373template <
typename Impl>
374struct custom_parallel_update
377 std::shared_ptr<ossia::logger_type>
logger;
378 std::shared_ptr<bench_map> perf_map;
380 template <
typename Graph_T>
381 custom_parallel_update(Graph_T& g,
const ossia::graph_setup_options& opt)
383 , executor{opt.parallel_threads}
388 ossia::node_map& nodes,
const std::vector<graph_node*>& topo_order,
389 ossia::graph_t& graph)
394 flow_graph.reserve(nodes.size());
400 executor.set_task_executor(
401 node_exec_logger_bench{cur_state, *perf_map, *
logger});
402 for(
auto node : topo_order)
404 (*perf_map)[node] = std::nullopt;
405 flow_nodes[node] = flow_graph.emplace(*node);
410 executor.set_task_executor(node_exec_logger{cur_state, *
logger});
411 for(
auto node : topo_order)
413 flow_nodes[node] = flow_graph.emplace(*node);
419 executor.set_task_executor(node_exec{cur_state});
420 for(
auto node : topo_order)
422 flow_nodes[node] = flow_graph.emplace(*node);
426 for(
auto [ei, ei_end] = boost::edges(graph); ei != ei_end; ++ei)
429 auto& n1 = graph[edge.m_source];
430 auto& n2 = graph[edge.m_target];
432 auto& sender = flow_nodes[n2.get()];
433 auto& receiver = flow_nodes[n1.get()];
434 sender->precede(*receiver);
438 template <
typename Graph_T,
typename DevicesT>
439 void operator()(Graph_T& g,
const DevicesT& devices)
442 update_graph(g.m_nodes, g.m_all_nodes, impl.m_sub_graph);
446 friend struct custom_parallel_exec;
449 execution_state* cur_state{};
451 ossia::taskflow flow_graph;
452 ossia::executor executor;
453 ossia::hash_map<graph_node*, ossia::task*> flow_nodes;
456struct custom_parallel_exec
458 template <
typename Graph_T>
459 custom_parallel_exec(Graph_T&)
463 template <
typename Graph_T,
typename Impl>
465 Graph_T& g, custom_parallel_update<Impl>& self, ossia::execution_state& e,
466 const std::vector<ossia::graph_node*>&)
469 self.executor.run(self.flow_graph);
473using custom_parallel_tc_graph
474 = graph_static<custom_parallel_update<tc_update<fast_tc>>, custom_parallel_exec>;
// NOTE(review): the following stray text is Doxygen hover-content for
// `spdlog::logger& logger() noexcept` ("Where the errors will be logged.
// Default is stderr." — defined in context.cpp:118) that was pasted into
// this file during extraction; it is not part of this translation unit and
// should be removed.