OSSIA
Open Scenario System for Interactive Application
Loading...
Searching...
No Matches
jthread.hpp
1#pragma once
2#include <version>
3
// Feature detection: decide whether the toolchain's own <stop_token> / jthread
// can be used. OSSIA_HAS_STD_JTHREAD is defined to 1 when it can, and is left
// undefined otherwise (the rest of this header then provides its own types).
#if __has_include(<stop_token>)
#  if defined(__cpp_lib_jthread) && (__cpp_lib_jthread >= 201911L)
// The standard library advertises jthread through its feature-test macro.
#    define OSSIA_HAS_STD_JTHREAD 1
#  elif ((_LIBCPP_VERSION >= 18100) && (_LIBCPP_VERSION < 99999)) || (_LIBCPP_VERSION >= 180100)
// libc++ in this version range: usable only when the experimental stop_token
// support has not been disabled.
#    ifndef _LIBCPP_HAS_NO_EXPERIMENTAL_STOP_TOKEN
#      define OSSIA_HAS_STD_JTHREAD 1
#    else
#      error Rebuild with -fexperimental-library, clang 18 ships headers which are incompatible with this file but hides half of them behind that flag
#    endif
#  endif
#endif
17
18#if OSSIA_HAS_STD_JTHREAD
19#include <stop_token>
20#include <thread>
21#else
22#include <ossia/detail/audio_spin_mutex.hpp>
23
24// Polyfill until libc++ gets jthread.
25// License: CC-4.0
26// https://github.com/StirlingLabs/jthread/blob/main/LICENSE
27
28// <stop_token> header
29
30#include <atomic>
31#include <thread>
32#include <type_traits>
33#include <utility>
34#ifdef SAFE
35#include <iostream>
36#endif
37
38namespace std
39{
40//-----------------------------------------------
41// internal types for shared stop state
42//-----------------------------------------------
43
// Node of the intrusive callback list kept by the shared stop state.
//
// The type is brace-initialized by derived classes with just the function
// pointer, so __callback_ has to remain the first data member and the struct
// must not gain constructors.
struct __stop_callback_base
{
  // Type-erased entry point. It receives the node itself so the concrete
  // callback type can cast back and run its payload.
  void (*__callback_)(__stop_callback_base*) = nullptr;

  // List links: __next_ is the following node, __prev_ is the address of the
  // pointer that currently refers to this node (null once dequeued).
  __stop_callback_base* __next_{nullptr};
  __stop_callback_base** __prev_{nullptr};

  // While the callback runs, points at a flag owned by the signalling code;
  // the flag is set when the node is being destroyed from inside the callback.
  bool* __isRemoved_{nullptr};

  // Set once the callback has returned; polled by a thread that is
  // deregistering the node while it executes elsewhere.
  std::atomic<bool> __callbackFinishedExecuting_{false};

  // Runs the stored entry point on this node.
  void __execute() noexcept
  {
    __callback_(this);
  }

protected:
  // Only derived types may destroy a node; this replaces a virtual
  // destructor (and a virtual __execute()).
  ~__stop_callback_base() = default;
};
60
61struct __stop_state
62{
63public:
64 void __add_token_reference() noexcept
65 {
66 __state_.fetch_add(__token_ref_increment, std::memory_order_relaxed);
67 }
68
69 void __remove_token_reference() noexcept
70 {
71 auto __oldState
72 = __state_.fetch_sub(__token_ref_increment, std::memory_order_acq_rel);
73 if(__oldState < (__token_ref_increment + __token_ref_increment))
74 {
75 delete this;
76 }
77 }
78
79 void __add_source_reference() noexcept
80 {
81 __state_.fetch_add(__source_ref_increment, std::memory_order_relaxed);
82 }
83
84 void __remove_source_reference() noexcept
85 {
86 auto __oldState
87 = __state_.fetch_sub(__source_ref_increment, std::memory_order_acq_rel);
88 if(__oldState < (__token_ref_increment + __source_ref_increment))
89 {
90 delete this;
91 }
92 }
93
94 bool __request_stop() noexcept
95 {
96
97 if(!__try_lock_and_signal_until_signalled())
98 {
99 // Stop has already been requested.
100 return false;
101 }
102
103 // Set the 'stop_requested' signal and acquired the lock.
104
105 __signallingThread_ = std::this_thread::get_id();
106
107 while(__head_ != nullptr)
108 {
109 // Dequeue the head of the queue
110 auto* __cb = __head_;
111 __head_ = __cb->__next_;
112 const bool anyMore = __head_ != nullptr;
113 if(anyMore)
114 {
115 __head_->__prev_ = &__head_;
116 }
117 // Mark this item as removed from the list.
118 __cb->__prev_ = nullptr;
119
120 // Don't hold lock while executing callback
121 // so we don't block other threads from deregistering callbacks.
122 __unlock();
123
124 // TRICKY: Need to store a flag on the stack here that the callback
125 // can use to signal that the destructor was executed inline
126 // during the call. If the destructor was executed inline then
127 // it's not safe to dereference __cb after __execute() returns.
128 // If the destructor runs on some other thread then the other
129 // thread will block waiting for this thread to signal that the
130 // callback has finished executing.
131 bool __isRemoved = false;
132 __cb->__isRemoved_ = &__isRemoved;
133
134 __cb->__execute();
135
136 if(!__isRemoved)
137 {
138 __cb->__isRemoved_ = nullptr;
139 __cb->__callbackFinishedExecuting_.store(true, std::memory_order_release);
140 }
141
142 if(!anyMore)
143 {
144 // This was the last item in the queue when we dequeued it.
145 // No more items should be added to the queue after we have
146 // marked the state as interrupted, only removed from the queue.
147 // Avoid acquring/releasing the lock in this case.
148 return true;
149 }
150
151 __lock();
152 }
153
154 __unlock();
155
156 return true;
157 }
158
159 bool __is_stop_requested() noexcept
160 {
161 return __is_stop_requested(__state_.load(std::memory_order_acquire));
162 }
163
164 bool __is_stop_requestable() noexcept
165 {
166 return __is_stop_requestable(__state_.load(std::memory_order_acquire));
167 }
168
169 bool __try_add_callback(
170 __stop_callback_base* __cb, bool __incrementRefCountIfSuccessful) noexcept
171 {
172 std::uint64_t __oldState;
173 goto __load_state;
174 do
175 {
176 goto __check_state;
177 do
178 {
179 ossia_rwlock_pause();
180 __load_state:
181 __oldState = __state_.load(std::memory_order_acquire);
182 __check_state:
183 if(__is_stop_requested(__oldState))
184 {
185 __cb->__execute();
186 return false;
187 }
188 else if(!__is_stop_requestable(__oldState))
189 {
190 return false;
191 }
192 } while(__is_locked(__oldState));
193 } while(!__state_.compare_exchange_weak(
194 __oldState, __oldState | __locked_flag, std::memory_order_acquire));
195
196 // Push callback onto callback list.
197 __cb->__next_ = __head_;
198 if(__cb->__next_ != nullptr)
199 {
200 __cb->__next_->__prev_ = &__cb->__next_;
201 }
202 __cb->__prev_ = &__head_;
203 __head_ = __cb;
204
205 if(__incrementRefCountIfSuccessful)
206 {
207 __unlock_and_increment_token_ref_count();
208 }
209 else
210 {
211 __unlock();
212 }
213
214 // Successfully added the callback.
215 return true;
216 }
217
218 void __remove_callback(__stop_callback_base* __cb) noexcept
219 {
220 __lock();
221
222 if(__cb->__prev_ != nullptr)
223 {
224 // Still registered, not yet executed
225 // Just remove from the list.
226 *__cb->__prev_ = __cb->__next_;
227 if(__cb->__next_ != nullptr)
228 {
229 __cb->__next_->__prev_ = __cb->__prev_;
230 }
231
232 __unlock_and_decrement_token_ref_count();
233
234 return;
235 }
236
237 __unlock();
238
239 // Callback has either already executed or is executing
240 // concurrently on another thread.
241
242 if(__signallingThread_ == std::this_thread::get_id())
243 {
244 // Callback executed on this thread or is still currently executing
245 // and is deregistering itself from within the callback.
246 if(__cb->__isRemoved_ != nullptr)
247 {
248 // Currently inside the callback, let the __request_stop() method
249 // know the object is about to be destructed and that it should
250 // not try to access the object when the callback returns.
251 *__cb->__isRemoved_ = true;
252 }
253 }
254 else
255 {
256 // Callback is currently executing on another thread,
257 // block until it finishes executing.
258 while(!__cb->__callbackFinishedExecuting_.load(std::memory_order_acquire))
259 {
260 ossia_rwlock_pause();
261 }
262 }
263
264 __remove_token_reference();
265 }
266
267private:
268 static bool __is_locked(std::uint64_t __state) noexcept
269 {
270 return (__state & __locked_flag) != 0;
271 }
272
273 static bool __is_stop_requested(std::uint64_t __state) noexcept
274 {
275 return (__state & __stop_requested_flag) != 0;
276 }
277
278 static bool __is_stop_requestable(std::uint64_t __state) noexcept
279 {
280 // Interruptible if it has already been interrupted or if there are
281 // still interrupt_source instances in existence.
282 return __is_stop_requested(__state) || (__state >= __source_ref_increment);
283 }
284
285 bool __try_lock_and_signal_until_signalled() noexcept
286 {
287 std::uint64_t __oldState = __state_.load(std::memory_order_acquire);
288 do
289 {
290 if(__is_stop_requested(__oldState))
291 return false;
292 while(__is_locked(__oldState))
293 {
294 ossia_rwlock_pause();
295 __oldState = __state_.load(std::memory_order_acquire);
296 if(__is_stop_requested(__oldState))
297 return false;
298 }
299 } while(!__state_.compare_exchange_weak(
300 __oldState, __oldState | __stop_requested_flag | __locked_flag,
301 std::memory_order_acq_rel, std::memory_order_acquire));
302 return true;
303 }
304
305 void __lock() noexcept
306 {
307 auto __oldState = __state_.load(std::memory_order_relaxed);
308 do
309 {
310 while(__is_locked(__oldState))
311 {
312 ossia_rwlock_pause();
313 __oldState = __state_.load(std::memory_order_relaxed);
314 }
315 } while(!__state_.compare_exchange_weak(
316 __oldState, __oldState | __locked_flag, std::memory_order_acquire,
317 std::memory_order_relaxed));
318 }
319
320 void __unlock() noexcept
321 {
322 __state_.fetch_sub(__locked_flag, std::memory_order_release);
323 }
324
325 void __unlock_and_increment_token_ref_count() noexcept
326 {
327 __state_.fetch_sub(__locked_flag - __token_ref_increment, std::memory_order_release);
328 }
329
330 void __unlock_and_decrement_token_ref_count() noexcept
331 {
332 auto __oldState = __state_.fetch_sub(
333 __locked_flag + __token_ref_increment, std::memory_order_acq_rel);
334 // Check if new state is less than __token_ref_increment which would
335 // indicate that this was the last reference.
336 if(__oldState < (__locked_flag + __token_ref_increment + __token_ref_increment))
337 {
338 delete this;
339 }
340 }
341
342 static constexpr std::uint64_t __stop_requested_flag = 1u;
343 static constexpr std::uint64_t __locked_flag = 2u;
344 static constexpr std::uint64_t __token_ref_increment = 4u;
345 static constexpr std::uint64_t __source_ref_increment = static_cast<std::uint64_t>(1u)
346 << 33u;
347
348 // bit 0 - stop-requested
349 // bit 1 - locked
350 // bits 2-32 - token ref count (31 bits)
351 // bits 33-63 - source ref count (31 bits)
352 std::atomic<std::uint64_t> __state_{__source_ref_increment};
353 __stop_callback_base* __head_ = nullptr;
354 std::thread::id __signallingThread_{};
355};
356
357//-----------------------------------------------
358// forward declarations
359//-----------------------------------------------
360
361class stop_source;
362template <typename _Callback>
363class stop_callback;
364
365// std::nostopstate
366// - to initialize a stop_source without shared stop state
// Tag type used to construct a stop_source that has no shared stop state.
// The constructor is explicit so the tag cannot be produced from a bare `{}`.
struct nostopstate_t
{
  explicit nostopstate_t() = default;
};

// The tag value passed to stop_source's tag constructor.
inline constexpr nostopstate_t nostopstate = nostopstate_t{};
372
373//-----------------------------------------------
374// stop_token
375//-----------------------------------------------
376
377class stop_token
378{
379public:
380 // construct:
381 // - TODO: explicit?
382 stop_token() noexcept
383 : __state_(nullptr)
384 {
385 }
386
387 // copy/move/assign/destroy:
388 stop_token(const stop_token& __it) noexcept
389 : __state_(__it.__state_)
390 {
391 if(__state_ != nullptr)
392 {
393 __state_->__add_token_reference();
394 }
395 }
396
397 stop_token(stop_token&& __it) noexcept
398 : __state_(std::exchange(__it.__state_, nullptr))
399 {
400 }
401
402 ~stop_token()
403 {
404 if(__state_ != nullptr)
405 {
406 __state_->__remove_token_reference();
407 }
408 }
409
410 stop_token& operator=(const stop_token& __it) noexcept
411 {
412 if(__state_ != __it.__state_)
413 {
414 stop_token __tmp{__it};
415 swap(__tmp);
416 }
417 return *this;
418 }
419
420 stop_token& operator=(stop_token&& __it) noexcept
421 {
422 stop_token __tmp{std::move(__it)};
423 swap(__tmp);
424 return *this;
425 }
426
427 void swap(stop_token& __it) noexcept { std::swap(__state_, __it.__state_); }
428
429 // stop handling:
430 [[nodiscard]] bool stop_requested() const noexcept
431 {
432 return __state_ != nullptr && __state_->__is_stop_requested();
433 }
434
435 [[nodiscard]] bool stop_possible() const noexcept
436 {
437 return __state_ != nullptr && __state_->__is_stop_requestable();
438 }
439
440 [[nodiscard]] friend bool
441 operator==(const stop_token& __a, const stop_token& __b) noexcept
442 {
443 return __a.__state_ == __b.__state_;
444 }
445 [[nodiscard]] friend bool
446 operator!=(const stop_token& __a, const stop_token& __b) noexcept
447 {
448 return __a.__state_ != __b.__state_;
449 }
450
451private:
452 friend class stop_source;
453 template <typename _Callback>
454 friend class stop_callback;
455
456 explicit stop_token(__stop_state* __state) noexcept
457 : __state_(__state)
458 {
459 if(__state_ != nullptr)
460 {
461 __state_->__add_token_reference();
462 }
463 }
464
465 __stop_state* __state_;
466};
467
468//-----------------------------------------------
469// stop_source
470//-----------------------------------------------
471
472class stop_source
473{
474public:
475 stop_source()
476 : __state_(new __stop_state())
477 {
478 }
479
480 explicit stop_source(nostopstate_t) noexcept
481 : __state_(nullptr)
482 {
483 }
484
485 ~stop_source()
486 {
487 if(__state_ != nullptr)
488 {
489 __state_->__remove_source_reference();
490 }
491 }
492
493 stop_source(const stop_source& __other) noexcept
494 : __state_(__other.__state_)
495 {
496 if(__state_ != nullptr)
497 {
498 __state_->__add_source_reference();
499 }
500 }
501
502 stop_source(stop_source&& __other) noexcept
503 : __state_(std::exchange(__other.__state_, nullptr))
504 {
505 }
506
507 stop_source& operator=(stop_source&& __other) noexcept
508 {
509 stop_source __tmp{std::move(__other)};
510 swap(__tmp);
511 return *this;
512 }
513
514 stop_source& operator=(const stop_source& __other) noexcept
515 {
516 if(__state_ != __other.__state_)
517 {
518 stop_source __tmp{__other};
519 swap(__tmp);
520 }
521 return *this;
522 }
523
524 [[nodiscard]] bool stop_requested() const noexcept
525 {
526 return __state_ != nullptr && __state_->__is_stop_requested();
527 }
528
529 [[nodiscard]] bool stop_possible() const noexcept { return __state_ != nullptr; }
530
531 bool request_stop() noexcept
532 {
533 if(__state_ != nullptr)
534 {
535 return __state_->__request_stop();
536 }
537 return false;
538 }
539
540 [[nodiscard]] stop_token get_token() const noexcept { return stop_token{__state_}; }
541
542 void swap(stop_source& __other) noexcept { std::swap(__state_, __other.__state_); }
543
544 [[nodiscard]] friend bool
545 operator==(const stop_source& __a, const stop_source& __b) noexcept
546 {
547 return __a.__state_ == __b.__state_;
548 }
549 [[nodiscard]] friend bool
550 operator!=(const stop_source& __a, const stop_source& __b) noexcept
551 {
552 return __a.__state_ != __b.__state_;
553 }
554
555private:
556 __stop_state* __state_;
557};
558
559//-----------------------------------------------
560// stop_callback
561//-----------------------------------------------
562
563template <typename _Callback>
564// requires Destructible<_Callback> && Invocable<_Callback>
565class [[nodiscard]] stop_callback : private __stop_callback_base
566{
567public:
568 using callback_type = _Callback;
569
570 template <
571 typename _CB, std::enable_if_t<std::is_constructible_v<_Callback, _CB>, int> = 0>
572 // requires Constructible<Callback, C>
573 explicit stop_callback(const stop_token& __token, _CB&& __cb) noexcept(
574 std::is_nothrow_constructible_v<_Callback, _CB>)
575 : __stop_callback_base{[](__stop_callback_base* __that) noexcept {
576 static_cast<stop_callback*>(__that)->__execute();
577 }}
578 , __state_(nullptr)
579 , __cb_(static_cast<_CB&&>(__cb))
580 {
581 if(__token.__state_ != nullptr && __token.__state_->__try_add_callback(this, true))
582 {
583 __state_ = __token.__state_;
584 }
585 }
586
587 template <
588 typename _CB, std::enable_if_t<std::is_constructible_v<_Callback, _CB>, int> = 0>
589 // requires Constructible<Callback, C>
590 explicit stop_callback(stop_token&& __token, _CB&& __cb) noexcept(
591 std::is_nothrow_constructible_v<_Callback, _CB>)
592 : __stop_callback_base{[](__stop_callback_base* __that) noexcept {
593 static_cast<stop_callback*>(__that)->__execute();
594 }}
595 , __state_(nullptr)
596 , __cb_(static_cast<_CB&&>(__cb))
597 {
598 if(__token.__state_ != nullptr && __token.__state_->__try_add_callback(this, false))
599 {
600 __state_ = std::exchange(__token.__state_, nullptr);
601 }
602 }
603
604 ~stop_callback()
605 {
606#ifdef SAFE
607 if(__inExecute_.load())
608 {
609 std::cerr << "*** OOPS: ~stop_callback() while callback executed\n";
610 }
611#endif
612 if(__state_ != nullptr)
613 {
614 __state_->__remove_callback(this);
615 }
616 }
617
618 stop_callback& operator=(const stop_callback&) = delete;
619 stop_callback& operator=(stop_callback&&) = delete;
620 stop_callback(const stop_callback&) = delete;
621 stop_callback(stop_callback&&) = delete;
622
623private:
624 void __execute() noexcept
625 {
626 // Executed in a noexcept context
627 // If it throws then we call std::terminate().
628#ifdef SAFE
629 __inExecute_.store(true);
630 __cb_();
631 __inExecute_.store(false);
632#else
633 __cb_();
634#endif
635 }
636
637 __stop_state* __state_;
638 _Callback __cb_;
639#ifdef SAFE
640 std::atomic<bool> __inExecute_{false};
641#endif
642};
643
644template <typename _Callback>
645stop_callback(stop_token, _Callback) -> stop_callback<_Callback>;
646
647} // namespace std
648
649#include <functional> // for invoke()
650#include <future>
651#include <iostream> // for debugging output
652#include <thread>
653#include <type_traits>
654
655namespace std
656{
657
658//*****************************************
659//* class jthread
660//* - joining std::thread with signaling stop/end support
661//*****************************************
662class jthread
663{
664public:
665 //*****************************************
666 //* standardized API:
667 //*****************************************
668 // - cover full API of std::thread
669 // to be able to switch from std::thread to std::jthread
670
671 // types are those from std::thread:
672 using id = ::std::thread::id;
673 using native_handle_type = ::std::thread::native_handle_type;
674
675 // construct/copy/destroy:
676 jthread() noexcept;
677 //template <typename F, typename... Args> explicit jthread(F&& f, Args&&... args);
678 // THE constructor that starts the thread:
679 // - NOTE: does SFINAE out copy constructor semantics
680 template <
681 typename Callable, typename... Args,
682 typename
683 = ::std::enable_if_t<!::std::is_same_v<::std::decay_t<Callable>, jthread>>>
684 explicit jthread(Callable&& cb, Args&&... args);
685 ~jthread();
686
687 jthread(const jthread&) = delete;
688 jthread(jthread&&) noexcept = default;
689 jthread& operator=(const jthread&) = delete;
690 jthread& operator=(jthread&&) noexcept;
691
692 // members:
693 void swap(jthread&) noexcept;
694 bool joinable() const noexcept;
695 void join();
696 void detach();
697
698 id get_id() const noexcept;
699 native_handle_type native_handle();
700
701 // static members:
702 static unsigned hardware_concurrency() noexcept
703 {
704 return ::std::thread::hardware_concurrency();
705 };
706
707 //*****************************************
708 // - supplementary API:
709 // - for the calling thread:
710 [[nodiscard]] stop_source get_stop_source() noexcept;
711 [[nodiscard]] stop_token get_stop_token() const noexcept;
712 bool request_stop() noexcept { return get_stop_source().request_stop(); }
713
714 //*****************************************
715 //* implementation:
716 //*****************************************
717
718private:
719 //*** API for the starting thread:
720 stop_source _stopSource; // stop_source for started thread
721 ::std::thread _thread{}; // started thread (if any)
722};
723
724//**********************************************************************
725
726//*****************************************
727//* implementation of class jthread
728//*****************************************
729
730// default constructor:
731inline jthread::jthread() noexcept
732 : _stopSource{nostopstate}
733{
734}
735
736// THE constructor that starts the thread:
737// - NOTE: declaration does SFINAE out copy constructor semantics
738template <typename Callable, typename... Args, typename>
739inline jthread::jthread(Callable&& cb, Args&&... args)
740 : _stopSource{}
741 , // initialize stop_source
742 _thread{
743 [](stop_token st, auto&& cb, auto&&... args) { // called lambda in the thread
744 // perform tasks of the thread:
745 if constexpr(std::is_invocable_v<Callable, stop_token, Args...>)
746 {
747 // pass the stop_token as first argument to the started thread:
748 ::std::invoke(
749 ::std::forward<decltype(cb)>(cb), std::move(st),
750 ::std::forward<decltype(args)>(args)...);
751 }
752 else
753 {
754 // started thread does not expect a stop token:
755 ::std::invoke(
756 ::std::forward<decltype(cb)>(cb), ::std::forward<decltype(args)>(args)...);
757 }
758 },
759 _stopSource.get_token(), // not captured due to possible races if immediately set
760 ::std::forward<Callable>(cb), // pass callable
761 ::std::forward<Args>(args)... // pass arguments for callable
762}
763{
764}
765
766// move assignment operator:
767inline jthread& jthread::operator=(jthread&& t) noexcept
768{
769 if(joinable())
770 { // if not joined/detached, signal stop and wait for end:
771 request_stop();
772 join();
773 }
774
775 _thread = std::move(t._thread);
776 _stopSource = std::move(t._stopSource);
777 return *this;
778}
779
780// destructor:
781inline jthread::~jthread()
782{
783 if(joinable())
784 { // if not joined/detached, signal stop and wait for end:
785 request_stop();
786 join();
787 }
788}
789
790// others:
791inline bool jthread::joinable() const noexcept
792{
793 return _thread.joinable();
794}
795inline void jthread::join()
796{
797 _thread.join();
798}
799inline void jthread::detach()
800{
801 _thread.detach();
802}
803inline typename jthread::id jthread::get_id() const noexcept
804{
805 return _thread.get_id();
806}
807inline typename jthread::native_handle_type jthread::native_handle()
808{
809 return _thread.native_handle();
810}
811
812inline stop_source jthread::get_stop_source() noexcept
813{
814 return _stopSource;
815}
816inline stop_token jthread::get_stop_token() const noexcept
817{
818 return _stopSource.get_token();
819}
820
821inline void jthread::swap(jthread& t) noexcept
822{
823 std::swap(_stopSource, t._stopSource);
824 std::swap(_thread, t._thread);
825}
826
827} // std
828
829#endif