Compare commits
No commits in common. "3f5797a9037016a7043e3236607361cfa6371075" and "b2342ecade45f80b9877ac6cbbf99b10a1e83b9c" have entirely different histories.
3f5797a903
...
b2342ecade
BIN
Topic-7/task_7
BIN
Topic-7/task_7
Binary file not shown.
285
Topic-7/task_7.c
285
Topic-7/task_7.c
@ -1,285 +0,0 @@
|
|||||||
// master generates tasks and place them in a queue
|
|
||||||
|
|
||||||
// slave executes tasks (and sends the result back to the consumer)
|
|
||||||
|
|
||||||
#include <errno.h>
|
|
||||||
#include <fcntl.h>
|
|
||||||
#include <mqueue.h>
|
|
||||||
#include <stdio.h>
|
|
||||||
#include <stdlib.h>
|
|
||||||
#include <string.h>
|
|
||||||
#include <sys/stat.h>
|
|
||||||
#include <sys/types.h>
|
|
||||||
#include <sys/wait.h>
|
|
||||||
#include <time.h>
|
|
||||||
#include <unistd.h>
|
|
||||||
|
|
||||||
void handle_arguments(int argc, char *argv[]) {
|
|
||||||
if (!(argc == 2 || argc == 4)) {
|
|
||||||
fprintf(stderr, "Usage: %s #_OF_TASKS [#_OF_PRODUCERS] [#_OF_CONSUMERS]\n", argv[0]);
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (atoi(argv[1]) < 1) {
|
|
||||||
fprintf(stderr, "I need at least one task!\n");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
// if (atoi(argv[1]) > MAXMSG) {
|
|
||||||
// fprintf(stderr, "I can handle up to %d tasks!\n", MAXMSG);
|
|
||||||
// exit(EXIT_FAILURE);
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (argc == 4) {
|
|
||||||
if (atoi(argv[2]) < 1) {
|
|
||||||
fprintf(stderr, "I need at least one producer!\n");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (atoi(argv[3]) < 1) {
|
|
||||||
fprintf(stderr, "I need at least one consumer!\n");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
if (atoi(argv[2]) > 32) {
|
|
||||||
fprintf(stderr, "I am lazy so I can create up to 32 producers!\n");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
if (atoi(argv[3]) > 32) {
|
|
||||||
fprintf(stderr, "I am lazy so I can create up to 32 consumers!\n");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Task {
|
|
||||||
pid_t prod_pid;
|
|
||||||
int sqc;
|
|
||||||
int a;
|
|
||||||
int b;
|
|
||||||
int result;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct Task generate_task(int sqc) {
|
|
||||||
struct Task task;
|
|
||||||
task.prod_pid = getpid();
|
|
||||||
task.sqc = sqc;
|
|
||||||
task.a = random()/2;
|
|
||||||
task.b = random()/2;
|
|
||||||
task.result = 0;
|
|
||||||
|
|
||||||
return task;
|
|
||||||
};
|
|
||||||
|
|
||||||
void print_task(struct Task task) {
|
|
||||||
printf("====v TASK v====\n");
|
|
||||||
printf("Producer PID: %d\n", task.prod_pid);
|
|
||||||
printf("Sequence no: %d\n", task.sqc);
|
|
||||||
printf("A: %d\n", task.a);
|
|
||||||
printf("B: %d\n", task.b);
|
|
||||||
if (task.result != 0) { // Cloud replace it with bool solved in Task
|
|
||||||
printf("A + B = %d\n", task.result);
|
|
||||||
}
|
|
||||||
printf("====^ TASK ^====\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
void producer(int n_tasks, int prio) {
|
|
||||||
printf("A producer has been called with PID of %d\n", getpid());
|
|
||||||
|
|
||||||
// printf("The queue was created with fd = %d\n", queue);
|
|
||||||
|
|
||||||
mqd_t queue = mq_open("/tasks", O_WRONLY);
|
|
||||||
if (queue == (mqd_t) -1) {
|
|
||||||
perror("mq_open: ");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Task tasks[n_tasks];
|
|
||||||
|
|
||||||
for (int i = 0; i < n_tasks; ++i) {
|
|
||||||
tasks[i] = generate_task(i);
|
|
||||||
// print_task(tasks[i]);
|
|
||||||
if (mq_send(queue, (const char *)&tasks[i], sizeof(struct Task), prio)) {
|
|
||||||
perror("mq_send: ");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
printf("Task %d of %d sent...\n", i + 1, n_tasks);
|
|
||||||
}
|
|
||||||
|
|
||||||
mqd_t results_queue;
|
|
||||||
do {
|
|
||||||
char pid_s[10];
|
|
||||||
sprintf(pid_s, "%d", getpid());
|
|
||||||
char results_name[30] = "/results";
|
|
||||||
strcat(results_name, pid_s);
|
|
||||||
results_queue = mq_open(results_name, O_RDONLY);
|
|
||||||
if (results_queue == (mqd_t) -1) {
|
|
||||||
if (errno != ENOENT) {
|
|
||||||
perror("mq_open('/results'): ");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} while(results_queue == (mqd_t) -1);
|
|
||||||
|
|
||||||
struct Task tmp;
|
|
||||||
|
|
||||||
for (int i = 0; i < n_tasks; ++i) {
|
|
||||||
int received_bytes = mq_receive(results_queue, (char *) &tmp, sizeof(struct Task), NULL);
|
|
||||||
if (received_bytes < 0) {
|
|
||||||
perror("producer mq_receive('/results')");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
tasks[tmp.sqc] = tmp;
|
|
||||||
printf("%d received %d of %d results...\n", tmp.prod_pid, i + 1, n_tasks);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < n_tasks; ++i) {
|
|
||||||
print_task(tasks[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
mq_unlink("/test");
|
|
||||||
}
|
|
||||||
|
|
||||||
int solve_difficult_problem(int a, int b) {
|
|
||||||
sleep(random()%3 + 1);
|
|
||||||
return a + b;
|
|
||||||
}
|
|
||||||
|
|
||||||
void consumer() {
|
|
||||||
printf("A consumer has been called with PID of %d\n", getpid());
|
|
||||||
mqd_t queue;
|
|
||||||
do {
|
|
||||||
queue = mq_open("/tasks", O_RDONLY);
|
|
||||||
if (queue == (mqd_t) -1) {
|
|
||||||
if (errno != ENOENT) {
|
|
||||||
perror("mq_open('/tasks'): ");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} while(queue == (mqd_t) -1);
|
|
||||||
|
|
||||||
|
|
||||||
struct Task task;
|
|
||||||
|
|
||||||
struct timespec interval;
|
|
||||||
interval.tv_nsec = 0;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
interval.tv_sec = time(NULL) + 5;
|
|
||||||
int received_bytes = mq_timedreceive(queue, (char *) &task, sizeof(struct Task), NULL, &interval);
|
|
||||||
if (received_bytes < 0) {
|
|
||||||
perror("consumer mq_receive('/tasks'): ");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
task.result = solve_difficult_problem(task.a, task.b);
|
|
||||||
|
|
||||||
if (received_bytes < 0) {
|
|
||||||
perror("mq_receive: ");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
char prod_pid[10];
|
|
||||||
sprintf(prod_pid, "%d", task.prod_pid);
|
|
||||||
char results_name[30] = "/results";
|
|
||||||
strcat(results_name, prod_pid);
|
|
||||||
mqd_t results_queue = mq_open(results_name, O_WRONLY);
|
|
||||||
if (results_queue == (mqd_t) -1) {
|
|
||||||
perror("consumer mq_open('/results'): ");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
if (mq_send(results_queue, (const char *)&task, sizeof(struct Task), 0)) {
|
|
||||||
perror("mq_send: ");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
mq_unlink("/results");
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
|
||||||
srandom(time(NULL));
|
|
||||||
handle_arguments(argc, argv);
|
|
||||||
|
|
||||||
int n_tasks = atoi(argv[1]);
|
|
||||||
int n_producers = atoi(argv[2]);
|
|
||||||
int n_consumers = atoi(argv[3]);
|
|
||||||
char tasks_name[] = "/tasks";
|
|
||||||
// char results_name[] = "/results";
|
|
||||||
|
|
||||||
struct mq_attr attr;
|
|
||||||
attr.mq_flags = 0;
|
|
||||||
attr.mq_maxmsg = 10; // Probably more can be obtained by modifying /proc/sys/fs/mqueue/msg_max
|
|
||||||
attr.mq_msgsize = sizeof(struct Task);
|
|
||||||
attr.mq_curmsgs = 0;
|
|
||||||
|
|
||||||
mq_unlink(tasks_name);
|
|
||||||
// mq_unlink(results_name);
|
|
||||||
|
|
||||||
mqd_t tasks_queue = mq_open(tasks_name, O_RDWR | O_CREAT, S_IRWXU, &attr);
|
|
||||||
if (tasks_queue == (mqd_t) -1) {
|
|
||||||
perror("mq_open: ");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
pid_t pid;
|
|
||||||
int status[n_producers + n_consumers];
|
|
||||||
int producers_pids[n_producers];
|
|
||||||
int consumers_pids[n_consumers];
|
|
||||||
|
|
||||||
for (int i = 0; i < n_producers; ++i) {
|
|
||||||
pid = fork();
|
|
||||||
if (pid == -1) {
|
|
||||||
perror("fork: ");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
if (pid == 0) {
|
|
||||||
producer(n_tasks, i);
|
|
||||||
exit(EXIT_SUCCESS);
|
|
||||||
}
|
|
||||||
|
|
||||||
char pid_s[10];
|
|
||||||
sprintf(pid_s, "%d", pid);
|
|
||||||
char results_name[30] = "/results";
|
|
||||||
strcat(results_name, pid_s);
|
|
||||||
|
|
||||||
unlink(results_name);
|
|
||||||
mqd_t results_queue = mq_open(results_name, O_RDWR | O_CREAT, S_IRWXU, &attr);
|
|
||||||
if (results_queue == (mqd_t) -1) {
|
|
||||||
perror("mq_open: ");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
producers_pids[i] = pid;
|
|
||||||
}
|
|
||||||
for (int i = 0; i < n_consumers; ++i) {
|
|
||||||
pid = fork();
|
|
||||||
if (pid == -1) {
|
|
||||||
perror("fork: ");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
if (pid == 0) {
|
|
||||||
consumer();
|
|
||||||
exit(EXIT_SUCCESS);
|
|
||||||
}
|
|
||||||
consumers_pids[i] = pid;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < n_producers; ++i) {
|
|
||||||
waitpid(producers_pids[i], &status[i], 0);
|
|
||||||
}
|
|
||||||
for (int i = 0; i < n_consumers; ++i) {
|
|
||||||
waitpid(consumers_pids[i], &status[i], 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
printf("This is the end!\n");
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user