// master generates tasks and place them in a queue // slave executes tasks (and sends the result back to the consumer) #include #include #include #include #include #include #include #include #include #include #include void handle_arguments(int argc, char *argv[]) { if (!(argc == 2 || argc == 4)) { fprintf(stderr, "Usage: %s #_OF_TASKS [#_OF_PRODUCERS] [#_OF_CONSUMERS]\n", argv[0]); exit(EXIT_FAILURE); } if (atoi(argv[1]) < 1) { fprintf(stderr, "I need at least one task!\n"); exit(EXIT_FAILURE); } // if (atoi(argv[1]) > MAXMSG) { // fprintf(stderr, "I can handle up to %d tasks!\n", MAXMSG); // exit(EXIT_FAILURE); // } if (argc == 4) { if (atoi(argv[2]) < 1) { fprintf(stderr, "I need at least one producer!\n"); exit(EXIT_FAILURE); } if (atoi(argv[3]) < 1) { fprintf(stderr, "I need at least one consumer!\n"); exit(EXIT_FAILURE); } } if (atoi(argv[2]) > 32) { fprintf(stderr, "I am lazy so I can create up to 32 producers!\n"); exit(EXIT_FAILURE); } if (atoi(argv[3]) > 32) { fprintf(stderr, "I am lazy so I can create up to 32 consumers!\n"); exit(EXIT_FAILURE); } } struct Task { pid_t prod_pid; int sqc; int a; int b; int result; }; struct Task generate_task(int sqc) { struct Task task; task.prod_pid = getpid(); task.sqc = sqc; task.a = random()/2; task.b = random()/2; task.result = 0; return task; }; void print_task(struct Task task) { printf("====v TASK v====\n"); printf("Producer PID: %d\n", task.prod_pid); printf("Sequence no: %d\n", task.sqc); printf("A: %d\n", task.a); printf("B: %d\n", task.b); if (task.result != 0) { // Cloud replace it with bool solved in Task printf("A + B = %d\n", task.result); } printf("====^ TASK ^====\n"); } void producer(int n_tasks, int prio) { printf("A producer has been called with PID of %d\n", getpid()); // printf("The queue was created with fd = %d\n", queue); mqd_t queue = mq_open("/tasks", O_WRONLY); if (queue == (mqd_t) -1) { perror("mq_open: "); exit(EXIT_FAILURE); } struct Task tasks[n_tasks]; for (int i = 0; i < n_tasks; ++i) { tasks[i] = generate_task(i); // print_task(tasks[i]); if (mq_send(queue, (const char *)&tasks[i], sizeof(struct Task), prio)) { perror("mq_send: "); exit(EXIT_FAILURE); } printf("Task %d of %d sent...\n", i + 1, n_tasks); } mqd_t results_queue; do { char pid_s[10]; sprintf(pid_s, "%d", getpid()); char results_name[30] = "/results"; strcat(results_name, pid_s); results_queue = mq_open(results_name, O_RDONLY); if (results_queue == (mqd_t) -1) { if (errno != ENOENT) { perror("mq_open('/results'): "); exit(EXIT_FAILURE); } } } while(results_queue == (mqd_t) -1); struct Task tmp; for (int i = 0; i < n_tasks; ++i) { int received_bytes = mq_receive(results_queue, (char *) &tmp, sizeof(struct Task), NULL); if (received_bytes < 0) { perror("producer mq_receive('/results')"); exit(EXIT_FAILURE); } tasks[tmp.sqc] = tmp; printf("%d received %d of %d results...\n", tmp.prod_pid, i + 1, n_tasks); } for (int i = 0; i < n_tasks; ++i) { print_task(tasks[i]); } mq_unlink("/test"); } int solve_difficult_problem(int a, int b) { sleep(random()%3 + 1); return a + b; } void consumer() { printf("A consumer has been called with PID of %d\n", getpid()); mqd_t queue; do { queue = mq_open("/tasks", O_RDONLY); if (queue == (mqd_t) -1) { if (errno != ENOENT) { perror("mq_open('/tasks'): "); exit(EXIT_FAILURE); } } } while(queue == (mqd_t) -1); struct Task task; struct timespec interval; interval.tv_nsec = 0; while (1) { interval.tv_sec = time(NULL) + 5; int received_bytes = mq_timedreceive(queue, (char *) &task, sizeof(struct Task), NULL, &interval); if (received_bytes < 0) { perror("consumer mq_receive('/tasks'): "); exit(EXIT_FAILURE); } task.result = solve_difficult_problem(task.a, task.b); if (received_bytes < 0) { perror("mq_receive: "); exit(EXIT_FAILURE); } char prod_pid[10]; sprintf(prod_pid, "%d", task.prod_pid); char results_name[30] = "/results"; strcat(results_name, prod_pid); mqd_t results_queue = mq_open(results_name, O_WRONLY); if (results_queue == (mqd_t) -1) { perror("consumer mq_open('/results'): "); exit(EXIT_FAILURE); } if (mq_send(results_queue, (const char *)&task, sizeof(struct Task), 0)) { perror("mq_send: "); exit(EXIT_FAILURE); } } mq_unlink("/results"); } int main(int argc, char *argv[]) { srandom(time(NULL)); handle_arguments(argc, argv); int n_tasks = atoi(argv[1]); int n_producers = atoi(argv[2]); int n_consumers = atoi(argv[3]); char tasks_name[] = "/tasks"; // char results_name[] = "/results"; struct mq_attr attr; attr.mq_flags = 0; attr.mq_maxmsg = 10; // Probably more can be obtained by modifying /proc/sys/fs/mqueue/msg_max attr.mq_msgsize = sizeof(struct Task); attr.mq_curmsgs = 0; mq_unlink(tasks_name); // mq_unlink(results_name); mqd_t tasks_queue = mq_open(tasks_name, O_RDWR | O_CREAT, S_IRWXU, &attr); if (tasks_queue == (mqd_t) -1) { perror("mq_open: "); exit(EXIT_FAILURE); } pid_t pid; int status[n_producers + n_consumers]; int producers_pids[n_producers]; int consumers_pids[n_consumers]; for (int i = 0; i < n_producers; ++i) { pid = fork(); if (pid == -1) { perror("fork: "); exit(EXIT_FAILURE); } if (pid == 0) { producer(n_tasks, i); exit(EXIT_SUCCESS); } char pid_s[10]; sprintf(pid_s, "%d", pid); char results_name[30] = "/results"; strcat(results_name, pid_s); unlink(results_name); mqd_t results_queue = mq_open(results_name, O_RDWR | O_CREAT, S_IRWXU, &attr); if (results_queue == (mqd_t) -1) { perror("mq_open: "); exit(EXIT_FAILURE); } producers_pids[i] = pid; } for (int i = 0; i < n_consumers; ++i) { pid = fork(); if (pid == -1) { perror("fork: "); exit(EXIT_FAILURE); } if (pid == 0) { consumer(); exit(EXIT_SUCCESS); } consumers_pids[i] = pid; } for (int i = 0; i < n_producers; ++i) { waitpid(producers_pids[i], &status[i], 0); } for (int i = 0; i < n_consumers; ++i) { waitpid(consumers_pids[i], &status[i], 0); } printf("This is the end!\n"); return 0; }