Block-Structured AMR Software Framework
 
AMReX_MPMD.H
#ifndef AMREX_MPMD_H_
#define AMREX_MPMD_H_
#include <AMReX_Config.H>

#ifdef AMREX_USE_MPI

#include <AMReX_FabArray.H>

#include <mpi.h>

namespace amrex::MPMD {

void Initialize_without_split (int argc, char* argv[]);

MPI_Comm Initialize (int argc, char* argv[]);

void Finalize ();

bool Initialized ();

//! Process ID in MPI_COMM_WORLD
int MyProc ();
//! Number of processes in MPI_COMM_WORLD
int NProcs ();
//! Get the appnum (color) required for MPI_Comm_split
int AppNum ();
//! Program ID
int MyProgId ();

class Copier
{
public:
    explicit Copier (bool);

    Copier (BoxArray const& ba, DistributionMapping const& dm,
            bool send_ba = false);

    template <typename FAB>
    void send (FabArray<FAB> const& mf, int icomp, int ncomp) const;

    template <typename FAB>
    void recv (FabArray<FAB>& mf, int icomp, int ncomp) const;

    [[nodiscard]] BoxArray const& boxArray () const;

    [[nodiscard]] DistributionMapping const& DistributionMap () const;

private:
    std::uint64_t m_send_id;
    std::uint64_t m_recv_id;
    std::map<int,FabArrayBase::CopyComTagsContainer> m_SndTags;
    std::map<int,FabArrayBase::CopyComTagsContainer> m_RcvTags;
    bool m_is_thread_safe;
    BoxArray m_ba;
    DistributionMapping m_dm;
};

template <typename FAB>
void Copier::send (FabArray<FAB> const& mf, int icomp, int ncomp) const
{
    const auto N_snds = static_cast<int>(m_SndTags.size());

    if (N_snds == 0) { return; }

    // Prepare buffer

    Vector<char*> send_data;
    Vector<std::size_t> send_size;
    Vector<int> send_rank;
    Vector<MPI_Request> send_reqs;
    Vector<FabArrayBase::CopyComTagsContainer const*> send_cctc;

    Vector<std::size_t> offset;
    std::size_t total_volume = 0;
    for (auto const& kv : m_SndTags) {
        auto const& cctc = kv.second;

        std::size_t nbytes = 0;
        for (auto const& cct : cctc) {
            nbytes += cct.sbox.numPts() * ncomp * sizeof(typename FAB::value_type);
        }

        std::size_t acd = ParallelDescriptor::sizeof_selected_comm_data_type(nbytes);
        nbytes = amrex::aligned_size(acd, nbytes); // so that bytes are aligned

        // Also need to align the offset properly
        total_volume = amrex::aligned_size(std::max(alignof(typename FAB::value_type),
                                                    acd), total_volume);

        offset.push_back(total_volume);
        total_volume += nbytes;

        send_data.push_back(nullptr);
        send_size.push_back(nbytes);
        send_rank.push_back(kv.first);
        send_reqs.push_back(MPI_REQUEST_NULL);
        send_cctc.push_back(&cctc);
    }

    Gpu::PinnedVector<char> send_buffer(total_volume);
    char* the_send_data = send_buffer.data();
    for (int i = 0; i < N_snds; ++i) {
        send_data[i] = the_send_data + offset[i];
    }

    // Pack buffer
#ifdef AMREX_USE_GPU
    if (Gpu::inLaunchRegion() && (mf.arena()->isDevice() || mf.arena()->isManaged())) {
        mf.pack_send_buffer_gpu(mf, icomp, ncomp, send_data, send_size, send_cctc, m_send_id);
    } else
#endif
    {
        mf.pack_send_buffer_cpu(mf, icomp, ncomp, send_data, send_size, send_cctc);
    }

    // Send
    for (int i = 0; i < N_snds; ++i) {
        send_reqs[i] = ParallelDescriptor::Asend
            (send_data[i], send_size[i], send_rank[i], 100, MPI_COMM_WORLD).req();
    }
    Vector<MPI_Status> stats(N_snds);
    ParallelDescriptor::Waitall(send_reqs, stats);
}

template <typename FAB>
void Copier::recv (FabArray<FAB>& mf, int icomp, int ncomp) const
{
    const auto N_rcvs = static_cast<int>(m_RcvTags.size());

    if (N_rcvs == 0) { return; }

    // Prepare buffer

    Vector<char*> recv_data;
    Vector<std::size_t> recv_size;
    Vector<int> recv_from;
    Vector<MPI_Request> recv_reqs;

    Vector<std::size_t> offset;
    std::size_t TotalRcvsVolume = 0;
    for (auto const& kv : m_RcvTags) {
        std::size_t nbytes = 0;
        for (auto const& cct : kv.second) {
            nbytes += cct.dbox.numPts() * ncomp * sizeof(typename FAB::value_type);
        }

        std::size_t acd = ParallelDescriptor::sizeof_selected_comm_data_type(nbytes);
        nbytes = amrex::aligned_size(acd, nbytes); // so that nbytes are aligned

        // Also need to align the offset properly
        TotalRcvsVolume = amrex::aligned_size(std::max(alignof(typename FAB::value_type),
                                                       acd), TotalRcvsVolume);

        offset.push_back(TotalRcvsVolume);
        TotalRcvsVolume += nbytes;

        recv_data.push_back(nullptr);
        recv_size.push_back(nbytes);
        recv_from.push_back(kv.first);
        recv_reqs.push_back(MPI_REQUEST_NULL);
    }

    Gpu::PinnedVector<char> recv_buffer(TotalRcvsVolume);
    char* the_recv_data = recv_buffer.data();

    // Recv
    for (int i = 0; i < N_rcvs; ++i) {
        recv_data[i] = the_recv_data + offset[i];
        recv_reqs[i] = ParallelDescriptor::Arecv
            (recv_data[i], recv_size[i], recv_from[i], 100, MPI_COMM_WORLD).req();
    }

    Vector<FabArrayBase::CopyComTagsContainer const*> recv_cctc(N_rcvs, nullptr);
    for (int i = 0; i < N_rcvs; ++i) {
        recv_cctc[i] = &(m_RcvTags.at(recv_from[i]));
    }

    Vector<MPI_Status> stats(N_rcvs);
    ParallelDescriptor::Waitall(recv_reqs, stats);

    // Unpack buffer
#ifdef AMREX_USE_GPU
    if (Gpu::inLaunchRegion() && (mf.arena()->isDevice() || mf.arena()->isManaged())) {
        // Trailing arguments follow unpack_recv_buffer_gpu's signature
        // (CpOp, is_thread_safe, id, deterministic); the value of the final
        // flag was lost in extraction and is an assumption here.
        mf.unpack_recv_buffer_gpu(mf, icomp, ncomp, recv_data, recv_size, recv_cctc,
                                  FabArrayBase::COPY, m_is_thread_safe, m_recv_id, true);
    } else
#endif
    {
        mf.unpack_recv_buffer_cpu(mf, icomp, ncomp, recv_data, recv_size, recv_cctc,
                                  FabArrayBase::COPY, m_is_thread_safe);
    }
}


}

#endif
#endif
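
For context, a minimal sketch of how this interface is used, modeled on the AMReX MPMD tutorials. Two programs run as one MPI job (for example, mpiexec -n 8 ./prog_a : -n 4 ./prog_b); MPMD::Initialize splits MPI_COMM_WORLD by appnum so each program gets its own communicator, and a Copier then moves FabArray data between the two sides. The domain size, component count, and program roles below are illustrative assumptions, not part of this header.

#include <AMReX.H>
#include <AMReX_MPMD.H>
#include <AMReX_MultiFab.H>

int main (int argc, char* argv[])
{
    // Split MPI_COMM_WORLD by appnum; each program receives its own communicator.
    MPI_Comm comm = amrex::MPMD::Initialize(argc, argv);
    amrex::Initialize(argc, argv, true, comm);
    {
        // Both programs are assumed to describe the same index space.
        amrex::BoxArray ba(amrex::Box(amrex::IntVect(0), amrex::IntVect(31)));
        ba.maxSize(16);
        amrex::DistributionMapping dm(ba);
        amrex::MultiFab mf(ba, dm, 2, 0);
        mf.setVal(1.0);

        // The Copier matches this side's boxes against the partner program's.
        amrex::MPMD::Copier copier(ba, dm, false);
        if (amrex::MPMD::MyProgId() == 0) {
            copier.send(mf, 0, 2); // program 0 sends components 0 and 1
        } else {
            copier.recv(mf, 0, 2); // program 1 receives components 0 and 1
        }
    }
    amrex::Finalize();
    amrex::MPMD::Finalize();
}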
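
A note on the buffer layout in send() and recv() above: each per-rank message size is rounded up with amrex::aligned_size so the byte count is a multiple of the MPI data type selected for the transfer, and the running total is re-aligned before being recorded as the next offset, so every message starts at an offset aligned for both the FAB value type and the communication type. Below is a standalone sketch of that arithmetic, using a local stand-in for amrex::aligned_size and an assumed 8-byte communication data type; the payload sizes are made up.

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

// Round size up to the next multiple of align (same contract as
// amrex::aligned_size in AMReX_Arena.H).
static std::size_t aligned_size (std::size_t align, std::size_t size)
{
    return ((size + align - 1) / align) * align;
}

int main ()
{
    // Hypothetical per-rank payloads in bytes (numPts * ncomp * sizeof(double)).
    std::vector<std::size_t> payload = {1040, 72, 8192};
    const std::size_t acd   = 8; // stand-in for sizeof_selected_comm_data_type
    const std::size_t align = std::max(alignof(double), acd);

    std::vector<std::size_t> offset;
    std::size_t total_volume = 0;
    for (std::size_t nbytes : payload) {
        nbytes = aligned_size(acd, nbytes);               // message length aligned for MPI
        total_volume = aligned_size(align, total_volume); // message start aligned
        offset.push_back(total_volume);
        total_volume += nbytes;
    }
    for (std::size_t i = 0; i < offset.size(); ++i) {
        std::printf("msg %zu: offset %zu\n", i, offset[i]);
    }
    std::printf("buffer size: %zu bytes\n", total_volume);
}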