OSSIA
Open Scenario System for Interactive Application
Loading...
Searching...
No Matches
receiver.hpp
1#pragma once
3#include <ossia/detail/thread.hpp>
4
5#include <oscpack/ip/UdpSocket.h>
6#include <oscpack/osc/OscDebug.h>
7#include <oscpack/osc/OscPacketListener.h>
8
9#include <functional>
10#include <memory>
11#include <sstream>
12#include <thread>
13
14namespace oscpack
15{
16
17namespace detail
18{
19
20template <typename Impl_T>
21struct ClearListener : public oscpack::TimerListener
22{
23 ClearListener(UdpSocket<Impl_T>& s)
24 : socket{s}
25 {
26 }
27 UdpSocket<Impl_T>& socket;
28
29 void TimerExpired() override { socket.AsynchronousBreak(); }
30};
31
32template <typename Impl_T>
33class ReceiveSocket : public UdpSocket<Impl_T>
34{
35 SocketReceiveMultiplexer<Impl_T> mux_;
36 PacketListener* listener_;
37
38public:
39 ReceiveSocket(const IpEndpointName& localEndpoint, PacketListener* listener)
40 : listener_(listener)
41 {
42 this->Bind(localEndpoint);
43 mux_.AttachSocketListener(&this->impl_, listener_);
44 }
45
46 ~ReceiveSocket() { mux_.DetachSocketListener(&this->impl_, listener_); }
47
48 // see SocketReceiveMultiplexer above for the behaviour of these methods...
49 void Run() { mux_.Run(); }
50 void Break()
51 {
52 ClearListener<Impl_T> l{*this};
53 mux_.AttachPeriodicTimerListener(0, &l);
54 mux_.Break();
55 }
56 void AsynchronousBreak()
57 {
58 ClearListener<Impl_T> l{*this};
59 mux_.AttachPeriodicTimerListener(0, &l);
60 mux_.AsynchronousBreak();
61 }
62};
63}
64using ReceiveSocket = detail::UdpListeningReceiveSocket<detail::Implementation>;
65}
66namespace osc
67{
68
69template <typename MessageHandler>
75class listener final : public oscpack::OscPacketListener
76{
77public:
78 listener(MessageHandler msg)
79 : m_messageHandler{msg}
80 {
81 }
82
83 void ProcessMessage(
84 const oscpack::ReceivedMessage& m, const oscpack::IpEndpointName& ip) override
85 {
86 try
87 {
88 m_messageHandler(m, ip);
89 }
90 catch(std::exception& e)
91 {
92 std::stringstream s;
93 oscpack::debug(s, m);
94
95 ossia::logger().error(
96 "osc::listener::ProcessMessage error: '{}': {}", s.str(), e.what());
97 }
98 catch(...)
99 {
100 std::stringstream s;
101 oscpack::debug(s, m);
102 ossia::logger().error("osc::listener::ProcessMessage error: '{}'", s.str());
103 }
104 }
105
106 void ProcessPacket(
107 const char* data, int size, const oscpack::IpEndpointName& remoteEndpoint) override
108 {
109 try
110 {
111 oscpack::ReceivedPacket p(data, size);
112 if(p.IsBundle())
113 this->ProcessBundle(oscpack::ReceivedBundle(p), remoteEndpoint);
114 else
115 this->ProcessMessage(oscpack::ReceivedMessage(p), remoteEndpoint);
116 }
117 catch(std::exception& e)
118 {
119 ossia::logger().error("osc::listener::ProcessPacket error: {}", e.what());
120 }
121 catch(...)
122 {
123 ossia::logger().error("osc::listener::ProcessPacket error");
124 }
125 }
126
127private:
128 MessageHandler m_messageHandler;
129};
130
138{
139public:
140 template <typename Handler>
141 receiver(unsigned int port, Handler msg)
142 : m_impl{std::make_unique<listener<Handler>>(msg)}
143 {
144 setPort(port);
145 }
146
147 receiver() = default;
148 receiver(receiver&& other) noexcept
149 {
150 other.stop();
151 m_impl = std::move(other.m_impl);
152 m_socket = std::move(other.m_socket);
153 setPort(other.m_port);
154 }
155
156 receiver& operator=(receiver&& other) noexcept
157 {
158 stop();
159
160 m_impl = std::move(other.m_impl);
161 m_socket = std::move(other.m_socket);
162
163 setPort(other.m_port);
164
165 return *this;
166 }
167
168 ~receiver() { stop(); }
169
170 void run()
171 {
172 if(m_runThread.joinable())
173 stop();
174
175 m_runThread = std::thread([this] {
176 ossia::set_thread_name("ossia osc");
177 run_impl();
178 });
179 while(!m_running)
180 std::this_thread::sleep_for(std::chrono::microseconds(1));
181 }
182
183 void run_impl()
184 {
185 m_running = true;
186 osc_thread_run:
187 try
188 {
189 m_socket->Run();
190 }
191 catch(...)
192 {
193 goto osc_thread_run;
194 }
195 }
196
197 void stop()
198 {
199 m_running = false;
200 if(m_socket)
201 {
202 if(m_runThread.joinable())
203 {
204 try
205 {
206 oscpack::UdpTransmitSocket send_socket(
207 oscpack::IpEndpointName("127.0.0.1", port()));
208 send_socket.Send("__stop_", 8);
209 m_socket->AsynchronousBreak();
210 std::this_thread::sleep_for(std::chrono::milliseconds(50));
211
212 m_runThread.join();
213 }
214 catch(std::exception& e)
215 {
216 if(m_runThread.joinable())
217 m_runThread.detach();
218 }
219 }
220
221 m_socket.reset();
222 }
223 else
224 {
225 if(m_runThread.joinable())
226 {
227 // Error somewhere: the thread is joinable, but there's no socket...
228 m_runThread.detach();
229 }
230 }
231 }
232
233 unsigned int port() const { return m_port; }
234
235 unsigned int setPort(unsigned int port)
236 {
237 m_port = port;
238
239 bool ok = false;
240 while(!ok)
241 {
242 try
243 {
244 m_socket = std::make_unique<oscpack::ReceiveSocket>(
245 oscpack::IpEndpointName(oscpack::IpEndpointName::ANY_ADDRESS, m_port),
246 m_impl.get());
247 ok = true;
248 }
249 catch(std::runtime_error&)
250 {
251 m_port++;
252 }
253 }
254
255 return m_port;
256 }
257
258private:
259 unsigned int m_port = 0;
260 std::unique_ptr<oscpack::OscPacketListener> m_impl;
261 std::unique_ptr<oscpack::ReceiveSocket> m_socket;
262
263 std::thread m_runThread;
264 std::atomic_bool m_running = false;
265};
266}
The listener class.
Definition receiver.hpp:76
The receiver class.
Definition receiver.hpp:138
spdlog::logger & logger() noexcept
Where the errors will be logged. Default is stderr.
Definition context.cpp:118