Loading...
Searching...
No Matches
score-plugin-avnd/Crousti/Executor.hpp
1#pragma once
2
3#include <Process/Execution/ProcessComponent.hpp>
4#include <Process/ExecutionContext.hpp>
5
6#include <Explorer/DocumentPlugin/DeviceDocumentPlugin.hpp>
7
8#include <Crousti/CpuAnalysisNode.hpp>
9#include <Crousti/CpuFilterNode.hpp>
10#include <Crousti/ExecutorPortSetup.hpp>
11#include <Crousti/ExecutorUpdateControlValueInUi.hpp>
12#include <Crousti/File.hpp>
13#include <Crousti/GpuComputeNode.hpp>
14#include <Crousti/GpuNode.hpp>
15#include <Crousti/MessageBus.hpp>
16#include <Crousti/Metadatas.hpp>
17#include <Crousti/ProcessModel.hpp>
18
19#include <score/tools/Bind.hpp>
20
21#include <ossia/dataflow/exec_state_facade.hpp>
22#include <ossia/dataflow/node_process.hpp>
23#include <ossia/network/context.hpp>
24
25#include <ossia-qt/invoke.hpp>
26
27#include <QGuiApplication>
28
29#include <flicks.h>
30
31#if SCORE_PLUGIN_GFX
32#include <Crousti/GpuNode.hpp>
33#include <Gfx/GfxApplicationPlugin.hpp>
34#endif
35
36#include <score/tools/ThreadPool.hpp>
37
38#include <ossia/detail/type_if.hpp>
39
40#include <QTimer>
41
42#include <avnd/binding/ossia/data_node.hpp>
43#include <avnd/binding/ossia/mono_audio_node.hpp>
44#include <avnd/binding/ossia/node.hpp>
45#include <avnd/binding/ossia/ossia_audio_node.hpp>
46#include <avnd/binding/ossia/poly_audio_node.hpp>
47#include <avnd/concepts/temporality.hpp>
48#include <avnd/concepts/ui.hpp>
49#include <avnd/concepts/worker.hpp>
50
51namespace oscr
52{
53
54template <typename Node>
55class CustomNodeProcess : public ossia::node_process
56{
57 using node_process::node_process;
58 void start() override
59 {
60 node_process::start();
61 auto& n = static_cast<safe_node<Node>&>(*node);
62 if_possible(n.impl.effect.start());
63 }
64 void pause() override
65 {
66 auto& n = static_cast<safe_node<Node>&>(*node);
67 if_possible(n.impl.effect.pause());
68 node_process::pause();
69 }
70
71 void resume() override
72 {
73 node_process::resume();
74 auto& n = static_cast<safe_node<Node>&>(*node);
75 if_possible(n.impl.effect.resume());
76 }
77
78 void stop() override
79 {
80 auto& n = static_cast<safe_node<Node>&>(*node);
81 if_possible(n.impl.effect.stop());
82 node_process::stop();
83 }
84
85 void offset_impl(ossia::time_value date) override
86 {
87 node_process::offset_impl(date);
88 auto& n = static_cast<safe_node<Node>&>(*node);
89 util::flicks f{date.impl};
90 if_possible(n.impl.effect.transport(f));
91 }
92
93 void transport_impl(ossia::time_value date) override
94 {
95 node_process::transport_impl(date);
96 auto& n = static_cast<safe_node<Node>&>(*node);
97
98 util::flicks f{date.impl};
99 if_possible(n.impl.effect.transport(f));
100 }
101};
102
103template <typename Node>
104class Executor final
105 : public Execution::ProcessComponent_T<ProcessModel<Node>, ossia::node_process>
106{
107 Process::Inlets m_oldInlets;
108 Process::Outlets m_oldOutlets;
109
110public:
111 static Q_DECL_RELAXED_CONSTEXPR UuidKey<score::Component> static_key() noexcept
112 {
113 return uuid_from_string<Node>();
114 }
115
116 UuidKey<score::Component> key() const noexcept final override { return static_key(); }
117
118 bool key_match(UuidKey<score::Component> other) const noexcept final override
119 {
120 return static_key() == other || Execution::ProcessComponent::base_key_match(other);
121 }
122
123 [[no_unique_address]] ossia::type_if<int, is_gpu<Node>> node_id = -1;
124
125 Executor(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
127 element, ctx, "Executor::ProcessModel<Info>", p}
128 {
129 if constexpr(is_gpu<Node>)
130 {
131#if SCORE_PLUGIN_GFX
132 setup_gpu(element, ctx, p);
133#endif
134 }
135 else
136 {
137 setup_cpu(element, ctx, p);
138 }
139
140 if constexpr(avnd::tag_process_exec<Node>)
141 {
142 this->m_ossia_process = std::make_shared<CustomNodeProcess<Node>>(this->node);
143 }
144 else
145 {
146 this->m_ossia_process = std::make_shared<ossia::node_process>(this->node);
147 }
148 }
149
150 void
151 setup_cpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
152 {
153 auto& net_ctx
154 = *ctx.doc.findPlugin<Explorer::DeviceDocumentPlugin>()->networkContext();
155 const auto id
156 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
157
158 auto st = ossia::exec_state_facade{ctx.execState.get()};
159 std::shared_ptr<safe_node<Node>> ptr;
160 auto node = new safe_node<Node>{st.bufferSize(), (double)st.sampleRate(), id};
161 node->root_inputs().reserve(element.inlets().size());
162 node->root_outputs().reserve(element.outlets().size());
163
164 node->prepare(*ctx.execState.get()); // Preparation of the ossia side
165
166 if_possible(node->impl.effect.ossia_state = st);
167 if_possible(node->impl.effect.io_context = &net_ctx.context);
168 if_possible(node->impl.effect.ossia_document_context = &ctx.doc);
169 ptr.reset(node);
170 this->node = ptr;
171
172 if constexpr(requires { ptr->impl.effect; })
173 if constexpr(std::is_same_v<std::decay_t<decltype(ptr->impl.effect)>, Node>)
174 connect_message_bus(element, ctx, ptr->impl.effect);
175 connect_worker(ctx, ptr->impl);
176
177 node->dynamic_ports = element.dynamic_ports;
178 node->finish_init();
179
180 connect_controls(element, ctx, ptr);
181 update_controls(ptr);
182 QObject::connect(
183 &element, &Process::ProcessModel::inletsChanged, this,
184 &Executor::recompute_ports);
185 QObject::connect(
186 &element, &Process::ProcessModel::outletsChanged, this,
187 &Executor::recompute_ports);
188
189 // To call prepare() after evertyhing is ready
190 node->audio_configuration_changed(st);
191
192 m_oldInlets = element.inlets();
193 m_oldOutlets = element.outlets();
194 }
195
196 void
197 setup_gpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
198 {
199#if SCORE_PLUGIN_GFX
200 // FIXME net context for gpu node ?
201 const int64_t id
202 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
203
204 auto& gfx_exec = ctx.doc.plugin<Gfx::DocumentPlugin>().exec;
205
206 // Create the executor in the audio thread
207 struct named_exec_node final : Gfx::gfx_exec_node
208 {
209 using Gfx::gfx_exec_node::gfx_exec_node;
210 std::string label() const noexcept override
211 {
212 return std::string(avnd::get_name<Node>());
213 }
214 };
215
216 auto node = std::make_shared<named_exec_node>(gfx_exec);
217 node->prepare(*ctx.execState);
218
219 this->node = node;
220
221 // Create the controls, inputs outputs etc.
222 std::size_t i = 0;
223
224 for(auto& ctl : element.inlets())
225 {
226 if(auto ctrl = qobject_cast<Process::ControlInlet*>(ctl))
227 {
228 auto& p = node->add_control();
229 p->value = ctrl->value();
230 p->changed = true;
231
232 QObject::connect(
233 ctrl, &Process::ControlInlet::valueChanged, this,
234 Gfx::con_unvalidated{ctx, i, 0, node});
235 i++;
236 }
237 else if(auto ctrl = qobject_cast<Process::ValueInlet*>(ctl))
238 {
239 auto& p = node->add_control();
240 p->changed = true;
241 i++;
242 }
243 else if(auto ctrl = qobject_cast<Process::AudioInlet*>(ctl))
244 {
245 node->add_audio();
246 }
247 else if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
248 {
249 ossia::texture_inlet& inl = *node->add_texture();
250 ctrl->setupExecution(inl, this);
251 }
252 else if(auto ctrl = qobject_cast<Gfx::GeometryInlet*>(ctl))
253 {
254 ossia::geometry_inlet& inl = *node->add_geometry();
255 ctrl->setupExecution(inl, this);
256 }
257 }
258
259 // FIXME refactor this with other GFX processes
260 for(auto* outlet : element.outlets())
261 {
262 if(auto ctrl = qobject_cast<Process::ControlOutlet*>(outlet))
263 {
264 node->add_control_out();
265 }
266 else if(auto ctrl = qobject_cast<Process::ValueOutlet*>(outlet))
267 {
268 node->add_control_out();
269 }
270 else if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
271 {
272 node->add_texture_out();
273 out->nodeId = node_id;
274 }
275 else if(auto out = qobject_cast<Gfx::GeometryOutlet*>(outlet))
276 {
277 node->add_geometry_out();
278 }
279 }
280
281 // Create the GPU node
282 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
283 ctx.alias.lock(), &ctx.executionQueue);
284 std::unique_ptr<score::gfx::Node> ptr;
285 if constexpr(GpuGraphicsNode2<Node>)
286 {
287 auto gpu_node = new CustomGpuNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
288 ptr.reset(gpu_node);
289 }
290 else if constexpr(GpuComputeNode2<Node>)
291 {
292 auto gpu_node = new GpuComputeNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
293 ptr.reset(gpu_node);
294 }
295 else if constexpr(GpuNode<Node>)
296 {
297 auto gpu_node
298 = new GfxNode<Node>(element, qex_ptr, node->control_outs, id, ctx.doc);
299 ptr.reset(gpu_node);
300 }
301
302 i = 0;
303 for(auto& ctl : element.inlets())
304 {
305 if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
306 {
307 ossia::texture_inlet& inl
308 = static_cast<ossia::texture_inlet&>(*node->root_inputs()[i]);
309 ptr->process(i, inl.data); // Setup render_target_spec
310 }
311 i++;
312 }
313 node->id = gfx_exec.ui->register_node(std::move(ptr));
314 node_id = node->id;
315#endif
316 }
317
318 void recompute_ports()
319 {
320 Execution::Transaction commands{this->system()};
321 auto n = std::dynamic_pointer_cast<safe_node<Node>>(this->node);
322 if(!n)
323 return;
324
325 // Re-run setup_inlets ?
326 in_exec([dp = this->process().dynamic_ports, node = n] {
327 node->dynamic_ports = dp;
328 node->root_inputs().clear();
329 node->root_outputs().clear();
330 node->initialize_all_ports();
331 });
332 }
333
334 void connect_controls(
335 ProcessModel<Node>& element, const ::Execution::Context& ctx,
336 std::shared_ptr<safe_node<Node>>& ptr)
337 {
338 using dynamic_ports_port_type = avnd::dynamic_ports_input_introspection<Node>;
339 using control_inputs_type = avnd::control_input_introspection<Node>;
340 using curve_inputs_type = avnd::curve_input_introspection<Node>;
341 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
342 using midifile_inputs_type = avnd::midifile_input_introspection<Node>;
343 using raw_file_inputs_type = avnd::raw_file_input_introspection<Node>;
344 using control_outputs_type = avnd::control_output_introspection<Node>;
345
346 // UI controls to engine
347 safe_node<Node>& node = *ptr;
348 avnd::effect_container<Node>& eff = node.impl;
349
350 // Initialize all the controls in the node with the current value.
351 // And update the node when the UI changes
352
353 if constexpr(dynamic_ports_port_type::size > 0)
354 {
355 for(auto state : eff.full_state())
356 {
357 dynamic_ports_port_type::for_all_n2(
358 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
359 }
360 }
361 if constexpr(control_inputs_type::size > 0)
362 {
363 for(auto state : eff.full_state())
364 {
365 control_inputs_type::for_all_n2(
366 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
367 }
368 }
369 if constexpr(curve_inputs_type::size > 0)
370 {
371 for(auto state : eff.full_state())
372 {
373 curve_inputs_type::for_all_n2(
374 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
375 }
376 }
377 if constexpr(soundfile_inputs_type::size > 0)
378 {
379 soundfile_inputs_type::for_all_n2(
380 avnd::get_inputs<Node>(eff),
381 dispatch_control_setup<Node>{element, ctx, ptr, this});
382
383 setup_soundfile_task_pool(element, ctx, ptr);
384 }
385 if constexpr(midifile_inputs_type::size > 0)
386 {
387 midifile_inputs_type::for_all_n2(
388 avnd::get_inputs<Node>(eff),
389 dispatch_control_setup<Node>{element, ctx, ptr, this});
390 }
391 if constexpr(raw_file_inputs_type::size > 0)
392 {
393 raw_file_inputs_type::for_all_n2(
394 avnd::get_inputs<Node>(eff),
395 dispatch_control_setup<Node>{element, ctx, ptr, this});
396 }
397
398 // Engine to ui controls
399 if constexpr(control_inputs_type::size > 0 || control_outputs_type::size > 0)
400 {
401 // Update the value in the UI
402 std::weak_ptr<safe_node<Node>> weak_node = ptr;
403 update_control_value_in_ui<Node> timer_action{weak_node, &element};
404 timer_action();
405
406 con(ctx.doc.coarseUpdateTimer, &QTimer::timeout, this,
407 [timer_action = std::move(timer_action)] { timer_action(); },
408 Qt::QueuedConnection);
409 }
410 }
411
412 void setup_soundfile_task_pool(
413 ProcessModel<Node>& element, const ::Execution::Context& ctx,
414 std::shared_ptr<safe_node<Node>>& ptr)
415 {
416 safe_node<Node>& node = *ptr;
417
418 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
419
420 auto& tq = score::TaskPool::instance();
421 node.soundfiles.load_request
422 = [&tq, p = std::weak_ptr{ptr}, &ctx](std::string& str, int idx) {
423 auto eff_ptr = p.lock();
424 if(!eff_ptr)
425 return;
426 tq.post([eff_ptr = std::move(eff_ptr), filename = str, &ctx, idx]() mutable {
427 if(auto file = loadSoundfile(filename, ctx.doc, ctx.execState))
428 {
429 ctx.executionQueue.enqueue(
430 [sf = std::move(file), p = std::weak_ptr{eff_ptr}, idx]() mutable {
431 auto eff_ptr = p.lock();
432 if(!eff_ptr)
433 return;
434
435 avnd::effect_container<Node>& eff = eff_ptr->impl;
436 soundfile_inputs_type::for_nth_mapped_n2(
437 avnd::get_inputs<Node>(eff), idx,
438 [&]<std::size_t NField, std::size_t N>(
439 auto& field, avnd::predicate_index<N> p,
440 avnd::field_index<NField> f) {
441 eff_ptr->soundfile_loaded(sf, p, f);
442 });
443 });
444 }
445 });
446 };
447 }
448
449 void connect_message_bus(
450 ProcessModel<Node>& element, const ::Execution::Context& ctx, Node& eff)
451 {
452 // Custom UI messages to engine
453 if constexpr(avnd::has_gui_to_processor_bus<Node>)
454 {
455 element.from_ui = [qex_ptr = weak_exec, &eff](QByteArray b) {
456 auto qex = qex_ptr.lock();
457 if(!qex)
458 return;
459
460 qex->enqueue([mess = std::move(b), &eff]() mutable {
461 using refl = avnd::function_reflection<&Node::process_message>;
462 static_assert(refl::count <= 1);
463
464 if constexpr(refl::count == 0)
465 {
466 // no arguments, just call it
467 eff.process_message();
468 }
469 else if constexpr(refl::count == 1)
470 {
471 using arg_type = avnd::first_argument<&Node::process_message>;
472 std::decay_t<arg_type> arg;
473 MessageBusReader reader{mess};
474 reader(arg);
475 eff.process_message(std::move(arg));
476 }
477 });
478 };
479 }
480
481 if constexpr(avnd::has_processor_to_gui_bus<Node>)
482 {
483 if constexpr(requires { eff.send_message = [](auto&&) { }; })
484 {
485 eff.send_message = [proc = QPointer{&this->process()},
486 qed_ptr = weak_edit]<typename T>(T&& b) mutable {
487 auto qed = qed_ptr.lock();
488 if(!qed)
489 return;
490 if constexpr(
491 sizeof(QPointer<QObject>) + sizeof(b)
492 < Execution::ExecutionCommand::max_storage)
493 {
494 qed->enqueue([proc, bb = std::move(b)]() mutable {
495 if(proc && proc->to_ui)
496 MessageBusSender{proc->to_ui}(std::move(bb));
497 });
498 }
499 else
500 {
501 qed->enqueue(
502 [proc, bb = std::make_unique<std::decay_t<T>>(std::move(b))]() mutable {
503 if(proc && proc->to_ui)
504 MessageBusSender{proc->to_ui}(*std::move(bb));
505 });
506 }
507 };
508 }
509 else if constexpr(requires { eff.send_message = []() { }; })
510 {
511 eff.send_message
512 = [proc = QPointer{&this->process()}, qed_ptr = weak_edit]() mutable {
513 if(!proc)
514 return;
515 auto qed = qed_ptr.lock();
516 if(!qed)
517 return;
518
519 qed->enqueue([proc]() mutable {
520 if(proc && proc->to_ui)
521 MessageBusSender{proc->to_ui}();
522 });
523 };
524 }
525 }
526 }
527
528 void connect_worker(const ::Execution::Context& ctx, avnd::effect_container<Node>& eff)
529 {
530 if constexpr(avnd::has_worker<Node>)
531 {
532 // Initialize the thread pool beforehand
533 auto& tq = score::TaskPool::instance();
534 using worker_type = decltype(eff.effect.worker);
535 for(auto& eff : eff.effects())
536 {
537 std::weak_ptr eff_ptr = std::shared_ptr<Node>(this->node, &eff);
538 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
539 ctx.alias.lock(), &ctx.executionQueue);
540
541 eff.worker.request
542 = [&tq, qex_ptr = std::move(qex_ptr),
543 eff_ptr = std::move(eff_ptr)]<typename... Args>(Args&&... f) mutable {
544 // request() is invoked in the DSP / processor thread
545 // and just posts the task to the thread pool
546 tq.post([eff_ptr, qex_ptr, ... ff = std::forward<Args>(f)]() mutable {
547 // This happens in the worker thread
548 // If for some reason the object has already been removed, not much
549 // reason to perform the work
550 if(!eff_ptr.lock())
551 return;
552
553 using type_of_result
554 = decltype(worker_type::work(std::forward<decltype(ff)>(ff)...));
555 if constexpr(std::is_void_v<type_of_result>)
556 {
557 worker_type::work(std::forward<decltype(ff)>(ff)...);
558 }
559 else
560 {
561 // If the worker returns a std::function, it
562 // is to be invoked back in the processor DSP thread
563 auto res = worker_type::work(std::forward<decltype(ff)>(ff)...);
564 if(!res)
565 return;
566
567 // Execution queue is currently spsc from main thread to an exec thread,
568 // we cannot just yeet the result back from the thread-pool
569 ossia::qt::run_async(
570 qApp, [eff_ptr = std::move(eff_ptr), qex_ptr = std::move(qex_ptr),
571 res = std::move(res)]() mutable {
572 // Main thread
573 std::shared_ptr qex = qex_ptr.lock();
574 if(!qex)
575 return;
576
577 qex->enqueue(
578 [eff_ptr = std::move(eff_ptr), res = std::move(res)]() mutable {
579 // DSP / processor thread
580 // We need res to be mutable so that the worker can use it to e.g. store
581 // old data which will be freed back in the main thread
582 if(auto p = eff_ptr.lock())
583 res(*p);
584 });
585 });
586 }
587 });
588 };
589 }
590 }
591 }
592
593 // Update everything
594 void update_controls(std::shared_ptr<safe_node<Node>>& ptr)
595 {
596 avnd::effect_container<Node>& eff = ptr->impl;
597 {
598 for(auto state : eff.full_state())
599 {
600 avnd::input_introspection<Node>::for_all(
601 state.inputs, [&](auto& field) { if_possible(field.update(state.effect)); });
602 }
603 }
604 }
605
606 void cleanup() override
607 {
608 if constexpr(requires { this->process().from_ui; })
609 {
610 this->process().from_ui = [](QByteArray arr) {};
611 }
612 // FIXME cleanup eff.effect.send_message too ?
613
614#if SCORE_PLUGIN_GFX
615 if constexpr(is_gpu<Node>)
616 {
617 // FIXME this must move in the Node dtor. See video_node
618 auto& gfx_exec = this->system().doc.template plugin<Gfx::DocumentPlugin>().exec;
619 if(node_id >= 0)
620 gfx_exec.ui->unregister_node(node_id);
621 }
622
623 // FIXME refactor this with other GFX processes
624 for(auto* outlet : this->process().outlets())
625 {
626 if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
627 {
628 out->nodeId = -1;
629 }
630 }
631#endif
632 ::Execution::ProcessComponent::cleanup();
633 }
634
635 ~Executor() { }
636};
637}
Definition GfxApplicationPlugin.hpp:13
Definition GfxExecNode.hpp:40
The Path class is a typesafe wrapper around ObjectPath.
Definition Path.hpp:52
Definition UuidKey.hpp:345
Definition score-plugin-avnd/Crousti/Executor.hpp:56
Definition score-plugin-avnd/Crousti/Executor.hpp:106
Definition score-plugin-avnd/Crousti/ProcessModel.hpp:86
Definition Metadatas.hpp:29
Definition Metadatas.hpp:23
Definition Metadatas.hpp:15
Definition Metadatas.hpp:35
Definition Factories.hpp:19
Definition Process/Execution/ProcessComponent.hpp:93
Definition ExecutionTransaction.hpp:20
Definition GfxExecNode.hpp:134
Definition PortForward.hpp:23
Definition PortForward.hpp:27
Definition MessageBus.hpp:171
Definition MessageBus.hpp:44
Definition ExecutorPortSetup.hpp:344
Definition ExecutorUpdateControlValueInUi.hpp:84