Skip to content

Commit b448a9e

Browse files
ktropsKatie Atrops
andauthored
adding set & get data attributes from RPC. Missing Synchronous/lockin… (#586)
Co-authored-by: Katie Atrops <[email protected]>
1 parent 0ef67dc commit b448a9e

File tree

6 files changed

+510
-68
lines changed

6 files changed

+510
-68
lines changed

integ/rpc_external_storage_test.go

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
package integ
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"testing"
7+
"time"
8+
9+
"github.com/indeedeng/iwf/gen/iwfidl"
10+
rpcStorage "github.com/indeedeng/iwf/integ/workflow/rpc-external-storage"
11+
"github.com/indeedeng/iwf/service"
12+
"github.com/indeedeng/iwf/service/common/ptr"
13+
"github.com/stretchr/testify/assert"
14+
)
15+
16+
func TestRpcExternalStorageNonLockingTemporal(t *testing.T) {
17+
if !*temporalIntegTest {
18+
t.Skip()
19+
}
20+
doTestRpcExternalStorage(t, service.BackendTypeTemporal, false)
21+
}
22+
23+
func TestRpcExternalStorageSynchronousUpdateTemporal(t *testing.T) {
24+
if !*temporalIntegTest {
25+
t.Skip()
26+
}
27+
doTestRpcExternalStorage(t, service.BackendTypeTemporal, true)
28+
}
29+
30+
func TestRpcExternalStorageNonLockingCadence(t *testing.T) {
31+
if !*cadenceIntegTest {
32+
t.Skip()
33+
}
34+
// Cadence doesn't support synchronous updates
35+
doTestRpcExternalStorage(t, service.BackendTypeCadence, false)
36+
}
37+
38+
func doTestRpcExternalStorage(t *testing.T, backendType service.BackendType, useLocking bool) {
39+
assertions := assert.New(t)
40+
41+
// start test workflow server
42+
wfHandler := rpcStorage.NewHandler()
43+
closeFunc1 := startWorkflowWorkerWithRpc(wfHandler, t)
44+
defer closeFunc1()
45+
46+
// Start IWF service with external storage enabled
47+
_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
48+
BackendType: backendType,
49+
S3TestThreshold: 100, // Set low threshold so large data gets stored in S3
50+
})
51+
defer closeFunc2()
52+
53+
// create client
54+
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
55+
Servers: []iwfidl.ServerConfiguration{
56+
{
57+
URL: "http://localhost:" + testIwfServerPort,
58+
},
59+
},
60+
})
61+
62+
// start a workflow
63+
wfId := rpcStorage.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
64+
wfInput := &iwfidl.EncodedObject{
65+
Encoding: iwfidl.PtrString("json"),
66+
Data: iwfidl.PtrString("\"start-input\""),
67+
}
68+
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
69+
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
70+
WorkflowId: wfId,
71+
IwfWorkflowType: rpcStorage.WorkflowType,
72+
WorkflowTimeoutSeconds: 30,
73+
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
74+
StartStateId: ptr.Any(rpcStorage.State1),
75+
StateInput: wfInput,
76+
}).Execute()
77+
failTestAtHttpError(err, httpResp, t)
78+
79+
// Wait briefly for workflow to initialize
80+
time.Sleep(time.Millisecond * 500)
81+
82+
var loadingPolicy *iwfidl.PersistenceLoadingPolicy
83+
if useLocking {
84+
// Use exclusive locking for synchronous updates
85+
loadingPolicy = &iwfidl.PersistenceLoadingPolicy{
86+
PersistenceLoadingType: iwfidl.PARTIAL_WITH_EXCLUSIVE_LOCK.Ptr(),
87+
PartialLoadingKeys: []string{
88+
rpcStorage.SmallDataKey,
89+
rpcStorage.LargeDataKey,
90+
},
91+
LockingKeys: []string{
92+
rpcStorage.SmallDataKey,
93+
rpcStorage.LargeDataKey,
94+
},
95+
}
96+
} else {
97+
// Use non-locking for regular RPC
98+
loadingPolicy = &iwfidl.PersistenceLoadingPolicy{
99+
PersistenceLoadingType: iwfidl.ALL_WITHOUT_LOCKING.Ptr(),
100+
}
101+
}
102+
103+
// Test 1: Make RPC call to test external storage loading functionality
104+
// The workflow waits for this RPC call to send an internal signal to close it
105+
reqRpc := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background())
106+
rpcResp, httpResp, err := reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{
107+
WorkflowId: wfId,
108+
RpcName: rpcStorage.UpdateDataAttributesRPC,
109+
Input: &rpcStorage.TestInput,
110+
DataAttributesLoadingPolicy: loadingPolicy,
111+
TimeoutSeconds: iwfidl.PtrInt32(10),
112+
}).Execute()
113+
114+
// The RPC should succeed since the workflow waits for it
115+
failTestAtHttpError(err, httpResp, t)
116+
assertions.Equal(&iwfidl.WorkflowRpcResponse{
117+
Output: &rpcStorage.TestOutput,
118+
}, rpcResp)
119+
t.Logf("✅ RPC call succeeded and sent signal to close workflow")
120+
121+
// Give a moment for the worker handler to be called and store test data
122+
time.Sleep(time.Millisecond * 100)
123+
124+
// Test 2: Verify the handler received correct data during RPC (this is the key test!)
125+
_, testData := wfHandler.GetTestResult()
126+
127+
// Verify the RPC handler received the loaded data (not references)
128+
rpcInputData, exists := testData[rpcStorage.UpdateDataAttributesRPC+"-received-data"]
129+
assertions.True(exists, "RPC should have received data attributes")
130+
131+
receivedDataAttrs, ok := rpcInputData.([]iwfidl.KeyValue)
132+
assertions.True(ok, "Received data should be KeyValue array")
133+
134+
// The handler should receive actual data content (loaded from external storage if needed)
135+
receivedDataMap := make(map[string]string)
136+
for _, attr := range receivedDataAttrs {
137+
if attr.Value != nil && attr.Value.Data != nil {
138+
receivedDataMap[*attr.Key] = *attr.Value.Data
139+
}
140+
}
141+
142+
// Both small and large data should be available as actual content to the handler
143+
// This verifies that loadDataObjectsFromExternalStorage works correctly in RPC calls
144+
if initialData, exists := receivedDataMap[rpcStorage.SmallDataKey]; exists {
145+
assertions.Equal(*rpcStorage.InitialSmallData.Data, initialData, "Handler should receive initial small data content")
146+
}
147+
if initialLargeData, exists := receivedDataMap[rpcStorage.LargeDataKey]; exists {
148+
assertions.Equal(*rpcStorage.InitialLargeData.Data, initialLargeData, "Handler should receive initial large data content (loaded from S3)")
149+
}
150+
151+
t.Logf("✅ External storage functionality verified: RPC handler received actual data content, proving that large data was correctly loaded from external storage")
152+
153+
// The workflow should complete after the RPC sent the internal signal
154+
// Wait for workflow to complete
155+
reqWait := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
156+
respWait, httpResp, err := reqWait.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
157+
WorkflowId: wfId,
158+
WaitTimeSeconds: iwfidl.PtrInt32(5),
159+
}).Execute()
160+
failTestAtHttpError(err, httpResp, t)
161+
162+
// Verify workflow completed successfully
163+
assertions.Equal(iwfidl.COMPLETED, respWait.WorkflowStatus, "Workflow should complete successfully")
164+
}

0 commit comments

Comments
 (0)