Threads and Scheduling
at main 190 lines 4.7 kB view raw
1#include <assert.h> 2#include <stdlib.h> 3#include <threads.h> 4#include <string.h> 5#include <stdbool.h> 6#include <unistd.h> 7#include <semaphore.h> 8#include <fcntl.h> 9#include "lib/log.h" 10#include "lib/enchufe.h" 11#include "lib/lib.h" 12#include "lib/pq.h" 13#define THREAD_COUNT 2 14#define BUF_LEN 0x1000 15#define MAX 0x10 16 17// Global variables 18PQ pq = {0}; 19mtx_t pq_mtx = {0}; 20sem_t full = {0}; 21sem_t empty = {0}; 22bool client_closed = false; 23 24// The consumer. 25int consume(void*args) { 26 // Turns the strange looking thing thrd_current() into a smaller number. 27 size_t tid = ((thrd_current() & 0xFFFF) >> 12) % 12; 28 29 log(INFO, "Consumer %zu started.\n", tid); 30 31 // Get client from argument. 32 exists(args); 33 Enchufe cliente = *(Enchufe*)args; 34 35 size_t total_time = 0; 36 Proc proc = {0}; 37 while (true) { 38 // INFO: Enter crit region 39 /* In the case where the client closes the connection, the queue will 40 * start losing processes. Eventually, it will be empty. When that 41 * happens, both consumers will start waiting for the producer to put 42 * processes into the queue. Something that will never happen since the 43 * client closed the connection, causing a deadlock. To avoid running 44 * into this deadlock you must check when the queue becomes empty after 45 * the client closes the connection. 46 */ 47 if (!client_closed) { 48 // Wait for semaphore to have stuff inside. 49 sem_wait(&full); 50 } else { 51 bool is_empty = false; 52 mtx_lock(&pq_mtx); 53 is_empty = pq_empty(pq); 54 mtx_unlock(&pq_mtx); 55 if (is_empty) break; 56 } 57 58 // Take out the process from the queue. 59 mtx_lock(&pq_mtx); 60 proc = pq_access(pq); 61 pq_delete(&pq); 62 mtx_unlock(&pq_mtx); 63 64 // Let the other threads know that the queue is no longer full. 65 sem_post(&empty); 66 // INFO: Left crit region 67 68 // Let the user know what just happened. 69 log(INFO, "Consumer %zu:\n\tProcessing: %s for %d seconds.\n", tid, (const char*)proc.program.buf, proc.time); 70 71 // Perform work. 72 sleep(proc.time); 73 total_time += proc.time; 74 // Free the buffer. 75 free(proc.program.buf); 76 } 77 78 // Report back to the user. 79 log(INFO, "Consumer %zu consumed %zu seconds of CPU time.\n", tid, total_time); 80 81 return thrd_success; 82} 83 84// The producer. 85int produce(void* args) { 86 exists(args); 87 Enchufe cliente = *(Enchufe*)args; 88 89 Byte buf[BUF_LEN] = {0}; 90 Buffer out_buf = { 91 .buf = buf, 92 .len = BUF_LEN, 93 }; 94 95 while (true) { 96 // Get the message. 97 size_t msg_len = recibe(cliente, out_buf); 98 99 // The client closed the connection. 100 if (msg_len == 0) break; 101 102 // Deserialize the message 103 Procs procs = deserialize(out_buf, msg_len); 104 for (size_t i = 0; i < procs.len; ++i) { 105 Proc proc = procs.procs[i]; 106 Buffer in_buf = bufcpy(proc.program); 107 108 // INFO: Enter crit region 109 110 // Wait until the consumers let the queue get smaller. 111 sem_wait(&empty); 112 113 // Put the process in the queue 114 mtx_lock(&pq_mtx); // down 115 pq_insert(&pq, proc); 116 mtx_unlock(&pq_mtx); // up 117 118 // Let the other threads know that there are elements in the queue now. 119 sem_post(&full); 120 121 // Let the client know what is getting processed. 122 zumba(cliente, in_buf); 123 free(in_buf.buf); 124 } 125 126 memset(out_buf.buf, 0, out_buf.len); 127 } 128 // Let the rest of the threads know that the client is closed. 129 client_closed = true; 130 131 132 return thrd_success; 133} 134 135int main(int argc, const char** argv) { 136 assert(argc == 3 && "You must provide three arguments."); 137 138 // Get a new enchufe. 139 IPv4 ip = parse_address(argv[1]); 140 Port port = (Port)atoi(argv[2]); 141 Enchufe enchufe = enchufa(ip, port); 142 amarra(enchufe); 143 144 // Listen to incoming clients. 145 escucha(enchufe, 1); 146 log(INFO, "Listening to: %d.%d.%d.%d on port: %d\n", ip.bytes[0], ip.bytes[1], ip.bytes[2], ip.bytes[3], port); 147 148 // Accept connection from client. 149 Enchufe cliente = acepta(enchufe); 150 log(INFO, "Accepted a connection.\n"); 151 152 // Initialize data for program. 153 mtx_init(&pq_mtx, mtx_plain); 154 sem_init(&full, false, 0); 155 sem_init(&empty, false, MAX); 156 pq = pq_init(); 157 158 // Start producer. 159 thrd_t producer = {0}; 160 thrd_create(&producer, produce, (void*)&cliente); 161 162 log(INFO, "Producer started.\n"); 163 164 // Start consumers. 165 thrd_t consumers[THREAD_COUNT] = {0}; 166 for (size_t i = 0; i < THREAD_COUNT; ++i) { 167 thrd_create(&consumers[i], consume, (void*)&cliente); 168 } 169 log(INFO, "Consumers started.\n"); 170 171 // Join the producer. 172 thrd_join(producer, NULL); 173 174 // Join the consumers. 175 for (size_t i = 0; i < THREAD_COUNT; ++i) { 176 thrd_join(consumers[i], NULL); 177 } 178 179 log(INFO, "Done.\n"); 180 181 // Deinitialize all the program's data. 182 pq_deinit(&pq); 183 mtx_destroy(&pq_mtx); 184 sem_destroy(&full); 185 sem_destroy(&empty); 186 desenchufa(enchufe); 187 desenchufa(cliente); 188 189 return 0; 190}