// FIFO stress test: a parent forks a first generation of children, each of
// which forks a second generation. Every second-generation process sends
// PIPE_BUF-sized messages to its parent's FIFO, and every first-generation
// process forwards messages to the top-level FIFO once it has drained its own.
//
// NOTE(review): the header names were missing from the original #include
// lines; the nine below are the ones the identifiers used in this file need.
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Number of messages every sender writes to its parent's FIFO.
static const int kMessages = 5;
// Number of processes forked per generation.
static const int kChildren = 25;

// Prints "<pid>\tline <line>\t<strerror(error)>" to stdout.
void print_error(const int line, const int error)
{
    printf("%d\tline %d\t%s\n", static_cast<int>(getpid()), line, strerror(error));
}

// Evaluates `result` exactly once; if it is negative, reports errno together
// with the line of the macro invocation and returns the value from the
// enclosing function. (The argument must not be evaluated twice: it is
// usually a system call with side effects such as close() or write().)
#define HANDLE_ERROR(result)                                  \
    do {                                                      \
        const long handle_error_value_ = (result);            \
        if (handle_error_value_ < 0) {                        \
            print_error(__LINE__, errno);                     \
            return static_cast<int>(handle_error_value_);     \
        }                                                     \
    } while (0)

// Evaluates `result` exactly once; if it is negative, returns it from the
// enclosing function without printing (the callee already reported it).
#define HANDLE_RESULT(result)                                 \
    do {                                                      \
        const long handle_result_value_ = (result);           \
        if (handle_result_value_ < 0) {                       \
            return static_cast<int>(handle_result_value_);    \
        }                                                     \
    } while (0)

// Switches O_NONBLOCK on or off for `fd`, preserving the other status flags.
// Returns 0 on success, a negative value (after reporting) on failure.
static int set_nonblocking(const int fd, const bool enabled)
{
    const int flags = fcntl(fd, F_GETFL);
    // Check F_GETFL before using its result as a flag mask.
    HANDLE_ERROR(flags);
    const int updated = enabled ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
    HANDLE_ERROR(fcntl(fd, F_SETFL, updated));
    return 0;
}

// Creates the FIFO `name` (replacing any existing file of that name) and
// opens both ends of it.
//   rfd - receives the read end, opened non-blocking so that the open does
//         not wait for a writer
//   wfd - receives a write end; it is opened after the read end so the
//         blocking open returns immediately
// Returns 0 on success, a negative value (after reporting) on failure.
int make_reader(const char *const name, int *const rfd, int *const wfd)
{
    /* In case it exists */
    remove(name);

    // Permission bits only: O_CREAT is an open() flag and does not belong in
    // a mode_t.
    const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
    HANDLE_ERROR(mkfifo(name, mode));

    *rfd = open(name, O_RDONLY | O_NONBLOCK);
    HANDLE_ERROR(*rfd);

    *wfd = open(name, O_WRONLY);
    HANDLE_ERROR(*wfd);
    return 0;
}

// Closes both ends opened by make_reader() and unlinks the FIFO `name`.
// Stops at, reports, and returns the first failure; returns 0 on success.
int stop_reader(const char *const name, const int *const rfd, const int *const wfd)
{
    HANDLE_ERROR(close(*wfd));
    HANDLE_ERROR(close(*rfd));
    HANDLE_ERROR(unlink(name));
    return 0;
}

// Blocks until the process `pid` terminates. The exit status is collected
// but not inspected. Returns 0 on success, a negative value if waitpid()
// fails.
int wait_child(const int pid)
{
    int status = 0;
    HANDLE_ERROR(waitpid(pid, &status, 0));
    return 0;
}

// Writes one message to `wfd`: first non-blocking and, if the FIFO is full
// (EAGAIN), again in blocking mode. Any other write error is reported.
// Returns 0 on success, a negative value on failure.
static int send_message(const int wfd, const char *const message, const size_t size)
{
    if (::write(wfd, message, size) >= 0) {
        return 0;
    }
    if (errno != EAGAIN) {
        print_error(__LINE__, errno);
        return -1;
    }
    // The FIFO is full: fall back to a blocking write.
    HANDLE_RESULT(set_nonblocking(wfd, false));
    HANDLE_ERROR(::write(wfd, message, size));
    return 0;
}

