From c1a5fc2a49687f0c9299f15ed1ffc37d01e4ba79 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Thu, 14 Sep 2023 12:35:11 -0700 Subject: [PATCH] Improve readme and add python sdk/samples (#346) --- README.md | 364 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 264 insertions(+), 100 deletions(-) diff --git a/README.md b/README.md index 85d4f873..e5a99788 100644 --- a/README.md +++ b/README.md @@ -9,9 +9,9 @@ **iWF will make you a 10x developer!** -iWF is a platform for developing resilient, fault-tolerant, scalable long-running applications. -It offers a convenient abstraction for durable timers, background execution with backoff retry, -customized persisted data (with optional caching, and indexing), message queues, RPC, and more. You will build long-running reliable processes faster than ever. +iWF is an API orchestration platform for building resilient, fault-tolerant, scalable long-running applications. +It offers an orchestration coding framework with abstractions for durable timers, async/background execution with backoff retry, +KV storage, RPC, and message queues. You will build long-running reliable processes faster than ever. iWF is built on top of [Cadence](https://github.com/uber/cadence)/[Temporal](https://github.com/temporalio/temporal). Temporal adopted as [a framework](https://github.com/temporalio/awesome-temporal). Same for [Cadence](https://github.com/uber/cadence#cadence). @@ -20,15 +20,147 @@ Related projects: * [iWF Java SDK](https://github.com/indeedeng/iwf-java-sdk) and [samples](https://github.com/indeedeng/iwf-java-samples) * [iWF Golang SDK](https://github.com/indeedeng/iwf-golang-sdk) and [samples](https://github.com/indeedeng/iwf-golang-samples) -* WIP [iWF Python SDK](https://github.com/indeedeng/iwf-python-sdk) +* [iWF Python SDK](https://github.com/indeedeng/iwf-python-sdk) and [samples](https://github.com/indeedeng/iwf-python-samples) * WIP [iWF TypeScript SDK](https://github.com/indeedeng/iwf-ts-sdk) For support or any question, please post in our [Discussion](https://github.com/indeedeng/iwf/discussions), or raise an issue. If you are interested in helping this project, check out our [CONTRIBUTING](https://github.com/indeedeng/iwf/blob/main/CONTRIBUTING.md) page. Below is the basic and comprehensive documentation of iWF. There are some more details in the [wiki pages](https://github.com/indeedeng/iwf/wiki). + # What is iWF -## Example: microservice orchestration +See [concepts](#basic-concepts) if you want to skip the examples. + +## Example 1 : User sign-up/registry workflow +A common use case that is almost everywhere -- new user sign-up/register a new account in a website/system. +E.g. Amazon/Linkedin/Google/etc... + +### Use case requirements + +* User fills a form and submit to the system with email +* System will send an email for verification +* User will click the link in the email to verify the account +* If not clicking, a reminder will be sent every X hours + +user case requirements + +### Some old solution + +With some other existing technologies, you solve it using message queue(like SQS which has timer) + Database like below: + +old solution + +* Using visibility timeout for backoff retry +* Need to re-enqueue the message for larger backoff +* Using visibility timeout for durable timer +* Need to re-enqueue the message for once to have 24 hours timer +* Need to create one queue for every step +* Need additional storage for waiting & processing ready signal +* Also need DLQ and build tooling around + +**The business code will be scattered. It's complicated and hard to maintain and extend.** + +### New solution with iWF + +The solution with iWF: +iwf solution +* All in one single place without scattered business logic +* Natural to represent business +* Builtin & rich support for operation tooling + +It's so simple & easy to do that the [business logic code](https://github.com/indeedeng/iwf-python-samples#user-sign-up-workflow) can be shown here! + +Also see the [implementation in Java here](https://github.com/indeedeng/iwf-java-samples/tree/main/src/main/java/io/iworkflow/workflow/signup). + +```python +class SubmitState(WorkflowState[Form]): + def execute(self, ctx: WorkflowContext, input: Form, command_results: CommandResults, persistence: Persistence, + communication: Communication, + ) -> StateDecision: + persistence.set_data_attribute(data_attribute_form, input) + persistence.set_data_attribute(data_attribute_status, "waiting") + print(f"API to send verification email to {input.email}") + return StateDecision.single_next_state(VerifyState) + + +class VerifyState(WorkflowState[None]): + def wait_until(self, ctx: WorkflowContext, input: T, persistence: Persistence, communication: Communication, + ) -> CommandRequest: + return CommandRequest.for_any_command_completed( + TimerCommand.timer_command_by_duration( + timedelta(seconds=10) + ), # use 10 seconds for demo + InternalChannelCommand.by_name(verify_channel), + ) + + def execute(self, ctx: WorkflowContext, input: T, command_results: CommandResults, persistence: Persistence, + communication: Communication, + ) -> StateDecision: + form = persistence.get_data_attribute(data_attribute_form) + if ( + command_results.internal_channel_commands[0].status + == ChannelRequestStatus.RECEIVED + ): + print(f"API to send welcome email to {form.email}") + return StateDecision.graceful_complete_workflow("done") + else: + print(f"API to send the a reminder email to {form.email}") + return StateDecision.single_next_state(VerifyState) + + +class UserSignupWorkflow(ObjectWorkflow): + def get_workflow_states(self) -> StateSchema: + return StateSchema.with_starting_state(SubmitState(), VerifyState()) + + def get_persistence_schema(self) -> PersistenceSchema: + return PersistenceSchema.create( + PersistenceField.data_attribute_def(data_attribute_form, Form), + PersistenceField.data_attribute_def(data_attribute_status, str), + PersistenceField.data_attribute_def(data_attribute_verified_source, str), + ) + + def get_communication_schema(self) -> CommunicationSchema: + return CommunicationSchema.create( + CommunicationMethod.internal_channel_def(verify_channel, None) + ) + + @rpc() + def verify( + self, source: str, persistence: Persistence, communication: Communication + ) -> str: + status = persistence.get_data_attribute(data_attribute_status) + if status == "verified": + return "already verified" + persistence.set_data_attribute(data_attribute_status, "verified") + persistence.set_data_attribute(data_attribute_verified_source, source) + communication.publish_to_internal_channel(verify_channel) + return "done" +``` + +And the [application code](signup/main.py) will be simply interacting with the workflow like below: + +```python +@flask_app.route("/signup/submit") +def signup_submit(): + username = request.args["username"] + form = Form( + ... + ) + try: + client.start_workflow(UserSignupWorkflow, username, 3600, form) + except WorkflowAlreadyStartedError: + return "username already started registry" + return "workflow started" + + +@flask_app.route("/signup/verify") +def signup_verify(): + username = request.args["username"] + source = request.args["source"] + return client.invoke_rpc(username, UserSignupWorkflow.verify, source) +``` + +## Example 2 : API orchestration(Abstracted) ### Problem ![1](https://github.com/indeedeng/iwf/assets/4523955/e0c7001e-2c8f-4a93-92d7-37e50a248c26) @@ -68,14 +200,13 @@ It's complicated and hard to maintain and extend. ![3](https://github.com/indeedeng/iwf/assets/4523955/3428523e-c3d9-4fd6-8d10-c19b91ac7ecd) The solution with iWF: -* All in one single dependency -* WorkflowAsCode +* All in one single place without scattered business logic * Natural to represent business * Builtin & rich support for operation tooling It's so simple & easy to do that the code can be shown here! -See the running code in [Java samples](https://github.com/indeedeng/iwf-java-samples/tree/main#microservice-ochestration) and [Golang samples](https://github.com/indeedeng/iwf-golang-samples#microservice-orchestration). +See the running code in [Java samples](https://github.com/indeedeng/iwf-java-samples/tree/main#microservice-ochestration), [Golang samples](https://github.com/indeedeng/iwf-golang-samples#microservice-orchestration). ```java public class OrchestrationWorkflow implements ObjectWorkflow { @@ -194,76 +325,51 @@ class State4 implements WorkflowState { } ``` -## Basic Concepts +And the [application code](https://github.com/indeedeng/iwf-java-samples/blob/main/src/main/java/io/iworkflow/controller/MicroserviceWorkflowController.java) simply interacts with the workflow like below: +```java + @GetMapping("/start") + public ResponseEntity start( + @RequestParam String workflowId + ) { + try { + client.startWorkflow(OrchestrationWorkflow.class, workflowId, 3600, "some input data, could be any object rather than a string"); + } catch (ClientSideException e) { + if (e.getErrorSubStatus() != ErrorSubStatus.WORKFLOW_ALREADY_STARTED_SUB_STATUS) { + throw e; + } + } + return ResponseEntity.ok("success"); + } + @GetMapping("/signal") + ResponseEntity receiveSignalForApiOrchestration( + @RequestParam String workflowId) { + client.signalWorkflow(OrchestrationWorkflow.class, workflowId, "", OrchestrationWorkflow.READY_SIGNAL, null); + return ResponseEntity.ok("done"); + } +``` + +## Basic Concepts -The top-level concept is **`ObjectWorkflow`**. -A user application creates an ObjectWorkflow by implementing the Workflow interface, in one of the supported languages e.g. +A user application defines an ObjectWorkflow by implementing the Workflow interface, in one of the supported languages e.g. [Java](https://github.com/indeedeng/iwf-java-sdk/blob/main/src/main/java/io/iworkflow/core/ObjectWorkflow.java) , [Golang](https://github.com/indeedeng/iwf-golang-sdk/blob/main/iwf/workflow.go) , [Python](https://github.com/indeedeng/iwf-python-sdk/blob/main/iwf/workflow.py), or [Typescript/JavaScript](https://github.com/indeedeng/iwf-ts-sdk/blob/main/iwf/src/object-workflow.ts). -An implementation of the interface is referred to as a `WorkflowDefinition` and consists of the components shown below: - -| Name | Description | -|:-------------------------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------| -| [Data Attribute](#persistence) | Persistence field to storing data | -| [Search Attribute](#persistence) | "Searchable data attribute" -- attribute data is persisted and also indexed in search engine backed by ElasticSearch or OpenSearch | -| [Workflow State](#workflow-state) | A background execution unit. State is super powerful like a small workflow of two steps: *waitUntil* (optional) and *execute* with default infinite retry | -| [RPC](#rpc) | Remote procedure call. Invoked by a client, executed in worker, and can interact with data/search attributes, internal channel, and state execution | -| [Signal Channel](#signal-channel-vs-rpc) | Asynchronous message queue for the workflow object to receive messages from external sources | -| [Internal Channel](#internalchannel-synchronization-for-multi-threading) | "Internal Signal Channel" -- An internal message queue for workflow states/RPC | - -A workflow definition can be visualized like this: - -![Example workflow diagram](https://user-images.githubusercontent.com/4523955/234424825-ff3673c0-af23-4eb7-887d-b1f421f3aaa4.png) - -These are all the concepts that you need to build a super complicated workflow. -See this engagement workflow example in [Java](https://github.com/indeedeng/iwf-java-samples/tree/main/src/main/java/io/iworkflow/workflow/engagement) -or [Golang](https://github.com/indeedeng/iwf-golang-samples/tree/main/workflows/engagement) -for how it looks in practice! - -Below are detailed explanations of the concepts. -These concepts are powerful, and also extremely simple to learn and use (as is the philosophy of iWF). - -## Persistence - -iWF let you store customized data as a database during the workflow execution. This eliminates the need to depend on a database to implement your workflow. - -Your data are stored as Data Attributes and Search Attributes. Together both define the "persistence schema". -The persistence schema is defined and maintained in the code along with other business logic. -Search Attributes work like infinite indexes in a traditional database. You -only need to specify which attributes should be indexed, without worrying about complications you might be used to in -a traditional database like the number of shards, and the order of the fields in an index. - -Logically, the workflow definition displayed in the example workflow diagram will have a persistence schema as follows: - -| Workflow Execution | Search Attr A | Search Attr B | Data Attr C | Data Attr D | -|----------------------|---------------|:-------------:|------------:|------------:| -| Workflow Execution 1 | val 1 | val 2 | val 3 | val 4 | -| Workflow Execution 2 | val 5 | val 6 | val 7 | val 8 | -| ... | ... | ... | ... | ... | - -With Search attributes, you can write [customized SQL-like queries to find any workflow execution(s)](https://docs.temporal.io/visibility#search-attribute), just like using a database query. +An implementation of the interface is referred to as a `WorkflowDefinition` and consists of the components shown below: -Note that after workflows are closed(completed, timeout, terminated, canceled, failed), all the data retained in your persistence schema will be deleted once the configured retention period elapses. +| Name | Description | +|:--------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------| +| [WorkflowState](#workflow-state) | A basic asyn/background execution unit as a "workflow". A State consists of one or two steps: *waitUntil* (optional) and *execute* with retry | +| [RPC](#rpc) | API for application to interact with the workflow. It can access to persistence, internal channel, and state execution | +| [Persistence](#persistence) | A Kev-Value storage out-of-box to storing data. Can be accessed by RPC/WorkflowState implementation. | +| [DurableTimer](#commands-from-waituntil) | The waitUntil API can return a timer command to wait for certain time as a durable timer -- it is persisted by server and will not be lost. | +| [InternalChannel](#internalchannel-async-message-queue) | The waitUntil API can return some command for "Internal Channel" -- An internal message queue workflow | +| ~~[Signal Channel](#signal-channel-vs-rpc)~~ | Legacy concept and deprecated. Use InternalChannel + RPC instead. A message queue for the workflowState to receive messages from external sources | -The iWF persistence is mainly for storing the workflow intermediate states/data. -**It is important to not abuse iWF persistence for things like permanent storage, or for tracking/analytics purpose.** -### Caching -By default, remote procedure calls (RPCs) will load data/search attributes with the Cadence/Temporal [query API](https://docs.temporal.io/workflows#query), -which is not optimized for very high request volume (~>100 requests per second) on a single workflow execution. Such request volumes could cause -too many history replays, especially when workflows are closed. This could in turn produce undesirable latency and load. -You can enable **caching** to support those high-volume requests. -Note: -* With caching enabled read-after-write access will become *eventually consistent*, unless `bypassCachingForStrongConsistency=true` is set in RPC options -* Caching will introduce an extra event in history (upsertMemo operation for WorkflowPropertiesModified event) for updating the persisted data attributes -* Caching will be more useful for read-only RPC (no persistence.SetXXX API or communication API calls in RPC implementation) or GetDataAttributes API. - * A read-only RPC can still invoke any other RPCs (like calling other microservices, or DB operation) in the RPC implementation -* Caching is currently only supported if the backend is Temporal, because [Cadence doesn't support mutable memo](https://github.com/uber/cadence/issues/3729) ## Workflow State WorkflowState is how you implement your asynchronous process as a "workflow". @@ -273,47 +379,42 @@ A WorkflowState is itself like “a small workflow” of 1 or 2 steps: **[ `waitUntil` ] → `execute`** -The `waitUntil` API returns "commands" to wait for. When the commands are completed, the `execute` API will be invoked. -Both the `waitUntil` and `execute` APIs have access to read/write the persistence schema defined in the workflow. +**The `waitUntil` API** returns "[commands](#commands-for-workflowstates-waituntil-api)" to wait for. When the commands are completed, the `execute` API will be invoked. -The full execution flow looks like this: - -![Workflow State diagram](https://user-images.githubusercontent.com/4523955/234921554-587d8ad4-84f5-4987-b838-959869293465.png) The `waitUntil` API is optional. If not defined, then the `execute` API will be invoked immediately when the Workflow State is started. -Note: the `waitUntil` and `execute` APIs are invoked by the iWF service with infinite backoff retry by default. See the [WorkflowStateOptions](#WorkflowStateOptions) section for customization. - -The execute API will return a StateDecision: -* For a single next state a decision can - * Go to a different state - * Go to the same state, i.e. a loop - * Go to the previous state, i.e. a loop -* Dead end -- Just stop the thread -* Graceful complete -- Stop the thread, and also will stop the workflow when all other threads are stopped -* Force complete -- Stop the workflow immediately -* Force fail -- Stop the workflow immediately with failure -* For multiple next states, these are executed in parallel as multiple threads +The `execute` API returns a StateDecision to decide what is next. -With StateDecisions, the workflow definitions can have flows like these: +Both `waitUntil` and `execute` are implemented by code and executed in runtime dynamically. They are both hosted as REST API for iWF server to call. +It's extremely flexible for business -- [any code change deployed will take effect immediately](https://github.com/indeedeng/iwf/wiki/How-to-modify-workflow-code-without-breaking-changes). -![decision flow1](https://user-images.githubusercontent.com/4523955/234919901-f327dfb6-5b38-4440-a2eb-5d1c832b694e.png) +### StateDecision from `execute` +User workflow implements a **`execute` API** to return a StateDecision for: +* A next state +* Multiple next states running in parallel +* Stop the workflow: + * Graceful complete -- Stop the thread, and also will stop the workflow when all other threads are stopped + * Force complete -- Stop the workflow immediately + * Force fail -- Stop the workflow immediately with failure +* Dead end -- Just stop the thread +* Atomically go to next state with condition(e.g. channel is not empty) -or +State Decisions let you orchestrate the WorkflowState as complex as needed for any use case! -![decision flow2](https://user-images.githubusercontent.com/4523955/234919896-30db8628-daeb-4f1d-bd2b-7bf826989c75.png) +![StateDecision examples](https://github.com/indeedeng/iwf-java-samples/assets/4523955/83f127c2-42d1-454a-a688-389e5419f2bd) -or as complex as needed for any use case! -### Commands for WorkflowState's WaitUntil API +### Commands from `waitUntil` iWF provides three types of commands: -* `SignalCommand` -- Wait for a signal to be published to the workflow signal channel. External applications can use - SignalWorkflow API to signal a workflow. + * `TimerCommand` -- Wait for a **durable timer** to fire. * `InternalChannelCommand` -- Wait for a message from InternalChannel. +* ~~`SignalCommand` -- [Legacy, Use InternalChannelCommand + RPC instead]Wait for a signal to be published to the workflow signal channel. External applications can use + SignalWorkflow API to signal a workflow~~. The `waitUntil` API can return multiple commands along with a `CommandWaitingType`: @@ -321,7 +422,21 @@ The `waitUntil` API can return multiple commands along with a `CommandWaitingTyp * `AnyCommandCompleted` -- Wait for any of the commands to be completed. * `AnyCommandCombinationCompleted` -- Wait for any combination of the commands in a specified list to be completed. -### InternalChannel: synchronization for multi-threading +### InternalChannel: async message queue + + +iWF provides message queue called `InternalChannel`. User can just declare it in the workflow code without any management at all. +A message sent to the InternalChannel is persisted on server side, delivered to any WorkflowState that is waiting for it with `waitUntil`. + +Message can be sent to an InternalChannel by a WorkflowState or RPC. + +Note that the scope of an InternalChannel is only within its workflow execution (not shared across workflows). + +#### Usage 1: Waiting for external event/request +[RPC](#rpc) provides an API as mechanism to external application to interact with a workflow. Within an RPC, it can send a message to the internalChannel. +This allows workflowState to be waiting for an external event/request before proceeding. E.g., a workflow can wait for an approval before updating the database. + +#### Usage 2: Multi-thread synchronization When there are multiple threads of workflow states running in parallel, you may want to have them wait on each other to ensure some particular ordering. For example, in your problem space, WorkflowStates 1,2,3 need to be completed before WorkflowState 4. @@ -329,6 +444,10 @@ For example, in your problem space, WorkflowStates 1,2,3 need to be completed be In this case, you need to utilize the "InternalChannel". WorkflowState 4 should be waiting on an "InternalChannel" for 3 messages via the `waitUntil` API. WorkflowState 1,2,3 will each publish a message when completing. This ensures propper ordering. +A full execution flow of a single WorklfowState can look like this: + +![Workflow State diagram](https://user-images.githubusercontent.com/4523955/234921554-587d8ad4-84f5-4987-b838-959869293465.png) + ## RPC RPC stands for "Remote Procedure Call". Allows external systems to interact with the workflow execution. @@ -357,9 +476,12 @@ See the [wiki](https://github.com/indeedeng/iwf/wiki/What-does-the-atomicity-of- ### Signal Channel vs RPC -There are two major ways for external clients to interact with workflows: Signal and RPC. So what are the difference? +There are two major ways for external clients to interact with workflows: Signal and RPC. -They are completely different: +Historically, signal was created first as the only mechanism for external application to interact with workflow. However, it's a "write only" +which is limited. RPC is the new way and much more powerful and flexible. + +Here are some more details: * Signal is sent to iWF service without waiting for response of the processing * RPC will wait for worker to process the RPC request synchronously * Signal will be held in a signal channel until a workflow state consumes it @@ -367,15 +489,40 @@ They are completely different: ![signals vs rpc](https://user-images.githubusercontent.com/4523955/234932674-b0d062b2-e5dd-4dbe-93b5-1b9863acc5e0.png) -So choose based on the situations/requirements +## Persistence + +As writing code with programming model, you must have to deal with _data_ everywhere. +iWF provides a Key-Value storage out of the box. This eliminates the need to depend on a database to implement your workflow. + +Your data are stored as Data Attributes and Search Attributes. Together both define the "persistence schema". +The persistence schema is defined and maintained in the code along with other business logic. + +Search Attributes work like infinite indexes in a traditional database. You +only need to specify which attributes should be indexed, without worrying about complications you might be used to in +a traditional database like the number of shards, and the order of the fields in an index. + +Logically, the workflow definition displayed in the example workflow diagram will have a persistence schema as follows: + +| Workflow Execution | Search Attr A | Search Attr B | Data Attr C | Data Attr D | +|----------------------|---------------|:-------------:|------------:|------------:| +| Workflow Execution 1 | val 1 | val 2 | val 3 | val 4 | +| Workflow Execution 2 | val 5 | val 6 | val 7 | val 8 | +| ... | ... | ... | ... | ... | + +With Search attributes, you can write [customized SQL-like queries to find any workflow execution(s)](https://docs.temporal.io/visibility#search-attribute), just like using a database query. + +Note: +* The scope of the data/search attribute are isolated within its own workflow execution +* Lifecycle: after workflows are closed(completed, timeout, terminated, canceled, failed), all the data retained in your persistence schema will be deleted once the configured retention period elapses. + +The iWF persistence is mainly for storing the workflow intermediate states/data. +**It is important to not abuse iWF persistence for things like permanent storage, or for tracking/analytics purpose.** -| | Availability | Latency | Workflow Requirement | -|----------------|:-------------------------- |:----------------------------------------------- |:---------------------------------------------------------| -| Signal Channel | High | Low | Requires a WorkflowState to process | -| RPC | Depends on workflow worker | Higher than signal, depends on workflow worker | No WorkflowState required | ## Advanced Customization +Below are more advanced concepts/options for using iWF. + ### WorkflowOptions iWF let you deeply customize the workflow behaviors with the below options. @@ -471,6 +618,8 @@ When a workflowState/RPC API loads DataAttributes/SearchAttributes, by default i For WorkflowState, there is a 2MB limit by default to load data. User can use another loading policy `LOAD_PARTIAL_WITHOUT_LOCKING` to specify certain DataAttributes/SearchAttributes only to load. +`None` will skip the loading to save the data transportation/history cost. + `WITHOUT_LOCKING` here means if multiple StateExecutions/RPC try to upsert the same DataAttribute/SearchAttribute, they can be done in parallel without locking. @@ -496,6 +645,21 @@ There is a context object when invoking RPC or State APIs. It contains informati For example, WorkflowState can utilize `attempts` or `firstAttemptTime` from the context to make some advanced logic. +### Caching +By default, remote procedure calls (RPCs) will load data/search attributes with the Cadence/Temporal [query API](https://docs.temporal.io/workflows#query), +which is not optimized for very high request volume (~>100 requests per second) on a single workflow execution. Such request volumes could cause +too many history replays, especially when workflows are closed. This could in turn produce undesirable latency and load. + +You can enable **caching** to support those high-volume requests. + +Note: +* With caching enabled read-after-write access will become *eventually consistent*, unless `bypassCachingForStrongConsistency=true` is set in RPC options +* Caching will introduce an extra event in history (upsertMemo operation for WorkflowPropertiesModified event) for updating the persisted data attributes +* Caching will be more useful for read-only RPC (no persistence.SetXXX API or communication API calls in RPC implementation) or GetDataAttributes API. + * A read-only RPC can still invoke any other RPCs (like calling other microservices, or DB operation) in the RPC implementation +* Caching is currently only supported if the backend is Temporal, because [Cadence doesn't support mutable memo](https://github.com/uber/cadence/issues/3729) + + ## Limitation Though iWF can be used for a very wide range of use case even just CRUD, iWF is NOT for everything. It is not suitable for use cases like: