Threads and Scheduling

feat: Finish implementation with bonus.

+15 -10
+6 -9
src/cluster.c
··· 26 26 log(INFO, "Consumer %zu started.\n", tid); 27 27 28 28 exists(args); 29 - // Enchufe cliente = *(Enchufe*)args; 29 + Enchufe cliente = *(Enchufe*)args; 30 30 31 31 size_t total_time = 0; 32 32 Proc proc = {0}; ··· 36 36 if (pq_empty(pq)) { 37 37 mtx_unlock(&pq_mtx); // up 38 38 if (client_closed) break; 39 - continue; 39 + else continue; 40 40 } else { 41 41 proc = pq_access(pq); 42 42 pq_delete(&pq); 43 + mtx_unlock(&pq_mtx); // up 43 44 } 44 - mtx_unlock(&pq_mtx); // up 45 45 46 46 log(INFO, "Consumer %zu:\n\tProcessing: %s for %d seconds.\n", tid, (const char*)proc.program.buf, proc.time); 47 + zumba(cliente, proc.program); 47 48 48 49 sleep(proc.time); 49 50 total_time += proc.time; 50 - 51 51 free(proc.program.buf); 52 52 } 53 53 ··· 67 67 }; 68 68 69 69 while (true) { 70 - mtx_lock(&so_mtx); 71 70 size_t msg_len = recibe(cliente, out_buf); 72 - mtx_unlock(&so_mtx); 73 71 74 72 if (msg_len == 0) break; 75 73 ··· 110 108 thrd_t producer = {0}; 111 109 thrd_create(&producer, produce, (void*)&cliente); 112 110 113 - log(INFO, "Receiving messages. . .\n"); 114 - sleep(10); 115 - log(INFO, "Starting process:\n"); 111 + log(INFO, "Producer started.\n"); 116 112 117 113 thrd_t consumers[THREAD_COUNT] = {0}; 118 114 for (size_t i = 0; i < THREAD_COUNT; ++i) { 119 115 thrd_create(&consumers[i], consume, (void*)&cliente); 120 116 } 117 + log(INFO, "Consumers started.\n"); 121 118 122 119 thrd_join(producer, NULL); 123 120 for (size_t i = 0; i < THREAD_COUNT; ++i) {
+9 -1
src/edevice.c
··· 50 50 memset(program, 0, BUF_LEN); 51 51 free(in_buf.buf); 52 52 ++counter; 53 - sleep(((uint32_t)rand() % 3) | 1); 53 + 54 + Byte buf[BUF_LEN] = {0}; 55 + Buffer out_buf = { 56 + .buf = buf, 57 + .len = BUF_LEN, 58 + }; 59 + size_t msg_len = recibe(enchufe, out_buf); 60 + assert(safe_strlen((char*)out_buf.buf, BUF_LEN) == msg_len); 61 + log(INFO, "Received: %s.\n", (char*)out_buf.buf); 54 62 } 55 63 56 64 log(INFO, "Done.\n");