diff --git a/Topic-7/task_7.c b/Topic-7/task_7.c index e0fe28c..76b7657 100644 --- a/Topic-7/task_7.c +++ b/Topic-7/task_7.c @@ -7,14 +7,13 @@ #include #include #include +#include #include #include #include #include #include -#define MAXMSG 64 - void handle_arguments(int argc, char *argv[]) { if (!(argc == 2 || argc == 4)) { fprintf(stderr, "Usage: %s #_OF_TASKS [#_OF_PRODUCERS] [#_OF_CONSUMERS]\n", argv[0]); @@ -26,10 +25,10 @@ void handle_arguments(int argc, char *argv[]) { exit(EXIT_FAILURE); } - if (atoi(argv[1]) > MAXMSG) { - fprintf(stderr, "I can handle up to %d tasks!\n", MAXMSG); - exit(EXIT_FAILURE); - } + // if (atoi(argv[1]) > MAXMSG) { + // fprintf(stderr, "I can handle up to %d tasks!\n", MAXMSG); + // exit(EXIT_FAILURE); + // } if (argc == 4) { if (atoi(argv[2]) < 1) { @@ -44,6 +43,15 @@ void handle_arguments(int argc, char *argv[]) { } + if (atoi(argv[2]) > 32) { + fprintf(stderr, "I am lazy so I can create up to 32 producers!\n"); + exit(EXIT_FAILURE); + } + if (atoi(argv[3]) > 32) { + fprintf(stderr, "I am lazy so I can create up to 32 consumers!\n"); + exit(EXIT_FAILURE); + } + } @@ -79,39 +87,36 @@ void print_task(struct Task task) { printf("====^ TASK ^====\n"); } -void producer(int n_tasks) { - // printf("A producer has been called\n"); +void producer(int n_tasks, int prio) { + printf("A producer has been called with PID of %d\n", getpid()); - struct mq_attr attr; - attr.mq_flags = 0; - attr.mq_maxmsg = 10; // Probably more can be obtained by modifying /proc/sys/fs/mqueue/msg_max - attr.mq_msgsize = sizeof(struct Task); - attr.mq_curmsgs = 0; - - mq_unlink("/tasks"); - mqd_t queue = mq_open("/tasks", O_RDWR | O_CREAT, S_IRWXU, &attr); + // printf("The queue was created with fd = %d\n", queue); + mqd_t queue = mq_open("/tasks", O_WRONLY); if (queue == (mqd_t) -1) { perror("mq_open: "); exit(EXIT_FAILURE); } - // printf("The queue was created with fd = %d\n", queue); - struct Task tasks[n_tasks]; for (int i = 0; i < n_tasks; ++i) { tasks[i] = generate_task(i); // print_task(tasks[i]); - if (mq_send(queue, (const char *)&tasks[i], sizeof(struct Task), 0)) { + if (mq_send(queue, (const char *)&tasks[i], sizeof(struct Task), prio)) { perror("mq_send: "); exit(EXIT_FAILURE); } + printf("Task %d of %d sent...\n", i + 1, n_tasks); } mqd_t results_queue; do { - results_queue = mq_open("/results", O_RDONLY); + char pid_s[10]; + sprintf(pid_s, "%d", getpid()); + char results_name[30] = "/results"; + strcat(results_name, pid_s); + results_queue = mq_open(results_name, O_RDONLY); if (results_queue == (mqd_t) -1) { if (errno != ENOENT) { perror("mq_open('/results'): "); @@ -129,7 +134,7 @@ void producer(int n_tasks) { exit(EXIT_FAILURE); } tasks[tmp.sqc] = tmp; - printf("Calculated %d of %d tasks...\n", i + 1, n_tasks); + printf("%d received %d of %d results...\n", tmp.prod_pid, i + 1, n_tasks); } for (int i = 0; i < n_tasks; ++i) { @@ -146,7 +151,7 @@ int solve_difficult_problem(int a, int b) { } void consumer() { - // printf("A consumer has been called\n"); + printf("A consumer has been called with PID of %d\n", getpid()); mqd_t queue; do { queue = mq_open("/tasks", O_RDONLY); @@ -158,37 +163,35 @@ void consumer() { } } while(queue == (mqd_t) -1); - struct mq_attr attr; - attr.mq_flags = 0; - attr.mq_maxmsg = 10; // Probably more can be obtained by modifying /proc/sys/fs/mqueue/msg_max - attr.mq_msgsize = sizeof(struct Task); - attr.mq_curmsgs = 0; - - mq_unlink("/results"); - mqd_t results_queue = mq_open("/results", O_RDWR | O_CREAT, S_IRWXU, &attr); - if (results_queue == (mqd_t) -1) { - perror("consumer mq_open('/results'): "); - exit(EXIT_FAILURE); - } struct Task task; + struct timespec interval; + interval.tv_nsec = 0; + while (1) { - int received_bytes = mq_receive(queue, (char *) &task, sizeof(struct Task), NULL); + interval.tv_sec = time(NULL) + 5; + int received_bytes = mq_timedreceive(queue, (char *) &task, sizeof(struct Task), NULL, &interval); if (received_bytes < 0) { - perror("consumer mq_receive('/results'): "); + perror("consumer mq_receive('/tasks'): "); exit(EXIT_FAILURE); } task.result = solve_difficult_problem(task.a, task.b); - // print_task(task); - if (received_bytes < 0) { perror("mq_receive: "); exit(EXIT_FAILURE); } - + char prod_pid[10]; + sprintf(prod_pid, "%d", task.prod_pid); + char results_name[30] = "/results"; + strcat(results_name, prod_pid); + mqd_t results_queue = mq_open(results_name, O_WRONLY); + if (results_queue == (mqd_t) -1) { + perror("consumer mq_open('/results'): "); + exit(EXIT_FAILURE); + } if (mq_send(results_queue, (const char *)&task, sizeof(struct Task), 0)) { perror("mq_send: "); exit(EXIT_FAILURE); @@ -203,28 +206,80 @@ void consumer() { int main(int argc, char *argv[]) { srandom(time(NULL)); - handle_arguments(argc, argv); + int n_tasks = atoi(argv[1]); - char tasks_name[] = "/test"; - char results_name[] = "/results"; + int n_producers = atoi(argv[2]); + int n_consumers = atoi(argv[3]); + char tasks_name[] = "/tasks"; + // char results_name[] = "/results"; + + struct mq_attr attr; + attr.mq_flags = 0; + attr.mq_maxmsg = 10; // Probably more can be obtained by modifying /proc/sys/fs/mqueue/msg_max + attr.mq_msgsize = sizeof(struct Task); + attr.mq_curmsgs = 0; + + mq_unlink(tasks_name); + // mq_unlink(results_name); + + mqd_t tasks_queue = mq_open(tasks_name, O_RDWR | O_CREAT, S_IRWXU, &attr); + if (tasks_queue == (mqd_t) -1) { + perror("mq_open: "); + exit(EXIT_FAILURE); + } pid_t pid; + int status[n_producers + n_consumers]; + int producers_pids[n_producers]; + int consumers_pids[n_consumers]; + + for (int i = 0; i < n_producers; ++i) { + pid = fork(); + if (pid == -1) { + perror("fork: "); + exit(EXIT_FAILURE); + } + if (pid == 0) { + producer(n_tasks, i); + exit(EXIT_SUCCESS); + } - pid = fork(); - if (pid == -1) { - perror("fork: "); - exit(EXIT_FAILURE); + char pid_s[10]; + sprintf(pid_s, "%d", pid); + char results_name[30] = "/results"; + strcat(results_name, pid_s); + + unlink(results_name); + mqd_t results_queue = mq_open(results_name, O_RDWR | O_CREAT, S_IRWXU, &attr); + if (results_queue == (mqd_t) -1) { + perror("mq_open: "); + exit(EXIT_FAILURE); + } + + producers_pids[i] = pid; } - if (pid == 0) { - consumer(); - exit(EXIT_SUCCESS); - } else { - producer(n_tasks); + for (int i = 0; i < n_consumers; ++i) { + pid = fork(); + if (pid == -1) { + perror("fork: "); + exit(EXIT_FAILURE); + } + if (pid == 0) { + consumer(); + exit(EXIT_SUCCESS); + } + consumers_pids[i] = pid; } - int status; - wait(&status); + for (int i = 0; i < n_producers; ++i) { + waitpid(producers_pids[i], &status[i], 0); + } + for (int i = 0; i < n_consumers; ++i) { + waitpid(consumers_pids[i], &status[i], 0); + } + + printf("This is the end!\n"); return 0; } \ No newline at end of file