Loading...
Searching...
No Matches
score-plugin-avnd/Crousti/Executor.hpp
1#pragma once
2
3#include <Process/Execution/ProcessComponent.hpp>
4#include <Process/ExecutionContext.hpp>
5
6#include <Explorer/DocumentPlugin/DeviceDocumentPlugin.hpp>
7
8#include <Crousti/CpuAnalysisNode.hpp>
9#include <Crousti/CpuFilterNode.hpp>
10#include <Crousti/ExecutorPortSetup.hpp>
11#include <Crousti/ExecutorUpdateControlValueInUi.hpp>
12#include <Crousti/File.hpp>
13#include <Crousti/GpuComputeNode.hpp>
14#include <Crousti/GpuNode.hpp>
15#include <Crousti/MessageBus.hpp>
16#include <Crousti/Metadatas.hpp>
17#include <Crousti/ProcessModel.hpp>
18
19#include <score/tools/Bind.hpp>
20
21#include <ossia/dataflow/exec_state_facade.hpp>
22#include <ossia/dataflow/node_process.hpp>
23#include <ossia/network/context.hpp>
24
25#include <ossia-qt/invoke.hpp>
26
27#include <QGuiApplication>
28
29#include <flicks.h>
30
31#if SCORE_PLUGIN_GFX
32#include <Crousti/GpuNode.hpp>
33#include <Gfx/GfxApplicationPlugin.hpp>
34#endif
35
36#include <score/tools/ThreadPool.hpp>
37
38#include <ossia/detail/type_if.hpp>
39
40#include <QTimer>
41
42#include <avnd/binding/ossia/data_node.hpp>
43#include <avnd/binding/ossia/mono_audio_node.hpp>
44#include <avnd/binding/ossia/node.hpp>
45#include <avnd/binding/ossia/ossia_audio_node.hpp>
46#include <avnd/binding/ossia/poly_audio_node.hpp>
47#include <avnd/concepts/temporality.hpp>
48#include <avnd/concepts/ui.hpp>
49#include <avnd/concepts/worker.hpp>
50
51namespace oscr
52{
53
54template <typename Node>
55class CustomNodeProcess : public ossia::node_process
56{
57 using node_process::node_process;
58 void start() override
59 {
60 node_process::start();
61 auto& n = static_cast<safe_node<Node>&>(*node);
62 if_possible(n.impl.effect.start());
63 }
64 void pause() override
65 {
66 auto& n = static_cast<safe_node<Node>&>(*node);
67 if_possible(n.impl.effect.pause());
68 node_process::pause();
69 }
70
71 void resume() override
72 {
73 node_process::resume();
74 auto& n = static_cast<safe_node<Node>&>(*node);
75 if_possible(n.impl.effect.resume());
76 }
77
78 void stop() override
79 {
80 auto& n = static_cast<safe_node<Node>&>(*node);
81 if_possible(n.impl.effect.stop());
82 node_process::stop();
83 }
84
85 void offset_impl(ossia::time_value date) override
86 {
87 node_process::offset_impl(date);
88 auto& n = static_cast<safe_node<Node>&>(*node);
89 util::flicks f{date.impl};
90 if_possible(n.impl.effect.transport(f));
91 }
92
93 void transport_impl(ossia::time_value date) override
94 {
95 node_process::transport_impl(date);
96 auto& n = static_cast<safe_node<Node>&>(*node);
97
98 util::flicks f{date.impl};
99 if_possible(n.impl.effect.transport(f));
100 }
101};
102
103template <typename Node>
104class Executor final
105 : public Execution::ProcessComponent_T<ProcessModel<Node>, ossia::node_process>
106{
107 Process::Inlets m_oldInlets;
108 Process::Outlets m_oldOutlets;
109
110public:
111 static Q_DECL_RELAXED_CONSTEXPR UuidKey<score::Component> static_key() noexcept
112 {
113 return uuid_from_string<Node>();
114 }
115
116 UuidKey<score::Component> key() const noexcept final override { return static_key(); }
117
118 bool key_match(UuidKey<score::Component> other) const noexcept final override
119 {
120 return static_key() == other || Execution::ProcessComponent::base_key_match(other);
121 }
122
123#if defined(SCORE_PLUGIN_GFX)
124 [[no_unique_address]] ossia::type_if<int, is_gpu<Node>> node_id = score::gfx::invalid_node_index;
125#endif
126
127 Executor(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
129 element, ctx, "Executor::ProcessModel<Info>", p}
130 {
131 if constexpr(is_gpu<Node>)
132 {
133#if SCORE_PLUGIN_GFX
134 setup_gpu(element, ctx, p);
135#endif
136 }
137 else
138 {
139 setup_cpu(element, ctx, p);
140 }
141
142 if constexpr(avnd::tag_process_exec<Node>)
143 {
144 this->m_ossia_process = std::make_shared<CustomNodeProcess<Node>>(this->node);
145 }
146 else
147 {
148 this->m_ossia_process = std::make_shared<ossia::node_process>(this->node);
149 }
150 }
151
152 void
153 setup_cpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
154 {
155 auto& net_ctx
156 = *ctx.doc.findPlugin<Explorer::DeviceDocumentPlugin>()->networkContext();
157 const auto id
158 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
159
160 auto st = ossia::exec_state_facade{ctx.execState.get()};
161 std::shared_ptr<safe_node<Node>> ptr;
162 auto node = new safe_node<Node>{st.bufferSize(), (double)st.sampleRate(), id};
163 node->root_inputs().reserve(element.inlets().size());
164 node->root_outputs().reserve(element.outlets().size());
165
166 node->prepare(*ctx.execState.get()); // Preparation of the ossia side
167
168 if_possible(node->impl.effect.ossia_state = st);
169 if_possible(node->impl.effect.io_context = &net_ctx.context);
170 if_possible(node->impl.effect.ossia_document_context = &ctx.doc);
171 ptr.reset(node);
172 this->node = ptr;
173
174 if constexpr(requires { ptr->impl.effect; })
175 if constexpr(std::is_same_v<std::decay_t<decltype(ptr->impl.effect)>, Node>)
176 connect_message_bus(element, ctx, ptr->impl.effect);
177 connect_worker(ctx, ptr->impl);
178
179 node->dynamic_ports = element.dynamic_ports;
180 node->finish_init();
181
182 connect_controls(element, ctx, ptr);
183 update_controls(ptr);
184 QObject::connect(
185 &element, &Process::ProcessModel::inletsChanged, this,
186 &Executor::recompute_ports);
187 QObject::connect(
188 &element, &Process::ProcessModel::outletsChanged, this,
189 &Executor::recompute_ports);
190
191 // To call prepare() after evertyhing is ready
192 node->audio_configuration_changed(st);
193
194 m_oldInlets = element.inlets();
195 m_oldOutlets = element.outlets();
196 }
197
198 void
199 setup_gpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
200 {
201#if SCORE_PLUGIN_GFX
202 // FIXME net context for gpu node ?
203 const int64_t id
204 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
205
206 auto& gfx_exec = ctx.doc.plugin<Gfx::DocumentPlugin>().exec;
207
208 // Create the executor in the audio thread
209 struct named_exec_node final : Gfx::gfx_exec_node
210 {
211 using Gfx::gfx_exec_node::gfx_exec_node;
212 std::string label() const noexcept override
213 {
214 return std::string(avnd::get_name<Node>());
215 }
216 };
217
218 auto node = std::make_shared<named_exec_node>(gfx_exec);
219 node->prepare(*ctx.execState);
220
221 this->node = node;
222
223 // Create the controls, inputs outputs etc.
224 std::size_t i = 0;
225
226 for(auto& ctl : element.inlets())
227 {
228 if(auto ctrl = qobject_cast<Process::ControlInlet*>(ctl))
229 {
230 auto& p = node->add_control();
231 p->value = ctrl->value();
232 p->changed = true;
233
234 QObject::connect(
235 ctrl, &Process::ControlInlet::valueChanged, this,
236 Gfx::con_unvalidated{ctx, i, 0, node});
237 i++;
238 }
239 else if(auto ctrl = qobject_cast<Process::ValueInlet*>(ctl))
240 {
241 auto& p = node->add_control();
242 p->changed = true;
243 i++;
244 }
245 else if(auto ctrl = qobject_cast<Process::AudioInlet*>(ctl))
246 {
247 node->add_audio();
248 }
249 else if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
250 {
251 ossia::texture_inlet& inl = *node->add_texture();
252 ctrl->setupExecution(inl, this);
253 }
254 else if(auto ctrl = qobject_cast<Gfx::GeometryInlet*>(ctl))
255 {
256 ossia::geometry_inlet& inl = *node->add_geometry();
257 ctrl->setupExecution(inl, this);
258 }
259 }
260
261 // FIXME refactor this with other GFX processes
262 for(auto* outlet : element.outlets())
263 {
264 if(auto ctrl = qobject_cast<Process::ControlOutlet*>(outlet))
265 {
266 node->add_control_out();
267 }
268 else if(auto ctrl = qobject_cast<Process::ValueOutlet*>(outlet))
269 {
270 node->add_control_out();
271 }
272 else if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
273 {
274 node->add_texture_out();
275 out->nodeId = node_id;
276 }
277 else if(auto out = qobject_cast<Gfx::GeometryOutlet*>(outlet))
278 {
279 node->add_geometry_out();
280 }
281 }
282
283 // Create the GPU node
284 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
285 ctx.alias.lock(), &ctx.executionQueue);
286 std::unique_ptr<score::gfx::Node> ptr;
287 if constexpr(GpuGraphicsNode2<Node>)
288 {
289 auto gpu_node = new CustomGpuNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
290 ptr.reset(gpu_node);
291 }
292 else if constexpr(GpuComputeNode2<Node>)
293 {
294 auto gpu_node = new GpuComputeNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
295 ptr.reset(gpu_node);
296 }
297 else if constexpr(GpuNode<Node>)
298 {
299 auto gpu_node
300 = new GfxNode<Node>(element, qex_ptr, node->control_outs, id, ctx.doc);
301 ptr.reset(gpu_node);
302 }
303
304 i = 0;
305 for(auto& ctl : element.inlets())
306 {
307 if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
308 {
309 ossia::texture_inlet& inl
310 = static_cast<ossia::texture_inlet&>(*node->root_inputs()[i]);
311 ptr->process(i, inl.data); // Setup render_target_spec
312 }
313 i++;
314 }
315 node->id = gfx_exec.ui->register_node(std::move(ptr));
316 node_id = node->id;
317#endif
318 }
319
320 void recompute_ports()
321 {
322 Execution::Transaction commands{this->system()};
323 auto n = std::dynamic_pointer_cast<safe_node<Node>>(this->node);
324 if(!n)
325 return;
326
327 // Re-run setup_inlets ?
328 in_exec([dp = this->process().dynamic_ports, node = n] {
329 node->dynamic_ports = dp;
330 node->root_inputs().clear();
331 node->root_outputs().clear();
332 node->initialize_all_ports();
333 });
334 }
335
336 void connect_controls(
337 ProcessModel<Node>& element, const ::Execution::Context& ctx,
338 std::shared_ptr<safe_node<Node>>& ptr)
339 {
340 using dynamic_ports_port_type = avnd::dynamic_ports_input_introspection<Node>;
341 using control_inputs_type = avnd::control_input_introspection<Node>;
342 using curve_inputs_type = avnd::curve_input_introspection<Node>;
343 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
344 using midifile_inputs_type = avnd::midifile_input_introspection<Node>;
345 using raw_file_inputs_type = avnd::raw_file_input_introspection<Node>;
346 using control_outputs_type = avnd::control_output_introspection<Node>;
347
348 // UI controls to engine
349 safe_node<Node>& node = *ptr;
350 avnd::effect_container<Node>& eff = node.impl;
351
352 // Initialize all the controls in the node with the current value.
353 // And update the node when the UI changes
354
355 if constexpr(dynamic_ports_port_type::size > 0)
356 {
357 for(auto state : eff.full_state())
358 {
359 dynamic_ports_port_type::for_all_n2(
360 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
361 }
362 }
363 if constexpr(control_inputs_type::size > 0)
364 {
365 for(auto state : eff.full_state())
366 {
367 control_inputs_type::for_all_n2(
368 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
369 }
370 }
371 if constexpr(curve_inputs_type::size > 0)
372 {
373 for(auto state : eff.full_state())
374 {
375 curve_inputs_type::for_all_n2(
376 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
377 }
378 }
379 if constexpr(soundfile_inputs_type::size > 0)
380 {
381 soundfile_inputs_type::for_all_n2(
382 avnd::get_inputs<Node>(eff),
383 dispatch_control_setup<Node>{element, ctx, ptr, this});
384
385 setup_soundfile_task_pool(element, ctx, ptr);
386 }
387 if constexpr(midifile_inputs_type::size > 0)
388 {
389 midifile_inputs_type::for_all_n2(
390 avnd::get_inputs<Node>(eff),
391 dispatch_control_setup<Node>{element, ctx, ptr, this});
392 }
393 if constexpr(raw_file_inputs_type::size > 0)
394 {
395 raw_file_inputs_type::for_all_n2(
396 avnd::get_inputs<Node>(eff),
397 dispatch_control_setup<Node>{element, ctx, ptr, this});
398 }
399
400 // Engine to ui controls
401 if constexpr(control_inputs_type::size > 0 || control_outputs_type::size > 0)
402 {
403 // Update the value in the UI
404 std::weak_ptr<safe_node<Node>> weak_node = ptr;
405 update_control_value_in_ui<Node> timer_action{weak_node, &element};
406 timer_action();
407
408 con(ctx.doc.coarseUpdateTimer, &QTimer::timeout, this,
409 [timer_action = std::move(timer_action)] { timer_action(); },
410 Qt::QueuedConnection);
411 }
412 }
413
414 void setup_soundfile_task_pool(
415 ProcessModel<Node>& element, const ::Execution::Context& ctx,
416 std::shared_ptr<safe_node<Node>>& ptr)
417 {
418 safe_node<Node>& node = *ptr;
419
420 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
421
422 auto& tq = score::TaskPool::instance();
423 node.soundfiles.load_request
424 = [&tq, p = std::weak_ptr{ptr}, &ctx](std::string& str, int idx) {
425 auto eff_ptr = p.lock();
426 if(!eff_ptr)
427 return;
428 tq.post([eff_ptr = std::move(eff_ptr), filename = str, &ctx, idx]() mutable {
429 if(auto file = loadSoundfile(filename, ctx.doc, ctx.execState))
430 {
431 ctx.executionQueue.enqueue(
432 [sf = std::move(file), p = std::weak_ptr{eff_ptr}, idx]() mutable {
433 auto eff_ptr = p.lock();
434 if(!eff_ptr)
435 return;
436
437 avnd::effect_container<Node>& eff = eff_ptr->impl;
438 soundfile_inputs_type::for_nth_mapped_n2(
439 avnd::get_inputs<Node>(eff), idx,
440 [&]<std::size_t NField, std::size_t N>(
441 auto& field, avnd::predicate_index<N> p,
442 avnd::field_index<NField> f) {
443 eff_ptr->soundfile_loaded(sf, p, f);
444 });
445 });
446 }
447 });
448 };
449 }
450
451 void connect_message_bus(
452 ProcessModel<Node>& element, const ::Execution::Context& ctx, Node& eff)
453 {
454 // Custom UI messages to engine
455 if constexpr(avnd::has_gui_to_processor_bus<Node>)
456 {
457 element.from_ui = [qex_ptr = weak_exec, &eff](QByteArray b) {
458 auto qex = qex_ptr.lock();
459 if(!qex)
460 return;
461
462 qex->enqueue([mess = std::move(b), &eff]() mutable {
463 using refl = avnd::function_reflection<&Node::process_message>;
464 static_assert(refl::count <= 1);
465
466 if constexpr(refl::count == 0)
467 {
468 // no arguments, just call it
469 eff.process_message();
470 }
471 else if constexpr(refl::count == 1)
472 {
473 using arg_type = avnd::first_argument<&Node::process_message>;
474 std::decay_t<arg_type> arg;
475 MessageBusReader reader{mess};
476 reader(arg);
477 eff.process_message(std::move(arg));
478 }
479 });
480 };
481 }
482
483 if constexpr(avnd::has_processor_to_gui_bus<Node>)
484 {
485 if constexpr(requires { eff.send_message = [](auto&&) { }; })
486 {
487 eff.send_message = [proc = QPointer{&this->process()},
488 qed_ptr = weak_edit]<typename T>(T&& b) mutable {
489 auto qed = qed_ptr.lock();
490 if(!qed)
491 return;
492 if constexpr(
493 sizeof(QPointer<QObject>) + sizeof(b)
494 < Execution::ExecutionCommand::max_storage)
495 {
496 qed->enqueue([proc, bb = std::move(b)]() mutable {
497 if(proc && proc->to_ui)
498 MessageBusSender{proc->to_ui}(std::move(bb));
499 });
500 }
501 else
502 {
503 qed->enqueue(
504 [proc, bb = std::make_unique<std::decay_t<T>>(std::move(b))]() mutable {
505 if(proc && proc->to_ui)
506 MessageBusSender{proc->to_ui}(*std::move(bb));
507 });
508 }
509 };
510 }
511 else if constexpr(requires { eff.send_message = []() { }; })
512 {
513 eff.send_message
514 = [proc = QPointer{&this->process()}, qed_ptr = weak_edit]() mutable {
515 if(!proc)
516 return;
517 auto qed = qed_ptr.lock();
518 if(!qed)
519 return;
520
521 qed->enqueue([proc]() mutable {
522 if(proc && proc->to_ui)
523 MessageBusSender{proc->to_ui}();
524 });
525 };
526 }
527 }
528 }
529
530 void connect_worker(const ::Execution::Context& ctx, avnd::effect_container<Node>& eff)
531 {
532 if constexpr(avnd::has_worker<Node>)
533 {
534 // Initialize the thread pool beforehand
535 auto& tq = score::TaskPool::instance();
536 using worker_type = decltype(eff.effect.worker);
537 for(auto& eff : eff.effects())
538 {
539 std::weak_ptr eff_ptr = std::shared_ptr<Node>(this->node, &eff);
540 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
541 ctx.alias.lock(), &ctx.executionQueue);
542
543 eff.worker.request
544 = [&tq, qex_ptr = std::move(qex_ptr),
545 eff_ptr = std::move(eff_ptr)]<typename... Args>(Args&&... f) mutable {
546 // request() is invoked in the DSP / processor thread
547 // and just posts the task to the thread pool
548 tq.post([eff_ptr, qex_ptr, ... ff = std::forward<Args>(f)]() mutable {
549 // This happens in the worker thread
550 // If for some reason the object has already been removed, not much
551 // reason to perform the work
552 if(!eff_ptr.lock())
553 return;
554
555 using type_of_result
556 = decltype(worker_type::work(std::forward<decltype(ff)>(ff)...));
557 if constexpr(std::is_void_v<type_of_result>)
558 {
559 worker_type::work(std::forward<decltype(ff)>(ff)...);
560 }
561 else
562 {
563 // If the worker returns a std::function, it
564 // is to be invoked back in the processor DSP thread
565 auto res = worker_type::work(std::forward<decltype(ff)>(ff)...);
566 if(!res)
567 return;
568
569 // Execution queue is currently spsc from main thread to an exec thread,
570 // we cannot just yeet the result back from the thread-pool
571 ossia::qt::run_async(
572 qApp, [eff_ptr = std::move(eff_ptr), qex_ptr = std::move(qex_ptr),
573 res = std::move(res)]() mutable {
574 // Main thread
575 std::shared_ptr qex = qex_ptr.lock();
576 if(!qex)
577 return;
578
579 qex->enqueue(
580 [eff_ptr = std::move(eff_ptr), res = std::move(res)]() mutable {
581 // DSP / processor thread
582 // We need res to be mutable so that the worker can use it to e.g. store
583 // old data which will be freed back in the main thread
584 if(auto p = eff_ptr.lock())
585 res(*p);
586 });
587 });
588 }
589 });
590 };
591 }
592 }
593 }
594
595 // Update everything
596 void update_controls(std::shared_ptr<safe_node<Node>>& ptr)
597 {
598 avnd::effect_container<Node>& eff = ptr->impl;
599 {
600 for(auto state : eff.full_state())
601 {
602 avnd::input_introspection<Node>::for_all(
603 state.inputs, [&](auto& field) { if_possible(field.update(state.effect)); });
604 }
605 }
606 }
607
608 void cleanup() override
609 {
610 if constexpr(requires { this->process().from_ui; })
611 {
612 this->process().from_ui = [](QByteArray arr) {};
613 }
614 // FIXME cleanup eff.effect.send_message too ?
615
616#if SCORE_PLUGIN_GFX
617 if constexpr(is_gpu<Node>)
618 {
619 // FIXME this must move in the Node dtor. See video_node
620 auto& gfx_exec = this->system().doc.template plugin<Gfx::DocumentPlugin>().exec;
621 if(node_id >= 0)
622 gfx_exec.ui->unregister_node(node_id);
623 }
624
625 // FIXME refactor this with other GFX processes
626 for(auto* outlet : this->process().outlets())
627 {
628 if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
629 {
630 out->nodeId = -1;
631 }
632 }
633#endif
634 ::Execution::ProcessComponent::cleanup();
635 }
636
637 ~Executor() { }
638};
639}
Definition GfxApplicationPlugin.hpp:13
Definition GfxExecNode.hpp:40
The Path class is a typesafe wrapper around ObjectPath.
Definition Path.hpp:52
Definition UuidKey.hpp:345
Definition score-plugin-avnd/Crousti/Executor.hpp:56
Definition score-plugin-avnd/Crousti/Executor.hpp:106
Definition score-plugin-avnd/Crousti/ProcessModel.hpp:86
Definition Metadatas.hpp:29
Definition Metadatas.hpp:23
Definition Metadatas.hpp:15
Definition Metadatas.hpp:35
Definition Factories.hpp:19
Definition Process/Execution/ProcessComponent.hpp:93
Definition ExecutionTransaction.hpp:21
Definition GfxExecNode.hpp:134
Definition PortForward.hpp:23
Definition PortForward.hpp:27
Definition MessageBus.hpp:171
Definition MessageBus.hpp:44
Definition ExecutorPortSetup.hpp:344
Definition ExecutorUpdateControlValueInUi.hpp:84