OSSIA
Open Scenario System for Interactive Application
Loading...
Searching...
No Matches
graph_parallel.hpp
1
#pragma once
2
#include <ossia-config.hpp>
3
#if defined(OSSIA_PARALLEL)
4
#include <ossia/dataflow/graph/graph_static.hpp>
5
#include <ossia/detail/hash_map.hpp>
6
/*
7
#include <tbb/flow_graph.h>
8
9
namespace ossia
10
{
11
12
struct parallel_exec;
13
template <typename Impl>
14
struct parallel_update
15
{
16
public:
17
using cont_node = tbb::flow::continue_node<tbb::flow::continue_msg>;
18
std::shared_ptr<spdlog::logger> logger;
19
std::shared_ptr<bench_map> perf_map;
20
21
template <typename Graph_T>
22
parallel_update(Graph_T& g) : impl{g}
23
{
24
}
25
26
void update_graph(ossia::node_map& nodes, ossia::graph_t& graph)
27
{
28
namespace tbf = tbb::flow;
29
flow_nodes.clear();
30
start_nodes.clear();
31
nodes_with_incoming_edges.clear();
32
33
flow_graph.~graph();
34
new (&flow_graph) tbf::graph;
35
36
if (logger)
37
{
38
if (perf_map)
39
{
40
for (const auto& n : nodes)
41
{
42
graph_node* node = n.first.get();
43
(*perf_map)[node] = std::nullopt;
44
flow_nodes.insert({node, std::make_unique<cont_node>(
45
flow_graph, node_exec_logger_bench{
46
cur_state, *perf_map,
47
*logger, *node})});
48
}
49
}
50
else
51
{
52
for (auto n : nodes)
53
{
54
graph_node* node = n.first.get();
55
flow_nodes.insert(
56
{node,
57
std::make_unique<cont_node>(
58
flow_graph, node_exec_logger{cur_state, *logger, *node})});
59
}
60
}
61
}
62
else
63
{
64
for (auto n : nodes)
65
{
66
graph_node* node = n.first.get();
67
flow_nodes.insert(
68
{node, std::make_unique<cont_node>(
69
flow_graph, node_exec{cur_state, *node})});
70
}
71
}
72
73
for (auto n : nodes)
74
{
75
graph_node* n1 = n.first.get();
76
for (auto m : nodes)
77
{
78
graph_node* n2 = m.first.get();
79
if (n2 != n1)
80
{
81
if (boost::edge(n.second, m.second, graph).second)
82
{
83
tbf::make_edge(*flow_nodes[n2], *flow_nodes[n1]);
84
nodes_with_incoming_edges.push_back(n1);
85
}
86
}
87
}
88
}
89
90
for (auto n : nodes)
91
{
92
if (!ossia::contains(nodes_with_incoming_edges, n.first.get()))
93
start_nodes.push_back(n.first.get());
94
}
95
96
start_node
97
= std::make_unique<tbf::broadcast_node<tbf::continue_msg>>(flow_graph);
98
for (auto node : start_nodes)
99
{
100
tbf::make_edge(*start_node, *flow_nodes[node]);
101
}
102
}
103
104
template <typename Graph_T, typename DevicesT>
105
void operator()(Graph_T& g, const DevicesT& devices)
106
{
107
impl(g, devices);
108
update_graph(g.m_nodes, impl.m_sub_graph);
109
}
110
111
private:
112
friend struct parallel_exec;
113
114
Impl impl;
115
execution_state* cur_state{};
116
std::unique_ptr<tbb::flow::broadcast_node<tbb::flow::continue_msg>>
117
start_node; std::vector<graph_node*> nodes_with_incoming_edges;
118
119
tbb::flow::graph flow_graph;
120
ossia::hash_map<graph_node*, std::unique_ptr<cont_node>> flow_nodes;
121
std::vector<graph_node*> start_nodes;
122
};
123
124
struct parallel_exec
125
{
126
template <typename Graph_T>
127
parallel_exec(Graph_T&)
128
{
129
}
130
131
template <typename Graph_T, typename Impl>
132
void operator()(
133
Graph_T& g, parallel_update<Impl>& self, ossia::execution_state& e,
134
const std::vector<ossia::graph_node*>&)
135
{
136
self.cur_state = &e;
137
self.start_node->try_put(tbb::flow::continue_msg{});
138
self.flow_graph.wait_for_all();
139
}
140
};
141
142
using parallel_tc_graph
143
= graph_static<parallel_update<tc_update<fast_tc>>, parallel_exec>;
144
}
145
*/
146
147
/*
148
#if __has_include(<taskflow/taskflow.hpp>)
149
#include <taskflow/taskflow.hpp>
150
namespace ossia
151
{
152
struct cpptf_exec;
153
template <typename Impl>
154
struct cpptf_update
155
{
156
public:
157
std::shared_ptr<spdlog::logger> logger;
158
std::shared_ptr<bench_map> perf_map;
159
160
template <typename Graph_T>
161
cpptf_update(Graph_T& g) : impl{g}
162
{
163
}
164
165
void update_graph(ossia::node_map& nodes, ossia::graph_t& graph)
166
{
167
flow_nodes.clear();
168
flow_graph.clear();
169
170
if (logger)
171
{
172
if (perf_map)
173
{
174
for (const auto& n : nodes)
175
{
176
graph_node* node = n.first.get();
177
(*perf_map)[node] = std::nullopt;
178
flow_nodes[node] =
179
flow_graph.emplace(node_exec_logger_bench{cur_state, *perf_map, *logger,
180
*node});
181
}
182
}
183
else
184
{
185
for (auto n : nodes)
186
{
187
graph_node* node = n.first.get();
188
flow_nodes[node] = flow_graph.emplace(node_exec_logger{cur_state,
189
*logger, *node});
190
}
191
}
192
}
193
else
194
{
195
for (auto n : nodes)
196
{
197
graph_node* node = n.first.get();
198
flow_nodes[node] = flow_graph.emplace(node_exec{cur_state, *node});
199
}
200
}
201
202
// TODO instead for all edge
203
for (auto n : nodes)
204
{
205
graph_node* n1 = n.first.get();
206
for (auto m : nodes)
207
{
208
graph_node* n2 = m.first.get();
209
if (n2 != n1)
210
{
211
if (boost::edge(n.second, m.second, graph).second)
212
{
213
auto& sender = flow_nodes[n2];
214
auto& receiver = flow_nodes[n1];
215
sender.precede(receiver);
216
}
217
}
218
}
219
}
220
}
221
222
template <typename Graph_T, typename DevicesT>
223
void operator()(Graph_T& g, const DevicesT& devices)
224
{
225
impl(g, devices);
226
update_graph(g.m_nodes, impl.m_sub_graph);
227
}
228
229
private:
230
friend struct cpptf_exec;
231
232
Impl impl;
233
execution_state* cur_state{};
234
235
tf::Taskflow flow_graph;
236
tf::Executor executor;
237
ossia::hash_map<graph_node*, tf::Task> flow_nodes;
238
};
239
240
struct cpptf_exec
241
{
242
template <typename Graph_T>
243
cpptf_exec(Graph_T&)
244
{
245
}
246
247
template <typename Graph_T, typename Impl>
248
void operator()(
249
Graph_T& g, cpptf_update<Impl>& self, ossia::execution_state& e,
250
const std::vector<ossia::graph_node*>&)
251
{
252
self.cur_state = &e;
253
self.executor.run(self.flow_graph).get();
254
}
255
};
256
257
using cpptf_tc_graph
258
= graph_static<cpptf_update<tc_update<fast_tc>>, cpptf_exec>;
259
}
260
#endif
261
*/
262
263
#include <ossia/dataflow/graph/graph_parallel_impl.hpp>
264
265
#endif
src
ossia
dataflow
graph
graph_parallel.hpp
Generated on Mon Mar 31 2025 23:58:28 for OSSIA by
1.9.8