ESPResSo
Extensible Simulation Package for Research on Soft Matter Systems
Loading...
Searching...
No Matches
MpiCallbacks.hpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2010-2022 The ESPResSo project
3 * Copyright (C) 2002,2003,2004,2005,2006,2007,2008,2009,2010
4 * Max-Planck-Institute for Polymer Research, Theory Group
5 *
6 * This file is part of ESPResSo.
7 *
8 * ESPResSo is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * ESPResSo is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22#ifndef COMMUNICATION_MPI_CALLBACKS
23#define COMMUNICATION_MPI_CALLBACKS
24
25/**
26 * @file
27 *
28 * @ref Communication::MpiCallbacks manages MPI communication using a
29 * visitor pattern. The program runs on the head node and is responsible
30 * for calling callback functions on the worker nodes when necessary,
31 * e.g. to broadcast global variables or run an algorithm in parallel.
32 *
33 * Callbacks are registered on the head node as function pointers via
34 * the @ref REGISTER_CALLBACK. The visitor pattern allows using arbitrary
35 * function signatures.
36 */
37
39
40#include <boost/mpi/collectives/broadcast.hpp>
41#include <boost/mpi/communicator.hpp>
42#include <boost/mpi/environment.hpp>
43#include <boost/mpi/packed_iarchive.hpp>
44
45#include <cassert>
46#include <memory>
47#include <tuple>
48#include <type_traits>
49#include <utility>
50#include <vector>
51
52namespace Communication {
53
54class MpiCallbacks;
55
56namespace detail {
57/**
58 * @brief Check if a type can be used as a callback argument.
59 *
60 * This checks is a type can be a parameter type for a MPI callback.
61 * Not allowed are pointers and non-const references, as output
62 * parameters can not work across ranks.
63 */
64template <class T>
65using is_allowed_argument =
66 std::integral_constant<bool,
67 not(std::is_pointer_v<T> ||
68 (!std::is_const_v<std::remove_reference_t<T>> &&
69 std::is_lvalue_reference_v<T>))>;
70
71/**
72 * @brief Invoke a callable with arguments from an mpi buffer.
73 *
74 * @tparam F A Callable that can be called with Args as parameters.
75 * @tparam Args Pack of arguments for @p F
76 *
77 * @param f Functor to be called
78 * @param ia Buffer to extract the parameters from
79 *
80 * @return Return value of calling @p f.
81 */
82template <class F, class... Args>
83auto invoke(F f, boost::mpi::packed_iarchive &ia) {
84 static_assert(std::conjunction_v<is_allowed_argument<Args>...>,
85 "Pointers and non-const references are not allowed as "
86 "arguments for callbacks.");
87
88 /* This is the local receive buffer for the parameters. We have to strip
89 away const so we can actually deserialize into it. */
90 std::tuple<std::remove_const_t<std::remove_reference_t<Args>>...> params;
91 std::apply([&ia](auto &&...e) { ((ia >> e), ...); }, params);
92
93 /* We add const here, so that parameters can only be by value
94 or const reference. Output parameters on callbacks are not
95 sensible because the changes are not propagated back, so
96 we make sure this does not compile. */
97 return std::apply(f, std::as_const(params));
98}
99
100/**
101 * @brief Type-erased interface for callbacks.
102 *
103 * This encapsulates the signature of the callback
104 * and the parameter transfer, so that it can be
105 * called without any type information on the parameters.
106 */
107struct callback_concept_t {
108 /**
109 * @brief Execute the callback.
110 *
111 * Unpack parameters for this callback, and then call it.
112 */
113 virtual void operator()(boost::mpi::communicator const &,
114 boost::mpi::packed_iarchive &) const = 0;
115 virtual ~callback_concept_t() = default;
116};
117
118/**
119 * @brief Callback without a return value.
120 *
121 * This is an implementation of a callback for a specific callable
122 * @p F and a set of arguments to call it with.
123 */
124template <class F, class... Args>
125struct callback_void_t final : public callback_concept_t {
126 F m_f;
127
128 callback_void_t(callback_void_t const &) = delete;
129 callback_void_t(callback_void_t &&) = delete;
130
131 template <class FRef>
132 explicit callback_void_t(FRef &&f) : m_f(std::forward<FRef>(f)) {}
133 void operator()(boost::mpi::communicator const &,
134 boost::mpi::packed_iarchive &ia) const override {
135 detail::invoke<F, Args...>(m_f, ia);
136 }
137};
138
139template <class F, class R, class... Args> struct FunctorTypes {
140 using functor_type = F;
141 using return_type = R;
142 using argument_types = std::tuple<Args...>;
143};
144
145template <class C, class R, class... Args>
146auto functor_types_impl(R (C::*)(Args...) const) {
147 return FunctorTypes<C, R, Args...>{};
148}
149
150template <class F>
151using functor_types =
152 decltype(functor_types_impl(&std::remove_reference_t<F>::operator()));
153
154template <class CRef, class C, class R, class... Args>
155auto make_model_impl(CRef &&c, FunctorTypes<C, R, Args...>) {
156 return std::make_unique<callback_void_t<C, Args...>>(std::forward<CRef>(c));
157}
158
159/**
160 * @brief Make a @ref callback_model_t for a functor or lambda.
161 *
162 * The signature is deduced from F::operator() const, which has
163 * to exist and can not be overloaded.
164 */
165template <typename F> auto make_model(F &&f) {
166 return make_model_impl(std::forward<F>(f), functor_types<F>{});
167}
168
169/**
170 * @brief Make a @ref callback_model_t for a function pointer.
171 */
172template <class... Args> auto make_model(void (*f_ptr)(Args...)) {
173 return std::make_unique<callback_void_t<void (*)(Args...), Args...>>(f_ptr);
174}
175} // namespace detail
176
177/**
178 * @brief The interface of the MPI callback mechanism.
179 */
181public:
182 /**
183 * @brief RAII handle for a callback.
184 *
185 * This is what the client gets for registering a
186 * dynamic (= not function pointer) callback.
187 * It manages the lifetime of the callback handle
188 * needed to call it. The handle has a type derived
189 * from the signature of the callback, which makes
190 * it possible to do static type checking on the
191 * arguments.
192 */
193 template <class... Args> class CallbackHandle {
194 public:
195 template <typename F, class = std::enable_if_t<std::is_same_v<
196 typename detail::functor_types<F>::argument_types,
197 std::tuple<Args...>>>>
198 CallbackHandle(std::shared_ptr<MpiCallbacks> cb, F &&f)
199 : m_id(cb->add(std::forward<F>(f))), m_cb(std::move(cb)) {}
200
202 CallbackHandle(CallbackHandle &&rhs) noexcept = default;
204 CallbackHandle &operator=(CallbackHandle &&rhs) noexcept = default;
205
206 private:
207 int m_id;
208 std::shared_ptr<MpiCallbacks> m_cb;
209
210 public:
211 /**
212 * @brief Call the callback managed by this handle.
213 *
214 * The arguments are passed to the remote callees, it
215 * must be possible to call the function with the provided
216 * arguments, otherwise this will not compile.
217 */
218 template <class... ArgRef>
219 auto operator()(ArgRef &&...args) const
220 /* Enable if a hypothetical function with signature void(Args..)
221 * could be called with the provided arguments. */
222 -> std::enable_if_t<
223 std::is_void_v<decltype(std::declval<void (*)(Args...)>()(
224 std::forward<ArgRef>(args)...))>> {
225 if (m_cb)
226 m_cb->call(m_id, std::forward<ArgRef>(args)...);
227 }
228
230 if (m_cb)
231 m_cb->remove(m_id);
232 }
233
234 int id() const { return m_id; }
235 };
236
237 /* Avoid accidental copy, leads to mpi deadlock or split brain */
238 MpiCallbacks(MpiCallbacks const &) = delete;
240
241private:
242 static auto &static_callbacks() {
243 static std::vector<
244 std::pair<void (*)(), std::unique_ptr<detail::callback_concept_t>>>
245 callbacks;
246
247 return callbacks;
248 }
249
250public:
251 MpiCallbacks(boost::mpi::communicator comm,
252 std::shared_ptr<boost::mpi::environment> mpi_env)
253 : m_comm(std::move(comm)), m_mpi_env(std::move(mpi_env)) {
254 /* Add a dummy at id 0 for loop abort. */
255 m_callback_map.add(nullptr);
256
257 for (auto &kv : static_callbacks()) {
258 m_func_ptr_to_id[kv.first] = m_callback_map.add(kv.second.get());
259 }
260 }
261
263 /* Release the clients on exit */
264 if (m_comm.rank() == 0) {
265 try {
266 abort_loop();
267 } catch (...) { // NOLINT(bugprone-empty-catch)
268 }
269 }
270 }
271
272private:
273 /**
274 * @brief Add a new callback.
275 *
276 * Add a new callback to the system. This is a collective
277 * function that must be run on all nodes.
278 *
279 * @tparam F An object with a const call operator.
280 *
281 * @param f The callback function to add.
282 * @return A handle with which the callback can be called.
283 */
284 template <typename F> auto add(F &&f) {
285 m_callbacks.emplace_back(detail::make_model(std::forward<F>(f)));
286 return m_callback_map.add(m_callbacks.back().get());
287 }
288
289public:
290 /**
291 * @brief Add a new callback.
292 *
293 * Add a new callback to the system. This is a collective
294 * function that must be run on all nodes.
295 *
296 * @param fp Pointer to the static callback function to add.
297 */
298 template <class... Args> void add(void (*fp)(Args...)) {
299 m_callbacks.emplace_back(detail::make_model(fp));
300 const int id = m_callback_map.add(m_callbacks.back().get());
301 m_func_ptr_to_id[reinterpret_cast<void (*)()>(fp)] = id;
302 }
303
304 /**
305 * @brief Add a new callback.
306 *
307 * Add a new callback to the system. This is a collective
308 * function that must be run on all nodes.
309 *
310 * @param fp Pointer to the static callback function to add.
311 */
312 template <class... Args> static void add_static(void (*fp)(Args...)) {
313 static_callbacks().emplace_back(reinterpret_cast<void (*)()>(fp),
314 detail::make_model(fp));
315 }
316
317private:
318 /**
319 * @brief Remove callback.
320 *
321 * Remove the callback id from the callback list.
322 * This is a collective call that must be run on all nodes.
323 *
324 * @param id Identifier of the callback to remove.
325 */
326 void remove(int id) {
327 std::erase_if(m_callbacks, [ptr = m_callback_map[id]](auto const &e) {
328 return e.get() == ptr;
329 });
330 m_callback_map.remove(id);
331 }
332
333private:
334 /**
335 * @brief call a callback.
336 *
337 * Call the callback id.
338 * The method can only be called on the head node
339 * and has the prerequisite that the other nodes are
340 * in the MPI loop.
341 *
342 * @param id The callback to call.
343 * @param args Arguments for the callback.
344 */
345 template <class... Args> void call(int id, Args &&...args) const {
346 if (m_comm.rank() != 0) {
347 throw std::logic_error("Callbacks can only be invoked on rank 0.");
348 }
349
350 assert(m_callback_map.find(id) != m_callback_map.end() &&
351 "m_callback_map and m_func_ptr_to_id disagree");
352
353 /* Send request to worker nodes */
354 boost::mpi::packed_oarchive oa(m_comm);
355 oa << id;
356
357 /* Pack the arguments into a packed mpi buffer. */
358 std::apply([&oa](auto &&...e) { ((oa << e), ...); },
359 std::forward_as_tuple(std::forward<Args>(args)...));
360
361 boost::mpi::broadcast(m_comm, oa, 0);
362 }
363
364public:
365 /**
366 * @brief Call a callback on worker nodes.
367 *
368 * The callback is **not** called on the head node.
369 *
370 * This method can only be called on the head node.
371 *
372 * @param fp Pointer to the function to call.
373 * @param args Arguments for the callback.
374 */
375 template <class... Args, class... ArgRef>
376 auto call(void (*fp)(Args...), ArgRef &&...args) const ->
377 /* enable only if fp can be called with the provided arguments */
378 std::enable_if_t<std::is_void_v<decltype(fp(args...))>> {
379 const int id = m_func_ptr_to_id.at(reinterpret_cast<void (*)()>(fp));
380
381 call(id, std::forward<ArgRef>(args)...);
382 }
383
384 /**
385 * @brief Call a callback on all nodes.
386 *
387 * This calls a callback on all nodes, including the head node.
388 *
389 * This method can only be called on the head node.
390 *
391 * @param fp Pointer to the function to call.
392 * @param args Arguments for the callback.
393 */
394 template <class... Args, class... ArgRef>
395 auto call_all(void (*fp)(Args...), ArgRef &&...args) const ->
396 /* enable only if fp can be called with the provided arguments */
397 std::enable_if_t<std::is_void_v<decltype(fp(args...))>> {
398 call(fp, args...);
399 fp(args...);
400 }
401
402 /**
403 * @brief Start the MPI loop.
404 *
405 * This is the callback loop for the worker nodes. They block
406 * on the MPI call and wait for a new callback request
407 * coming from the head node.
408 * This should be run on the worker nodes and must be running
409 * so that the head node can issue call().
410 */
411 void loop() const {
412 for (;;) {
413 /* Communicate callback id and parameters */
414 boost::mpi::packed_iarchive ia(m_comm);
415 boost::mpi::broadcast(m_comm, ia, 0);
416
417 int request;
418 ia >> request;
419
420 if (request == LOOP_ABORT) {
421 break;
422 }
423 /* Call the callback */
424 m_callback_map[request]->operator()(m_comm, ia);
425 }
426 }
427
428 /**
429 * @brief Abort the MPI loop.
430 *
431 * Make the worker nodes exit the MPI loop.
432 */
433 void abort_loop() { call(LOOP_ABORT); }
434
435 /**
436 * @brief The boost mpi communicator used by this instance
437 */
438 boost::mpi::communicator const &comm() const { return m_comm; }
439
440 std::shared_ptr<boost::mpi::environment> share_mpi_env() const {
441 return m_mpi_env;
442 }
443
444private:
445 /**
446 * @brief Id for the @ref abort_loop. Has to be 0.
447 */
448 static constexpr int LOOP_ABORT = 0;
449
450 /**
451 * The MPI communicator used for the callbacks.
452 */
453 boost::mpi::communicator m_comm;
454
455 /**
456 * The MPI environment used for the callbacks.
457 */
458 std::shared_ptr<boost::mpi::environment> m_mpi_env;
459
460 /**
461 * Internal storage for the callback functions.
462 */
463 std::vector<std::unique_ptr<detail::callback_concept_t>> m_callbacks;
464
465 /**
466 * Map of ids to callbacks.
467 */
469
470 /**
471 * Mapping of function pointers to ids, so static callbacks can be
472 * called by their pointer.
473 */
474 std::unordered_map<void (*)(), int> m_func_ptr_to_id;
475};
476
477template <class... Args>
479
480/**
481 * @brief Helper class to add callbacks before main.
482 *
483 * Should not be used directly, but via @ref REGISTER_CALLBACK.
484 */
486
487public:
489
490 template <class... Args> explicit RegisterCallback(void (*cb)(Args...)) {
492 }
493};
494} /* namespace Communication */
495
496/**
497 * @brief Register a static callback without return value.
498 *
499 * This registers a function as an mpi callback.
500 * The macro should be used at global scope.
501 *
502 * @param cb A function
503 */
504#define REGISTER_CALLBACK(cb) \
505 namespace Communication { \
506 static ::Communication::RegisterCallback register_##cb(&(cb)); \
507 }
508
509#endif
Keep an enumerated list of T objects, managed by the class.
auto operator()(ArgRef &&...args) const -> std::enable_if_t< std::is_void_v< decltype(std::declval< void(*)(Args...)>()(std::forward< ArgRef >(args)...))> >
Call the callback managed by this handle.
CallbackHandle(CallbackHandle &&rhs) noexcept=default
CallbackHandle(CallbackHandle const &)=delete
CallbackHandle & operator=(CallbackHandle &&rhs) noexcept=default
CallbackHandle(std::shared_ptr< MpiCallbacks > cb, F &&f)
CallbackHandle & operator=(CallbackHandle const &)=delete
The interface of the MPI callback mechanism.
auto call_all(void(*fp)(Args...), ArgRef &&...args) const -> std::enable_if_t< std::is_void_v< decltype(fp(args...))> >
Call a callback on all nodes.
auto call(void(*fp)(Args...), ArgRef &&...args) const -> std::enable_if_t< std::is_void_v< decltype(fp(args...))> >
Call a callback on worker nodes.
MpiCallbacks(boost::mpi::communicator comm, std::shared_ptr< boost::mpi::environment > mpi_env)
void add(void(*fp)(Args...))
Add a new callback.
std::shared_ptr< boost::mpi::environment > share_mpi_env() const
boost::mpi::communicator const & comm() const
The boost mpi communicator used by this instance.
void abort_loop()
Abort the MPI loop.
static void add_static(void(*fp)(Args...))
Add a new callback.
MpiCallbacks & operator=(MpiCallbacks const &)=delete
MpiCallbacks(MpiCallbacks const &)=delete
void loop() const
Start the MPI loop.
Helper class to add callbacks before main.
RegisterCallback(void(*cb)(Args...))
Container for objects that are identified by a numeric id.
static SteepestDescentParameters params
Currently active steepest descent instance.