#include #include #include #include #include #include #include #include #include "lib/log.h" #include "lib/enchufe.h" #include "lib/lib.h" #include "lib/pq.h" #define THREAD_COUNT 2 #define BUF_LEN 0x1000 #define MAX 0x10 // Global variables PQ pq = {0}; mtx_t pq_mtx = {0}; sem_t full = {0}; sem_t empty = {0}; bool client_closed = false; // The consumer. int consume(void*args) { // Turns the strange looking thing thrd_current() into a smaller number. size_t tid = ((thrd_current() & 0xFFFF) >> 12) % 12; log(INFO, "Consumer %zu started.\n", tid); // Get client from argument. exists(args); Enchufe cliente = *(Enchufe*)args; size_t total_time = 0; Proc proc = {0}; while (true) { // INFO: Enter crit region /* In the case where the client closes the connection, the queue will * start losing processes. Eventually, it will be empty. When that * happens, both consumers will start waiting for the producer to put * processes into the queue. Something that will never happen since the * client closed the connection, causing a deadlock. To avoid running * into this deadlock you must check when the queue becomes empty after * the client closes the connection. */ if (!client_closed) { // Wait for semaphore to have stuff inside. sem_wait(&full); } else { bool is_empty = false; mtx_lock(&pq_mtx); is_empty = pq_empty(pq); mtx_unlock(&pq_mtx); if (is_empty) break; } // Take out the process from the queue. mtx_lock(&pq_mtx); proc = pq_access(pq); pq_delete(&pq); mtx_unlock(&pq_mtx); // Let the other threads know that the queue is no longer full. sem_post(&empty); // INFO: Left crit region // Let the user know what just happened. log(INFO, "Consumer %zu:\n\tProcessing: %s for %d seconds.\n", tid, (const char*)proc.program.buf, proc.time); // Perform work. sleep(proc.time); total_time += proc.time; // Free the buffer. free(proc.program.buf); } // Report back to the user. log(INFO, "Consumer %zu consumed %zu seconds of CPU time.\n", tid, total_time); return thrd_success; } // The producer. int produce(void* args) { exists(args); Enchufe cliente = *(Enchufe*)args; Byte buf[BUF_LEN] = {0}; Buffer out_buf = { .buf = buf, .len = BUF_LEN, }; while (true) { // Get the message. size_t msg_len = recibe(cliente, out_buf); // The client closed the connection. if (msg_len == 0) break; // Deserialize the message Procs procs = deserialize(out_buf, msg_len); for (size_t i = 0; i < procs.len; ++i) { Proc proc = procs.procs[i]; Buffer in_buf = bufcpy(proc.program); // INFO: Enter crit region // Wait until the consumers let the queue get smaller. sem_wait(&empty); // Put the process in the queue mtx_lock(&pq_mtx); // down pq_insert(&pq, proc); mtx_unlock(&pq_mtx); // up // Let the other threads know that there are elements in the queue now. sem_post(&full); // Let the client know what is getting processed. zumba(cliente, in_buf); free(in_buf.buf); } memset(out_buf.buf, 0, out_buf.len); } // Let the rest of the threads know that the client is closed. client_closed = true; return thrd_success; } int main(int argc, const char** argv) { assert(argc == 3 && "You must provide three arguments."); // Get a new enchufe. IPv4 ip = parse_address(argv[1]); Port port = (Port)atoi(argv[2]); Enchufe enchufe = enchufa(ip, port); amarra(enchufe); // Listen to incoming clients. escucha(enchufe, 1); log(INFO, "Listening to: %d.%d.%d.%d on port: %d\n", ip.bytes[0], ip.bytes[1], ip.bytes[2], ip.bytes[3], port); // Accept connection from client. Enchufe cliente = acepta(enchufe); log(INFO, "Accepted a connection.\n"); // Initialize data for program. mtx_init(&pq_mtx, mtx_plain); sem_init(&full, false, 0); sem_init(&empty, false, MAX); pq = pq_init(); // Start producer. thrd_t producer = {0}; thrd_create(&producer, produce, (void*)&cliente); log(INFO, "Producer started.\n"); // Start consumers. thrd_t consumers[THREAD_COUNT] = {0}; for (size_t i = 0; i < THREAD_COUNT; ++i) { thrd_create(&consumers[i], consume, (void*)&cliente); } log(INFO, "Consumers started.\n"); // Join the producer. thrd_join(producer, NULL); // Join the consumers. for (size_t i = 0; i < THREAD_COUNT; ++i) { thrd_join(consumers[i], NULL); } log(INFO, "Done.\n"); // Deinitialize all the program's data. pq_deinit(&pq); mtx_destroy(&pq_mtx); sem_destroy(&full); sem_destroy(&empty); desenchufa(enchufe); desenchufa(cliente); return 0; }