// OSSIA — Open Scenario System for Interactive Application
// graph_parallel_impl.hpp
#pragma once
#include <ossia/dataflow/graph_node.hpp>
#include <ossia/detail/disable_fpe.hpp>
#include <ossia/detail/fmt.hpp>
#include <ossia/detail/lockfree_queue.hpp>
#include <ossia/detail/thread.hpp>

#include <boost/container/static_vector.hpp>

#include <blockingconcurrentqueue.h>
#include <concurrentqueue.h>
#include <smallfun.hpp>

#include <thread>
#include <vector>
#define DISABLE_DONE_TASKS
//#define CHECK_FOLLOWS
//#define CHECK_EXEC_COUNTS
//#define memory_order_relaxed memory_order_seq_cst
//#define memory_order_acquire memory_order_seq_cst
//#define memory_order_release memory_order_seq_cst
namespace ossia
{
// Type-erased callable used by the executor to run a single graph node.
// The inline buffer (4 pointers) keeps typical captures allocation-free.
using task_function = smallfun::function<void(ossia::graph_node&), sizeof(void*) * 4>;

class taskflow;
class executor;
28class task
29{
30public:
31 task() = default;
32 task(const task&) = delete;
33 task(task&& other) noexcept
34 : m_taskId{other.m_taskId}
35 , m_dependencies{other.m_dependencies}
36 , m_remaining_dependencies{other.m_remaining_dependencies.load()}
37 , m_node{other.m_node}
38 , m_precedes{std::move(other.m_precedes)}
39#if defined(CHECK_FOLLOWS)
40 , m_follows{std::move(other.m_follows)}
41#endif
42 {
43 other.m_precedes.clear();
44#if defined(CHECK_FOLLOWS)
45 other.m_follows.clear();
46#endif
47 }
48 task& operator=(const task&) = delete;
49 task& operator=(task&& other) noexcept
50 {
51 m_taskId = other.m_taskId;
52 m_dependencies = other.m_dependencies;
53 m_remaining_dependencies = other.m_remaining_dependencies.load();
54 m_node = other.m_node;
55 m_precedes = std::move(other.m_precedes);
56 other.m_precedes.clear();
57#if defined(CHECK_FOLLOWS)
58 m_follows = std::move(other.m_follows);
59 other.m_follows.clear();
60#endif
61 return *this;
62 }
63
64 task(ossia::graph_node& node)
65 : m_node{&node}
66 {
67 }
68
69 void precede(task& other)
70 {
71 m_precedes.push_back(other.m_taskId);
72#if defined(CHECK_FOLLOWS)
73 other.m_follows.push_back(m_taskId);
74#endif
75 other.m_dependencies++;
76 }
77
78private:
79 friend class taskflow;
80 friend class executor;
81
82 int m_taskId{};
83 int m_dependencies{0};
84 std::atomic_int m_remaining_dependencies{};
85 std::atomic_bool m_executed{};
86
87 ossia::graph_node* m_node{};
88 ossia::small_pod_vector<int, 4> m_precedes;
89#if defined(CHECK_FOLLOWS)
90 ossia::small_pod_vector<int, 4> m_follows;
91#endif
92};
93
94class taskflow
95{
96public:
97 void clear() { m_tasks.clear(); }
98
99 void reserve(std::size_t sz) { m_tasks.reserve(sz); }
100
101 task* emplace(ossia::graph_node& node)
102 {
103 const int taskId = m_tasks.size();
104 auto& last = m_tasks.emplace_back(node);
105 last.m_taskId = taskId;
106 return &last;
107 }
108
109private:
110 friend class executor;
111
112 std::vector<task> m_tasks;
113};
114
// Thread pool that executes a taskflow respecting its dependency edges.
// Worker threads drain m_tasks; tasks whose node is not threadable go to
// m_not_threadsafe_tasks, which is only drained by the thread calling
// run(). Readiness is tracked with per-task atomic countdowns.
class executor
{
public:
  // Spawns `nthreads` workers, names/pins them and raises them to
  // realtime priority, then releases them via m_startFlag.
  explicit executor(int nthreads)
  {
    m_running = true;
    m_threads.resize(nthreads);
    int k = 0;
    for(auto& t : m_threads)
    {
      // `k = k++` gives each lambda its own distinct index.
      t = std::thread{[this, k = k++] {
        // Spin until every std::thread object is constructed, so that
        // m_threads[k] below is safe to access from inside the thread.
        while(!m_startFlag.test())
          std::this_thread::yield();

        ossia::set_thread_name(m_threads[k], "ossia exec " + std::to_string(k));
        ossia::set_thread_realtime(m_threads[k], 95);
        ossia::set_thread_pinned(ossia::thread_type::AudioTask, k);

        ossia::disable_fpe();
        while(m_running)
        {
          task* t{};
          // Timed dequeue (moodycamel timeout, microseconds) so that
          // m_running is rechecked periodically for shutdown.
          if(m_tasks.wait_dequeue_timed(t, 100))
          {
            execute(*t);
          }
        }
      }};
    }

    m_startFlag.test_and_set();
  }

