// master generates tasks and place them in a queue // slave executes tasks (and sends the result back to the consumer) #include #include #include #include #include #include #include #include #include #include #define MAXMSG 64 void handle_arguments(int argc, char *argv[]) { if (!(argc == 2 || argc == 4)) { fprintf(stderr, "Usage: %s #_OF_TASKS [#_OF_PRODUCERS] [#_OF_CONSUMERS]\n", argv[0]); exit(EXIT_FAILURE); } if (atoi(argv[1]) < 1) { fprintf(stderr, "I need at least one task!\n"); exit(EXIT_FAILURE); } if (atoi(argv[1]) > MAXMSG) { fprintf(stderr, "I can handle up to %d tasks!\n", MAXMSG); exit(EXIT_FAILURE); } if (argc == 4) { if (atoi(argv[2]) < 1) { fprintf(stderr, "I need at least one producer!\n"); exit(EXIT_FAILURE); } if (atoi(argv[3]) < 1) { fprintf(stderr, "I need at least one consumer!\n"); exit(EXIT_FAILURE); } } } struct Task { pid_t prod_pid; int sqc; int a; int b; int result; }; struct Task generate_task(int sqc) { struct Task task; task.prod_pid = getpid(); task.sqc = sqc; task.a = random()/2; task.b = random()/2; task.result = 0; return task; }; void print_task(struct Task task) { printf("====v TASK v====\n"); printf("Producer PID: %d\n", task.prod_pid); printf("Sequence no: %d\n", task.sqc); printf("A: %d\n", task.a); printf("B: %d\n", task.b); if (task.result != 0) { // Cloud replace it with bool solved in Task printf("A + B = %d\n", task.result); } printf("====^ TASK ^====\n"); } void producer(int n_tasks) { // printf("A producer has been called\n"); struct mq_attr attr; attr.mq_flags = 0; attr.mq_maxmsg = 10; // Probably more can be obtained by modifying /proc/sys/fs/mqueue/msg_max attr.mq_msgsize = sizeof(struct Task); attr.mq_curmsgs = 0; mq_unlink("/tasks"); mqd_t queue = mq_open("/tasks", O_RDWR | O_CREAT, S_IRWXU, &attr); if (queue == (mqd_t) -1) { perror("mq_open: "); exit(EXIT_FAILURE); } // printf("The queue was created with fd = %d\n", queue); struct Task tasks[n_tasks]; for (int i = 0; i < n_tasks; ++i) { tasks[i] = generate_task(i); // print_task(tasks[i]); if (mq_send(queue, (const char *)&tasks[i], sizeof(struct Task), 0)) { perror("mq_send: "); exit(EXIT_FAILURE); } } mqd_t results_queue; do { results_queue = mq_open("/results", O_RDONLY); if (results_queue == (mqd_t) -1) { if (errno != ENOENT) { perror("mq_open('/results'): "); exit(EXIT_FAILURE); } } } while(results_queue == (mqd_t) -1); struct Task tmp; for (int i = 0; i < n_tasks; ++i) { int received_bytes = mq_receive(results_queue, (char *) &tmp, sizeof(struct Task), NULL); if (received_bytes < 0) { perror("producer mq_receive('/results')"); exit(EXIT_FAILURE); } tasks[tmp.sqc] = tmp; printf("Calculated %d of %d tasks...\n", i + 1, n_tasks); } for (int i = 0; i < n_tasks; ++i) { print_task(tasks[i]); } mq_unlink("/test"); } int solve_difficult_problem(int a, int b) { sleep(random()%3 + 1); return a + b; } void consumer() { // printf("A consumer has been called\n"); mqd_t queue; do { queue = mq_open("/tasks", O_RDONLY); if (queue == (mqd_t) -1) { if (errno != ENOENT) { perror("mq_open('/tasks'): "); exit(EXIT_FAILURE); } } } while(queue == (mqd_t) -1); struct mq_attr attr; attr.mq_flags = 0; attr.mq_maxmsg = 10; // Probably more can be obtained by modifying /proc/sys/fs/mqueue/msg_max attr.mq_msgsize = sizeof(struct Task); attr.mq_curmsgs = 0; mq_unlink("/results"); mqd_t results_queue = mq_open("/results", O_RDWR | O_CREAT, S_IRWXU, &attr); if (results_queue == (mqd_t) -1) { perror("consumer mq_open('/results'): "); exit(EXIT_FAILURE); } struct Task task; while (1) { int received_bytes = mq_receive(queue, (char *) &task, sizeof(struct Task), NULL); if (received_bytes < 0) { perror("consumer mq_receive('/results'): "); exit(EXIT_FAILURE); } task.result = solve_difficult_problem(task.a, task.b); // print_task(task); if (received_bytes < 0) { perror("mq_receive: "); exit(EXIT_FAILURE); } if (mq_send(results_queue, (const char *)&task, sizeof(struct Task), 0)) { perror("mq_send: "); exit(EXIT_FAILURE); } } mq_unlink("/results"); } int main(int argc, char *argv[]) { srandom(time(NULL)); handle_arguments(argc, argv); int n_tasks = atoi(argv[1]); char tasks_name[] = "/test"; char results_name[] = "/results"; pid_t pid; pid = fork(); if (pid == -1) { perror("fork: "); exit(EXIT_FAILURE); } if (pid == 0) { consumer(); exit(EXIT_SUCCESS); } else { producer(n_tasks); } int status; wait(&status); return 0; }