summaryrefslogtreecommitdiffstats
path: root/lib/worker.c
diff options
context:
space:
mode:
authormrfoxygmfr <mrfoxygmfr@sch9.ru>2025-05-21 02:35:58 +0300
committermrfoxygmfr <mrfoxygmfr@sch9.ru>2025-05-21 02:35:58 +0300
commited729f994f55fe2bdbfa824689afab82d3ac87c7 (patch)
treed593ce59222b1aa3113a3cb1f8fef3d7466d8553 /lib/worker.c
parentb7fa22dfeb60e66a6ba5e0a6a554d0f056e09724 (diff)
feat(lib): implement controller and worker in library
Diffstat (limited to 'lib/worker.c')
-rw-r--r--lib/worker.c112
1 files changed, 112 insertions, 0 deletions
diff --git a/lib/worker.c b/lib/worker.c
new file mode 100644
index 0000000..fa399f0
--- /dev/null
+++ b/lib/worker.c
@@ -0,0 +1,112 @@
+#include "./worker.h"
+#include "./net/client.h"
+#include <pthread.h>
+
+typedef struct {
+ pthread_t tid;
+} worker_proc_t;
+
+typedef struct {
+ conn_t* conn;
+ pthread_mutex_t mutex;
+
+ int num_procs;
+ worker_proc_t* procs;
+
+ worker_func_t func;
+} worker_t;
+
+worker_t* wrk = NULL;
+
+void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker_func_t func) {
+ if (wrk != NULL) {
+ fprintf(stderr, "[worker_init] Worker has already been initialized\n");
+ exit(EXIT_FAILURE);
+ }
+
+ wrk = calloc(1, sizeof(*wrk));
+ wrk->conn = client_connect_to_server(srv_addr, srv_port);
+ if (wrk->conn == NULL) {
+ fprintf(stderr, "[worker_init] Failed to connect to the controller\n");
+ exit(EXIT_FAILURE);
+ }
+
+ wrk->procs = calloc(num_cpu, sizeof(*wrk->procs));
+ wrk->func = func;
+ wrk->num_procs = num_cpu;
+
+ pthread_mutex_init(&wrk->mutex, NULL);
+}
+
+char worker_buf[1024 * 1024];
+
+void* worker_proc_thread(void* args) {
+ args = (void*) args;
+
+ while (1) {
+ if (pthread_mutex_lock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ worker_buf[0] = REQUEST_TYPE_GET_TASK;
+ conn_write(wrk->conn, worker_buf, 1);
+
+ size_t size;
+ char* task = conn_read(wrk->conn, &size);
+ if (pthread_mutex_unlock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (size <= 4) {
+ free(task);
+ break;
+ }
+
+ char* resp;
+ size_t resp_size;
+
+ wrk->func(task + 4, size - 4, &resp, &resp_size);
+ memcpy(worker_buf + 1, task, 4);
+ free(task);
+
+ if (pthread_mutex_lock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ worker_buf[0] = REQUEST_TYPE_PUT_RESULT;
+ memcpy(worker_buf + 5, resp, resp_size);
+ conn_write(wrk->conn, worker_buf, resp_size + 5);
+ free(resp);
+
+ free(conn_read(wrk->conn, &size));
+ if (pthread_mutex_unlock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n");
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ return NULL;
+}
+
+void worker_exec() {
+ for (int i = 0; i < wrk->num_procs; i++) {
+ worker_proc_t* proc = &wrk->procs[i];
+
+ int ret = pthread_create(&proc->tid, NULL, worker_proc_thread, proc);
+ if (ret != 0) {
+ fprintf(stderr, "[worker_exec] Unable to start thread\n");
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ for (int i = 0; i < wrk->num_procs; i++) {
+ int ret = pthread_join(wrk->procs[i].tid, NULL);
+ if (ret != 0) {
+ fprintf(stderr, "[worker_exec] Unable to join thread\n");
+ exit(EXIT_FAILURE);
+ }
+ }
+}