OSSIA
Open Scenario System for Interactive Application
Loading...
Searching...
No Matches
pipewire_protocol.hpp
1#pragma once
2#include <ossia/detail/config.hpp>
3
4#if defined(OSSIA_ENABLE_PIPEWIRE)
5#if __has_include(<libremidi/backends/linux/pipewire/context.hpp>) \
6 && __has_include(<pipewire/filter.h>) \
7 && __has_include(<spa/param/latency-utils.h>)
8#define OSSIA_AUDIO_PIPEWIRE 1
9
10#include <ossia/audio/audio_engine.hpp>
12#include <ossia/detail/thread.hpp>
13
14#include <libremidi/backends/linux/pipewire/context.hpp>
15#include <libremidi/backends/linux/pipewire/filter.hpp>
16#include <libremidi/backends/linux/pipewire/loader.hpp>
17#include <libremidi/backends/linux/pipewire/subscription.hpp>
18#include <libremidi/backends/linux/pipewire/types.hpp>
19
20#include <pipewire/filter.h>
21#include <pipewire/keys.h>
22#include <pipewire/properties.h>
23#include <spa/param/latency-utils.h>
24#include <spa/utils/result.h>
25
26#include <fmt/format.h>
27
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
36
37namespace ossia
38{
39
/// Parameters used to open an audio engine.
///
/// Kept as a plain aggregate: the field order below is part of the
/// interface, since callers may use positional aggregate initialization.
struct audio_setup
{
  /// Client name; used as the PipeWire node / media name.
  std::string name;
  /// Input and output card identifiers.
  std::string card_in;
  std::string card_out;

  /// One entry per channel: the name given to the corresponding port.
  std::vector<std::string> inputs;
  std::vector<std::string> outputs;

  /// Requested sample rate.
  int rate = 0;
  /// Requested block size, in frames.
  int buffer_size = 0;
};
52
53class pipewire_audio_protocol : public audio_engine
54{
55public:
56 struct port
57 {
58 };
59
60 std::shared_ptr<libremidi::pipewire::context> loop;
61 pw_filter* filter{};
62 std::vector<pw_proxy*> links;
63
64 std::vector<port*> input_ports;
65 std::vector<port*> output_ports;
66
67 bool activated{};
68
69 explicit pipewire_audio_protocol(
70 std::shared_ptr<libremidi::pipewire::context> ctx,
71 const audio_setup& setup)
72 : loop{std::move(ctx)}
73 {
74 if (!loop || !loop->ok())
75 return;
76
77 auto& pw = libremidi::pipewire::load();
78 if (!pw.filter_available)
79 return;
80
81 // static: pw_filter_new_simple stores the pointer, not the table.
82 static constexpr const struct pw_filter_events filter_events = {
83 .version = PW_VERSION_FILTER_EVENTS,
84 .destroy = {},
85 .state_changed = {},
86 .io_changed = {},
87 .param_changed = {},
88 .add_buffer = {},
89 .remove_buffer = {},
90 .process = &on_process,
91 .drained = {},
92#if PW_VERSION_CORE > 3
93 .command = {},
94#endif
95 };
96
97 std::string default_sink_name = loop->default_audio_sink_name();
98 ossia::logger().info(
99 "PipeWire filter: default sink name = '{}'", default_sink_name);
100
101 bool created = false;
102 loop->with_lock([&] {
103 auto* filter_props = pw.properties_new(
104 PW_KEY_MEDIA_TYPE, "Audio",
105 PW_KEY_MEDIA_CATEGORY, "Duplex",
106 PW_KEY_MEDIA_ROLE, "DSP",
107 PW_KEY_MEDIA_NAME, setup.name.c_str(),
108 PW_KEY_NODE_NAME, setup.name.c_str(),
109 PW_KEY_NODE_GROUP, "group.dsp.0",
110 PW_KEY_NODE_DESCRIPTION, "ossia score",
111 // NODE_LATENCY + NODE_TRANSPORT_SYNC: wireplumber routing
112 // hints; without them the filter stays unconnected.
113 PW_KEY_NODE_LATENCY,
114 fmt::format("{}/{}", setup.buffer_size, setup.rate).c_str(),
115 PW_KEY_NODE_FORCE_QUANTUM,
116 fmt::format("{}", setup.buffer_size).c_str(),
117 PW_KEY_NODE_FORCE_RATE, fmt::format("{}", setup.rate).c_str(),
118 PW_KEY_NODE_LOCK_RATE, "true",
119 PW_KEY_NODE_TRANSPORT_SYNC, "true",
120 PW_KEY_NODE_ALWAYS_PROCESS, "true",
121 PW_KEY_NODE_PAUSE_ON_IDLE, "false",
122 PW_KEY_NODE_SUSPEND_ON_IDLE, "false",
123 nullptr);
124 if (!filter_props)
125 return;
126 if (!default_sink_name.empty())
127 {
128 pw.properties_set(
129 filter_props, PW_KEY_TARGET_OBJECT, default_sink_name.c_str());
130 }
131
132 this->filter = pw.filter_new_simple(
133 loop->bare_loop(), setup.name.c_str(), filter_props,
134 &filter_events, this);
135 // filter_props ownership taken by pw_filter_new_simple.
136 if (!this->filter)
137 return;
138
139 for (const auto& name : setup.inputs)
140 {
141 auto* p = static_cast<port*>(pw.filter_add_port(
142 this->filter, PW_DIRECTION_INPUT,
143 PW_FILTER_PORT_FLAG_MAP_BUFFERS, sizeof(struct port),
144 pw.properties_new(
145 PW_KEY_FORMAT_DSP, "32 bit float mono audio",
146 PW_KEY_PORT_NAME, name.c_str(), nullptr),
147 nullptr, 0));
148 input_ports.push_back(p);
149 }
150
151 for (const auto& name : setup.outputs)
152 {
153 auto* p = static_cast<port*>(pw.filter_add_port(
154 this->filter, PW_DIRECTION_OUTPUT,
155 PW_FILTER_PORT_FLAG_MAP_BUFFERS, sizeof(struct port),
156 pw.properties_new(
157 PW_KEY_FORMAT_DSP, "32 bit float mono audio",
158 PW_KEY_PORT_NAME, name.c_str(), nullptr),
159 nullptr, 0));
160 output_ports.push_back(p);
161 }
162
163 created = (pw.filter_connect(
164 this->filter, PW_FILTER_FLAG_RT_PROCESS, nullptr, 0)
165 >= 0);
166 });
167
168 if (!this->filter)
169 throw std::runtime_error("PipeWire: could not create filter instance");
170 if (!created)
171 throw std::runtime_error("PipeWire: cannot connect");
172
173 if (!loop->synchronize())
174 {
175 ossia::logger().error(
176 "PipeWire: synchronize() failed after filter_connect — engine inactive");
177 return;
178 }
179 {
180 int k = 0;
181 auto node_id = filter_node_id();
182 while (node_id == 0xFFFFFFFFu)
183 {
184 if (!loop->synchronize())
185 {
186 ossia::logger().error(
187 "PipeWire: synchronize() failed while waiting for node id");
188 return;
189 }
190 node_id = filter_node_id();
191 if (k++ > 100)
192 return;
193 }
194
195 // Registry broadcasts trail core_sync done by one round trip.
196 const auto num_in = input_ports.size();
197 const auto num_out = output_ports.size();
198 bool have_ports = false;
199 for (int j = 0; j < 200; ++j)
200 {
201 auto snap = loop->snapshot();
202 if (const auto* self = snap.find_by_id(node_id))
203 {
204 if (self->inputs.size() >= num_in
205 && self->outputs.size() >= num_out)
206 {
207 have_ports = true;
208 break;
209 }
210 }
211 if (!loop->synchronize())
212 {
213 ossia::logger().error(
214 "PipeWire: synchronize() failed while waiting for ports");
215 return;
216 }
217 }
218 if (!have_ports)
219 {
220 ossia::logger().error(
221 "PipeWire: ports never appeared in graph — engine inactive");
222 return;
223 }
224 }
225
226 activated = true;
227 this->effective_buffer_size = setup.buffer_size;
228 this->effective_sample_rate = setup.rate;
229 this->effective_inputs = setup.inputs.size();
230 this->effective_outputs = setup.outputs.size();
231 }
232
233 std::uint32_t filter_node_id() const noexcept
234 {
235 if (!this->filter)
236 return 0xFFFFFFFFu;
237 auto& pw = libremidi::pipewire::load();
238 if (!pw.filter_get_node_id)
239 return 0xFFFFFFFFu;
240 return pw.filter_get_node_id(this->filter);
241 }
242
243 // Modern PipeWire/WirePlumber does NOT set port.physical=true on
244 // ALSA hw ports — match the node's media.class (Audio/Source,
245 // Audio/Sink) rather than n.physical.
246 void autoconnect()
247 {
248 const auto our_node = filter_node_id();
249 if (our_node == 0xFFFFFFFFu)
250 return;
251
252 const std::string default_sink = loop->default_audio_sink_name();
253 const std::string default_source = loop->default_audio_source_name();
254 ossia::logger().info(
255 "PipeWire autoconnect: defaults src='{}' sink='{}'",
256 default_source, default_sink);
257
258 std::vector<std::uint32_t> source_outputs;
259 std::vector<std::uint32_t> sink_inputs;
260 std::vector<std::uint32_t> self_in_ids, self_out_ids;
261 bool have_self = false;
262
263 for (int attempt = 0; attempt < 50; ++attempt)
264 {
265 auto snap = loop->snapshot();
266
267 self_in_ids.clear();
268 self_out_ids.clear();
269 have_self = false;
270 if (const auto* self_node = snap.find_by_id(our_node))
271 {
272 have_self = true;
273 for (const auto& p : self_node->inputs)
274 self_in_ids.push_back(p.id);
275 for (const auto& p : self_node->outputs)
276 self_out_ids.push_back(p.id);
277 }
278
279 source_outputs.clear();
280 sink_inputs.clear();
281 if (!default_source.empty())
282 {
283 if (const auto* n = snap.find_by_name(default_source))
284 {
285 for (const auto& p : n->outputs)
286 source_outputs.push_back(p.id);
287 }
288 }
289 if (!default_sink.empty())
290 {
291 if (const auto* n = snap.find_by_name(default_sink))
292 {
293 for (const auto& p : n->inputs)
294 sink_inputs.push_back(p.id);
295 }
296 }
297
298 // A suspended sink legitimately has no ports — target.object
299 // on the filter wakes it during activation.
300 if (have_self && !source_outputs.empty() && !sink_inputs.empty())
301 break;
302 if (!loop->synchronize())
303 break;
304 }
305
306 if (!have_self)
307 return;
308
309 ossia::logger().info(
310 "PipeWire autoconnect: src_ports={}, sink_ports={}, "
311 "self_in={}, self_out={}",
312 source_outputs.size(), sink_inputs.size(),
313 self_in_ids.size(), self_out_ids.size());
314
315 {
316 auto snap = loop->snapshot();
317 ossia::logger().info(
318 "PipeWire autoconnect: snapshot has {} nodes", snap.nodes.size());
319 for (const auto& n : snap.nodes)
320 {
321 ossia::logger().info(
322 "PipeWire autoconnect: snap id={} name='{}' class='{}' "
323 "inputs={} outputs={}",
324 n.id, n.name, n.media_class_str, n.inputs.size(),
325 n.outputs.size());
326 }
327 }
328
329 for (std::size_t i = 0;
330 i < self_in_ids.size() && i < source_outputs.size(); ++i)
331 {
332 if (auto* link = libremidi::pipewire::link_ports(
333 *loop, source_outputs[i], self_in_ids[i]))
334 links.push_back(link);
335 }
336 for (std::size_t i = 0;
337 i < self_out_ids.size() && i < sink_inputs.size(); ++i)
338 {
339 if (auto* link = libremidi::pipewire::link_ports(
340 *loop, self_out_ids[i], sink_inputs[i]))
341 links.push_back(link);
342 }
343 }
344
345 void wait(int ms) override
346 {
347 if (ms > 0)
348 std::this_thread::sleep_for(std::chrono::milliseconds(ms));
349 }
350
351 bool running() const override { return loop && activated; }
352
353 void stop() override
354 {
355 audio_engine::stop();
356 if (!loop || !activated)
357 return;
358
359 auto& pw = libremidi::pipewire::load();
360
361 // Tear-down order: disconnect → sync → drop links → destroy → sync.
362 // pw_filter_* / pw_proxy_destroy must run under the thread_loop lock.
363 if (this->filter)
364 {
365 loop->with_lock([&] {
366 if (int res = pw.filter_disconnect(this->filter); res < 0)
367 {
368 ossia::logger().warn(
369 "PipeWire: filter_disconnect failed: {}", spa_strerror(res));
370 }
371 });
372 (void)loop->synchronize();
373 }
374
375 for (auto* link : this->links)
376 libremidi::pipewire::unlink_ports(*loop, link);
377 this->links.clear();
378
379 if (this->filter)
380 {
381 loop->with_lock([&] {
382 pw.filter_destroy(this->filter);
383 this->filter = nullptr;
384 });
385 }
386
387 (void)loop->synchronize();
388 activated = false;
389 }
390
391 ~pipewire_audio_protocol() override { stop(); }
392
393 // RT thread: no locks, no allocation, no logging.
394 static void
395 clear_buffers(pipewire_audio_protocol& self, std::uint32_t nframes,
396 std::size_t outputs)
397 {
398 auto& pw = libremidi::pipewire::load();
399 for (std::size_t i = 0; i < outputs; i++)
400 {
401 auto* chan
402 = static_cast<float*>(pw.filter_get_dsp_buffer(self.output_ports[i], nframes));
403 if (chan)
404 for (std::size_t j = 0; j < nframes; j++)
405 chan[j] = 0.f;
406 }
407 }
408
409 void do_process(std::uint32_t nframes, double secs)
410 {
411 auto& pw = libremidi::pipewire::load();
412
413 tick_start();
414
415 const auto inputs = input_ports.size();
416 const auto outputs = output_ports.size();
417 if (stop_processing)
418 {
419 tick_clear();
420 clear_buffers(*this, nframes, outputs);
421 return;
422 }
423
424 auto* dummy = static_cast<float*>(alloca(sizeof(float) * nframes));
425 std::memset(dummy, 0, sizeof(float) * nframes);
426
427 auto** float_input = static_cast<float**>(alloca(sizeof(float*) * inputs));
428 auto** float_output = static_cast<float**>(alloca(sizeof(float*) * outputs));
429 for (std::size_t i = 0; i < inputs; i++)
430 {
431 float_input[i]
432 = static_cast<float*>(pw.filter_get_dsp_buffer(input_ports[i], nframes));
433 if (float_input[i] == nullptr)
434 float_input[i] = dummy;
435 }
436 for (std::size_t i = 0; i < outputs; i++)
437 {
438 float_output[i]
439 = static_cast<float*>(pw.filter_get_dsp_buffer(output_ports[i], nframes));
440 if (float_output[i] == nullptr)
441 float_output[i] = dummy;
442 }
443
444 ossia::audio_tick_state ts{
445 float_input, float_output, (int)inputs, (int)outputs, nframes, secs};
446 audio_tick(ts);
447 tick_end();
448 }
449
450 static void on_process(void* userdata, struct spa_io_position* position)
451 {
452 [[maybe_unused]] static const thread_local auto _ = [] {
453 ossia::set_thread_name("ossia audio 0");
454 ossia::set_thread_pinned(thread_type::Audio, 0);
455 return 0;
456 }();
457
458 if (!userdata)
459 return;
460
461 auto& self = *static_cast<pipewire_audio_protocol*>(userdata);
462 const std::uint32_t nframes = position->clock.duration;
463 const double current_time_ns = position->clock.nsec * 1e-9;
464
465 if (nframes != static_cast<std::uint32_t>(self.effective_buffer_size))
466 {
467 ossia::logger().warn(
468 "PipeWire: unexpected block size {} (expected {}), skipping cycle",
469 nframes, self.effective_buffer_size);
470 return;
471 }
472
473 self.do_process(nframes, current_time_ns);
474 }
475};
476
477} // namespace ossia
478
479#endif
480#endif
Definition git_info.h:7
spdlog::logger & logger() noexcept
Where the errors will be logged. Default is stderr.
Definition context.cpp:118