summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormrfoxygmfr <mrfoxygmfr@sch9.ru>2025-05-22 23:41:29 +0300
committermrfoxygmfr <mrfoxygmfr@sch9.ru>2025-05-22 23:41:29 +0300
commit5b2a6d3eae55ed2b45e6a5f12cd20ac5536e7692 (patch)
treeee0b6264f7dd29fcdd8e565f33282e108246c78a
parent9c5ef0efeb0b6d1ad731958d48f3829f3720c301 (diff)
feat(lib): faster protocol
now worker request 10 tasks in one request
-rw-r--r--lib/controller.c66
-rw-r--r--lib/worker.c125
-rw-r--r--worker.c5
3 files changed, 136 insertions, 60 deletions
diff --git a/lib/controller.c b/lib/controller.c
index b28f0dd..04ed716 100644
--- a/lib/controller.c
+++ b/lib/controller.c
@@ -76,10 +76,9 @@ void controller_finish() {
server_shutdown(cntr->srv);
}
-char buf[1024 * 1024];
-
void* controller_conn_thread(void* args) {
controller_conn_t* conn = (controller_conn_t*) args;
+ char buf[1024 * 1024];
while (1) {
size_t sz;
@@ -89,6 +88,10 @@ void* controller_conn_thread(void* args) {
}
if (data[0] == REQUEST_TYPE_GET_TASK) {
+ int32_t cnt = *((int32_t*) (data + 4));
+ int32_t avail_cnt = 0;
+ size_t* task_ids = calloc(cnt, sizeof(*task_ids));
+
if (pthread_mutex_lock(&cntr->mutex) != 0) {
fprintf(stderr, "[check_task_with_status] Unable to call pthread_mutex_lock\n");
exit(EXIT_FAILURE);
@@ -100,6 +103,10 @@ void* controller_conn_thread(void* args) {
task = &cntr->tasks[i];
task->status = TASK_STATUS_EXECUTING;
task->worker = conn;
+ task_ids[avail_cnt++] = i;
+ }
+
+ if (avail_cnt == cnt) {
break;
}
}
@@ -109,24 +116,23 @@ void* controller_conn_thread(void* args) {
exit(EXIT_FAILURE);
}
- if (task == NULL) {
- free(data);
- conn_write(conn->conn, NULL, 0);
- continue;
- }
+ memcpy(buf, &avail_cnt, sizeof(avail_cnt));
+ size_t cur_ind = 4;
- memcpy(buf, &task->id, sizeof(task->id));
- memcpy(buf + 16, task->task, task->task_size);
+ for (int i = 0; i < avail_cnt; i++) {
+ memcpy(buf + cur_ind, &cntr->tasks[task_ids[i]].id, 4);
+ cur_ind += 4;
+ memcpy(buf + cur_ind, &cntr->tasks[task_ids[i]].task_size, 4);
+ cur_ind += 4;
+ memcpy(buf + cur_ind, cntr->tasks[task_ids[i]].task, cntr->tasks[task_ids[i]].task_size);
+ cur_ind += cntr->tasks[task_ids[i]].task_size;
+ }
- conn_write(conn->conn, buf, 16 + task->task_size);
+ conn_write(conn->conn, buf, cur_ind);
+ free(task_ids);
free(data);
} else if (data[0] == REQUEST_TYPE_PUT_RESULT) {
int id = *((int*) (data + 4));
- if (pthread_mutex_lock(&cntr->mutex) != 0) {
- fprintf(stderr, "[controller_thread] Unable to call pthread_mutex_lock\n");
- exit(EXIT_FAILURE);
- }
-
controller_task_t* task = NULL;
for (size_t i = 0; i < cntr->tasks_len; i++) {
if (cntr->tasks[i].id == id) {
@@ -134,6 +140,7 @@ void* controller_conn_thread(void* args) {
task->status = TASK_STATUS_FINISHED;
task->response = data + 16;
task->response_size = sz - 16;
+ printf("finished %d\n", id);
break;
}
}
@@ -142,18 +149,21 @@ void* controller_conn_thread(void* args) {
fprintf(stderr, "[controller_thread] unknown task id\n");
exit(EXIT_FAILURE);
}
-
- if (pthread_mutex_unlock(&cntr->mutex) != 0) {
- fprintf(stderr, "[controller_thread] Unable to call pthread_mutex_unlock\n");
- exit(EXIT_FAILURE);
- }
-
- conn_write(conn->conn, NULL, 0);
} else {
- conn_close(conn->conn);
+ fprintf(stderr, "[controller_thread] unknown request type %d\n", data[0]);
+ exit(EXIT_FAILURE);
}
}
+ for (size_t i = 0; i < cntr->tasks_len; i++) {
+ if (cntr->tasks[i].worker == conn && cntr->tasks[i].status == TASK_STATUS_EXECUTING) {
+ fprintf(stderr, "[controller_thread] worker finished unexpectedly\n");
+ exit(EXIT_FAILURE);
+ break;
+ }
+ }
+
+ printf("worker finished gracefully\n");
conn_close(conn->conn);
return NULL;
@@ -282,20 +292,10 @@ int controller_get_result(const char** res, size_t* size) {
return ret;
}
- if (pthread_mutex_lock(&cntr->mutex) != 0) {
- fprintf(stderr, "[yield_task] Unable to call pthread_mutex_lock\n");
- exit(EXIT_FAILURE);
- }
-
*res = cntr->tasks[ret].response;
*size = cntr->tasks[ret].response_size;
cntr->tasks[ret].status = TASK_STATUS_RETURNED;
ret = cntr->tasks[ret].id;
- if (pthread_mutex_unlock(&cntr->mutex) != 0) {
- fprintf(stderr, "[yield_task] Unable to call pthread_mutex_unlock\n");
- exit(EXIT_FAILURE);
- }
-
return ret;
}
diff --git a/lib/worker.c b/lib/worker.c
index 9891310..5d99fec 100644
--- a/lib/worker.c
+++ b/lib/worker.c
@@ -1,21 +1,35 @@
#include "./worker.h"
#include "./net/client.h"
#include <pthread.h>
+#include <stdint.h>
+#include <stdlib.h>
typedef struct {
pthread_t tid;
} worker_proc_t;
typedef struct {
+ int id;
+ char* task;
+ size_t task_size;
+} worker_task_t;
+
+typedef struct {
conn_t* conn;
pthread_mutex_t mutex;
+ worker_task_t* tasks;
+ size_t task_ind;
+ size_t task_cnt;
+
int num_procs;
worker_proc_t* procs;
worker_func_t func;
} worker_t;
+const int TASK_PRELOAD = 10;
+
worker_t* wrk = NULL;
void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker_func_t func) {
@@ -31,6 +45,7 @@ void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker
exit(EXIT_FAILURE);
}
+ wrk->tasks = calloc(TASK_PRELOAD, sizeof(*wrk->tasks));
wrk->procs = calloc(num_cpu, sizeof(*wrk->procs));
wrk->func = func;
wrk->num_procs = num_cpu;
@@ -38,50 +53,109 @@ void worker_init(const char* srv_addr, const char* srv_port, int num_cpu, worker
pthread_mutex_init(&wrk->mutex, NULL);
}
-char worker_buf[1024 * 1024];
+worker_task_t load_task() {
+ char worker_buf[128];
-void* worker_proc_thread(void* args) {
- args = (void*) args;
+ if (wrk->task_cnt == -1U) {
+ return (worker_task_t) {.id = -1};
+ }
- while (1) {
- if (pthread_mutex_lock(&wrk->mutex) != 0) {
- fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
- exit(EXIT_FAILURE);
- }
+ worker_task_t* task = NULL;
+ if (pthread_mutex_lock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (wrk->task_cnt == -1U) {
+ goto ret;
+ }
+
+ if (wrk->task_cnt == wrk->task_ind) {
worker_buf[0] = REQUEST_TYPE_GET_TASK;
- conn_write(wrk->conn, worker_buf, 1);
+ *((int32_t*) (worker_buf + 4)) = TASK_PRELOAD;
+ conn_write(wrk->conn, worker_buf, 8);
size_t size;
- char* task = conn_read(wrk->conn, &size);
- if (pthread_mutex_unlock(&wrk->mutex) != 0) {
- fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n");
- exit(EXIT_FAILURE);
+ char* tasks = conn_read(wrk->conn, &size);
+
+ if (tasks == NULL) {
+ wrk->task_cnt = -1U;
+ goto ret;
+ }
+
+ wrk->task_cnt = *((int32_t*) tasks);
+
+ if (wrk->task_cnt <= 0U) {
+ wrk->task_cnt = -1U;
+ free(tasks);
+ goto ret;
+ }
+
+ size_t cur_ind = 4;
+ // divide tasks
+ for (size_t i = 0; i < wrk->task_cnt; i++) {
+ worker_task_t* task = &wrk->tasks[i];
+
+ task->id = *((int32_t*) (tasks + cur_ind));
+ cur_ind += 4;
+ task->task_size = *((int32_t*) (tasks + cur_ind));
+ cur_ind += 4;
+
+ task->task = calloc(task->task_size, sizeof(*task->task));
+ memcpy(task->task, tasks + cur_ind, task->task_size);
+ cur_ind += task->task_size;
}
- if (size <= 4) {
- free(task);
+ free(tasks);
+ wrk->task_ind = 0;
+ }
+
+ if (wrk->task_cnt != -1U) {
+ task = &wrk->tasks[wrk->task_ind++];
+ }
+
+ret:
+ if (pthread_mutex_unlock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (task == NULL) {
+ return (worker_task_t) {.id = -1};
+ }
+
+ return *task;
+}
+
+void* worker_proc_thread(void* args) {
+ args = (void*) args;
+ char worker_buf[1024 * 1024];
+
+ while (1) {
+ worker_task_t task = load_task();
+ if (task.id == -1) {
break;
}
char* resp;
size_t resp_size;
- wrk->func(task + 16, size - 16, &resp, &resp_size);
- if (pthread_mutex_lock(&wrk->mutex) != 0) {
- fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
- exit(EXIT_FAILURE);
- }
-
- memcpy(worker_buf + 4, task, 4);
- free(task);
+ printf("%lx %lx %d\n", pthread_self(), (uintptr_t) task.task, task.id);
+ wrk->func(task.task, task.task_size, &resp, &resp_size);
+ printf("freed %lx %lx\n", pthread_self(), (uintptr_t) task.task);
+ free(task.task);
worker_buf[0] = REQUEST_TYPE_PUT_RESULT;
+ memcpy(worker_buf + 4, &task.id, 4);
memcpy(worker_buf + 16, resp, resp_size);
- conn_write(wrk->conn, worker_buf, resp_size + 16);
free(resp);
- free(conn_read(wrk->conn, &size));
+ if (pthread_mutex_lock(&wrk->mutex) != 0) {
+ fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_lock\n");
+ exit(EXIT_FAILURE);
+ }
+ conn_write(wrk->conn, worker_buf, resp_size + 16);
if (pthread_mutex_unlock(&wrk->mutex) != 0) {
fprintf(stderr, "[proc_thread] Unable to call pthread_mutex_unlock\n");
exit(EXIT_FAILURE);
@@ -100,6 +174,7 @@ void worker_exec() {
fprintf(stderr, "[worker_exec] Unable to start thread\n");
exit(EXIT_FAILURE);
}
+ printf("%lx thread id %d\n", proc->tid, i + 1);
}
for (int i = 0; i < wrk->num_procs; i++) {
diff --git a/worker.c b/worker.c
index c7c560a..c6b5aeb 100644
--- a/worker.c
+++ b/worker.c
@@ -49,6 +49,7 @@ long double calculate_integral(func_t f, long double a, long double b, long doub
x += 2 * delta;
}
+ usleep(20000); // it's a very long task...
long double prev_sum = (sum_by_1 + 2 * sum_by_2 + 4 * sum_by_4) * delta / 3;
long double sum = prev_sum;
@@ -74,7 +75,7 @@ long double calculate_integral(func_t f, long double a, long double b, long doub
void worker_func(const char* task, size_t task_size, char** resp, size_t* resp_size) {
long double* args = (long double*) task;
- task_size = task_size * 2;
+ (void) task_size;
long double res = calculate_integral(func, args[0], args[1], args[2]);
printf("task %Lf %Lf -> %Lf\n", args[0], args[1], res);
@@ -86,6 +87,6 @@ void worker_func(const char* task, size_t task_size, char** resp, size_t* resp_s
int32_t main() {
worker_init("127.0.0.1", "33554", 1, worker_func);
- configure_timeout(10);
+ configure_timeout(100);
worker_exec();
}