Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core-c-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ crate-type = ["cdylib"]
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
crossbeam-utils = "0.8"
futures-util = { version = "0.3", default-features = false }
http = "1.3"
libc = "0.2"
Expand Down
130 changes: 108 additions & 22 deletions core-c-bridge/include/temporal-sdk-core-c-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ typedef struct TemporalCoreRandom TemporalCoreRandom;

typedef struct TemporalCoreRuntime TemporalCoreRuntime;

typedef struct TemporalCoreSlotReserveCompletionCtx TemporalCoreSlotReserveCompletionCtx;

typedef struct TemporalCoreWorker TemporalCoreWorker;

typedef struct TemporalCoreWorkerReplayPusher TemporalCoreWorkerReplayPusher;
Expand Down Expand Up @@ -570,18 +572,17 @@ typedef struct TemporalCoreSlotReserveCtx {
struct TemporalCoreByteArrayRef worker_identity;
struct TemporalCoreByteArrayRef worker_build_id;
bool is_sticky;
void *token_src;
} TemporalCoreSlotReserveCtx;

typedef void (*TemporalCoreCustomReserveSlotCallback)(const struct TemporalCoreSlotReserveCtx *ctx,
void *sender);
typedef void (*TemporalCoreCustomSlotSupplierReserveCallback)(const struct TemporalCoreSlotReserveCtx *ctx,
const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx,
void *user_data);

typedef void (*TemporalCoreCustomCancelReserveCallback)(void *token_source);
typedef void (*TemporalCoreCustomSlotSupplierCancelReserveCallback)(const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx,
void *user_data);

/**
* Must return C#-tracked id for the permit. A zero value means no permit was reserved.
*/
typedef uintptr_t (*TemporalCoreCustomTryReserveSlotCallback)(const struct TemporalCoreSlotReserveCtx *ctx);
typedef uintptr_t (*TemporalCoreCustomSlotSupplierTryReserveCallback)(const struct TemporalCoreSlotReserveCtx *ctx,
void *user_data);

