#include "./worker.h"
#include "./net/client.h"

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Per-thread bookkeeping: only the pthread handle is needed to join later. */
typedef struct {
    pthread_t tid;
} worker_proc_t;

/* One unit of work received from the controller. `task` is heap-allocated and
 * owned by whoever receives the struct from load_task(). */
typedef struct {
    int id;
    char* task;
    size_t task_size;
} worker_task_t;

/* Process-wide worker state. `mutex` guards the connection and the task
 * queue (`tasks`, `task_ind`, `task_cnt`). */
typedef struct {
    conn_t* conn;
    pthread_mutex_t mutex;
    worker_task_t* tasks;
    size_t task_ind;
    size_t task_cnt;
    int num_procs;
    worker_proc_t* procs;
    worker_func_t func;
} worker_t;

/* Number of tasks requested from the controller per round trip; also the
 * capacity of worker_t.tasks. */
const int TASK_PRELOAD = 10;

worker_t* wrk = NULL;

/* Value stored in worker_t.task_cnt once no more tasks will ever arrive. */
#define WORKER_TASKS_EXHAUSTED SIZE_MAX

/* Layout of the messages this file writes (values in host byte order):
 *   byte 0      request type
 *   bytes 4..7  int32 argument (preload count, or task id)
 *   bytes 16..  result payload (PUT_RESULT only)
 * Unused header bytes are sent as zero. */
enum {
    WIRE_FIELD_OFFSET = 4,
    GET_TASK_REQUEST_SIZE = 8,
    PUT_RESULT_HEADER_SIZE = 16
};

/* Acquire the global worker mutex or terminate the process. */
static void worker_lock(void)
{
    if (pthread_mutex_lock(&wrk->mutex) != 0) {
        fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
        exit(EXIT_FAILURE);
    }
}

/* Release the global worker mutex or terminate the process. */
static void worker_unlock(void)
{
    if (pthread_mutex_unlock(&wrk->mutex) != 0) {
        fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n");
        exit(EXIT_FAILURE);
    }
}

/* Mark the queue as permanently empty. Caller must hold the mutex. */
static void mark_tasks_exhausted(void)
{
    wrk->task_cnt = WORKER_TASKS_EXHAUSTED;
}

/*
 * Ask the controller for up to TASK_PRELOAD tasks and store them in
 * wrk->tasks. Caller must hold the mutex.
 *
 * Expected response layout: int32 count, then `count` records of
 * { int32 id, int32 size, `size` payload bytes }.
 *
 * On a NULL response, a zero count, or a malformed response the queue is
 * marked exhausted. Otherwise task_cnt/task_ind describe the new batch.
 *
 * NOTE(review): the bounds checks assume conn_read() stores the byte length
 * of the returned buffer in its second argument - confirm against
 * net/client.h.
 */
static void fetch_task_batch(void)
{
    char request[GET_TASK_REQUEST_SIZE] = {0};
    const int32_t preload = TASK_PRELOAD;

    request[0] = REQUEST_TYPE_GET_TASK;
    memcpy(request + WIRE_FIELD_OFFSET, &preload, sizeof(preload));
    conn_write(wrk->conn, request, sizeof(request));

    size_t size = 0;
    char* batch = conn_read(wrk->conn, &size);
    if (batch == NULL) {
        mark_tasks_exhausted();
        return;
    }

    int32_t count = 0;
    if (size >= sizeof(count)) {
        /* memcpy instead of a pointer cast: the buffer carries no alignment
         * guarantee for int32_t. */
        memcpy(&count, batch, sizeof(count));
    }
    if (count == 0) {
        free(batch);
        mark_tasks_exhausted();
        return;
    }

    size_t parsed = 0;
    /* wrk->tasks only has room for TASK_PRELOAD entries. */
    if (count > 0 && count <= TASK_PRELOAD) {
        size_t pos = sizeof(count);

        while (parsed < (size_t) count) {
            int32_t id = 0;
            int32_t len = 0;

            if (size - pos < sizeof(id) + sizeof(len)) {
                break;
            }
            memcpy(&id, batch + pos, sizeof(id));
            pos += sizeof(id);
            memcpy(&len, batch + pos, sizeof(len));
            pos += sizeof(len);

            if (len < 0 || (size_t) len > size - pos) {
                break;
            }

            /* Allocate at least one byte so an empty task still gets a valid,
             * freeable pointer. */
            char* payload = calloc(len > 0 ? (size_t) len : 1, sizeof(*payload));
            if (payload == NULL) {
                fprintf(stderr, "[load_task] Unable to allocate memory for a task\n");
                exit(EXIT_FAILURE);
            }
            memcpy(payload, batch + pos, (size_t) len);
            pos += (size_t) len;

            worker_task_t* slot = &wrk->tasks[parsed];
            slot->id = id;
            slot->task = payload;
            slot->task_size = (size_t) len;
            parsed++;
        }
    }
    free(batch);

    if (count < 0 || parsed != (size_t) count) {
        /* Drop the partially parsed batch; none of it was handed out yet. */
        for (size_t i = 0; i < parsed; i++) {
            free(wrk->tasks[i].task);
            wrk->tasks[i].task = NULL;
        }
        fprintf(stderr, "[load_task] Malformed task batch from the controller\n");
        mark_tasks_exhausted();
        return;
    }

    wrk->task_cnt = (size_t) count;
    wrk->task_ind = 0;
}

/*
 * Initialise the global worker: connect to the controller and allocate the
 * task queue and thread table.
 *
 * srv_addr, srv_port  passed unchanged to client_connect_to_server()
 * num_cpu             number of processing threads worker_exec() will start
 * func                callback invoked for every task
 *
 * Terminates the process if the worker was already initialised, if the
 * connection cannot be established, or if a resource cannot be allocated.
 */
