diff options
author | mrfoxygmfr <mrfoxygmfr@sch9.ru> | 2025-05-22 23:41:29 +0300 |
---|---|---|
committer | mrfoxygmfr <mrfoxygmfr@sch9.ru> | 2025-05-22 23:41:29 +0300 |
commit | 5b2a6d3eae55ed2b45e6a5f12cd20ac5536e7692 (patch) | |
tree | ee0b6264f7dd29fcdd8e565f33282e108246c78a | |
parent | 9c5ef0efeb0b6d1ad731958d48f3829f3720c301 (diff) |
feat(lib): faster protocol
now worker request 10 tasks in one request
-rw-r--r-- | lib/controller.c | 66 | ||||
-rw-r--r-- | lib/worker.c | 125 | ||||
-rw-r--r-- | worker.c | 5 |
3 files changed, 136 insertions, 60 deletions
diff --git a/lib/controller.c b/lib/controller.c index b28f0dd..04ed716 100644 --- a/lib/controller.c +++ b/lib/controller.c @@ -76,10 +76,9 @@ void controller_finish() { server_shutdown(cntr->srv); } -char buf[1024 * 1024]; - void* controller_conn_thread(void* args) { controller_conn_t* conn = (controller_conn_t*) args; + char buf[1024 * 1024]; while (1) { size_t sz; @@ -89,6 +88,10 @@ void* controller_conn_thread(void* args) { } if (data[0] == REQUEST_TYPE_GET_TASK) { + int32_t cnt = *((int32_t*) (data + 4)); + int32_t avail_cnt = 0; + size_t* task_ids = calloc(cnt, sizeof(*task_ids)); + if (pthread_mutex_lock(&cntr->mutex) != 0) { fprintf(stderr, "[check_task_with_status] Unable to call pthread_mutex_lock\n"); exit(EXIT_FAILURE); @@ -100,6 +103,10 @@ void* controller_conn_thread(void* args) { task = &cntr->tasks[i]; task->status = TASK_STATUS_EXECUTING; task->worker = conn; + task_ids[avail_cnt++] = i; + } + + if (avail_cnt == cnt) { break; } } @@ -109,24 +116,23 @@ void* controller_conn_thread(void* args) { exit(EXIT_FAILURE); } - if (task == NULL) { - free(data); - conn_write(conn->conn, NULL, 0); - continue; - } + memcpy(buf, &avail_cnt, sizeof(avail_cnt)); + size_t cur_ind = 4; - memcpy(buf, &task->id, sizeof(task->id)); - memcpy(buf + 16, task->task, task->task_size); + for (int i = 0; i < avail_cnt; i++) { + memcpy(buf + cur_ind, &cntr->tasks[task_ids[i]].id, 4); + cur_ind += 4; + memcpy(buf + cur_ind, &cntr->tasks[task_ids[i]].task_size, 4); + cur_ind += 4; + memcpy(buf + cur_ind, cntr->tasks[task_ids[i]].task, cntr->tasks[task_ids[i]].task_size); + cur_ind += cntr->tasks[task_ids[i]].task_size; + } - conn_write(conn->conn, buf, 16 + task->task_size); + conn_write(conn->conn, buf, cur_ind); + free(task_ids); free(data); } else if (data[0] == REQUEST_TYPE_PUT_RESULT) { int id = *((int*) (data + 4)); - if (pthread_mutex_lock(&cntr->mutex) != 0) { - fprintf(stderr, "[controller_thread] Unable to call pthread_mutex_lock\n"); - exit(EXIT_FAILURE); - } - controller_task_t* task = NULL; for (size_t i = 0; i < cntr->tasks_len; i++) { if (cntr->tasks[i].id == id) { @@ -134,6 +140,7 @@ void* controller_conn_thread(void* args) { task->status = TASK_STATUS_FINISHED; task->response = data + 16; task->response_size = sz - 16; + printf("finished %d\n", id); break; } } @@ -142,18 +149,21 @@ void* controller_conn_thread(void* args) { fprintf(stderr, "[controller_thread] unknown task id\n"); exit(EXIT_FAILURE); } - - if (pthread_mutex_unlock(&cntr->mutex) != 0) { - fprintf(stderr, "[controller_thread] Unable to call pthread_mutex_unlock\n"); - exit(EXIT_FAILURE); - } - - conn_write(conn->conn, NULL, 0); } else { - conn_close(conn->conn); + fprintf(stderr, "[controller_thread] unknown request type %d\n", data[0]); + exit(EXIT_FAILURE); } } + for (size_t i = 0; i < cntr->tasks_len; i++) { + if (cntr->tasks[i].worker == conn && cntr->tasks[i].status == TASK_STATUS_EXECUTING) { + fprintf(stderr, "[controller_thread] worker finished unexpectedly\n"); + exit(EXIT_FAILURE); + break; + } + } + + printf("worker finished gracefully\n"); conn_close(conn->conn); return NULL; @@ -282,20 +292,10 @@ int controller_get_result(const char** res, size_t* size) { return ret; } - if (pthread_mutex_lock(&cntr->mutex) != 0) { - fprintf(stderr, "[yield_task] Unable to call pthread_mutex_lock\n"); - exit(EXIT_FAILURE); - } - *res = cntr->tasks[ret].response; *size = cntr->tasks[ret].response_size; cntr->tasks[ret].status = TASK_STATUS_RETURNED; ret = cntr->tasks[ret].id; - if (pthread_mutex_unlock(&cntr->mutex) != 0) { - fprintf(stderr, "[yield_task] Unable to call pthread_mutex_unlock\n"); - exit(EXIT_FAILURE); - } - return ret; } diff --git a/lib/worker.c b/lib/worker.c index 9891310..5d99fec 100644 --- a/lib/worker.c +++ b/lib/worker.c @@ -1,21 +1,35 @@ #include "./worker.h" #include "./net/client.h" #include <pthread.h> +#include <stdint.h> +#include <stdlib.h> typedef struct { pthread_t tid; } worker_proc_t; typedef struct { + int id; + char* task; + size_t task_size; +} worker_task_t; + +typedef struct { conn_t* conn; pthread_mutex_t mutex; + worker_task_t* tasks; + size_t task_ind; + size_t task_cnt; + int num_procs; worker_proc_t* procs; worker_func_t func; } worker_t; +const int TASK_PRELOAD = 10; + worker_t* wrk = NULL; void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker_func_t func) { @@ -31,6 +45,7 @@ void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker exit(EXIT_FAILURE); } + wrk->tasks = calloc(TASK_PRELOAD, sizeof(*wrk->tasks)); wrk->procs = calloc(num_cpu, sizeof(*wrk->procs)); wrk->func = func; wrk->num_procs = num_cpu; @@ -38,50 +53,109 @@ void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker pthread_mutex_init(&wrk->mutex, NULL); } -char worker_buf[1024 * 1024]; +worker_task_t load_task() { + char worker_buf[128]; -void* worker_proc_thread(void* args) { - args = (void*) args; + if (wrk->task_cnt == -1U) { + return (worker_task_t) {.id = -1}; + } - while (1) { - if (pthread_mutex_lock(&wrk->mutex) != 0) { - fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); - exit(EXIT_FAILURE); - } + worker_task_t* task = NULL; + if (pthread_mutex_lock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + + if (wrk->task_cnt == -1U) { + goto ret; + } + + if (wrk->task_cnt == wrk->task_ind) { worker_buf[0] = REQUEST_TYPE_GET_TASK; - conn_write(wrk->conn, worker_buf, 1); + *((int32_t*) (worker_buf + 4)) = TASK_PRELOAD; + conn_write(wrk->conn, worker_buf, 8); size_t size; - char* task = conn_read(wrk->conn, &size); - if (pthread_mutex_unlock(&wrk->mutex) != 0) { - fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n"); - exit(EXIT_FAILURE); + char* tasks = conn_read(wrk->conn, &size); + + if (tasks == NULL) { + wrk->task_cnt = -1U; + goto ret; + } + + wrk->task_cnt = *((int32_t*) tasks); + + if (wrk->task_cnt <= 0U) { + wrk->task_cnt = -1U; + free(tasks); + goto ret; + } + + size_t cur_ind = 4; + // divide tasks + for (size_t i = 0; i < wrk->task_cnt; i++) { + worker_task_t* task = &wrk->tasks[i]; + + task->id = *((int32_t*) (tasks + cur_ind)); + cur_ind += 4; + task->task_size = *((int32_t*) (tasks + cur_ind)); + cur_ind += 4; + + task->task = calloc(task->task_size, sizeof(*task->task)); + memcpy(task->task, tasks + cur_ind, task->task_size); + cur_ind += task->task_size; } - if (size <= 4) { - free(task); + free(tasks); + wrk->task_ind = 0; + } + + if (wrk->task_cnt != -1U) { + task = &wrk->tasks[wrk->task_ind++]; + } + +ret: + if (pthread_mutex_unlock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n"); + exit(EXIT_FAILURE); + } + + if (task == NULL) { + return (worker_task_t) {.id = -1}; + } + + return *task; +} + +void* worker_proc_thread(void* args) { + args = (void*) args; + char worker_buf[1024 * 1024]; + + while (1) { + worker_task_t task = load_task(); + if (task.id == -1) { break; } char* resp; size_t resp_size; - wrk->func(task + 16, size - 16, &resp, &resp_size); - if (pthread_mutex_lock(&wrk->mutex) != 0) { - fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); - exit(EXIT_FAILURE); - } - - memcpy(worker_buf + 4, task, 4); - free(task); + printf("%lx %lx %d\n", pthread_self(), (uintptr_t) task.task, task.id); + wrk->func(task.task, task.task_size, &resp, &resp_size); + printf("freed %lx %lx\n", pthread_self(), (uintptr_t) task.task); + free(task.task); worker_buf[0] = REQUEST_TYPE_PUT_RESULT; + memcpy(worker_buf + 4, &task.id, 4); memcpy(worker_buf + 16, resp, resp_size); - conn_write(wrk->conn, worker_buf, resp_size + 16); free(resp); - free(conn_read(wrk->conn, &size)); + if (pthread_mutex_lock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + conn_write(wrk->conn, worker_buf, resp_size + 16); if (pthread_mutex_unlock(&wrk->mutex) != 0) { fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n"); exit(EXIT_FAILURE); @@ -100,6 +174,7 @@ void worker_exec() { fprintf(stderr, "[worker_exec] Unable to start thread\n"); exit(EXIT_FAILURE); } + printf("%lx thread id %d\n", proc->tid, i + 1); } for (int i = 0; i < wrk->num_procs; i++) { @@ -49,6 +49,7 @@ long double calculate_integral(func_t f, long double a, long double b, long doub x += 2 * delta; } + usleep(20000); // it's a very long task... long double prev_sum = (sum_by_1 + 2 * sum_by_2 + 4 * sum_by_4) * delta / 3; long double sum = prev_sum; @@ -74,7 +75,7 @@ long double calculate_integral(func_t f, long double a, long double b, long doub void worker_func(const char* task, size_t task_size, char** resp, size_t* resp_size) { long double* args = (long double*) task; - task_size = task_size * 2; + (void) task_size; long double res = calculate_integral(func, args[0], args[1], args[2]); printf("task %Lf %Lf -> %Lf\n", args[0], args[1], res); @@ -86,6 +87,6 @@ void worker_func(const char* task, size_t task_size, char** resp, size_t* resp_s int32_t main() { worker_init("127.0.0.1", "33554", 1, worker_func); - configure_timeout(10); + configure_timeout(100); worker_exec(); } |