Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: libwaku - exposing discv5 store-client and relay connected and mesh peers #3003

Merged
merged 1 commit into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions library/libwaku.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ int waku_relay_publish(void* ctx,
WakuCallBack callback,
void* userData);

int waku_lightpush_publish(void* ctx,
const char* pubSubTopic,
const char* jsonWakuMessage,
WakuCallBack callback,
void* userData);

int waku_relay_subscribe(void* ctx,
const char* pubSubTopic,
WakuCallBack callback,
Expand All @@ -85,6 +91,23 @@ int waku_relay_unsubscribe(void* ctx,
WakuCallBack callback,
void* userData);

int waku_relay_get_num_connected_peers(void* ctx,
const char* pubSubTopic,
WakuCallBack callback,
void* userData);

int waku_relay_get_num_peers_in_mesh(void* ctx,
const char* pubSubTopic,
WakuCallBack callback,
void* userData);

int waku_store_query(void* ctx,
const char* jsonQuery,
const char* peerAddr,
int timeoutMs,
WakuCallBack callback,
void* userData);

int waku_connect(void* ctx,
const char* peerMultiAddr,
unsigned int timeoutMs,
Expand Down Expand Up @@ -114,6 +137,14 @@ int waku_discv5_update_bootnodes(void* ctx,
WakuCallBack callback,
void* userData);

int waku_start_discv5(void* ctx,
WakuCallBack callback,
void* userData);

int waku_stop_discv5(void* ctx,
WakuCallBack callback,
void* userData);

// Retrieves the ENR information
int waku_get_my_enr(void* ctx,
WakuCallBack callback,
Expand Down
171 changes: 164 additions & 7 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import
./waku_thread/inter_thread_communication/requests/peer_manager_request,
./waku_thread/inter_thread_communication/requests/protocols/relay_request,
./waku_thread/inter_thread_communication/requests/protocols/store_request,
./waku_thread/inter_thread_communication/requests/protocols/lightpush_request,
./waku_thread/inter_thread_communication/requests/debug_node_request,
./waku_thread/inter_thread_communication/requests/discovery_request,
./waku_thread/inter_thread_communication/waku_thread_request,
Expand Down Expand Up @@ -383,6 +384,119 @@ proc waku_relay_unsubscribe(

return RET_OK

proc waku_relay_get_num_connected_peers(
ctx: ptr Context, pubSubTopic: cstring, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
ctx[].userData = userData

let pst = pubSubTopic.alloc()
defer:
deallocShared(pst)

let numConnPeersRes = waku_thread.sendRequestToWakuThread(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, PubsubTopic($pst)),
)

if numConnPeersRes.isErr():
foreignThreadGc:
let msg = "Error in waku_relay_get_num_connected_peers: " & $numConnPeersRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let numConnPeers = numConnPeersRes.get()
foreignThreadGc:
callback(
RET_OK, unsafeAddr numConnPeers[0], cast[csize_t](len(numConnPeers)), userData
)

return RET_OK

proc waku_relay_get_num_peers_in_mesh(
ctx: ptr Context, pubSubTopic: cstring, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
ctx[].userData = userData

let pst = pubSubTopic.alloc()
defer:
deallocShared(pst)

let numPeersInMeshRes = waku_thread.sendRequestToWakuThread(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, PubsubTopic($pst)),
)

if numPeersInMeshRes.isErr():
foreignThreadGc:
let msg = "Error in waku_relay_get_num_peers_in_mesh: " & $numPeersInMeshRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let numPeersInMesh = numPeersInMeshRes.get()
foreignThreadGc:
callback(
RET_OK, unsafeAddr numPeersInMesh[0], cast[csize_t](len(numPeersInMesh)), userData
)

return RET_OK

proc waku_lightpush_publish(
ctx: ptr Context,
pubSubTopic: cstring,
jsonWakuMessage: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc, cdecl.} =
ctx[].userData = userData

if isNil(callback):
return RET_MISSING_CALLBACK
Comment on lines +454 to +455
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed here but not in all other procs?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed here but not in all other procs?

Good point! Because some checks are left. I will submit an enhancement in a separate PR


let jwm = jsonWakuMessage.alloc()
let pst = pubSubTopic.alloc()
defer:
deallocShared(jwm)
deallocShared(pst)

var jsonMessage: JsonMessage
try:
let jsonContent = parseJson($jwm)
jsonMessage = JsonMessage.fromJsonNode(jsonContent)
except JsonParsingError:
let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let wakuMessage = jsonMessage.toWakuMessage().valueOr:
let msg = fmt"Problem building the WakuMessage: {error}"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let targetPubSubTopic =
if len(pst) == 0:
DefaultPubsubTopic
else:
$pst

let sendReqRes = waku_thread.sendRequestToWakuThread(
ctx,
RequestType.LIGHTPUSH,
LightpushRequest.createShared(
LightpushMsgType.PUBLISH, PubsubTopic($pst), wakuMessage
),
)

if sendReqRes.isErr():
let msg = $sendReqRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let msg = $sendReqRes.value
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK

proc waku_connect(
ctx: ptr Context,
peerMultiAddr: cstring,
Expand All @@ -408,21 +522,30 @@ proc waku_connect(

proc waku_store_query(
ctx: ptr Context,
queryJson: cstring,
peerId: cstring,
jsonQuery: cstring,
peerAddr: cstring,
timeoutMs: cint,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
ctx[].userData = userData

## TODO: implement the logic that make the "self" node to act as a Store client
if isNil(callback):
return RET_MISSING_CALLBACK

let sendReqRes = waku_thread.sendRequestToWakuThread(
ctx,
RequestType.STORE,
JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs, callback),
)

# if sendReqRes.isErr():
# let msg = $sendReqRes.error
# callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
# return RET_ERR
if sendReqRes.isErr():
let msg = $sendReqRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let msg = $sendReqRes.value
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK

proc waku_listen_addresses(
Expand Down Expand Up @@ -510,5 +633,39 @@ proc waku_get_my_enr(
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK

proc waku_start_discv5(
ctx: ptr Context, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
ctx[].userData = userData

let resp = waku_thread.sendRequestToWakuThread(
ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StartRequest()
).valueOr:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let msg = $resp
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)

return RET_OK

proc waku_stop_discv5(
ctx: ptr Context, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
ctx[].userData = userData

let resp = waku_thread.sendRequestToWakuThread(
ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StopRequest()
).valueOr:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let msg = $resp
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)

return RET_OK

### End of exported procs
################################################################################
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import
type DiscoveryMsgType* = enum
GET_BOOTSTRAP_NODES
UPDATE_DISCV5_BOOTSTRAP_NODES
START_DISCV5
STOP_DISCV5

type DiscoveryRequest* = object
operation: DiscoveryMsgType
Expand Down Expand Up @@ -52,6 +54,12 @@ proc createUpdateBootstrapNodesRequest*(
): ptr type T =
return T.createShared(op, "", "", 0, nodes)

proc createDiscV5StartRequest*(T: type DiscoveryRequest): ptr type T =
return T.createShared(START_DISCV5, "", "", 0, "")

proc createDiscV5StopRequest*(T: type DiscoveryRequest): ptr type T =
return T.createShared(STOP_DISCV5, "", "", 0, "")

proc destroyShared(self: ptr DiscoveryRequest) =
deallocShared(self[].enrTreeUrl)
deallocShared(self[].nameDnsServer)
Expand Down Expand Up @@ -86,6 +94,16 @@ proc process*(
destroyShared(self)

case self.operation
of START_DISCV5:
let res = await waku.wakuDiscv5.start()
res.isOkOr:
return err($error)

return ok("discv5 started correctly")
of STOP_DISCV5:
await waku.wakuDiscv5.stop()

return ok("discv5 stopped correctly")
of GET_BOOTSTRAP_NODES:
let nodes = retrieveBootstrapNodes($self[].enrTreeUrl, $self[].nameDnsServer).valueOr:
return err($error)
Expand Down
Loading
Loading