Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions packages/core/automation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# @galaxy-kj/core-automation

Automation engine for scheduling and executing Stellar and DeFi workflows.

## Price-triggered automations

`AutomationService` now supports live oracle-backed price evaluation for
automations that use `type: 'price'` conditions.

### Configure the service

```ts
import { AutomationService, TriggerType } from '@galaxy-kj/core-automation';
import { OracleAggregator } from '@galaxy-kj/core-oracles';

const oracle = new OracleAggregator();
const automationService = new AutomationService({
oracle,
});
```

### Start polling for price rules

```ts
await automationService.startPriceMonitoring(30_000);
```

This polling loop:

- fetches live prices for the assets referenced by active `TriggerType.PRICE`
rules
- populates `ExecutionContext.priceContext`
- evaluates `PRICE_ABOVE` / `PRICE_BELOW` style conditions against the live
price map
- executes matching rules through the normal automation pipeline

### Fallback behavior

If no oracle is configured, non-price automations continue to work normally.
Price conditions simply evaluate to `false` unless a caller provides
`ExecutionContext.priceContext` explicitly.
40 changes: 40 additions & 0 deletions packages/core/automation/jest.config.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
roots: ['<rootDir>/src/test'],
testMatch: ['**/*.test.ts'],
collectCoverageFrom: [
'src/**/*.ts',
'!src/**/*.d.ts',
'!src/test/**',
'!src/utils/cron-manager.ts',
'!src/utils/execution-engine.ts',
],
coverageThreshold: {
global: {
branches: 85,
functions: 90,
lines: 90,
statements: 90,
},
},
coverageDirectory: 'coverage',
coverageReporters: ['text', 'lcov', 'json', 'json-summary'],
transform: {
'^.+\\.ts$': [
'ts-jest',
{
useESM: false,
tsconfig: {
module: 'commonjs',
moduleResolution: 'node',
},
},
],
},
moduleNameMapper: {
'^@galaxy-kj/core-oracles$': '<rootDir>/../oracles/src/index.ts',
'^(\\.{1,2}/.*)\\.js$': '$1',
},
moduleFileExtensions: ['ts', 'js', 'json'],
};
1 change: 1 addition & 0 deletions packages/core/automation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"build": "tsc",
"dev": "tsc --watch",
"test": "jest",
"test:coverage": "jest --coverage",
"lint": "eslint src/**/*.ts",
"lint:fix": "eslint src/**/*.ts --fix",
"type-check": "tsc --noEmit",
Expand Down
148 changes: 144 additions & 4 deletions packages/core/automation/src/services/automation.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import {
ExecutionContext,
ExecutionResult,
AutomationMetrics,
PriceConditionContext,
PriceTriggerCondition,
TriggerType,
StellarNetwork,
} from '../types/automation-types.js';
import { CronManager } from '../utils/cron-manager.js';
import { ConditionEvaluator } from '../utils/condition-evaluator.js';
import { ExecutionEngine } from '../utils/execution-engine.js';
import { OracleAggregator } from '@galaxy-kj/core-oracles';
import { supabaseClient } from '../../stellar-sdk/src/utils/supabase-client.js';
import { supabaseClient } from '../../../stellar-sdk/src/utils/supabase-client.js';