  // Stops and joins all workers. Any task still queued is abandoned.
  ~executor()
  {
    m_running = false;
    for(auto& t : m_threads)
    {
      t.join();
    }
  }

  // Sets the callable that runs one node (plain / logging / benchmarking).
  void set_task_executor(task_function f) { m_func = std::move(f); }

  // Routes a ready task to the right queue depending on whether its node
  // may run on a worker thread.
  void enqueue_task(task& task)
  {
    if(task.m_node->not_threadable())
    {
      [[unlikely]];
      m_not_threadsafe_tasks.enqueue(&task);
    }
    else
    {
      [[likely]];
      m_tasks.enqueue(&task);
    }
  }

  // Executes the whole task graph. Blocks until all tasks are done.
  // The calling thread participates: it is the sole consumer of the
  // non-threadsafe queue and also helps drain the regular queue.
  void run(taskflow& tf)
  {
    m_tf = &tf;
    if(tf.m_tasks.empty())
    {
      return;
    }

    m_toDoTasks = tf.m_tasks.size();
    m_doneTasks.store(0, std::memory_order_relaxed);

    // Reset the per-run state of every task.
    for(auto& task : tf.m_tasks)
    {
      task.m_remaining_dependencies.store(
          task.m_dependencies, std::memory_order_relaxed);
      task.m_executed.store(false, std::memory_order_relaxed);
#if defined(CHECK_EXEC_COUNTS)
      m_checkVec[task.m_taskId] = 0;
#endif
    }

    // Make the resets visible to all workers before any task is queued.
    std::atomic_thread_fence(std::memory_order_seq_cst);
#if defined(DISABLE_DONE_TASKS)
    thread_local ossia::small_pod_vector<ossia::task*, 8> toCleanup;
#endif
    // Seed the queues with every task that has no predecessor.
    for(auto& task : tf.m_tasks)
    {
      if(task.m_dependencies == 0)
      {
#if defined(DISABLE_DONE_TASKS)
        // Disabled nodes are not executed; they are completed directly
        // below so their successors are still released.
        if(task.m_node->enabled())
#endif
        {
#if defined(CHECK_EXEC_COUNTS)
          m_checkVec[task.m_taskId]++;
          if(m_checkVec[task.m_taskId] != 1)
          {
            fmt::print(
                stderr, "!!! task {} enqueued {}\n", task.m_taskId,
                m_checkVec[task.m_taskId]);
            std::abort();
          }
          assert(task.m_dependencies == 0);
#endif
          // Publish prior writes before the task becomes visible to a worker.
          std::atomic_thread_fence(std::memory_order_release);
          enqueue_task(task);
        }
#if defined(DISABLE_DONE_TASKS)
        else
        {
          toCleanup.push_back(&task);
        }
#endif
      }
    }

#if defined(DISABLE_DONE_TASKS)
    // Complete disabled roots: counts them as done and frees successors.
    for(auto& task : toCleanup)
    {
      process_done(*task);
    }
    toCleanup.clear();
#endif

    // Help out until every task has completed.
    while(m_doneTasks.load(std::memory_order_relaxed) != m_toDoTasks)
    {
      task* t{};
      if(m_not_threadsafe_tasks.wait_dequeue_timed(t, 1))
      {
        execute(*t);
      }

      if(m_tasks.wait_dequeue_timed(t, 1))
      {
        execute(*t);
      }
    }

    std::atomic_thread_fence(std::memory_order_seq_cst);
  }

private:
  // Marks `task` executed and decrements each successor's remaining
  // count; successors reaching zero are enqueued (or, if disabled,
  // recursively completed without running).
  void process_done(ossia::task& task)
  {
    // exchange() makes completion idempotent if a task were seen twice.
    if(task.m_executed.exchange(true))
      return;
    assert(this->m_doneTasks != m_tf->m_tasks.size());

#if defined(DISABLE_DONE_TASKS)
    ossia::small_pod_vector<ossia::task*, 8> toCleanup;
#endif
    for(int taskId : task.m_precedes)
    {
      auto& nextTask = m_tf->m_tasks[taskId];
      assert(!nextTask.m_executed);

      std::atomic_int& remaining = nextTask.m_remaining_dependencies;
      assert(remaining > 0);
      // fetch_sub returns the previous value; `rem` is the new count.
      const int rem = remaining.fetch_sub(1, std::memory_order_relaxed) - 1;
      assert(rem >= 0);
      if(rem == 0)
      {
#if defined(DISABLE_DONE_TASKS)
        if(nextTask.m_node->enabled())
#endif
        {
#if defined(CHECK_EXEC_COUNTS)
          m_checkVec[nextTask.m_taskId]++;
          if(m_checkVec[nextTask.m_taskId] != 1)
          {
            fmt::print(
                stderr, "!!! task {} enqueued {}\n", nextTask.m_taskId,
                m_checkVec[nextTask.m_taskId]);
            std::abort();
          }
#endif
          // Publish this task's results before its successor can start.
          std::atomic_thread_fence(std::memory_order_release);
          enqueue_task(nextTask);
        }
#if defined(DISABLE_DONE_TASKS)
        else
        {
          toCleanup.push_back(&nextTask);
        }
#endif
      }
    }

#if defined(DISABLE_DONE_TASKS)
    for(auto& clean : toCleanup)
    {
      process_done(*clean);
    }
#endif

    this->m_doneTasks.fetch_add(1, std::memory_order_relaxed);
  }

