diff options
author | mrfoxygmfr <mrfoxygmfr@sch9.ru> | 2025-05-22 23:41:29 +0300 |
---|---|---|
committer | mrfoxygmfr <mrfoxygmfr@sch9.ru> | 2025-05-22 23:41:29 +0300 |
commit | 5b2a6d3eae55ed2b45e6a5f12cd20ac5536e7692 (patch) | |
tree | ee0b6264f7dd29fcdd8e565f33282e108246c78a /lib/controller.c | |
parent | 9c5ef0efeb0b6d1ad731958d48f3829f3720c301 (diff) |
feat(lib): faster protocol
now worker request 10 tasks in one request
Diffstat (limited to 'lib/controller.c')
-rw-r--r-- | lib/controller.c | 66 |
1 files changed, 33 insertions, 33 deletions
diff --git a/lib/controller.c b/lib/controller.c index b28f0dd..04ed716 100644 --- a/lib/controller.c +++ b/lib/controller.c @@ -76,10 +76,9 @@ void controller_finish() { server_shutdown(cntr->srv); } -char buf[1024 * 1024]; - void* controller_conn_thread(void* args) { controller_conn_t* conn = (controller_conn_t*) args; + char buf[1024 * 1024]; while (1) { size_t sz; @@ -89,6 +88,10 @@ void* controller_conn_thread(void* args) { } if (data[0] == REQUEST_TYPE_GET_TASK) { + int32_t cnt = *((int32_t*) (data + 4)); + int32_t avail_cnt = 0; + size_t* task_ids = calloc(cnt, sizeof(*task_ids)); + if (pthread_mutex_lock(&cntr->mutex) != 0) { fprintf(stderr, "[check_task_with_status] Unable to call pthread_mutex_lock\n"); exit(EXIT_FAILURE); @@ -100,6 +103,10 @@ void* controller_conn_thread(void* args) { task = &cntr->tasks[i]; task->status = TASK_STATUS_EXECUTING; task->worker = conn; + task_ids[avail_cnt++] = i; + } + + if (avail_cnt == cnt) { break; } } @@ -109,24 +116,23 @@ void* controller_conn_thread(void* args) { exit(EXIT_FAILURE); } - if (task == NULL) { - free(data); - conn_write(conn->conn, NULL, 0); - continue; - } + memcpy(buf, &avail_cnt, sizeof(avail_cnt)); + size_t cur_ind = 4; - memcpy(buf, &task->id, sizeof(task->id)); - memcpy(buf + 16, task->task, task->task_size); + for (int i = 0; i < avail_cnt; i++) { + memcpy(buf + cur_ind, &cntr->tasks[task_ids[i]].id, 4); + cur_ind += 4; + memcpy(buf + cur_ind, &cntr->tasks[task_ids[i]].task_size, 4); + cur_ind += 4; + memcpy(buf + cur_ind, cntr->tasks[task_ids[i]].task, cntr->tasks[task_ids[i]].task_size); + cur_ind += cntr->tasks[task_ids[i]].task_size; + } - conn_write(conn->conn, buf, 16 + task->task_size); + conn_write(conn->conn, buf, cur_ind); + free(task_ids); free(data); } else if (data[0] == REQUEST_TYPE_PUT_RESULT) { int id = *((int*) (data + 4)); - if (pthread_mutex_lock(&cntr->mutex) != 0) { - fprintf(stderr, "[controller_thread] Unable to call pthread_mutex_lock\n"); - exit(EXIT_FAILURE); - } - controller_task_t* task = NULL; for (size_t i = 0; i < cntr->tasks_len; i++) { if (cntr->tasks[i].id == id) { @@ -134,6 +140,7 @@ void* controller_conn_thread(void* args) { task->status = TASK_STATUS_FINISHED; task->response = data + 16; task->response_size = sz - 16; + printf("finished %d\n", id); break; } } @@ -142,18 +149,21 @@ void* controller_conn_thread(void* args) { fprintf(stderr, "[controller_thread] unknown task id\n"); exit(EXIT_FAILURE); } - - if (pthread_mutex_unlock(&cntr->mutex) != 0) { - fprintf(stderr, "[controller_thread] Unable to call pthread_mutex_unlock\n"); - exit(EXIT_FAILURE); - } - - conn_write(conn->conn, NULL, 0); } else { - conn_close(conn->conn); + fprintf(stderr, "[controller_thread] unknown request type %d\n", data[0]); + exit(EXIT_FAILURE); } } + for (size_t i = 0; i < cntr->tasks_len; i++) { + if (cntr->tasks[i].worker == conn && cntr->tasks[i].status == TASK_STATUS_EXECUTING) { + fprintf(stderr, "[controller_thread] worker finished unexpectedly\n"); + exit(EXIT_FAILURE); + break; + } + } + + printf("worker finished gracefully\n"); conn_close(conn->conn); return NULL; @@ -282,20 +292,10 @@ int controller_get_result(const char** res, size_t* size) { return ret; } - if (pthread_mutex_lock(&cntr->mutex) != 0) { - fprintf(stderr, "[yield_task] Unable to call pthread_mutex_lock\n"); - exit(EXIT_FAILURE); - } - *res = cntr->tasks[ret].response; *size = cntr->tasks[ret].response_size; cntr->tasks[ret].status = TASK_STATUS_RETURNED; ret = cntr->tasks[ret].id; - if (pthread_mutex_unlock(&cntr->mutex) != 0) { - fprintf(stderr, "[yield_task] Unable to call pthread_mutex_unlock\n"); - exit(EXIT_FAILURE); - } - return ret; } |