Go to the documentation of this file.
18 #include <FreeNOS/System.h>
38 #pragma clang optimize off
39 #pragma GCC push_options
40 #pragma GCC optimize ("O0")
75 ERROR(
"failed to allocate NetworkClient");
83 ERROR(
"failed to initialize network client for device "
84 << device <<
": result = " << (
int) result);
92 ERROR(
"failed to create UDP socket on device " << device <<
93 ": result = " << (
int) result);
101 ERROR(
"failed to bind socket to UDP port " <<
UdpPort <<
102 " on device " << device <<
": result = " << (
int) result);
116 Size packetSize =
sizeof(packet);
125 ERROR(
"failed to receive UDP packet: result = " << (
int) recvResult);
134 ERROR(
"failed to process UDP packet: result = " << (
int) procResult);
146 DEBUG(
"size = " << size);
166 DEBUG(
"count = " << count);
208 &addr,
sizeof(addr));
217 " at port " << addr.
port);
228 DEBUG(
"hdr->operation = " << hdr->
operation <<
" size = " << size);
258 const u8 *buf = (packet +
sizeof(*header));
260 NOTICE(
"rankId = " <<
header->rankId <<
" datatype = " <<
261 header->datatype <<
" datacount = " <<
header->datacount);
265 ERROR(
"rankId " <<
header->rankId <<
" not found");
274 msg.
integer = *(((
int *) buf) + i);
278 msg.
uchar = *(((
u8 *) buf) + i);
283 ERROR(
"unsupported datatype = " <<
header->datatype);
303 Size packetCount = 0;
307 NOTICE("rankId = " << header->rankId << " datatype = " <<
308 header->datatype << " datacount = " << header->datacount);
310 if (!(ch = m_readChannels.get(header->rankId)))
312 ERROR("rankId " << header->rankId << " not found");
317 for (Size i = 0; i < header->datacount;)
320 Header *hdr = (Header *) pkts[packetCount];
321 u8 *buf = (u8 *)(hdr + 1);
322 Size pktSize = sizeof(Header);
325 hdr->operation = MpiOpRecv;
326 hdr->result = MPI_SUCCESS;
327 hdr->coreId = header->coreId;
328 hdr->rankId = header->rankId;
329 hdr->datatype = header->datatype;
332 while (pktSize < MaximumPacketSize && i < header->datacount)
334 while (ch->read(&msg) != Channel::Success)
342 *(((
int *) buf) + hdr->datacount) = msg.integer;
343 pktSize +=
sizeof(
int);
347 *(((
u8 *) buf) + hdr->datacount) = msg.uchar;
348 pktSize +=
sizeof(
u8);
353 ERROR(
"unsupported datatype = " <<
header->datatype);
364 vec[packetCount].iov_base = (
void *) hdr;
365 vec[packetCount].iov_len = pktSize;
371 const Result sendResult = udpSendMultiple(vec, packetCount, addr);
374 ERROR(
"failed to send multiple UDP packets: result = " << (
int) sendResult);
396 DEBUG(
"exec: cmd = '" << cmd <<
"' rankId = " <<
header->rankId <<
397 " coreId = " <<
header->coreId <<
" coreCount = " <<
header->coreCount);
402 ERROR(
"failed to create MPI communication channels for rankId = " <<
header->rankId <<
403 " result = " << (
int) chanResult);
423 Size pktSize =
sizeof(*hdr);
428 ERROR(
"failed to send UDP packet: result = " << (
int) sendResult);
444 NOTICE(
"size = " << size);
472 ERROR(
"failed to release memory of communication channels: result = " << (
int) releaseResult);
489 ERROR(
"failed to send UDP packet: result = " << (
int) sendResult);
497 const Size coreCount)
499 DEBUG(
"rankId = " << rankId <<
" coreCount = " << coreCount);
512 ERROR(
"failed to allocate MemoryChannel: result = " << (
int) vmResult);
527 ERROR(
"failed to allocate consumer MemoryChannel for rankId = " << rankId);
538 ERROR(
"failed to unmap read MemoryChannel: result = " << (
int) unmapResult);
546 NOTICE(
"readChannel: rank" << rankId <<
": data = " << (
void *) readMemoryBase <<
547 " feedback = " << (
void *) (readMemoryBase +
PAGESIZE));
555 ERROR(
"failed to allocate producer MemoryChannel for rankId = " << rankId);
566 ERROR(
"failed to unmap write MemoryChannel: result = " << (
int) unmapResult);
574 NOTICE(
"writeChannel: rank" << rankId <<
": data = " << (
void *) writeMemoryBase <<
575 " feedback = " << (
void *) (writeMemoryBase +
PAGESIZE));
582 const Size coreCount)
584 DEBUG(
"command = '" << command <<
"' rankId = " << rankId <<
585 " coreCount = " << coreCount);
591 programPath <<
"/bin/" << *programArgs[0];
595 programCmd << programPath <<
" --slave " <<
604 programCmd <<
" " << (*it.
current());
607 NOTICE(
"programCmd = '" << *programCmd <<
"'");
611 char **argv =
new char*[fullProgramArgs.
count() + 1];
616 argv[argc++] = *i.current();
618 NOTICE(
"argv[" << (argc-1) <<
"] = " << argv[argc-1]);
630 ERROR(
"failed to start program on local core: result = " << (
int) execResult);
645 const Size coreCount)
647 DEBUG(
"coreId = " <<
coreId <<
" command = '" << command <<
648 "' rankId = " << rankId <<
" coreCount = " << coreCount);
655 programPath <<
"/bin/" << *programArgs[0];
662 ERROR(
"failed to read program at path '" << *programPath <<
663 "': result = " << (
int) readResult);
672 ERROR(
"failed to initialize LZ4 decompressor: result = " << (
int) lz4Result);
678 uncompProgRange.
virt = 0;
679 uncompProgRange.
phys = 0;
685 ERROR(
"failed to allocate program buffer: result = " << (
int) vmResult);
689 u8 *programBuffer = (
u8 *) uncompProgRange.
virt;
696 ERROR(
"failed to decompress program buffer: result = " << (
int) decompResult);
702 programCmd << programPath <<
" --slave " <<
711 programCmd <<
" " << (*i.
current());
714 DEBUG(
"programCmd = '" << *programCmd <<
"'");
721 ERROR(
"failed to create process on core" <<
coreId <<
": result = " << (
int) result);
729 ERROR(
"failed to release memory of uncompressed program: result = " << (
int) releaseResult);
const ProcessID getPid() const
Retrieve Process Identifier of the program.
const char * get(const char *name) const
Get argument by name.
Result udpSend(const void *packet, const Size size, const struct sockaddr &addr) const
Send UDP packet.
CoreClient provides a simple interface to a CoreServer.
Provides a buffered abstract interface to a file.
static Size copy(void *dest, const void *src, Size count)
Copy memory from one place to another.
NetworkClient * m_client
Networking client object.
virtual bool insertAt(const Size position, T *item)
Inserts the given item at the given position.
Result
Enumeration of generic kernel API result codes.
Result initialize()
Perform initialization.
Result createSocket(const SocketType type, int *socket)
Create new socket.
C int errno
The lvalue errno is used by many functions to return error values.
#define NOTICE(msg)
Output a notice message.
static void * set(void *dest, int ch, unsigned count)
Fill memory with a constant byte.
Result processRecv(const Header *header, const u8 *packet, const Size size, const struct sockaddr &addr)
Process MPI recv request.
Result setPhysical(const Address data, const Address feedback, const bool hardReset=true)
Set memory pages by physical address.
int m_sock
IP/UDP socket for external communication.
u64 getUncompressedSize() const
Get size of the uncompressed data.
#define PAGESIZE
ARM uses 4K pages.
Result processRequest(const u8 *packet, const Size size, const struct sockaddr &addr)
Process incoming packet.
static const Size MaxPackets
Maximum number of packets available.
Result initialize()
Initialize the decompressor.
POSIX-compatible application.
unsigned long Address
A memory address.
Size count() const
Get the number of items on the list.
API::Result VMCtl(const ProcessID procID, const MemoryOperation op, Memory::Range *range=ZERO)
Prototype for user applications.
Result processTerminate(const Header *header, const u8 *packet, const Size size, const struct sockaddr &addr)
Process MPI terminate request.
Result udpReceive(void *packet, Size &size, struct sockaddr &addr) const
Receive UDP packet.
const ArgumentContainer & arguments() const
Get program arguments.
void setDescription(const String &desc)
Set program description.
API::Result ProcessCtl(const ProcessID proc, const ProcessOperation op, const Address addr=0, const Address output=0)
Prototype for user applications.
Core::Result createProcess(const Size coreId, const Address programAddr, const Size programSize, const char *programCmd) const
Create a new process on a different core.
Result startRemoteProcess(const Size coreId, const char *command, const Size rankId, const Size coreCount)
Start a process on a secondary processor.
Networking packet queue implementation.
Defines a socket address and port pair.
virtual const T & current() const
Get current item in the List.
virtual void fill(T value)
Fill the Sequence with the given value.
static const Size MaximumPacketSize
Maximum size of packet payload.
Array< ProcessID, MaximumChannels > m_pids
Records the PID of each process participating in the computation.
const void * buffer() const
Get file buffer.
List< String > split(const char delimiter) const
Split the String into parts separated by a delimiter.
C int recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *addr, socklen_t addrlen)
Receive a single datagram from a socket.
Result read()
Read the file (buffered).
MpiProxy(int argc, char **argv)
Constructor.
#define DEBUG(msg)
Output a debug message to standard output.
static const Size MaximumLength
Maximum length of a filesystem path in bytes.
Address phys
Physical address.
virtual bool hasCurrent() const
Check if there is a current item on the List.
Describes one or more datagrams.
SystemDescriptorHeader header
char * strerror(int errnum)
The strerror function maps the number in errnum to a message string.
Result createChannels(const Size rankId, const Size coreCount)
Create communication channels.
#define NULL
NULL means zero.
static const Size ReceiveTimeoutMs
Timeout in milliseconds to wait for packet receive.
unsigned int Size
Any sane size indicator cannot go negative.
C int sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *addr, socklen_t addrlen)
Send a single datagram to a remote host.
Result exec()
Runs the external program.
Index< MemoryChannel, MaximumChannels > m_readChannels
Stores all channels for receiving data from processes.
virtual Result write(const void *buffer)
Write a message.
Unidirectional point-to-point channel using shared memory.
Networking Client implementation.
virtual T * get(const Size position) const
Returns the item at the given position.
Result startLocalProcess(const char *command, const Size rankId, const Size coreCount)
Start a process on the local processor.
Input/Output vector for multi-packet operations.
Result bindSocket(const int sock, const IPV4::Address addr=0, const u16 port=0)
Bind socket to address/port.
virtual ~MpiProxy()
Destructor.
Result
Result code for Actions.
Result registerPositional(const char *name, const char *description, Size count=1)
Register a positional argument.
Result processExec(const Header *header, const u8 *packet, const Size size, const struct sockaddr &addr)
Process execute request.
Helper class to launch an external program.
static const String toString(const Address address)
Convert address to string.
ArgumentParser & parser()
Get program arguments parser.
const Size size() const
Get file size.
#define assert(exp)
Insert program diagnostics.
Result read(void *buffer, const Size size) const
Reads compressed data.
#define ERROR(msg)
Output an error message.
Result processSend(const Header *header, const u8 *packet, const Size size)
Process MPI send request.
Result
Result code for filesystem Actions.
unsigned char u8
Unsigned 8-bit number.
virtual Result exec()
Run the server.
Decompress data using the LZ4 algorithm created by Yann Collet.
Result unmap()
Unmap memory pages from virtual address space.
static const Size PayloadBufferSize
Size of payload memory buffer.
Address virt
Virtual address.
pid_t waitpid(pid_t pid, int *stat_loc, int options)
Wait for a child process to stop or terminate.
Result waitSocket(const NetworkClient::SocketType type, const int sock, const Size msecTimeout)
Wait until the given socket has data to receive.
Size size
Size in number of bytes.
Access access
Page access flags.
static const u16 UdpPort
Port number for IP/UDP traffic.
Memory::Range m_memChannelBase
Memory base address for local MPI communication.
virtual Size size() const
Returns the maximum size of this Array.
C int sendmsg(int sockfd, const struct msghdr *msg, int flags)
Send multiple datagrams to a remote host.
virtual Result initialize()
Initialize the server.
Index< MemoryChannel, MaximumChannels > m_writeChannels
Stores all channels for sending data to processes.
Result udpSendMultiple(const struct iovec *vec, const Size count, const struct sockaddr &addr) const
Send multiple UDP packets.
virtual bool insert(Size position, const T &item)
Puts the given item at the given position.