typedef enum TemporalCoreSlotInfo_Tag {
WorkflowSlotInfo,
Expand Down Expand Up @@ -621,32 +622,87 @@ typedef struct TemporalCoreSlotInfo {
typedef struct TemporalCoreSlotMarkUsedCtx {
struct TemporalCoreSlotInfo slot_info;
/**
* C# id for the slot permit.
* Lang-issued permit ID.
*/
uintptr_t slot_permit;
} TemporalCoreSlotMarkUsedCtx;

typedef void (*TemporalCoreCustomMarkSlotUsedCallback)(const struct TemporalCoreSlotMarkUsedCtx *ctx);
typedef void (*TemporalCoreCustomSlotSupplierMarkUsedCallback)(const struct TemporalCoreSlotMarkUsedCtx *ctx,
void *user_data);

typedef struct TemporalCoreSlotReleaseCtx {
const struct TemporalCoreSlotInfo *slot_info;
/**
* C# id for the slot permit.
* Lang-issued permit ID.
*/
uintptr_t slot_permit;
} TemporalCoreSlotReleaseCtx;

typedef void (*TemporalCoreCustomReleaseSlotCallback)(const struct TemporalCoreSlotReleaseCtx *ctx);
typedef void (*TemporalCoreCustomSlotSupplierReleaseCallback)(const struct TemporalCoreSlotReleaseCtx *ctx,
void *user_data);

typedef void (*TemporalCoreCustomSlotImplFreeCallback)(const struct TemporalCoreCustomSlotSupplierCallbacks *userimpl);
typedef bool (*TemporalCoreCustomSlotSupplierAvailableSlotsCallback)(uintptr_t *available_slots,
void *user_data);

typedef void (*TemporalCoreCustomSlotSupplierFreeCallback)(const struct TemporalCoreCustomSlotSupplierCallbacks *userimpl);

typedef struct TemporalCoreCustomSlotSupplierCallbacks {
TemporalCoreCustomReserveSlotCallback reserve;
TemporalCoreCustomCancelReserveCallback cancel_reserve;
TemporalCoreCustomTryReserveSlotCallback try_reserve;
TemporalCoreCustomMarkSlotUsedCallback mark_used;
TemporalCoreCustomReleaseSlotCallback release;
TemporalCoreCustomSlotImplFreeCallback free;
/**
* Called to initiate asynchronous slot reservation. `ctx` contains information about
* reservation request. The pointer is only valid for the duration of the function call; the
* implementation should copy the data out of it for later use, and return as soon as possible.
*
* When slot is reserved, the implementation should call [`temporal_core_complete_async_reserve`]
* with the same `completion_ctx` as passed to this function. Reservation cannot be cancelled
* by Lang, but it can be cancelled by Core through [`cancel_reserve`](Self::cancel_reserve)
* callback. If reservation was cancelled, [`temporal_core_complete_async_cancel_reserve`]
* should be called instead.
*
* Slot reservation cannot error. The implementation should recover from errors and keep trying
* to reserve a slot until it eventually succeeds, or until reservation is cancelled by Core.
*/
TemporalCoreCustomSlotSupplierReserveCallback reserve;
/**
* Called to cancel slot reservation. `completion_ctx` specifies which reservation is being
* cancelled; the matching [`reserve`] call was made with the same `completion_ctx`. After
* cancellation, the implementation should call [`temporal_core_complete_async_cancel_reserve`]
* with the same `completion_ctx`. Calling [`temporal_core_complete_async_reserve`] is not
* needed after cancellation.
*/
TemporalCoreCustomSlotSupplierCancelReserveCallback cancel_reserve;
/**
* Called to try an immediate slot reservation. The callback should return 0 if immediate
* reservation is not currently possible, or permit ID if reservation was successful. Permit ID
* is arbitrary, but must be unique among live reservations as it's later used for `mark_used`
* and `release` callbacks.
*/
TemporalCoreCustomSlotSupplierTryReserveCallback try_reserve;
/**
* Called after successful reservation to mark slot as used. See [`SlotSupplier`](temporal_sdk_core_api::worker::SlotSupplier)
* trait for details.
*/
TemporalCoreCustomSlotSupplierMarkUsedCallback mark_used;
/**
* Called to free a previously reserved slot.
*/
TemporalCoreCustomSlotSupplierReleaseCallback release;
/**
* Called to retrieve the number of available slots if known. If the implementation knows how
* many slots are available at the moment, it should set the value behind the `available_slots`
* pointer and return true. If that number is unknown, it should return false.
*
* This function pointer can be set to null. It will be treated as if the number of available
* slots is never known.
*/
TemporalCoreCustomSlotSupplierAvailableSlotsCallback available_slots;
/**
* Called when the slot supplier is being dropped. All resources should be freed.
*/
TemporalCoreCustomSlotSupplierFreeCallback free;
/**
* Passed as an extra argument to the callbacks.
*/
void *user_data;
} TemporalCoreCustomSlotSupplierCallbacks;

typedef struct TemporalCoreCustomSlotSupplierCallbacksImpl {
Expand Down Expand Up @@ -984,10 +1040,40 @@ struct TemporalCoreWorkerReplayPushResult temporal_core_worker_replay_push(struc
struct TemporalCoreByteArrayRef workflow_id,
struct TemporalCoreByteArrayRef history);

void temporal_core_complete_async_reserve(void *sender, uintptr_t permit_id);
/**
* Completes asynchronous slot reservation started by a call to [`CustomSlotSupplierCallbacks::reserve`].
*
* `completion_ctx` must be the same as the one passed to the matching `reserve` call.
* `permit_id` is arbitrary, but must be unique among live reservations as it's later used for
* `mark_used` and `release` callbacks.
*
* This function returns true if the reservation was completed successfully, or false if the
* reservation was cancelled before completion. If this function returns false, the implementation
* should call [`temporal_core_complete_async_cancel_reserve`] with the same `completion_ctx`.
*
* **Caution:** if this function returns true, `completion_ctx` gets freed. Afterwards, calling
* either [`temporal_core_complete_async_reserve`] or [`temporal_core_complete_async_cancel_reserve`]
* with the same `completion_ctx` will cause **memory corruption!**
*/
bool temporal_core_complete_async_reserve(const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx,
uintptr_t permit_id);

void temporal_core_set_reserve_cancel_target(struct TemporalCoreSlotReserveCtx *ctx,
void *token_ptr);
/**
* Completes cancellation of asynchronous slot reservation. Cancellation can only be initiated by
* Core. It's done by calling [`CustomSlotSupplierCallbacks::cancel_reserve`] after an earlier call
* to [`CustomSlotSupplierCallbacks::reserve`].
*
* `completion_ctx` must be the same as the one passed to the matching `cancel_reserve` call.
*
* This function returns true on successful cancellation, or false if cancellation was not
* requested for the given `completion_ctx`. It is a bug to call this function when cancellation
* was not requested.
*
* **Caution:** if this function returns true, `completion_ctx` gets freed. Afterwards, calling
* either [`temporal_core_complete_async_reserve`] or [`temporal_core_complete_async_cancel_reserve`]
* with the same `completion_ctx` will cause **memory corruption!**
*/
bool temporal_core_complete_async_cancel_reserve(const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx);

#ifdef __cplusplus
} // extern "C"
Expand Down
Loading
Loading