diff options
Diffstat (limited to 'lib/worker.c')
-rw-r--r-- | lib/worker.c | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/lib/worker.c b/lib/worker.c new file mode 100644 index 0000000..fa399f0 --- /dev/null +++ b/lib/worker.c @@ -0,0 +1,112 @@ +#include "./worker.h" +#include "./net/client.h" +#include <pthread.h> + +typedef struct { + pthread_t tid; +} worker_proc_t; + +typedef struct { + conn_t* conn; + pthread_mutex_t mutex; + + int num_procs; + worker_proc_t* procs; + + worker_func_t func; +} worker_t; + +worker_t* wrk = NULL; + +void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker_func_t func) { + if (wrk != NULL) { + fprintf(stderr, "[worker_init] Worker has already been initialized\n"); + exit(EXIT_FAILURE); + } + + wrk = calloc(1, sizeof(*wrk)); + wrk->conn = client_connect_to_server(srv_addr, srv_port); + if (wrk->conn == NULL) { + fprintf(stderr, "[worker_init] Failed to connect to the controller\n"); + exit(EXIT_FAILURE); + } + + wrk->procs = calloc(num_cpu, sizeof(*wrk->procs)); + wrk->func = func; + wrk->num_procs = num_cpu; + + pthread_mutex_init(&wrk->mutex, NULL); +} + +char worker_buf[1024 * 1024]; + +void* worker_proc_thread(void* args) { + args = (void*) args; + + while (1) { + if (pthread_mutex_lock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + + worker_buf[0] = REQUEST_TYPE_GET_TASK; + conn_write(wrk->conn, worker_buf, 1); + + size_t size; + char* task = conn_read(wrk->conn, &size); + if (pthread_mutex_unlock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n"); + exit(EXIT_FAILURE); + } + + if (size <= 4) { + free(task); + break; + } + + char* resp; + size_t resp_size; + + wrk->func(task + 4, size - 4, &resp, &resp_size); + memcpy(worker_buf + 1, task, 4); + free(task); + + if (pthread_mutex_lock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + + worker_buf[0] = REQUEST_TYPE_PUT_RESULT; + memcpy(worker_buf + 5, resp, resp_size); + conn_write(wrk->conn, worker_buf, resp_size + 5); + free(resp); + + free(conn_read(wrk->conn, &size)); + if (pthread_mutex_unlock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n"); + exit(EXIT_FAILURE); + } + } + + return NULL; +} + +void worker_exec() { + for (int i = 0; i < wrk->num_procs; i++) { + worker_proc_t* proc = &wrk->procs[i]; + + int ret = pthread_create(&proc->tid, NULL, worker_proc_thread, proc); + if (ret != 0) { + fprintf(stderr, "[worker_exec] Unable to start thread\n"); + exit(EXIT_FAILURE); + } + } + + for (int i = 0; i < wrk->num_procs; i++) { + int ret = pthread_join(wrk->procs[i].tid, NULL); + if (ret != 0) { + fprintf(stderr, "[worker_exec] Unable to join thread\n"); + exit(EXIT_FAILURE); + } + } +} |