OSSIA
Open Scenario System for Interactive Application
Loading...
Searching...
No Matches
pipewire_protocol.hpp
1#pragma once
2#include <ossia/detail/config.hpp>
3
4#if defined(OSSIA_ENABLE_PIPEWIRE)
5#if __has_include(<pipewire/pipewire.h>) && __has_include(<spa/param/latency-utils.h>)
6#define OSSIA_AUDIO_PIPEWIRE 1
7#include <ossia/audio/audio_engine.hpp>
8#include <ossia/detail/dylib_loader.hpp>
9#include <ossia/detail/hash_map.hpp>
11#include <ossia/detail/thread.hpp>
12
13#include <pipewire/core.h>
14#include <pipewire/filter.h>
15#include <pipewire/pipewire.h>
16#include <spa/pod/builder.h>
17#include <spa/utils/result.h>
18
19#include <cmath>
20
21#include <cassert>
22#include <cerrno>
23#include <chrono>
24#include <cstdio>
25#include <stdexcept>
26
27#include <spa/param/latency-utils.h>
28
29namespace ossia
30{
31class libpipewire
32{
33public:
34 decltype(&::pw_init) init{};
35 decltype(&::pw_deinit) deinit{};
36
37 decltype(&::pw_context_new) context_new{};
38 decltype(&::pw_context_connect) context_connect{};
39 decltype(&::pw_context_destroy) context_destroy{};
40
41 decltype(&::pw_core_disconnect) core_disconnect{};
42
43 decltype(&::pw_proxy_add_listener) proxy_add_listener{};
44 decltype(&::pw_proxy_destroy) proxy_destroy{};
45
46 decltype(&::pw_main_loop_new) main_loop_new{};
47 decltype(&::pw_main_loop_destroy) main_loop_destroy{};
48 decltype(&::pw_main_loop_quit) main_loop_quit{};
49 decltype(&::pw_main_loop_run) main_loop_run{};
50 decltype(&::pw_main_loop_get_loop) main_loop_get_loop{};
51
52 decltype(&::pw_properties_new) properties_new{};
53 decltype(&::pw_properties_free) properties_free{};
54 decltype(&::pw_properties_get) properties_get{};
55
56 decltype(&::pw_filter_new_simple) filter_new_simple{};
57 decltype(&::pw_filter_get_node_id) filter_get_node_id{};
58 decltype(&::pw_filter_get_properties) filter_get_properties{};
59 decltype(&::pw_filter_add_port) filter_add_port{};
60 decltype(&::pw_filter_destroy) filter_destroy{};
61 decltype(&::pw_filter_connect) filter_connect{};
62 decltype(&::pw_filter_disconnect) filter_disconnect{};
63 decltype(&::pw_filter_get_dsp_buffer) filter_get_dsp_buffer{};
64
65 static const libpipewire& instance()
66 {
67 static const libpipewire self;
68 return self;
69 }
70
71private:
72 dylib_loader library;
73
74 libpipewire()
75 : library("libpipewire-0.3.so.0")
76 {
77 // in terms of regex:
78 // decltype\‍(&::([a-z_]+)\‍) [a-z_]+{};
79 // \1 = library.symbol<decltype(&::\1)>("\1");
80 init = library.symbol<decltype(&::pw_init)>("pw_init");
81 deinit = library.symbol<decltype(&::pw_deinit)>("pw_deinit");
82
83 context_new = library.symbol<decltype(&::pw_context_new)>("pw_context_new");
84 context_connect
85 = library.symbol<decltype(&::pw_context_connect)>("pw_context_connect");
86 context_destroy
87 = library.symbol<decltype(&::pw_context_destroy)>("pw_context_destroy");
88
89 core_disconnect
90 = library.symbol<decltype(&::pw_core_disconnect)>("pw_core_disconnect");
91
92 proxy_add_listener
93 = library.symbol<decltype(&::pw_proxy_add_listener)>("pw_proxy_add_listener");
94 proxy_destroy = library.symbol<decltype(&::pw_proxy_destroy)>("pw_proxy_destroy");
95
96 main_loop_new = library.symbol<decltype(&::pw_main_loop_new)>("pw_main_loop_new");
97 main_loop_destroy
98 = library.symbol<decltype(&::pw_main_loop_destroy)>("pw_main_loop_destroy");
99 main_loop_quit = library.symbol<decltype(&::pw_main_loop_quit)>("pw_main_loop_quit");
100 main_loop_run = library.symbol<decltype(&::pw_main_loop_run)>("pw_main_loop_run");
101 main_loop_get_loop
102 = library.symbol<decltype(&::pw_main_loop_get_loop)>("pw_main_loop_get_loop");
103
104 properties_new = library.symbol<decltype(&::pw_properties_new)>("pw_properties_new");
105 properties_free
106 = library.symbol<decltype(&::pw_properties_free)>("pw_properties_free");
107 properties_get = library.symbol<decltype(&::pw_properties_get)>("pw_properties_get");
108
109 filter_new_simple
110 = library.symbol<decltype(&::pw_filter_new_simple)>("pw_filter_new_simple");
111 filter_get_node_id
112 = library.symbol<decltype(&::pw_filter_get_node_id)>("pw_filter_get_node_id");
113 filter_get_properties = library.symbol<decltype(&::pw_filter_get_properties)>(
114 "pw_filter_get_properties");
115 filter_add_port
116 = library.symbol<decltype(&::pw_filter_add_port)>("pw_filter_add_port");
117 filter_destroy = library.symbol<decltype(&::pw_filter_destroy)>("pw_filter_destroy");
118 filter_connect = library.symbol<decltype(&::pw_filter_connect)>("pw_filter_connect");
119 filter_disconnect
120 = library.symbol<decltype(&::pw_filter_disconnect)>("pw_filter_disconnect");
121 filter_get_dsp_buffer = library.symbol<decltype(&::pw_filter_get_dsp_buffer)>(
122 "pw_filter_get_dsp_buffer");
123
124 assert(init);
125 assert(deinit);
126
127 assert(context_new);
128 assert(context_connect);
129 assert(context_destroy);
130
131 assert(core_disconnect);
132
133 assert(proxy_destroy);
134
135 assert(main_loop_new);
136 assert(main_loop_destroy);
137 assert(main_loop_quit);
138 assert(main_loop_run);
139 assert(main_loop_get_loop);
140
141 assert(properties_new);
142 assert(properties_free);
143 assert(properties_get);
144
145 assert(filter_new_simple);
146 assert(filter_get_node_id);
147 assert(filter_get_properties);
148 assert(filter_add_port);
149 assert(filter_destroy);
150 assert(filter_connect);
151 assert(filter_disconnect);
152 assert(filter_get_dsp_buffer);
153 }
154};
155
156struct pipewire_context
157{
158 pw_main_loop* main_loop{};
159 pw_loop* lp{};
160
161 pw_context* context{};
162 pw_core* core{};
163
164 pw_registry* registry{};
165 spa_hook registry_listener{};
166
167 struct listened_port
168 {
169 uint32_t id{};
170 pw_port* port{};
171 std::unique_ptr<spa_hook> listener;
172 };
173 std::vector<listened_port> port_listener{};
174
175 struct port_info
176 {
177 uint32_t id{};
178
179 std::string format;
180 std::string port_name;
181 std::string port_alias;
182 std::string object_path;
183 std::string node_id;
184 std::string port_id;
185
186 bool physical{};
187 bool terminal{};
188 bool monitor{};
189 pw_direction direction{};
190 };
191
192 struct node
193 {
194 std::vector<port_info> inputs;
195 std::vector<port_info> outputs;
196 };
197
198 struct graph
199 {
200 ossia::hash_map<uint32_t, node> physical_audio;
201 ossia::hash_map<uint32_t, node> physical_midi;
202 ossia::hash_map<uint32_t, node> software_audio;
203 ossia::hash_map<uint32_t, node> software_midi;
204
205 void for_each_port(auto func)
206 {
207 for(auto& map : {physical_audio, physical_midi, software_audio, software_midi})
208 {
209 for(auto& [id, node] : map)
210 {
211 for(auto& port : node.inputs)
212 func(port);
213 for(auto& port : node.outputs)
214 func(port);
215 }
216 }
217 }
218
219 void remove_port(uint32_t id)
220 {
221 for(auto map : {&physical_audio, &physical_midi, &software_audio, &software_midi})
222 {
223 for(auto& [_, node] : *map)
224 {
225 ossia::remove_erase_if(
226 node.inputs, [id](const port_info& p) { return p.id == id; });
227 ossia::remove_erase_if(
228 node.outputs, [id](const port_info& p) { return p.id == id; });
229 }
230 }
231 }
232 } current_graph;
233
234 int sync{};
235
236 const libpipewire& pw = libpipewire::instance();
237 explicit pipewire_context()
238 {
240 int argc = 0;
241 char* argv[] = {NULL};
242 char** aa = argv;
243 pw.init(&argc, &aa);
244
245 this->main_loop = pw.main_loop_new(nullptr);
246 if(!this->main_loop)
247 {
248 ossia::logger().error("PipeWire: main_loop_new failed!");
249 return;
250 }
251
252 this->lp = pw.main_loop_get_loop(this->main_loop);
253 if(!lp)
254 {
255 ossia::logger().error("PipeWire: main_loop_get_loop failed!");
256 return;
257 }
258
259 this->context = pw.context_new(lp, nullptr, 0);
260 if(!this->context)
261 {
262 ossia::logger().error("PipeWire: context_new failed!");
263 return;
264 }
265
266 this->core = pw.context_connect(this->context, nullptr, 0);
267 if(!this->core)
268 {
269 ossia::logger().error("PipeWire: context_connect failed!");
270 return;
271 }
272
273 this->registry = pw_core_get_registry(this->core, PW_VERSION_REGISTRY, 0);
274 if(!this->registry)
275 {
276 ossia::logger().error("PipeWire: core_get_registry failed!");
277 return;
278 }
279
280 // Register a listener which will listen on when ports are added / removed
281 spa_zero(registry_listener);
282 static constexpr const struct pw_port_events port_events
283 = {.version = PW_VERSION_PORT_EVENTS,
284 .info =
285 [](void* object, const pw_port_info* info) {
286 ((pipewire_context*)object)->register_port(info);
287 },
288 .param = {}};
289
290 static constexpr const struct pw_registry_events registry_events = {
291 .version = PW_VERSION_REGISTRY_EVENTS,
292 .global =
293 [](void* object, uint32_t id, uint32_t /*permissions*/, const char* type,
294 uint32_t /*version*/, const struct spa_dict* /*props*/) {
295 pipewire_context& self = *(pipewire_context*)object;
296
297 // When a port is added:
298 if(strcmp(type, PW_TYPE_INTERFACE_Port) == 0)
299 {
300 auto port
301 = (pw_port*)pw_registry_bind(self.registry, id, type, PW_VERSION_PORT, 0);
302 self.port_listener.push_back({id, port, std::make_unique<spa_hook>()});
303 auto& l = self.port_listener.back();
304
305 pw_port_add_listener(l.port, l.listener.get(), &port_events, &self);
306 }
307 },
308 .global_remove =
309 [](void* object, uint32_t id) {
310 pipewire_context& self = *(pipewire_context*)object;
311
312 // When a port is removed:
313 // Remove from the graph
314 self.current_graph.remove_port(id);
315
316 // Remove from the listeners
317 auto it = ossia::find_if(
318 self.port_listener, [&](const listened_port& l) { return l.id == id; });
319 if(it != self.port_listener.end())
320 {
321 libpipewire::instance().proxy_destroy((pw_proxy*)it->port);
322 self.port_listener.erase(it);
323 }
324 },
325 };
326
327 // Start listening
328 pw_registry_add_listener(
329 this->registry, &this->registry_listener, &registry_events, this);
330
331 if(!synchronize())
332 {
333 ossia::logger().error(
334 "PipeWire: initial synchronize() failed — context marked as broken");
335 // m_broken is already set by synchronize(); leave members as-is so the
336 // destructor still cleans them up, but the factory will see broken()
337 // and rebuild the context rather than use this one.
338 return;
339 }
340 }
341
342 int pending{};
343 int done{};
344 int error_state{};
345 bool m_broken{};
346
347 // Once broken, the shared core connection is considered unrecoverable:
348 // the factory will tear this context down and build a new one rather
349 // than attempting to reuse it.
350 bool broken() const noexcept { return m_broken; }
351
352 // Round-trip to the PipeWire daemon with a hard deadline. Returns true
353 // on success; on timeout or protocol error, logs, marks the context as
354 // broken, and returns false. Callers must check the return value and
355 // bail out of their own wait loops — we must never spin forever against
356 // a wedged daemon.
357 bool synchronize()
358 {
359 pending = 0;
360 done = 0;
361 error_state = 0;
362
363 if(!core || m_broken)
364 return false;
365
366 static constexpr struct pw_core_events core_events = {
367 .version = PW_VERSION_CORE_EVENTS,
368 .info = {},
369 .done =
370 [](void* object, uint32_t id, int seq) {
371 auto& self = *(pipewire_context*)object;
372 if(id == PW_ID_CORE && seq == self.pending)
373 {
374 self.done = 1;
375 }
376 },
377 .ping = {},
378 .error =
379 [](void* object, uint32_t id, int seq, int res, const char* message) {
380 auto& self = *(pipewire_context*)object;
381 ossia::logger().error(
382 "PipeWire: core error id={} seq={} res={} ({}): {}", id, seq, res,
383 spa_strerror(res), message ? message : "");
384 self.error_state = 1;
385 },
386 .remove_id = {},
387 .bound_id = {},
388 .add_mem = {},
389 .remove_mem = {},
390#if defined(PW_CORE_EVENT_BOUND_PROPS)
391 .bound_props = {},
392#endif
393 };
394
395 spa_hook core_listener;
396 spa_zero(core_listener);
397 pw_core_add_listener(core, &core_listener, &core_events, this);
398
399 // RAII: guarantee the listener is removed on every exit path so the
400 // core never calls into a dead stack frame (the hook lives on the stack).
401 struct hook_guard
402 {
403 spa_hook* h;
404 ~hook_guard() { spa_hook_remove(h); }
405 } guard{&core_listener};
406
407 pending = pw_core_sync(core, PW_ID_CORE, 0);
408
409 using clk = std::chrono::steady_clock;
410 constexpr auto timeout = std::chrono::seconds(2);
411 const auto deadline = clk::now() + timeout;
412
413 while(!done && !error_state)
414 {
415 const auto now = clk::now();
416 if(now >= deadline)
417 {
418 ossia::logger().error(
419 "PipeWire: synchronize() timed out after {} s waiting for core sync",
420 int(std::chrono::duration_cast<std::chrono::seconds>(timeout).count()));
421 error_state = 1;
422 break;
423 }
424
425 const auto remaining
426 = std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now).count();
427 const int iter_ms = int(remaining < 50 ? remaining : 50);
428
429 const int r = pw_loop_iterate(lp, iter_ms);
430 if(r < 0)
431 {
432 ossia::logger().error(
433 "PipeWire: pw_loop_iterate failed: {}", spa_strerror(r));
434 error_state = 1;
435 break;
436 }
437 }
438
439 if(error_state || !done)
440 {
441 m_broken = true;
442 return false;
443 }
444 return true;
445 }
446
447 pw_proxy* link_ports(uint32_t out_port, uint32_t in_port)
448 {
449 if(m_broken)
450 return nullptr;
451
452 auto props = pw.properties_new(
453 PW_KEY_LINK_OUTPUT_PORT, std::to_string(out_port).c_str(),
454 PW_KEY_LINK_INPUT_PORT, std::to_string(in_port).c_str(), nullptr);
455
456 auto proxy = (pw_proxy*)pw_core_create_object(
457 this->core, "link-factory", PW_TYPE_INTERFACE_Link, PW_VERSION_LINK,
458 &props->dict, 0);
459
460 if(!proxy)
461 {
462 ossia::logger().error("PipeWire: could not allocate link");
463 pw.properties_free(props);
464 return nullptr;
465 }
466
467 if(!synchronize())
468 {
469 // sync failed: the daemon may or may not have committed the create.
470 // Destroy our local proxy (best-effort, won't hang) and bail out.
471 pw.proxy_destroy(proxy);
472 pw.properties_free(props);
473 return nullptr;
474 }
475 pw.properties_free(props);
476 return proxy;
477 }
478
479 void register_port(const pw_port_info* info)
480 {
481 const spa_dict_item* item{};
482
483 port_info p;
484 p.id = info->id;
485
486 spa_dict_for_each(item, info->props)
487 {
488 std::string_view k{item->key}, v{item->value};
489 if(k == "format.dsp")
490 p.format = v;
491 else if(k == "port.name")
492 p.port_name = v;
493 else if(k == "port.alias")
494 p.port_alias = v;
495 else if(k == "object.path")
496 p.object_path = v;
497 else if(k == "port.id")
498 p.port_id = v;
499 else if(k == "node.id")
500 p.node_id = v;
501 else if(k == "port.physical" && v == "true")
502 p.physical = true;
503 else if(k == "port.terminal" && v == "true")
504 p.terminal = true;
505 else if(k == "port.monitor" && v == "true")
506 p.monitor = true;
507 else if(k == "port.direction")
508 {
509 if(v == "out")
510 {
511 p.direction = pw_direction::SPA_DIRECTION_OUTPUT;
512 }
513 else
514 {
515 p.direction = pw_direction::SPA_DIRECTION_INPUT;
516 }
517 }
518 }
519
520 if(p.node_id.empty())
521 return;
522
523 const auto nid = std::stoul(p.node_id);
524 if(p.physical)
525 {
526 if(p.format.find("audio") != p.format.npos)
527 {
528 if(p.direction == pw_direction::SPA_DIRECTION_OUTPUT)
529 this->current_graph.physical_audio[nid].outputs.push_back(std::move(p));
530 else
531 this->current_graph.physical_audio[nid].inputs.push_back(std::move(p));
532 }
533 else if(p.format.find("midi") != p.format.npos)
534 {
535 if(p.direction == pw_direction::SPA_DIRECTION_OUTPUT)
536 this->current_graph.physical_midi[nid].outputs.push_back(std::move(p));
537 else
538 this->current_graph.physical_midi[nid].inputs.push_back(std::move(p));
539 }
540 else
541 {
542 // TODO, video ?
543 }
544 }
545 else
546 {
547 if(p.format.find("audio") != p.format.npos)
548 {
549 if(p.direction == pw_direction::SPA_DIRECTION_OUTPUT)
550 this->current_graph.software_audio[nid].outputs.push_back(std::move(p));
551 else
552 this->current_graph.software_audio[nid].inputs.push_back(std::move(p));
553 }
554 else if(p.format.find("midi") != p.format.npos)
555 {
556 if(p.direction == pw_direction::SPA_DIRECTION_OUTPUT)
557 this->current_graph.software_midi[nid].outputs.push_back(std::move(p));
558 else
559 this->current_graph.software_midi[nid].inputs.push_back(std::move(p));
560 }
561 else
562 {
563 // TODO, video ?
564 }
565 }
566 }
567
568 int get_fd() const noexcept
569 {
570 if(!this->lp)
571 return -1;
572
573 auto spa_callbacks = this->lp->control->iface.cb;
574 auto spa_loop_methods = (const spa_loop_control_methods*)spa_callbacks.funcs;
575 if(spa_loop_methods->get_fd)
576 return spa_loop_methods->get_fd(spa_callbacks.data);
577 else
578 return -1;
579 }
580
581 ~pipewire_context()
582 {
583 if(this->registry)
584 pw.proxy_destroy((pw_proxy*)this->registry);
585 for(auto& [id, p, l] : this->port_listener)
586 if(l)
587 pw.proxy_destroy((pw_proxy*)p);
588 if(this->core)
589 pw.core_disconnect(this->core);
590 if(this->context)
591 pw.context_destroy(this->context);
592 if(this->main_loop)
593 pw.main_loop_destroy(this->main_loop);
594
595 pw.deinit();
596 }
597};
598
/// Parameters used to create a pipewire_audio_protocol.
struct audio_setup
{
  /// Name given to the PipeWire filter.
  std::string name;
  /// Input card identifier (not read by the code in this header).
  std::string card_in;
  /// Output card identifier (not read by the code in this header).
  std::string card_out;

