OSSIA
Open Scenario System for Interactive Application
Loading...
Searching...
No Matches
graph_parallel.hpp
1#pragma once
2#include <ossia-config.hpp>
3#if defined(OSSIA_PARALLEL)
4#include <ossia/dataflow/graph/graph_static.hpp>
5#include <ossia/detail/hash_map.hpp>
6/*
7#include <tbb/flow_graph.h>
8
9namespace ossia
10{
11
12struct parallel_exec;
13template <typename Impl>
14struct parallel_update
15{
16public:
17 using cont_node = tbb::flow::continue_node<tbb::flow::continue_msg>;
18 std::shared_ptr<spdlog::logger> logger;
19 std::shared_ptr<bench_map> perf_map;
20
21 template <typename Graph_T>
22 parallel_update(Graph_T& g) : impl{g}
23 {
24 }
25
26 void update_graph(ossia::node_map& nodes, ossia::graph_t& graph)
27 {
28 namespace tbf = tbb::flow;
29 flow_nodes.clear();
30 start_nodes.clear();
31 nodes_with_incoming_edges.clear();
32
33 flow_graph.~graph();
34 new (&flow_graph) tbf::graph;
35
36 if (logger)
37 {
38 if (perf_map)
39 {
40 for (const auto& n : nodes)
41 {
42 graph_node* node = n.first.get();
43 (*perf_map)[node] = std::nullopt;
44 flow_nodes.insert({node, std::make_unique<cont_node>(
45 flow_graph, node_exec_logger_bench{
46 cur_state, *perf_map,
47 *logger, *node})});
48 }
49 }
50 else
51 {
52 for (auto n : nodes)
53 {
54 graph_node* node = n.first.get();
55 flow_nodes.insert(
56 {node,
57 std::make_unique<cont_node>(
58 flow_graph, node_exec_logger{cur_state, *logger, *node})});
59 }
60 }
61 }
62 else
63 {
64 for (auto n : nodes)
65 {
66 graph_node* node = n.first.get();
67 flow_nodes.insert(
68 {node, std::make_unique<cont_node>(
69 flow_graph, node_exec{cur_state, *node})});
70 }
71 }
72
73 for (auto n : nodes)
74 {
75 graph_node* n1 = n.first.get();
76 for (auto m : nodes)
77 {
78 graph_node* n2 = m.first.get();
79 if (n2 != n1)
80 {
81 if (boost::edge(n.second, m.second, graph).second)
82 {
83 tbf::make_edge(*flow_nodes[n2], *flow_nodes[n1]);
84 nodes_with_incoming_edges.push_back(n1);
85 }
86 }
87 }
88 }
89
90 for (auto n : nodes)
91 {
92 if (!ossia::contains(nodes_with_incoming_edges, n.first.get()))
93 start_nodes.push_back(n.first.get());
94 }
95
96 start_node
97 = std::make_unique<tbf::broadcast_node<tbf::continue_msg>>(flow_graph);
98 for (auto node : start_nodes)
99 {
100 tbf::make_edge(*start_node, *flow_nodes[node]);
101 }
102 }
103
104 template <typename Graph_T, typename DevicesT>
105 void operator()(Graph_T& g, const DevicesT& devices)
106 {
107 impl(g, devices);
108 update_graph(g.m_nodes, impl.m_sub_graph);
109 }
110
111private:
112 friend struct parallel_exec;
113
114 Impl impl;
115 execution_state* cur_state{};
116 std::unique_ptr<tbb::flow::broadcast_node<tbb::flow::continue_msg>>
117start_node; std::vector<graph_node*> nodes_with_incoming_edges;
118
119 tbb::flow::graph flow_graph;
120 ossia::hash_map<graph_node*, std::unique_ptr<cont_node>> flow_nodes;
121 std::vector<graph_node*> start_nodes;
122};
123
124struct parallel_exec
125{
126 template <typename Graph_T>
127 parallel_exec(Graph_T&)
128 {
129 }
130
131 template <typename Graph_T, typename Impl>
132 void operator()(
133 Graph_T& g, parallel_update<Impl>& self, ossia::execution_state& e,
134 const std::vector<ossia::graph_node*>&)
135 {
136 self.cur_state = &e;
137 self.start_node->try_put(tbb::flow::continue_msg{});
138 self.flow_graph.wait_for_all();
139 }
140};
141
142using parallel_tc_graph
143 = graph_static<parallel_update<tc_update<fast_tc>>, parallel_exec>;
144}
145*/
146
147/*
148#if __has_include(<taskflow/taskflow.hpp>)
149#include <taskflow/taskflow.hpp>
150namespace ossia
151{
152struct cpptf_exec;
153template <typename Impl>
154struct cpptf_update
155{
156public:
157 std::shared_ptr<spdlog::logger> logger;
158 std::shared_ptr<bench_map> perf_map;
159
160 template <typename Graph_T>
161 cpptf_update(Graph_T& g) : impl{g}
162 {
163 }
164
165 void update_graph(ossia::node_map& nodes, ossia::graph_t& graph)
166 {
167 flow_nodes.clear();
168 flow_graph.clear();
169
170 if (logger)
171 {
172 if (perf_map)
173 {
174 for (const auto& n : nodes)
175 {
176 graph_node* node = n.first.get();
177 (*perf_map)[node] = std::nullopt;
178 flow_nodes[node] =
179flow_graph.emplace(node_exec_logger_bench{cur_state, *perf_map, *logger,
180*node});
181 }
182 }
183 else
184 {
185 for (auto n : nodes)
186 {
187 graph_node* node = n.first.get();
188 flow_nodes[node] = flow_graph.emplace(node_exec_logger{cur_state,
189*logger, *node});
190 }
191 }
192 }
193 else
194 {
195 for (auto n : nodes)
196 {
197 graph_node* node = n.first.get();
198 flow_nodes[node] = flow_graph.emplace(node_exec{cur_state, *node});
199 }
200 }
201
202 // TODO instead for all edge
203 for (auto n : nodes)
204 {
205 graph_node* n1 = n.first.get();
206 for (auto m : nodes)
207 {
208 graph_node* n2 = m.first.get();
209 if (n2 != n1)
210 {
211 if (boost::edge(n.second, m.second, graph).second)
212 {
213 auto& sender = flow_nodes[n2];
214 auto& receiver = flow_nodes[n1];
215 sender.precede(receiver);
216 }
217 }
218 }
219 }
220 }
221
222 template <typename Graph_T, typename DevicesT>
223 void operator()(Graph_T& g, const DevicesT& devices)
224 {
225 impl(g, devices);
226 update_graph(g.m_nodes, impl.m_sub_graph);
227 }
228
229private:
230 friend struct cpptf_exec;
231
232 Impl impl;
233 execution_state* cur_state{};
234
235 tf::Taskflow flow_graph;
236 tf::Executor executor;
237 ossia::hash_map<graph_node*, tf::Task> flow_nodes;
238};
239
240struct cpptf_exec
241{
242 template <typename Graph_T>
243 cpptf_exec(Graph_T&)
244 {
245 }
246
247 template <typename Graph_T, typename Impl>
248 void operator()(
249 Graph_T& g, cpptf_update<Impl>& self, ossia::execution_state& e,
250 const std::vector<ossia::graph_node*>&)
251 {
252 self.cur_state = &e;
253 self.executor.run(self.flow_graph).get();
254 }
255};
256
257using cpptf_tc_graph
258 = graph_static<cpptf_update<tc_update<fast_tc>>, cpptf_exec>;
259}
260#endif
261*/
262
263#include <ossia/dataflow/graph/graph_parallel_impl.hpp>
264
265#endif