2#include <ossia/detail/config.hpp>
4#if defined(OSSIA_ENABLE_PIPEWIRE)
5#if __has_include(<pipewire/pipewire.h>) && __has_include(<spa/param/latency-utils.h>)
6#define OSSIA_AUDIO_PIPEWIRE 1
7#include <ossia/audio/audio_engine.hpp>
8#include <ossia/detail/dylib_loader.hpp>
9#include <ossia/detail/hash_map.hpp>
11#include <ossia/detail/thread.hpp>
13#include <pipewire/core.h>
14#include <pipewire/filter.h>
15#include <pipewire/pipewire.h>
16#include <spa/pod/builder.h>
17#include <spa/utils/result.h>
27#include <spa/param/latency-utils.h>
34 decltype(&::pw_init) init{};
35 decltype(&::pw_deinit) deinit{};
37 decltype(&::pw_context_new) context_new{};
38 decltype(&::pw_context_connect) context_connect{};
39 decltype(&::pw_context_destroy) context_destroy{};
41 decltype(&::pw_core_disconnect) core_disconnect{};
43 decltype(&::pw_proxy_add_listener) proxy_add_listener{};
44 decltype(&::pw_proxy_destroy) proxy_destroy{};
46 decltype(&::pw_main_loop_new) main_loop_new{};
47 decltype(&::pw_main_loop_destroy) main_loop_destroy{};
48 decltype(&::pw_main_loop_quit) main_loop_quit{};
49 decltype(&::pw_main_loop_run) main_loop_run{};
50 decltype(&::pw_main_loop_get_loop) main_loop_get_loop{};
52 decltype(&::pw_properties_new) properties_new{};
53 decltype(&::pw_properties_free) properties_free{};
54 decltype(&::pw_properties_get) properties_get{};
56 decltype(&::pw_filter_new_simple) filter_new_simple{};
57 decltype(&::pw_filter_get_node_id) filter_get_node_id{};
58 decltype(&::pw_filter_get_properties) filter_get_properties{};
59 decltype(&::pw_filter_add_port) filter_add_port{};
60 decltype(&::pw_filter_destroy) filter_destroy{};
61 decltype(&::pw_filter_connect) filter_connect{};
62 decltype(&::pw_filter_disconnect) filter_disconnect{};
63 decltype(&::pw_filter_get_dsp_buffer) filter_get_dsp_buffer{};
65 static const libpipewire& instance()
67 static const libpipewire self;
75 : library(
"libpipewire-0.3.so.0")
80 init = library.symbol<
decltype(&::pw_init)>(
"pw_init");
81 deinit = library.symbol<
decltype(&::pw_deinit)>(
"pw_deinit");
83 context_new = library.symbol<
decltype(&::pw_context_new)>(
"pw_context_new");
85 = library.symbol<
decltype(&::pw_context_connect)>(
"pw_context_connect");
87 = library.symbol<
decltype(&::pw_context_destroy)>(
"pw_context_destroy");
90 = library.symbol<
decltype(&::pw_core_disconnect)>(
"pw_core_disconnect");
93 = library.symbol<
decltype(&::pw_proxy_add_listener)>(
"pw_proxy_add_listener");
94 proxy_destroy = library.symbol<
decltype(&::pw_proxy_destroy)>(
"pw_proxy_destroy");
96 main_loop_new = library.symbol<
decltype(&::pw_main_loop_new)>(
"pw_main_loop_new");
98 = library.symbol<
decltype(&::pw_main_loop_destroy)>(
"pw_main_loop_destroy");
99 main_loop_quit = library.symbol<
decltype(&::pw_main_loop_quit)>(
"pw_main_loop_quit");
100 main_loop_run = library.symbol<
decltype(&::pw_main_loop_run)>(
"pw_main_loop_run");
102 = library.symbol<
decltype(&::pw_main_loop_get_loop)>(
"pw_main_loop_get_loop");
104 properties_new = library.symbol<
decltype(&::pw_properties_new)>(
"pw_properties_new");
106 = library.symbol<
decltype(&::pw_properties_free)>(
"pw_properties_free");
107 properties_get = library.symbol<
decltype(&::pw_properties_get)>(
"pw_properties_get");
110 = library.symbol<
decltype(&::pw_filter_new_simple)>(
"pw_filter_new_simple");
112 = library.symbol<
decltype(&::pw_filter_get_node_id)>(
"pw_filter_get_node_id");
113 filter_get_properties = library.symbol<
decltype(&::pw_filter_get_properties)>(
114 "pw_filter_get_properties");
116 = library.symbol<
decltype(&::pw_filter_add_port)>(
"pw_filter_add_port");
117 filter_destroy = library.symbol<
decltype(&::pw_filter_destroy)>(
"pw_filter_destroy");
118 filter_connect = library.symbol<
decltype(&::pw_filter_connect)>(
"pw_filter_connect");
120 = library.symbol<
decltype(&::pw_filter_disconnect)>(
"pw_filter_disconnect");
121 filter_get_dsp_buffer = library.symbol<
decltype(&::pw_filter_get_dsp_buffer)>(
122 "pw_filter_get_dsp_buffer");
128 assert(context_connect);
129 assert(context_destroy);
131 assert(core_disconnect);
133 assert(proxy_destroy);
135 assert(main_loop_new);
136 assert(main_loop_destroy);
137 assert(main_loop_quit);
138 assert(main_loop_run);
139 assert(main_loop_get_loop);
141 assert(properties_new);
142 assert(properties_free);
143 assert(properties_get);
145 assert(filter_new_simple);
146 assert(filter_get_node_id);
147 assert(filter_get_properties);
148 assert(filter_add_port);
149 assert(filter_destroy);
150 assert(filter_connect);
151 assert(filter_disconnect);
152 assert(filter_get_dsp_buffer);
// Holds the PipeWire main loop / context / registry handles together with
// the graph of ports discovered through registry events.
// NOTE(review): this listing has dropped lines (braces, nested struct
// headers and several members such as `lp`, `core` and `current_graph`);
// the comments below only describe what is visible.
156struct pipewire_context
// Handles assigned in the constructor from main_loop_new, context_new and
// pw_core_get_registry respectively.
158 pw_main_loop* main_loop{};
161 pw_context* context{};
164 pw_registry* registry{};
// Hook passed to pw_registry_add_listener in the constructor.
165 spa_hook registry_listener{};
// Per-port hook handed to pw_port_add_listener; heap-allocated via
// std::make_unique when the entry is created (presumably so its address
// stays stable when port_listener grows — confirm).
171 std::unique_ptr<spa_hook> listener;
// One entry per port bound in the registry callback: {id, port, hook}.
173 std::vector<listened_port> port_listener{};
// Textual properties of a port; presumably filled from the "port.name",
// "port.alias" and "object.path" keys tested in register_port.
180 std::string port_name;
181 std::string port_alias;
182 std::string object_path;
// Set to SPA_DIRECTION_OUTPUT or SPA_DIRECTION_INPUT by register_port.
189 pw_direction direction{};
// Ports of one node, split by direction.
194 std::vector<port_info> inputs;
195 std::vector<port_info> outputs;
// Nodes indexed by the numeric node id parsed in register_port, in four
// groups; the audio/midi split follows whether the port format string
// contains "audio" or "midi".
200 ossia::hash_map<uint32_t, node> physical_audio;
201 ossia::hash_map<uint32_t, node> physical_midi;
202 ossia::hash_map<uint32_t, node> software_audio;
203 ossia::hash_map<uint32_t, node> software_midi;
205 void for_each_port(
auto func)
207 for(
auto& map : {physical_audio, physical_midi, software_audio, software_midi})
209 for(
auto& [
id, node] : map)
211 for(
auto& port : node.inputs)
213 for(
auto& port : node.outputs)
219 void remove_port(uint32_t
id)
221 for(
auto map : {&physical_audio, &physical_midi, &software_audio, &software_midi})
223 for(
auto& [_, node] : *map)
225 ossia::remove_erase_if(
226 node.inputs, [
id](
const port_info& p) { return p.id == id; });
227 ossia::remove_erase_if(
228 node.outputs, [
id](
const port_info& p) { return p.id == id; });
// Shared binding to the dynamically loaded PipeWire functions.
236 const libpipewire& pw = libpipewire::instance();
// Builds the main loop, context, core connection and registry, then defines
// the registry / port event tables used to track ports.
// NOTE(review): the listing dropped the failure checks between these steps;
// only two of their log statements remain visible.
237 explicit pipewire_context()
241 char* argv[] = {NULL};
245 this->main_loop = pw.main_loop_new(
nullptr);
252 this->lp = pw.main_loop_get_loop(this->main_loop);
255 ossia::logger().error(
"PipeWire: main_loop_get_loop failed!");
259 this->context = pw.context_new(lp,
nullptr, 0);
266 this->core = pw.context_connect(this->context,
nullptr, 0);
273 this->registry = pw_core_get_registry(this->core, PW_VERSION_REGISTRY, 0);
276 ossia::logger().error(
"PipeWire: core_get_registry failed!");
281 spa_zero(registry_listener);
// Port events: forward the received port info to register_port. `object` is
// the pipewire_context passed as user data to pw_port_add_listener below.
282 static constexpr const struct pw_port_events port_events
283 = {.version = PW_VERSION_PORT_EVENTS,
285 [](
void* object,
const pw_port_info* info) {
286 ((pipewire_context*)
object)->register_port(info);
// Registry events. First callback: when the announced object is a port,
// bind it, remember it in port_listener and subscribe to its port events.
290 static constexpr const struct pw_registry_events registry_events = {
291 .version = PW_VERSION_REGISTRY_EVENTS,
293 [](
void* object, uint32_t id, uint32_t ,
const char* type,
294 uint32_t ,
const struct spa_dict* ) {
295 pipewire_context& self = *(pipewire_context*)
object;
298 if(strcmp(type, PW_TYPE_INTERFACE_Port) == 0)
301 = (pw_port*)pw_registry_bind(self.registry,
id, type, PW_VERSION_PORT, 0);
302 self.port_listener.push_back({id, port, std::make_unique<spa_hook>()});
303 auto& l = self.port_listener.back();
305 pw_port_add_listener(l.port, l.listener.get(), &port_events, &self);
// Second callback: an id went away — drop it from the graph, then destroy
// and forget the matching bound port proxy, if there is one.
309 [](
void* object, uint32_t id) {
310 pipewire_context& self = *(pipewire_context*)
object;
314 self.current_graph.remove_port(
id);
317 auto it = ossia::find_if(
318 self.port_listener, [&](
const listened_port& l) { return l.id == id; });
319 if(it != self.port_listener.end())
321 libpipewire::instance().proxy_destroy((pw_proxy*)it->port);
322 self.port_listener.erase(it);
328 pw_registry_add_listener(
329 this->registry, &this->registry_listener, ®istry_events,
this);
// Message of the constructor's log call for a failed first synchronize();
// NOTE(review): the call this string belongs to is on a dropped line.
334 "PipeWire: initial synchronize() failed — context marked as broken");
350 bool broken() const noexcept {
return m_broken; }
// Body of the core round-trip helper (its header is on a dropped line; call
// sites and log messages refer to it as synchronize()).
// It issues pw_core_sync and iterates the loop until the matching reply
// arrives, an error is reported, or a 2 s deadline passes.
// Bail out early when there is no core connection or the context is
// already flagged as broken.
363 if(!core || m_broken)
366 static constexpr struct pw_core_events core_events = {
367 .version = PW_VERSION_CORE_EVENTS,
// Sync reply callback: only a reply for the core with the sequence number
// stored in `pending` is accepted.
370 [](
void* object, uint32_t id,
int seq) {
371 auto& self = *(pipewire_context*)
object;
372 if(
id == PW_ID_CORE && seq == self.pending)
// Error callback: log the error and raise error_state so the wait loop
// below stops.
379 [](
void* object, uint32_t id,
int seq,
int res,
const char* message) {
380 auto& self = *(pipewire_context*)
object;
382 "PipeWire: core error id={} seq={} res={} ({}): {}",
id, seq, res,
383 spa_strerror(res), message ? message :
"");
384 self.error_state = 1;
390#if defined(PW_CORE_EVENT_BOUND_PROPS)
// Temporary listener that lives only for the duration of this call; the
// guard object removes the hook in its destructor.
395 spa_hook core_listener;
396 spa_zero(core_listener);
397 pw_core_add_listener(core, &core_listener, &core_events,
this);
404 ~hook_guard() { spa_hook_remove(h); }
405 } guard{&core_listener};
// Remember the sequence number the sync reply has to carry.
407 pending = pw_core_sync(core, PW_ID_CORE, 0);
409 using clk = std::chrono::steady_clock;
410 constexpr auto timeout = std::chrono::seconds(2);
411 const auto deadline = clk::now() + timeout;
413 while(!done && !error_state)
415 const auto now = clk::now();
419 "PipeWire: synchronize() timed out after {} s waiting for core sync",
420 int(std::chrono::duration_cast<std::chrono::seconds>(timeout).count()));
// Iterate in slices of at most 50 ms, never past the deadline.
426 = std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now).count();
427 const int iter_ms = int(remaining < 50 ? remaining : 50);
429 const int r = pw_loop_iterate(lp, iter_ms);
433 "PipeWire: pw_loop_iterate failed: {}", spa_strerror(r));
// Reached after the loop: an error was reported or no reply was seen.
439 if(error_state || !done)
// Asks the "link-factory" to create a link from out_port to in_port and
// returns the resulting proxy.
// NOTE(review): the conditions guarding the three cleanup paths below are
// on dropped lines; which path returns what cannot be read from here.
447 pw_proxy* link_ports(uint32_t out_port, uint32_t in_port)
// The std::to_string temporaries live until the end of this full
// expression, so the c_str() pointers are valid for the whole call.
452 auto props = pw.properties_new(
453 PW_KEY_LINK_OUTPUT_PORT, std::to_string(out_port).c_str(),
454 PW_KEY_LINK_INPUT_PORT, std::to_string(in_port).c_str(),
nullptr);
456 auto proxy = (pw_proxy*)pw_core_create_object(
457 this->core,
"link-factory", PW_TYPE_INTERFACE_Link, PW_VERSION_LINK,
// props is freed on every visible path.
463 pw.properties_free(props);
471 pw.proxy_destroy(proxy);
472 pw.properties_free(props);
475 pw.properties_free(props);
// Turns the property dictionary of a port into a port_info and files it in
// current_graph under its node id.
// NOTE(review): the assignments inside the key branches and the condition
// separating the physical_* from the software_* branches are on dropped
// lines.
479 void register_port(
const pw_port_info* info)
481 const spa_dict_item* item{};
// Walk all key/value properties of the port.
486 spa_dict_for_each(item, info->props)
488 std::string_view k{item->key}, v{item->value};
489 if(k ==
"format.dsp")
491 else if(k ==
"port.name")
493 else if(k ==
"port.alias")
495 else if(k ==
"object.path")
497 else if(k ==
"port.id")
499 else if(k ==
"node.id")
// The three flag properties only match when their value is exactly "true".
501 else if(k ==
"port.physical" && v ==
"true")
503 else if(k ==
"port.terminal" && v ==
"true")
505 else if(k ==
"port.monitor" && v ==
"true")
507 else if(k ==
"port.direction")
511 p.direction = pw_direction::SPA_DIRECTION_OUTPUT;
515 p.direction = pw_direction::SPA_DIRECTION_INPUT;
// A port without a node id cannot be filed.
520 if(p.node_id.empty())
// std::stoul throws on non-numeric text; NOTE(review): no handler is
// visible in this listing — confirm one exists or the input is trusted.
523 const auto nid = std::stoul(p.node_id);
// First group: physical nodes, split by format ("audio" / "midi") and then
// by direction.
526 if(p.format.find(
"audio") != p.format.npos)
528 if(p.direction == pw_direction::SPA_DIRECTION_OUTPUT)
529 this->current_graph.physical_audio[nid].outputs.push_back(std::move(p));
531 this->current_graph.physical_audio[nid].inputs.push_back(std::move(p));
533 else if(p.format.find(
"midi") != p.format.npos)
535 if(p.direction == pw_direction::SPA_DIRECTION_OUTPUT)
536 this->current_graph.physical_midi[nid].outputs.push_back(std::move(p));
538 this->current_graph.physical_midi[nid].inputs.push_back(std::move(p));
// Second group: same classification, filed in the software maps.
547 if(p.format.find(
"audio") != p.format.npos)
549 if(p.direction == pw_direction::SPA_DIRECTION_OUTPUT)
550 this->current_graph.software_audio[nid].outputs.push_back(std::move(p));
552 this->current_graph.software_audio[nid].inputs.push_back(std::move(p));
554 else if(p.format.find(
"midi") != p.format.npos)
556 if(p.direction == pw_direction::SPA_DIRECTION_OUTPUT)
557 this->current_graph.software_midi[nid].outputs.push_back(std::move(p));
559 this->current_graph.software_midi[nid].inputs.push_back(std::move(p));
// Returns the file descriptor reported by the loop's control interface, by
// calling the get_fd entry of its method table directly.
// NOTE(review): the fallback return used when get_fd is null is on a
// dropped line.
568 int get_fd() const noexcept
573 auto spa_callbacks = this->lp->control->iface.cb;
574 auto spa_loop_methods = (
const spa_loop_control_methods*)spa_callbacks.funcs;
// The method table entry may be absent, hence the null check.
575 if(spa_loop_methods->get_fd)
576 return spa_loop_methods->get_fd(spa_callbacks.data);
// Teardown statements (the destructor header is on a dropped line).
// Visible order: registry proxy, each bound port proxy, core connection,
// context, main loop — the reverse of the creation order in the
// constructor.
// NOTE(review): any null checks around these calls are on dropped lines.
584 pw.proxy_destroy((pw_proxy*)this->registry);
585 for(
auto& [
id, p, l] : this->port_listener)
587 pw.proxy_destroy((pw_proxy*)p);
589 pw.core_disconnect(this->core);
591 pw.context_destroy(this->context);
593 pw.main_loop_destroy(this->main_loop);
// Fields of a setup description whose struct header is on a dropped line
// (presumably the `audio_setup` taken by the engine constructor — confirm).
603 std::string card_out;
// Port names; the engine constructor creates one filter port per entry.
605 std::vector<std::string> inputs;
606 std::vector<std::string> outputs;
// Audio engine implemented as a PipeWire filter node.
// NOTE(review): braces, access specifiers and some members (e.g. `filter`)
// are on dropped lines of this listing.
612class pipewire_audio_protocol :
public audio_engine
// Shared PipeWire connection used for synchronize() and link creation.
619 std::shared_ptr<pipewire_context> loop{};
// Link proxies returned by link_ports; destroyed again when stopping.
621 std::vector<pw_proxy*> links{};
// Creates the filter node, adds one DSP port per requested input/output,
// connects the filter, then keeps calling synchronize() until the node id
// and all local ports are visible in the context's graph.
// Throws std::runtime_error when the filter cannot be created or connected.
// NOTE(review): several statements (property list creation, error
// handling after failed synchronize() calls) are on dropped lines.
623 explicit pipewire_audio_protocol(
624 std::shared_ptr<pipewire_context> loop,
const audio_setup& setup)
626 auto& pw = libpipewire::instance();
// Filter callbacks; only the process callback is visible here.
628 static constexpr const struct pw_filter_events filter_events = {
629 .version = PW_VERSION_FILTER_EVENTS,
636 .process = on_process,
638#if PW_VERSION_CORE > 3
// Node properties. Latency, forced quantum and forced rate are derived
// from setup.buffer_size and setup.rate; the fmt::format temporaries are
// only valid until the end of the enclosing call expression.
651 PW_KEY_MEDIA_TYPE,
"Audio",
652 PW_KEY_MEDIA_CATEGORY,
"Duplex",
653 PW_KEY_MEDIA_ROLE,
"DSP",
654 PW_KEY_MEDIA_NAME,
"ossia",
655 PW_KEY_NODE_NAME,
"ossia",
656 PW_KEY_NODE_GROUP,
"group.dsp.0",
657 PW_KEY_NODE_DESCRIPTION,
"ossia score",
658 PW_KEY_NODE_LATENCY, fmt::format(
"{}/{}", setup.buffer_size, setup.rate).c_str(),
659 PW_KEY_NODE_FORCE_QUANTUM, fmt::format(
"{}", setup.buffer_size).c_str(),
660 PW_KEY_NODE_LOCK_QUANTUM,
"true",
661 PW_KEY_NODE_TRANSPORT_SYNC,
"true",
663 PW_KEY_NODE_FORCE_RATE, fmt::format(
"{}", setup.rate).c_str(),
666 PW_KEY_NODE_PAUSE_ON_IDLE,
"false",
667 PW_KEY_NODE_SUSPEND_ON_IDLE,
"false",
// `this` is the user data later received by on_process.
671 this->filter = pw.filter_new_simple(
672 lp, setup.name.c_str(), filter_props, &filter_events,
this);
675 throw std::runtime_error(
"PipeWire: could not create filter instance");
// One mono float DSP input port per entry of setup.inputs.
679 for(std::size_t i = 0; i < setup.inputs.size(); i++)
681 auto p = (port*)pw.filter_add_port(
682 this->filter, PW_DIRECTION_INPUT, PW_FILTER_PORT_FLAG_MAP_BUFFERS,
685 PW_KEY_FORMAT_DSP,
"32 bit float mono audio", PW_KEY_PORT_NAME,
686 setup.inputs[i].c_str(), NULL),
688 input_ports.push_back(p);
// One mono float DSP output port per entry of setup.outputs.
691 for(std::size_t i = 0; i < setup.outputs.size(); i++)
693 auto p = (port*)pw.filter_add_port(
694 this->filter, PW_DIRECTION_OUTPUT, PW_FILTER_PORT_FLAG_MAP_BUFFERS,
697 PW_KEY_FORMAT_DSP,
"32 bit float mono audio", PW_KEY_PORT_NAME,
698 setup.outputs[i].c_str(), NULL),
700 output_ports.push_back(p);
703 if(pw.filter_connect(this->filter, PW_FILTER_FLAG_RT_PROCESS,
nullptr, 0) < 0)
705 throw std::runtime_error(
"PipeWire: cannot connect");
712 if(!this->loop->synchronize())
715 "PipeWire: synchronize() failed after filter_connect — engine inactive");
// Poll until the filter has a node id. 4294967295 is 0xFFFFFFFF;
// NOTE(review): presumably the "no id yet" sentinel (SPA_ID_INVALID) —
// confirm and consider using the named constant.
720 auto node_id = filter_node_id();
721 while(node_id == 4294967295)
723 if(!this->loop->synchronize())
726 "PipeWire: synchronize() failed while waiting for node id");
729 node_id = filter_node_id();
// Wait until the graph lists at least as many ports for this node as were
// created locally above.
737 const auto num_local_ins = this->input_ports.size();
738 const auto num_local_outs = this->output_ports.size();
739 auto& this_node = this->loop->current_graph.software_audio[node_id];
740 while(this_node.inputs.size() < num_local_ins
741 || this_node.outputs.size() < num_local_outs)
743 if(!this->loop->synchronize())
746 "PipeWire: synchronize() failed while waiting for ports");
// The effective parameters are taken from the requested setup as is.
755 this->effective_buffer_size = setup.buffer_size;
756 this->effective_sample_rate = setup.rate;
757 this->effective_inputs = setup.inputs.size();
758 this->effective_outputs = setup.outputs.size();
761 uint32_t filter_node_id() {
return this->loop->pw.filter_get_node_id(this->filter); }
// Body of the automatic connection routine (its header is on a dropped
// line). It pairs physical audio ports with this engine's ports by index
// and links each complete pair.
// NOTE(review): the declarator names after the two vector types, the
// filters on `node` and the guards before the optional dereferences are on
// dropped lines.
765 auto node_id = filter_node_id();
// Pairs of (source port id, destination port id); a side stays empty until
// a partner is assigned below.
767 std::vector<std::pair<std::optional<uint32_t>, std::optional<uint32_t>>>
769 std::vector<std::pair<std::optional<uint32_t>, std::optional<uint32_t>>>
773 for(
auto& [node, ports] : loop->current_graph.physical_audio)
// The binding order here is (out, in) while the software loop below uses
// (in, out): the names are from this engine's point of view, so `out`
// refers to the first member of the node — presumably its inputs; confirm
// against the node struct.
775 auto& [out, in] = ports;
780 phys_in_to_ossia.emplace_back(port.id, std::nullopt);
784 for(
auto& port : out)
786 ossia_to_phys_out.emplace_back(std::nullopt, port.id);
791 for(
auto& [node, ports] : loop->current_graph.software_audio)
795 auto& [in, out] = ports;
// Fill the missing side of each pair by position; extra local ports beyond
// the number of physical ports are not paired.
798 for(std::size_t i = 0; i < in.size(); i++)
800 if(i >= phys_in_to_ossia.size())
803 phys_in_to_ossia[i].second = in[i].id;
807 for(std::size_t i = 0; i < out.size(); i++)
809 if(i >= ossia_to_phys_out.size())
812 ossia_to_phys_out[i].first = out[i].id;
// Create the links and keep the non-null proxies.
819 for(
auto [phys, self] : phys_in_to_ossia)
823 if(
auto link = this->loop->link_ports(*phys, *self))
824 this->links.push_back(link);
832 for(
auto [self, phys] : ossia_to_phys_out)
836 if(
auto link = this->loop->link_ports(*self, *phys))
837 this->links.push_back(link);
// Pumps the PipeWire loop until at least `ms` milliseconds have elapsed.
// NOTE(review): the statement refreshing t1 inside the loop is on a dropped
// line. Each iteration passes the full `ms` as timeout rather than the
// remaining time, so the call may last longer than `ms` — confirm this is
// intended.
846 void wait(
int ms)
override
851 using namespace std::chrono;
852 using clk = high_resolution_clock;
854 auto t0 = clk::now();
855 auto t1 = clk::now();
856 while(duration_cast<milliseconds>(t1 - t0).count() < ms)
858 pw_loop_iterate(loop->lp, ms);
// audio_engine override reporting whether the engine is running.
// NOTE(review): the body is on dropped lines of this listing.
863 bool running()
const override
// Body of the shutdown routine (its header is on a dropped line): run the
// base-class stop, disconnect the filter, destroy the links, then destroy
// the filter.
// NOTE(review): guards such as null checks on `filter` are on dropped
// lines.
872 audio_engine::stop();
880 auto& pw = libpipewire::instance();
// A failed disconnect is only logged; teardown continues.
905 if(
int res = pw.filter_disconnect(this->filter); res < 0)
908 "PipeWire: filter_disconnect failed: {}", spa_strerror(res));
917 for(
auto link : this->links)
918 pw.proxy_destroy(link);
// Reset the pointer after destroying the filter.
923 pw.filter_destroy(this->filter);
924 this->filter =
nullptr;
931 ~pipewire_audio_protocol() { stop(); }
// Walks the first `outputs` output ports of `self` and, for each, the
// `nframes` samples of its DSP buffer.
// NOTE(review): the return type, any null check on `chan` and the statement
// inside the inner loop are on dropped lines; per the function name it
// presumably zeroes each sample — confirm.
934 clear_buffers(pipewire_audio_protocol& self, uint32_t nframes, std::size_t outputs)
936 auto& pw = libpipewire::instance();
937 for(std::size_t i = 0; i < outputs; i++)
939 auto chan = (
float*)pw.filter_get_dsp_buffer(self.output_ports[i], nframes);
941 for(std::size_t j = 0; j < nframes; j++)
// Gathers the DSP buffers of all ports for one cycle of `nframes` frames
// and packs them into an audio_tick_state together with `secs`.
// NOTE(review): the condition leading to clear_buffers and the use of `ts`
// after its construction are on dropped lines.
948 void do_process(uint32_t nframes,
double secs)
950 const auto& pw = libpipewire::instance();
954 const auto inputs = input_ports.size();
955 const auto outputs = output_ports.size();
959 clear_buffers(*
this, nframes, outputs);
// Zeroed stack scratch buffer that stands in for any port without a DSP
// buffer. It is shared by every such port, inputs and outputs alike.
963 auto dummy = (
float*)alloca(
sizeof(
float) * nframes);
964 memset(dummy, 0,
sizeof(
float) * nframes);
// Per-channel pointer tables, also allocated on the stack.
966 auto float_input = (
float**)alloca(
sizeof(
float*) * inputs);
967 auto float_output = (
float**)alloca(
sizeof(
float*) * outputs);
968 for(std::size_t i = 0; i < inputs; i++)
970 float_input[i] = (
float*)pw.filter_get_dsp_buffer(input_ports[i], nframes);
971 if(float_input[i] ==
nullptr)
972 float_input[i] = dummy;
974 for(std::size_t i = 0; i < outputs; i++)
976 float_output[i] = (
float*)pw.filter_get_dsp_buffer(output_ports[i], nframes);
977 if(float_output[i] ==
nullptr)
978 float_output[i] = dummy;
982 ossia::audio_tick_state ts{float_input, float_output, (int)inputs,
983 (
int)outputs, nframes, secs};
// Filter process callback (registered as `.process` in the constructor).
// `userdata` is the pipewire_audio_protocol passed to filter_new_simple.
988 static void on_process(
void* userdata,
struct spa_io_position* position)
// thread_local static: its initializer runs once per calling thread, naming
// and pinning that thread.
991 static const thread_local auto _
993 ossia::set_thread_name(
"ossia audio 0");
994 ossia::set_thread_pinned(thread_type::Audio, 0);
1001 auto& self = *(pipewire_audio_protocol*)userdata;
1002 const uint32_t nframes = position->clock.duration;
// nsec * 1e-9 is a value in seconds, despite the `_ns` suffix of the name;
// it is passed as the `secs` argument of do_process.
1003 const double current_time_ns = position->clock.nsec * 1e-9;
// A cycle whose size differs from effective_buffer_size is logged; per the
// message it is skipped (the early return is on a dropped line).
1010 if(nframes !=
static_cast<uint32_t
>(self.effective_buffer_size))
1013 "PipeWire: unexpected block size {} (expected {}), skipping cycle",
1014 nframes, self.effective_buffer_size);
1018 self.do_process(nframes, current_time_ns);
// Port handles returned by filter_add_port in the constructor, in the
// order of setup.inputs / setup.outputs.
1021 std::vector<port*> input_ports;
1022 std::vector<port*> output_ports;
spdlog::logger & logger() noexcept
Where the errors will be logged. Default is stderr.
Definition context.cpp:118