diff --git a/Topic-7/task_7 b/Topic-7/task_7 new file mode 100755 index 0000000..08c860a Binary files /dev/null and b/Topic-7/task_7 differ diff --git a/Topic-7/task_7.c b/Topic-7/task_7.c new file mode 100644 index 0000000..e0fe28c --- /dev/null +++ b/Topic-7/task_7.c @@ -0,0 +1,230 @@ +// master generates tasks and place them in a queue + +// slave executes tasks (and sends the result back to the consumer) + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAXMSG 64 + +void handle_arguments(int argc, char *argv[]) { + if (!(argc == 2 || argc == 4)) { + fprintf(stderr, "Usage: %s #_OF_TASKS [#_OF_PRODUCERS] [#_OF_CONSUMERS]\n", argv[0]); + exit(EXIT_FAILURE); + } + + if (atoi(argv[1]) < 1) { + fprintf(stderr, "I need at least one task!\n"); + exit(EXIT_FAILURE); + } + + if (atoi(argv[1]) > MAXMSG) { + fprintf(stderr, "I can handle up to %d tasks!\n", MAXMSG); + exit(EXIT_FAILURE); + } + + if (argc == 4) { + if (atoi(argv[2]) < 1) { + fprintf(stderr, "I need at least one producer!\n"); + exit(EXIT_FAILURE); + } + + if (atoi(argv[3]) < 1) { + fprintf(stderr, "I need at least one consumer!\n"); + exit(EXIT_FAILURE); + } + + } + + + +} + +struct Task { + pid_t prod_pid; + int sqc; + int a; + int b; + int result; +}; + +struct Task generate_task(int sqc) { + struct Task task; + task.prod_pid = getpid(); + task.sqc = sqc; + task.a = random()/2; + task.b = random()/2; + task.result = 0; + + return task; +}; + +void print_task(struct Task task) { + printf("====v TASK v====\n"); + printf("Producer PID: %d\n", task.prod_pid); + printf("Sequence no: %d\n", task.sqc); + printf("A: %d\n", task.a); + printf("B: %d\n", task.b); + if (task.result != 0) { // Cloud replace it with bool solved in Task + printf("A + B = %d\n", task.result); + } + printf("====^ TASK ^====\n"); +} + +void producer(int n_tasks) { + // printf("A producer has been called\n"); + + struct mq_attr attr; + attr.mq_flags = 0; + attr.mq_maxmsg = 10; // Probably more can be obtained by modifying /proc/sys/fs/mqueue/msg_max + attr.mq_msgsize = sizeof(struct Task); + attr.mq_curmsgs = 0; + + mq_unlink("/tasks"); + mqd_t queue = mq_open("/tasks", O_RDWR | O_CREAT, S_IRWXU, &attr); + + if (queue == (mqd_t) -1) { + perror("mq_open: "); + exit(EXIT_FAILURE); + } + + // printf("The queue was created with fd = %d\n", queue); + + struct Task tasks[n_tasks]; + + for (int i = 0; i < n_tasks; ++i) { + tasks[i] = generate_task(i); + // print_task(tasks[i]); + if (mq_send(queue, (const char *)&tasks[i], sizeof(struct Task), 0)) { + perror("mq_send: "); + exit(EXIT_FAILURE); + } + } + + mqd_t results_queue; + do { + results_queue = mq_open("/results", O_RDONLY); + if (results_queue == (mqd_t) -1) { + if (errno != ENOENT) { + perror("mq_open('/results'): "); + exit(EXIT_FAILURE); + } + } + } while(results_queue == (mqd_t) -1); + + struct Task tmp; + + for (int i = 0; i < n_tasks; ++i) { + int received_bytes = mq_receive(results_queue, (char *) &tmp, sizeof(struct Task), NULL); + if (received_bytes < 0) { + perror("producer mq_receive('/results')"); + exit(EXIT_FAILURE); + } + tasks[tmp.sqc] = tmp; + printf("Calculated %d of %d tasks...\n", i + 1, n_tasks); + } + + for (int i = 0; i < n_tasks; ++i) { + print_task(tasks[i]); + } + + + mq_unlink("/test"); +} + +int solve_difficult_problem(int a, int b) { + sleep(random()%3 + 1); + return a + b; +} + +void consumer() { + // printf("A consumer has been called\n"); + mqd_t queue; + do { + queue = mq_open("/tasks", O_RDONLY); + if (queue == (mqd_t) -1) { + if (errno != ENOENT) { + perror("mq_open('/tasks'): "); + exit(EXIT_FAILURE); + } + } + } while(queue == (mqd_t) -1); + + struct mq_attr attr; + attr.mq_flags = 0; + attr.mq_maxmsg = 10; // Probably more can be obtained by modifying /proc/sys/fs/mqueue/msg_max + attr.mq_msgsize = sizeof(struct Task); + attr.mq_curmsgs = 0; + + mq_unlink("/results"); + mqd_t results_queue = mq_open("/results", O_RDWR | O_CREAT, S_IRWXU, &attr); + if (results_queue == (mqd_t) -1) { + perror("consumer mq_open('/results'): "); + exit(EXIT_FAILURE); + } + + struct Task task; + + while (1) { + int received_bytes = mq_receive(queue, (char *) &task, sizeof(struct Task), NULL); + if (received_bytes < 0) { + perror("consumer mq_receive('/results'): "); + exit(EXIT_FAILURE); + } + + task.result = solve_difficult_problem(task.a, task.b); + + // print_task(task); + + if (received_bytes < 0) { + perror("mq_receive: "); + exit(EXIT_FAILURE); + } + + if (mq_send(results_queue, (const char *)&task, sizeof(struct Task), 0)) { + perror("mq_send: "); + exit(EXIT_FAILURE); + } + + } + + + mq_unlink("/results"); + +} + +int main(int argc, char *argv[]) { + srandom(time(NULL)); + + handle_arguments(argc, argv); + int n_tasks = atoi(argv[1]); + char tasks_name[] = "/test"; + char results_name[] = "/results"; + + pid_t pid; + + pid = fork(); + if (pid == -1) { + perror("fork: "); + exit(EXIT_FAILURE); + } + if (pid == 0) { + consumer(); + exit(EXIT_SUCCESS); + } else { + producer(n_tasks); + } + + int status; + wait(&status); + + return 0; +} \ No newline at end of file