Threads and Scheduling
1#include <assert.h>
2#include <stdlib.h>
3#include <threads.h>
4#include <string.h>
5#include <stdbool.h>
6#include <unistd.h>
7#include <semaphore.h>
8#include <fcntl.h>
9#include "lib/log.h"
10#include "lib/enchufe.h"
11#include "lib/lib.h"
12#include "lib/pq.h"
13#define THREAD_COUNT 2
14#define BUF_LEN 0x1000
15#define MAX 0x10
16
17// Global variables
18PQ pq = {0};
19mtx_t pq_mtx = {0};
20sem_t full = {0};
21sem_t empty = {0};
22bool client_closed = false;
23
24// The consumer.
25int consume(void*args) {
26 // Turns the strange looking thing thrd_current() into a smaller number.
27 size_t tid = ((thrd_current() & 0xFFFF) >> 12) % 12;
28
29 log(INFO, "Consumer %zu started.\n", tid);
30
31 // Get client from argument.
32 exists(args);
33 Enchufe cliente = *(Enchufe*)args;
34
35 size_t total_time = 0;
36 Proc proc = {0};
37 while (true) {
38 // INFO: Enter crit region
39 /* In the case where the client closes the connection, the queue will
40 * start losing processes. Eventually, it will be empty. When that
41 * happens, both consumers will start waiting for the producer to put
42 * processes into the queue. Something that will never happen since the
43 * client closed the connection, causing a deadlock. To avoid running
44 * into this deadlock you must check when the queue becomes empty after
45 * the client closes the connection.
46 */
47 if (!client_closed) {
48 // Wait for semaphore to have stuff inside.
49 sem_wait(&full);
50 } else {
51 bool is_empty = false;
52 mtx_lock(&pq_mtx);
53 is_empty = pq_empty(pq);
54 mtx_unlock(&pq_mtx);
55 if (is_empty) break;
56 }
57
58 // Take out the process from the queue.
59 mtx_lock(&pq_mtx);
60 proc = pq_access(pq);
61 pq_delete(&pq);
62 mtx_unlock(&pq_mtx);
63
64 // Let the other threads know that the queue is no longer full.
65 sem_post(&empty);
66 // INFO: Left crit region
67
68 // Let the user know what just happened.
69 log(INFO, "Consumer %zu:\n\tProcessing: %s for %d seconds.\n", tid, (const char*)proc.program.buf, proc.time);
70
71 // Perform work.
72 sleep(proc.time);
73 total_time += proc.time;
74 // Free the buffer.
75 free(proc.program.buf);
76 }
77
78 // Report back to the user.
79 log(INFO, "Consumer %zu consumed %zu seconds of CPU time.\n", tid, total_time);
80
81 return thrd_success;
82}
83
84// The producer.
85int produce(void* args) {
86 exists(args);
87 Enchufe cliente = *(Enchufe*)args;
88
89 Byte buf[BUF_LEN] = {0};
90 Buffer out_buf = {
91 .buf = buf,
92 .len = BUF_LEN,
93 };
94
95 while (true) {
96 // Get the message.
97 size_t msg_len = recibe(cliente, out_buf);
98
99 // The client closed the connection.
100 if (msg_len == 0) break;
101
102 // Deserialize the message
103 Procs procs = deserialize(out_buf, msg_len);
104 for (size_t i = 0; i < procs.len; ++i) {
105 Proc proc = procs.procs[i];
106 Buffer in_buf = bufcpy(proc.program);
107
108 // INFO: Enter crit region
109
110 // Wait until the consumers let the queue get smaller.
111 sem_wait(&empty);
112
113 // Put the process in the queue
114 mtx_lock(&pq_mtx); // down
115 pq_insert(&pq, proc);
116 mtx_unlock(&pq_mtx); // up
117
118 // Let the other threads know that there are elements in the queue now.
119 sem_post(&full);
120
121 // Let the client know what is getting processed.
122 zumba(cliente, in_buf);
123 free(in_buf.buf);
124 }
125
126 memset(out_buf.buf, 0, out_buf.len);
127 }
128 // Let the rest of the threads know that the client is closed.
129 client_closed = true;
130
131
132 return thrd_success;
133}
134
135int main(int argc, const char** argv) {
136 assert(argc == 3 && "You must provide three arguments.");
137
138 // Get a new enchufe.
139 IPv4 ip = parse_address(argv[1]);
140 Port port = (Port)atoi(argv[2]);
141 Enchufe enchufe = enchufa(ip, port);
142 amarra(enchufe);
143
144 // Listen to incoming clients.
145 escucha(enchufe, 1);
146 log(INFO, "Listening to: %d.%d.%d.%d on port: %d\n", ip.bytes[0], ip.bytes[1], ip.bytes[2], ip.bytes[3], port);
147
148 // Accept connection from client.
149 Enchufe cliente = acepta(enchufe);
150 log(INFO, "Accepted a connection.\n");
151
152 // Initialize data for program.
153 mtx_init(&pq_mtx, mtx_plain);
154 sem_init(&full, false, 0);
155 sem_init(&empty, false, MAX);
156 pq = pq_init();
157
158 // Start producer.
159 thrd_t producer = {0};
160 thrd_create(&producer, produce, (void*)&cliente);
161
162 log(INFO, "Producer started.\n");
163
164 // Start consumers.
165 thrd_t consumers[THREAD_COUNT] = {0};
166 for (size_t i = 0; i < THREAD_COUNT; ++i) {
167 thrd_create(&consumers[i], consume, (void*)&cliente);
168 }
169 log(INFO, "Consumers started.\n");
170
171 // Join the producer.
172 thrd_join(producer, NULL);
173
174 // Join the consumers.
175 for (size_t i = 0; i < THREAD_COUNT; ++i) {
176 thrd_join(consumers[i], NULL);
177 }
178
179 log(INFO, "Done.\n");
180
181 // Deinitialize all the program's data.
182 pq_deinit(&pq);
183 mtx_destroy(&pq_mtx);
184 sem_destroy(&full);
185 sem_destroy(&empty);
186 desenchufa(enchufe);
187 desenchufa(cliente);
188
189 return 0;
190}