Loading...
Searching...
No Matches
score-plugin-avnd/Crousti/Executor.hpp
1#pragma once
2
3#include <Process/Execution/ProcessComponent.hpp>
4#include <Process/ExecutionContext.hpp>
5
6#include <Explorer/DocumentPlugin/DeviceDocumentPlugin.hpp>
7
8#include <Crousti/CpuAnalysisNode.hpp>
9#include <Crousti/CpuFilterNode.hpp>
10#include <Crousti/CpuGeneratorNode.hpp>
11#include <Crousti/ExecutorPortSetup.hpp>
12#include <Crousti/ExecutorUpdateControlValueInUi.hpp>
13#include <Crousti/File.hpp>
14#include <Crousti/GpuComputeNode.hpp>
15#include <Crousti/GpuNode.hpp>
16#include <Crousti/MessageBus.hpp>
17#include <Crousti/Metadatas.hpp>
18#include <Crousti/ProcessModel.hpp>
19
20#include <score/tools/Bind.hpp>
21
22#include <ossia/dataflow/exec_state_facade.hpp>
23#include <ossia/dataflow/node_process.hpp>
24#include <ossia/network/context.hpp>
25
26#include <ossia-qt/invoke.hpp>
27
28#include <QGuiApplication>
29
30#include <flicks.h>
31
32#if SCORE_PLUGIN_GFX
33#include <Crousti/GpuNode.hpp>
34#include <Gfx/GfxApplicationPlugin.hpp>
35#endif
36
37#include <score/tools/ThreadPool.hpp>
38
39#include <ossia/detail/type_if.hpp>
40
41#include <QTimer>
42
43#include <avnd/binding/ossia/data_node.hpp>
44#include <avnd/binding/ossia/mono_audio_node.hpp>
45#include <avnd/binding/ossia/node.hpp>
46#include <avnd/binding/ossia/ossia_audio_node.hpp>
47#include <avnd/binding/ossia/poly_audio_node.hpp>
48#include <avnd/concepts/temporality.hpp>
49#include <avnd/concepts/ui.hpp>
50#include <avnd/concepts/worker.hpp>
51
52namespace oscr
53{
54
55template <typename Node>
56class CustomNodeProcess : public ossia::node_process
57{
58 using node_process::node_process;
59 void start() override
60 {
61 node_process::start();
62 auto& n = static_cast<safe_node<Node>&>(*node);
63 if_possible(n.impl.effect.start());
64 }
65 void pause() override
66 {
67 auto& n = static_cast<safe_node<Node>&>(*node);
68 if_possible(n.impl.effect.pause());
69 node_process::pause();
70 }
71
72 void resume() override
73 {
74 node_process::resume();
75 auto& n = static_cast<safe_node<Node>&>(*node);
76 if_possible(n.impl.effect.resume());
77 }
78
79 void stop() override
80 {
81 auto& n = static_cast<safe_node<Node>&>(*node);
82 if_possible(n.impl.effect.stop());
83 node_process::stop();
84 }
85
86 void offset_impl(ossia::time_value date) override
87 {
88 node_process::offset_impl(date);
89 auto& n = static_cast<safe_node<Node>&>(*node);
90 util::flicks f{date.impl};
91 if_possible(n.impl.effect.transport(f));
92 }
93
94 void transport_impl(ossia::time_value date) override
95 {
96 node_process::transport_impl(date);
97 auto& n = static_cast<safe_node<Node>&>(*node);
98
99 util::flicks f{date.impl};
100 if_possible(n.impl.effect.transport(f));
101 }
102};
103
104template <typename Node>
105class Executor final
106 : public Execution::ProcessComponent_T<ProcessModel<Node>, ossia::node_process>
107{
108 Process::Inlets m_oldInlets;
109 Process::Outlets m_oldOutlets;
110
111public:
112 static Q_DECL_RELAXED_CONSTEXPR UuidKey<score::Component> static_key() noexcept
113 {
114 return uuid_from_string<Node>();
115 }
116
117 UuidKey<score::Component> key() const noexcept final override { return static_key(); }
118
119 bool key_match(UuidKey<score::Component> other) const noexcept final override
120 {
121 return static_key() == other || Execution::ProcessComponent::base_key_match(other);
122 }
123
124 [[no_unique_address]] ossia::type_if<int, is_gpu<Node>> node_id = -1;
125
126 Executor(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
128 element, ctx, "Executor::ProcessModel<Info>", p}
129 {
130#if SCORE_PLUGIN_GFX
131 if constexpr(is_gpu<Node>)
132 {
133 setup_gpu(element, ctx, p);
134 }
135 else
136#endif
137 {
138 setup_cpu(element, ctx, p);
139 }
140
141 if constexpr(avnd::tag_process_exec<Node>)
142 {
143 this->m_ossia_process = std::make_shared<CustomNodeProcess<Node>>(this->node);
144 }
145 else
146 {
147 this->m_ossia_process = std::make_shared<ossia::node_process>(this->node);
148 }
149 }
150
151 void
152 setup_cpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
153 {
154 auto& net_ctx
155 = *ctx.doc.findPlugin<Explorer::DeviceDocumentPlugin>()->networkContext();
156 const auto id
157 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
158
159 auto st = ossia::exec_state_facade{ctx.execState.get()};
160 std::shared_ptr<safe_node<Node>> ptr;
161 auto node = new safe_node<Node>{st.bufferSize(), (double)st.sampleRate(), id};
162 node->root_inputs().reserve(element.inlets().size());
163 node->root_outputs().reserve(element.outlets().size());
164
165 node->prepare(*ctx.execState.get()); // Preparation of the ossia side
166
167 if_possible(node->impl.effect.ossia_state = st);
168 if_possible(node->impl.effect.io_context = &net_ctx.context);
169 if_possible(node->impl.effect.ossia_document_context = &ctx.doc);
170 ptr.reset(node);
171 this->node = ptr;
172
173 if constexpr(requires { ptr->impl.effect; })
174 if constexpr(std::is_same_v<std::decay_t<decltype(ptr->impl.effect)>, Node>)
175 connect_message_bus(element, ctx, ptr->impl.effect);
176 connect_worker(ctx, ptr->impl);
177
178 node->dynamic_ports = element.dynamic_ports;
179 node->finish_init();
180
181 connect_controls(element, ctx, ptr);
182 update_controls(ptr);
183 QObject::connect(
184 &element, &Process::ProcessModel::inletsChanged, this,
185 &Executor::recompute_ports);
186 QObject::connect(
187 &element, &Process::ProcessModel::outletsChanged, this,
188 &Executor::recompute_ports);
189
190 // To call prepare() after evertyhing is ready
191 node->audio_configuration_changed(st);
192
193 m_oldInlets = element.inlets();
194 m_oldOutlets = element.outlets();
195 }
196
197 void
198 setup_gpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
199 {
200#if SCORE_PLUGIN_GFX
201 // FIXME net context for gpu node ?
202 const auto id
203 = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
204
205 auto& gfx_exec = ctx.doc.plugin<Gfx::DocumentPlugin>().exec;
206
207 // Create the executor in the audio thread
208 struct named_exec_node final : Gfx::gfx_exec_node
209 {
210 using Gfx::gfx_exec_node::gfx_exec_node;
211 std::string label() const noexcept override
212 {
213 return std::string(avnd::get_name<Node>());
214 }
215 };
216
217 auto node = std::make_shared<named_exec_node>(gfx_exec);
218 node->prepare(*ctx.execState);
219
220 this->node = node;
221
222 // Create the controls, inputs outputs etc.
223 std::size_t i = 0;
224
225 for(auto& ctl : element.inlets())
226 {
227 if(auto ctrl = qobject_cast<Process::ControlInlet*>(ctl))
228 {
229 auto& p = node->add_control();
230 p->value = ctrl->value();
231 p->changed = true;
232
233 QObject::connect(
234 ctrl, &Process::ControlInlet::valueChanged, this,
235 Gfx::con_unvalidated{ctx, i, 0, node});
236 i++;
237 }
238 else if(auto ctrl = qobject_cast<Process::ValueInlet*>(ctl))
239 {
240 auto& p = node->add_control();
241 p->changed = true;
242 i++;
243 }
244 else if(auto ctrl = qobject_cast<Process::AudioInlet*>(ctl))
245 {
246 node->add_audio();
247 }
248 else if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
249 {
250 ossia::texture_inlet& inl = *node->add_texture();
251 ctrl->setupExecution(inl, this);
252 }
253 }
254
255 // FIXME refactor this with other GFX processes
256 for(auto* outlet : element.outlets())
257 {
258 if(auto ctrl = qobject_cast<Process::ControlOutlet*>(outlet))
259 {
260 node->add_control_out();
261 }
262 else if(auto ctrl = qobject_cast<Process::ValueOutlet*>(outlet))
263 {
264 node->add_control_out();
265 }
266 else if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
267 {
268 node->add_texture_out();
269 out->nodeId = node_id;
270 }
271 }
272
273 // Create the GPU node
274 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
275 ctx.alias.lock(), &ctx.executionQueue);
276 std::unique_ptr<score::gfx::Node> ptr;
277 if constexpr(GpuGraphicsNode2<Node>)
278 {
279 auto gpu_node = new CustomGpuNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
280 ptr.reset(gpu_node);
281 }
282 else if constexpr(GpuComputeNode2<Node>)
283 {
284 auto gpu_node = new GpuComputeNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
285 ptr.reset(gpu_node);
286 }
287 else if constexpr(GpuNode<Node>)
288 {
289 auto gpu_node
290 = new GfxNode<Node>(element, qex_ptr, node->control_outs, id, ctx.doc);
291 ptr.reset(gpu_node);
292 }
293
294 i = 0;
295 for(auto& ctl : element.inlets())
296 {
297 if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
298 {
299 ossia::texture_inlet& inl
300 = static_cast<ossia::texture_inlet&>(*node->root_inputs()[i]);
301 ptr->process(i, inl.data); // Setup render_target_spec
302 }
303 i++;
304 }
305 node->id = gfx_exec.ui->register_node(std::move(ptr));
306 node_id = node->id;
307#endif
308 }
309
310 void recompute_ports()
311 {
312 Execution::Transaction commands{this->system()};
313 auto n = std::dynamic_pointer_cast<safe_node<Node>>(this->node);
314 if(!n)
315 return;
316
317 // Re-run setup_inlets ?
318 this->in_exec([dp = this->process().dynamic_ports, node = n] {
319 node->dynamic_ports = dp;
320 node->root_inputs().clear();
321 node->root_outputs().clear();
322 node->initialize_all_ports();
323 });
324 }
325
326 void connect_controls(
327 ProcessModel<Node>& element, const ::Execution::Context& ctx,
328 std::shared_ptr<safe_node<Node>>& ptr)
329 {
330 using dynamic_ports_port_type = avnd::dynamic_ports_input_introspection<Node>;
331 using control_inputs_type = avnd::control_input_introspection<Node>;
332 using curve_inputs_type = avnd::curve_input_introspection<Node>;
333 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
334 using midifile_inputs_type = avnd::midifile_input_introspection<Node>;
335 using raw_file_inputs_type = avnd::raw_file_input_introspection<Node>;
336 using control_outputs_type = avnd::control_output_introspection<Node>;
337
338 // UI controls to engine
339 safe_node<Node>& node = *ptr;
340 avnd::effect_container<Node>& eff = node.impl;
341
342 // Initialize all the controls in the node with the current value.
343 // And update the node when the UI changes
344
345 if constexpr(dynamic_ports_port_type::size > 0)
346 {
347 for(auto state : eff.full_state())
348 {
349 dynamic_ports_port_type::for_all_n2(
350 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
351 }
352 }
353 if constexpr(control_inputs_type::size > 0)
354 {
355 for(auto state : eff.full_state())
356 {
357 control_inputs_type::for_all_n2(
358 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
359 }
360 }
361 if constexpr(curve_inputs_type::size > 0)
362 {
363 for(auto state : eff.full_state())
364 {
365 curve_inputs_type::for_all_n2(
366 state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
367 }
368 }
369 if constexpr(soundfile_inputs_type::size > 0)
370 {
371 soundfile_inputs_type::for_all_n2(
372 avnd::get_inputs<Node>(eff),
373 dispatch_control_setup<Node>{element, ctx, ptr, this});
374
375 setup_soundfile_task_pool(element, ctx, ptr);
376 }
377 if constexpr(midifile_inputs_type::size > 0)
378 {
379 midifile_inputs_type::for_all_n2(
380 avnd::get_inputs<Node>(eff),
381 dispatch_control_setup<Node>{element, ctx, ptr, this});
382 }
383 if constexpr(raw_file_inputs_type::size > 0)
384 {
385 raw_file_inputs_type::for_all_n2(
386 avnd::get_inputs<Node>(eff),
387 dispatch_control_setup<Node>{element, ctx, ptr, this});
388 }
389
390 // Engine to ui controls
391 if constexpr(control_inputs_type::size > 0 || control_outputs_type::size > 0)
392 {
393 // Update the value in the UI
394 std::weak_ptr<safe_node<Node>> weak_node = ptr;
395 update_control_value_in_ui<Node> timer_action{weak_node, element};
396 timer_action();
397
398 con(ctx.doc.coarseUpdateTimer, &QTimer::timeout, this,
399 [timer_action = std::move(timer_action)] { timer_action(); },
400 Qt::QueuedConnection);
401 }
402 }
403
404 void setup_soundfile_task_pool(
405 ProcessModel<Node>& element, const ::Execution::Context& ctx,
406 std::shared_ptr<safe_node<Node>>& ptr)
407 {
408 safe_node<Node>& node = *ptr;
409 avnd::effect_container<Node>& eff = node.impl;
410
411 using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
412
413 auto& tq = score::TaskPool::instance();
414 node.soundfiles.load_request
415 = [&tq, p = std::weak_ptr{ptr}, &ctx](std::string& str, int idx) {
416 auto eff_ptr = p.lock();
417 if(!eff_ptr)
418 return;
419 tq.post([eff_ptr = std::move(eff_ptr), filename = str, &ctx, idx]() mutable {
420 if(auto file = loadSoundfile(filename, ctx.doc, ctx.execState))
421 {
422 ctx.executionQueue.enqueue(
423 [sf = std::move(file), p = std::weak_ptr{eff_ptr}, idx]() mutable {
424 auto eff_ptr = p.lock();
425 if(!eff_ptr)
426 return;
427
428 avnd::effect_container<Node>& eff = eff_ptr->impl;
429 soundfile_inputs_type::for_nth_mapped_n2(
430 avnd::get_inputs<Node>(eff), idx,
431 [&]<std::size_t NField, std::size_t N>(
432 auto& field, avnd::predicate_index<N> p,
433 avnd::field_index<NField> f) {
434 eff_ptr->soundfile_loaded(sf, p, f);
435 });
436 });
437 }
438 });
439 };
440 }
441
442 void connect_message_bus(
443 ProcessModel<Node>& element, const ::Execution::Context& ctx, Node& eff)
444 {
445 // Custom UI messages to engine
446 if constexpr(avnd::has_gui_to_processor_bus<Node>)
447 {
448 element.from_ui = [p = QPointer{this}, &eff](QByteArray b) {
449 if(!p)
450 return;
451
452 p->in_exec([mess = std::move(b), &eff]() mutable {
453 using refl = avnd::function_reflection<&Node::process_message>;
454 static_assert(refl::count <= 1);
455
456 if constexpr(refl::count == 0)
457 {
458 // no arguments, just call it
459 eff.process_message();
460 }
461 else if constexpr(refl::count == 1)
462 {
463 using arg_type = avnd::first_argument<&Node::process_message>;
464 std::decay_t<arg_type> arg;
465 MessageBusReader reader{mess};
466 reader(arg);
467 eff.process_message(std::move(arg));
468 }
469 });
470 };
471 }
472
473 if constexpr(avnd::has_processor_to_gui_bus<Node>)
474 {
475 eff.send_message = [self = QPointer{this}]<typename T>(T&& b) mutable {
476 if(!self)
477 return;
478 if constexpr(
479 sizeof(QPointer<QObject>) + sizeof(b)
480 < Execution::ExecutionCommand::max_storage)
481 {
482 self->in_edit(
483 [proc = QPointer{&self->process()}, bb = std::move(b)]() mutable {
484 if(proc->to_ui)
485 MessageBusSender{proc->to_ui}(std::move(bb));
486 });
487 }
488 else
489 {
490 self->in_edit(
491 [proc = QPointer{&self->process()},
492 bb = std::make_unique<std::decay_t<T>>(std::move(b))]() mutable {
493 if(proc->to_ui)
494 MessageBusSender{proc->to_ui}(*std::move(bb));
495 });
496 }
497 };
498 }
499 }
500
501 void connect_worker(const ::Execution::Context& ctx, avnd::effect_container<Node>& eff)
502 {
503 if constexpr(avnd::has_worker<Node>)
504 {
505 // Initialize the thread pool beforehand
506 auto& tq = score::TaskPool::instance();
507 using worker_type = decltype(eff.effect.worker);
508 for(auto& eff : eff.effects())
509 {
510 std::weak_ptr eff_ptr = std::shared_ptr<Node>(this->node, &eff);
511 std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
512 ctx.alias.lock(), &ctx.executionQueue);
513
514 eff.worker.request
515 = [&tq, qex_ptr = std::move(qex_ptr),
516 eff_ptr = std::move(eff_ptr)]<typename... Args>(Args&&... f) mutable {
517 // request() is invoked in the DSP / processor thread
518 // and just posts the task to the thread pool
519 tq.post([eff_ptr, qex_ptr, ... ff = std::forward<Args>(f)]() mutable {
520 // This happens in the worker thread
521 // If for some reason the object has already been removed, not much
522 // reason to perform the work
523 if(!eff_ptr.lock())
524 return;
525
526 using type_of_result
527 = decltype(worker_type::work(std::forward<decltype(ff)>(ff)...));
528 if constexpr(std::is_void_v<type_of_result>)
529 {
530 worker_type::work(std::forward<decltype(ff)>(ff)...);
531 }
532 else
533 {
534 // If the worker returns a std::function, it
535 // is to be invoked back in the processor DSP thread
536 auto res = worker_type::work(std::forward<decltype(ff)>(ff)...);
537 if(!res)
538 return;
539
540 // Execution queue is currently spsc from main thread to an exec thread,
541 // we cannot just yeet the result back from the thread-pool
542 ossia::qt::run_async(
543 qApp, [eff_ptr = std::move(eff_ptr), qex_ptr = std::move(qex_ptr),
544 res = std::move(res)]() mutable {
545 // Main thread
546 std::shared_ptr qex = qex_ptr.lock();
547 if(!qex)
548 return;
549
550 qex->enqueue(
551 [eff_ptr = std::move(eff_ptr), res = std::move(res)]() mutable {
552 // DSP / processor thread
553 // We need res to be mutable so that the worker can use it to e.g. store
554 // old data which will be freed back in the main thread
555 if(auto p = eff_ptr.lock())
556 res(*p);
557 });
558 });
559 }
560 });
561 };
562 }
563 }
564 }
565
566 // Update everything
567 void update_controls(std::shared_ptr<safe_node<Node>>& ptr)
568 {
569 avnd::effect_container<Node>& eff = ptr->impl;
570 {
571 for(auto state : eff.full_state())
572 {
573 avnd::input_introspection<Node>::for_all(
574 state.inputs, [&](auto& field) { if_possible(field.update(state.effect)); });
575 }
576 }
577 }
578
579 void cleanup() override
580 {
581 if constexpr(requires { this->process().from_ui; })
582 {
583 this->process().from_ui = [](QByteArray arr) {};
584 }
585 // FIXME cleanup eff.effect.send_message too ?
586
587#if SCORE_PLUGIN_GFX
588 if constexpr(is_gpu<Node>)
589 {
590 // FIXME this must move in the Node dtor. See video_node
591 auto& gfx_exec = this->system().doc.template plugin<Gfx::DocumentPlugin>().exec;
592 if(node_id >= 0)
593 gfx_exec.ui->unregister_node(node_id);
594 }
595
596 // FIXME refactor this with other GFX processes
597 for(auto* outlet : this->process().outlets())
598 {
599 if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
600 {
601 out->nodeId = -1;
602 }
603 }
604#endif
605 ::Execution::ProcessComponent::cleanup();
606 }
607
608 ~Executor() { }
609};
610}
Definition GfxApplicationPlugin.hpp:13
Definition GfxExecNode.hpp:40
The Path class is a typesafe wrapper around ObjectPath.
Definition Path.hpp:52
Definition UuidKey.hpp:344
Definition score-plugin-avnd/Crousti/Executor.hpp:57
Definition score-plugin-avnd/Crousti/Executor.hpp:107
Definition score-plugin-avnd/Crousti/ProcessModel.hpp:79
Definition Metadatas.hpp:26
Definition Metadatas.hpp:20
Definition Metadatas.hpp:16
Definition Metadatas.hpp:32
Definition Factories.hpp:19
Definition Process/Execution/ProcessComponent.hpp:89
Definition ExecutionTransaction.hpp:18
Definition GfxExecNode.hpp:120
Definition PortForward.hpp:23
Definition PortForward.hpp:27
Definition MessageBus.hpp:133
Definition MessageBus.hpp:37
Definition ExecutorPortSetup.hpp:337
Definition ExecutorUpdateControlValueInUi.hpp:76