Go to the documentation of this file.
47 ERROR(
"invalid number of arguments given");
60 ERROR(
"failed to add master Node object");
68 ERROR(
"failed to parse hosts file at path " << (*argv)[1] <<
69 ": result = " << (
int) hostsResult);
75 (*argv)[1] = (*argv)[0];
79 m_sock = socket(AF_INET, SOCK_DGRAM, 0);
87 struct sockaddr_in addr;
88 memset((
char *)&addr, 0,
sizeof(addr));
89 addr.sin_family = AF_INET;
90 addr.sin_addr.s_addr = htonl(INADDR_ANY);
91 addr.sin_port = htons(0);
94 int bindResult = bind(
m_sock, (
struct sockaddr *) &addr,
sizeof(addr));
123 ERROR(
"failed to send packet to nodeId " << i <<
": result = " << (
int) sendResult);
131 ERROR(
"failed to receive UDP packet for rankId = " << i <<
": result = " << (
int) recvResult);
139 ERROR(
"invalid response received: op = " <<
header->operation);
146 ERROR(
"rankId " << i <<
" failed to terminate with result = " <<
header->result);
150 NOTICE(
"rankId " << i <<
" terminated");
183 datasize =
sizeof(int);
187 datasize =
sizeof(
u8);
191 ERROR(
"unsupported datatype = " << (
int) datatype);
199 ERROR(
"data count too high: maximum is " <<
208 ERROR(
"nodeId " << dest <<
" not found");
229 ERROR(
"failed to send packet to nodeId " << dest <<
": result = " << (
int) sendResult);
250 ERROR(
"nodeId " << source <<
" not found");
266 ERROR(
"failed to send packet to nodeId " << source <<
": result = " << (
int) sendResult);
271 for (
int i = 0; i < count;)
273 Size packetSize =
sizeof(packet);
279 ERROR(
"failed to receive UDP packet for rankId = " << source <<
": result = " << (
int) recvResult);
287 ERROR(
"invalid response received: op = " <<
header->operation);
292 for (
Size j = 0; j <
header->datacount; j++, i++)
294 const u8 *data = ((
const u8 *)(
header + 1)) + j;
299 *(((
int *) buf) + i) = *(
int *)(data);
303 *(((
u8 *) buf) + i) = *data;
320 DEBUG(
"hostsfile = " << hostsfile);
322 if (
stat(hostsfile, &st) != 0)
329 if ((fp =
fopen(hostsfile,
"r")) ==
NULL)
336 char *contents =
new char[st.
st_size + 1];
356 String contentString(contents);
363 if (i.current()[0] ==
'#')
372 if (nodeLine.
count() != 3)
374 ERROR(
"invalid host format '" << *i.current() <<
"' in hosts file at " << hostsfile);
388 node->
ipAddress = inet_addr(*nodeLine[0]);
392 if (!
m_nodes.insert(idx, node))
394 ERROR(
"failed to insert Node object");
409 DEBUG(
"m_nodes[" << idx <<
"]: ip = " << *nodeLine[0] <<
", port = " << *nodeLine[1] <<
410 ", core = " << *nodeLine[2]);
421 const Size NumOfParallelStart = 32;
425 Size startIndex = 1, startCount = 0;
427 DEBUG(
"argc = " << argc);
437 for (
int i = 1; i < argc; i++)
448 NOTICE(
"cmdline = " << *cmdline);
451 while (startIndex <
m_nodes.count())
453 const Size receiveIndex = startIndex;
456 while (startIndex <
m_nodes.count() && startCount < NumOfParallelStart)
459 nodeAddr.s_addr =
m_nodes[startIndex]->ipAddress;
461 NOTICE(
"nodes[" << startIndex <<
"] = " << inet_ntoa(nodeAddr) <<
479 ERROR(
"failed to send packet to nodeId " << startIndex <<
": result = " << (
int) sendResult);
487 for (
Size i = receiveIndex; i < startIndex; i++)
495 ERROR(
"failed to receive acknowledge for MpiOpExec from nodeId " <<
496 i <<
": result = " << (
int) recvResult);
508 const Size size)
const
510 DEBUG(
"nodeId = " << nodeId <<
" size = " << size);
515 ERROR(
"nodeId " << nodeId <<
" not found");
520 struct sockaddr_in addr;
521 addr.sin_family = AF_INET;
523 addr.sin_port = htons(node->
udpPort);
527 (
const sockaddr *) &addr,
sizeof(addr));
546 ERROR(
"nodeId " << nodeId <<
" not found");
552 DEBUG(
"nodeId = " << nodeId <<
" addr = " << inet_ntoa(nodeAddr) <<
553 " operation = " << (
int) operation <<
" size = " << size);
558 Packet *pkt = i.current();
576 struct sockaddr_in addr;
578 const Size recvSize = size;
591 DEBUG(
"received " << r <<
" bytes from " << inet_ntoa(addr.sin_addr) <<
592 ":" << htons(addr.sin_port) <<
" with coreId = " << hdr->
coreId <<
593 " rankId = " << hdr->
rankId);
596 if (addr.sin_addr.s_addr == node->
ipAddress &&
597 htons(addr.sin_port) == node->
udpPort &&
603 ERROR(
"invalid MPI operation received in packet from node" << nodeId <<
604 " (" << inet_ntoa(nodeAddr) <<
"): " << (
int) hdr->
operation <<
605 " != " << (
int) operation);
616 Size otherNodeId = 0;
621 if (addr.sin_addr.s_addr ==
m_nodes[i]->ipAddress &&
622 htons(addr.sin_port) ==
m_nodes[i]->udpPort &&
630 if (otherNodeId == 0)
632 ERROR(
"nodeId not found for packet from " << inet_ntoa(addr.sin_addr) <<
633 " at port " << htons(addr.sin_port));
Index< Node, MaximumNodes > m_nodes
Contains all known nodes that participate in the computation.
off_t st_size
For regular files, the file size in bytes.
The <sys/stat.h> header shall define the stat structure.
static Size copy(void *dest, const void *src, Size count)
Copy memory from one place to another.
C int errno
The lvalue errno is used by many functions to return error values.
#define NOTICE(msg)
Output a notice message.
Result receivePacket(const Size nodeId, const MpiProxy::Operation operation, void *packet, Size &size)
Receive UDP packet from remote node.
Size length() const
Same as count().
FILE * fopen(const char *filename, const char *mode)
Open a stream.
uint MPI_Comm
Communicator identifier.
virtual Result initialize(int *argc, char ***argv)
Initialize the backend.
C int atoi(const char *nptr)
Convert a string to an integer.
Result startProcesses(int argc, char **argv)
Start remote processes.
Size count() const
Get the number of items on the list.
Implements a MPI backend for the host OS which communicates with mpiproxy servers.
uint MPI_Status
Status holder.
Defines a socket address and port pair.
virtual Result receive(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)
Synchronous receive data.
Result sendPacket(const Size nodeId, const void *packet, const Size size) const
Send UDP packet to a remote node.
static const Size MaximumPacketSize
Maximum size of packet payload.
List< String > split(const char delimiter) const
Split the String into parts separated by a delimiter.
C int recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *addr, socklen_t addrlen)
Receive a single datagram from a socket.
u16 udpPort
UDP port of the node.
#define DEBUG(msg)
Output a debug message to standard output.
char * basename(char *path)
Return the last component of a pathname.
virtual bool hasCurrent() const
Check if there is a current item on the List.
SystemDescriptorHeader header
char * strerror(int errnum)
The strerror function maps the number in errnum to a message string.
#define NULL
NULL means zero.
virtual Result send(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)
Synchronous send data.
unsigned int Size
Any sane size indicator cannot go negative.
C int sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *addr, socklen_t addrlen)
Send a single datagram to a remote host.
static T * create()
Abstract function to create an instance of T.
Describes a remote CPU node accessible via MPI.
MPI_Datatype
Named Predefined Datatypes.
int stat(const char *path, struct stat *buf)
Get file status.
Index< List< Packet * >, MaximumNodes > m_packetBuffers
Buffers incoming packets for later processing.
int m_sock
UDP socket for communicating with remote nodes.
Represents a Message Passing Interface (MPI) implementation backend.
#define ERROR(msg)
Output an error message.
virtual Result terminate()
Terminate the backend.
unsigned char u8
Unsigned 8-bit number.
void * memset(void *dest, int ch, size_t count)
Fill memory with a constant byte.
u32 coreId
Core ID of the node.
Describes data received via UDP.
size_t fread(void *ptr, size_t size, size_t nitems, FILE *stream)
Binary input.
A structure containing information about a file.
int fclose(FILE *stream)
Close a stream.
virtual Result getCommSize(MPI_Comm comm, int *size)
Retrieve communication size (total cores).
@ MPI_ERR_UNSUPPORTED_DATAREP
Operation
Encodes various MPI operations.
Result parseHostsFile(const char *hostsfile)
Parse the given hosts file.
virtual Result getCommRank(MPI_Comm comm, int *rank)
Retrieve communication rank (core id).