Skip to content

Commit 0dff386

Browse files
committed
wip
1 parent 5fe43a8 commit 0dff386

File tree

2 files changed

+190
-89
lines changed

2 files changed

+190
-89
lines changed

src/main.cu

Lines changed: 132 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,18 @@
1818
#include "getopt.h"
1919
#include "log.h"
2020

21+
#define MAX_MESSAGE_SIZE (2048 * 1024)
22+
#define BUFFER_SIZE (MAX_MESSAGE_SIZE * chain_nums)
23+
24+
typedef struct {
25+
uv_mutex_t lock;
26+
uint8_t buffer[BUFFER_SIZE];
27+
ssize_t length;
28+
} message_buffer_t;
29+
30+
message_buffer_t message_buffer;
31+
uv_async_t message_handler;
32+
2133
std::atomic<uint32_t> found_solutions{0};
2234

2335
typedef std::chrono::high_resolution_clock Time;
@@ -205,46 +217,47 @@ void log_hashrate(uv_timer_t *timer)
205217
}
206218
}
207219

208-
uint8_t read_buf[2048 * 1024 * chain_nums];
209-
blob_t read_blob = {read_buf, 0};
210-
server_message_t *decode_buf(const uv_buf_t *buf, ssize_t nread)
211-
{
212-
if (read_blob.len == 0)
213-
{
214-
read_blob.blob = (uint8_t *)buf->base;
215-
read_blob.len = nread;
216-
server_message_t *message = decode_server_message(&read_blob);
217-
if (message)
218-
{
219-
// some bytes left
220-
if (read_blob.len > 0)
221-
{
222-
memcpy(read_buf, read_blob.blob, read_blob.len);
223-
read_blob.blob = read_buf;
224-
}
225-
return message;
226-
}
227-
else
228-
{ // no bytes consumed
229-
memcpy(read_buf, buf->base, nread);
230-
read_blob.blob = read_buf;
231-
read_blob.len = nread;
232-
return NULL;
233-
}
234-
}
235-
else
236-
{
237-
assert(read_blob.blob == read_buf);
238-
memcpy(read_buf + read_blob.len, buf->base, nread);
239-
read_blob.len += nread;
240-
return decode_server_message(&read_blob);
241-
}
242-
}
243-
220+
// uint8_t read_buf[2048 * 1024 * chain_nums];
221+
// blob_t read_blob = {read_buf, 0};
222+
// server_message_t *decode_buf(const uv_buf_t *buf, ssize_t nread)
223+
// {
224+
// if (read_blob.len == 0)
225+
// {
226+
// read_blob.blob = (uint8_t *)buf->base;
227+
// read_blob.len = nread;
228+
// server_message_t *message = decode_server_message(&read_blob);
229+
// if (message)
230+
// {
231+
// // some bytes left
232+
// if (read_blob.len > 0)
233+
// {
234+
// memcpy(read_buf, read_blob.blob, read_blob.len);
235+
// read_blob.blob = read_buf;
236+
// }
237+
// return message;
238+
// }
239+
// else
240+
// { // no bytes consumed
241+
// memcpy(read_buf, buf->base, nread);
242+
// read_blob.blob = read_buf;
243+
// read_blob.len = nread;
244+
// return NULL;
245+
// }
246+
// }
247+
// else
248+
// {
249+
// assert(read_blob.blob == read_buf);
250+
// memcpy(read_buf + read_blob.len, buf->base, nread);
251+
// read_blob.len += nread;
252+
// return decode_server_message(&read_blob);
253+
// }
254+
// }
255+
//
244256
void connect_to_broker();
245257

246258
void try_to_reconnect(uv_timer_t *timer){
247-
read_blob.len = 0;
259+
// read_blob.len = 0;
260+
message_buffer.length = 0;
248261
free(uv_socket);
249262
free(uv_connect);
250263
connect_to_broker();
@@ -267,39 +280,91 @@ void on_read(uv_stream_t *server, ssize_t nread, const uv_buf_t *buf)
267280
return;
268281
}
269282