export interface AutomationServiceConfig {
network?: StellarNetwork;
Expand All @@ -31,12 +33,20 @@ export class AutomationService extends EventEmitter {
private cronManager: CronManager;
private conditionEvaluator: ConditionEvaluator;
private executionEngine: ExecutionEngine;
private oracle?: OracleAggregator;
private rules: Map<string, AutomationRule> = new Map();
private metrics: Map<string, AutomationMetrics> = new Map();
private activeExecutions: Set<string> = new Set();
private config: Required<AutomationServiceConfig>;
private config: {
network: StellarNetwork;
sourceSecret: string;
maxConcurrentExecutions: number;
executionTimeout: number;
enableMetrics: boolean;
};
private network: StellarNetwork;
private activeTimeouts: Map<string, NodeJS.Timeout> = new Map();
private priceMonitorInterval?: NodeJS.Timeout;
private supabase = supabaseClient;

constructor(config: AutomationServiceConfig = {}) {
Expand All @@ -56,8 +66,9 @@ export class AutomationService extends EventEmitter {
enableMetrics: config.enableMetrics !== false,
};

this.oracle = config.oracle;
this.cronManager = new CronManager();
this.conditionEvaluator = new ConditionEvaluator(config.oracle);
this.conditionEvaluator = new ConditionEvaluator();
this.executionEngine = new ExecutionEngine(
this.network,
this.config.sourceSecret
Expand Down Expand Up @@ -171,7 +182,7 @@ export class AutomationService extends EventEmitter {

try {
// Build execution context
const context: ExecutionContext = {
const baseContext: ExecutionContext = {
ruleId,
userId: rule.userId,
timestamp: new Date(),
Expand All @@ -180,6 +191,7 @@ export class AutomationService extends EventEmitter {
},
...contextData,
};
const context = await this.attachLivePrices(rule, baseContext);

// Evaluate conditions
const conditionsMet = await this.conditionEvaluator.evaluateConditionGroup(
Expand Down Expand Up @@ -296,6 +308,68 @@ export class AutomationService extends EventEmitter {
}
}

async startPriceMonitoring(intervalMs: number = 30_000): Promise<void> {
if (!Number.isFinite(intervalMs) || intervalMs <= 0) {
throw new Error('Price monitoring interval must be greater than 0');
}

if (!this.oracle) {
throw new Error('Oracle not configured for price monitoring');
}

this.stopPriceMonitoring();
await this.runPriceMonitoringCycle();

this.priceMonitorInterval = setInterval(() => {
void this.runPriceMonitoringCycle();
}, intervalMs);
}

stopPriceMonitoring(): void {
if (!this.priceMonitorInterval) {
return;
}

clearInterval(this.priceMonitorInterval);
this.priceMonitorInterval = undefined;
}

async checkAndExecute(
contextData?: Partial<ExecutionContext>
): Promise<ExecutionResult[]> {
const activePriceRules = this.getAllRules().filter(
rule =>
rule.status === AutomationStatus.ACTIVE &&
rule.triggerType === TriggerType.PRICE
);

if (activePriceRules.length === 0) {
return [];
}

const priceContext = await this.buildPriceContext(
activePriceRules,
contextData?.priceContext
);

return Promise.all(
activePriceRules.map(rule =>
this.executeRule(rule.id, {
...contextData,
priceContext,
})
)
);
Comment on lines +355 to +362
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Concurrent execution of price rules may bypass maxConcurrentExecutions limit.

Promise.all executes all active price rules simultaneously. If there are many active price rules, this could exceed maxConcurrentExecutions (checked per-rule in executeRule but each call increments activeExecutions concurrently before any checks complete).

Consider using a controlled concurrency pattern (e.g., p-limit or sequential execution) or batching to respect the configured limit.

🛠️ Example using sequential execution
-    return Promise.all(
-      activePriceRules.map(rule =>
-        this.executeRule(rule.id, {
-          ...contextData,
-          priceContext,
-        })
-      )
-    );
+    const results: ExecutionResult[] = [];
+    for (const rule of activePriceRules) {
+      const result = await this.executeRule(rule.id, {
+        ...contextData,
+        priceContext,
+      });
+      results.push(result);
+    }
+    return results;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/core/automation/src/services/automation.service.ts` around lines 355
- 362, The current Promise.all over activePriceRules causes all
executeRule(rule.id, {...}) calls to start concurrently and can bypass the
per-rule maxConcurrentExecutions check (because activeExecutions is incremented
concurrently); change this to a controlled-concurrency pattern by serializing or
limiting parallel invocations: either use a concurrency limiter like p-limit and
wrap each call with limit(() => this.executeRule(...)) or iterate the
activePriceRules with an async for-loop and await each executeRule (or process
in configurable batches) so that the effective number of concurrent executeRule
executions respects maxConcurrentExecutions and updates to activeExecutions
occur under the intended limit. Ensure references to activePriceRules,
executeRule, maxConcurrentExecutions, and activeExecutions are updated
accordingly.

}

private async runPriceMonitoringCycle(): Promise<void> {
try {
await this.checkAndExecute();
} catch (error) {
this.emit('rule:error', { ruleId: 'price-monitor', error });
}
}

/**
* Update rule status
*/
Expand Down Expand Up @@ -513,6 +587,71 @@ export class AutomationService extends EventEmitter {
this.metrics.set(ruleId, metrics);
}

private async attachLivePrices(
rule: AutomationRule,
context: ExecutionContext
): Promise<ExecutionContext> {
const priceContext = await this.buildPriceContext([rule], context.priceContext);
if (!priceContext) {
return context;
}

return {
...context,
priceContext,
};
}

private async buildPriceContext(
rules: AutomationRule[],
existingPriceContext?: PriceConditionContext
): Promise<PriceConditionContext | undefined> {
if (existingPriceContext) {
return existingPriceContext;
}

if (!this.oracle) {
return undefined;
}

const assets = Array.from(
new Set(rules.flatMap(rule => this.collectPriceAssets(rule.conditionGroup)))
);

if (assets.length === 0) {
return undefined;
}

const aggregatedPrices = await this.oracle.getAggregatedPrices(assets);

return {
prices: Object.fromEntries(
aggregatedPrices.map((price: { symbol: string; price: number }) => [
price.symbol,
price.price,
])
),
timestamp: Date.now(),
};
}

private collectPriceAssets(
group: AutomationRule['conditionGroup']
): string[] {
const directAssets = group.conditions
.filter(
(condition): condition is PriceTriggerCondition =>
'type' in condition && condition.type === 'price'
)
.map(condition => condition.asset);

const nestedAssets = (group.groups || []).flatMap(nestedGroup =>
this.collectPriceAssets(nestedGroup)
);

return [...directAssets, ...nestedAssets];
}

private sanitizeAuditMetadata(
metadata?: Record<string, unknown>
): Record<string, unknown> | undefined {
Expand Down Expand Up @@ -601,6 +740,7 @@ export class AutomationService extends EventEmitter {
* Shutdown service
*/
async shutdown(): Promise<void> {
this.stopPriceMonitoring();
this.cronManager.destroy();
this.removeAllListeners();
}
Expand Down
Loading
Loading