summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormrfoxygmfr <mrfoxygmfr@sch9.ru>2025-05-21 02:35:58 +0300
committermrfoxygmfr <mrfoxygmfr@sch9.ru>2025-05-21 02:35:58 +0300
commited729f994f55fe2bdbfa824689afab82d3ac87c7 (patch)
treed593ce59222b1aa3113a3cb1f8fef3d7466d8553
parentb7fa22dfeb60e66a6ba5e0a6a554d0f056e09724 (diff)
feat(lib): implement controller and worker in library
-rw-r--r--lib/common.h6
-rw-r--r--lib/controller.c304
-rw-r--r--lib/controller.h15
-rw-r--r--lib/worker.c112
-rw-r--r--lib/worker.h11
5 files changed, 447 insertions, 1 deletions
diff --git a/lib/common.h b/lib/common.h
index 2ffbc9b..ab6c05d 100644
--- a/lib/common.h
+++ b/lib/common.h
@@ -9,7 +9,11 @@
#include <unistd.h>
#include <string.h>
#include <errno.h>
+#include <time.h>
-#define COMMON_CONST 1
+enum {
+ REQUEST_TYPE_GET_TASK,
+ REQUEST_TYPE_PUT_RESULT,
+};
#endif // COMMON_H
diff --git a/lib/controller.c b/lib/controller.c
new file mode 100644
index 0000000..ce850c5
--- /dev/null
+++ b/lib/controller.c
@@ -0,0 +1,304 @@
+#include "./controller.h"
+#include "./net/server.h"
+#include <pthread.h>
+
+typedef struct {
+ conn_t* conn;
+
+ pthread_t tid;
+
+ int procs;
+} controller_conn_t;
+
+enum {
+ TASK_STATUS_QUEUED,
+ TASK_STATUS_EXECUTING,
+ TASK_STATUS_FINISHED,
+ TASK_STATUS_RETURNED,
+};
+
+typedef struct {
+ int id;
+ int status;
+ size_t task_size;
+ const char* task;
+
+ controller_conn_t* worker;
+ const char* response;
+ size_t response_size;
+} controller_task_t;
+
+typedef struct {
+ server_t* srv;
+ pthread_mutex_t mutex;
+
+ pthread_t tid;
+ int awaited;
+
+ size_t min_conn;
+ size_t max_conn;
+ size_t active_conn;
+ controller_conn_t* connections;
+
+ controller_task_t* tasks;
+ size_t tasks_len;
+ size_t tasks_cap;
+} controller_t;
+
+controller_t* cntr = NULL;
+
+void controller_init(const char* srv_addr, const char* srv_port, int min_conn, int max_conn) {
+ if (cntr != NULL) {
+ fprintf(stderr, "[controller_init] Controller has already been initialized\n");
+ exit(EXIT_FAILURE);
+ }
+
+ cntr = calloc(1, sizeof(*cntr));
+ cntr->min_conn = min_conn;
+ cntr->max_conn = max_conn;
+ cntr->connections = calloc(max_conn, sizeof(*cntr->connections));
+
+ pthread_mutex_init(&cntr->mutex, NULL);
+
+ cntr->srv = server_init_tcp(srv_addr, srv_port);
+}
+
+void controller_finish() {
+ if (cntr == NULL) {
+ fprintf(stderr, "[controller_init] Controller hasn't been initialized\n");
+ exit(EXIT_FAILURE);
+ }
+
+ pthread_mutex_destroy(&cntr->mutex);
+
+ // TODO check all active connections
+
+ server_shutdown(cntr->srv);
+}
+
+char buf[1024 * 1024];
+
+void* controller_conn_thread(void* args) {
+ controller_conn_t* conn = (controller_conn_t*) args;
+
+ while (1) {
+ size_t sz;
+ char* data = conn_read(conn->conn, &sz);
+ if (sz == 0) {
+ break;
+ }
+
+ if (data[0] == REQUEST_TYPE_GET_TASK) {
+ if (pthread_mutex_lock(&cntr->mutex) != 0) {
+ fprintf(stderr, "[check_task_with_status] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ controller_task_t* task = NULL;
+ for (size_t i = 0; i < cntr->tasks_len; i++) {
+ if (cntr->tasks[i].status == TASK_STATUS_QUEUED) {
+ task = &cntr->tasks[i];
+ task->status = TASK_STATUS_EXECUTING;
+ task->worker = conn;
+ break;
+ }
+ }
+
+ if (pthread_mutex_unlock(&cntr->mutex) != 0) {
+ fprintf(stderr, "[check_task_with_status] Unable to call pthread_mutex_unlock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (task == NULL) {
+ free(data);
+ conn_write(conn->conn, NULL, 0);
+ continue;
+ }
+
+ memcpy(buf, &task->id, sizeof(task->id));
+ memcpy(buf + sizeof(task->id), task->task, task->task_size);
+
+ conn_write(conn->conn, buf, sizeof(task->id) + task->task_size);
+ } else if (data[0] == REQUEST_TYPE_PUT_RESULT) {
+ int id = *((int*) (data + 1));
+ if (pthread_mutex_lock(&cntr->mutex) != 0) {
+ fprintf(stderr, "[controller_thread] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ controller_task_t* task = NULL;
+ for (size_t i = 0; i < cntr->tasks_len; i++) {
+ if (cntr->tasks[i].id == id) {
+ task = &cntr->tasks[i];
+ task->status = TASK_STATUS_FINISHED;
+ task->response = data + 5;
+ task->response_size = sz - 5;
+ printf("finished\n");
+ break;
+ }
+ }
+
+ if (task == NULL) {
+ fprintf(stderr, "[controller_thread] unknown task id\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (pthread_mutex_unlock(&cntr->mutex) != 0) {
+ fprintf(stderr, "[controller_thread] Unable to call pthread_mutex_unlock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ conn_write(conn->conn, NULL, 0);
+ } else {
+ conn_close(conn->conn);
+ }
+
+ free(data);
+ }
+
+ conn_close(conn->conn);
+
+ return NULL;
+}
+
+int controller_has_task_with_status(int status) {
+ if (pthread_mutex_lock(&cntr->mutex) != 0) {
+ fprintf(stderr, "[check_task_with_status] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ int ret = -1;
+ for (size_t i = 0; i < cntr->tasks_len; i++) {
+ if (cntr->tasks[i].status == status) {
+ ret = i;
+ break;
+ }
+ }
+
+ if (pthread_mutex_unlock(&cntr->mutex) != 0) {
+ fprintf(stderr, "[check_task_with_status] Unable to call pthread_mutex_unlock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ return ret;
+}
+
+const int SLEEP_INT = 1000;
+
+void* controller_loop(void* args) {
+ args = (void*) args;
+ while (1) {
+ if (cntr->awaited && (controller_has_task_with_status(TASK_STATUS_QUEUED) == -1)) {
+ break;
+ }
+
+ controller_conn_t* conn = NULL;
+ for (size_t i = 0; i < cntr->max_conn; i++) {
+ if (cntr->connections[i].conn == NULL) {
+ conn = &cntr->connections[i];
+ }
+ }
+
+ if (conn == NULL) {
+ usleep(SLEEP_INT);
+ continue;
+ }
+
+ conn_t* new_conn = server_try_accept(cntr->srv);
+ if (new_conn == NULL) {
+ usleep(SLEEP_INT);
+ continue;
+ }
+
+ conn->conn = new_conn;
+
+ int ret = pthread_create(&conn->tid, NULL, controller_conn_thread, conn);
+ if (ret != 0) {
+ fprintf(stderr, "[controller_start] Unable to start connection thread\n");
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ for (size_t i = 0; i < cntr->max_conn; i++) {
+ if (cntr->connections[i].conn != NULL) {
+ int ret = pthread_join(cntr->connections[i].tid, NULL);
+ if (ret != 0) {
+ fprintf(stderr, "[controller_start] Unable to join connection thread\n");
+ exit(EXIT_FAILURE);
+ }
+ }
+ }
+
+ return NULL;
+}
+
+void controller_start() {
+ int ret = pthread_create(&cntr->tid, NULL, controller_loop, NULL);
+ if (ret != 0) {
+ fprintf(stderr, "[controller_start] Unable to start controller in second thread\n");
+ exit(EXIT_FAILURE);
+ }
+}
+
+void controller_wait() {
+ cntr->awaited = 1;
+
+ int ret = pthread_join(cntr->tid, NULL);
+ if (ret != 0) {
+ fprintf(stderr, "[controller_wait] Unable to join controller thread\n");
+ exit(EXIT_FAILURE);
+ }
+}
+
+int controller_yield_task(const char* data, size_t size) {
+ if (pthread_mutex_lock(&cntr->mutex) != 0) {
+ fprintf(stderr, "[yield_task] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (cntr->tasks_cap == cntr->tasks_len) {
+ cntr->tasks = realloc(cntr->tasks, sizeof(*cntr->tasks) * (cntr->tasks_cap == 0 ? 1 : cntr->tasks_cap) * 2);
+ cntr->tasks_cap = (cntr->tasks_cap == 0 ? 1 : cntr->tasks_cap) * 2;
+ }
+
+ controller_task_t* task = &cntr->tasks[cntr->tasks_len];
+
+ task->id = cntr->tasks_len + 1;
+ task->task = data;
+ task->task_size = size;
+ task->status = TASK_STATUS_QUEUED;
+ cntr->tasks_len++;
+
+ if (pthread_mutex_unlock(&cntr->mutex) != 0) {
+ fprintf(stderr, "[yield_task] Unable to call pthread_mutex_unlock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ return task->id;
+}
+
+int controller_get_result(const char** res, size_t* size) {
+ int ret = controller_has_task_with_status(TASK_STATUS_FINISHED);
+ if (ret == -1) {
+ *res = NULL;
+ *size = 0;
+ return ret;
+ }
+
+ if (pthread_mutex_lock(&cntr->mutex) != 0) {
+ fprintf(stderr, "[yield_task] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ *res = cntr->tasks[ret].response;
+ *size = cntr->tasks[ret].response_size;
+ cntr->tasks[ret].status = TASK_STATUS_RETURNED;
+ ret = cntr->tasks[ret].id;
+
+ if (pthread_mutex_unlock(&cntr->mutex) != 0) {
+ fprintf(stderr, "[yield_task] Unable to call pthread_mutex_unlock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ return ret;
+}
diff --git a/lib/controller.h b/lib/controller.h
new file mode 100644
index 0000000..8ad7756
--- /dev/null
+++ b/lib/controller.h
@@ -0,0 +1,15 @@
+#ifndef CONTROLLER_H
+#define CONTROLLER_H
+
+#include "./common.h"
+
+void controller_init(const char* srv_addr, const char* srv_port, int min_conn, int max_conn);
+void controller_finish();
+
+void controller_start();
+void controller_wait();
+
+int controller_yield_task(const char* data, size_t size);
+int controller_get_result(const char** res, size_t* size);
+
+#endif // CONTROLLER_H
diff --git a/lib/worker.c b/lib/worker.c
new file mode 100644
index 0000000..fa399f0
--- /dev/null
+++ b/lib/worker.c
@@ -0,0 +1,112 @@
+#include "./worker.h"
+#include "./net/client.h"
+#include <pthread.h>
+
+typedef struct {
+ pthread_t tid;
+} worker_proc_t;
+
+typedef struct {
+ conn_t* conn;
+ pthread_mutex_t mutex;
+
+ int num_procs;
+ worker_proc_t* procs;
+
+ worker_func_t func;
+} worker_t;
+
+worker_t* wrk = NULL;
+
+void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker_func_t func) {
+ if (wrk != NULL) {
+ fprintf(stderr, "[worker_init] Worker has already been initialized\n");
+ exit(EXIT_FAILURE);
+ }
+
+ wrk = calloc(1, sizeof(*wrk));
+ wrk->conn = client_connect_to_server(srv_addr, srv_port);
+ if (wrk->conn == NULL) {
+ fprintf(stderr, "[worker_init] Failed to connect to the controller\n");
+ exit(EXIT_FAILURE);
+ }
+
+ wrk->procs = calloc(num_cpu, sizeof(*wrk->procs));
+ wrk->func = func;
+ wrk->num_procs = num_cpu;
+
+ pthread_mutex_init(&wrk->mutex, NULL);
+}
+
+char worker_buf[1024 * 1024];
+
+void* worker_proc_thread(void* args) {
+ args = (void*) args;
+
+ while (1) {
+ if (pthread_mutex_lock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ worker_buf[0] = REQUEST_TYPE_GET_TASK;
+ conn_write(wrk->conn, worker_buf, 1);
+
+ size_t size;
+ char* task = conn_read(wrk->conn, &size);
+ if (pthread_mutex_unlock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (size <= 4) {
+ free(task);
+ break;
+ }
+
+ char* resp;
+ size_t resp_size;
+
+ wrk->func(task + 4, size - 4, &resp, &resp_size);
+ memcpy(worker_buf + 1, task, 4);
+ free(task);
+
+ if (pthread_mutex_lock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ worker_buf[0] = REQUEST_TYPE_PUT_RESULT;
+ memcpy(worker_buf + 5, resp, resp_size);
+ conn_write(wrk->conn, worker_buf, resp_size + 5);
+ free(resp);
+
+ free(conn_read(wrk->conn, &size));
+ if (pthread_mutex_unlock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n");
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ return NULL;
+}
+
+void worker_exec() {
+ for (int i = 0; i < wrk->num_procs; i++) {
+ worker_proc_t* proc = &wrk->procs[i];
+
+ int ret = pthread_create(&proc->tid, NULL, worker_proc_thread, proc);
+ if (ret != 0) {
+ fprintf(stderr, "[worker_exec] Unable to start thread\n");
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ for (int i = 0; i < wrk->num_procs; i++) {
+ int ret = pthread_join(wrk->procs[i].tid, NULL);
+ if (ret != 0) {
+ fprintf(stderr, "[worker_exec] Unable to join thread\n");
+ exit(EXIT_FAILURE);
+ }
+ }
+}
diff --git a/lib/worker.h b/lib/worker.h
new file mode 100644
index 0000000..12626a1
--- /dev/null
+++ b/lib/worker.h
@@ -0,0 +1,11 @@
+#ifndef WORKER_H
+#define WORKER_H
+
+#include "./common.h"
+
+typedef void (*worker_func_t)(const char* data, size_t size, char** result, size_t* result_size);
+
+void worker_init(const char* cntr_addr, const char* cntr_port, int num_cpu, worker_func_t func);
+void worker_exec();
+
+#endif // WORKER_H