// Sends `messages` space-filled messages of PIPE_BUF bytes to the FIFO
// `name`, opening and closing the FIFO for every message.
// Returns 0 on success, a negative value (after reporting) on failure.
int send_messages(const char *const name, const int messages)
{
    char message[PIPE_BUF];
    memset(message, ' ', sizeof message);

    for (int index = 0; index < messages; ++index) {
        const int wfd = open(name, O_WRONLY | O_NONBLOCK);
        HANDLE_ERROR(wfd);

        const int sent = send_message(wfd, message, sizeof message);
        if (sent < 0) {
            // Do not leak the descriptor on the error path.
            close(wfd);
            return sent;
        }
        HANDLE_ERROR(close(wfd));
    }
    return 0;
}

// Performs `messages` reads of up to PIPE_BUF bytes from `rfd`. Each read is
// tried non-blocking first and repeated in blocking mode when no data is
// available yet (EAGAIN). Any other read error is reported.
// `name` is unused; it is kept so the signature matches send_messages().
// Returns 0 on success, a negative value on failure.
int read_messages(const char *const name, const int rfd, const int messages)
{
    (void)name;
    char message[PIPE_BUF] = {};

    for (int index = 0; index < messages; ++index) {
        // A previous iteration may have left the descriptor in blocking mode.
        HANDLE_RESULT(set_nonblocking(rfd, true));
        if (::read(rfd, message, sizeof message) >= 0) {
            continue;
        }
        if (errno != EAGAIN) {
            print_error(__LINE__, errno);
            return -1;
        }
        // Nothing to read yet: wait for the next message.
        HANDLE_RESULT(set_nonblocking(rfd, false));
        HANDLE_ERROR(::read(rfd, message, sizeof message));
    }
    return 0;
}

// Body of a first-generation child: creates its own FIFO
// "/tmp/pipe_child_<idx>", forks kChildren senders, drains their messages,
// then sends kMessages messages to the FIFO `name_parent` and removes its own
// FIFO. In a forked second-generation process the return value is the result
// of send_messages(). The caller is expected to return this value from main().
static int run_first_generation(const char *const name_parent, const int idx)
{
    char name_child[32];
    snprintf(name_child, sizeof name_child, "/tmp/pipe_child_%d", idx);

    int child_rfd = -1;
    int child_wfd = -1;
    HANDLE_RESULT(make_reader(name_child, &child_rfd, &child_wfd));

    pid_t second_generation_pids[kChildren];
    for (int sender = 0; sender < kChildren; ++sender) {
        // Flush so buffered output is not duplicated into the new process.
        fflush(stdout);
        const pid_t second_generation_pid = fork();
        HANDLE_ERROR(second_generation_pid);
        if (second_generation_pid == 0) {
            return send_messages(name_child, kMessages);
        }
        second_generation_pids[sender] = second_generation_pid;
    }

    HANDLE_RESULT(read_messages(name_child, child_rfd, kChildren * kMessages));
    for (int sender = 0; sender < kChildren; ++sender) {
        wait_child(second_generation_pids[sender]);
    }

    HANDLE_RESULT(send_messages(name_parent, kMessages));
    HANDLE_RESULT(stop_reader(name_child, &child_rfd, &child_wfd));
    return 0;
}

// Runs four laps; in each lap kChildren first-generation children are forked
// and kChildren * kMessages messages are read from "/tmp/pipe_parent".
// Returns 0 on success or the first negative result encountered.
int main()
{
    const char *const name_parent = "/tmp/pipe_parent";

    int rfd = -1;
    int wfd = -1;
    HANDLE_RESULT(make_reader(name_parent, &rfd, &wfd));

    for (int lap = 1; lap < 5; ++lap) {
        printf("lap %d\n", lap);

        pid_t first_generation_pids[kChildren];
        for (int idx = 0; idx < kChildren; ++idx) {
            // Without this flush, a fully buffered stdout (file or pipe)
            // would make every child re-emit "lap N" when it exits.
            fflush(stdout);
            const pid_t first_generation_pid = fork();
            HANDLE_ERROR(first_generation_pid);
            if (first_generation_pid == 0) {
                return run_first_generation(name_parent, idx);
            }
            first_generation_pids[idx] = first_generation_pid;
        }

        HANDLE_RESULT(read_messages(name_parent, rfd, kChildren * kMessages));
        for (int idx = 0; idx < kChildren; ++idx) {
            wait_child(first_generation_pids[idx]);
        }
    }

    HANDLE_RESULT(stop_reader(name_parent, &rfd, &wfd));
    return 0;
}