Threads and Scheduling

feat!: Added basic sockett functionality.

The cluster and the edevice can now send messages between each other.

+231 -9
+51 -4
src/cluster.c
··· 1 - #include <stdio.h> 1 + #include <assert.h> 2 + #include <stdlib.h> 3 + #include <stdint.h> 4 + #include <pthread.h> 5 + #include <string.h> 6 + #include <stdbool.h> 2 7 #include "lib/log.h" 8 + #include "lib/enchufe.h" 9 + #include "lib/lib.h" 10 + #define THREAD_COUNT 3 11 + #define BUF_LEN 0x1000 12 + 13 + int main(int argc, const char** argv) { 14 + assert(argc == 3 && "You must provide three arguments."); 3 15 4 - int main(int argc, char** argv) { 5 - for (int i = 0; i < argc; ++i) { 6 - log_(INFO, "%s\n", argv[i]); 16 + IPv4 ip = parse_address(argv[1]); 17 + Port port = htons((uint16_t)atoi(argv[2])); 18 + Enchufe enchufe = enchufa(ip, port); 19 + amarra(enchufe); 20 + 21 + escucha(enchufe, 1); 22 + log(INFO, "Listening to: %d.%d.%d.%d on port: %d\n", ip.bytes[0], ip.bytes[1], ip.bytes[2], ip.bytes[3], port); 23 + Enchufe cliente = acepta(enchufe); 24 + log(INFO, "Accepted a connection.\n"); 25 + 26 + Buffer out_buf = { 27 + .buf = (Byte*)calloc(BUF_LEN, sizeof(Byte)), 28 + .len = BUF_LEN, 29 + }; 30 + if (out_buf.buf == NULL) { 31 + log(ERROR, "Could not allocate buffer of size %zu.\n", out_buf.len); 32 + return 1; 33 + } 34 + 35 + while(true) { 36 + log(INFO, "Waiting for data from client.\n"); 37 + size_t msg_len = recibe(cliente, out_buf); 38 + assert(msg_len < BUF_LEN - 1 && "The message that was received was TOO HUGE."); 39 + 40 + Procs procs = deserialize(out_buf, msg_len); 41 + for (size_t i = 0; i < procs.len; ++i) { 42 + Proc proc = procs.procs[i]; 43 + log(INFO, "Time: %d.\n", proc.time); 44 + log(INFO, "Len: %d.\n", proc.program.len); 45 + log(INFO, "Program: %s.\n", proc.program.buf); 46 + } 47 + 48 + memset(out_buf.buf, 0, out_buf.len); 49 + for (size_t i = 0; i < procs.len; ++i) free(procs.procs[i].program.buf); 50 + free(procs.procs); 7 51 } 52 + 53 + free(out_buf.buf); 54 + desenchufa(enchufe); 8 55 }
+58 -2
src/edevice.c
··· 1 + #include <assert.h> 1 2 #include <stdio.h> 3 + #include <string.h> 4 + #include <time.h> 5 + #include <unistd.h> 6 + #include "lib/enchufe.h" 7 + #include "lib/lib.h" 2 8 #include "lib/log.h" 9 + #define BUF_LEN 0x100 3 10 4 11 int main(int argc, char** argv) { 5 - for (int i = 0; i < argc; ++i) { 6 - log_(INFO, "%s\n", argv[i]); 12 + assert(argc == 3 && "You must provide three arguments."); 13 + IPv4 ip = parse_address(argv[1]); 14 + Port port = htons((uint16_t)atoi(argv[2])); 15 + log(INFO, "Connecting to: %d.%d.%d.%d on port: %d", ip.bytes[0], ip.bytes[1], ip.bytes[2], ip.bytes[3], port); 16 + 17 + Enchufe enchufe = enchufa(ip, port); 18 + conecta(enchufe); 19 + 20 + while (1) { 21 + FILE* fp = popen("ps -o comm", "r"); 22 + if (fp == NULL) { 23 + fprintf(stderr, "[ERROR]: Could not run ps command.\n"); 24 + return 1; 25 + } 26 + 27 + srand((int64_t)time(NULL)); 28 + char program[BUF_LEN]; 29 + size_t counter = 0; 30 + while (fgets(program, BUF_LEN, fp)) { 31 + if (counter <3 ) { 32 + ++counter; 33 + continue; 34 + } 35 + 36 + // path has a line feed character at the end that must be removed 37 + char* nullb = memchr(program, '\0', BUF_LEN); 38 + size_t len = (size_t)nullb - (size_t)program - 1; 39 + program[len] = '\0'; 40 + 41 + Proc proc = { 42 + .time = (Time)rand(), 43 + .program = { 44 + .buf = (Byte*)program, 45 + .len = len, 46 + }, 47 + }; 48 + 49 + Buffer in_buf = serialize(proc); 50 + zumba(enchufe, in_buf); 51 + 52 + memset(program, 0, BUF_LEN); 53 + free(in_buf.buf); 54 + ++counter; 55 + sleep(1); 56 + } 57 + 58 + sleep(1); 7 59 } 60 + 61 + desenchufa(enchufe); 62 + 63 + return 0; 8 64 }
+1 -1
src/lib/enchufe.h
··· 20 20 typedef uint8_t Byte; 21 21 22 22 typedef struct { 23 - Byte* buf; 24 23 size_t len; 24 + Byte* buf; 25 25 } Buffer; 26 26 27 27 Buffer atob(const char* str);
+96
src/lib/lib.c
··· 1 + #include "lib.h" 2 + #include "log.h" 3 + #include <stdlib.h> 4 + 5 + Procs deserialize(Buffer out_buf, size_t msg_len) { 6 + Procs procs = { 7 + .procs = (Proc*)calloc(1, sizeof(Proc)), 8 + .len = 1, 9 + }; 10 + if (procs.procs == NULL) { 11 + log(ERROR, "Could not allocate 8 processes.\n"); 12 + exit(1); 13 + } 14 + 15 + size_t buf_idx = 0; 16 + for (size_t j = 0; buf_idx < msg_len; ++j) { 17 + if (j >= procs.len) { 18 + procs.procs = (Proc*)realloc(procs.procs, procs.len * 2); 19 + procs.len = procs.len * 2; 20 + } 21 + Byte* curr = out_buf.buf + buf_idx; 22 + 23 + Time time = *(Time*)curr; 24 + 25 + Buffer program = (Buffer){ 26 + .len = *(size_t*)(curr + sizeof(Time)), 27 + .buf = curr + sizeof(Time) + sizeof(size_t), 28 + }; 29 + if (program.len > msg_len - buf_idx) { 30 + log(ERROR, "Program length is longer than buffer at: %zu.\n", buf_idx); 31 + log(ERROR, "Program length is: %zu.\n", program.len); 32 + log(ERROR, "Buffer length is: %zu.\n", msg_len); 33 + log(ERROR, "Remaining buffer length is: %zu.\n", msg_len - buf_idx); 34 + exit(1); 35 + } 36 + char* nullb = memchr(program.buf, '\0', msg_len - buf_idx); 37 + size_t calculated_len = (size_t)nullb - (size_t)program.buf; 38 + if (program.len != calculated_len) { 39 + log(ERROR, "Incorrect size.\n"); 40 + log(ERROR, "Program length is: %zu.\n", program.len); 41 + log(ERROR, "Calculated length is: %zu.\n", calculated_len); 42 + exit(1); 43 + } 44 + 45 + Byte* buf = (Byte*)calloc(program.len, sizeof(Byte)); 46 + if (buf == NULL) { 47 + log(ERROR, "Could not allocate %zu strings.\n", program.len); 48 + exit(1); 49 + } 50 + memcpy(buf, program.buf, program.len); 51 + procs.procs[j] = (Proc){ 52 + .time = time, 53 + .program = (Buffer){ 54 + .buf = buf, 55 + .len = program.len, 56 + }, 57 + }; 58 + buf_idx += sizeof(Time) + sizeof(size_t) + program.len; 59 + } 60 + return procs; 61 + } 62 + 63 + Buffer serialize(Proc proc) { 64 + size_t len = sizeof(Time) + sizeof(size_t) + proc.program.len; 65 + Buffer buf = { 66 + .buf = (Byte*)calloc(len, sizeof(Byte)), 67 + .len = len, 68 + }; 69 + 70 + if (buf.buf == NULL) { 71 + try(-1); 72 + } 73 + 74 + memcpy((void*)buf.buf, (void*)&proc.time, sizeof(Time)); 75 + memcpy((void*)(buf.buf + sizeof(Time)), (void*)&proc.program.len, sizeof(size_t)); 76 + memcpy((void*)(buf.buf + sizeof(Time) + sizeof(size_t)), (void*)proc.program.buf, proc.program.len); 77 + return buf; 78 + } 79 + 80 + IPv4 parse_address(const char* str) { 81 + size_t len = strlen(str); 82 + 83 + assert(len <= 15 && "What you have entered cannot possibly be an IP address."); 84 + 85 + IPv4 ip = {0}; 86 + size_t curr_byte = 0; 87 + for (size_t i = 0; i < len; ++i) { 88 + if (str[i] == '.') { 89 + ++curr_byte; 90 + } else { 91 + ip.bytes[curr_byte] = (Byte)(ip.bytes[curr_byte] * 10 + (str[i] - '0')); 92 + } 93 + } 94 + 95 + return ip; 96 + }
+23
src/lib/lib.h
··· 1 + #ifndef LIB_H_ 2 + #define LIB_H_ 3 + #include "enchufe.h" 4 + #include <assert.h> 5 + 6 + #define not_null(a) assert((a) != NULL && "Null pointer.") 7 + 8 + typedef Byte Time; 9 + 10 + typedef struct { 11 + Time time; 12 + Buffer program; 13 + } Proc; 14 + 15 + typedef struct { 16 + Proc* procs; 17 + size_t len; 18 + } Procs; 19 + 20 + Procs deserialize(Buffer out_buf, size_t msg_len); 21 + Buffer serialize(Proc); 22 + IPv4 parse_address(const char* str); 23 + #endif // LIB_H_
+1 -1
src/lib/log.c
··· 2 2 #include <stdarg.h> 3 3 #include "log.h" 4 4 5 - void log_(LogLevel level, const char* format, ...) { 5 + void log(LogLevel level, const char* format, ...) { 6 6 FILE* out = stdout; 7 7 8 8 switch (level) {
+1 -1
src/lib/log.h
··· 7 7 ERROR, 8 8 } LogLevel; 9 9 10 - void log_(LogLevel level, const char* format, ...); 10 + void log(LogLevel level, const char* format, ...); 11 11 12 12 #endif // LOG_H_ header