ESPResSo
Extensible Simulation Package for Research on Soft Matter Systems
Loading...
Searching...
No Matches
MpiCallbacks.hpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2010-2022 The ESPResSo project
3 * Copyright (C) 2002,2003,2004,2005,2006,2007,2008,2009,2010
4 * Max-Planck-Institute for Polymer Research, Theory Group
5 *
6 * This file is part of ESPResSo.
7 *
8 * ESPResSo is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * ESPResSo is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22#ifndef COMMUNICATION_MPI_CALLBACKS
23#define COMMUNICATION_MPI_CALLBACKS
24
25/**
26 * @file
27 *
28 * @ref Communication::MpiCallbacks manages MPI communication using a
29 * visitor pattern. The program runs on the head node and is responsible
30 * for calling callback functions on the worker nodes when necessary,
31 * e.g. to broadcast global variables or run an algorithm in parallel.
32 *
33 * Callbacks are registered on the head node as function pointers via
34 * the @ref REGISTER_CALLBACK. The visitor pattern allows using arbitrary
35 * function signatures.
36 */
37
39#include <utils/tuple.hpp>
40#include <utils/type_traits.hpp>
41
42#include <boost/mpi/collectives/broadcast.hpp>
43#include <boost/mpi/communicator.hpp>
44#include <boost/mpi/environment.hpp>
45#include <boost/mpi/packed_iarchive.hpp>
46
47#include <cassert>
48#include <memory>
49#include <tuple>
50#include <type_traits>
51#include <utility>
52#include <vector>
53
54namespace Communication {
55
56class MpiCallbacks;
57
58namespace detail {
59/**
60 * @brief Check if a type can be used as a callback argument.
61 *
62 * This checks if a type can be a parameter type for an MPI callback.
63 * Not allowed are pointers and non-const references, as output
64 * parameters can not work across ranks.
65 */
66template <class T>
67using is_allowed_argument =
68 std::integral_constant<bool,
69 not(std::is_pointer_v<T> ||
70 (!std::is_const_v<std::remove_reference_t<T>> &&
71 std::is_lvalue_reference_v<T>))>;
72
73template <class... Args>
74using are_allowed_arguments =
76
77/**
78 * @brief Invoke a callable with arguments from an mpi buffer.
79 *
80 * @tparam F A Callable that can be called with Args as parameters.
81 * @tparam Args Pack of arguments for @p F
82 *
83 * @param f Functor to be called
84 * @param ia Buffer to extract the parameters from
85 *
86 * @return Return value of calling @p f.
87 */
88template <class F, class... Args>
89auto invoke(F f, boost::mpi::packed_iarchive &ia) {
90 static_assert(are_allowed_arguments<Args...>::value,
91 "Pointers and non-const references are not allowed as "
92 "arguments for callbacks.");
93
94 /* This is the local receive buffer for the parameters. We have to strip
95 away const so we can actually deserialize into it. */
96 std::tuple<std::remove_const_t<std::remove_reference_t<Args>>...> params;
97 Utils::for_each([&ia](auto &e) { ia >> e; }, params);
98
99 /* We add const here, so that parameters can only be by value
100 or const reference. Output parameters on callbacks are not
101 sensible because the changes are not propagated back, so
102 we make sure this does not compile. */
103 return std::apply(f, std::as_const(params));
104}
105
106/**
107 * @brief Type-erased interface for callbacks.
108 *
109 * This encapsulates the signature of the callback
110 * and the parameter transfer, so that it can be
111 * called without any type information on the parameters.
112 */
113struct callback_concept_t {
114 /**
115 * @brief Execute the callback.
116 *
117 * Unpack parameters for this callback, and then call it.
118 */
119 virtual void operator()(boost::mpi::communicator const &,
120 boost::mpi::packed_iarchive &) const = 0;
121 virtual ~callback_concept_t() = default;
122};
123
124/**
125 * @brief Callback without a return value.
126 *
127 * This is an implementation of a callback for a specific callable
128 * @p F and a set of arguments to call it with.
129 */
130template <class F, class... Args>
131struct callback_void_t final : public callback_concept_t {
132 F m_f;
133
134 callback_void_t(callback_void_t const &) = delete;
135 callback_void_t(callback_void_t &&) = delete;
136
137 template <class FRef>
138 explicit callback_void_t(FRef &&f) : m_f(std::forward<FRef>(f)) {}
139 void operator()(boost::mpi::communicator const &,
140 boost::mpi::packed_iarchive &ia) const override {
141 detail::invoke<F, Args...>(m_f, ia);
142 }
143};
144
145template <class F, class R, class... Args> struct FunctorTypes {
146 using functor_type = F;
147 using return_type = R;
148 using argument_types = std::tuple<Args...>;
149};
150
151template <class C, class R, class... Args>
152auto functor_types_impl(R (C::*)(Args...) const) {
153 return FunctorTypes<C, R, Args...>{};
154}
155
156template <class F>
157using functor_types =
158 decltype(functor_types_impl(&std::remove_reference_t<F>::operator()));
159
160template <class CRef, class C, class R, class... Args>
161auto make_model_impl(CRef &&c, FunctorTypes<C, R, Args...>) {
162 return std::make_unique<callback_void_t<C, Args...>>(std::forward<CRef>(c));
163}
164
165/**
166 * @brief Make a @ref callback_void_t for a functor or lambda.
167 *
168 * The signature is deduced from F::operator() const, which has
169 * to exist and can not be overloaded.
170 */
171template <typename F> auto make_model(F &&f) {
172 return make_model_impl(std::forward<F>(f), functor_types<F>{});
173}
174
175/**
176 * @brief Make a @ref callback_void_t for a function pointer.
177 */
178template <class... Args> auto make_model(void (*f_ptr)(Args...)) {
179 return std::make_unique<callback_void_t<void (*)(Args...), Args...>>(f_ptr);
180}
181} // namespace detail
182
183/**
184 * @brief The interface of the MPI callback mechanism.
185 */
187public:
188 /**
189 * @brief RAII handle for a callback.
190 *
191 * This is what the client gets for registering a
192 * dynamic (= not function pointer) callback.
193 * It manages the lifetime of the callback handle
194 * needed to call it. The handle has a type derived
195 * from the signature of the callback, which makes
196 * it possible to do static type checking on the
197 * arguments.
198 */
199 template <class... Args> class CallbackHandle {
200 public:
201 template <typename F, class = std::enable_if_t<std::is_same_v<
202 typename detail::functor_types<F>::argument_types,
203 std::tuple<Args...>>>>
204 CallbackHandle(std::shared_ptr<MpiCallbacks> cb, F &&f)
205 : m_id(cb->add(std::forward<F>(f))), m_cb(std::move(cb)) {}
206
208 CallbackHandle(CallbackHandle &&rhs) noexcept = default;
210 CallbackHandle &operator=(CallbackHandle &&rhs) noexcept = default;
211
212 private:
213 int m_id;
214 std::shared_ptr<MpiCallbacks> m_cb;
215
216 public:
217 /**
218 * @brief Call the callback managed by this handle.
219 *
220 * The arguments are passed to the remote callees; it
221 * must be possible to call the function with the provided
222 * arguments, otherwise this will not compile.
223 */
224 template <class... ArgRef>
225 auto operator()(ArgRef &&...args) const
226 /* Enable if a hypothetical function with signature void(Args...)
227 * could be called with the provided arguments. */
228 -> std::enable_if_t<
229 std::is_void_v<decltype(std::declval<void (*)(Args...)>()(
230 std::forward<ArgRef>(args)...))>> {
231 if (m_cb)
232 m_cb->call(m_id, std::forward<ArgRef>(args)...);
233 }
234
236 if (m_cb)
237 m_cb->remove(m_id);
238 }
239
240 int id() const { return m_id; }
241 };
242
243 /* Avoid accidental copy, leads to mpi deadlock or split brain */
244 MpiCallbacks(MpiCallbacks const &) = delete;
246
247private:
248 static auto &static_callbacks() {
249 static std::vector<
250 std::pair<void (*)(), std::unique_ptr<detail::callback_concept_t>>>
251 callbacks;
252
253 return callbacks;
254 }
255
256public:
257 MpiCallbacks(boost::mpi::communicator comm,
258 std::shared_ptr<boost::mpi::environment> mpi_env)
259 : m_comm(std::move(comm)), m_mpi_env(std::move(mpi_env)) {
260 /* Add a dummy at id 0 for loop abort. */
261 m_callback_map.add(nullptr);
262
263 for (auto &kv : static_callbacks()) {
264 m_func_ptr_to_id[kv.first] = m_callback_map.add(kv.second.get());
265 }
266 }
267
269 /* Release the clients on exit */
270 if (m_comm.rank() == 0) {
271 try {
272 abort_loop();
273 } catch (...) { // NOLINT(bugprone-empty-catch)
274 }
275 }
276 }
277
278private:
279 /**
280 * @brief Add a new callback.
281 *
282 * Add a new callback to the system. This is a collective
283 * function that must be run on all nodes.
284 *
285 * @tparam F An object with a const call operator.
286 *
287 * @param f The callback function to add.
288 * @return The id under which the callback was registered.
289 */
290 template <typename F> auto add(F &&f) {
291 m_callbacks.emplace_back(detail::make_model(std::forward<F>(f)));
292 return m_callback_map.add(m_callbacks.back().get());
293 }
294
295public:
296 /**
297 * @brief Add a new callback.
298 *
299 * Add a new callback to the system. This is a collective
300 * function that must be run on all nodes.
301 *
302 * @param fp Pointer to the static callback function to add.
303 */
304 template <class... Args> void add(void (*fp)(Args...)) {
305 m_callbacks.emplace_back(detail::make_model(fp));
306 const int id = m_callback_map.add(m_callbacks.back().get());
307 m_func_ptr_to_id[reinterpret_cast<void (*)()>(fp)] = id;
308 }
309
310 /**
311 * @brief Add a new static callback.
312 *
313 * Add a new callback to the static callback list; every instance
314 * constructed afterwards registers it. Must be run on all nodes.
315 *
316 * @param fp Pointer to the static callback function to add.
317 */
318 template <class... Args> static void add_static(void (*fp)(Args...)) {
319 static_callbacks().emplace_back(reinterpret_cast<void (*)()>(fp),
320 detail::make_model(fp));
321 }
322
323private:
324 /**
325 * @brief Remove callback.
326 *
327 * Remove the callback id from the callback list.
328 * This is a collective call that must be run on all nodes.
329 *
330 * @param id Identifier of the callback to remove.
331 */
332 void remove(int id) {
333 std::erase_if(m_callbacks, [ptr = m_callback_map[id]](auto const &e) {
334 return e.get() == ptr;
335 });
336 m_callback_map.remove(id);
337 }
338
339private:
340 /**
341 * @brief Call a callback.
342 *
343 * Call the callback with the given id.
344 * The method can only be called on the head node
345 * and has the prerequisite that the other nodes are
346 * in the MPI loop.
347 *
348 * @param id The callback to call.
349 * @param args Arguments for the callback.
350 */
351 template <class... Args> void call(int id, Args &&...args) const {
352 if (m_comm.rank() != 0) {
353 throw std::logic_error("Callbacks can only be invoked on rank 0.");
354 }
355
356 assert(m_callback_map.find(id) != m_callback_map.end() &&
357 "m_callback_map and m_func_ptr_to_id disagree");
358
359 /* Send request to worker nodes */
360 boost::mpi::packed_oarchive oa(m_comm);
361 oa << id;
362
363 /* Pack the arguments into a packed mpi buffer. */
364 Utils::for_each([&oa](auto &&e) { oa << e; },
365 std::forward_as_tuple(std::forward<Args>(args)...));
366
367 boost::mpi::broadcast(m_comm, oa, 0);
368 }
369
370public:
371 /**
372 * @brief Call a callback on worker nodes.
373 *
374 * The callback is **not** called on the head node.
375 *
376 * This method can only be called on the head node.
377 *
378 * @param fp Pointer to the function to call.
379 * @param args Arguments for the callback.
380 */
381 template <class... Args, class... ArgRef>
382 auto call(void (*fp)(Args...), ArgRef &&...args) const ->
383 /* enable only if fp can be called with the provided arguments */
384 std::enable_if_t<std::is_void_v<decltype(fp(args...))>> {
385 const int id = m_func_ptr_to_id.at(reinterpret_cast<void (*)()>(fp));
386
387 call(id, std::forward<ArgRef>(args)...);
388 }
389
390 /**
391 * @brief Call a callback on all nodes.
392 *
393 * This calls a callback on all nodes, including the head node.
394 *
395 * This method can only be called on the head node.
396 *
397 * @param fp Pointer to the function to call.
398 * @param args Arguments for the callback.
399 */
400 template <class... Args, class... ArgRef>
401 auto call_all(void (*fp)(Args...), ArgRef &&...args) const ->
402 /* enable only if fp can be called with the provided arguments */
403 std::enable_if_t<std::is_void_v<decltype(fp(args...))>> {
404 call(fp, args...);
405 fp(args...);
406 }
407
408 /**
409 * @brief Start the MPI loop.
410 *
411 * This is the callback loop for the worker nodes. They block
412 * on the MPI call and wait for a new callback request
413 * coming from the head node.
414 * This should be run on the worker nodes and must be running
415 * so that the head node can issue call().
416 */
417 void loop() const {
418 for (;;) {
419 /* Communicate callback id and parameters */
420 boost::mpi::packed_iarchive ia(m_comm);
421 boost::mpi::broadcast(m_comm, ia, 0);
422
423 int request;
424 ia >> request;
425
426 if (request == LOOP_ABORT) {
427 break;
428 }
429 /* Call the callback */
430 m_callback_map[request]->operator()(m_comm, ia);
431 }
432 }
433
434 /**
435 * @brief Abort the MPI loop.
436 *
437 * Make the worker nodes exit the MPI loop.
438 */
439 void abort_loop() { call(LOOP_ABORT); }
440
441 /**
442 * @brief The boost mpi communicator used by this instance
443 */
444 boost::mpi::communicator const &comm() const { return m_comm; }
445
446 std::shared_ptr<boost::mpi::environment> share_mpi_env() const {
447 return m_mpi_env;
448 }
449
450private:
451 /**
452 * @brief Id for the @ref abort_loop. Has to be 0.
453 */
454 static constexpr int LOOP_ABORT = 0;
455
456 /**
457 * The MPI communicator used for the callbacks.
458 */
459 boost::mpi::communicator m_comm;
460
461 /**
462 * The MPI environment used for the callbacks.
463 */
464 std::shared_ptr<boost::mpi::environment> m_mpi_env;
465
466 /**
467 * Internal storage for the callback functions.
468 */
469 std::vector<std::unique_ptr<detail::callback_concept_t>> m_callbacks;
470
471 /**
472 * Map of ids to callbacks.
473 */
475
476 /**
477 * Mapping of function pointers to ids, so static callbacks can be
478 * called by their pointer.
479 */
480 std::unordered_map<void (*)(), int> m_func_ptr_to_id;
481};
482
483template <class... Args>
485
486/**
487 * @brief Helper class to add callbacks before main.
488 *
489 * Should not be used directly, but via @ref REGISTER_CALLBACK.
490 */
492
493public:
495
496 template <class... Args> explicit RegisterCallback(void (*cb)(Args...)) {
498 }
499};
500} /* namespace Communication */
501
502/**
503 * @brief Register a static callback without return value.
504 *
505 * This registers a function as an mpi callback.
506 * The macro should be used at global scope.
507 *
508 * @param cb A function
509 */
510#define REGISTER_CALLBACK(cb) \
511 namespace Communication { \
512 static ::Communication::RegisterCallback register_##cb(&(cb)); \
513 }
514
515#endif
Keep an enumerated list of T objects, managed by the class.
auto operator()(ArgRef &&...args) const -> std::enable_if_t< std::is_void_v< decltype(std::declval< void(*)(Args...)>()(std::forward< ArgRef >(args)...))> >
Call the callback managed by this handle.
CallbackHandle(CallbackHandle &&rhs) noexcept=default
CallbackHandle(CallbackHandle const &)=delete
CallbackHandle & operator=(CallbackHandle &&rhs) noexcept=default
CallbackHandle(std::shared_ptr< MpiCallbacks > cb, F &&f)
CallbackHandle & operator=(CallbackHandle const &)=delete
The interface of the MPI callback mechanism.
auto call_all(void(*fp)(Args...), ArgRef &&...args) const -> std::enable_if_t< std::is_void_v< decltype(fp(args...))> >
Call a callback on all nodes.
auto call(void(*fp)(Args...), ArgRef &&...args) const -> std::enable_if_t< std::is_void_v< decltype(fp(args...))> >
Call a callback on worker nodes.
MpiCallbacks(boost::mpi::communicator comm, std::shared_ptr< boost::mpi::environment > mpi_env)
void add(void(*fp)(Args...))
Add a new callback.
std::shared_ptr< boost::mpi::environment > share_mpi_env() const
boost::mpi::communicator const & comm() const
The boost mpi communicator used by this instance.
void abort_loop()
Abort the MPI loop.
static void add_static(void(*fp)(Args...))
Add a new callback.
MpiCallbacks & operator=(MpiCallbacks const &)=delete
MpiCallbacks(MpiCallbacks const &)=delete
void loop() const
Start the MPI loop.
Helper class to add callbacks before main.
RegisterCallback(void(*cb)(Args...))
Container for objects that are identified by a numeric id.
void for_each(F &&f, Tuple &t)
Definition tuple.hpp:46
static SteepestDescentParameters params
Currently active steepest descent instance.
Algorithms for tuple-like inhomogeneous containers.