Loading...
Searching...
No Matches
score-plugin-avnd/Crousti/Executor.hpp
1#pragma once
2
3#include <Process/Execution/ProcessComponent.hpp>
4#include <Process/ExecutionContext.hpp>
5
6#include <Explorer/DocumentPlugin/DeviceDocumentPlugin.hpp>
7
8#include <Crousti/CpuAnalysisNode.hpp>
9#include <Crousti/CpuFilterNode.hpp>
10#include <Crousti/ExecutorPortSetup.hpp>
11#include <Crousti/ExecutorUpdateControlValueInUi.hpp>
12#include <Crousti/File.hpp>
13#include <Crousti/GpuComputeNode.hpp>
14#include <Crousti/GpuNode.hpp>
15#include <Crousti/MessageBus.hpp>
16#include <Crousti/Metadatas.hpp>
17#include <Crousti/ProcessModel.hpp>
18
19#include <score/tools/Bind.hpp>
20
21#include <ossia/dataflow/exec_state_facade.hpp>
22#include <ossia/dataflow/node_process.hpp>
23#include <ossia/network/context.hpp>
24
25#include <ossia-qt/invoke.hpp>
26
27#include <QGuiApplication>
28
29#include <flicks.h>
30
31#if SCORE_PLUGIN_GFX
32#include <Crousti/GpuNode.hpp>
33#include <Gfx/GfxApplicationPlugin.hpp>
34#endif
35
36#include <Scenario/Settings/ScenarioSettingsModel.hpp>
37
38#include <score/tools/ThreadPool.hpp>
39
40#include <ossia/detail/type_if.hpp>
41
42#include <QTimer>
43
44#include <avnd/binding/ossia/data_node.hpp>
45#include <avnd/binding/ossia/mono_audio_node.hpp>
46#include <avnd/binding/ossia/node.hpp>
47#include <avnd/binding/ossia/ossia_audio_node.hpp>
48#include <avnd/binding/ossia/ossia_dynamic_audio_node.hpp>
49#include <avnd/binding/ossia/poly_audio_node.hpp>
50#include <avnd/concepts/temporality.hpp>
51#include <avnd/concepts/ui.hpp>
52#include <avnd/concepts/worker.hpp>
53
54namespace oscr
55{
56
57template <typename Node>
58class CustomNodeProcess : public ossia::node_process
59{
60 using node_process::node_process;
61 void start() override
62 {
63 node_process::start();
64 auto& n = static_cast<safe_node<Node>&>(*node);
65 if_possible(n.impl.effect.start());
66 }
67 void pause() override
68 {
69 auto& n = static_cast<safe_node<Node>&>(*node);
70 if_possible(n.impl.effect.pause());
71 node_process::pause();
72 }
73
74 void resume() override
75 {
76 node_process::resume();
77 auto& n = static_cast<safe_node<Node>&>(*node);
78 if_possible(n.impl.effect.resume());
79 }
80
81 void stop() override
82 {
83 auto& n = static_cast<safe_node<Node>&>(*node);
84 if_possible(n.impl.effect.stop());
85 node_process::stop();
86 }
87
88 void offset_impl(ossia::time_value date) override
89 {
90 node_process::offset_impl(date);
91 auto& n = static_cast<safe_node<Node>&>(*node);
92 util::flicks f{date.impl};
93 if_possible(n.impl.effect.transport(f));
94 }
95
96 void transport_impl(ossia::time_value date) override
97 {
98 node_process::transport_impl(date);
99 auto& n = static_cast<safe_node<Node>&>(*node);
100
101 util::flicks f{date.impl};
102 if_possible(n.impl.effect.transport(f));
103 }
104};
105
106template <typename Node>
107class Executor final
108 : public Execution::ProcessComponent_T<ProcessModel<Node>, ossia::node_process>
109{
110 Process::Inlets m_oldInlets;
111 Process::Outlets m_oldOutlets;
112
113public:
114 static Q_DECL_RELAXED_CONSTEXPR UuidKey<score::Component> static_key() noexcept
115 {
116 return uuid_from_string<Node>();
117 }
118
119 UuidKey<score::Component> key() const noexcept final override { return static_key(); }
120
121 bool key_match(UuidKey<score::Component> other) const noexcept final override
122 {
123 return static_key() == other || Execution::ProcessComponent::base_key_match(other);
124 }
125
126#if defined(SCORE_PLUGIN_GFX)
127 [[no_unique_address]] ossia::type_if<int, is_gpu<Node>> node_id = score::gfx::invalid_node_index;
128#endif
129
130 Executor(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
132 element, ctx, "Executor::ProcessModel<Info>", p}
133 {
134 if constexpr(is_gpu<Node>)
135 {
136#if SCORE_PLUGIN_GFX
137 setup_gpu(element, ctx, p);
138#endif
139 }
140 else
141 {
142 setup_cpu(element, ctx, p);
143 }
144
145 if constexpr(avnd::tag_process_exec<Node>)
146 {
147 this->m_ossia_process = std::make_shared<CustomNodeProcess<Node>>(this->node);
148 }
149 else
150 {
151 this->m_ossia_process = std::make_shared<ossia::node_process>(this->node);
152 }
153 }
154
155 void
156 setup_cpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
157 {
158 auto& net_ctx
159 = *ctx.doc.findPlugin<Explorer::DeviceDocumentPlugin>()->networkContext();
160 const auto id
161 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
162
163 auto st = ossia::exec_state_facade{ctx.execState.get()};
164 std::shared_ptr<safe_node<Node>> ptr;
165 auto node = new safe_node<Node>{st.bufferSize(), (double)st.sampleRate(), id};
166 node->root_inputs().reserve(element.inlets().size());
167 node->root_outputs().reserve(element.outlets().size());
168
169 node->prepare(*ctx.execState.get()); // Preparation of the ossia side
170
171 if_possible(node->impl.effect.ossia_state = st);
172 if_possible(node->impl.effect.io_context = &net_ctx.context);
173 if_possible(node->impl.effect.ossia_document_context = &ctx.doc);
174 ptr.reset(node);
175 this->node = ptr;
176
177 if constexpr(requires { ptr->impl.effect; })
178 if constexpr(std::is_same_v<std::decay_t<decltype(ptr->impl.effect)>, Node>)
179 connect_message_bus(element, ctx, ptr->impl.effect);
180 connect_worker(ctx, ptr->impl);
181
182 node->dynamic_ports = element.dynamic_ports;
183 node->finish_init();
184
185 connect_controls(element, ctx, ptr);
186 update_controls(ptr);
187 QObject::connect(
188 &element, &Process::ProcessModel::inletsChanged, this,
189 &Executor::recompute_ports);
190 QObject::connect(
191 &element, &Process::ProcessModel::outletsChanged, this,
192 &Executor::recompute_ports);
193
194 // To call prepare() after evertyhing is ready
195 node->audio_configuration_changed(st);
196
197 m_oldInlets = element.inlets();
198 m_oldOutlets = element.outlets();
199 }
200
201 void
202 setup_gpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
203 {
204#if SCORE_PLUGIN_GFX
205 // FIXME net context for gpu node ?
206 const int64_t id
207 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
208
209 auto& gfx_exec = ctx.doc.plugin<Gfx::DocumentPlugin>().exec;
210
211 // Create the executor in the audio thread
212 struct named_exec_node final : Gfx::gfx_exec_node
213 {
214 using Gfx::gfx_exec_node::gfx_exec_node;
215 std::string label() const noexcept override
216 {
217 return std::string(avnd::get_name<Node>());
218 }
219 };
220
221 auto node = std::make_shared<named_exec_node>(gfx_exec);
222 node->prepare(*ctx.execState);
223
224 this->node = node;
225
226 // Create the controls, inputs outputs etc.
227 std::size_t i = 0;
228
229 for(auto& ctl : element.inlets())
230 {
231 if(auto ctrl = qobject_cast<Process::ControlInlet*>(ctl))
232 {
233 auto& p = node->add_control();
234 p->value = ctrl->value();
235 p->changed = true;
236
237 QObject::connect(
238 ctrl, &Process::ControlInlet::valueChanged, this,
239 Gfx::con_unvalidated{ctx, i, 0, node});
240 i++;
241 }
242 else if(auto ctrl = qobject_cast<Process::ValueInlet*>(ctl))
243 {
244 auto& p = node->add_control();
245 p->changed = true;
246 i++;
247 }
248 else if(auto ctrl = qobject_cast<Process::AudioInlet*>(ctl))
249 {
250 node->add_audio();
251 }
252 else if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
253 {
254 ossia::texture_inlet& inl = *node->add_texture();
255 ctrl->setupExecution(inl, this);
256 }
257 else if(auto ctrl = qobject_cast<Gfx::GeometryInlet*>(ctl))
258 {
259 ossia::geometry_inlet& inl = *node->add_geometry();
260 ctrl->setupExecution(inl, this);
261 }
262 }
263
264 // FIXME refactor this with other GFX processes
265 for(auto* outlet : element.outlets())
266 {
267 if(auto ctrl = qobject_cast<Process::ControlOutlet*>(outlet))
268 {
269 node->add_control_out();
270 }
271 else if(auto ctrl = qobject_cast<Process::ValueOutlet*>(outlet))
272 {
273 node->add_control_out();
274 }
275 else if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
276 {
277 node->add_texture_out();
278 out->nodeId = node_id;
279 }
280 else if(auto out = qobject_cast<Gfx::GeometryOutlet*>(outlet))
281 {
282 node->add_geometry_out();
283 }
284 }
285
286 // Create the GPU node
287 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
288 ctx.alias.lock(), &ctx.executionQueue);
289 std::unique_ptr<score::gfx::Node> ptr;
290 if constexpr(GpuGraphicsNode2<Node>)
291 {
292 auto gpu_node = new CustomGpuNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
293 ptr.reset(gpu_node);
294 }
295 else if constexpr(GpuComputeNode2<Node>)
296 {
297 auto gpu_node = new GpuComputeNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
298 ptr.reset(gpu_node);
299 }
300 else if constexpr(GpuNode<Node>)
301 {
302 auto gpu_node
303 = new GfxNode<Node>(element, qex_ptr, node->control_outs, id, ctx.doc);
304 ptr.reset(gpu_node);
305 }
306
307 i = 0;
308 for(auto& ctl : element.inlets())
309 {
310 if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
311 {
312 ossia::texture_inlet& inl
313 = static_cast<ossia::texture_inlet&>(*node->root_inputs()[i]);
314 ptr->process(i, inl.data); // Setup render_target_spec
315 }
316 i++;
317 }
318 node->id = gfx_exec.ui->register_node(std::move(ptr));
319 node_id = node->id;
320#endif
321 }
322
323 void recompute_ports()
324 {
325 Execution::Transaction commands{this->system()};
326 auto n = std::dynamic_pointer_cast<safe_node<Node>>(this->node);
327 if(!n)
328 return;
329
330 // Re-run setup_inlets ?
331 in_exec([dp = this->process().dynamic_ports, node = n] {
332 node->dynamic_ports = dp;
333 node->root_inputs().clear();
334 node->root_outputs().clear();
335 node->initialize_all_ports();
336 });
337 }
338
339 void connect_controls(
340 ProcessModel<Node>& element, const ::Execution::Context& ctx,
341 std::shared_ptr<safe_node<Node>>& ptr)
342 {
343 using dynamic_ports_port_type = avnd::dynamic_ports_input_introspection<Node>;
344 using control_inputs_type = avnd::control_input_introspection<Node>;
345 using curve_inputs_type = avnd::curve_input_introspection<Node>;
346 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
347 using midifile_inputs_type = avnd::midifile_input_introspection<Node>;
348 using raw_file_inputs_type = avnd::raw_file_input_introspection<Node>;
349 using control_outputs_type = avnd::control_output_introspection<Node>;
350
351 // UI controls to engine
352 safe_node<Node>& node = *ptr;
353 avnd::effect_container<Node>& eff = node.impl;
354
355 // Initialize all the controls in the node with the current value.
356 // And update the node when the UI changes
357
358 if constexpr(dynamic_ports_port_type::size > 0)
359 {
360 for(auto state : eff.full_state())
361 {
362 dynamic_ports_port_type::for_all_n2(
363 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
364 }
365 }
366 if constexpr(control_inputs_type::size > 0)
367 {
368 for(auto state : eff.full_state())
369 {
370 control_inputs_type::for_all_n2(
371 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
372 }
373 }
374 if constexpr(curve_inputs_type::size > 0)
375 {
376 for(auto state : eff.full_state())
377 {
378 curve_inputs_type::for_all_n2(
379 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
380 }
381 }
382 if constexpr(soundfile_inputs_type::size > 0)
383 {
384 soundfile_inputs_type::for_all_n2(
385 avnd::get_inputs<Node>(eff),
386 dispatch_control_setup<Node>{element, ctx, ptr, this});
387
388 setup_soundfile_task_pool(element, ctx, ptr);
389 }
390 if constexpr(midifile_inputs_type::size > 0)
391 {
392 midifile_inputs_type::for_all_n2(
393 avnd::get_inputs<Node>(eff),
394 dispatch_control_setup<Node>{element, ctx, ptr, this});
395 }
396 if constexpr(raw_file_inputs_type::size > 0)
397 {
398 raw_file_inputs_type::for_all_n2(
399 avnd::get_inputs<Node>(eff),
400 dispatch_control_setup<Node>{element, ctx, ptr, this});
401 }
402
403 // Engine to ui controls
404 if constexpr(control_inputs_type::size > 0 || control_outputs_type::size > 0)
405 {
406 auto& settings = score::AppContext().settings<Scenario::Settings::Model>();
407 if(settings.getExecutionUpdate())
408 {
409 // Update the value in the UI
410 std::weak_ptr<safe_node<Node>> weak_node = ptr;
411 update_control_value_in_ui<Node> timer_action{weak_node, &element};
412 timer_action();
413
414 con(ctx.doc.coarseUpdateTimer, &QTimer::timeout, this,
415 [timer_action = std::move(timer_action)] { timer_action(); },
416 Qt::QueuedConnection);
417 }
418 }
419 }
420
421 void setup_soundfile_task_pool(
422 ProcessModel<Node>& element, const ::Execution::Context& ctx,
423 std::shared_ptr<safe_node<Node>>& ptr)
424 {
425 safe_node<Node>& node = *ptr;
426
427 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
428
429 auto& tq = score::TaskPool::instance();
430 node.soundfiles.load_request
431 = [&tq, p = std::weak_ptr{ptr}, &ctx](std::string& str, int idx) {
432 auto eff_ptr = p.lock();
433 if(!eff_ptr)
434 return;
435 tq.post([eff_ptr = std::move(eff_ptr), filename = str, &ctx, idx]() mutable {
436 if(auto file = loadSoundfile(filename, ctx.doc, ctx.execState))
437 {
438 ctx.executionQueue.enqueue(
439 [sf = std::move(file), p = std::weak_ptr{eff_ptr}, idx]() mutable {
440 auto eff_ptr = p.lock();
441 if(!eff_ptr)
442 return;
443
444 avnd::effect_container<Node>& eff = eff_ptr->impl;
445 soundfile_inputs_type::for_nth_mapped_n2(
446 avnd::get_inputs<Node>(eff), idx,
447 [&]<std::size_t NField, std::size_t N>(
448 auto& field, avnd::predicate_index<N> p,
449 avnd::field_index<NField> f) {
450 eff_ptr->soundfile_loaded(sf, p, f);
451 });
452 });
453 }
454 });
455 };
456 }
457
458 void connect_message_bus(
459 ProcessModel<Node>& element, const ::Execution::Context& ctx, Node& eff)
460 {
461 // Custom UI messages to engine
462 if constexpr(avnd::has_gui_to_processor_bus<Node>)
463 {
464 element.from_ui = [qex_ptr = weak_exec, &eff](QByteArray b) {
465 auto qex = qex_ptr.lock();
466 if(!qex)
467 return;
468
469 qex->enqueue([mess = std::move(b), &eff]() mutable {
470 using refl = avnd::function_reflection<&Node::process_message>;
471 static_assert(refl::count <= 1);
472
473 if constexpr(refl::count == 0)
474 {
475 // no arguments, just call it
476 eff.process_message();
477 }
478 else if constexpr(refl::count == 1)
479 {
480 using arg_type = avnd::first_argument<&Node::process_message>;
481 std::decay_t<arg_type> arg;
482 MessageBusReader reader{mess};
483 reader(arg);
484 eff.process_message(std::move(arg));
485 }
486 });
487 };
488 }
489
490 if constexpr(avnd::has_processor_to_gui_bus<Node>)
491 {
492 if constexpr(requires { eff.send_message = [](auto&&) { }; })
493 {
494 eff.send_message = [proc = QPointer{&this->process()},
495 qed_ptr = weak_edit]<typename T>(T&& b) mutable {
496 auto qed = qed_ptr.lock();
497 if(!qed)
498 return;
499 if constexpr(
500 sizeof(QPointer<QObject>) + sizeof(b)
501 < Execution::ExecutionCommand::max_storage)
502 {
503 qed->enqueue([proc, bb = std::move(b)]() mutable {
504 if(proc && proc->to_ui)
505 MessageBusSender{proc->to_ui}(std::move(bb));
506 });
507 }
508 else
509 {
510 qed->enqueue(
511 [proc, bb = std::make_unique<std::decay_t<T>>(std::move(b))]() mutable {
512 if(proc && proc->to_ui)
513 MessageBusSender{proc->to_ui}(*std::move(bb));
514 });
515 }
516 };
517 }
518 else if constexpr(requires { eff.send_message = []() { }; })
519 {
520 eff.send_message
521 = [proc = QPointer{&this->process()}, qed_ptr = weak_edit]() mutable {
522 if(!proc)
523 return;
524 auto qed = qed_ptr.lock();
525 if(!qed)
526 return;
527
528 qed->enqueue([proc]() mutable {
529 if(proc && proc->to_ui)
530 MessageBusSender{proc->to_ui}();
531 });
532 };
533 }
534 }
535 }
536
537 void connect_worker(const ::Execution::Context& ctx, avnd::effect_container<Node>& eff)
538 {
539 if constexpr(avnd::has_worker<Node>)
540 {
541 // Initialize the thread pool beforehand
542 auto& tq = score::TaskPool::instance();
543 using worker_type = decltype(eff.effect.worker);
544 for(auto& eff : eff.effects())
545 {
546 std::weak_ptr eff_ptr = std::shared_ptr<Node>(this->node, &eff);
547 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
548 ctx.alias.lock(), &ctx.executionQueue);
549
550 eff.worker.request
551 = [&tq, qex_ptr = std::move(qex_ptr),
552 eff_ptr = std::move(eff_ptr)]<typename... Args>(Args&&... f) mutable {
553 // request() is invoked in the DSP / processor thread
554 // and just posts the task to the thread pool
555 tq.post([eff_ptr, qex_ptr, ... ff = std::forward<Args>(f)]() mutable {
556 // This happens in the worker thread
557 // If for some reason the object has already been removed, not much
558 // reason to perform the work
559 if(!eff_ptr.lock())
560 return;
561
562 using type_of_result
563 = decltype(worker_type::work(std::forward<decltype(ff)>(ff)...));
564 if constexpr(std::is_void_v<type_of_result>)
565 {
566 worker_type::work(std::forward<decltype(ff)>(ff)...);
567 }
568 else
569 {
570 // If the worker returns a std::function, it
571 // is to be invoked back in the processor DSP thread
572 auto res = worker_type::work(std::forward<decltype(ff)>(ff)...);
573 if(!res)
574 return;
575
576 // Execution queue is currently spsc from main thread to an exec thread,
577 // we cannot just yeet the result back from the thread-pool
578 ossia::qt::run_async(
579 qApp, [eff_ptr = std::move(eff_ptr), qex_ptr = std::move(qex_ptr),
580 res = std::move(res)]() mutable {
581 // Main thread
582 std::shared_ptr qex = qex_ptr.lock();
583 if(!qex)
584 return;
585
586 qex->enqueue(
587 [eff_ptr = std::move(eff_ptr), res = std::move(res)]() mutable {
588 // DSP / processor thread
589 // We need res to be mutable so that the worker can use it to e.g. store
590 // old data which will be freed back in the main thread
591 if(auto p = eff_ptr.lock())
592 res(*p);
593 });
594 });
595 }
596 });
597 };
598 }
599 }
600 }
601
602 // Update everything
603 void update_controls(std::shared_ptr<safe_node<Node>>& ptr)
604 {
605 avnd::effect_container<Node>& eff = ptr->impl;
606 {
607 for(auto state : eff.full_state())
608 {
609 avnd::input_introspection<Node>::for_all(
610 state.inputs, [&](auto& field) { if_possible(field.update(state.effect)); });
611 }
612 }
613 }
614
615 void cleanup() override
616 {
617 if constexpr(requires { this->process().from_ui; })
618 {
619 this->process().from_ui = [](QByteArray arr) {};
620 }
621 // FIXME cleanup eff.effect.send_message too ?
622
623#if SCORE_PLUGIN_GFX
624 if constexpr(is_gpu<Node>)
625 {
626 // FIXME this must move in the Node dtor. See video_node
627 auto& gfx_exec = this->system().doc.template plugin<Gfx::DocumentPlugin>().exec;
628 if(node_id >= 0)
629 gfx_exec.ui->unregister_node(node_id);
630 }
631
632 // FIXME refactor this with other GFX processes
633 for(auto* outlet : this->process().outlets())
634 {
635 if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
636 {
637 out->nodeId = -1;
638 }
639 }
640#endif
641 ::Execution::ProcessComponent::cleanup();
642 }
643
644 ~Executor() { }
645};
646}
Definition GfxApplicationPlugin.hpp:13
Definition GfxExecNode.hpp:40
The Path class is a typesafe wrapper around ObjectPath.
Definition Path.hpp:52
Definition ScenarioSettingsModel.hpp:19
Definition UuidKey.hpp:345
Definition score-plugin-avnd/Crousti/Executor.hpp:59
Definition score-plugin-avnd/Crousti/Executor.hpp:109
Definition score-plugin-avnd/Crousti/ProcessModel.hpp:86
Definition Metadatas.hpp:29
Definition Metadatas.hpp:23
Definition Metadatas.hpp:15
Definition Metadatas.hpp:35
Definition Factories.hpp:19
Definition Process/Execution/ProcessComponent.hpp:93
Definition ExecutionTransaction.hpp:21
Definition GfxExecNode.hpp:134
Definition PortForward.hpp:23
Definition PortForward.hpp:27
Definition MessageBus.hpp:171
Definition MessageBus.hpp:44
Definition ExecutorPortSetup.hpp:344
Definition ExecutorUpdateControlValueInUi.hpp:84
T & settings() const
Access a specific Settings model instance.
Definition ApplicationContext.hpp:41