Loading...
Searching...
No Matches
score-plugin-avnd/Crousti/Executor.hpp
1#pragma once
2
3#include <Process/Execution/ProcessComponent.hpp>
4#include <Process/ExecutionContext.hpp>
5
6#include <Explorer/DocumentPlugin/DeviceDocumentPlugin.hpp>
7
8#include <Crousti/CpuAnalysisNode.hpp>
9#include <Crousti/CpuFilterNode.hpp>
10#include <Crousti/ExecutorPortSetup.hpp>
11#include <Crousti/ExecutorUpdateControlValueInUi.hpp>
12#include <Crousti/File.hpp>
13#include <Crousti/GpuComputeNode.hpp>
14#include <Crousti/GpuNode.hpp>
15#include <Crousti/MessageBus.hpp>
16#include <Crousti/Metadatas.hpp>
17#include <Crousti/ProcessModel.hpp>
18
19#include <score/tools/Bind.hpp>
20
21#include <ossia/dataflow/exec_state_facade.hpp>
22#include <ossia/dataflow/node_process.hpp>
23#include <ossia/network/context.hpp>
24
25#include <ossia-qt/invoke.hpp>
26
27#include <QGuiApplication>
28
29#include <flicks.h>
30
31#if SCORE_PLUGIN_GFX
32#include <Crousti/GpuNode.hpp>
33#include <Gfx/GfxApplicationPlugin.hpp>
34#endif
35
36#include <score/tools/ThreadPool.hpp>
37
38#include <ossia/detail/type_if.hpp>
39
40#include <QTimer>
41
42#include <avnd/binding/ossia/data_node.hpp>
43#include <avnd/binding/ossia/mono_audio_node.hpp>
44#include <avnd/binding/ossia/node.hpp>
45#include <avnd/binding/ossia/ossia_audio_node.hpp>
46#include <avnd/binding/ossia/ossia_dynamic_audio_node.hpp>
47#include <avnd/binding/ossia/poly_audio_node.hpp>
48#include <avnd/concepts/temporality.hpp>
49#include <avnd/concepts/ui.hpp>
50#include <avnd/concepts/worker.hpp>
51
52namespace oscr
53{
54
55template <typename Node>
56class CustomNodeProcess : public ossia::node_process
57{
58 using node_process::node_process;
59 void start() override
60 {
61 node_process::start();
62 auto& n = static_cast<safe_node<Node>&>(*node);
63 if_possible(n.impl.effect.start());
64 }
65 void pause() override
66 {
67 auto& n = static_cast<safe_node<Node>&>(*node);
68 if_possible(n.impl.effect.pause());
69 node_process::pause();
70 }
71
72 void resume() override
73 {
74 node_process::resume();
75 auto& n = static_cast<safe_node<Node>&>(*node);
76 if_possible(n.impl.effect.resume());
77 }
78
79 void stop() override
80 {
81 auto& n = static_cast<safe_node<Node>&>(*node);
82 if_possible(n.impl.effect.stop());
83 node_process::stop();
84 }
85
86 void offset_impl(ossia::time_value date) override
87 {
88 node_process::offset_impl(date);
89 auto& n = static_cast<safe_node<Node>&>(*node);
90 util::flicks f{date.impl};
91 if_possible(n.impl.effect.transport(f));
92 }
93
94 void transport_impl(ossia::time_value date) override
95 {
96 node_process::transport_impl(date);
97 auto& n = static_cast<safe_node<Node>&>(*node);
98
99 util::flicks f{date.impl};
100 if_possible(n.impl.effect.transport(f));
101 }
102};
103
104template <typename Node>
105class Executor final
106 : public Execution::ProcessComponent_T<ProcessModel<Node>, ossia::node_process>
107{
108 Process::Inlets m_oldInlets;
109 Process::Outlets m_oldOutlets;
110
111public:
112 static Q_DECL_RELAXED_CONSTEXPR UuidKey<score::Component> static_key() noexcept
113 {
114 return uuid_from_string<Node>();
115 }
116
117 UuidKey<score::Component> key() const noexcept final override { return static_key(); }
118
119 bool key_match(UuidKey<score::Component> other) const noexcept final override
120 {
121 return static_key() == other || Execution::ProcessComponent::base_key_match(other);
122 }
123
124#if defined(SCORE_PLUGIN_GFX)
125 [[no_unique_address]] ossia::type_if<int, is_gpu<Node>> node_id = score::gfx::invalid_node_index;
126#endif
127
128 Executor(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
130 element, ctx, "Executor::ProcessModel<Info>", p}
131 {
132 if constexpr(is_gpu<Node>)
133 {
134#if SCORE_PLUGIN_GFX
135 setup_gpu(element, ctx, p);
136#endif
137 }
138 else
139 {
140 setup_cpu(element, ctx, p);
141 }
142
143 if constexpr(avnd::tag_process_exec<Node>)
144 {
145 this->m_ossia_process = std::make_shared<CustomNodeProcess<Node>>(this->node);
146 }
147 else
148 {
149 this->m_ossia_process = std::make_shared<ossia::node_process>(this->node);
150 }
151 }
152
153 void
154 setup_cpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
155 {
156 auto& net_ctx
157 = *ctx.doc.findPlugin<Explorer::DeviceDocumentPlugin>()->networkContext();
158 const auto id
159 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
160
161 auto st = ossia::exec_state_facade{ctx.execState.get()};
162 std::shared_ptr<safe_node<Node>> ptr;
163 auto node = new safe_node<Node>{st.bufferSize(), (double)st.sampleRate(), id};
164 node->root_inputs().reserve(element.inlets().size());
165 node->root_outputs().reserve(element.outlets().size());
166
167 node->prepare(*ctx.execState.get()); // Preparation of the ossia side
168
169 if_possible(node->impl.effect.ossia_state = st);
170 if_possible(node->impl.effect.io_context = &net_ctx.context);
171 if_possible(node->impl.effect.ossia_document_context = &ctx.doc);
172 ptr.reset(node);
173 this->node = ptr;
174
175 if constexpr(requires { ptr->impl.effect; })
176 if constexpr(std::is_same_v<std::decay_t<decltype(ptr->impl.effect)>, Node>)
177 connect_message_bus(element, ctx, ptr->impl.effect);
178 connect_worker(ctx, ptr->impl);
179
180 node->dynamic_ports = element.dynamic_ports;
181 node->finish_init();
182
183 connect_controls(element, ctx, ptr);
184 update_controls(ptr);
185 QObject::connect(
186 &element, &Process::ProcessModel::inletsChanged, this,
187 &Executor::recompute_ports);
188 QObject::connect(
189 &element, &Process::ProcessModel::outletsChanged, this,
190 &Executor::recompute_ports);
191
192 // To call prepare() after evertyhing is ready
193 node->audio_configuration_changed(st);
194
195 m_oldInlets = element.inlets();
196 m_oldOutlets = element.outlets();
197 }
198
199 void
200 setup_gpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
201 {
202#if SCORE_PLUGIN_GFX
203 // FIXME net context for gpu node ?
204 const int64_t id
205 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
206
207 auto& gfx_exec = ctx.doc.plugin<Gfx::DocumentPlugin>().exec;
208
209 // Create the executor in the audio thread
210 struct named_exec_node final : Gfx::gfx_exec_node
211 {
212 using Gfx::gfx_exec_node::gfx_exec_node;
213 std::string label() const noexcept override
214 {
215 return std::string(avnd::get_name<Node>());
216 }
217 };
218
219 auto node = std::make_shared<named_exec_node>(gfx_exec);
220 node->prepare(*ctx.execState);
221
222 this->node = node;
223
224 // Create the controls, inputs outputs etc.
225 std::size_t i = 0;
226
227 for(auto& ctl : element.inlets())
228 {
229 if(auto ctrl = qobject_cast<Process::ControlInlet*>(ctl))
230 {
231 auto& p = node->add_control();
232 p->value = ctrl->value();
233 p->changed = true;
234
235 QObject::connect(
236 ctrl, &Process::ControlInlet::valueChanged, this,
237 Gfx::con_unvalidated{ctx, i, 0, node});
238 i++;
239 }
240 else if(auto ctrl = qobject_cast<Process::ValueInlet*>(ctl))
241 {
242 auto& p = node->add_control();
243 p->changed = true;
244 i++;
245 }
246 else if(auto ctrl = qobject_cast<Process::AudioInlet*>(ctl))
247 {
248 node->add_audio();
249 }
250 else if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
251 {
252 ossia::texture_inlet& inl = *node->add_texture();
253 ctrl->setupExecution(inl, this);
254 }
255 else if(auto ctrl = qobject_cast<Gfx::GeometryInlet*>(ctl))
256 {
257 ossia::geometry_inlet& inl = *node->add_geometry();
258 ctrl->setupExecution(inl, this);
259 }
260 }
261
262 // FIXME refactor this with other GFX processes
263 for(auto* outlet : element.outlets())
264 {
265 if(auto ctrl = qobject_cast<Process::ControlOutlet*>(outlet))
266 {
267 node->add_control_out();
268 }
269 else if(auto ctrl = qobject_cast<Process::ValueOutlet*>(outlet))
270 {
271 node->add_control_out();
272 }
273 else if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
274 {
275 node->add_texture_out();
276 out->nodeId = node_id;
277 }
278 else if(auto out = qobject_cast<Gfx::GeometryOutlet*>(outlet))
279 {
280 node->add_geometry_out();
281 }
282 }
283
284 // Create the GPU node
285 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
286 ctx.alias.lock(), &ctx.executionQueue);
287 std::unique_ptr<score::gfx::Node> ptr;
288 if constexpr(GpuGraphicsNode2<Node>)
289 {
290 auto gpu_node = new CustomGpuNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
291 ptr.reset(gpu_node);
292 }
293 else if constexpr(GpuComputeNode2<Node>)
294 {
295 auto gpu_node = new GpuComputeNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
296 ptr.reset(gpu_node);
297 }
298 else if constexpr(GpuNode<Node>)
299 {
300 auto gpu_node
301 = new GfxNode<Node>(element, qex_ptr, node->control_outs, id, ctx.doc);
302 ptr.reset(gpu_node);
303 }
304
305 i = 0;
306 for(auto& ctl : element.inlets())
307 {
308 if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
309 {
310 ossia::texture_inlet& inl
311 = static_cast<ossia::texture_inlet&>(*node->root_inputs()[i]);
312 ptr->process(i, inl.data); // Setup render_target_spec
313 }
314 i++;
315 }
316 node->id = gfx_exec.ui->register_node(std::move(ptr));
317 node_id = node->id;
318#endif
319 }
320
321 void recompute_ports()
322 {
323 Execution::Transaction commands{this->system()};
324 auto n = std::dynamic_pointer_cast<safe_node<Node>>(this->node);
325 if(!n)
326 return;
327
328 // Re-run setup_inlets ?
329 in_exec([dp = this->process().dynamic_ports, node = n] {
330 node->dynamic_ports = dp;
331 node->root_inputs().clear();
332 node->root_outputs().clear();
333 node->initialize_all_ports();
334 });
335 }
336
337 void connect_controls(
338 ProcessModel<Node>& element, const ::Execution::Context& ctx,
339 std::shared_ptr<safe_node<Node>>& ptr)
340 {
341 using dynamic_ports_port_type = avnd::dynamic_ports_input_introspection<Node>;
342 using control_inputs_type = avnd::control_input_introspection<Node>;
343 using curve_inputs_type = avnd::curve_input_introspection<Node>;
344 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
345 using midifile_inputs_type = avnd::midifile_input_introspection<Node>;
346 using raw_file_inputs_type = avnd::raw_file_input_introspection<Node>;
347 using control_outputs_type = avnd::control_output_introspection<Node>;
348
349 // UI controls to engine
350 safe_node<Node>& node = *ptr;
351 avnd::effect_container<Node>& eff = node.impl;
352
353 // Initialize all the controls in the node with the current value.
354 // And update the node when the UI changes
355
356 if constexpr(dynamic_ports_port_type::size > 0)
357 {
358 for(auto state : eff.full_state())
359 {
360 dynamic_ports_port_type::for_all_n2(
361 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
362 }
363 }
364 if constexpr(control_inputs_type::size > 0)
365 {
366 for(auto state : eff.full_state())
367 {
368 control_inputs_type::for_all_n2(
369 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
370 }
371 }
372 if constexpr(curve_inputs_type::size > 0)
373 {
374 for(auto state : eff.full_state())
375 {
376 curve_inputs_type::for_all_n2(
377 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
378 }
379 }
380 if constexpr(soundfile_inputs_type::size > 0)
381 {
382 soundfile_inputs_type::for_all_n2(
383 avnd::get_inputs<Node>(eff),
384 dispatch_control_setup<Node>{element, ctx, ptr, this});
385
386 setup_soundfile_task_pool(element, ctx, ptr);
387 }
388 if constexpr(midifile_inputs_type::size > 0)
389 {
390 midifile_inputs_type::for_all_n2(
391 avnd::get_inputs<Node>(eff),
392 dispatch_control_setup<Node>{element, ctx, ptr, this});
393 }
394 if constexpr(raw_file_inputs_type::size > 0)
395 {
396 raw_file_inputs_type::for_all_n2(
397 avnd::get_inputs<Node>(eff),
398 dispatch_control_setup<Node>{element, ctx, ptr, this});
399 }
400
401 // Engine to ui controls
402 if constexpr(control_inputs_type::size > 0 || control_outputs_type::size > 0)
403 {
404 // Update the value in the UI
405 std::weak_ptr<safe_node<Node>> weak_node = ptr;
406 update_control_value_in_ui<Node> timer_action{weak_node, &element};
407 timer_action();
408
409 con(ctx.doc.coarseUpdateTimer, &QTimer::timeout, this,
410 [timer_action = std::move(timer_action)] { timer_action(); },
411 Qt::QueuedConnection);
412 }
413 }
414
415 void setup_soundfile_task_pool(
416 ProcessModel<Node>& element, const ::Execution::Context& ctx,
417 std::shared_ptr<safe_node<Node>>& ptr)
418 {
419 safe_node<Node>& node = *ptr;
420
421 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
422
423 auto& tq = score::TaskPool::instance();
424 node.soundfiles.load_request
425 = [&tq, p = std::weak_ptr{ptr}, &ctx](std::string& str, int idx) {
426 auto eff_ptr = p.lock();
427 if(!eff_ptr)
428 return;
429 tq.post([eff_ptr = std::move(eff_ptr), filename = str, &ctx, idx]() mutable {
430 if(auto file = loadSoundfile(filename, ctx.doc, ctx.execState))
431 {
432 ctx.executionQueue.enqueue(
433 [sf = std::move(file), p = std::weak_ptr{eff_ptr}, idx]() mutable {
434 auto eff_ptr = p.lock();
435 if(!eff_ptr)
436 return;
437
438 avnd::effect_container<Node>& eff = eff_ptr->impl;
439 soundfile_inputs_type::for_nth_mapped_n2(
440 avnd::get_inputs<Node>(eff), idx,
441 [&]<std::size_t NField, std::size_t N>(
442 auto& field, avnd::predicate_index<N> p,
443 avnd::field_index<NField> f) {
444 eff_ptr->soundfile_loaded(sf, p, f);
445 });
446 });
447 }
448 });
449 };
450 }
451
452 void connect_message_bus(
453 ProcessModel<Node>& element, const ::Execution::Context& ctx, Node& eff)
454 {
455 // Custom UI messages to engine
456 if constexpr(avnd::has_gui_to_processor_bus<Node>)
457 {
458 element.from_ui = [qex_ptr = weak_exec, &eff](QByteArray b) {
459 auto qex = qex_ptr.lock();
460 if(!qex)
461 return;
462
463 qex->enqueue([mess = std::move(b), &eff]() mutable {
464 using refl = avnd::function_reflection<&Node::process_message>;
465 static_assert(refl::count <= 1);
466
467 if constexpr(refl::count == 0)
468 {
469 // no arguments, just call it
470 eff.process_message();
471 }
472 else if constexpr(refl::count == 1)
473 {
474 using arg_type = avnd::first_argument<&Node::process_message>;
475 std::decay_t<arg_type> arg;
476 MessageBusReader reader{mess};
477 reader(arg);
478 eff.process_message(std::move(arg));
479 }
480 });
481 };
482 }
483
484 if constexpr(avnd::has_processor_to_gui_bus<Node>)
485 {
486 if constexpr(requires { eff.send_message = [](auto&&) { }; })
487 {
488 eff.send_message = [proc = QPointer{&this->process()},
489 qed_ptr = weak_edit]<typename T>(T&& b) mutable {
490 auto qed = qed_ptr.lock();
491 if(!qed)
492 return;
493 if constexpr(
494 sizeof(QPointer<QObject>) + sizeof(b)
495 < Execution::ExecutionCommand::max_storage)
496 {
497 qed->enqueue([proc, bb = std::move(b)]() mutable {
498 if(proc && proc->to_ui)
499 MessageBusSender{proc->to_ui}(std::move(bb));
500 });
501 }
502 else
503 {
504 qed->enqueue(
505 [proc, bb = std::make_unique<std::decay_t<T>>(std::move(b))]() mutable {
506 if(proc && proc->to_ui)
507 MessageBusSender{proc->to_ui}(*std::move(bb));
508 });
509 }
510 };
511 }
512 else if constexpr(requires { eff.send_message = []() { }; })
513 {
514 eff.send_message
515 = [proc = QPointer{&this->process()}, qed_ptr = weak_edit]() mutable {
516 if(!proc)
517 return;
518 auto qed = qed_ptr.lock();
519 if(!qed)
520 return;
521
522 qed->enqueue([proc]() mutable {
523 if(proc && proc->to_ui)
524 MessageBusSender{proc->to_ui}();
525 });
526 };
527 }
528 }
529 }
530
531 void connect_worker(const ::Execution::Context& ctx, avnd::effect_container<Node>& eff)
532 {
533 if constexpr(avnd::has_worker<Node>)
534 {
535 // Initialize the thread pool beforehand
536 auto& tq = score::TaskPool::instance();
537 using worker_type = decltype(eff.effect.worker);
538 for(auto& eff : eff.effects())
539 {
540 std::weak_ptr eff_ptr = std::shared_ptr<Node>(this->node, &eff);
541 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
542 ctx.alias.lock(), &ctx.executionQueue);
543
544 eff.worker.request
545 = [&tq, qex_ptr = std::move(qex_ptr),
546 eff_ptr = std::move(eff_ptr)]<typename... Args>(Args&&... f) mutable {
547 // request() is invoked in the DSP / processor thread
548 // and just posts the task to the thread pool
549 tq.post([eff_ptr, qex_ptr, ... ff = std::forward<Args>(f)]() mutable {
550 // This happens in the worker thread
551 // If for some reason the object has already been removed, not much
552 // reason to perform the work
553 if(!eff_ptr.lock())
554 return;
555
556 using type_of_result
557 = decltype(worker_type::work(std::forward<decltype(ff)>(ff)...));
558 if constexpr(std::is_void_v<type_of_result>)
559 {
560 worker_type::work(std::forward<decltype(ff)>(ff)...);
561 }
562 else
563 {
564 // If the worker returns a std::function, it
565 // is to be invoked back in the processor DSP thread
566 auto res = worker_type::work(std::forward<decltype(ff)>(ff)...);
567 if(!res)
568 return;
569
570 // Execution queue is currently spsc from main thread to an exec thread,
571 // we cannot just yeet the result back from the thread-pool
572 ossia::qt::run_async(
573 qApp, [eff_ptr = std::move(eff_ptr), qex_ptr = std::move(qex_ptr),
574 res = std::move(res)]() mutable {
575 // Main thread
576 std::shared_ptr qex = qex_ptr.lock();
577 if(!qex)
578 return;
579
580 qex->enqueue(
581 [eff_ptr = std::move(eff_ptr), res = std::move(res)]() mutable {
582 // DSP / processor thread
583 // We need res to be mutable so that the worker can use it to e.g. store
584 // old data which will be freed back in the main thread
585 if(auto p = eff_ptr.lock())
586 res(*p);
587 });
588 });
589 }
590 });
591 };
592 }
593 }
594 }
595
596 // Update everything
597 void update_controls(std::shared_ptr<safe_node<Node>>& ptr)
598 {
599 avnd::effect_container<Node>& eff = ptr->impl;
600 {
601 for(auto state : eff.full_state())
602 {
603 avnd::input_introspection<Node>::for_all(
604 state.inputs, [&](auto& field) { if_possible(field.update(state.effect)); });
605 }
606 }
607 }
608
609 void cleanup() override
610 {
611 if constexpr(requires { this->process().from_ui; })
612 {
613 this->process().from_ui = [](QByteArray arr) {};
614 }
615 // FIXME cleanup eff.effect.send_message too ?
616
617#if SCORE_PLUGIN_GFX
618 if constexpr(is_gpu<Node>)
619 {
620 // FIXME this must move in the Node dtor. See video_node
621 auto& gfx_exec = this->system().doc.template plugin<Gfx::DocumentPlugin>().exec;
622 if(node_id >= 0)
623 gfx_exec.ui->unregister_node(node_id);
624 }
625
626 // FIXME refactor this with other GFX processes
627 for(auto* outlet : this->process().outlets())
628 {
629 if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
630 {
631 out->nodeId = -1;
632 }
633 }
634#endif
635 ::Execution::ProcessComponent::cleanup();
636 }
637
638 ~Executor() { }
639};
640}
Definition GfxApplicationPlugin.hpp:13
Definition GfxExecNode.hpp:40
The Path class is a typesafe wrapper around ObjectPath.
Definition Path.hpp:52
Definition UuidKey.hpp:345
Definition score-plugin-avnd/Crousti/Executor.hpp:57
Definition score-plugin-avnd/Crousti/Executor.hpp:107
Definition score-plugin-avnd/Crousti/ProcessModel.hpp:86
Definition Metadatas.hpp:29
Definition Metadatas.hpp:23
Definition Metadatas.hpp:15
Definition Metadatas.hpp:35
Definition Factories.hpp:19
Definition Process/Execution/ProcessComponent.hpp:93
Definition ExecutionTransaction.hpp:21
Definition GfxExecNode.hpp:134
Definition PortForward.hpp:23
Definition PortForward.hpp:27
Definition MessageBus.hpp:171
Definition MessageBus.hpp:44
Definition ExecutorPortSetup.hpp:344
Definition ExecutorUpdateControlValueInUi.hpp:84