Task 7 completed

This commit is contained in:
Sergiusz Warga 2021-04-19 03:56:19 +02:00
parent c5803ff916
commit 3f5797a903

View File

@ -7,14 +7,13 @@
#include <mqueue.h> #include <mqueue.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <time.h> #include <time.h>
#include <unistd.h> #include <unistd.h>
#define MAXMSG 64
void handle_arguments(int argc, char *argv[]) { void handle_arguments(int argc, char *argv[]) {
if (!(argc == 2 || argc == 4)) { if (!(argc == 2 || argc == 4)) {
fprintf(stderr, "Usage: %s #_OF_TASKS [#_OF_PRODUCERS] [#_OF_CONSUMERS]\n", argv[0]); fprintf(stderr, "Usage: %s #_OF_TASKS [#_OF_PRODUCERS] [#_OF_CONSUMERS]\n", argv[0]);
@ -26,10 +25,10 @@ void handle_arguments(int argc, char *argv[]) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (atoi(argv[1]) > MAXMSG) { // if (atoi(argv[1]) > MAXMSG) {
fprintf(stderr, "I can handle up to %d tasks!\n", MAXMSG); // fprintf(stderr, "I can handle up to %d tasks!\n", MAXMSG);
exit(EXIT_FAILURE); // exit(EXIT_FAILURE);
} // }
if (argc == 4) { if (argc == 4) {
if (atoi(argv[2]) < 1) { if (atoi(argv[2]) < 1) {
@ -44,6 +43,15 @@ void handle_arguments(int argc, char *argv[]) {
} }
if (atoi(argv[2]) > 32) {
fprintf(stderr, "I am lazy so I can create up to 32 producers!\n");
exit(EXIT_FAILURE);
}
if (atoi(argv[3]) > 32) {
fprintf(stderr, "I am lazy so I can create up to 32 consumers!\n");
exit(EXIT_FAILURE);
}
} }
@ -79,39 +87,36 @@ void print_task(struct Task task) {
printf("====^ TASK ^====\n"); printf("====^ TASK ^====\n");
} }
void producer(int n_tasks) { void producer(int n_tasks, int prio) {
// printf("A producer has been called\n"); printf("A producer has been called with PID of %d\n", getpid());
struct mq_attr attr; // printf("The queue was created with fd = %d\n", queue);
attr.mq_flags = 0;
attr.mq_maxmsg = 10; // Probably more can be obtained by modifying /proc/sys/fs/mqueue/msg_max
attr.mq_msgsize = sizeof(struct Task);
attr.mq_curmsgs = 0;
mq_unlink("/tasks");
mqd_t queue = mq_open("/tasks", O_RDWR | O_CREAT, S_IRWXU, &attr);
mqd_t queue = mq_open("/tasks", O_WRONLY);
if (queue == (mqd_t) -1) { if (queue == (mqd_t) -1) {
perror("mq_open: "); perror("mq_open: ");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
// printf("The queue was created with fd = %d\n", queue);
struct Task tasks[n_tasks]; struct Task tasks[n_tasks];
for (int i = 0; i < n_tasks; ++i) { for (int i = 0; i < n_tasks; ++i) {
tasks[i] = generate_task(i); tasks[i] = generate_task(i);
// print_task(tasks[i]); // print_task(tasks[i]);
if (mq_send(queue, (const char *)&tasks[i], sizeof(struct Task), 0)) { if (mq_send(queue, (const char *)&tasks[i], sizeof(struct Task), prio)) {
perror("mq_send: "); perror("mq_send: ");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
printf("Task %d of %d sent...\n", i + 1, n_tasks);
} }
mqd_t results_queue; mqd_t results_queue;
do { do {
results_queue = mq_open("/results", O_RDONLY); char pid_s[10];
sprintf(pid_s, "%d", getpid());
char results_name[30] = "/results";
strcat(results_name, pid_s);
results_queue = mq_open(results_name, O_RDONLY);
if (results_queue == (mqd_t) -1) { if (results_queue == (mqd_t) -1) {
if (errno != ENOENT) { if (errno != ENOENT) {
perror("mq_open('/results'): "); perror("mq_open('/results'): ");
@ -129,7 +134,7 @@ void producer(int n_tasks) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
tasks[tmp.sqc] = tmp; tasks[tmp.sqc] = tmp;
printf("Calculated %d of %d tasks...\n", i + 1, n_tasks); printf("%d received %d of %d results...\n", tmp.prod_pid, i + 1, n_tasks);
} }
for (int i = 0; i < n_tasks; ++i) { for (int i = 0; i < n_tasks; ++i) {
@ -146,7 +151,7 @@ int solve_difficult_problem(int a, int b) {
} }
void consumer() { void consumer() {
// printf("A consumer has been called\n"); printf("A consumer has been called with PID of %d\n", getpid());
mqd_t queue; mqd_t queue;
do { do {
queue = mq_open("/tasks", O_RDONLY); queue = mq_open("/tasks", O_RDONLY);
@ -158,37 +163,35 @@ void consumer() {
} }
} while(queue == (mqd_t) -1); } while(queue == (mqd_t) -1);
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10; // Probably more can be obtained by modifying /proc/sys/fs/mqueue/msg_max
attr.mq_msgsize = sizeof(struct Task);
attr.mq_curmsgs = 0;
mq_unlink("/results");
mqd_t results_queue = mq_open("/results", O_RDWR | O_CREAT, S_IRWXU, &attr);
if (results_queue == (mqd_t) -1) {
perror("consumer mq_open('/results'): ");
exit(EXIT_FAILURE);
}
struct Task task; struct Task task;
struct timespec interval;
interval.tv_nsec = 0;
while (1) { while (1) {
int received_bytes = mq_receive(queue, (char *) &task, sizeof(struct Task), NULL); interval.tv_sec = time(NULL) + 5;
int received_bytes = mq_timedreceive(queue, (char *) &task, sizeof(struct Task), NULL, &interval);
if (received_bytes < 0) { if (received_bytes < 0) {
perror("consumer mq_receive('/results'): "); perror("consumer mq_receive('/tasks'): ");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
task.result = solve_difficult_problem(task.a, task.b); task.result = solve_difficult_problem(task.a, task.b);
// print_task(task);
if (received_bytes < 0) { if (received_bytes < 0) {
perror("mq_receive: "); perror("mq_receive: ");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
char prod_pid[10];
sprintf(prod_pid, "%d", task.prod_pid);
char results_name[30] = "/results";
strcat(results_name, prod_pid);
mqd_t results_queue = mq_open(results_name, O_WRONLY);
if (results_queue == (mqd_t) -1) {
perror("consumer mq_open('/results'): ");
exit(EXIT_FAILURE);
}
if (mq_send(results_queue, (const char *)&task, sizeof(struct Task), 0)) { if (mq_send(results_queue, (const char *)&task, sizeof(struct Task), 0)) {
perror("mq_send: "); perror("mq_send: ");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
@ -203,28 +206,80 @@ void consumer() {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
srandom(time(NULL)); srandom(time(NULL));
handle_arguments(argc, argv); handle_arguments(argc, argv);
int n_tasks = atoi(argv[1]); int n_tasks = atoi(argv[1]);
char tasks_name[] = "/test"; int n_producers = atoi(argv[2]);
char results_name[] = "/results"; int n_consumers = atoi(argv[3]);
char tasks_name[] = "/tasks";
// char results_name[] = "/results";
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10; // Probably more can be obtained by modifying /proc/sys/fs/mqueue/msg_max
attr.mq_msgsize = sizeof(struct Task);
attr.mq_curmsgs = 0;
mq_unlink(tasks_name);
// mq_unlink(results_name);
mqd_t tasks_queue = mq_open(tasks_name, O_RDWR | O_CREAT, S_IRWXU, &attr);
if (tasks_queue == (mqd_t) -1) {
perror("mq_open: ");
exit(EXIT_FAILURE);
}
pid_t pid; pid_t pid;
int status[n_producers + n_consumers];
int producers_pids[n_producers];
int consumers_pids[n_consumers];
for (int i = 0; i < n_producers; ++i) {
pid = fork();
if (pid == -1) {
perror("fork: ");
exit(EXIT_FAILURE);
}
if (pid == 0) {
producer(n_tasks, i);
exit(EXIT_SUCCESS);
}
pid = fork(); char pid_s[10];
if (pid == -1) { sprintf(pid_s, "%d", pid);
perror("fork: "); char results_name[30] = "/results";
exit(EXIT_FAILURE); strcat(results_name, pid_s);
unlink(results_name);
mqd_t results_queue = mq_open(results_name, O_RDWR | O_CREAT, S_IRWXU, &attr);
if (results_queue == (mqd_t) -1) {
perror("mq_open: ");
exit(EXIT_FAILURE);
}
producers_pids[i] = pid;
} }
if (pid == 0) { for (int i = 0; i < n_consumers; ++i) {
consumer(); pid = fork();
exit(EXIT_SUCCESS); if (pid == -1) {
} else { perror("fork: ");
producer(n_tasks); exit(EXIT_FAILURE);
}
if (pid == 0) {
consumer();
exit(EXIT_SUCCESS);
}
consumers_pids[i] = pid;
} }
int status; for (int i = 0; i < n_producers; ++i) {
wait(&status); waitpid(producers_pids[i], &status[i], 0);
}
for (int i = 0; i < n_consumers; ++i) {
waitpid(consumers_pids[i], &status[i], 0);
}
printf("This is the end!\n");
return 0; return 0;
} }