OSSIA
Open Scenario System for Interactive Application
Loading...
Searching...
No Matches
message_queue.hpp
1#pragma once
2#include <ossia/detail/lockfree_queue.hpp>
3#include <ossia/detail/ptr_set.hpp>
4#include <ossia/network/base/device.hpp>
5#include <ossia/network/base/parameter.hpp>
6
7#include <ankerl/unordered_dense.h>
8
9#include <smallfun.hpp>
10
11namespace ossia
12{
13struct received_value
14{
16 ossia::value value;
17};
18
19class message_queue final : public Nano::Observer
20{
21public:
23 message_queue(ossia::net::device_base& dev)
24 : device{dev}
25 {
26 dev.on_parameter_removing.connect<&message_queue::on_param_removed>(*this);
27 }
28
29 ~message_queue()
30 {
31#if defined(__cpp_exceptions)
32 try
33 {
34 for(auto reg : m_reg)
35 {
36 reg.first->remove_callback(reg.second.second);
37 }
38 }
39 catch(...)
40 {
41 }
42#else
43 for(auto reg : m_reg)
44 {
45 reg.first->remove_callback(reg.second.second);
46 }
47#endif
48 }
49
50 bool try_dequeue(ossia::received_value& v) { return m_queue.try_dequeue(v); }
51
53 {
54 auto ptr = &p;
55 auto reg_it = m_reg.find(&p);
56 if(reg_it == m_reg.end())
57 {
58 auto it = p.add_callback([this, ptr](const ossia::value& val) {
59 m_queue.enqueue({ptr, val});
60 });
61 m_reg.insert({&p, {0, it}});
62 }
63 else
64 {
65 reg_it->second.first++;
66 }
67 }
68
69 void unreg(ossia::net::parameter_base& p)
70 {
71 auto it = m_reg.find(&p);
72 if(it != m_reg.end())
73 {
74 it->second.first--;
75 if(it->second.first <= 0)
76 {
77 p.remove_callback(it->second.second);
78 m_reg.erase(it);
79 }
80 }
81 }
82
83private:
84 void on_param_removed(const ossia::net::parameter_base& p)
85 {
86 auto it = m_reg.find(const_cast<ossia::net::parameter_base*>(&p));
87 if(it != m_reg.end())
88 m_reg.erase(it);
89 }
90
91 ossia::mpmc_queue<received_value> m_queue;
92
93 ossia::ptr_map<
95 std::pair<int, ossia::net::parameter_base::callback_index>>
96 m_reg;
97};
98
99class global_message_queue final : public Nano::Observer
100{
101public:
102 global_message_queue(ossia::net::device_base& dev)
103 {
104 dev.on_message.connect<&global_message_queue::on_message>(*this);
105 }
106
107 void on_message(const ossia::net::parameter_base& p)
108 {
109 m_queue.enqueue({const_cast<ossia::net::parameter_base*>(&p), p.value()});
110 }
111
112 bool try_dequeue(ossia::received_value& v) { return m_queue.try_dequeue(v); }
113
114private:
115 ossia::mpmc_queue<received_value> m_queue;
116};
117
118struct coalescing_queue
119{
120public:
121 using coalesced = ossia::hash_map<ossia::net::parameter_base*, ossia::value>;
122
123 smallfun::function<void(const coalesced&)> callback;
124
125 ossia::mpmc_queue<ossia::received_value> noncritical;
126 ossia::mpmc_queue<ossia::received_value> critical;
127
128 coalesced coalesce;
129
130 void process_messages()
131 {
132 ossia::received_value v;
133
134 while(critical.try_dequeue(v))
135 {
136 coalesce.emplace(v.address, std::move(v.value));
137 callback(coalesce);
138 coalesce.clear();
139 }
140
141 coalesce.clear();
142 while(noncritical.try_dequeue(v))
143 {
144 coalesce[v.address] = std::move(v.value);
145 }
146
147 callback(coalesce);
148
149 coalesce.clear();
150 }
151};
152}
void remove_callback(iterator it)
remove_callback Removes a callback identified by an iterator.
Definition callback_container.hpp:114
iterator add_callback(T &&callback)
add_callback Add a new callback.
Definition callback_container.hpp:90
Root of a device tree.
Definition ossia/network/base/device.hpp:58
The parameter_base class.
Definition ossia/network/base/parameter.hpp:48
virtual ossia::value value() const =0
Clone the current value without any network request.
The value class.
Definition value.hpp:173
Definition git_info.h:7
bool critical
Means that the node is very important, e.g. a "play" message.
Definition node_attributes.hpp:92