#include "./worker.h" #include "./net/client.h" #include typedef struct { pthread_t tid; } worker_proc_t; typedef struct { conn_t* conn; pthread_mutex_t mutex; int num_procs; worker_proc_t* procs; worker_func_t func; } worker_t; worker_t* wrk = NULL; void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker_func_t func) { if (wrk != NULL) { fprintf(stderr, "[worker_init] Worker has already been initialized\n"); exit(EXIT_FAILURE); } wrk = calloc(1, sizeof(*wrk)); wrk->conn = client_connect_to_server(srv_addr, srv_port); if (wrk->conn == NULL) { fprintf(stderr, "[worker_init] Failed to connect to the controller\n"); exit(EXIT_FAILURE); } wrk->procs = calloc(num_cpu, sizeof(*wrk->procs)); wrk->func = func; wrk->num_procs = num_cpu; pthread_mutex_init(&wrk->mutex, NULL); } char worker_buf[1024 * 1024]; void* worker_proc_thread(void* args) { args = (void*) args; while (1) { if (pthread_mutex_lock(&wrk->mutex) != 0) { fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); exit(EXIT_FAILURE); } worker_buf[0] = REQUEST_TYPE_GET_TASK; conn_write(wrk->conn, worker_buf, 1); size_t size; char* task = conn_read(wrk->conn, &size); if (pthread_mutex_unlock(&wrk->mutex) != 0) { fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n"); exit(EXIT_FAILURE); } if (size <= 4) { free(task); break; } char* resp; size_t resp_size; wrk->func(task + 16, size - 16, &resp, &resp_size); if (pthread_mutex_lock(&wrk->mutex) != 0) { fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); exit(EXIT_FAILURE); } memcpy(worker_buf + 4, task, 4); free(task); worker_buf[0] = REQUEST_TYPE_PUT_RESULT; memcpy(worker_buf + 16, resp, resp_size); conn_write(wrk->conn, worker_buf, resp_size + 16); free(resp); free(conn_read(wrk->conn, &size)); if (pthread_mutex_unlock(&wrk->mutex) != 0) { fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n"); exit(EXIT_FAILURE); } } return NULL; } void worker_exec() { for (int i = 0; i < wrk->num_procs; i++) { worker_proc_t* proc = &wrk->procs[i]; int ret = pthread_create(&proc->tid, NULL, worker_proc_thread, proc); if (ret != 0) { fprintf(stderr, "[worker_exec] Unable to start thread\n"); exit(EXIT_FAILURE); } } for (int i = 0; i < wrk->num_procs; i++) { int ret = pthread_join(wrk->procs[i].tid, NULL); if (ret != 0) { fprintf(stderr, "[worker_exec] Unable to join thread\n"); exit(EXIT_FAILURE); } } }