2#include <ossia/detail/config.hpp>
3#include <ossia/detail/disable_fpe.hpp>
4#include <ossia/detail/small_vector.hpp>
5#include <ossia/detail/thread.hpp>
7#include <boost/container/static_vector.hpp>
9#include <concurrentqueue.h>
// Entry point of a task: a noexcept function pointer that receives the
// pool_task itself. It defaults to nullptr; fork_n assigns its slice
// trampoline here before the slices are handed to the pool.
22 void (*execute)(pool_task*)
noexcept =
nullptr;
// Outstanding-work counter of a batch: fork_n stores the slice count in it,
// finish() decrements it by one, and corun_until() loops until it reads zero.
27 std::atomic<int> remaining;
// Accessor for the pool object declared as a static inside this function.
// NOTE(review): the return statement is outside the lines visible in this
// excerpt; presumably it returns `pool`.
33 static task_pool& instance()
35 static task_pool pool;
// Returns the number of entries in m_workers, converted to int.
39 int worker_count() const noexcept {
return static_cast<int>(m_workers.size()); }
// Offers task `t` to m_queue, the queue that try_run_one drains for both the
// owner thread and the workers.
// NOTE(review): the branch taken when try_enqueue() reports failure, and
// anything that follows it, is not visible in this excerpt.
41 void submit(pool_task* t)
noexcept
43 if(!m_queue.try_enqueue(t))
// Offers task `t` to m_owner_queue, which try_run_one only drains when called
// with owner == true.
51 void submit_owner_only(pool_task* t)
noexcept
// Retry until the enqueue succeeds, yielding the calling thread between
// attempts.
53 while(!m_owner_queue.try_enqueue(t))
54 std::this_thread::yield();
// Marks one unit of batch `b` as done by decrementing b.remaining.
58 void finish(task_batch& b)
noexcept
// fetch_sub returns the value held before the decrement, so `== 1` is true
// only for the call that brings the counter to zero.
// NOTE(review): the body of this branch is not visible in this excerpt.
60 if(b.remaining.fetch_sub(1, std::memory_order_acq_rel) == 1)
// Keeps the calling thread busy with pool tasks until b.remaining reads zero.
// NOTE(review): the bodies of the `if` statements below are not visible in
// this excerpt; the comments describe only the visible conditions.
64 void corun_until(task_batch& b)
noexcept
// A thread whose thread-local t_is_worker flag is false is treated as owner,
// which lets try_run_one also pull from m_owner_queue.
66 const bool owner = !t_is_worker;
// Debug-build check on the thread-local t_depth counter.
68 assert(t_depth < 1024);
69 while(b.remaining.load(std::memory_order_acquire) != 0)
// First attempt to pick up a task.
71 if(try_run_one(owner))
// Snapshot the epoch, then re-check the batch counter and the queues, before
// blocking on that snapshot.
74 const std::uint32_t e = m_epoch.load(std::memory_order_acquire);
75 if(b.remaining.load(std::memory_order_acquire) == 0)
77 if(try_run_one(owner))
// Blocks while m_epoch still holds the snapshot value `e`.
79 m_epoch.wait(e, std::memory_order_acquire);
// Partitions the index range [0, n) into contiguous slices, one pool_task per
// slice. `f` is reached through the slice's `fn` pointer; presumably it is
// invoked once per index.
// NOTE(review): this excerpt omits several lines of the function: the
// template header, the braces, the `batch` declaration, the slice `lo`/`hi`
// fields, and the bodies of the loops and of the MAX_SLICES check.
85 void fork_n(
int n, F&& f)
noexcept
// Slice count: the smaller of n and worker_count() + 1.
// NOTE(review): if n == 0 reaches this point, nslices is 0 and `n / nslices`
// below divides by zero; confirm the elided lines return early.
90 int nslices = std::min(n, worker_count() + 1);
// Checked against MAX_SLICES, the capacity of the static_vector below; the
// branch body is not visible here.
91 if(nslices > MAX_SLICES)
// Task type local to this function: a pool_task plus pointers to the
// callable, the batch and the pool, all defaulting to nullptr.
94 struct slice : pool_task
96 std::remove_reference_t<F>* fn =
nullptr;
98 task_batch* batch =
nullptr;
99 task_pool* pool =
nullptr;
// Fixed-capacity storage for the slices, constructed with nslices elements.
102 boost::container::static_vector<slice, MAX_SLICES> slices(nslices);
// One unit of outstanding work per slice.
104 batch.remaining.store(nslices, std::memory_order_relaxed);
// Captureless trampoline; it is converted to a plain function pointer with
// unary + where it is stored into slice::execute.
106 constexpr auto run_slice = [](pool_task* pt)
noexcept {
// Recover the slice from its pool_task base.
107 auto* s =
static_cast<slice*
>(pt);
// Iterates over the slice's half-open index range [lo, hi); the loop body is
// not visible in this excerpt.
108 for(
int i = s->lo; i < s->hi; ++i)
// Reports this slice as done on its batch.
118 s->pool->finish(*s->batch);
// Even split: every slice gets `base` indices and the first `rem` slices get
// one extra.
121 const int base = n / nslices;
122 const int rem = n % nslices;
// First pass over the slices; the visible part computes each slice's size
// and installs the trampoline.
124 for(
int s = 0; s < nslices; ++s)
126 const int count = base + (s < rem ? 1 : 0);
127 slice& sl = slices[s];
128 sl.execute = +run_slice;
// Second pass over the slices; its body is not visible in this excerpt.
138 for(
int s = 0; s < nslices; ++s)
// Copy construction and copy assignment are both deleted.
144 task_pool(
const task_pool&) =
delete;
145 task_pool& operator=(
const task_pool&) =
delete;
// Capacity of the static_vector of slices in fork_n, and the bound its slice
// count is checked against.
148 static constexpr int MAX_SLICES = 64;
// Start-up sequence that creates the worker threads.
// NOTE(review): the enclosing function header is not visible in this
// excerpt; presumably this is the task_pool constructor.
152 int W = default_worker_count();
// Set the run flag before any thread exists, so workers never observe false
// on their first check.
154 m_running.store(
true, std::memory_order_relaxed);
155 m_workers.reserve(W);
// One thread per worker index, each executing worker_run(k).
156 for(
int k = 0; k < W; ++k)
157 m_workers.emplace_back([
this, k] { worker_run(k); });
// Applies ossia::set_thread_realtime to every worker.
// NOTE(review): 95 is presumably a scheduling priority; confirm against
// ossia/detail/thread.hpp.
159 for(
auto& t : m_workers)
160 ossia::set_thread_realtime(t, 95);
// Releases the workers: worker_run spins on m_start.test() until this flag
// is set.
162 m_start.test_and_set();
// Shutdown sequence.
// NOTE(review): the enclosing function header is not visible in this
// excerpt; presumably this is the task_pool destructor.
// Clearing the flag makes the worker_run loops exit on their next check.
167 m_running.store(
false, std::memory_order_release);
// Visits every worker thread; the loop body is not visible here (presumably
// a join).
169 for(
auto& t : m_workers)
// Chooses how many worker threads to start.
// Returns the num_threads of the AudioTask entry from
// ossia::get_thread_specs() when that entry exists and is greater than 1;
// otherwise std::thread::hardware_concurrency() - 1, with a minimum of 1.
174 static int default_worker_count() noexcept
176 const auto& specs = ossia::get_thread_specs();
177 if(
auto it = specs.find(ossia::thread_type::AudioTask);
178 it != specs.end() && it->second.num_threads > 1)
179 return it->second.num_threads;
// hardware_concurrency() may report 0 when the count is unknown; 0, 1 and 2
// all produce a single worker.
181 const unsigned hw = std::thread::hardware_concurrency();
182 return static_cast<int>(hw > 2 ? hw - 1 : 1);
// Attempts to dequeue a single task. m_owner_queue is tried first, and only
// when `owner` is true; m_queue is tried regardless of `owner`.
// NOTE(review): what is done with a dequeued task, and the return
// statements, are not visible in this excerpt; callers use the bool result
// as a condition, presumably "a task was run".
185 bool try_run_one(
bool owner)
noexcept
187 pool_task* t =
nullptr;
188 if(owner && m_owner_queue.try_dequeue(t))
193 if(m_queue.try_dequeue(t))
// Advances the epoch and wakes every thread blocked in m_epoch.wait();
// corun_until and worker_run are the waiters visible in this file.
// NOTE(review): the signature of the enclosing function is not visible in
// this excerpt.
203 m_epoch.fetch_add(1, std::memory_order_release);
204 m_epoch.notify_all();
// Thread body of worker number `k`, started from the lambda that calls
// worker_run(k).
// NOTE(review): the bodies of the `if` statements below are not visible in
// this excerpt; the comments describe only the visible conditions.
207 void worker_run(
int k)
noexcept
// Spin, yielding, until m_start has been set.
209 while(!m_start.test())
210 std::this_thread::yield();
// Per-thread setup: the name "ossia exec <k>", pinning through
// set_thread_pinned with the AudioTask type and index k, then disable_fpe().
213 ossia::set_thread_name(
"ossia exec " + std::to_string(k));
214 ossia::set_thread_pinned(ossia::thread_type::AudioTask, k);
215 ossia::disable_fpe();
// Main loop; it ends once m_running reads false.
217 while(m_running.load(std::memory_order_acquire))
// Workers pass owner == false, so they never dequeue from m_owner_queue.
219 if(try_run_one(
false))
// Snapshot the epoch, then re-check the run flag and the queue, before
// blocking on that snapshot.
222 const std::uint32_t e = m_epoch.load(std::memory_order_acquire);
223 if(!m_running.load(std::memory_order_acquire))
225 if(try_run_one(
false))
// Blocks while m_epoch still holds the snapshot value `e`.
227 m_epoch.wait(e, std::memory_order_acquire);
// Task queue drained by try_run_one for any caller; constructed with 4096.
// NOTE(review): the constructor argument is presumably the initial capacity,
// which would bound what try_enqueue can accept; confirm against the
// moodycamel documentation.
231 moodycamel::ConcurrentQueue<pool_task*> m_queue{4096};
// Task queue drained by try_run_one only when owner == true; constructed
// with 1024.
232 moodycamel::ConcurrentQueue<pool_task*> m_owner_queue{1024};
// Counter that waiting threads snapshot and block on with wait(); it is
// incremented and notified to wake them.
234 std::atomic<std::uint32_t> m_epoch{0};
// Run flag: stored true before the workers are spawned and false at
// shutdown; worker_run loops while it reads true.
235 std::atomic<bool> m_running{
false};
// Start gate: workers spin on test() until test_and_set() is called after
// all threads have been created.
236 std::atomic_flag m_start = ATOMIC_FLAG_INIT;
// Worker threads.
// NOTE(review): 16 is presumably the small_vector inline capacity; confirm.
238 ossia::small_vector<std::thread, 16> m_workers;
// Per-thread flag read by corun_until to decide whether the caller counts as
// owner (owner == !t_is_worker).
240 static inline thread_local bool t_is_worker =
false;
// Per-thread counter checked by the assert in corun_until.
241 static inline thread_local int t_depth = 0;