diff options
Diffstat (limited to 'lib/worker.c')
-rw-r--r-- | lib/worker.c | 125 |
1 files changed, 100 insertions, 25 deletions
diff --git a/lib/worker.c b/lib/worker.c index 9891310..5d99fec 100644 --- a/lib/worker.c +++ b/lib/worker.c @@ -1,21 +1,35 @@ #include "./worker.h" #include "./net/client.h" #include <pthread.h> +#include <stdint.h> +#include <stdlib.h> typedef struct { pthread_t tid; } worker_proc_t; typedef struct { + int id; + char* task; + size_t task_size; +} worker_task_t; + +typedef struct { conn_t* conn; pthread_mutex_t mutex; + worker_task_t* tasks; + size_t task_ind; + size_t task_cnt; + int num_procs; worker_proc_t* procs; worker_func_t func; } worker_t; +const int TASK_PRELOAD = 10; + worker_t* wrk = NULL; void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker_func_t func) { @@ -31,6 +45,7 @@ void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker exit(EXIT_FAILURE); } + wrk->tasks = calloc(TASK_PRELOAD, sizeof(*wrk->tasks)); wrk->procs = calloc(num_cpu, sizeof(*wrk->procs)); wrk->func = func; wrk->num_procs = num_cpu; @@ -38,50 +53,109 @@ void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker pthread_mutex_init(&wrk->mutex, NULL); } -char worker_buf[1024 * 1024]; +worker_task_t load_task() { + char worker_buf[128]; -void* worker_proc_thread(void* args) { - args = (void*) args; + if (wrk->task_cnt == -1U) { + return (worker_task_t) {.id = -1}; + } - while (1) { - if (pthread_mutex_lock(&wrk->mutex) != 0) { - fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); - exit(EXIT_FAILURE); - } + worker_task_t* task = NULL; + if (pthread_mutex_lock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + + if (wrk->task_cnt == -1U) { + goto ret; + } + + if (wrk->task_cnt == wrk->task_ind) { worker_buf[0] = REQUEST_TYPE_GET_TASK; - conn_write(wrk->conn, worker_buf, 1); + *((int32_t*) (worker_buf + 4)) = TASK_PRELOAD; + conn_write(wrk->conn, worker_buf, 8); size_t size; - char* task = conn_read(wrk->conn, &size); - if (pthread_mutex_unlock(&wrk->mutex) != 0) { - fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n"); - exit(EXIT_FAILURE); + char* tasks = conn_read(wrk->conn, &size); + + if (tasks == NULL) { + wrk->task_cnt = -1U; + goto ret; + } + + wrk->task_cnt = *((int32_t*) tasks); + + if (wrk->task_cnt <= 0U) { + wrk->task_cnt = -1U; + free(tasks); + goto ret; + } + + size_t cur_ind = 4; + // divide tasks + for (size_t i = 0; i < wrk->task_cnt; i++) { + worker_task_t* task = &wrk->tasks[i]; + + task->id = *((int32_t*) (tasks + cur_ind)); + cur_ind += 4; + task->task_size = *((int32_t*) (tasks + cur_ind)); + cur_ind += 4; + + task->task = calloc(task->task_size, sizeof(*task->task)); + memcpy(task->task, tasks + cur_ind, task->task_size); + cur_ind += task->task_size; } - if (size <= 4) { - free(task); + free(tasks); + wrk->task_ind = 0; + } + + if (wrk->task_cnt != -1U) { + task = &wrk->tasks[wrk->task_ind++]; + } + +ret: + if (pthread_mutex_unlock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n"); + exit(EXIT_FAILURE); + } + + if (task == NULL) { + return (worker_task_t) {.id = -1}; + } + + return *task; +} + +void* worker_proc_thread(void* args) { + args = (void*) args; + char worker_buf[1024 * 1024]; + + while (1) { + worker_task_t task = load_task(); + if (task.id == -1) { break; } char* resp; size_t resp_size; - wrk->func(task + 16, size - 16, &resp, &resp_size); - if (pthread_mutex_lock(&wrk->mutex) != 0) { - fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); - exit(EXIT_FAILURE); - } - - memcpy(worker_buf + 4, task, 4); - free(task); + printf("%lx %lx %d\n", pthread_self(), (uintptr_t) task.task, task.id); + wrk->func(task.task, task.task_size, &resp, &resp_size); + printf("freed %lx %lx\n", pthread_self(), (uintptr_t) task.task); + free(task.task); worker_buf[0] = REQUEST_TYPE_PUT_RESULT; + memcpy(worker_buf + 4, &task.id, 4); memcpy(worker_buf + 16, resp, resp_size); - conn_write(wrk->conn, worker_buf, resp_size + 16); free(resp); - free(conn_read(wrk->conn, &size)); + if (pthread_mutex_lock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + conn_write(wrk->conn, worker_buf, resp_size + 16); if (pthread_mutex_unlock(&wrk->mutex) != 0) { fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n"); exit(EXIT_FAILURE); @@ -100,6 +174,7 @@ void worker_exec() { fprintf(stderr, "[worker_exec] Unable to start thread\n"); exit(EXIT_FAILURE); } + printf("%lx thread id %d\n", proc->tid, i + 1); } for (int i = 0; i < wrk->num_procs; i++) { |