270-
LOG("=========== job conts %d\n", job_counts);
283+
uv_mutex_lock(&message_buffer.lock);
284+
assert(message_buffer.length + nread <= BUFFER_SIZE);
285+
memcpy(message_buffer.buffer + message_buffer.length, buf->base, nread);
286+
message_buffer.length += nread;
287+
uv_mutex_unlock(&message_buffer.lock);
271288

272-
server_message_t *message = decode_buf(buf, nread);
273-
if (message)
274-
{
275-
switch (message->kind)
276-
{
277-
case JOBS:
278-
job_counts += 1;
279-
for (int i = 0; i < message->jobs->len; i++)
280-
{
281-
update_templates(message->jobs->jobs[i]);
282-
}
283-
start_mining_if_needed();
284-
break;
289+
uv_async_send(&message_handler);
290+
free(buf->base);
291+
}
292+
293+
uint8_t latest_frame[MAX_MESSAGE_SIZE];
285294

286-
case SUBMIT_RESULT:
287-
char *block_hash_hex = bytes_to_hex(message->submit_result->block_hash, 32);
288-
LOG(
289-
"submitted: %d -> %d, %s: %d \n",
290-
message->submit_result->from_group,
291-
message->submit_result->to_group,
292-
block_hash_hex,
293-
message->submit_result->status
294-
);
295-
free(block_hash_hex);
295+
void process_message(uv_async_t* handle) {
296+
bool found_latest_frame = false;
297+
ssize_t latest_frame_offset = 0;
298+
ssize_t latest_frame_len = 0;
299+
ssize_t offset = 0;
300+
301+
uv_mutex_lock(&message_buffer.lock);
302+
uint8_t *buf = message_buffer.buffer;
303+
ssize_t len = message_buffer.length;
304+
305+
while (len - offset >= 4) {
306+
ssize_t message_size = decode_size(buf + offset);
307+
ssize_t total_message_size = 4 + message_size;
308+
if (total_message_size == 47) {
309+
offset += total_message_size;
310+
continue;
311+
}
312+
if (len - offset >= total_message_size) {
313+
latest_frame_offset = offset + 4;
314+
latest_frame_len = message_size;
315+
offset += total_message_size;
316+
found_latest_frame = true;
317+
job_counts += 1;
318+
} else {
296319
break;
297320
}
298-
free_server_message_except_jobs(message);
299321
}
300322

301-
free(buf->base);
302-
// uv_close((uv_handle_t *) server, free_close_cb);
323+
if (found_latest_frame) {
324+
memcpy(latest_frame, buf + latest_frame_offset, latest_frame_len);
325+
}
326+
327+
if (offset > 0) {
328+
if (offset == len) {
329+
message_buffer.length = 0;
330+
} else {
331+
ssize_t remain = len - offset;
332+
memmove(message_buffer.buffer, buf + offset, remain);
333+
message_buffer.length = remain;
334+
}
335+
}
336+
uv_mutex_unlock(&message_buffer.lock);
337+
338+
if (found_latest_frame) {
339+
LOG("=========== job counts %d\n", job_counts);
340+
server_message_t *message = decode_server_message(latest_frame, latest_frame_len);
341+
if (message)
342+
{
343+
switch (message->kind)
344+
{
345+
case JOBS:
346+
for (int i = 0; i < message->jobs->len; i++)
347+
{
348+
update_templates(message->jobs->jobs[i]);
349+
}
350+
start_mining_if_needed();
351+
break;
352+
353+
case SUBMIT_RESULT:
354+
char *block_hash_hex = bytes_to_hex(message->submit_result->block_hash, 32);
355+
LOG(
356+
"submitted: %d -> %d, %s: %d \n",
357+
message->submit_result->from_group,
358+
message->submit_result->to_group,
359+
block_hash_hex,
360+
message->submit_result->status
361+
);
362+
free(block_hash_hex);
363+
break;
364+
}
365+
free_server_message_except_jobs(message);
366+
}
367+
}
303368
}
304369

305370
void on_connect(uv_connect_t *req, int status)
@@ -437,6 +502,9 @@ int main(int argc, char **argv)
437502
setup_gpu_worker_count(gpu_count, gpu_count * parallel_mining_works_per_gpu);
438503

439504
loop = uv_default_loop();
505+
uv_mutex_init(&message_buffer.lock);
506+
message_buffer.length = 0;
507+
uv_async_init(loop, &message_handler, process_message);
440508
uv_timer_init(loop, &reconnect_timer);
441509
connect_to_broker();
442510

src/messages.h

Lines changed: 58 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -260,24 +260,9 @@ void extract_submit_result(uint8_t **bytes, submit_result_t *result)
260260
result->status = extract_bool(bytes);
261261
}
262262