  /// One entry per input port to create; each string is the port name.
  std::vector<std::string> inputs;
  /// One entry per output port to create; each string is the port name.
  std::vector<std::string> outputs;

  /// Requested sample rate.
  int rate = 0;
  /// Requested buffer size (quantum), in frames.
  int buffer_size = 0;
};
611
612class pipewire_audio_protocol : public audio_engine
613{
614public:
// Per-port user data type: its size is what gets passed to filter_add_port,
// and the pointers returned there are stored as port*. It carries no state.
struct port {};
618
619 std::shared_ptr<pipewire_context> loop{};
620 pw_filter* filter{};
621 std::vector<pw_proxy*> links{};
622
623 explicit pipewire_audio_protocol(
624 std::shared_ptr<pipewire_context> loop, const audio_setup& setup)
625 {
626 auto& pw = libpipewire::instance();
627
628 static constexpr const struct pw_filter_events filter_events = {
629 .version = PW_VERSION_FILTER_EVENTS,
630 .destroy = {},
631 .state_changed = {},
632 .io_changed = {},
633 .param_changed = {},
634 .add_buffer = {},
635 .remove_buffer = {},
636 .process = on_process,
637 .drained = {},
638#if PW_VERSION_CORE > 3
639 .command = {},
640#endif
641 };
642
643 this->loop = loop;
644
645 auto lp = loop->lp;
646 // clang-format off
647 // Create the filter (the main pipewire object which will represent the
648 // software)
649 auto filter_props{
650 pw.properties_new(
651 PW_KEY_MEDIA_TYPE, "Audio",
652 PW_KEY_MEDIA_CATEGORY, "Duplex",
653 PW_KEY_MEDIA_ROLE, "DSP",
654 PW_KEY_MEDIA_NAME, "ossia",
655 PW_KEY_NODE_NAME, "ossia",
656 PW_KEY_NODE_GROUP, "group.dsp.0",
657 PW_KEY_NODE_DESCRIPTION, "ossia score",
658 PW_KEY_NODE_LATENCY, fmt::format("{}/{}", setup.buffer_size, setup.rate).c_str(),
659 PW_KEY_NODE_FORCE_QUANTUM, fmt::format("{}", setup.buffer_size).c_str(),
660 PW_KEY_NODE_LOCK_QUANTUM, "true",
661 PW_KEY_NODE_TRANSPORT_SYNC, "true",
662 //PW_KEY_NODE_RATE, fmt::format("{}/{}", 1, setup.rate).c_str(),
663 PW_KEY_NODE_FORCE_RATE, fmt::format("{}", setup.rate).c_str(),
664 //PW_KEY_NODE_LOCK_RATE, "true",
665 //PW_KEY_NODE_ALWAYS_PROCESS, "true",
666 PW_KEY_NODE_PAUSE_ON_IDLE, "false",
667 PW_KEY_NODE_SUSPEND_ON_IDLE, "false",
668 nullptr)};
669
670 // clang-format on
671 this->filter = pw.filter_new_simple(
672 lp, setup.name.c_str(), filter_props, &filter_events, this);
673 if(!this->filter)
674 {
675 throw std::runtime_error("PipeWire: could not create filter instance");
676 }
677
678 // Create the request ports
679 for(std::size_t i = 0; i < setup.inputs.size(); i++)
680 {
681 auto p = (port*)pw.filter_add_port(
682 this->filter, PW_DIRECTION_INPUT, PW_FILTER_PORT_FLAG_MAP_BUFFERS,
683 sizeof(struct port),
684 pw.properties_new(
685 PW_KEY_FORMAT_DSP, "32 bit float mono audio", PW_KEY_PORT_NAME,
686 setup.inputs[i].c_str(), NULL),
687 NULL, 0);
688 input_ports.push_back(p);
689 }
690
691 for(std::size_t i = 0; i < setup.outputs.size(); i++)
692 {
693 auto p = (port*)pw.filter_add_port(
694 this->filter, PW_DIRECTION_OUTPUT, PW_FILTER_PORT_FLAG_MAP_BUFFERS,
695 sizeof(struct port),
696 pw.properties_new(
697 PW_KEY_FORMAT_DSP, "32 bit float mono audio", PW_KEY_PORT_NAME,
698 setup.outputs[i].c_str(), NULL),
699 NULL, 0);
700 output_ports.push_back(p);
701 }
702
703 if(pw.filter_connect(this->filter, PW_FILTER_FLAG_RT_PROCESS, nullptr, 0) < 0)
704 {
705 throw std::runtime_error("PipeWire: cannot connect");
706 }
707
708 // Wait until everything is registered with PipeWire. Every synchronize()
709 // call is bounded by a deadline inside the context; if any of them fail
710 // we abort the construction without setting activated, leaving an inert
711 // engine that running() will report as false.
712 if(!this->loop->synchronize())
713 {
714 ossia::logger().error(
715 "PipeWire: synchronize() failed after filter_connect — engine inactive");
716 return;
717 }
718 {
719 int k = 0;
720 auto node_id = filter_node_id();
721 while(node_id == 4294967295)
722 {
723 if(!this->loop->synchronize())
724 {
725 ossia::logger().error(
726 "PipeWire: synchronize() failed while waiting for node id");
727 return;
728 }
729 node_id = filter_node_id();
730
731 if(k++; k > 100)
732 return;
733 }
734
735 // Leave some time to resolve the ports
736 k = 0;
737 const auto num_local_ins = this->input_ports.size();
738 const auto num_local_outs = this->output_ports.size();
739 auto& this_node = this->loop->current_graph.software_audio[node_id];
740 while(this_node.inputs.size() < num_local_ins
741 || this_node.outputs.size() < num_local_outs)
742 {
743 if(!this->loop->synchronize())
744 {
745 ossia::logger().error(
746 "PipeWire: synchronize() failed while waiting for ports");
747 return;
748 }
749 if(k++; k > 100)
750 return;
751 }
752 }
753
754 activated = true;
755 this->effective_buffer_size = setup.buffer_size;
756 this->effective_sample_rate = setup.rate;
757 this->effective_inputs = setup.inputs.size();
758 this->effective_outputs = setup.outputs.size();
759 }
760
761 uint32_t filter_node_id() { return this->loop->pw.filter_get_node_id(this->filter); }
762
763 void autoconnect()
764 {
765 auto node_id = filter_node_id();
766
767 std::vector<std::pair<std::optional<uint32_t>, std::optional<uint32_t>>>
768 phys_in_to_ossia;
769 std::vector<std::pair<std::optional<uint32_t>, std::optional<uint32_t>>>
770 ossia_to_phys_out;
771
772 // Link to the first physical soundcard we see
773 for(auto& [node, ports] : loop->current_graph.physical_audio)
774 {
775 auto& [out, in] = ports;
776
777 // The soundcard outputs are input ports
778 for(auto& port : in)
779 {
780 phys_in_to_ossia.emplace_back(port.id, std::nullopt);
781 }
782
783 // The soundcard inputs are output ports
784 for(auto& port : out)
785 {
786 ossia_to_phys_out.emplace_back(std::nullopt, port.id);
787 }
788 }
789
790 // Enumerate our matching local ports
791 for(auto& [node, ports] : loop->current_graph.software_audio)
792 {
793 if(node == node_id)
794 {
795 auto& [in, out] = ports;
796
797 // Connect our inputs to the soundcard inputs
798 for(std::size_t i = 0; i < in.size(); i++)
799 {
800 if(i >= phys_in_to_ossia.size())
801 break;
802
803 phys_in_to_ossia[i].second = in[i].id;
804 }
805
806 // Connect our outputs to the soundcard inputs
807 for(std::size_t i = 0; i < out.size(); i++)
808 {
809 if(i >= ossia_to_phys_out.size())
810 break;
811
812 ossia_to_phys_out[i].first = out[i].id;
813 }
814 break;
815 }
816 }
817
818 // Connect as much as we can
819 for(auto [phys, self] : phys_in_to_ossia)
820 {
821 if(phys && self)
822 {
823 if(auto link = this->loop->link_ports(*phys, *self))
824 this->links.push_back(link);
825 }
826 else
827 {
828 break;
829 }
830 }
831
832 for(auto [self, phys] : ossia_to_phys_out)
833 {
834 if(self && phys)
835 {
836 if(auto link = this->loop->link_ports(*self, *phys))
837 this->links.push_back(link);
838 }
839 else
840 {
841 break;
842 }
843 }
844 }
845
846 void wait(int ms) override
847 {
848 if(!loop)
849 return;
850
851 using namespace std::chrono;
852 using clk = high_resolution_clock;
853
854 auto t0 = clk::now();
855 auto t1 = clk::now();
856 while(duration_cast<milliseconds>(t1 - t0).count() < ms)
857 {
858 pw_loop_iterate(loop->lp, ms);
859 t1 = clk::now();
860 }
861 }
862
863 bool running() const override
864 {
865 if(!this->loop)
866 return false;
867 return activated;
868 }
869
870 void stop() override
871 {
872 audio_engine::stop();
873
874 if(!this->loop)
875 return;
876
877 if(!activated)
878 return;
879
880 auto& pw = libpipewire::instance();
881
882 // Tear-down ordering matters here. pw_filter_connect with
883 // PW_FILTER_FLAG_RT_PROCESS schedules on_process on PipeWire's data
884 // (RT) loop, which is a different thread from the main/control loop
885 // we're running on. If we destroy link proxies or the filter while
886 // the data loop is still dispatching on_process against them, we race
887 // the RT thread and corrupt PipeWire's internal state — which can
888 // wedge the shared core connection and leave the app unrecoverable
889 // short of a full restart.
890 //
891 // Correct sequence:
892 // 1. pw_filter_disconnect: synchronously deactivates the node and
893 // tears down its data-loop scheduling, so on_process will no
894 // longer fire for this filter.
895 // 2. synchronize: round-trip with the daemon so the disconnect has
896 // fully propagated before we touch any proxies the RT path may
897 // have referenced.
898 // 3. destroy link proxies — now safe.
899 // 4. pw_filter_destroy — frees the local filter impl. It detects
900 // the prior disconnect and skips re-doing it.
901 // 5. synchronize — make sure the destroys have landed before we
902 // return and potentially let the shared context keep running.
903 if(this->filter)
904 {
905 if(int res = pw.filter_disconnect(this->filter); res < 0)
906 {
907 ossia::logger().warn(
908 "PipeWire: filter_disconnect failed: {}", spa_strerror(res));
909 }
910
911 // Best-effort sync; if this fails the context will be marked broken
912 // and the factory will rebuild it on the next make_engine call. We
913 // still continue the tear-down to free our local resources.
914 loop->synchronize();
915 }
916
917 for(auto link : this->links)
918 pw.proxy_destroy(link);
919 this->links.clear();
920
921 if(this->filter)
922 {
923 pw.filter_destroy(this->filter);
924 this->filter = nullptr;
925 }
926
927 loop->synchronize();
928 activated = false;
929 }
930
931 ~pipewire_audio_protocol() { stop(); }
932
933 static void
934 clear_buffers(pipewire_audio_protocol& self, uint32_t nframes, std::size_t outputs)
935 {
936 auto& pw = libpipewire::instance();
937 for(std::size_t i = 0; i < outputs; i++)
938 {
939 auto chan = (float*)pw.filter_get_dsp_buffer(self.output_ports[i], nframes);
940 if(chan)
941 for(std::size_t j = 0; j < nframes; j++)
942 chan[j] = 0.f;
943 }
944
945 return;
946 }
947
948 void do_process(uint32_t nframes, double secs)
949 {
950 const auto& pw = libpipewire::instance();
951
952 tick_start();
953
954 const auto inputs = input_ports.size();
955 const auto outputs = output_ports.size();
956 if(stop_processing)
957 {
958 tick_clear();
959 clear_buffers(*this, nframes, outputs);
960 return;
961 }
962
963 auto dummy = (float*)alloca(sizeof(float) * nframes);
964 memset(dummy, 0, sizeof(float) * nframes);
965
966 auto float_input = (float**)alloca(sizeof(float*) * inputs);
967 auto float_output = (float**)alloca(sizeof(float*) * outputs);
968 for(std::size_t i = 0; i < inputs; i++)
969 {
970 float_input[i] = (float*)pw.filter_get_dsp_buffer(input_ports[i], nframes);
971 if(float_input[i] == nullptr)
972 float_input[i] = dummy;
973 }
974 for(std::size_t i = 0; i < outputs; i++)
975 {
976 float_output[i] = (float*)pw.filter_get_dsp_buffer(output_ports[i], nframes);
977 if(float_output[i] == nullptr)
978 float_output[i] = dummy;
979 }
980
981 // Actual execution
982 ossia::audio_tick_state ts{float_input, float_output, (int)inputs,
983 (int)outputs, nframes, secs};
984 audio_tick(ts);
985 tick_end();
986 }
987
988 static void on_process(void* userdata, struct spa_io_position* position)
989 {
990 [[maybe_unused]]
991 static const thread_local auto _
992 = [] {
993 ossia::set_thread_name("ossia audio 0");
994 ossia::set_thread_pinned(thread_type::Audio, 0);
995 return 0;
996 }();
997
998 if(!userdata)
999 return;
1000
1001 auto& self = *(pipewire_audio_protocol*)userdata;
1002 const uint32_t nframes = position->clock.duration;
1003 const double current_time_ns = position->clock.nsec * 1e-9;
1004
1005 // ossia's audio graph runs at a fixed block size; since we force the quantum
1006 // via PW_KEY_NODE_FORCE_QUANTUM + PW_KEY_NODE_LOCK_QUANTUM the server should
1007 // always call us with exactly effective_buffer_size frames. If it doesn't,
1008 // skip the cycle rather than overrunning the DSP buffer or feeding the tick
1009 // a mismatched block size.
1010 if(nframes != static_cast<uint32_t>(self.effective_buffer_size))
1011 {
1012 ossia::logger().warn(
1013 "PipeWire: unexpected block size {} (expected {}), skipping cycle",
1014 nframes, self.effective_buffer_size);
1015 return;
1016 }
1017
1018 self.do_process(nframes, current_time_ns);
1019 }
1020
1021 std::vector<port*> input_ports;
1022 std::vector<port*> output_ports;
1023 bool activated{};
1024};
1025
1026}
1027#endif
1028#endif
Definition git_info.h:7
spdlog::logger & logger() noexcept
Where the errors will be logged. Default is stderr.
Definition context.cpp:118