  // Runs one task's node through m_func, then completes it.
  // Exceptions are swallowed (logged to stderr) so that a faulty node
  // cannot leave run() waiting forever on m_doneTasks.
  void execute(task& task)
  {
    // Pair with the release fences above: see predecessors' results.
    std::atomic_thread_fence(std::memory_order_acquire);
    try
    {
      assert(!task.m_executed);
#if defined(CHECK_FOLLOWS)
      for(auto& prev : task.m_follows)
      {
        auto& t = m_tf->m_tasks[prev];
        assert(t.m_executed);
      }
#endif

#if defined(CHECK_EXEC_COUNTS)
      assert(m_checkVec[task.m_taskId] == 1);
#endif
      m_func(*task.m_node);

#if defined(CHECK_EXEC_COUNTS)
      assert(m_checkVec[task.m_taskId] == 1);
#endif
    }
    catch(...)
    {
      fmt::print(stderr, "error !\n");
    }
    std::atomic_thread_fence(std::memory_order_release);

    process_done(task);
#if defined(CHECK_EXEC_COUNTS)
    assert(m_checkVec[task.m_taskId] == 1);
#endif

    std::atomic_thread_fence(std::memory_order_release);
  }

  task_function m_func; // runs one node; set via set_task_executor

  std::atomic_bool m_running{}; // workers poll this for shutdown

  ossia::small_vector<std::thread, 8> m_threads;
  std::atomic_flag m_startFlag = ATOMIC_FLAG_INIT; // gates worker startup

  taskflow* m_tf{};                  // graph being run (valid during run())
  std::atomic_size_t m_doneTasks = 0; // completed-task counter
  std::size_t m_toDoTasks = 0;        // total tasks in the current run

