OSSIA
Open Scenario System for Interactive Application
Loading...
Searching...
No Matches
exec_pool.hpp
1#pragma once
2#include <ossia/detail/config.hpp>
3#include <ossia/detail/disable_fpe.hpp>
4#include <ossia/detail/small_vector.hpp>
5#include <ossia/detail/thread.hpp>
6
7#include <boost/container/static_vector.hpp>
8
9#include <concurrentqueue.h>
10
11#include <algorithm>
12#include <atomic>
13#include <cassert>
14#include <cstdint>
15#include <string>
16#include <thread>
17
18namespace ossia
19{
/// Minimal unit of work understood by task_pool.
///
/// A task is nothing more than a function pointer that receives the task
/// object itself. Code that needs per-task state derives from pool_task and
/// casts the pointer back to the derived type inside the callback.
struct pool_task
{
  /// Signature of the callback; it is not allowed to throw.
  using execute_fn = void (*)(pool_task*) noexcept;

  /// Callback invoked with a pointer to this task; null until assigned.
  execute_fn execute{nullptr};
};
24
/// Completion counter shared by a group of tasks.
///
/// The submitter stores the number of outstanding tasks in `remaining`; each
/// task decrements it once when done (see task_pool::finish), and a waiter
/// spins/sleeps until it reads zero (see task_pool::corun_until).
struct task_batch
{
  /// Number of tasks of the batch that have not finished yet.
  /// Explicitly zero-initialized so that a freshly constructed batch is
  /// "already complete" instead of holding an indeterminate value on
  /// standard libraries whose std::atomic default constructor does not
  /// value-initialize (pre-C++20 behaviour).
  std::atomic<int> remaining{0};
};
29
30class task_pool
31{
32public:
33 static task_pool& instance()
34 {
35 static task_pool pool;
36 return pool;
37 }
38
39 int worker_count() const noexcept { return static_cast<int>(m_workers.size()); }
40
41 void submit(pool_task* t) noexcept
42 {
43 if(!m_queue.try_enqueue(t))
44 {
45 t->execute(t);
46 return;
47 }
48 wake();
49 }
50
51 void submit_owner_only(pool_task* t) noexcept
52 {
53 while(!m_owner_queue.try_enqueue(t))
54 std::this_thread::yield();
55 wake();
56 }
57
58 void finish(task_batch& b) noexcept
59 {
60 if(b.remaining.fetch_sub(1, std::memory_order_acq_rel) == 1)
61 wake();
62 }
63
64 void corun_until(task_batch& b) noexcept
65 {
66 const bool owner = !t_is_worker;
67 ++t_depth;
68 assert(t_depth < 1024);
69 while(b.remaining.load(std::memory_order_acquire) != 0)
70 {
71 if(try_run_one(owner))
72 continue;
73
74 const std::uint32_t e = m_epoch.load(std::memory_order_acquire);
75 if(b.remaining.load(std::memory_order_acquire) == 0)
76 break;
77 if(try_run_one(owner))
78 continue;
79 m_epoch.wait(e, std::memory_order_acquire);
80 }
81 --t_depth;
82 }
83
84 template <typename F>
85 void fork_n(int n, F&& f) noexcept
86 {
87 if(n <= 0)
88 return;
89
90 int nslices = std::min(n, worker_count() + 1);
91 if(nslices > MAX_SLICES)
92 nslices = MAX_SLICES;
93
94 struct slice : pool_task
95 {
96 std::remove_reference_t<F>* fn = nullptr;
97 int lo = 0, hi = 0;
98 task_batch* batch = nullptr;
99 task_pool* pool = nullptr;
100 };
101
102 boost::container::static_vector<slice, MAX_SLICES> slices(nslices);
103 task_batch batch;
104 batch.remaining.store(nslices, std::memory_order_relaxed);
105
106 constexpr auto run_slice = [](pool_task* pt) noexcept {
107 auto* s = static_cast<slice*>(pt);
108 for(int i = s->lo; i < s->hi; ++i)
109 {
110 try
111 {
112 (*s->fn)(i);
113 }
114 catch(...)
115 {
116 }
117 }
118 s->pool->finish(*s->batch);
119 };
120
121 const int base = n / nslices;
122 const int rem = n % nslices;
123 int pos = 0;
124 for(int s = 0; s < nslices; ++s)
125 {
126 const int count = base + (s < rem ? 1 : 0);
127 slice& sl = slices[s];
128 sl.execute = +run_slice;
129 sl.fn = &f;
130 sl.lo = pos;
131 sl.hi = pos + count;
132 sl.batch = &batch;
133 sl.pool = this;
134 pos += count;
135 }
136 assert(pos == n);
137
138 for(int s = 0; s < nslices; ++s)
139 submit(&slices[s]);
140
141 corun_until(batch);
142 }
143
144 task_pool(const task_pool&) = delete;
145 task_pool& operator=(const task_pool&) = delete;
146
147private:
148 static constexpr int MAX_SLICES = 64;
149
150 task_pool()
151 {
152 int W = default_worker_count();
153
154 m_running.store(true, std::memory_order_relaxed);
155 m_workers.reserve(W);
156 for(int k = 0; k < W; ++k)
157 m_workers.emplace_back([this, k] { worker_run(k); });
158
159 for(auto& t : m_workers)
160 ossia::set_thread_realtime(t, 95);
161
162 m_start.test_and_set();
163 }
164
165 ~task_pool()
166 {
167 m_running.store(false, std::memory_order_release);
168 wake();
169 for(auto& t : m_workers)
170 if(t.joinable())
171 t.join();
172 }
173
174 static int default_worker_count() noexcept
175 {
176 const auto& specs = ossia::get_thread_specs();
177 if(auto it = specs.find(ossia::thread_type::AudioTask);
178 it != specs.end() && it->second.num_threads > 1)
179 return it->second.num_threads;
180
181 const unsigned hw = std::thread::hardware_concurrency();
182 return static_cast<int>(hw > 2 ? hw - 1 : 1);
183 }
184
185 bool try_run_one(bool owner) noexcept
186 {
187 pool_task* t = nullptr;
188 if(owner && m_owner_queue.try_dequeue(t))
189 {
190 t->execute(t);
191 return true;
192 }
193 if(m_queue.try_dequeue(t))
194 {
195 t->execute(t);
196 return true;
197 }
198 return false;
199 }
200
201 void wake() noexcept
202 {
203 m_epoch.fetch_add(1, std::memory_order_release);
204 m_epoch.notify_all();
205 }
206
207 void worker_run(int k) noexcept
208 {
209 while(!m_start.test())
210 std::this_thread::yield();
211
212 t_is_worker = true;
213 ossia::set_thread_name("ossia exec " + std::to_string(k));
214 ossia::set_thread_pinned(ossia::thread_type::AudioTask, k);
215 ossia::disable_fpe();
216
217 while(m_running.load(std::memory_order_acquire))
218 {
219 if(try_run_one(false))
220 continue;
221
222 const std::uint32_t e = m_epoch.load(std::memory_order_acquire);
223 if(!m_running.load(std::memory_order_acquire))
224 break;
225 if(try_run_one(false))
226 continue;
227 m_epoch.wait(e, std::memory_order_acquire);
228 }
229 }
230
231 moodycamel::ConcurrentQueue<pool_task*> m_queue{4096};
232 moodycamel::ConcurrentQueue<pool_task*> m_owner_queue{1024};
233
234 std::atomic<std::uint32_t> m_epoch{0};
235 std::atomic<bool> m_running{false};
236 std::atomic_flag m_start = ATOMIC_FLAG_INIT;
237
238 ossia::small_vector<std::thread, 16> m_workers;
239
240 static inline thread_local bool t_is_worker = false;
241 static inline thread_local int t_depth = 0;
242};
243}
Definition git_info.h:7