diff --git a/Makefile b/Makefile index a26b858ef7..075c1b8d54 100644 --- a/Makefile +++ b/Makefile @@ -427,6 +427,7 @@ cwaku_example: | build libwaku ./examples/cbindings/base64.c \ -lwaku -Lbuild/ \ -pthread -ldl -lm \ + -lnegentropy -Lvendor/negentropy/cpp/ \ -lminiupnpc -Lvendor/nim-nat-traversal/vendor/miniupnp/miniupnpc/build/ \ -lnatpmp -Lvendor/nim-nat-traversal/vendor/libnatpmp-upstream/ \ vendor/nim-libbacktrace/libbacktrace_wrapper.o \ @@ -439,6 +440,7 @@ cppwaku_example: | build libwaku ./examples/cpp/base64.cpp \ -lwaku -Lbuild/ \ -pthread -ldl -lm \ + -lnegentropy -Lvendor/negentropy/cpp/ \ -lminiupnpc -Lvendor/nim-nat-traversal/vendor/miniupnp/miniupnpc/build/ \ -lnatpmp -Lvendor/nim-nat-traversal/vendor/libnatpmp-upstream/ \ vendor/nim-libbacktrace/libbacktrace_wrapper.o \ diff --git a/library/callback.nim b/library/callback.nim index 9b5ea09186..395e8a1d2f 100644 --- a/library/callback.nim +++ b/library/callback.nim @@ -1,3 +1,11 @@ +import ./waku_thread/waku_thread + type WakuCallBack* = proc( callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} + +template checkLibwakuParams*(ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer) = + ctx[].userData = userData + + if isNil(callback): + return RET_MISSING_CALLBACK diff --git a/library/libwaku.nim b/library/libwaku.nim index 4fce6509fb..ce1d16abd7 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -5,7 +5,7 @@ when defined(linux): {.passl: "-Wl,-soname,libwaku.so".} -import std/[json, sequtils, atomics, times, strformat, options, atomics, strutils, os] +import std/[json, sequtils, atomics, strformat, options, atomics] import chronicles, chronos import waku/common/base64, @@ -52,7 +52,7 @@ template foreignThreadGc(body: untyped) = when declared(tearDownForeignThreadGc): tearDownForeignThreadGc() -proc relayEventCallback(ctx: ptr Context): WakuRelayHandler = +proc relayEventCallback(ctx: ptr WakuContext): WakuRelayHandler = return proc( pubsubTopic: PubsubTopic, msg: WakuMessage ): Future[system.void] {.async.} = @@ -144,10 +144,9 @@ proc waku_new( return ctx proc waku_destroy( - ctx: ptr Context, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = - if isNil(callback): - return RET_MISSING_CALLBACK + checkLibwakuParams(ctx, callback, userData) waku_thread.stopWakuThread(ctx).isOkOr: foreignThreadGc: @@ -158,12 +157,9 @@ proc waku_destroy( return RET_OK proc waku_version( - ctx: ptr Context, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = - ctx[].userData = userData - - if isNil(callback): - return RET_MISSING_CALLBACK + checkLibwakuParams(ctx, callback, userData) foreignThreadGc: callback( @@ -176,13 +172,13 @@ proc waku_version( return RET_OK proc waku_set_event_callback( - ctx: ptr Context, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ) {.dynlib, exportc.} = ctx[].eventCallback = cast[pointer](callback) ctx[].eventUserData = userData proc waku_content_topic( - ctx: ptr Context, + ctx: ptr WakuContext, appName: cstring, appVersion: cuint, contentTopicName: cstring, @@ -192,10 +188,7 @@ proc waku_content_topic( ): cint {.dynlib, exportc.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding - ctx[].userData = userData - - if isNil(callback): - return RET_MISSING_CALLBACK + checkLibwakuParams(ctx, callback, userData) let appStr = appName.alloc() let ctnStr = contentTopicName.alloc() @@ -213,14 +206,11 @@ proc waku_content_topic( return RET_OK proc waku_pubsub_topic( - ctx: ptr Context, topicName: cstring, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, topicName: cstring, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc, cdecl.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding - ctx[].userData = userData - - if isNil(callback): - return RET_MISSING_CALLBACK + checkLibwakuParams(ctx, callback, userData) let topicNameStr = topicName.alloc() @@ -234,14 +224,11 @@ proc waku_pubsub_topic( return RET_OK proc waku_default_pubsub_topic( - ctx: ptr Context, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic - ctx[].userData = userData - - if isNil(callback): - return RET_MISSING_CALLBACK + checkLibwakuParams(ctx, callback, userData) callback( RET_OK, @@ -253,7 +240,7 @@ proc waku_default_pubsub_topic( return RET_OK proc waku_relay_publish( - ctx: ptr Context, + ctx: ptr WakuContext, pubSubTopic: cstring, jsonWakuMessage: cstring, timeoutMs: cuint, @@ -262,10 +249,7 @@ proc waku_relay_publish( ): cint {.dynlib, exportc, cdecl.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms - ctx[].userData = userData - - if isNil(callback): - return RET_MISSING_CALLBACK + checkLibwakuParams(ctx, callback, userData) let jwm = jsonWakuMessage.alloc() var jsonMessage: JsonMessage @@ -315,9 +299,9 @@ proc waku_relay_publish( return RET_OK proc waku_start( - ctx: ptr Context, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) ## TODO: handle the error discard waku_thread.sendRequestToWakuThread( ctx, @@ -326,9 +310,10 @@ proc waku_start( ) proc waku_stop( - ctx: ptr Context, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) + ## TODO: handle the error discard waku_thread.sendRequestToWakuThread( ctx, @@ -337,9 +322,12 @@ proc waku_stop( ) proc waku_relay_subscribe( - ctx: ptr Context, pubSubTopic: cstring, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, + pubSubTopic: cstring, + callback: WakuCallBack, + userData: pointer, ): cint {.dynlib, exportc.} = - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) let pst = pubSubTopic.alloc() var cb = relayEventCallback(ctx) @@ -360,9 +348,12 @@ proc waku_relay_subscribe( return RET_OK proc waku_relay_unsubscribe( - ctx: ptr Context, pubSubTopic: cstring, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, + pubSubTopic: cstring, + callback: WakuCallBack, + userData: pointer, ): cint {.dynlib, exportc.} = - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) let pst = pubSubTopic.alloc() @@ -385,9 +376,12 @@ proc waku_relay_unsubscribe( return RET_OK proc waku_relay_get_num_connected_peers( - ctx: ptr Context, pubSubTopic: cstring, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, + pubSubTopic: cstring, + callback: WakuCallBack, + userData: pointer, ): cint {.dynlib, exportc.} = - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) let pst = pubSubTopic.alloc() defer: @@ -414,9 +408,12 @@ proc waku_relay_get_num_connected_peers( return RET_OK proc waku_relay_get_num_peers_in_mesh( - ctx: ptr Context, pubSubTopic: cstring, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, + pubSubTopic: cstring, + callback: WakuCallBack, + userData: pointer, ): cint {.dynlib, exportc.} = - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) let pst = pubSubTopic.alloc() defer: @@ -443,16 +440,13 @@ proc waku_relay_get_num_peers_in_mesh( return RET_OK proc waku_lightpush_publish( - ctx: ptr Context, + ctx: ptr WakuContext, pubSubTopic: cstring, jsonWakuMessage: cstring, callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc, cdecl.} = - ctx[].userData = userData - - if isNil(callback): - return RET_MISSING_CALLBACK + checkLibwakuParams(ctx, callback, userData) let jwm = jsonWakuMessage.alloc() let pst = pubSubTopic.alloc() @@ -498,13 +492,13 @@ proc waku_lightpush_publish( return RET_OK proc waku_connect( - ctx: ptr Context, + ctx: ptr WakuContext, peerMultiAddr: cstring, timeoutMs: cuint, callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) let connRes = waku_thread.sendRequestToWakuThread( ctx, @@ -521,22 +515,19 @@ proc waku_connect( return RET_OK proc waku_store_query( - ctx: ptr Context, + ctx: ptr WakuContext, jsonQuery: cstring, peerAddr: cstring, timeoutMs: cint, callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = - ctx[].userData = userData - - if isNil(callback): - return RET_MISSING_CALLBACK + checkLibwakuParams(ctx, callback, userData) let sendReqRes = waku_thread.sendRequestToWakuThread( ctx, RequestType.STORE, - JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs, callback), + JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs), ) if sendReqRes.isErr(): @@ -549,9 +540,9 @@ proc waku_store_query( return RET_OK proc waku_listen_addresses( - ctx: ptr Context, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) let connRes = waku_thread.sendRequestToWakuThread( ctx, @@ -568,14 +559,14 @@ proc waku_listen_addresses( return RET_OK proc waku_dns_discovery( - ctx: ptr Context, + ctx: ptr WakuContext, entTreeUrl: cstring, nameDnsServer: cstring, timeoutMs: cint, callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) let bootstrapPeers = waku_thread.sendRequestToWakuThread( ctx, @@ -593,11 +584,11 @@ proc waku_dns_discovery( return RET_OK proc waku_discv5_update_bootnodes( - ctx: ptr Context, bootnodes: cstring, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, bootnodes: cstring, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = ## Updates the bootnode list used for discovering new peers via DiscoveryV5 ## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]` - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) let resp = waku_thread.sendRequestToWakuThread( ctx, @@ -615,9 +606,9 @@ proc waku_discv5_update_bootnodes( return RET_OK proc waku_get_my_enr( - ctx: ptr Context, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) let connRes = waku_thread.sendRequestToWakuThread( ctx, @@ -634,9 +625,9 @@ proc waku_get_my_enr( return RET_OK proc waku_start_discv5( - ctx: ptr Context, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) let resp = waku_thread.sendRequestToWakuThread( ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StartRequest() @@ -651,9 +642,9 @@ proc waku_start_discv5( return RET_OK proc waku_stop_discv5( - ctx: ptr Context, callback: WakuCallBack, userData: pointer + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = - ctx[].userData = userData + checkLibwakuParams(ctx, callback, userData) let resp = waku_thread.sendRequestToWakuThread( ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StopRequest() diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim index 9a84dacbb4..2f6f8ae98d 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim @@ -3,7 +3,6 @@ import chronos, results import ../../../../../waku/factory/waku, ../../../../alloc, - ../../../../callback, ../../../../../waku/waku_core/peers, ../../../../../waku/waku_core/time, ../../../../../waku/waku_core/message/digest, @@ -18,7 +17,6 @@ type JsonStoreQueryRequest* = object jsonQuery: cstring peerAddr: cstring timeoutMs: cint - storeCallback: WakuCallBack type StoreRequest* = object operation: StoreReqType @@ -99,13 +97,11 @@ proc createShared*( jsonQuery: cstring, peerAddr: cstring, timeoutMs: cint, - storeCallback: WakuCallBack = nil, ): ptr type T = var ret = createShared(T) ret[].timeoutMs = timeoutMs ret[].jsonQuery = jsonQuery.alloc() ret[].peerAddr = peerAddr.alloc() - ret[].storeCallback = storeCallback return ret proc destroyShared(self: ptr JsonStoreQueryRequest) = diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index b14faf6bdb..5967cbfddc 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -9,8 +9,8 @@ import ./inter_thread_communication/waku_thread_request, ./inter_thread_communication/waku_thread_response -type Context* = object - thread: Thread[(ptr Context)] +type WakuContext* = object + thread: Thread[(ptr WakuContext)] reqChannel: ChannelSPSCSingle[ptr InterThreadRequest] reqSignal: ThreadSignalPtr respChannel: ChannelSPSCSingle[ptr InterThreadResponse] @@ -26,7 +26,7 @@ const versionString = "version / git commit hash: " & waku.git_version # TODO: this should be part of the context so multiple instances can be executed var running: Atomic[bool] -proc runWaku(ctx: ptr Context) {.async.} = +proc runWaku(ctx: ptr WakuContext) {.async.} = ## This is the worker body. This runs the Waku node ## and attends library user requests (stop, connect_to, etc.) info "Starting Waku", version = versionString @@ -49,14 +49,14 @@ proc runWaku(ctx: ptr Context) {.async.} = discard ctx.respChannel.trySend(threadSafeResp) discard ctx.respSignal.fireSync() -proc run(ctx: ptr Context) {.thread.} = +proc run(ctx: ptr WakuContext) {.thread.} = ## Launch waku worker waitFor runWaku(ctx) -proc createWakuThread*(): Result[ptr Context, string] = +proc createWakuThread*(): Result[ptr WakuContext, string] = ## This proc is called from the main thread and it creates ## the Waku working thread. - var ctx = createShared(Context, 1) + var ctx = createShared(WakuContext, 1) ctx.reqSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqSignal ThreadSignalPtr") ctx.respSignal = ThreadSignalPtr.new().valueOr: @@ -74,7 +74,7 @@ proc createWakuThread*(): Result[ptr Context, string] = return ok(ctx) -proc stopWakuThread*(ctx: ptr Context): Result[void, string] = +proc stopWakuThread*(ctx: ptr WakuContext): Result[void, string] = running.store(false) let fireRes = ctx.reqSignal.fireSync() if fireRes.isErr(): @@ -86,7 +86,7 @@ proc stopWakuThread*(ctx: ptr Context): Result[void, string] = return ok() proc sendRequestToWakuThread*( - ctx: ptr Context, reqType: RequestType, reqContent: pointer + ctx: ptr WakuContext, reqType: RequestType, reqContent: pointer ): Result[string, string] = let req = InterThreadRequest.createShared(reqType, reqContent) ## Sending the request