score-plugin-avnd/Crousti/Executor.hpp
1 #pragma once
2 
3 #include <Process/Execution/ProcessComponent.hpp>
4 #include <Process/ExecutionContext.hpp>
5 
6 #include <Explorer/DocumentPlugin/DeviceDocumentPlugin.hpp>
7 
8 #include <Crousti/CpuAnalysisNode.hpp>
9 #include <Crousti/CpuFilterNode.hpp>
10 #include <Crousti/CpuGeneratorNode.hpp>
11 #include <Crousti/ExecutorPortSetup.hpp>
12 #include <Crousti/ExecutorUpdateControlValueInUi.hpp>
13 #include <Crousti/File.hpp>
14 #include <Crousti/GpuComputeNode.hpp>
15 #include <Crousti/GpuNode.hpp>
16 #include <Crousti/MessageBus.hpp>
17 #include <Crousti/Metadatas.hpp>
18 #include <Crousti/ProcessModel.hpp>
19 
20 #include <score/tools/Bind.hpp>
21 
22 #include <ossia/dataflow/exec_state_facade.hpp>
23 #include <ossia/dataflow/node_process.hpp>
24 #include <ossia/network/context.hpp>
25 
26 #include <ossia-qt/invoke.hpp>
27 
28 #include <QGuiApplication>
29 
30 #if SCORE_PLUGIN_GFX
31 #include <Crousti/GpuNode.hpp>
32 #include <Gfx/GfxApplicationPlugin.hpp>
33 #endif
34 
35 #include <score/tools/ThreadPool.hpp>
36 
37 #include <ossia/detail/type_if.hpp>
38 
39 #include <QTimer>
40 
41 #include <avnd/binding/ossia/data_node.hpp>
42 #include <avnd/binding/ossia/mono_audio_node.hpp>
43 #include <avnd/binding/ossia/node.hpp>
44 #include <avnd/binding/ossia/ossia_audio_node.hpp>
45 #include <avnd/binding/ossia/poly_audio_node.hpp>
46 #include <avnd/concepts/temporality.hpp>
47 #include <avnd/concepts/ui.hpp>
48 #include <avnd/concepts/worker.hpp>
49 
50 namespace oscr
51 {
52 
53 template <typename Node>
54 class CustomNodeProcess : public ossia::node_process
55 {
56  using node_process::node_process;
57  void start() override
58  {
59  node_process::start();
60  auto& n = static_cast<safe_node<Node>&>(*node);
61  n.impl.effect.start();
62  }
63  void stop() override
64  {
65  auto& n = static_cast<safe_node<Node>&>(*node);
66  n.impl.effect.stop();
67  node_process::stop();
68  }
69 };
70 
71 template <typename Node>
72 class Executor final
73  : public Execution::ProcessComponent_T<ProcessModel<Node>, ossia::node_process>
74 {
75  Process::Inlets m_oldInlets;
76  Process::Outlets m_oldOutlets;
77 
78 public:
79  static Q_DECL_RELAXED_CONSTEXPR UuidKey<score::Component> static_key() noexcept
80  {
81  return uuid_from_string<Node>();
82  }
83 
84  UuidKey<score::Component> key() const noexcept final override { return static_key(); }
85 
86  bool key_match(UuidKey<score::Component> other) const noexcept final override
87  {
88  return static_key() == other || Execution::ProcessComponent::base_key_match(other);
89  }
90 
91  [[no_unique_address]] ossia::type_if<int, is_gpu<Node>> node_id = -1;
92 
93  Executor(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
95  element, ctx, "Executor::ProcessModel<Info>", p}
96  {
97 #if SCORE_PLUGIN_GFX
98  if constexpr(is_gpu<Node>)
99  {
100  setup_gpu(element, ctx, p);
101  }
102  else
103 #endif
104  {
105  setup_cpu(element, ctx, p);
106  }
107 
108  if constexpr(avnd::tag_process_exec<Node>)
109  {
110  this->m_ossia_process = std::make_shared<CustomNodeProcess<Node>>(this->node);
111  }
112  else
113  {
114  this->m_ossia_process = std::make_shared<ossia::node_process>(this->node);
115  }
116  }
117 
118  void
119  setup_cpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
120  {
121  auto& net_ctx
122  = *ctx.doc.findPlugin<Explorer::DeviceDocumentPlugin>()->networkContext();
123  const auto id
124  = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
125 
126  auto st = ossia::exec_state_facade{ctx.execState.get()};
127  std::shared_ptr<safe_node<Node>> ptr;
128  auto node = new safe_node<Node>{st.bufferSize(), (double)st.sampleRate(), id};
129  node->root_inputs().reserve(element.inlets().size());
130  node->root_outputs().reserve(element.outlets().size());
131 
132  node->prepare(*ctx.execState.get()); // Preparation of the ossia side
133 
134  if_possible(node->impl.effect.ossia_state = st);
135  if_possible(node->impl.effect.io_context = &net_ctx.context);
136  ptr.reset(node);
137  this->node = ptr;
138 
139  if constexpr(requires { ptr->impl.effect; })
140  if constexpr(std::is_same_v<std::decay_t<decltype(ptr->impl.effect)>, Node>)
141  connect_message_bus(element, ctx, ptr->impl.effect);
142  connect_worker(ctx, ptr->impl);
143 
144  node->dynamic_ports = element.dynamic_ports;
145  node->finish_init();
146 
147  connect_controls(element, ctx, ptr);
148  update_controls(ptr);
149  QObject::connect(
150  &element, &Process::ProcessModel::inletsChanged, this,
151  &Executor::recompute_ports);
152  QObject::connect(
153  &element, &Process::ProcessModel::outletsChanged, this,
154  &Executor::recompute_ports);
155 
156  // To call prepare() after evertyhing is ready
157  node->audio_configuration_changed(st);
158 
159  m_oldInlets = element.inlets();
160  m_oldOutlets = element.outlets();
161  }
162 
163  void
164  setup_gpu(ProcessModel<Node>& element, const ::Execution::Context& ctx, QObject* p)
165  {
166 #if SCORE_PLUGIN_GFX
167  // FIXME net context for gpu node ?
168  const auto id
169  = std::hash<ObjectPath>{}(Path<Process::ProcessModel>{element}.unsafePath());
170 
171  auto& gfx_exec = ctx.doc.plugin<Gfx::DocumentPlugin>().exec;
172 
173  // Create the executor in the audio thread
174  struct named_exec_node final : Gfx::gfx_exec_node
175  {
176  using Gfx::gfx_exec_node::gfx_exec_node;
177  std::string label() const noexcept override
178  {
179  return std::string(avnd::get_name<Node>());
180  }
181  };
182 
183  auto node = std::make_shared<named_exec_node>(gfx_exec);
184  node->prepare(*ctx.execState);
185 
186  this->node = node;
187 
188  // Create the controls, inputs outputs etc.
189  std::size_t i = 0;
190  for(auto& ctl : element.inlets())
191  {
192  if(auto ctrl = qobject_cast<Process::ControlInlet*>(ctl))
193  {
194  auto& p = node->add_control();
195  p->value = ctrl->value();
196  p->changed = true;
197 
198  QObject::connect(
199  ctrl, &Process::ControlInlet::valueChanged, this,
200  Gfx::con_unvalidated{ctx, i, 0, node});
201  i++;
202  }
203  else if(auto ctrl = qobject_cast<Process::ValueInlet*>(ctl))
204  {
205  auto& p = node->add_control();
206  p->changed = true;
207  i++;
208  }
209  else if(auto ctrl = qobject_cast<Process::AudioInlet*>(ctl))
210  {
211  node->add_audio();
212  }
213  else if(auto ctrl = qobject_cast<Gfx::TextureInlet*>(ctl))
214  {
215  node->add_texture();
216  }
217  }
218 
219  // FIXME refactor this with other GFX processes
220  for(auto* outlet : element.outlets())
221  {
222  if(auto ctrl = qobject_cast<Process::ControlOutlet*>(outlet))
223  {
224  node->add_control_out();
225  }
226  else if(auto ctrl = qobject_cast<Process::ValueOutlet*>(outlet))
227  {
228  node->add_control_out();
229  }
230  else if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
231  {
232  node->add_texture_out();
233  out->nodeId = node_id;
234  }
235  }
236 
237  // Create the GPU node
238  std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
239  ctx.alias.lock(), &ctx.executionQueue);
240  std::unique_ptr<score::gfx::Node> ptr;
241  if constexpr(GpuGraphicsNode2<Node>)
242  {
243  auto gpu_node = new CustomGpuNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
244  ptr.reset(gpu_node);
245  }
246  else if constexpr(GpuComputeNode2<Node>)
247  {
248  auto gpu_node = new GpuComputeNode<Node>(qex_ptr, node->control_outs, id, ctx.doc);
249  ptr.reset(gpu_node);
250  }
251  else if constexpr(GpuNode<Node>)
252  {
253  auto gpu_node
254  = new GfxNode<Node>(element, qex_ptr, node->control_outs, id, ctx.doc);
255  ptr.reset(gpu_node);
256  }
257  node->id = gfx_exec.ui->register_node(std::move(ptr));
258  node_id = node->id;
259 #endif
260  }
261 
262  void recompute_ports()
263  {
264  Execution::Transaction commands{this->system()};
265  auto n = std::dynamic_pointer_cast<safe_node<Node>>(this->node);
266  if(!n)
267  return;
268 
269  // Re-run setup_inlets ?
270  this->in_exec([dp = this->process().dynamic_ports, node = n] {
271  node->dynamic_ports = dp;
272  node->root_inputs().clear();
273  node->root_outputs().clear();
274  node->initialize_all_ports();
275  });
276  }
277 
278  void connect_controls(
279  ProcessModel<Node>& element, const ::Execution::Context& ctx,
280  std::shared_ptr<safe_node<Node>>& ptr)
281  {
282  using dynamic_ports_port_type = avnd::dynamic_ports_input_introspection<Node>;
283  using control_inputs_type = avnd::control_input_introspection<Node>;
284  using curve_inputs_type = avnd::curve_input_introspection<Node>;
285  using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
286  using midifile_inputs_type = avnd::midifile_input_introspection<Node>;
287  using raw_file_inputs_type = avnd::raw_file_input_introspection<Node>;
288  using control_outputs_type = avnd::control_output_introspection<Node>;
289 
290  // UI controls to engine
291  safe_node<Node>& node = *ptr;
292  avnd::effect_container<Node>& eff = node.impl;
293 
294  // Initialize all the controls in the node with the current value.
295  // And update the node when the UI changes
296 
297  if constexpr(dynamic_ports_port_type::size > 0)
298  {
299  for(auto state : eff.full_state())
300  {
301  dynamic_ports_port_type::for_all_n2(
302  state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
303  }
304  }
305  if constexpr(control_inputs_type::size > 0)
306  {
307  for(auto state : eff.full_state())
308  {
309  control_inputs_type::for_all_n2(
310  state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
311  }
312  }
313  if constexpr(curve_inputs_type::size > 0)
314  {
315  for(auto state : eff.full_state())
316  {
317  curve_inputs_type::for_all_n2(
318  state.inputs, dispatch_control_setup<Node>{element, ctx, ptr, this});
319  }
320  }
321  if constexpr(soundfile_inputs_type::size > 0)
322  {
323  soundfile_inputs_type::for_all_n2(
324  avnd::get_inputs<Node>(eff),
325  dispatch_control_setup<Node>{element, ctx, ptr, this});
326 
327  setup_soundfile_task_pool(element, ctx, ptr);
328  }
329  if constexpr(midifile_inputs_type::size > 0)
330  {
331  midifile_inputs_type::for_all_n2(
332  avnd::get_inputs<Node>(eff),
333  dispatch_control_setup<Node>{element, ctx, ptr, this});
334  }
335  if constexpr(raw_file_inputs_type::size > 0)
336  {
337  raw_file_inputs_type::for_all_n2(
338  avnd::get_inputs<Node>(eff),
339  dispatch_control_setup<Node>{element, ctx, ptr, this});
340  }
341 
342  // Engine to ui controls
343  if constexpr(control_inputs_type::size > 0 || control_outputs_type::size > 0)
344  {
345  // Update the value in the UI
346  std::weak_ptr<safe_node<Node>> weak_node = ptr;
347  update_control_value_in_ui<Node> timer_action{weak_node, element};
348  timer_action();
349 
350  con(ctx.doc.coarseUpdateTimer, &QTimer::timeout, this,
351  [timer_action = std::move(timer_action)] { timer_action(); },
352  Qt::QueuedConnection);
353  }
354  }
355 
356  void setup_soundfile_task_pool(
357  ProcessModel<Node>& element, const ::Execution::Context& ctx,
358  std::shared_ptr<safe_node<Node>>& ptr)
359  {
360  safe_node<Node>& node = *ptr;
361  avnd::effect_container<Node>& eff = node.impl;
362 
363  using soundfile_inputs_type = avnd::soundfile_input_introspection<Node>;
364 
365  auto& tq = score::TaskPool::instance();
366  node.soundfiles.load_request
367  = [&tq, p = std::weak_ptr{ptr}, &ctx](std::string& str, int idx) {
368  auto eff_ptr = p.lock();
369  if(!eff_ptr)
370  return;
371  tq.post([eff_ptr = std::move(eff_ptr), filename = str, &ctx, idx]() mutable {
372  if(auto file = loadSoundfile(filename, ctx.doc, ctx.execState))
373  {
374  ctx.executionQueue.enqueue(
375  [sf = std::move(file), p = std::weak_ptr{eff_ptr}, idx]() mutable {
376  auto eff_ptr = p.lock();
377  if(!eff_ptr)
378  return;
379 
380  avnd::effect_container<Node>& eff = eff_ptr->impl;
381  soundfile_inputs_type::for_nth_mapped_n2(
382  avnd::get_inputs<Node>(eff), idx,
383  [&]<std::size_t NField, std::size_t N>(
384  auto& field, avnd::predicate_index<N> p,
385  avnd::field_index<NField> f) {
386  eff_ptr->soundfile_loaded(sf, p, f);
387  });
388  });
389  }
390  });
391  };
392  }
393 
394  void connect_message_bus(
395  ProcessModel<Node>& element, const ::Execution::Context& ctx, Node& eff)
396  {
397  // Custom UI messages to engine
398  if constexpr(avnd::has_gui_to_processor_bus<Node>)
399  {
400  element.from_ui = [p = QPointer{this}, &eff](QByteArray b) {
401  if(!p)
402  return;
403 
404  p->in_exec([mess = std::move(b), &eff]() mutable {
405  using refl = avnd::function_reflection<&Node::process_message>;
406  static_assert(refl::count <= 1);
407 
408  if constexpr(refl::count == 0)
409  {
410  // no arguments, just call it
411  eff.process_message();
412  }
413  else if constexpr(refl::count == 1)
414  {
415  using arg_type = avnd::first_argument<&Node::process_message>;
416  std::decay_t<arg_type> arg;
417  MessageBusReader reader{mess};
418  reader(arg);
419  eff.process_message(std::move(arg));
420  }
421  });
422  };
423  }
424 
425  if constexpr(avnd::has_processor_to_gui_bus<Node>)
426  {
427  eff.send_message = [self = QPointer{this}]<typename T>(T&& b) mutable {
428  if(!self)
429  return;
430  if constexpr(
431  sizeof(QPointer<QObject>) + sizeof(b)
432  < Execution::ExecutionCommand::max_storage)
433  {
434  self->in_edit(
435  [proc = QPointer{&self->process()}, bb = std::move(b)]() mutable {
436  if(proc->to_ui)
437  MessageBusSender{proc->to_ui}(std::move(bb));
438  });
439  }
440  else
441  {
442  self->in_edit(
443  [proc = QPointer{&self->process()},
444  bb = std::make_unique<std::decay_t<T>>(std::move(b))]() mutable {
445  if(proc->to_ui)
446  MessageBusSender{proc->to_ui}(*std::move(bb));
447  });
448  }
449  };
450  }
451  }
452 
453  void connect_worker(const ::Execution::Context& ctx, avnd::effect_container<Node>& eff)
454  {
455  if constexpr(avnd::has_worker<Node>)
456  {
457  // Initialize the thread pool beforehand
458  auto& tq = score::TaskPool::instance();
459  using worker_type = decltype(eff.effect.worker);
460  for(auto& eff : eff.effects())
461  {
462  std::weak_ptr eff_ptr = std::shared_ptr<Node>(this->node, &eff);
463  std::weak_ptr qex_ptr = std::shared_ptr<Execution::ExecutionCommandQueue>(
464  ctx.alias.lock(), &ctx.executionQueue);
465 
466  eff.worker.request
467  = [&tq, qex_ptr = std::move(qex_ptr),
468  eff_ptr = std::move(eff_ptr)]<typename... Args>(Args&&... f) mutable {
469  // request() is invoked in the DSP / processor thread
470  // and just posts the task to the thread pool
471  tq.post([eff_ptr, qex_ptr, ... ff = std::forward<Args>(f)]() mutable {
472  // This happens in the worker thread
473  // If for some reason the object has already been removed, not much
474  // reason to perform the work
475  if(!eff_ptr.lock())
476  return;
477 
478  using type_of_result
479  = decltype(worker_type::work(std::forward<decltype(ff)>(ff)...));
480  if constexpr(std::is_void_v<type_of_result>)
481  {
482  worker_type::work(std::forward<decltype(ff)>(ff)...);
483  }
484  else
485  {
486  // If the worker returns a std::function, it
487  // is to be invoked back in the processor DSP thread
488  auto res = worker_type::work(std::forward<decltype(ff)>(ff)...);
489  if(!res)
490  return;
491 
492  // Execution queue is currently spsc from main thread to an exec thread,
493  // we cannot just yeet the result back from the thread-pool
494  ossia::qt::run_async(
495  qApp, [eff_ptr = std::move(eff_ptr), qex_ptr = std::move(qex_ptr),
496  res = std::move(res)]() mutable {
497  // Main thread
498  std::shared_ptr qex = qex_ptr.lock();
499  if(!qex)
500  return;
501 
502  qex->enqueue(
503  [eff_ptr = std::move(eff_ptr), res = std::move(res)]() mutable {
504  // DSP / processor thread
505  // We need res to be mutable so that the worker can use it to e.g. store
506  // old data which will be freed back in the main thread
507  if(auto p = eff_ptr.lock())
508  res(*p);
509  });
510  });
511  }
512  });
513  };
514  }
515  }
516  }
517 
518  // Update everything
519  void update_controls(std::shared_ptr<safe_node<Node>>& ptr)
520  {
521  avnd::effect_container<Node>& eff = ptr->impl;
522  {
523  for(auto state : eff.full_state())
524  {
525  avnd::input_introspection<Node>::for_all(
526  state.inputs, [&](auto& field) { if_possible(field.update(state.effect)); });
527  }
528  }
529  }
530 
531  void cleanup() override
532  {
533  if constexpr(requires { this->process().from_ui; })
534  {
535  this->process().from_ui = [](QByteArray arr) {};
536  }
537  // FIXME cleanup eff.effect.send_message too ?
538 
539 #if SCORE_PLUGIN_GFX
540  if constexpr(is_gpu<Node>)
541  {
542  // FIXME this must move in the Node dtor. See video_node
543  auto& gfx_exec = this->system().doc.template plugin<Gfx::DocumentPlugin>().exec;
544  if(node_id >= 0)
545  gfx_exec.ui->unregister_node(node_id);
546  }
547 
548  // FIXME refactor this with other GFX processes
549  for(auto* outlet : this->process().outlets())
550  {
551  if(auto out = qobject_cast<Gfx::TextureOutlet*>(outlet))
552  {
553  out->nodeId = -1;
554  }
555  }
556 #endif
557  ::Execution::ProcessComponent::cleanup();
558  }
559 
560  ~Executor() { }
561 };
562 }
Definition: GfxApplicationPlugin.hpp:13
Definition: GfxExecNode.hpp:40
Definition: UuidKey.hpp:343
Definition: score-plugin-avnd/Crousti/Executor.hpp:55
Definition: score-plugin-avnd/Crousti/Executor.hpp:74
Definition: score-plugin-avnd/Crousti/ProcessModel.hpp:77
Definition: Factories.hpp:19
Definition: Process/Execution/ProcessComponent.hpp:89
Definition: ExecutionTransaction.hpp:18
Definition: GfxExecNode.hpp:117
Definition: PortForward.hpp:23
Definition: PortForward.hpp:27
Definition: MessageBus.hpp:133
Definition: MessageBus.hpp:37
Definition: ExecutorPortSetup.hpp:337
Definition: ExecutorUpdateControlValueInUi.hpp:76
Definition: ObjectPath.hpp:186