Loading...
Searching...
No Matches
score-plugin-avnd/Crousti/Executor.hpp
1#pragma once
2
3#include <Process/Execution/ProcessComponent.hpp>
4#include <Process/ExecutionContext.hpp>
5
6#include <Explorer/DocumentPlugin/DeviceDocumentPlugin.hpp>
7
8#include <Crousti/CpuAnalysisNode.hpp>
9#include <Crousti/CpuFilterNode.hpp>
10#include <Crousti/CpuGeneratorNode.hpp>
11#include <Crousti/ExecutorPortSetup.hpp>
12#include <Crousti/ExecutorUpdateControlValueInUi.hpp>
13#include <Crousti/File.hpp>
14#include <Crousti/GpuComputeNode.hpp>
15#include <Crousti/GpuNode.hpp>
16#include <Crousti/MessageBus.hpp>
17#include <Crousti/Metadatas.hpp>
18#include <Crousti/ProcessModel.hpp>
19
20#include <score/tools/Bind.hpp>
21
22#include <ossia/dataflow/exec_state_facade.hpp>
23#include <ossia/dataflow/node_process.hpp>
24#include <ossia/network/context.hpp>
25
26#include <ossia-qt/invoke.hpp>
27
28#include <QGuiApplication>
29
30#include <flicks.h>
31
32#if SCORE_PLUGIN_GFX
33#include <Crousti/GpuNode.hpp>
34#include <Gfx/GfxApplicationPlugin.hpp>
35#endif
36
37#include <score/tools/ThreadPool.hpp>
38
39#include <ossia/detail/type_if.hpp>
40
41#include <QTimer>
42
43#include <avnd/binding/ossia/data_node.hpp>
44#include <avnd/binding/ossia/mono_audio_node.hpp>
45#include <avnd/binding/ossia/node.hpp>
46#include <avnd/binding/ossia/ossia_audio_node.hpp>
47#include <avnd/binding/ossia/poly_audio_node.hpp>
48#include <avnd/concepts/temporality.hpp>
49#include <avnd/concepts/ui.hpp>
50#include <avnd/concepts/worker.hpp>
51
52namespace oscr
53{
54
55template <typename Node>
56class CustomNodeProcess : public ossia::node_process
57{
58 using node_process::node_process;
59 void start() override
60 {
61 node_process::start();
62 auto& n = static_cast<safe_node<Node>&>(*node);
63 if_possible(n.impl.effect.start());
64 }
65 void pause() override
66 {
67 auto& n = static_cast<safe_node<Node>&>(*node);
68 if_possible(n.impl.effect.pause());
69 node_process::pause();
70 }
71
72 void resume() override
73 {
74 node_process::resume();
75 auto& n = static_cast<safe_node<Node>&>(*node);
76 if_possible(n.impl.effect.resume());
77 }
78
79 void stop() override
80 {
81 auto& n = static_cast<safe_node<Node>&>(*node);
82 if_possible(n.impl.effect.stop());
83 node_process::stop();
84 }
85
86 void offset_impl(ossia::time_value date) override
87 {
88 node_process::offset_impl(date);
89 auto& n = static_cast<safe_node<Node>&>(*node);
90 util::flicks f{date.impl};
91 if_possible(n.impl.effect.transport(f));
92 }
93
94 void transport_impl(ossia::time_value date) override
95 {
96 node_process::transport_impl(date);
97 auto& n = static_cast<safe_node<Node>&>(*node);
98
99 util::flicks f{date.impl};
100 if_possible(n.impl.effect.transport(f));
101 }
102};
103
104template <typename Node>
105class Executor final
106 : public Execution::ProcessComponent_T<ProcessModel<Node>, ossia::node_process>
107{
108 Process::Inlets m_oldInlets;
109 Process::Outlets m_oldOutlets;
110
111public:
112 static Q_DECL_RELAXED_CONSTEXPR UuidKey<score::Component> static_key() noexcept
113 {
114 return uuid_from_string<Node>();
115 }
116
117 UuidKey<score::Component> key() const noexcept final override { return static_key(); }
118
119 bool key_match(UuidKey<score::Component> other) const noexcept final override
120 {
121 return static_key() == other || Execution::ProcessComponent::base_key_match(other);
122 }
123
124 [[no_unique_address]] ossia::type_if<int, is_gpu<Node>> node_id = -1;
125
126 Executor(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
128 element, ctx, "Executor::ProcessModel<Info>", p}
129 {
130#if SCORE_PLUGIN_GFX
131 if constexpr(is_gpu<Node>)
132 {
133 setup_gpu(element, ctx, p);
134 }
135 else
136#endif
137 {
138 setup_cpu(element, ctx, p);
139 }
140
141 if constexpr(avnd::tag_process_exec<Node>)
142 {
143 this->m_ossia_process = std::make_shared<CustomNodeProcess<Node>>(this->node);
144 }
145 else
146 {
147 this->m_ossia_process = std::make_shared<ossia::node_process>(this->node);
148 }
149 }
150
151 void
152 setup_cpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
153 {
154 auto& net_ctx
155 = *ctx.doc.findPlugin<Explorer::DeviceDocumentPlugin>()->networkContext();
156 const auto id
157 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
158
159 auto st = ossia::exec_state_facade{ctx.execState.get()};
160 std::shared_ptr<safe_node<Node>> ptr;
161 auto node = new safe_node<Node>{st.bufferSize(), (double)st.sampleRate(), id};
162 node->root_inputs().reserve(element.inlets().size());
163 node->root_outputs().reserve(element.outlets().size());
164
165 node->prepare(*ctx.execState.get()); // Preparation of the ossia side
166
167 if_possible(node->impl.effect.ossia_state = st);
168 if_possible(node->impl.effect.io_context = &net_ctx.context);
169 if_possible(node->impl.effect.ossia_document_context = &ctx.doc);
170 ptr.reset(node);
171 this->node = ptr;
172
173 if constexpr(requires { ptr->impl.effect; })
174 if constexpr(std::is_same_v<std::decay_t<decltype(ptr->impl.effect)>, Node>)
175 connect_message_bus(element, ctx, ptr->impl.effect);
176 connect_worker(ctx, ptr->impl);
177
178 node->dynamic_ports = element.dynamic_ports;
179 node->finish_init();
180
181 connect_controls(element, ctx, ptr);
182 update_controls(ptr);
183 QObject::connect(
184 &element, &Process::ProcessModel::inletsChanged, this,
185 &Executor::recompute_ports);
186 QObject::connect(
187 &element, &Process::ProcessModel::outletsChanged, this,
188 &Executor::recompute_ports);
189
190 // To call prepare() after evertyhing is ready
191 node->audio_configuration_changed(st);
192
193 m_oldInlets = element.inlets();
194 m_oldOutlets = element.outlets();
195 }
196
197 void
198 setup_gpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
199 {
200#if SCORE_PLUGIN_GFX
201 // FIXME net context for gpu node ?
202 const auto id
203 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
204
205 auto& gfx_exec = ctx.doc.plugin<Gfx::DocumentPlugin>().exec;
206
207 // Create the executor in the audio thread
208 struct named_exec_node final : Gfx::gfx_exec_node
209 {
210 using Gfx::gfx_exec_node::gfx_exec_node;
211 std::string label() const noexcept override
212 {
213 return std::string(avnd::get_name<Node>());
214 }
215 };
216
217 auto node = std::make_shared<named_exec_node>(gfx_exec);
218 node->prepare(*ctx.execState);
219
220 this->node = node;
221
222 // Create the controls, inputs outputs etc.
223 std::size_t i = 0;
224 for(auto& ctl : element.inlets())
225 {
226 if(auto ctrl = qobject_cast<Process::ControlInlet*>(ctl))
227 {
228 auto& p = node->add_control();
229 p->value = ctrl->value();
230 p->changed = true;
231
232 QObject::connect(
233 ctrl, &Process::ControlInlet::valueChanged, this,
234 Gfx::con_unvalidated{ctx, i, 0, node});
235 i++;
236 }
237 else if(auto ctrl = qobject_cast<Process::ValueInlet*>(ctl))
238 {
239 auto& p = node->add_control();
240 p->changed = true;
241 i++;
242 }
243 else if(auto ctrl = qobject_cast<Process::AudioInlet*>(ctl))
244 {
245 node->add_audio();
246 }
247 else if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
248 {
249 node->add_texture();
250 }
251 }
252
253 // FIXME refactor this with other GFX processes
254 for(auto* outlet : element.outlets())
255 {
256 if(auto ctrl = qobject_cast<Process::ControlOutlet*>(outlet))
257 {
258 node->add_control_out();
259 }
260 else if(auto ctrl = qobject_cast<Process::ValueOutlet*>(outlet))
261 {
262 node->add_control_out();
263 }
264 else if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
265 {
266 node->add_texture_out();
267 out->nodeId = node_id;
268 }
269 }
270
271 // Create the GPU node
272 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
273 ctx.alias.lock(), &ctx.executionQueue);
274 std::unique_ptr<score::gfx::Node> ptr;
275 if constexpr(GpuGraphicsNode2<Node>)
276 {
277 auto gpu_node = new CustomGpuNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
278 ptr.reset(gpu_node);
279 }
280 else if constexpr(GpuComputeNode2<Node>)
281 {
282 auto gpu_node = new GpuComputeNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
283 ptr.reset(gpu_node);
284 }
285 else if constexpr(GpuNode<Node>)
286 {
287 auto gpu_node
288 = new GfxNode<Node>(element, qex_ptr, node->control_outs, id, ctx.doc);
289 ptr.reset(gpu_node);
290 }
291 node->id = gfx_exec.ui->register_node(std::move(ptr));
292 node_id = node->id;
293#endif
294 }
295
296 void recompute_ports()
297 {
298 Execution::Transaction commands{this->system()};
299 auto n = std::dynamic_pointer_cast<safe_node<Node>>(this->node);
300 if(!n)
301 return;
302
303 // Re-run setup_inlets ?
304 this->in_exec([dp = this->process().dynamic_ports, node = n] {
305 node->dynamic_ports = dp;
306 node->root_inputs().clear();
307 node->root_outputs().clear();
308 node->initialize_all_ports();
309 });
310 }
311
312 void connect_controls(
313 ProcessModel<Node>& element, const ::Execution::Context& ctx,
314 std::shared_ptr<safe_node<Node>>& ptr)
315 {
316 using dynamic_ports_port_type = avnd::dynamic_ports_input_introspection<Node>;
317 using control_inputs_type = avnd::control_input_introspection<Node>;
318 using curve_inputs_type = avnd::curve_input_introspection<Node>;
319 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
320 using midifile_inputs_type = avnd::midifile_input_introspection<Node>;
321 using raw_file_inputs_type = avnd::raw_file_input_introspection<Node>;
322 using control_outputs_type = avnd::control_output_introspection<Node>;
323
324 // UI controls to engine
325 safe_node<Node>& node = *ptr;
326 avnd::effect_container<Node>& eff = node.impl;
327
328 // Initialize all the controls in the node with the current value.
329 // And update the node when the UI changes
330
331 if constexpr(dynamic_ports_port_type::size > 0)
332 {
333 for(auto state : eff.full_state())
334 {
335 dynamic_ports_port_type::for_all_n2(
336 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
337 }
338 }
339 if constexpr(control_inputs_type::size > 0)
340 {
341 for(auto state : eff.full_state())
342 {
343 control_inputs_type::for_all_n2(
344 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
345 }
346 }
347 if constexpr(curve_inputs_type::size > 0)
348 {
349 for(auto state : eff.full_state())
350 {
351 curve_inputs_type::for_all_n2(
352 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
353 }
354 }
355 if constexpr(soundfile_inputs_type::size > 0)
356 {
357 soundfile_inputs_type::for_all_n2(
358 avnd::get_inputs<Node>(eff),
359 dispatch_control_setup<Node>{element, ctx, ptr, this});
360
361 setup_soundfile_task_pool(element, ctx, ptr);
362 }
363 if constexpr(midifile_inputs_type::size > 0)
364 {
365 midifile_inputs_type::for_all_n2(
366 avnd::get_inputs<Node>(eff),
367 dispatch_control_setup<Node>{element, ctx, ptr, this});
368 }
369 if constexpr(raw_file_inputs_type::size > 0)
370 {
371 raw_file_inputs_type::for_all_n2(
372 avnd::get_inputs<Node>(eff),
373 dispatch_control_setup<Node>{element, ctx, ptr, this});
374 }
375
376 // Engine to ui controls
377 if constexpr(control_inputs_type::size > 0 || control_outputs_type::size > 0)
378 {
379 // Update the value in the UI
380 std::weak_ptr<safe_node<Node>> weak_node = ptr;
381 update_control_value_in_ui<Node> timer_action{weak_node, element};
382 timer_action();
383
384 con(ctx.doc.coarseUpdateTimer, &QTimer::timeout, this,
385 [timer_action = std::move(timer_action)] { timer_action(); },
386 Qt::QueuedConnection);
387 }
388 }
389
390 void setup_soundfile_task_pool(
391 ProcessModel<Node>& element, const ::Execution::Context& ctx,
392 std::shared_ptr<safe_node<Node>>& ptr)
393 {
394 safe_node<Node>& node = *ptr;
395 avnd::effect_container<Node>& eff = node.impl;
396
397 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
398
399 auto& tq = score::TaskPool::instance();
400 node.soundfiles.load_request
401 = [&tq, p = std::weak_ptr{ptr}, &ctx](std::string& str, int idx) {
402 auto eff_ptr = p.lock();
403 if(!eff_ptr)
404 return;
405 tq.post([eff_ptr = std::move(eff_ptr), filename = str, &ctx, idx]() mutable {
406 if(auto file = loadSoundfile(filename, ctx.doc, ctx.execState))
407 {
408 ctx.executionQueue.enqueue(
409 [sf = std::move(file), p = std::weak_ptr{eff_ptr}, idx]() mutable {
410 auto eff_ptr = p.lock();
411 if(!eff_ptr)
412 return;
413
414 avnd::effect_container<Node>& eff = eff_ptr->impl;
415 soundfile_inputs_type::for_nth_mapped_n2(
416 avnd::get_inputs<Node>(eff), idx,
417 [&]<std::size_t NField, std::size_t N>(
418 auto& field, avnd::predicate_index<N> p,
419 avnd::field_index<NField> f) {
420 eff_ptr->soundfile_loaded(sf, p, f);
421 });
422 });
423 }
424 });
425 };
426 }
427
428 void connect_message_bus(
429 ProcessModel<Node>& element, const ::Execution::Context& ctx, Node& eff)
430 {
431 // Custom UI messages to engine
432 if constexpr(avnd::has_gui_to_processor_bus<Node>)
433 {
434 element.from_ui = [p = QPointer{this}, &eff](QByteArray b) {
435 if(!p)
436 return;
437
438 p->in_exec([mess = std::move(b), &eff]() mutable {
439 using refl = avnd::function_reflection<&Node::process_message>;
440 static_assert(refl::count <= 1);
441
442 if constexpr(refl::count == 0)
443 {
444 // no arguments, just call it
445 eff.process_message();
446 }
447 else if constexpr(refl::count == 1)
448 {
449 using arg_type = avnd::first_argument<&Node::process_message>;
450 std::decay_t<arg_type> arg;
451 MessageBusReader reader{mess};
452 reader(arg);
453 eff.process_message(std::move(arg));
454 }
455 });
456 };
457 }
458
459 if constexpr(avnd::has_processor_to_gui_bus<Node>)
460 {
461 eff.send_message = [self = QPointer{this}]<typename T>(T&& b) mutable {
462 if(!self)
463 return;
464 if constexpr(
465 sizeof(QPointer<QObject>) + sizeof(b)
466 < Execution::ExecutionCommand::max_storage)
467 {
468 self->in_edit(
469 [proc = QPointer{&self->process()}, bb = std::move(b)]() mutable {
470 if(proc->to_ui)
471 MessageBusSender{proc->to_ui}(std::move(bb));
472 });
473 }
474 else
475 {
476 self->in_edit(
477 [proc = QPointer{&self->process()},
478 bb = std::make_unique<std::decay_t<T>>(std::move(b))]() mutable {
479 if(proc->to_ui)
480 MessageBusSender{proc->to_ui}(*std::move(bb));
481 });
482 }
483 };
484 }
485 }
486
487 void connect_worker(const ::Execution::Context& ctx, avnd::effect_container<Node>& eff)
488 {
489 if constexpr(avnd::has_worker<Node>)
490 {
491 // Initialize the thread pool beforehand
492 auto& tq = score::TaskPool::instance();
493 using worker_type = decltype(eff.effect.worker);
494 for(auto& eff : eff.effects())
495 {
496 std::weak_ptr eff_ptr = std::shared_ptr<Node>(this->node, &eff);
497 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
498 ctx.alias.lock(), &ctx.executionQueue);
499
500 eff.worker.request
501 = [&tq, qex_ptr = std::move(qex_ptr),
502 eff_ptr = std::move(eff_ptr)]<typename... Args>(Args&&... f) mutable {
503 // request() is invoked in the DSP / processor thread
504 // and just posts the task to the thread pool
505 tq.post([eff_ptr, qex_ptr, ... ff = std::forward<Args>(f)]() mutable {
506 // This happens in the worker thread
507 // If for some reason the object has already been removed, not much
508 // reason to perform the work
509 if(!eff_ptr.lock())
510 return;
511
512 using type_of_result
513 = decltype(worker_type::work(std::forward<decltype(ff)>(ff)...));
514 if constexpr(std::is_void_v<type_of_result>)
515 {
516 worker_type::work(std::forward<decltype(ff)>(ff)...);
517 }
518 else
519 {
520 // If the worker returns a std::function, it
521 // is to be invoked back in the processor DSP thread
522 auto res = worker_type::work(std::forward<decltype(ff)>(ff)...);
523 if(!res)
524 return;
525
526 // Execution queue is currently spsc from main thread to an exec thread,
527 // we cannot just yeet the result back from the thread-pool
528 ossia::qt::run_async(
529 qApp, [eff_ptr = std::move(eff_ptr), qex_ptr = std::move(qex_ptr),
530 res = std::move(res)]() mutable {
531 // Main thread
532 std::shared_ptr qex = qex_ptr.lock();
533 if(!qex)
534 return;
535
536 qex->enqueue(
537 [eff_ptr = std::move(eff_ptr), res = std::move(res)]() mutable {
538 // DSP / processor thread
539 // We need res to be mutable so that the worker can use it to e.g. store
540 // old data which will be freed back in the main thread
541 if(auto p = eff_ptr.lock())
542 res(*p);
543 });
544 });
545 }
546 });
547 };
548 }
549 }
550 }
551
552 // Update everything
553 void update_controls(std::shared_ptr<safe_node<Node>>& ptr)
554 {
555 avnd::effect_container<Node>& eff = ptr->impl;
556 {
557 for(auto state : eff.full_state())
558 {
559 avnd::input_introspection<Node>::for_all(
560 state.inputs, [&](auto& field) { if_possible(field.update(state.effect)); });
561 }
562 }
563 }
564
565 void cleanup() override
566 {
567 if constexpr(requires { this->process().from_ui; })
568 {
569 this->process().from_ui = [](QByteArray arr) {};
570 }
571 // FIXME cleanup eff.effect.send_message too ?
572
573#if SCORE_PLUGIN_GFX
574 if constexpr(is_gpu<Node>)
575 {
576 // FIXME this must move in the Node dtor. See video_node
577 auto& gfx_exec = this->system().doc.template plugin<Gfx::DocumentPlugin>().exec;
578 if(node_id >= 0)
579 gfx_exec.ui->unregister_node(node_id);
580 }
581
582 // FIXME refactor this with other GFX processes
583 for(auto* outlet : this->process().outlets())
584 {
585 if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
586 {
587 out->nodeId = -1;
588 }
589 }
590#endif
591 ::Execution::ProcessComponent::cleanup();
592 }
593
594 ~Executor() { }
595};
596}
Definition GfxApplicationPlugin.hpp:13
Definition GfxExecNode.hpp:40
The Path class is a typesafe wrapper around ObjectPath.
Definition Path.hpp:52
Definition UuidKey.hpp:343
Definition score-plugin-avnd/Crousti/Executor.hpp:57
Definition score-plugin-avnd/Crousti/Executor.hpp:107
Definition score-plugin-avnd/Crousti/ProcessModel.hpp:77
Definition Metadatas.hpp:26
Definition Metadatas.hpp:20
Definition Metadatas.hpp:16
Definition Metadatas.hpp:32
Definition Factories.hpp:19
Definition Process/Execution/ProcessComponent.hpp:89
Definition ExecutionTransaction.hpp:18
Definition GfxExecNode.hpp:117
Definition PortForward.hpp:23
Definition PortForward.hpp:27
Definition MessageBus.hpp:133
Definition MessageBus.hpp:37
Definition ExecutorPortSetup.hpp:337
Definition ExecutorUpdateControlValueInUi.hpp:76