Loading...
Searching...
No Matches
score-plugin-avnd/Crousti/Executor.hpp
1#pragma once
2
3#include <Process/Execution/ProcessComponent.hpp>
4#include <Process/ExecutionContext.hpp>
5
6#include <Explorer/DocumentPlugin/DeviceDocumentPlugin.hpp>
7
8#include <Crousti/CpuAnalysisNode.hpp>
9#include <Crousti/CpuFilterNode.hpp>
10#include <Crousti/CpuGeneratorNode.hpp>
11#include <Crousti/ExecutorPortSetup.hpp>
12#include <Crousti/ExecutorUpdateControlValueInUi.hpp>
13#include <Crousti/File.hpp>
14#include <Crousti/GpuComputeNode.hpp>
15#include <Crousti/GpuNode.hpp>
16#include <Crousti/MessageBus.hpp>
17#include <Crousti/Metadatas.hpp>
18#include <Crousti/ProcessModel.hpp>
19
20#include <score/tools/Bind.hpp>
21
22#include <ossia/dataflow/exec_state_facade.hpp>
23#include <ossia/dataflow/node_process.hpp>
24#include <ossia/network/context.hpp>
25
26#include <ossia-qt/invoke.hpp>
27
28#include <QGuiApplication>
29
30#include <flicks.h>
31
32#if SCORE_PLUGIN_GFX
33#include <Crousti/GpuNode.hpp>
34#include <Gfx/GfxApplicationPlugin.hpp>
35#endif
36
37#include <score/tools/ThreadPool.hpp>
38
39#include <ossia/detail/type_if.hpp>
40
41#include <QTimer>
42
43#include <avnd/binding/ossia/data_node.hpp>
44#include <avnd/binding/ossia/mono_audio_node.hpp>
45#include <avnd/binding/ossia/node.hpp>
46#include <avnd/binding/ossia/ossia_audio_node.hpp>
47#include <avnd/binding/ossia/poly_audio_node.hpp>
48#include <avnd/concepts/temporality.hpp>
49#include <avnd/concepts/ui.hpp>
50#include <avnd/concepts/worker.hpp>
51
52namespace oscr
53{
54
55template <typename Node>
56class CustomNodeProcess : public ossia::node_process
57{
58 using node_process::node_process;
59 void start() override
60 {
61 node_process::start();
62 auto& n = static_cast<safe_node<Node>&>(*node);
63 if_possible(n.impl.effect.start());
64 }
65 void pause() override
66 {
67 auto& n = static_cast<safe_node<Node>&>(*node);
68 if_possible(n.impl.effect.pause());
69 node_process::pause();
70 }
71
72 void resume() override
73 {
74 node_process::resume();
75 auto& n = static_cast<safe_node<Node>&>(*node);
76 if_possible(n.impl.effect.resume());
77 }
78
79 void stop() override
80 {
81 auto& n = static_cast<safe_node<Node>&>(*node);
82 if_possible(n.impl.effect.stop());
83 node_process::stop();
84 }
85
86 void offset_impl(ossia::time_value date) override
87 {
88 node_process::offset_impl(date);
89 auto& n = static_cast<safe_node<Node>&>(*node);
90 util::flicks f{date.impl};
91 if_possible(n.impl.effect.transport(f));
92 }
93
94 void transport_impl(ossia::time_value date) override
95 {
96 node_process::transport_impl(date);
97 auto& n = static_cast<safe_node<Node>&>(*node);
98
99 util::flicks f{date.impl};
100 if_possible(n.impl.effect.transport(f));
101 }
102};
103
104template <typename Node>
105class Executor final
106 : public Execution::ProcessComponent_T<ProcessModel<Node>, ossia::node_process>
107{
108 Process::Inlets m_oldInlets;
109 Process::Outlets m_oldOutlets;
110
111public:
112 static Q_DECL_RELAXED_CONSTEXPR UuidKey<score::Component> static_key() noexcept
113 {
114 return uuid_from_string<Node>();
115 }
116
117 UuidKey<score::Component> key() const noexcept final override { return static_key(); }
118
119 bool key_match(UuidKey<score::Component> other) const noexcept final override
120 {
121 return static_key() == other || Execution::ProcessComponent::base_key_match(other);
122 }
123
124 [[no_unique_address]] ossia::type_if<int, is_gpu<Node>> node_id = -1;
125
126 Executor(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
128 element, ctx, "Executor::ProcessModel<Info>", p}
129 {
130#if SCORE_PLUGIN_GFX
131 if constexpr(is_gpu<Node>)
132 {
133 setup_gpu(element, ctx, p);
134 }
135 else
136#endif
137 {
138 setup_cpu(element, ctx, p);
139 }
140
141 if constexpr(avnd::tag_process_exec<Node>)
142 {
143 this->m_ossia_process = std::make_shared<CustomNodeProcess<Node>>(this->node);
144 }
145 else
146 {
147 this->m_ossia_process = std::make_shared<ossia::node_process>(this->node);
148 }
149 }
150
151 void
152 setup_cpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
153 {
154 auto& net_ctx
155 = *ctx.doc.findPlugin<Explorer::DeviceDocumentPlugin>()->networkContext();
156 const auto id
157 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
158
159 auto st = ossia::exec_state_facade{ctx.execState.get()};
160 std::shared_ptr<safe_node<Node>> ptr;
161 auto node = new safe_node<Node>{st.bufferSize(), (double)st.sampleRate(), id};
162 node->root_inputs().reserve(element.inlets().size());
163 node->root_outputs().reserve(element.outlets().size());
164
165 node->prepare(*ctx.execState.get()); // Preparation of the ossia side
166
167 if_possible(node->impl.effect.ossia_state = st);
168 if_possible(node->impl.effect.io_context = &net_ctx.context);
169 if_possible(node->impl.effect.ossia_document_context = &ctx.doc);
170 ptr.reset(node);
171 this->node = ptr;
172
173 if constexpr(requires { ptr->impl.effect; })
174 if constexpr(std::is_same_v<std::decay_t<decltype(ptr->impl.effect)>, Node>)
175 connect_message_bus(element, ctx, ptr->impl.effect);
176 connect_worker(ctx, ptr->impl);
177
178 node->dynamic_ports = element.dynamic_ports;
179 node->finish_init();
180
181 connect_controls(element, ctx, ptr);
182 update_controls(ptr);
183 QObject::connect(
184 &element, &Process::ProcessModel::inletsChanged, this,
185 &Executor::recompute_ports);
186 QObject::connect(
187 &element, &Process::ProcessModel::outletsChanged, this,
188 &Executor::recompute_ports);
189
190 // To call prepare() after evertyhing is ready
191 node->audio_configuration_changed(st);
192
193 m_oldInlets = element.inlets();
194 m_oldOutlets = element.outlets();
195 }
196
197 void
198 setup_gpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
199 {
200#if SCORE_PLUGIN_GFX
201 // FIXME net context for gpu node ?
202 const auto id
203 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
204
205 auto& gfx_exec = ctx.doc.plugin<Gfx::DocumentPlugin>().exec;
206
207 // Create the executor in the audio thread
208 struct named_exec_node final : Gfx::gfx_exec_node
209 {
210 using Gfx::gfx_exec_node::gfx_exec_node;
211 std::string label() const noexcept override
212 {
213 return std::string(avnd::get_name<Node>());
214 }
215 };
216
217 auto node = std::make_shared<named_exec_node>(gfx_exec);
218 node->prepare(*ctx.execState);
219
220 this->node = node;
221
222 // Create the controls, inputs outputs etc.
223 std::size_t i = 0;
224
225 for(auto& ctl : element.inlets())
226 {
227 if(auto ctrl = qobject_cast<Process::ControlInlet*>(ctl))
228 {
229 auto& p = node->add_control();
230 p->value = ctrl->value();
231 p->changed = true;
232
233 QObject::connect(
234 ctrl, &Process::ControlInlet::valueChanged, this,
235 Gfx::con_unvalidated{ctx, i, 0, node});
236 i++;
237 }
238 else if(auto ctrl = qobject_cast<Process::ValueInlet*>(ctl))
239 {
240 auto& p = node->add_control();
241 p->changed = true;
242 i++;
243 }
244 else if(auto ctrl = qobject_cast<Process::AudioInlet*>(ctl))
245 {
246 node->add_audio();
247 }
248 else if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
249 {
250 ossia::texture_inlet& inl = *node->add_texture();
251 ctrl->setupExecution(inl, this);
252 }
253 }
254
255 // FIXME refactor this with other GFX processes
256 for(auto* outlet : element.outlets())
257 {
258 if(auto ctrl = qobject_cast<Process::ControlOutlet*>(outlet))
259 {
260 node->add_control_out();
261 }
262 else if(auto ctrl = qobject_cast<Process::ValueOutlet*>(outlet))
263 {
264 node->add_control_out();
265 }
266 else if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
267 {
268 node->add_texture_out();
269 out->nodeId = node_id;
270 }
271 }
272
273 // Create the GPU node
274 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
275 ctx.alias.lock(), &ctx.executionQueue);
276 std::unique_ptr<score::gfx::Node> ptr;
277 if constexpr(GpuGraphicsNode2<Node>)
278 {
279 auto gpu_node = new CustomGpuNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
280 ptr.reset(gpu_node);
281 }
282 else if constexpr(GpuComputeNode2<Node>)
283 {
284 auto gpu_node = new GpuComputeNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
285 ptr.reset(gpu_node);
286 }
287 else if constexpr(GpuNode<Node>)
288 {
289 auto gpu_node
290 = new GfxNode<Node>(element, qex_ptr, node->control_outs, id, ctx.doc);
291 ptr.reset(gpu_node);
292 }
293
294 i = 0;
295 for(auto& ctl : element.inlets())
296 {
297 if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
298 {
299 ossia::texture_inlet& inl
300 = static_cast<ossia::texture_inlet&>(*node->root_inputs()[i]);
301 ptr->process(i, inl.data); // Setup render_target_spec
302 }
303 i++;
304 }
305 node->id = gfx_exec.ui->register_node(std::move(ptr));
306 node_id = node->id;
307#endif
308 }
309
310 void recompute_ports()
311 {
312 Execution::Transaction commands{this->system()};
313 auto n = std::dynamic_pointer_cast<safe_node<Node>>(this->node);
314 if(!n)
315 return;
316
317 // Re-run setup_inlets ?
318 this->in_exec([dp = this->process().dynamic_ports, node = n] {
319 node->dynamic_ports = dp;
320 node->root_inputs().clear();
321 node->root_outputs().clear();
322 node->initialize_all_ports();
323 });
324 }
325
326 void connect_controls(
327 ProcessModel<Node>& element, const ::Execution::Context& ctx,
328 std::shared_ptr<safe_node<Node>>& ptr)
329 {
330 using dynamic_ports_port_type = avnd::dynamic_ports_input_introspection<Node>;
331 using control_inputs_type = avnd::control_input_introspection<Node>;
332 using curve_inputs_type = avnd::curve_input_introspection<Node>;
333 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
334 using midifile_inputs_type = avnd::midifile_input_introspection<Node>;
335 using raw_file_inputs_type = avnd::raw_file_input_introspection<Node>;
336 using control_outputs_type = avnd::control_output_introspection<Node>;
337
338 // UI controls to engine
339 safe_node<Node>& node = *ptr;
340 avnd::effect_container<Node>& eff = node.impl;
341
342 // Initialize all the controls in the node with the current value.
343 // And update the node when the UI changes
344
345 if constexpr(dynamic_ports_port_type::size > 0)
346 {
347 for(auto state : eff.full_state())
348 {
349 dynamic_ports_port_type::for_all_n2(
350 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
351 }
352 }
353 if constexpr(control_inputs_type::size > 0)
354 {
355 for(auto state : eff.full_state())
356 {
357 control_inputs_type::for_all_n2(
358 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
359 }
360 }
361 if constexpr(curve_inputs_type::size > 0)
362 {
363 for(auto state : eff.full_state())
364 {
365 curve_inputs_type::for_all_n2(
366 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
367 }
368 }
369 if constexpr(soundfile_inputs_type::size > 0)
370 {
371 soundfile_inputs_type::for_all_n2(
372 avnd::get_inputs<Node>(eff),
373 dispatch_control_setup<Node>{element, ctx, ptr, this});
374
375 setup_soundfile_task_pool(element, ctx, ptr);
376 }
377 if constexpr(midifile_inputs_type::size > 0)
378 {
379 midifile_inputs_type::for_all_n2(
380 avnd::get_inputs<Node>(eff),
381 dispatch_control_setup<Node>{element, ctx, ptr, this});
382 }
383 if constexpr(raw_file_inputs_type::size > 0)
384 {
385 raw_file_inputs_type::for_all_n2(
386 avnd::get_inputs<Node>(eff),
387 dispatch_control_setup<Node>{element, ctx, ptr, this});
388 }
389
390 // Engine to ui controls
391 if constexpr(control_inputs_type::size > 0 || control_outputs_type::size > 0)
392 {
393 // Update the value in the UI
394 std::weak_ptr<safe_node<Node>> weak_node = ptr;
395 update_control_value_in_ui<Node> timer_action{weak_node, element};
396 timer_action();
397
398 con(ctx.doc.coarseUpdateTimer, &QTimer::timeout, this,
399 [timer_action = std::move(timer_action)] { timer_action(); },
400 Qt::QueuedConnection);
401 }
402 }
403
404 void setup_soundfile_task_pool(
405 ProcessModel<Node>& element, const ::Execution::Context& ctx,
406 std::shared_ptr<safe_node<Node>>& ptr)
407 {
408 safe_node<Node>& node = *ptr;
409
410 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
411
412 auto& tq = score::TaskPool::instance();
413 node.soundfiles.load_request
414 = [&tq, p = std::weak_ptr{ptr}, &ctx](std::string& str, int idx) {
415 auto eff_ptr = p.lock();
416 if(!eff_ptr)
417 return;
418 tq.post([eff_ptr = std::move(eff_ptr), filename = str, &ctx, idx]() mutable {
419 if(auto file = loadSoundfile(filename, ctx.doc, ctx.execState))
420 {
421 ctx.executionQueue.enqueue(
422 [sf = std::move(file), p = std::weak_ptr{eff_ptr}, idx]() mutable {
423 auto eff_ptr = p.lock();
424 if(!eff_ptr)
425 return;
426
427 avnd::effect_container<Node>& eff = eff_ptr->impl;
428 soundfile_inputs_type::for_nth_mapped_n2(
429 avnd::get_inputs<Node>(eff), idx,
430 [&]<std::size_t NField, std::size_t N>(
431 auto& field, avnd::predicate_index<N> p,
432 avnd::field_index<NField> f) {
433 eff_ptr->soundfile_loaded(sf, p, f);
434 });
435 });
436 }
437 });
438 };
439 }
440
441 void connect_message_bus(
442 ProcessModel<Node>& element, const ::Execution::Context& ctx, Node& eff)
443 {
444 // Custom UI messages to engine
445 if constexpr(avnd::has_gui_to_processor_bus<Node>)
446 {
447 element.from_ui = [p = QPointer{this}, &eff](QByteArray b) {
448 if(!p)
449 return;
450
451 p->in_exec([mess = std::move(b), &eff]() mutable {
452 using refl = avnd::function_reflection<&Node::process_message>;
453 static_assert(refl::count <= 1);
454
455 if constexpr(refl::count == 0)
456 {
457 // no arguments, just call it
458 eff.process_message();
459 }
460 else if constexpr(refl::count == 1)
461 {
462 using arg_type = avnd::first_argument<&Node::process_message>;
463 std::decay_t<arg_type> arg;
464 MessageBusReader reader{mess};
465 reader(arg);
466 eff.process_message(std::move(arg));
467 }
468 });
469 };
470 }
471
472 if constexpr(avnd::has_processor_to_gui_bus<Node>)
473 {
474 eff.send_message = [self = QPointer{this}]<typename T>(T&& b) mutable {
475 if(!self)
476 return;
477 if constexpr(
478 sizeof(QPointer<QObject>) + sizeof(b)
479 < Execution::ExecutionCommand::max_storage)
480 {
481 self->in_edit(
482 [proc = QPointer{&self->process()}, bb = std::move(b)]() mutable {
483 if(proc->to_ui)
484 MessageBusSender{proc->to_ui}(std::move(bb));
485 });
486 }
487 else
488 {
489 self->in_edit(
490 [proc = QPointer{&self->process()},
491 bb = std::make_unique<std::decay_t<T>>(std::move(b))]() mutable {
492 if(proc->to_ui)
493 MessageBusSender{proc->to_ui}(*std::move(bb));
494 });
495 }
496 };
497 }
498 }
499
500 void connect_worker(const ::Execution::Context& ctx, avnd::effect_container<Node>& eff)
501 {
502 if constexpr(avnd::has_worker<Node>)
503 {
504 // Initialize the thread pool beforehand
505 auto& tq = score::TaskPool::instance();
506 using worker_type = decltype(eff.effect.worker);
507 for(auto& eff : eff.effects())
508 {
509 std::weak_ptr eff_ptr = std::shared_ptr<Node>(this->node, &eff);
510 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
511 ctx.alias.lock(), &ctx.executionQueue);
512
513 eff.worker.request
514 = [&tq, qex_ptr = std::move(qex_ptr),
515 eff_ptr = std::move(eff_ptr)]<typename... Args>(Args&&... f) mutable {
516 // request() is invoked in the DSP / processor thread
517 // and just posts the task to the thread pool
518 tq.post([eff_ptr, qex_ptr, ... ff = std::forward<Args>(f)]() mutable {
519 // This happens in the worker thread
520 // If for some reason the object has already been removed, not much
521 // reason to perform the work
522 if(!eff_ptr.lock())
523 return;
524
525 using type_of_result
526 = decltype(worker_type::work(std::forward<decltype(ff)>(ff)...));
527 if constexpr(std::is_void_v<type_of_result>)
528 {
529 worker_type::work(std::forward<decltype(ff)>(ff)...);
530 }
531 else
532 {
533 // If the worker returns a std::function, it
534 // is to be invoked back in the processor DSP thread
535 auto res = worker_type::work(std::forward<decltype(ff)>(ff)...);
536 if(!res)
537 return;
538
539 // Execution queue is currently spsc from main thread to an exec thread,
540 // we cannot just yeet the result back from the thread-pool
541 ossia::qt::run_async(
542 qApp, [eff_ptr = std::move(eff_ptr), qex_ptr = std::move(qex_ptr),
543 res = std::move(res)]() mutable {
544 // Main thread
545 std::shared_ptr qex = qex_ptr.lock();
546 if(!qex)
547 return;
548
549 qex->enqueue(
550 [eff_ptr = std::move(eff_ptr), res = std::move(res)]() mutable {
551 // DSP / processor thread
552 // We need res to be mutable so that the worker can use it to e.g. store
553 // old data which will be freed back in the main thread
554 if(auto p = eff_ptr.lock())
555 res(*p);
556 });
557 });
558 }
559 });
560 };
561 }
562 }
563 }
564
565 // Update everything
566 void update_controls(std::shared_ptr<safe_node<Node>>& ptr)
567 {
568 avnd::effect_container<Node>& eff = ptr->impl;
569 {
570 for(auto state : eff.full_state())
571 {
572 avnd::input_introspection<Node>::for_all(
573 state.inputs, [&](auto& field) { if_possible(field.update(state.effect)); });
574 }
575 }
576 }
577
578 void cleanup() override
579 {
580 if constexpr(requires { this->process().from_ui; })
581 {
582 this->process().from_ui = [](QByteArray arr) {};
583 }
584 // FIXME cleanup eff.effect.send_message too ?
585
586#if SCORE_PLUGIN_GFX
587 if constexpr(is_gpu<Node>)
588 {
589 // FIXME this must move in the Node dtor. See video_node
590 auto& gfx_exec = this->system().doc.template plugin<Gfx::DocumentPlugin>().exec;
591 if(node_id >= 0)
592 gfx_exec.ui->unregister_node(node_id);
593 }
594
595 // FIXME refactor this with other GFX processes
596 for(auto* outlet : this->process().outlets())
597 {
598 if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
599 {
600 out->nodeId = -1;
601 }
602 }
603#endif
604 ::Execution::ProcessComponent::cleanup();
605 }
606
607 ~Executor() { }
608};
609}
Definition GfxApplicationPlugin.hpp:13
Definition GfxExecNode.hpp:40
The Path class is a typesafe wrapper around ObjectPath.
Definition Path.hpp:52
Definition UuidKey.hpp:344
Definition score-plugin-avnd/Crousti/Executor.hpp:57
Definition score-plugin-avnd/Crousti/Executor.hpp:107
Definition score-plugin-avnd/Crousti/ProcessModel.hpp:79
Definition Metadatas.hpp:26
Definition Metadatas.hpp:20
Definition Metadatas.hpp:16
Definition Metadatas.hpp:32
Definition Factories.hpp:19
Definition Process/Execution/ProcessComponent.hpp:89
Definition ExecutionTransaction.hpp:18
Definition GfxExecNode.hpp:120
Definition PortForward.hpp:23
Definition PortForward.hpp:27
Definition MessageBus.hpp:133
Definition MessageBus.hpp:37
Definition ExecutorPortSetup.hpp:337
Definition ExecutorUpdateControlValueInUi.hpp:76