263-
server_message_t *decode_server_message(blob_t *blob)
263+
server_message_t *decode_server_message(uint8_t *bytes, ssize_t len)
264264
{
265-
uint8_t *bytes = blob->blob;
266-
ssize_t len = blob->len;
267-
268-
if (len <= 4) {
269-
return NULL; // not enough bytes for decoding
270-
}
271-
272265
uint8_t *pos = bytes;
273-
ssize_t message_size = extract_size(&pos);
274-
assert(pos == bytes + 4);
275-
276-
ssize_t message_byte_size = message_size + 4;
277-
if (len < message_byte_size) {
278-
return NULL; // not enough bytes for decoding
279-
}
280-
281266
uint8_t version = extract_byte(&pos);
282267
if (version != mining_protocol_version) {
283268
LOG("Invalid protocol version %d, expect %d\n", version, mining_protocol_version);
@@ -305,16 +290,64 @@ server_message_t *decode_server_message(blob_t *blob)
305290
LOGERR("Invalid server message kind\n");
306291
exit(1);
307292
}
308-
309-
assert(pos == (bytes + message_byte_size));
310-
if (message_byte_size < len) {
311-
blob->len = len - message_byte_size;
312-
memmove(blob->blob, pos, blob->len);
313-
} else {
314-
blob->len = 0;
315-
}
316-
317293
return server_message;
318294
}
319295

296+
// server_message_t *decode_server_message(blob_t *blob)
297+
// {
298+
// uint8_t *bytes = blob->blob;
299+
// ssize_t len = blob->len;
300+
//
301+
// if (len <= 4) {
302+
// return NULL; // not enough bytes for decoding
303+
// }
304+
//
305+
// uint8_t *pos = bytes;
306+
// ssize_t message_size = extract_size(&pos);
307+
// assert(pos == bytes + 4);
308+
//
309+
// ssize_t message_byte_size = message_size + 4;
310+
// if (len < message_byte_size) {
311+
// return NULL; // not enough bytes for decoding
312+
// }
313+
//
314+
// uint8_t version = extract_byte(&pos);
315+
// if (version != mining_protocol_version) {
316+
// LOG("Invalid protocol version %d, expect %d\n", version, mining_protocol_version);
317+
// exit(1);
318+
// }
319+
//
320+
// server_message_t *server_message = (server_message_t *)malloc(sizeof(server_message_t));
321+
// switch (extract_byte(&pos))
322+
// {
323+
// case 0:
324+
// server_message->kind = JOBS;
325+
// server_message->jobs = (jobs_t *)malloc(sizeof(jobs_t));
326+
// extract_jobs(&pos, server_message->jobs);
327+
//
328+
// // LOG("%p, %p, %p\n", bytes, pos, bytes + len);
329+
// break;
330+
//
331+
// case 1:
332+
// server_message->kind = SUBMIT_RESULT;
333+
// server_message->submit_result = (submit_result_t *)malloc(sizeof(submit_result_t));
334+
// extract_submit_result(&pos, server_message->submit_result);
335+
// break;
336+
//
337+
// default:
338+
// LOGERR("Invalid server message kind\n");
339+
// exit(1);
340+
// }
341+
//
342+
// assert(pos == (bytes + message_byte_size));
343+
// if (message_byte_size < len) {
344+
// blob->len = len - message_byte_size;
345+
// memmove(blob->blob, pos, blob->len);
346+
// } else {
347+
// blob->len = 0;
348+
// }
349+
//
350+
// return server_message;
351+
// }
352+
320353
#endif // ALEPHIUM_MESSAGE_H

0 commit comments

Comments
 (0)