void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker_func_t func)
{
    if (wrk != NULL) {
        fprintf(stderr, "[worker_init] Worker has already been initialized\n");
        exit(EXIT_FAILURE);
    }

    wrk = calloc(1, sizeof(*wrk));
    if (wrk == NULL) {
        fprintf(stderr, "[worker_init] Unable to allocate memory\n");
        exit(EXIT_FAILURE);
    }

    wrk->conn = client_connect_to_server(srv_addr, srv_port);
    if (wrk->conn == NULL) {
        fprintf(stderr, "[worker_init] Failed to connect to the controller\n");
        exit(EXIT_FAILURE);
    }

    /* Always allocate at least one slot so a non-positive num_cpu does not
     * turn into a zero-sized or wrapped-around allocation request. */
    const size_t proc_slots = num_cpu > 0 ? (size_t) num_cpu : 1;

    wrk->tasks = calloc((size_t) TASK_PRELOAD, sizeof(*wrk->tasks));
    wrk->procs = calloc(proc_slots, sizeof(*wrk->procs));
    if (wrk->tasks == NULL || wrk->procs == NULL) {
        fprintf(stderr, "[worker_init] Unable to allocate memory\n");
        exit(EXIT_FAILURE);
    }

    wrk->func = func;
    wrk->num_procs = num_cpu;

    if (pthread_mutex_init(&wrk->mutex, NULL) != 0) {
        fprintf(stderr, "[worker_init] Unable to call pthread_mutex_init\n");
        exit(EXIT_FAILURE);
    }
}

/*
 * Hand out the next task, fetching a new batch from the controller when the
 * local queue is empty.
 *
 * Returns a copy of the task; the caller owns and must free `task`.
 * Returns a task with id == -1 once no more tasks are available.
 *
 * All queue state is read and the task is copied while the mutex is held, so
 * a refill by another thread cannot overwrite the slot being returned.
 */
worker_task_t load_task(void)
{
    worker_task_t next = {.id = -1};

    worker_lock();

    if (wrk->task_cnt != WORKER_TASKS_EXHAUSTED && wrk->task_ind == wrk->task_cnt) {
        fetch_task_batch();
    }
    if (wrk->task_cnt != WORKER_TASKS_EXHAUSTED) {
        next = wrk->tasks[wrk->task_ind++];
    }

    worker_unlock();
    return next;
}

/*
 * Thread entry point: repeatedly take a task, run the user callback on it and
 * send the result back to the controller, until load_task() reports that no
 * tasks are left.
 *
 * args is unused. Always returns NULL.
 */
void* worker_proc_thread(void* args)
{
    (void) args;

    for (;;) {
        worker_task_t task = load_task();
        if (task.id == -1) {
            break;
        }

        char* resp = NULL;
        size_t resp_size = 0;

        /* NOTE: the cast assumes pthread_t is an integer or pointer type. */
        printf("%lx %lx %d\n", (unsigned long) pthread_self(),
               (unsigned long) (uintptr_t) task.task, task.id);
        wrk->func(task.task, task.task_size, &resp, &resp_size);
        printf("freed %lx %lx\n", (unsigned long) pthread_self(),
               (unsigned long) (uintptr_t) task.task);
        free(task.task);

        if (resp == NULL) {
            resp_size = 0;
        }
        if (resp_size > SIZE_MAX - PUT_RESULT_HEADER_SIZE) {
            fprintf(stderr, "[proc_thread] Result is too large\n");
            exit(EXIT_FAILURE);
        }

        /* Size the message to the result instead of using a fixed buffer, so
         * a large result cannot overflow it. */
        const size_t msg_size = PUT_RESULT_HEADER_SIZE + resp_size;
        char* msg = malloc(msg_size);
        if (msg == NULL) {
            fprintf(stderr, "[proc_thread] Unable to allocate the result message\n");
            exit(EXIT_FAILURE);
        }

        const int32_t task_id = task.id;
        memset(msg, 0, PUT_RESULT_HEADER_SIZE);
        msg[0] = REQUEST_TYPE_PUT_RESULT;
        memcpy(msg + WIRE_FIELD_OFFSET, &task_id, sizeof(task_id));
        if (resp_size > 0) {
            memcpy(msg + PUT_RESULT_HEADER_SIZE, resp, resp_size);
        }
        free(resp);

        /* The connection is shared by all threads; serialise writes. */
        worker_lock();
        conn_write(wrk->conn, msg, msg_size);
        worker_unlock();

        free(msg);
    }

    return NULL;
}

/*
 * Start wrk->num_procs processing threads and wait for all of them to finish.
 *
 * Terminates the process if the worker has not been initialised or if a
 * thread cannot be created or joined.
 */
void worker_exec(void)
{
    if (wrk == NULL) {
        fprintf(stderr, "[worker_exec] Worker has not been initialized\n");
        exit(EXIT_FAILURE);
    }

    for (int i = 0; i < wrk->num_procs; i++) {
        worker_proc_t* proc = &wrk->procs[i];

        if (pthread_create(&proc->tid, NULL, worker_proc_thread, proc) != 0) {
            fprintf(stderr, "[worker_exec] Unable to start thread\n");
            exit(EXIT_FAILURE);
        }
        printf("%lx thread id %d\n", (unsigned long) proc->tid, i + 1);
    }

    for (int i = 0; i < wrk->num_procs; i++) {
        if (pthread_join(wrk->procs[i].tid, NULL) != 0) {
            fprintf(stderr, "[worker_exec] Unable to join thread\n");
            exit(EXIT_FAILURE);
        }
    }
}