Block-Structured AMR Software Framework
AMReX_MPMD.H
#ifndef AMREX_MPMD_H_
#define AMREX_MPMD_H_
#include <AMReX_Config.H>

#ifdef AMREX_USE_MPI

#include <AMReX_FabArray.H>

#include <mpi.h>

namespace amrex::MPMD {

void Initialize_without_split (int argc, char* argv[]);

MPI_Comm Initialize (int argc, char* argv[]);

void Finalize ();

bool Initialized ();

//! Process ID in MPI_COMM_WORLD
int MyProc ();
//! Number of processes in MPI_COMM_WORLD
int NProcs ();
//! Get the appnum (color) required for MPI_Comm_split
int AppNum ();
//! Program ID
int MyProgId ();
class Copier
{
public:
    explicit Copier (bool);

    Copier (BoxArray const& ba, DistributionMapping const& dm,
            bool send_ba = false);

    template <typename FAB>
    void send (FabArray<FAB> const& mf, int icomp, int ncomp) const;

    template <typename FAB>
    void recv (FabArray<FAB>& mf, int icomp, int ncomp) const;

    [[nodiscard]] BoxArray const& boxArray () const;

    [[nodiscard]] DistributionMapping const& DistributionMap () const;

private:
    std::map<int,FabArrayBase::CopyComTagsContainer> m_SndTags;
    std::map<int,FabArrayBase::CopyComTagsContainer> m_RcvTags;
    bool m_is_thread_safe;
    BoxArray m_ba;
    DistributionMapping m_dm;
};
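
// Illustrative sketch: the two constructors are meant to be paired across
// the coupled programs. Assuming the send_ba handshake, the program that
// owns the layout passes send_ba = true, and the peer reconstructs the
// layout with the bool constructor.
//
//   // Program 0 (owns ba and dm):
//   amrex::MPMD::Copier copier(ba, dm, true);
//   // Program 1 (receives the layout):
//   amrex::MPMD::Copier copier(true);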
template <typename FAB>
void Copier::send (FabArray<FAB> const& mf, int icomp, int ncomp) const
{
    const auto N_snds = static_cast<int>(m_SndTags.size());

    if (N_snds == 0) { return; }

    // Prepare buffer

    Vector<char*> send_data;
    Vector<std::size_t> send_size;
    Vector<int> send_rank;
    Vector<MPI_Request> send_reqs;
    Vector<std::size_t> offset;
    Vector<FabArrayBase::CopyComTagsContainer const*> send_cctc;

    std::size_t total_volume = 0;
    for (auto const& kv : m_SndTags) {
        auto const& cctc = kv.second;

        std::size_t nbytes = 0;
        for (auto const& cct : cctc) {
            nbytes += cct.sbox.numPts() * ncomp * sizeof(typename FAB::value_type);
        }

        std::size_t acd = ParallelDescriptor::sizeof_selected_comm_data_type(nbytes);
        nbytes = amrex::aligned_size(acd, nbytes); // so that nbytes are aligned

        // Also need to align the offset properly
        total_volume = amrex::aligned_size(std::max(alignof(typename FAB::value_type),
                                                    acd), total_volume);

        offset.push_back(total_volume);
        total_volume += nbytes;

        send_data.push_back(nullptr);
        send_size.push_back(nbytes);
        send_rank.push_back(kv.first);
        send_reqs.push_back(MPI_REQUEST_NULL);
        send_cctc.push_back(&cctc);
    }

    Gpu::PinnedVector<char> send_buffer(total_volume);
    char* the_send_data = send_buffer.data();
    for (int i = 0; i < N_snds; ++i) {
        send_data[i] = the_send_data + offset[i];
    }

    // Pack buffer
#ifdef AMREX_USE_GPU
    if (Gpu::inLaunchRegion() && (mf.arena()->isDevice() || mf.arena()->isManaged())) {
        mf.pack_send_buffer_gpu(mf, icomp, ncomp, send_data, send_size, send_cctc);
    } else
#endif
    {
        mf.pack_send_buffer_cpu(mf, icomp, ncomp, send_data, send_size, send_cctc);
    }

    // Send
    for (int i = 0; i < N_snds; ++i) {
        send_reqs[i] = ParallelDescriptor::Asend
            (send_data[i], send_size[i], send_rank[i], 100, MPI_COMM_WORLD).req();
    }
    Vector<MPI_Status> stats(N_snds);
    ParallelDescriptor::Waitall(send_reqs, stats);
}
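
// Worked example of the alignment arithmetic above (illustrative numbers):
// a 13-byte message with an 8-byte selected communication data type becomes
// amrex::aligned_size(8, 13) == 16 bytes, and the running offset is rounded
// up the same way, so every per-rank sub-buffer starts on a boundary that
// satisfies both alignof(FAB::value_type) and the MPI data type width.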
template <typename FAB>
void Copier::recv (FabArray<FAB>& mf, int icomp, int ncomp) const
{
    const auto N_rcvs = static_cast<int>(m_RcvTags.size());

    if (N_rcvs == 0) { return; }

    // Prepare buffer

    Vector<char*> recv_data;
    Vector<std::size_t> recv_size;
    Vector<int> recv_from;
    Vector<MPI_Request> recv_reqs;
    Vector<std::size_t> offset;

    std::size_t TotalRcvsVolume = 0;
    for (auto const& kv : m_RcvTags) {
        std::size_t nbytes = 0;
        for (auto const& cct : kv.second) {
            nbytes += cct.dbox.numPts() * ncomp * sizeof(typename FAB::value_type);
        }

        std::size_t acd = ParallelDescriptor::sizeof_selected_comm_data_type(nbytes);
        nbytes = amrex::aligned_size(acd, nbytes); // so that nbytes are aligned

        // Also need to align the offset properly
        TotalRcvsVolume = amrex::aligned_size(std::max(alignof(typename FAB::value_type),
                                                       acd), TotalRcvsVolume);

        offset.push_back(TotalRcvsVolume);
        TotalRcvsVolume += nbytes;

        recv_data.push_back(nullptr);
        recv_size.push_back(nbytes);
        recv_from.push_back(kv.first);
        recv_reqs.push_back(MPI_REQUEST_NULL);
    }

    Gpu::PinnedVector<char> recv_buffer(TotalRcvsVolume);
    char* the_recv_data = recv_buffer.data();

    // Recv
    for (int i = 0; i < N_rcvs; ++i) {
        recv_data[i] = the_recv_data + offset[i];
        recv_reqs[i] = ParallelDescriptor::Arecv
            (recv_data[i], recv_size[i], recv_from[i], 100, MPI_COMM_WORLD).req();
    }

    Vector<FabArrayBase::CopyComTagsContainer const*> recv_cctc(N_rcvs, nullptr);
    for (int i = 0; i < N_rcvs; ++i) {
        recv_cctc[i] = &(m_RcvTags.at(recv_from[i]));
    }

    Vector<MPI_Status> stats(N_rcvs);
    ParallelDescriptor::Waitall(recv_reqs, stats);

    // Unpack buffer
#ifdef AMREX_USE_GPU
    if (Gpu::inLaunchRegion() && (mf.arena()->isDevice() || mf.arena()->isManaged())) {
        mf.unpack_recv_buffer_gpu(mf, icomp, ncomp, recv_data, recv_size, recv_cctc,
                                  FabArrayBase::COPY, m_is_thread_safe);
    } else
#endif
    {
        mf.unpack_recv_buffer_cpu(mf, icomp, ncomp, recv_data, recv_size, recv_cctc,
                                  FabArrayBase::COPY, m_is_thread_safe);
    }
}
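
// Illustrative sketch: send and recv form a matched exchange between the two
// coupled programs. The message tag (100) and the MPI_COMM_WORLD ranks are
// fixed by the implementations above, so the two sides only need to agree on
// the component range.
//
//   if (amrex::MPMD::MyProgId() == 0) {
//       copier.send(mf, 0, mf.nComp());
//   } else {
//       copier.recv(mf, 0, mf.nComp());
//   }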

} // namespace amrex::MPMD

#endif
#endif
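
MPMD runs are typically launched with MPI's multiple-program syntax, e.g. `mpiexec -n 4 ./app1 : -n 2 ./app2` (program names here are placeholders). Each colon-separated block gets a distinct MPI appnum, which `AppNum()` exposes and which serves as the color for the `MPI_Comm_split` performed by `Initialize()`.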