Skip to content

Commit 5dcf5a5

Browse files
authored
Track progress updates from long running tasks for better debugging and rich audit logs. (#189)
* TaskWorker accepts interim results from tasks that are still processing. * Can now push interim updates for a task still processing using the JS SDK. * Can now push interim updates for a task still processing using the Python SDK. * Updated Orchestration View to handle interim task results. * Can display interim task results as short and long progress updates on CLI. * Prepared changelog for latest feature release.
1 parent 060357e commit 5dcf5a5

40 files changed

+2551
-1633
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,11 @@ Download the latest CLI binary for your platform from our [releases page](https:
8282

8383
```shell
8484
# macOS
85-
curl -L https://github.com/orra-dev/orra/releases/download/v0.2.2/orra-darwin-arm64 -o /usr/local/bin/orra
85+
curl -L https://github.com/orra-dev/orra/releases/download/v0.2.3/orra-darwin-arm64 -o /usr/local/bin/orra
8686
chmod +x /usr/local/bin/orra
8787

8888
# Linux
89-
curl -L https://github.com/ezodude/orra/releases/download/v0.2.2/orra-linux-amd64 -o /usr/local/bin/orra
89+
curl -L https://github.com/ezodude/orra/releases/download/v0.2.3/orra-linux-amd64 -o /usr/local/bin/orra
9090
chmod +x /usr/local/bin/orra
9191

9292
# Verify installation

cli/cmd/inspect.go

+104
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121

2222
func newInspectCmd(opts *CliOpts) *cobra.Command {
2323
var detailed bool
24+
var shortUpdates bool
25+
var longUpdates bool
2426

2527
cmd := &cobra.Command{
2628
Use: "inspect [orchestration-id]",
@@ -101,6 +103,48 @@ func newInspectCmd(opts *CliOpts) *cobra.Command {
101103
fmt.Println(statusLine)
102104
}
103105

106+
if (shortUpdates || longUpdates) && len(task.InterimResults) > 0 {
107+
// Show the simplified timeline
108+
timelineStr := renderTaskTimeline(task.StatusHistory, task.InterimResults)
109+
if timelineStr != "" {
110+
fmt.Println(timelineStr)
111+
}
112+
113+
fmt.Printf("│\n│ Progress Updates %s\n", strings.Repeat("─", 35))
114+
115+
if longUpdates {
116+
// Show all updates
117+
for j, result := range task.InterimResults {
118+
fmt.Printf("│ %s Update %d/%d\n",
119+
result.Timestamp.Format("15:04:05"),
120+
j+1,
121+
len(task.InterimResults))
122+
printJSONWithPrefix(result.Data, "│ ")
123+
fmt.Printf("│\n")
124+
}
125+
} else {
126+
// Show only first and last updates
127+
first := task.InterimResults[0]
128+
fmt.Printf("│ %s First Update\n", first.Timestamp.Format("15:04:05"))
129+
printJSONWithPrefix(first.Data, "│ ")
130+
131+
// Show last update if different from first
132+
if len(task.InterimResults) > 1 {
133+
last := task.InterimResults[len(task.InterimResults)-1]
134+
fmt.Printf("│\n│ %s Latest Update\n", last.Timestamp.Format("15:04:05"))
135+
printJSONWithPrefix(last.Data, "│ ")
136+
}
137+
138+
if len(task.InterimResults) > 2 {
139+
fmt.Printf("│\n│ Use --long-updates to show all %d updates\n",
140+
len(task.InterimResults))
141+
}
142+
}
143+
144+
// Add divider after progress updates section
145+
fmt.Printf("│\n│ %s\n", strings.Repeat("─", 50))
146+
}
147+
104148
// Print compensation history if present
105149
if len(task.CompensationHistory) > 0 {
106150
fmt.Printf("│ Compensating %s\n", strings.Repeat("─", 37))
@@ -149,6 +193,16 @@ func newInspectCmd(opts *CliOpts) *cobra.Command {
149193
}
150194

151195
cmd.Flags().BoolVarP(&detailed, "detailed", "d", false, "Show detailed task history with I/O")
196+
cmd.Flags().BoolVarP(&shortUpdates, "updates", "u", false, "Show summarized progress updates (first and last only)")
197+
cmd.Flags().BoolVar(&longUpdates, "long-updates", false, "Show all progress updates with complete details")
198+
199+
cmd.PreRunE = func(cmd *cobra.Command, args []string) error {
200+
if shortUpdates && longUpdates {
201+
return fmt.Errorf("--updates and --long-updates flags cannot be used together")
202+
}
203+
return nil
204+
}
205+
152206
return cmd
153207
}
154208

@@ -199,3 +253,53 @@ func formatInspectionError(err string) string {
199253
}
200254
return err
201255
}
256+
257+
func renderTaskTimeline(statusHistory []api.TaskStatusEvent, interimResults []api.InterimTaskResult) string {
258+
if len(statusHistory) == 0 || len(interimResults) == 0 {
259+
return ""
260+
}
261+
262+
// Extract key information only
263+
startTime := statusHistory[0].Timestamp
264+
endTime := statusHistory[len(statusHistory)-1].Timestamp
265+
duration := endTime.Sub(startTime).Seconds()
266+
267+
// Extract latest progress percentage if available
268+
latestProgress := 0
269+
for _, result := range interimResults {
270+
var data map[string]interface{}
271+
if err := json.Unmarshal(result.Data, &data); err == nil {
272+
if p, ok := data["progress"].(float64); ok {
273+
latestProgress = int(p)
274+
} else if p, ok := data["percentComplete"].(float64); ok {
275+
latestProgress = int(p)
276+
}
277+
}
278+
}
279+
280+
// Build simple progress bar (inspired by kubectl)
281+
var sb strings.Builder
282+
sb.WriteString("│ Progress: [")
283+
284+
// Progress bar (20 chars max)
285+
progressChars := 20
286+
filledChars := int(float64(latestProgress) / 100 * float64(progressChars))
287+
sb.WriteString(strings.Repeat("=", filledChars))
288+
289+
if filledChars < progressChars {
290+
sb.WriteString(">") // Progress indicator
291+
sb.WriteString(strings.Repeat(" ", progressChars-filledChars-1))
292+
}
293+
294+
sb.WriteString(fmt.Sprintf("] %d%%", latestProgress))
295+
296+
// Add timing information
297+
sb.WriteString(fmt.Sprintf(" (%.1fs elapsed)", duration))
298+
sb.WriteString("\n")
299+
300+
// Add update count
301+
sb.WriteString(fmt.Sprintf("│ Updates: %d", len(interimResults)))
302+
sb.WriteString("\n")
303+
304+
return sb.String()
305+
}

cli/cmd/version.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func newVersionCmd(_ *CliOpts) *cobra.Command {
1717
Use: "version",
1818
Short: "Print the client and server version information",
1919
RunE: func(cmd *cobra.Command, args []string) error {
20-
fmt.Println("Client Version: v0.2.2")
20+
fmt.Println("Client Version: v0.2.3")
2121
// TODO: Implement server version check
2222
return nil
2323
},

cli/internal/api/api.go

+7
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ type TaskInspectResponse struct {
137137
Output json.RawMessage `json:"output,omitempty"`
138138
Error string `json:"error,omitempty"`
139139
Duration time.Duration `json:"duration"`
140+
InterimResults []InterimTaskResult `json:"interimResults,omitempty"`
140141
Compensation *TaskCompensationStatus `json:"compensation,omitempty"`
141142
CompensationHistory []CompensationStatusEvent `json:"compensationHistory,omitempty"`
142143
IsRevertible bool `json:"isRevertible"`
@@ -149,6 +150,12 @@ type TaskCompensationStatus struct {
149150
Timestamp time.Time `json:"timestamp"` // When compensation started
150151
}
151152

153+
type InterimTaskResult struct {
154+
ID string `json:"id"`
155+
Timestamp time.Time `json:"timestamp"`
156+
Data json.RawMessage `json:"data"`
157+
}
158+
152159
type CompensationStatusEvent struct {
153160
ID string `json:"id"`
154161
TaskID string `json:"taskId"`

docs/cli.md

+32-2
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,21 @@ await svc.register({
8888
// Start handling tasks
8989
svc.start(async (task) => {
9090
// Your service logic here
91+
92+
// Report progress during long-running tasks
93+
await task.pushUpdate({
94+
progress: 25,
95+
message: "Processing customer data..."
96+
});
97+
98+
// Continue processing and report more progress
99+
await task.pushUpdate({
100+
progress: 50,
101+
message: "Analyzing request..."
102+
});
103+
104+
// Complete the task with final result
105+
return { success: true, data: { /* your result */ } };
91106
});
92107
```
93108

@@ -182,6 +197,12 @@ orra inspect o_abc123
182197

183198
# Get comprehensive details with inputs/outputs
184199
orra inspect -d o_abc123
200+
201+
# View progress updates for tasks
202+
orra inspect -d o_abc123 --updates
203+
204+
# View complete progress details for long-running tasks
205+
orra inspect -d o_abc123 --long-updates
185206
```
186207

187208
### Grounding Management
@@ -237,6 +258,10 @@ orra ps
237258
orra inspect -d o_failed123
238259

239260
# Shows full task history, inputs, and outputs
261+
orra inspect -d o_failed123 --updates
262+
263+
# View all progress updates for problematic tasks
264+
orra inspect -d o_failed123 --long-updates
240265
```
241266

242267
3. **Multiple Projects or environments**
@@ -282,9 +307,14 @@ orra config reset
282307
orra inspect -d <action-id>
283308
```
284309

285-
2. View recent actions:
310+
2. View task progress and updates:
311+
```bash
312+
orra inspect -d <action-id> --updates
313+
```
314+
315+
3. View recent actions:
286316
```bash
287317
orra ps
288318
```
289319

290-
3. Visit our documentation: [https://orra.dev/docs](https://orra.dev/docs)
320+
4. Visit our documentation: [https://orra.dev/docs](https://orra.dev/docs)

docs/core.md

+37-5
Original file line numberDiff line numberDiff line change
@@ -156,17 +156,31 @@ Orra uses an append-only log (similar to Kafka) for task coordination:
156156
- Immutable history for debugging
157157
- Basis for exactly-once execution
158158
- Clean recovery after failures
159+
- Storage for progress updates from long-running tasks
159160

160161
Example log sequence:
161162
```
162163
[0001] Task customer_context started
163164
[0002] Task order_details started
164-
[0003] Task customer_context completed
165-
[0004] Task order_details completed
166-
[0005] Task generate_response started
167-
[0006] Task generate_response completed
165+
[0003] Task customer_context interim_output (20% complete)
166+
[0004] Task customer_context interim_output (50% complete)
167+
[0005] Task order_details completed
168+
[0006] Task customer_context interim_output (90% complete)
169+
[0007] Task customer_context completed
170+
[0008] Task generate_response started
171+
[0009] Task generate_response completed
168172
```
169173

174+
#### Progress Updates in the Log
175+
176+
The Plan Engine stores interim task results in the log as special entries with type `task_interim_output`. These entries:
177+
178+
- Are appended while a task is running (before completion)
179+
- Maintain chronological ordering with other log entries
180+
- Are retrievable for debugging and monitoring purposes (e.g. inspect with the CLI `--updates` or `--long-updates`)
181+
182+
This mechanism provides transparency into task execution without compromising the reliability of the core task processing model.
183+
170184
### Advanced Usage Patterns
171185

172186
#### 1. Dynamic Service/Agent Registration
@@ -221,7 +235,25 @@ orra inspect -d o_xxxxxxxxxxxxxx
221235
│ inventory-service (task1)
222236
│ ──────────────────────────────────────────────────
223237
│ 14:07:43 ◎ Processing
224-
│ 14:07:43 ● Completed
238+
│ 14:07:44 ● Completed
239+
240+
│ Progress: [====================] 100% (1.0s elapsed)
241+
│ Updates: 3
242+
243+
│ Progress Updates ───────────────────────────────────
244+
│ 14:07:43 First Update
245+
│ {
246+
"progress": 30,
247+
"status": "processing",
248+
"message": "Checking inventory"
249+
│ }
250+
251+
│ 14:07:44 Latest Update
252+
│ {
253+
"progress": 100,
254+
"status": "processing",
255+
"message": "Product located"
256+
│ }
225257
226258
│ Input:
227259
│ {

docs/sdks/js-sdk.md

+67
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,73 @@ service.onRevert(async (task, result) => {
187187
188188
## Advanced Features
189189
190+
### Progress Updates
191+
192+
For long-running tasks, you can send interim progress updates to the Plan Engine. This allows monitoring task execution in real-time and provides valuable information for debugging and audit logs.
193+
194+
The update is any object with properties that make sense for the task underway.
195+
196+
```javascript
197+
service.start(async (task) => {
198+
try {
199+
// 1. Begin processing
200+
await task.pushUpdate({
201+
progress: 20,
202+
status: "processing",
203+
message: "Starting data analysis"
204+
});
205+
206+
// 2. Continue with more steps
207+
await someFunction();
208+
await task.pushUpdate({
209+
progress: 50,
210+
status: "processing",
211+
message: "Processing halfway complete"
212+
});
213+
214+
// 3. Almost done
215+
await finalSteps();
216+
await task.pushUpdate({
217+
progress: 90,
218+
status: "processing",
219+
message: "Finalizing results"
220+
});
221+
222+
// 4. Return final result
223+
return { success: true, results: [...] };
224+
} catch (error) {
225+
// Handle errors
226+
throw error;
227+
}
228+
});
229+
```
230+
231+
#### Benefits of Progress Updates
232+
233+
- **Visibility**: Track execution of long-running tasks in real-time
234+
- **Debugging**: Identify exactly where tasks slow down or fail
235+
- **Audit Trail**: Maintain a complete history of task execution
236+
- **User Experience**: Forward progress information to end-users
237+
238+
#### Viewing Progress Updates
239+
240+
Use the Orra CLI to view progress updates:
241+
242+
```bash
243+
# View summarized progress updates
244+
orra inspect -d <orchestration-id> --updates
245+
246+
# View all detailed progress updates
247+
orra inspect -d <orchestration-id> --long-updates
248+
```
249+
250+
#### Best Practices
251+
252+
- Send updates at meaningful milestones (not too frequent)
253+
- Include percentage completion when possible
254+
- Keep messages concise and informative
255+
- Use consistent status terminology
256+
190257
### Persistence Configuration
191258
192259
Orra's Plan Engine maintains service/agent identity across restarts using persistence. This is crucial for:

0 commit comments

Comments
 (0)