diff options
author | mrfoxygmfr <mrfoxygmfr@sch9.ru> | 2025-05-21 02:35:58 +0300 |
---|---|---|
committer | mrfoxygmfr <mrfoxygmfr@sch9.ru> | 2025-05-21 02:35:58 +0300 |
commit | ed729f994f55fe2bdbfa824689afab82d3ac87c7 (patch) | |
tree | d593ce59222b1aa3113a3cb1f8fef3d7466d8553 | |
parent | b7fa22dfeb60e66a6ba5e0a6a554d0f056e09724 (diff) |
feat(lib): implement controller and worker in library
-rw-r--r-- | lib/common.h | 6 | ||||
-rw-r--r-- | lib/controller.c | 304 | ||||
-rw-r--r-- | lib/controller.h | 15 | ||||
-rw-r--r-- | lib/worker.c | 112 | ||||
-rw-r--r-- | lib/worker.h | 11 |
5 files changed, 447 insertions, 1 deletions
diff --git a/lib/common.h b/lib/common.h index 2ffbc9b..ab6c05d 100644 --- a/lib/common.h +++ b/lib/common.h @@ -9,7 +9,11 @@ #include <unistd.h> #include <string.h> #include <errno.h> +#include <time.h> -#define COMMON_CONST 1 +enum { + REQUEST_TYPE_GET_TASK, + REQUEST_TYPE_PUT_RESULT, +}; #endif // COMMON_H diff --git a/lib/controller.c b/lib/controller.c new file mode 100644 index 0000000..ce850c5 --- /dev/null +++ b/lib/controller.c @@ -0,0 +1,304 @@ +#include "./controller.h" +#include "./net/server.h" +#include <pthread.h> + +typedef struct { + conn_t* conn; + + pthread_t tid; + + int procs; +} controller_conn_t; + +enum { + TASK_STATUS_QUEUED, + TASK_STATUS_EXECUTING, + TASK_STATUS_FINISHED, + TASK_STATUS_RETURNED, +}; + +typedef struct { + int id; + int status; + size_t task_size; + const char* task; + + controller_conn_t* worker; + const char* response; + size_t response_size; +} controller_task_t; + +typedef struct { + server_t* srv; + pthread_mutex_t mutex; + + pthread_t tid; + int awaited; + + size_t min_conn; + size_t max_conn; + size_t active_conn; + controller_conn_t* connections; + + controller_task_t* tasks; + size_t tasks_len; + size_t tasks_cap; +} controller_t; + +controller_t* cntr = NULL; + +void controller_init(const char* srv_addr, const char* srv_port, int min_conn, int max_conn) { + if (cntr != NULL) { + fprintf(stderr, "[controller_init] Controller has already been initialized\n"); + exit(EXIT_FAILURE); + } + + cntr = calloc(1, sizeof(*cntr)); + cntr->min_conn = min_conn; + cntr->max_conn = max_conn; + cntr->connections = calloc(max_conn, sizeof(*cntr->connections)); + + pthread_mutex_init(&cntr->mutex, NULL); + + cntr->srv = server_init_tcp(srv_addr, srv_port); +} + +void controller_finish() { + if (cntr == NULL) { + fprintf(stderr, "[controller_init] Controller hasn't been initialized\n"); + exit(EXIT_FAILURE); + } + + pthread_mutex_destroy(&cntr->mutex); + + // TODO check all active connections + + server_shutdown(cntr->srv); +} + +char buf[1024 * 1024]; + +void* controller_conn_thread(void* args) { + controller_conn_t* conn = (controller_conn_t*) args; + + while (1) { + size_t sz; + char* data = conn_read(conn->conn, &sz); + if (sz == 0) { + break; + } + + if (data[0] == REQUEST_TYPE_GET_TASK) { + if (pthread_mutex_lock(&cntr->mutex) != 0) { + fprintf(stderr, "[check_task_with_status] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + + controller_task_t* task = NULL; + for (size_t i = 0; i < cntr->tasks_len; i++) { + if (cntr->tasks[i].status == TASK_STATUS_QUEUED) { + task = &cntr->tasks[i]; + task->status = TASK_STATUS_EXECUTING; + task->worker = conn; + break; + } + } + + if (pthread_mutex_unlock(&cntr->mutex) != 0) { + fprintf(stderr, "[check_task_with_status] Unable to call pthread_mutex_unlock\n"); + exit(EXIT_FAILURE); + } + + if (task == NULL) { + free(data); + conn_write(conn->conn, NULL, 0); + continue; + } + + memcpy(buf, &task->id, sizeof(task->id)); + memcpy(buf + sizeof(task->id), task->task, task->task_size); + + conn_write(conn->conn, buf, sizeof(task->id) + task->task_size); + } else if (data[0] == REQUEST_TYPE_PUT_RESULT) { + int id = *((int*) (data + 1)); + if (pthread_mutex_lock(&cntr->mutex) != 0) { + fprintf(stderr, "[controller_thread] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + + controller_task_t* task = NULL; + for (size_t i = 0; i < cntr->tasks_len; i++) { + if (cntr->tasks[i].id == id) { + task = &cntr->tasks[i]; + task->status = TASK_STATUS_FINISHED; + task->response = data + 5; + task->response_size = sz - 5; + printf("finished\n"); + break; + } + } + + if (task == NULL) { + fprintf(stderr, "[controller_thread] unknown task id\n"); + exit(EXIT_FAILURE); + } + + if (pthread_mutex_unlock(&cntr->mutex) != 0) { + fprintf(stderr, "[controller_thread] Unable to call pthread_mutex_unlock\n"); + exit(EXIT_FAILURE); + } + + conn_write(conn->conn, NULL, 0); + } else { + conn_close(conn->conn); + } + + free(data); + } + + conn_close(conn->conn); + + return NULL; +} + +int controller_has_task_with_status(int status) { + if (pthread_mutex_lock(&cntr->mutex) != 0) { + fprintf(stderr, "[check_task_with_status] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + + int ret = -1; + for (size_t i = 0; i < cntr->tasks_len; i++) { + if (cntr->tasks[i].status == status) { + ret = i; + break; + } + } + + if (pthread_mutex_unlock(&cntr->mutex) != 0) { + fprintf(stderr, "[check_task_with_status] Unable to call pthread_mutex_unlock\n"); + exit(EXIT_FAILURE); + } + + return ret; +} + +const int SLEEP_INT = 1000; + +void* controller_loop(void* args) { + args = (void*) args; + while (1) { + if (cntr->awaited && (controller_has_task_with_status(TASK_STATUS_QUEUED) == -1)) { + break; + } + + controller_conn_t* conn = NULL; + for (size_t i = 0; i < cntr->max_conn; i++) { + if (cntr->connections[i].conn == NULL) { + conn = &cntr->connections[i]; + } + } + + if (conn == NULL) { + usleep(SLEEP_INT); + continue; + } + + conn_t* new_conn = server_try_accept(cntr->srv); + if (new_conn == NULL) { + usleep(SLEEP_INT); + continue; + } + + conn->conn = new_conn; + + int ret = pthread_create(&conn->tid, NULL, controller_conn_thread, conn); + if (ret != 0) { + fprintf(stderr, "[controller_start] Unable to start connection thread\n"); + exit(EXIT_FAILURE); + } + } + + for (size_t i = 0; i < cntr->max_conn; i++) { + if (cntr->connections[i].conn != NULL) { + int ret = pthread_join(cntr->connections[i].tid, NULL); + if (ret != 0) { + fprintf(stderr, "[controller_start] Unable to join connection thread\n"); + exit(EXIT_FAILURE); + } + } + } + + return NULL; +} + +void controller_start() { + int ret = pthread_create(&cntr->tid, NULL, controller_loop, NULL); + if (ret != 0) { + fprintf(stderr, "[controller_start] Unable to start controller in second thread\n"); + exit(EXIT_FAILURE); + } +} + +void controller_wait() { + cntr->awaited = 1; + + int ret = pthread_join(cntr->tid, NULL); + if (ret != 0) { + fprintf(stderr, "[controller_wait] Unable to join controller thread\n"); + exit(EXIT_FAILURE); + } +} + +int controller_yield_task(const char* data, size_t size) { + if (pthread_mutex_lock(&cntr->mutex) != 0) { + fprintf(stderr, "[yield_task] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + + if (cntr->tasks_cap == cntr->tasks_len) { + cntr->tasks = realloc(cntr->tasks, sizeof(*cntr->tasks) * (cntr->tasks_cap == 0 ? 1 : cntr->tasks_cap) * 2); + cntr->tasks_cap = (cntr->tasks_cap == 0 ? 1 : cntr->tasks_cap) * 2; + } + + controller_task_t* task = &cntr->tasks[cntr->tasks_len]; + + task->id = cntr->tasks_len + 1; + task->task = data; + task->task_size = size; + task->status = TASK_STATUS_QUEUED; + cntr->tasks_len++; + + if (pthread_mutex_unlock(&cntr->mutex) != 0) { + fprintf(stderr, "[yield_task] Unable to call pthread_mutex_unlock\n"); + exit(EXIT_FAILURE); + } + + return task->id; +} + +int controller_get_result(const char** res, size_t* size) { + int ret = controller_has_task_with_status(TASK_STATUS_FINISHED); + if (ret == -1) { + *res = NULL; + *size = 0; + return ret; + } + + if (pthread_mutex_lock(&cntr->mutex) != 0) { + fprintf(stderr, "[yield_task] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + + *res = cntr->tasks[ret].response; + *size = cntr->tasks[ret].response_size; + cntr->tasks[ret].status = TASK_STATUS_RETURNED; + ret = cntr->tasks[ret].id; + + if (pthread_mutex_unlock(&cntr->mutex) != 0) { + fprintf(stderr, "[yield_task] Unable to call pthread_mutex_unlock\n"); + exit(EXIT_FAILURE); + } + + return ret; +} diff --git a/lib/controller.h b/lib/controller.h new file mode 100644 index 0000000..8ad7756 --- /dev/null +++ b/lib/controller.h @@ -0,0 +1,15 @@ +#ifndef CONTROLLER_H +#define CONTROLLER_H + +#include "./common.h" + +void controller_init(const char* srv_addr, const char* srv_port, int min_conn, int max_conn); +void controller_finish(); + +void controller_start(); +void controller_wait(); + +int controller_yield_task(const char* data, size_t size); +int controller_get_result(const char** res, size_t* size); + +#endif // CONTROLLER_H diff --git a/lib/worker.c b/lib/worker.c new file mode 100644 index 0000000..fa399f0 --- /dev/null +++ b/lib/worker.c @@ -0,0 +1,112 @@ +#include "./worker.h" +#include "./net/client.h" +#include <pthread.h> + +typedef struct { + pthread_t tid; +} worker_proc_t; + +typedef struct { + conn_t* conn; + pthread_mutex_t mutex; + + int num_procs; + worker_proc_t* procs; + + worker_func_t func; +} worker_t; + +worker_t* wrk = NULL; + +void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker_func_t func) { + if (wrk != NULL) { + fprintf(stderr, "[worker_init] Worker has already been initialized\n"); + exit(EXIT_FAILURE); + } + + wrk = calloc(1, sizeof(*wrk)); + wrk->conn = client_connect_to_server(srv_addr, srv_port); + if (wrk->conn == NULL) { + fprintf(stderr, "[worker_init] Failed to connect to the controller\n"); + exit(EXIT_FAILURE); + } + + wrk->procs = calloc(num_cpu, sizeof(*wrk->procs)); + wrk->func = func; + wrk->num_procs = num_cpu; + + pthread_mutex_init(&wrk->mutex, NULL); +} + +char worker_buf[1024 * 1024]; + +void* worker_proc_thread(void* args) { + args = (void*) args; + + while (1) { + if (pthread_mutex_lock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + + worker_buf[0] = REQUEST_TYPE_GET_TASK; + conn_write(wrk->conn, worker_buf, 1); + + size_t size; + char* task = conn_read(wrk->conn, &size); + if (pthread_mutex_unlock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n"); + exit(EXIT_FAILURE); + } + + if (size <= 4) { + free(task); + break; + } + + char* resp; + size_t resp_size; + + wrk->func(task + 4, size - 4, &resp, &resp_size); + memcpy(worker_buf + 1, task, 4); + free(task); + + if (pthread_mutex_lock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n"); + exit(EXIT_FAILURE); + } + + worker_buf[0] = REQUEST_TYPE_PUT_RESULT; + memcpy(worker_buf + 5, resp, resp_size); + conn_write(wrk->conn, worker_buf, resp_size + 5); + free(resp); + + free(conn_read(wrk->conn, &size)); + if (pthread_mutex_unlock(&wrk->mutex) != 0) { + fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n"); + exit(EXIT_FAILURE); + } + } + + return NULL; +} + +void worker_exec() { + for (int i = 0; i < wrk->num_procs; i++) { + worker_proc_t* proc = &wrk->procs[i]; + + int ret = pthread_create(&proc->tid, NULL, worker_proc_thread, proc); + if (ret != 0) { + fprintf(stderr, "[worker_exec] Unable to start thread\n"); + exit(EXIT_FAILURE); + } + } + + for (int i = 0; i < wrk->num_procs; i++) { + int ret = pthread_join(wrk->procs[i].tid, NULL); + if (ret != 0) { + fprintf(stderr, "[worker_exec] Unable to join thread\n"); + exit(EXIT_FAILURE); + } + } +} diff --git a/lib/worker.h b/lib/worker.h new file mode 100644 index 0000000..12626a1 --- /dev/null +++ b/lib/worker.h @@ -0,0 +1,11 @@ +#ifndef WORKER_H +#define WORKER_H + +#include "./common.h" + +typedef void (*worker_func_t)(const char* data, size_t size, char** result, size_t* result_size); + +void worker_init(const char* cntr_addr, const char* cntr_port, int num_cpu, worker_func_t func); +void worker_exec(); + +#endif // WORKER_H |