summaryrefslogtreecommitdiffstats
path: root/lib/worker.c
diff options
context:
space:
mode:
authormrfoxygmfr <mrfoxygmfr@sch9.ru>2025-05-22 23:41:29 +0300
committermrfoxygmfr <mrfoxygmfr@sch9.ru>2025-05-22 23:41:29 +0300
commit5b2a6d3eae55ed2b45e6a5f12cd20ac5536e7692 (patch)
treeee0b6264f7dd29fcdd8e565f33282e108246c78a /lib/worker.c
parent9c5ef0efeb0b6d1ad731958d48f3829f3720c301 (diff)
feat(lib): faster protocol
now worker request 10 tasks in one request
Diffstat (limited to 'lib/worker.c')
-rw-r--r--lib/worker.c125
1 files changed, 100 insertions, 25 deletions
diff --git a/lib/worker.c b/lib/worker.c
index 9891310..5d99fec 100644
--- a/lib/worker.c
+++ b/lib/worker.c
@@ -1,21 +1,35 @@
#include "./worker.h"
#include "./net/client.h"
#include <pthread.h>
+#include <stdint.h>
+#include <stdlib.h>
typedef struct {
pthread_t tid;
} worker_proc_t;
typedef struct {
+ int id;
+ char* task;
+ size_t task_size;
+} worker_task_t;
+
+typedef struct {
conn_t* conn;
pthread_mutex_t mutex;
+ worker_task_t* tasks;
+ size_t task_ind;
+ size_t task_cnt;
+
int num_procs;
worker_proc_t* procs;
worker_func_t func;
} worker_t;
+const int TASK_PRELOAD = 10;
+
worker_t* wrk = NULL;
void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker_func_t func) {
@@ -31,6 +45,7 @@ void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker
exit(EXIT_FAILURE);
}
+ wrk->tasks = calloc(TASK_PRELOAD, sizeof(*wrk->tasks));
wrk->procs = calloc(num_cpu, sizeof(*wrk->procs));
wrk->func = func;
wrk->num_procs = num_cpu;
@@ -38,50 +53,109 @@ void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker
pthread_mutex_init(&wrk->mutex, NULL);
}
-char worker_buf[1024 * 1024];
+worker_task_t load_task() {
+ char worker_buf[128];
-void* worker_proc_thread(void* args) {
- args = (void*) args;
+ if (wrk->task_cnt == -1U) {
+ return (worker_task_t) {.id = -1};
+ }
- while (1) {
- if (pthread_mutex_lock(&wrk->mutex) != 0) {
- fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
- exit(EXIT_FAILURE);
- }
+ worker_task_t* task = NULL;
+ if (pthread_mutex_lock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (wrk->task_cnt == -1U) {
+ goto ret;
+ }
+
+ if (wrk->task_cnt == wrk->task_ind) {
worker_buf[0] = REQUEST_TYPE_GET_TASK;
- conn_write(wrk->conn, worker_buf, 1);
+ *((int32_t*) (worker_buf + 4)) = TASK_PRELOAD;
+ conn_write(wrk->conn, worker_buf, 8);
size_t size;
- char* task = conn_read(wrk->conn, &size);
- if (pthread_mutex_unlock(&wrk->mutex) != 0) {
- fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n");
- exit(EXIT_FAILURE);
+ char* tasks = conn_read(wrk->conn, &size);
+
+ if (tasks == NULL) {
+ wrk->task_cnt = -1U;
+ goto ret;
+ }
+
+ wrk->task_cnt = *((int32_t*) tasks);
+
+ if (wrk->task_cnt <= 0U) {
+ wrk->task_cnt = -1U;
+ free(tasks);
+ goto ret;
+ }
+
+ size_t cur_ind = 4;
+ // divide tasks
+ for (size_t i = 0; i < wrk->task_cnt; i++) {
+ worker_task_t* task = &wrk->tasks[i];
+
+ task->id = *((int32_t*) (tasks + cur_ind));
+ cur_ind += 4;
+ task->task_size = *((int32_t*) (tasks + cur_ind));
+ cur_ind += 4;
+
+ task->task = calloc(task->task_size, sizeof(*task->task));
+ memcpy(task->task, tasks + cur_ind, task->task_size);
+ cur_ind += task->task_size;
}
- if (size <= 4) {
- free(task);
+ free(tasks);
+ wrk->task_ind = 0;
+ }
+
+ if (wrk->task_cnt != -1U) {
+ task = &wrk->tasks[wrk->task_ind++];
+ }
+
+ret:
+ if (pthread_mutex_unlock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (task == NULL) {
+ return (worker_task_t) {.id = -1};
+ }
+
+ return *task;
+}
+
+void* worker_proc_thread(void* args) {
+ args = (void*) args;
+ char worker_buf[1024 * 1024];
+
+ while (1) {
+ worker_task_t task = load_task();
+ if (task.id == -1) {
break;
}
char* resp;
size_t resp_size;
- wrk->func(task + 16, size - 16, &resp, &resp_size);
- if (pthread_mutex_lock(&wrk->mutex) != 0) {
- fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
- exit(EXIT_FAILURE);
- }
-
- memcpy(worker_buf + 4, task, 4);
- free(task);
+ printf("%lx %lx %d\n", pthread_self(), (uintptr_t) task.task, task.id);
+ wrk->func(task.task, task.task_size, &resp, &resp_size);
+ printf("freed %lx %lx\n", pthread_self(), (uintptr_t) task.task);
+ free(task.task);
worker_buf[0] = REQUEST_TYPE_PUT_RESULT;
+ memcpy(worker_buf + 4, &task.id, 4);
memcpy(worker_buf + 16, resp, resp_size);
- conn_write(wrk->conn, worker_buf, resp_size + 16);
free(resp);
- free(conn_read(wrk->conn, &size));
+ if (pthread_mutex_lock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+ conn_write(wrk->conn, worker_buf, resp_size + 16);
if (pthread_mutex_unlock(&wrk->mutex) != 0) {
fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n");
exit(EXIT_FAILURE);
@@ -100,6 +174,7 @@ void worker_exec() {
fprintf(stderr, "[worker_exec] Unable to start thread\n");
exit(EXIT_FAILURE);
}
+ printf("%lx thread id %d\n", proc->tid, i + 1);
}
for (int i = 0; i < wrk->num_procs; i++) {