OSSIA
Open Scenario System for Interactive Application
Loading...
Searching...
No Matches
websocket_log_sink.hpp
1#pragma once
2#include <ossia/detail/hash_map.hpp>
3#include <ossia/detail/json.hpp>
4#include <ossia/detail/nullable_variant.hpp>
5#include <ossia/network/sockets/websocket_client.hpp>
6
7#include <spdlog/sinks/stdout_sinks.h>
8#include <spdlog/spdlog.h>
9
10#include <readerwriterqueue.h>
11
12#include <atomic>
13
14namespace ossia
15{
16struct websocket_threaded_connection
17{
18 websocket_threaded_connection(const std::string& ip)
19 : socket([](auto&&...) {})
20 {
21 running = true;
22 thread = std::thread([this, ip] {
23 auto log = spdlog::get("websocket");
24 if(!log)
25 log = spdlog::stderr_logger_mt("websocket");
26 try
27 {
28 while(running)
29 {
30 socket.connect_and_run(ip);
31 if(running)
32 {
33 // Try to reconnect
34 log->critical("Logger could not connect to {}.", ip);
35 std::this_thread::sleep_for(std::chrono::seconds(1));
36 }
37 }
38 log->critical("Logger stopping.");
39 }
40 catch(const websocketpp::exception& e)
41 {
42 log->critical("Logger error: ", e.what());
43 }
44 catch(const std::exception& e)
45 {
46 log->critical("Logger error: ", e.what());
47 }
48 catch(...)
49 {
50 log->critical("Logger error");
51 }
52 });
53 }
54
55 ~websocket_threaded_connection()
56 {
57 running = false;
58 if(!socket.after_connect())
59 std::this_thread::sleep_for(std::chrono::milliseconds(500));
60 socket.stop();
61 if(thread.joinable())
62 thread.join();
63 }
64
66 std::atomic_bool running{};
67 std::thread thread;
68};
69
72 : public spdlog::sinks::sink
73 , public Nano::Observer
74{
75 websocket_log_sink(std::shared_ptr<websocket_threaded_connection> s, std::string send)
76 : socket{std::move(s)}
77 , sender{std::move(send)}
78 {
79 socket->socket.on_open.connect<&websocket_log_sink::open_fun>(this);
80 }
81
82 void open_fun()
83 {
84 std::string m;
85 while(logs.try_dequeue(m))
86 {
87 socket->socket.send_message(m);
88 }
89 }
90
91 ~websocket_log_sink() override
92 {
93 socket->socket.on_open.disconnect<&websocket_log_sink::open_fun>(this);
94 }
95
96 void make_message(const spdlog::details::log_msg& msg)
97 {
98 buffer.Clear();
99
100 ossia::json_writer writer{buffer};
101
102 writer.StartObject();
103
104 writer.Key("operation");
105 writer.String("log");
106
107 writer.Key("level");
108 switch(msg.level)
109 {
110 case spdlog::level::trace:
111 writer.String("trace");
112 break;
113 case spdlog::level::debug:
114 writer.String("debug");
115 break;
116 case spdlog::level::info:
117 writer.String("info");
118 break;
119 case spdlog::level::warn:
120 writer.String("warn");
121 break;
122 case spdlog::level::err:
123 writer.String("error");
124 break;
125 case spdlog::level::critical:
126 writer.String("critical");
127 break;
128 case spdlog::level::off:
129 writer.String("off");
130 break;
131 }
132
133 writer.Key("sender");
134 writer.String(sender.data(), sender.size());
135
136 writer.Key("message");
137 writer.String(msg.payload.data(), msg.payload.size());
138
139 writer.EndObject();
140 }
141
142 void send_message(const spdlog::details::log_msg& msg)
143 {
144 make_message(msg);
145 socket->socket.send_message(buffer);
146 }
147
148 void log(const spdlog::details::log_msg& msg) override
149 {
150 if(!socket->socket.connected())
151 {
152 make_message(msg);
153 logs.enqueue(std::string{buffer.GetString(), buffer.GetSize()});
154 return;
155 }
156 else
157 {
158 send_message(msg);
159 }
160 }
161
162 void flush() override { }
163
164 void set_pattern(const std::string& pattern) override { }
165 void set_formatter(std::unique_ptr<spdlog::formatter> sink_formatter) override { }
166
167private:
168 rapidjson::StringBuffer buffer;
169 std::shared_ptr<websocket_threaded_connection> socket;
170 std::string sender;
171
172 moodycamel::ReaderWriterQueue<std::string> logs;
173};
174
176struct websocket_heartbeat : public Nano::Observer
177{
178public:
180 std::shared_ptr<websocket_threaded_connection> t, std::string s,
181 std::chrono::seconds dur)
182 : interval{dur}
183 , sender{s}
184 , conn{t}
185 {
186 thread = std::thread([this] {
187 while(running)
188 {
189 if(init && conn->socket.connected())
190 {
191 buffer.Clear();
192 ossia::json_writer writer{buffer};
193 writer.StartObject();
194
195 writer.Key("operation");
196 writer.String("alive");
197
198 writer.Key("sender");
199 writer.String(sender.data(), sender.size());
200
201 writer.EndObject();
202
203 conn->socket.send_message(buffer);
204 }
205
206 for(int i = 0; i < 100; i++)
207 {
208 std::this_thread::sleep_for(
209 std::chrono::duration_cast<std::chrono::milliseconds>(interval) / 100.0);
210 if(!running)
211 return;
212 }
213 }
214 });
215
216 t->socket.on_open.connect<&websocket_heartbeat::open_fun>(*this);
217 }
218
219 void open_fun()
220 {
221 conn->socket.send_message(init_msg);
222 init = true;
223 }
224
225 ~websocket_heartbeat()
226 {
227 conn->socket.on_open.disconnect<&websocket_heartbeat::open_fun>(*this);
228 running = false;
229 if(thread.joinable())
230 thread.join();
231 }
232
233 void
234 send_init(const ossia::hash_map<std::string, ossia::variant<std::string, int>>& map)
235 {
236 rapidjson::StringBuffer buffer;
237 ossia::json_writer writer{buffer};
238 writer.StartObject();
239
240 writer.Key("operation");
241 writer.String("initWatchdog");
242
243 writer.Key("sender");
244 writer.String(sender.data(), sender.size());
245
246 writer.Key("aliveTime");
247 writer.Int(interval.count());
248
249 struct
250 {
251 ossia::json_writer& writer;
252 void operator()(const std::string& s) { writer.String(s); }
253
254 void operator()(int s) { writer.Int(s); }
255 } sw{writer};
256
257 for(const auto& pair : map)
258 {
259 writer.Key(pair.first);
260 ossia::visit(sw, pair.second);
261 }
262
263 writer.EndObject();
264
265 if(conn->socket.connected())
266 {
267 conn->socket.send_message(buffer);
268 init = true;
269 }
270 else
271 {
272 init_msg = std::string(buffer.GetString(), buffer.GetSize());
273 }
274 }
275
276private:
277 rapidjson::StringBuffer buffer;
278 std::thread thread;
279 std::chrono::seconds interval;
280 std::string sender;
281 std::string init_msg;
282 std::shared_ptr<websocket_threaded_connection> conn;
283 std::atomic_bool running{true};
284 std::atomic_bool init{false};
285};
286}
Low-level Websocket client.
Definition websocket_client.hpp:18
Definition git_info.h:7
Sends websocket "alive" messages at regular intervals.
Definition websocket_log_sink.hpp:177
A sink to use with spdlog, that will send its log messages over websockets.
Definition websocket_log_sink.hpp:74