Loading...
Searching...
No Matches
score-plugin-avnd/Crousti/Executor.hpp
1#pragma once
2
3#include <Process/Execution/ProcessComponent.hpp>
4#include <Process/ExecutionContext.hpp>
5
6#include <Explorer/DocumentPlugin/DeviceDocumentPlugin.hpp>
7
8#include <Crousti/CpuAnalysisNode.hpp>
9#include <Crousti/CpuFilterNode.hpp>
10#include <Crousti/CpuGeneratorNode.hpp>
11#include <Crousti/ExecutorPortSetup.hpp>
12#include <Crousti/ExecutorUpdateControlValueInUi.hpp>
13#include <Crousti/File.hpp>
14#include <Crousti/GpuComputeNode.hpp>
15#include <Crousti/GpuNode.hpp>
16#include <Crousti/MessageBus.hpp>
17#include <Crousti/Metadatas.hpp>
18#include <Crousti/ProcessModel.hpp>
19
20#include <score/tools/Bind.hpp>
21
22#include <ossia/dataflow/exec_state_facade.hpp>
23#include <ossia/dataflow/node_process.hpp>
24#include <ossia/network/context.hpp>
25
26#include <ossia-qt/invoke.hpp>
27
28#include <QGuiApplication>
29
30#include <flicks.h>
31
32#if SCORE_PLUGIN_GFX
33#include <Crousti/GpuNode.hpp>
34#include <Gfx/GfxApplicationPlugin.hpp>
35#endif
36
37#include <score/tools/ThreadPool.hpp>
38
39#include <ossia/detail/type_if.hpp>
40
41#include <QTimer>
42
43#include <avnd/binding/ossia/data_node.hpp>
44#include <avnd/binding/ossia/mono_audio_node.hpp>
45#include <avnd/binding/ossia/node.hpp>
46#include <avnd/binding/ossia/ossia_audio_node.hpp>
47#include <avnd/binding/ossia/poly_audio_node.hpp>
48#include <avnd/concepts/temporality.hpp>
49#include <avnd/concepts/ui.hpp>
50#include <avnd/concepts/worker.hpp>
51
52namespace oscr
53{
54
55template <typename Node>
56class CustomNodeProcess : public ossia::node_process
57{
58 using node_process::node_process;
59 void start() override
60 {
61 node_process::start();
62 auto& n = static_cast<safe_node<Node>&>(*node);
63 if_possible(n.impl.effect.start());
64 }
65 void pause() override
66 {
67 auto& n = static_cast<safe_node<Node>&>(*node);
68 if_possible(n.impl.effect.pause());
69 node_process::pause();
70 }
71
72 void resume() override
73 {
74 node_process::resume();
75 auto& n = static_cast<safe_node<Node>&>(*node);
76 if_possible(n.impl.effect.resume());
77 }
78
79 void stop() override
80 {
81 auto& n = static_cast<safe_node<Node>&>(*node);
82 if_possible(n.impl.effect.stop());
83 node_process::stop();
84 }
85
86 void offset_impl(ossia::time_value date) override
87 {
88 node_process::offset_impl(date);
89 auto& n = static_cast<safe_node<Node>&>(*node);
90 util::flicks f{date.impl};
91 if_possible(n.impl.effect.transport(f));
92 }
93
94 void transport_impl(ossia::time_value date) override
95 {
96 node_process::transport_impl(date);
97 auto& n = static_cast<safe_node<Node>&>(*node);
98
99 util::flicks f{date.impl};
100 if_possible(n.impl.effect.transport(f));
101 }
102};
103
104template <typename Node>
105class Executor final
106 : public Execution::ProcessComponent_T<ProcessModel<Node>, ossia::node_process>
107{
108 Process::Inlets m_oldInlets;
109 Process::Outlets m_oldOutlets;
110
111public:
112 static Q_DECL_RELAXED_CONSTEXPR UuidKey<score::Component> static_key() noexcept
113 {
114 return uuid_from_string<Node>();
115 }
116
117 UuidKey<score::Component> key() const noexcept final override { return static_key(); }
118
119 bool key_match(UuidKey<score::Component> other) const noexcept final override
120 {
121 return static_key() == other || Execution::ProcessComponent::base_key_match(other);
122 }
123
124 [[no_unique_address]] ossia::type_if<int, is_gpu<Node>> node_id = -1;
125
126 Executor(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
128 element, ctx, "Executor::ProcessModel<Info>", p}
129 {
130 if constexpr(is_gpu<Node>)
131 {
132#if SCORE_PLUGIN_GFX
133 setup_gpu(element, ctx, p);
134#endif
135 }
136 else
137 {
138 setup_cpu(element, ctx, p);
139 }
140
141 if constexpr(avnd::tag_process_exec<Node>)
142 {
143 this->m_ossia_process = std::make_shared<CustomNodeProcess<Node>>(this->node);
144 }
145 else
146 {
147 this->m_ossia_process = std::make_shared<ossia::node_process>(this->node);
148 }
149 }
150
151 void
152 setup_cpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
153 {
154 auto& net_ctx
155 = *ctx.doc.findPlugin<Explorer::DeviceDocumentPlugin>()->networkContext();
156 const auto id
157 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
158
159 auto st = ossia::exec_state_facade{ctx.execState.get()};
160 std::shared_ptr<safe_node<Node>> ptr;
161 auto node = new safe_node<Node>{st.bufferSize(), (double)st.sampleRate(), id};
162 node->root_inputs().reserve(element.inlets().size());
163 node->root_outputs().reserve(element.outlets().size());
164
165 node->prepare(*ctx.execState.get()); // Preparation of the ossia side
166
167 if_possible(node->impl.effect.ossia_state = st);
168 if_possible(node->impl.effect.io_context = &net_ctx.context);
169 if_possible(node->impl.effect.ossia_document_context = &ctx.doc);
170 ptr.reset(node);
171 this->node = ptr;
172
173 if constexpr(requires { ptr->impl.effect; })
174 if constexpr(std::is_same_v<std::decay_t<decltype(ptr->impl.effect)>, Node>)
175 connect_message_bus(element, ctx, ptr->impl.effect);
176 connect_worker(ctx, ptr->impl);
177
178 node->dynamic_ports = element.dynamic_ports;
179 node->finish_init();
180
181 connect_controls(element, ctx, ptr);
182 update_controls(ptr);
183 QObject::connect(
184 &element, &Process::ProcessModel::inletsChanged, this,
185 &Executor::recompute_ports);
186 QObject::connect(
187 &element, &Process::ProcessModel::outletsChanged, this,
188 &Executor::recompute_ports);
189
190 // To call prepare() after evertyhing is ready
191 node->audio_configuration_changed(st);
192
193 m_oldInlets = element.inlets();
194 m_oldOutlets = element.outlets();
195 }
196
197 void
198 setup_gpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
199 {
200#if SCORE_PLUGIN_GFX
201 // FIXME net context for gpu node ?
202 const int64_t id
203 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
204
205 auto& gfx_exec = ctx.doc.plugin<Gfx::DocumentPlugin>().exec;
206
207 // Create the executor in the audio thread
208 struct named_exec_node final : Gfx::gfx_exec_node
209 {
210 using Gfx::gfx_exec_node::gfx_exec_node;
211 std::string label() const noexcept override
212 {
213 return std::string(avnd::get_name<Node>());
214 }
215 };
216
217 auto node = std::make_shared<named_exec_node>(gfx_exec);
218 node->prepare(*ctx.execState);
219
220 this->node = node;
221
222 // Create the controls, inputs outputs etc.
223 std::size_t i = 0;
224
225 for(auto& ctl : element.inlets())
226 {
227 if(auto ctrl = qobject_cast<Process::ControlInlet*>(ctl))
228 {
229 auto& p = node->add_control();
230 p->value = ctrl->value();
231 p->changed = true;
232
233 QObject::connect(
234 ctrl, &Process::ControlInlet::valueChanged, this,
235 Gfx::con_unvalidated{ctx, i, 0, node});
236 i++;
237 }
238 else if(auto ctrl = qobject_cast<Process::ValueInlet*>(ctl))
239 {
240 auto& p = node->add_control();
241 p->changed = true;
242 i++;
243 }
244 else if(auto ctrl = qobject_cast<Process::AudioInlet*>(ctl))
245 {
246 node->add_audio();
247 }
248 else if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
249 {
250 ossia::texture_inlet& inl = *node->add_texture();
251 ctrl->setupExecution(inl, this);
252 }
253 }
254
255 // FIXME refactor this with other GFX processes
256 for(auto* outlet : element.outlets())
257 {
258 if(auto ctrl = qobject_cast<Process::ControlOutlet*>(outlet))
259 {
260 node->add_control_out();
261 }
262 else if(auto ctrl = qobject_cast<Process::ValueOutlet*>(outlet))
263 {
264 node->add_control_out();
265 }
266 else if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
267 {
268 node->add_texture_out();
269 out->nodeId = node_id;
270 }
271 }
272
273 // Create the GPU node
274 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
275 ctx.alias.lock(), &ctx.executionQueue);
276 std::unique_ptr<score::gfx::Node> ptr;
277 if constexpr(GpuGraphicsNode2<Node>)
278 {
279 auto gpu_node = new CustomGpuNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
280 ptr.reset(gpu_node);
281 }
282 else if constexpr(GpuComputeNode2<Node>)
283 {
284 auto gpu_node = new GpuComputeNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
285 ptr.reset(gpu_node);
286 }
287 else if constexpr(GpuNode<Node>)
288 {
289 auto gpu_node
290 = new GfxNode<Node>(element, qex_ptr, node->control_outs, id, ctx.doc);
291 ptr.reset(gpu_node);
292 }
293
294 i = 0;
295 for(auto& ctl : element.inlets())
296 {
297 if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
298 {
299 ossia::texture_inlet& inl
300 = static_cast<ossia::texture_inlet&>(*node->root_inputs()[i]);
301 ptr->process(i, inl.data); // Setup render_target_spec
302 }
303 i++;
304 }
305 node->id = gfx_exec.ui->register_node(std::move(ptr));
306 node_id = node->id;
307#endif
308 }
309
310 void recompute_ports()
311 {
312 Execution::Transaction commands{this->system()};
313 auto n = std::dynamic_pointer_cast<safe_node<Node>>(this->node);
314 if(!n)
315 return;
316
317 // Re-run setup_inlets ?
318 in_exec([dp = this->process().dynamic_ports, node = n] {
319 node->dynamic_ports = dp;
320 node->root_inputs().clear();
321 node->root_outputs().clear();
322 node->initialize_all_ports();
323 });
324 }
325
326 void connect_controls(
327 ProcessModel<Node>& element, const ::Execution::Context& ctx,
328 std::shared_ptr<safe_node<Node>>& ptr)
329 {
330 using dynamic_ports_port_type = avnd::dynamic_ports_input_introspection<Node>;
331 using control_inputs_type = avnd::control_input_introspection<Node>;
332 using curve_inputs_type = avnd::curve_input_introspection<Node>;
333 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
334 using midifile_inputs_type = avnd::midifile_input_introspection<Node>;
335 using raw_file_inputs_type = avnd::raw_file_input_introspection<Node>;
336 using control_outputs_type = avnd::control_output_introspection<Node>;
337
338 // UI controls to engine
339 safe_node<Node>& node = *ptr;
340 avnd::effect_container<Node>& eff = node.impl;
341
342 // Initialize all the controls in the node with the current value.
343 // And update the node when the UI changes
344
345 if constexpr(dynamic_ports_port_type::size > 0)
346 {
347 for(auto state : eff.full_state())
348 {
349 dynamic_ports_port_type::for_all_n2(
350 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
351 }
352 }
353 if constexpr(control_inputs_type::size > 0)
354 {
355 for(auto state : eff.full_state())
356 {
357 control_inputs_type::for_all_n2(
358 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
359 }
360 }
361 if constexpr(curve_inputs_type::size > 0)
362 {
363 for(auto state : eff.full_state())
364 {
365 curve_inputs_type::for_all_n2(
366 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
367 }
368 }
369 if constexpr(soundfile_inputs_type::size > 0)
370 {
371 soundfile_inputs_type::for_all_n2(
372 avnd::get_inputs<Node>(eff),
373 dispatch_control_setup<Node>{element, ctx, ptr, this});
374
375 setup_soundfile_task_pool(element, ctx, ptr);
376 }
377 if constexpr(midifile_inputs_type::size > 0)
378 {
379 midifile_inputs_type::for_all_n2(
380 avnd::get_inputs<Node>(eff),
381 dispatch_control_setup<Node>{element, ctx, ptr, this});
382 }
383 if constexpr(raw_file_inputs_type::size > 0)
384 {
385 raw_file_inputs_type::for_all_n2(
386 avnd::get_inputs<Node>(eff),
387 dispatch_control_setup<Node>{element, ctx, ptr, this});
388 }
389
390 // Engine to ui controls
391 if constexpr(control_inputs_type::size > 0 || control_outputs_type::size > 0)
392 {
393 // Update the value in the UI
394 std::weak_ptr<safe_node<Node>> weak_node = ptr;
395 update_control_value_in_ui<Node> timer_action{weak_node, &element};
396 timer_action();
397
398 con(ctx.doc.coarseUpdateTimer, &QTimer::timeout, this,
399 [timer_action = std::move(timer_action)] { timer_action(); },
400 Qt::QueuedConnection);
401 }
402 }
403
404 void setup_soundfile_task_pool(
405 ProcessModel<Node>& element, const ::Execution::Context& ctx,
406 std::shared_ptr<safe_node<Node>>& ptr)
407 {
408 safe_node<Node>& node = *ptr;
409
410 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
411
412 auto& tq = score::TaskPool::instance();
413 node.soundfiles.load_request
414 = [&tq, p = std::weak_ptr{ptr}, &ctx](std::string& str, int idx) {
415 auto eff_ptr = p.lock();
416 if(!eff_ptr)
417 return;
418 tq.post([eff_ptr = std::move(eff_ptr), filename = str, &ctx, idx]() mutable {
419 if(auto file = loadSoundfile(filename, ctx.doc, ctx.execState))
420 {
421 ctx.executionQueue.enqueue(
422 [sf = std::move(file), p = std::weak_ptr{eff_ptr}, idx]() mutable {
423 auto eff_ptr = p.lock();
424 if(!eff_ptr)
425 return;
426
427 avnd::effect_container<Node>& eff = eff_ptr->impl;
428 soundfile_inputs_type::for_nth_mapped_n2(
429 avnd::get_inputs<Node>(eff), idx,
430 [&]<std::size_t NField, std::size_t N>(
431 auto& field, avnd::predicate_index<N> p,
432 avnd::field_index<NField> f) {
433 eff_ptr->soundfile_loaded(sf, p, f);
434 });
435 });
436 }
437 });
438 };
439 }
440
441 void connect_message_bus(
442 ProcessModel<Node>& element, const ::Execution::Context& ctx, Node& eff)
443 {
444 // Custom UI messages to engine
445 if constexpr(avnd::has_gui_to_processor_bus<Node>)
446 {
447 element.from_ui = [qex_ptr = weak_exec, &eff](QByteArray b) {
448 auto qex = qex_ptr.lock();
449 if(!qex)
450 return;
451
452 qex->enqueue([mess = std::move(b), &eff]() mutable {
453 using refl = avnd::function_reflection<&Node::process_message>;
454 static_assert(refl::count <= 1);
455
456 if constexpr(refl::count == 0)
457 {
458 // no arguments, just call it
459 eff.process_message();
460 }
461 else if constexpr(refl::count == 1)
462 {
463 using arg_type = avnd::first_argument<&Node::process_message>;
464 std::decay_t<arg_type> arg;
465 MessageBusReader reader{mess};
466 reader(arg);
467 eff.process_message(std::move(arg));
468 }
469 });
470 };
471 }
472
473 if constexpr(avnd::has_processor_to_gui_bus<Node>)
474 {
475 if constexpr(requires { eff.send_message = [](auto&&) { }; })
476 {
477 eff.send_message = [proc = QPointer{&this->process()},
478 qed_ptr = weak_edit]<typename T>(T&& b) mutable {
479 auto qed = qed_ptr.lock();
480 if(!qed)
481 return;
482 if constexpr(
483 sizeof(QPointer<QObject>) + sizeof(b)
484 < Execution::ExecutionCommand::max_storage)
485 {
486 qed->enqueue([proc, bb = std::move(b)]() mutable {
487 if(proc && proc->to_ui)
488 MessageBusSender{proc->to_ui}(std::move(bb));
489 });
490 }
491 else
492 {
493 qed->enqueue(
494 [proc, bb = std::make_unique<std::decay_t<T>>(std::move(b))]() mutable {
495 if(proc && proc->to_ui)
496 MessageBusSender{proc->to_ui}(*std::move(bb));
497 });
498 }
499 };
500 }
501 else if constexpr(requires { eff.send_message = []() { }; })
502 {
503 eff.send_message
504 = [proc = QPointer{&this->process()}, qed_ptr = weak_edit]() mutable {
505 if(!proc)
506 return;
507 auto qed = qed_ptr.lock();
508 if(!qed)
509 return;
510
511 qed->enqueue([proc]() mutable {
512 if(proc && proc->to_ui)
513 MessageBusSender{proc->to_ui}();
514 });
515 };
516 }
517 }
518 }
519
520 void connect_worker(const ::Execution::Context& ctx, avnd::effect_container<Node>& eff)
521 {
522 if constexpr(avnd::has_worker<Node>)
523 {
524 // Initialize the thread pool beforehand
525 auto& tq = score::TaskPool::instance();
526 using worker_type = decltype(eff.effect.worker);
527 for(auto& eff : eff.effects())
528 {
529 std::weak_ptr eff_ptr = std::shared_ptr<Node>(this->node, &eff);
530 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
531 ctx.alias.lock(), &ctx.executionQueue);
532
533 eff.worker.request
534 = [&tq, qex_ptr = std::move(qex_ptr),
535 eff_ptr = std::move(eff_ptr)]<typename... Args>(Args&&... f) mutable {
536 // request() is invoked in the DSP / processor thread
537 // and just posts the task to the thread pool
538 tq.post([eff_ptr, qex_ptr, ... ff = std::forward<Args>(f)]() mutable {
539 // This happens in the worker thread
540 // If for some reason the object has already been removed, not much
541 // reason to perform the work
542 if(!eff_ptr.lock())
543 return;
544
545 using type_of_result
546 = decltype(worker_type::work(std::forward<decltype(ff)>(ff)...));
547 if constexpr(std::is_void_v<type_of_result>)
548 {
549 worker_type::work(std::forward<decltype(ff)>(ff)...);
550 }
551 else
552 {
553 // If the worker returns a std::function, it
554 // is to be invoked back in the processor DSP thread
555 auto res = worker_type::work(std::forward<decltype(ff)>(ff)...);
556 if(!res)
557 return;
558
559 // Execution queue is currently spsc from main thread to an exec thread,
560 // we cannot just yeet the result back from the thread-pool
561 ossia::qt::run_async(
562 qApp, [eff_ptr = std::move(eff_ptr), qex_ptr = std::move(qex_ptr),
563 res = std::move(res)]() mutable {
564 // Main thread
565 std::shared_ptr qex = qex_ptr.lock();
566 if(!qex)
567 return;
568
569 qex->enqueue(
570 [eff_ptr = std::move(eff_ptr), res = std::move(res)]() mutable {
571 // DSP / processor thread
572 // We need res to be mutable so that the worker can use it to e.g. store
573 // old data which will be freed back in the main thread
574 if(auto p = eff_ptr.lock())
575 res(*p);
576 });
577 });
578 }
579 });
580 };
581 }
582 }
583 }
584
585 // Update everything
586 void update_controls(std::shared_ptr<safe_node<Node>>& ptr)
587 {
588 avnd::effect_container<Node>& eff = ptr->impl;
589 {
590 for(auto state : eff.full_state())
591 {
592 avnd::input_introspection<Node>::for_all(
593 state.inputs, [&](auto& field) { if_possible(field.update(state.effect)); });
594 }
595 }
596 }
597
598 void cleanup() override
599 {
600 if constexpr(requires { this->process().from_ui; })
601 {
602 this->process().from_ui = [](QByteArray arr) {};
603 }
604 // FIXME cleanup eff.effect.send_message too ?
605
606#if SCORE_PLUGIN_GFX
607 if constexpr(is_gpu<Node>)
608 {
609 // FIXME this must move in the Node dtor. See video_node
610 auto& gfx_exec = this->system().doc.template plugin<Gfx::DocumentPlugin>().exec;
611 if(node_id >= 0)
612 gfx_exec.ui->unregister_node(node_id);
613 }
614
615 // FIXME refactor this with other GFX processes
616 for(auto* outlet : this->process().outlets())
617 {
618 if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
619 {
620 out->nodeId = -1;
621 }
622 }
623#endif
624 ::Execution::ProcessComponent::cleanup();
625 }
626
627 ~Executor() { }
628};
629}
Definition GfxApplicationPlugin.hpp:13
Definition GfxExecNode.hpp:40
The Path class is a typesafe wrapper around ObjectPath.
Definition Path.hpp:52
Definition UuidKey.hpp:344
Definition score-plugin-avnd/Crousti/Executor.hpp:57
Definition score-plugin-avnd/Crousti/Executor.hpp:107
Definition score-plugin-avnd/Crousti/ProcessModel.hpp:86
Definition Metadatas.hpp:25
Definition Metadatas.hpp:19
Definition Metadatas.hpp:15
Definition Metadatas.hpp:31
Definition Factories.hpp:19
Definition Process/Execution/ProcessComponent.hpp:89
Definition ExecutionTransaction.hpp:20
Definition GfxExecNode.hpp:120
Definition PortForward.hpp:23
Definition PortForward.hpp:27
Definition MessageBus.hpp:171
Definition MessageBus.hpp:44
Definition ExecutorPortSetup.hpp:344
Definition ExecutorUpdateControlValueInUi.hpp:84