  moodycamel::BlockingConcurrentQueue<task*> m_tasks;
  moodycamel::BlockingConcurrentQueue<task*> m_not_threadsafe_tasks;

#if defined(CHECK_EXEC_COUNTS)
  std::array<std::atomic_int, 5000> m_checkVec;
#endif
};
}

#include <ossia/dataflow/graph/graph_static.hpp>
#include <ossia/detail/hash_map.hpp>
namespace ossia
{
struct custom_parallel_exec;
373template <typename Impl>
374struct custom_parallel_update
375{
376public:
377 std::shared_ptr<ossia::logger_type> logger;
378 std::shared_ptr<bench_map> perf_map;
379
380 template <typename Graph_T>
381 custom_parallel_update(Graph_T& g, const ossia::graph_setup_options& opt)
382 : impl{g, opt}
383 , executor{opt.parallel_threads}
384 {
385 }
386
387 void update_graph(
388 ossia::node_map& nodes, const std::vector<graph_node*>& topo_order,
389 ossia::graph_t& graph)
390 {
391 flow_nodes.clear();
392 flow_graph.clear();
393
394 flow_graph.reserve(nodes.size());
395
396 if(logger)
397 {
398 if(perf_map)
399 {
400 executor.set_task_executor(
401 node_exec_logger_bench{cur_state, *perf_map, *logger});
402 for(auto node : topo_order)
403 {
404 (*perf_map)[node] = std::nullopt;
405 flow_nodes[node] = flow_graph.emplace(*node);
406 }
407 }
408 else
409 {
410 executor.set_task_executor(node_exec_logger{cur_state, *logger});
411 for(auto node : topo_order)
412 {
413 flow_nodes[node] = flow_graph.emplace(*node);
414 }
415 }
416 }
417 else
418 {
419 executor.set_task_executor(node_exec{cur_state});
420 for(auto node : topo_order)
421 {
422 flow_nodes[node] = flow_graph.emplace(*node);
423 }
424 }
425
426 for(auto [ei, ei_end] = boost::edges(graph); ei != ei_end; ++ei)
427 {
428 auto edge = *ei;
429 auto& n1 = graph[edge.m_source];
430 auto& n2 = graph[edge.m_target];
431
432 auto& sender = flow_nodes[n2.get()];
433 auto& receiver = flow_nodes[n1.get()];
434 sender->precede(*receiver);
435 }
436 }
437
438 template <typename Graph_T, typename DevicesT>
439 void operator()(Graph_T& g, const DevicesT& devices)
440 {
441 impl(g, devices);
442 update_graph(g.m_nodes, g.m_all_nodes, impl.m_sub_graph);
443 }
444
445private:
446 friend struct custom_parallel_exec;
447
448 Impl impl;
449 execution_state* cur_state{};
450
451 ossia::taskflow flow_graph;
452 ossia::executor executor;
453 ossia::hash_map<graph_node*, ossia::task*> flow_nodes;
454};
455
// Execution functor for graph_static: publishes the current execution
// state to the update step, then runs its prepared taskflow to completion.
struct custom_parallel_exec
{
  template <typename Graph_T>
  custom_parallel_exec(Graph_T&)
  {
  }

  // Blocks until every enabled node of the taskflow has executed.
  template <typename Graph_T, typename Impl>
  void operator()(
      Graph_T& g, custom_parallel_update<Impl>& self, ossia::execution_state& e,
      const std::vector<ossia::graph_node*>&)
  {
    self.cur_state = &e;
    self.executor.run(self.flow_graph);
  }
};

// Ready-made graph type: "tc" update policy (presumably transitive
// closure — see tc_update in graph_static.hpp) combined with the
// parallel taskflow executor defined above.
using custom_parallel_tc_graph
    = graph_static<custom_parallel_update<tc_update<fast_tc>>, custom_parallel_exec>;
}

//#undef memory_order_relaxed
//#undef memory_order_acquire
//#undef memory_order_release
// (Doxygen extraction residue, preserved as comments:)
// Definition git_info.h:7
// spdlog::logger& logger() noexcept — where the errors will be logged; default is stderr.
// Definition context.cpp:118