Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ dist
*.log
.DS_Store
.eslintcache

# Archive folder for test examples
examples-archive/
154 changes: 154 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ With this method, you can send a transaction and receive extremely fast response
- **Viem Integration:** Built on top of Viem for robust and type-safe interactions with the blockchain.
- **WebSocket Transport:** Includes a custom WebSocket transport for real-time Shreds monitoring.
- **Fast Response Times:** Achieve transaction confirmations as low as 5ms when close to the sequencer.
- **Enhanced Reliability:**
- **Automatic Reconnection:** Exponential backoff reconnection for resilient connections
- **Connection Status Tracking:** Real-time WebSocket connection health monitoring
- **Request Queuing:** Priority-based request queuing with automatic retry during network interruptions
- **Dynamic Subscription Management:** Add/remove addresses and pause/resume subscriptions without interruption

## Installation

Expand Down Expand Up @@ -202,6 +207,155 @@ const receipt = await client.sendRawTransactionSync({
})
```

### Connection Management

The Shred client now includes built-in connection monitoring and resilience features:

#### Monitoring Connection Status

```typescript
import { createPublicShredClient, shredsWebSocket } from 'shreds/viem'

const client = createPublicShredClient({
transport: shredsWebSocket('ws://your-endpoint'),
})

// Check connection status
console.log(client.isConnected()) // true/false
console.log(client.getConnectionStatus()) // 'connecting' | 'connected' | 'disconnected' | 'error'

// Get detailed connection statistics
const stats = client.getConnectionStats()
console.log(stats)
// {
// status: 'connected',
// connectedAt: 1234567890,
// reconnectAttempts: 0,
// totalConnections: 1,
// totalDisconnections: 0
// }

// Subscribe to connection changes
const unsubscribe = client.onConnectionChange((status) => {
console.log('Connection status changed:', status)
})

// Wait for connection with timeout
await client.waitForConnection(30000) // 30 second timeout
```

#### Configuring Reconnection

By default, the WebSocket transport will automatically reconnect with exponential backoff:

```typescript
const client = createPublicShredClient({
transport: shredsWebSocket('ws://your-endpoint', {
// Reconnection is enabled by default with these settings:
reconnect: {
attempts: 5, // Try 5 times
delay: 2000, // Start with 2s delay
},
// Delays will be: 2s → 4s → 8s → 16s → 30s (capped)
}),
})

// Disable reconnection
const clientNoReconnect = createPublicShredClient({
transport: shredsWebSocket('ws://your-endpoint', {
reconnect: false,
}),
})

// Custom reconnection settings
const clientCustom = createPublicShredClient({
transport: shredsWebSocket('ws://your-endpoint', {
reconnect: {
attempts: 10, // More attempts
delay: 5000, // Start with 5s delay
},
keepAlive: {
interval: 10000, // Ping every 10s
},
}),
})
```

### Dynamic Subscription Management

Manage subscriptions dynamically by adding/removing addresses or pausing event processing:

```typescript
// Create a managed subscription
const { subscription } = await client.watchContractShredEvent({
managed: true, // Enable dynamic management
buffered: true, // Buffer events during updates
abi: contractAbi,
eventName: 'Transfer',
address: [], // Start with no addresses
onLogs: (logs) => {
console.log('Transfer events:', logs);
}
});

// Dynamically add addresses
await subscription.addAddress('0x123...');
await subscription.addAddress('0x456...');

// Remove an address
await subscription.removeAddress('0x123...');

// Pause/resume event processing
subscription.pause();
// Events are buffered while paused
subscription.resume();
// Buffered events are delivered

// Get statistics
const stats = subscription.getStats();
console.log(`Events received: ${stats.eventCount}`);

// Unsubscribe when done
await subscription.unsubscribe();
```

### Request Queuing

Queue requests to handle network disruptions gracefully:

```typescript
// Queue a high-priority request
await client.queueRequest({
method: 'eth_sendRawTransaction',
params: [signedTx],
priority: 'high',
onSuccess: (result) => {
console.log('Transaction sent:', result);
},
onError: (error) => {
console.error('Transaction failed:', error);
}
});

// Monitor queue statistics
const stats = client.getQueueStats();
console.log(`Queued: ${stats.queueSize}, Processing: ${stats.processing}`);
console.log(`Success rate: ${(stats.processed / (stats.processed + stats.failed) * 100).toFixed(2)}%`);

// Control queue processing
client.pauseQueue(); // Pause processing
client.resumeQueue(); // Resume processing

// View queued requests
const requests = client.getQueuedRequests();
requests.forEach(req => {
console.log(`[${req.priority}] ${req.method} - retry ${req.retryCount}/${req.maxRetries}`);
});

// Clear all queued requests
client.clearQueue();
```

## Development

To set up the development environment:
Expand Down
43 changes: 39 additions & 4 deletions src/viem/actions/shred/watchContractShredEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import {
type LogTopic,
type Transport,
} from 'viem'
import { getSubscriptionManager } from '../../utils/subscription/manager'
import type { ShredsWebSocketTransport } from '../../clients/transports/shredsWebSocket'
import type { ShredLog } from '../../types/log'
import type { ManagedSubscription } from '../../utils/subscription/types'
import type { Abi, Address, ExtractAbiEvent } from 'abitype'

/**
Expand Down Expand Up @@ -78,12 +80,19 @@ export type WatchContractShredEventParameters<
* @default false
*/
strict?: strict | boolean | undefined
/** Whether to create a managed subscription that supports dynamic updates. */
managed?: boolean | undefined
/** Whether to buffer events during subscription updates (only with managed: true). */
buffered?: boolean | undefined
}

/**
* Return type for {@link watchContractShredEvent}.
*/
export type WatchContractShredEventReturnType = () => void
export type WatchContractShredEventReturnType = (() => void) & {
/** The managed subscription object, only present when managed: true */
subscription?: ManagedSubscription | undefined
}

/**
* Watches and returns emitted contract events that have been processed and confirmed as shreds
Expand All @@ -93,7 +102,7 @@ export type WatchContractShredEventReturnType = () => void
* @param parameters - {@link WatchContractShredEventParameters}
* @returns A function that can be used to unsubscribe from the event. {@link WatchContractShredEventReturnType}
*/
export function watchContractShredEvent<
export async function watchContractShredEvent<
chain extends Chain | undefined,
const abi_ extends Abi | readonly unknown[],
eventName_ extends ContractEventName<abi_> | undefined = undefined,
Expand All @@ -106,7 +115,7 @@ export function watchContractShredEvent<
>(
client: Client<transport, chain>,
parameters: WatchContractShredEventParameters<abi_, eventName_, strict>,
): WatchContractShredEventReturnType {
): Promise<WatchContractShredEventReturnType> {
const {
abi,
address,
Expand All @@ -115,6 +124,8 @@ export function watchContractShredEvent<
onError,
onLogs,
strict: strict_,
managed,
buffered,
} = parameters

const transport_ = (() => {
Expand Down Expand Up @@ -202,5 +213,29 @@ export function watchContractShredEvent<
})()
return () => unsubscribe()
}
return subscribeShredContractEvent()
// Handle managed subscriptions
if (managed) {
const manager = getSubscriptionManager()
const subscription = await manager.createManagedSubscription(client, {
abi,
address,
args,
eventName,
onError,
onLogs,
strict: strict_,
buffered,
})

// Return enhanced unsubscribe with subscription property
const enhancedUnsubscribe = Object.assign(
() => subscription.unsubscribe(),
{ subscription },
) as WatchContractShredEventReturnType

return enhancedUnsubscribe
}

// Regular subscription (backward compatible)
return subscribeShredContractEvent() as WatchContractShredEventReturnType
}
43 changes: 39 additions & 4 deletions src/viem/actions/shred/watchShredEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import {
type MaybeExtractEventArgsFromAbi,
type Transport,
} from 'viem'
import { getSubscriptionManager } from '../../utils/subscription/manager'
import type { ShredsWebSocketTransport } from '../../clients/transports/shredsWebSocket'
import type { ShredLog } from '../../types/log'
import type { ManagedSubscription } from '../../utils/subscription/types'

/**
* The parameter for the `onLogs` callback in {@link watchShredEvent}.
Expand Down Expand Up @@ -66,6 +68,10 @@ export type WatchShredEventParameters<
onError?: ((error: Error) => void) | undefined
/** The callback to call when new event logs are received. */
onLogs: WatchShredEventOnLogsFn<abiEvent, abiEvents, strict, _eventName>
/** Whether to create a managed subscription that supports dynamic updates. */
managed?: boolean | undefined
/** Whether to buffer events during subscription updates (only with managed: true). */
buffered?: boolean | undefined
} & (
| {
event: abiEvent
Expand Down Expand Up @@ -98,7 +104,10 @@ export type WatchShredEventParameters<
/**
* Return type for {@link watchShredEvent}.
*/
export type WatchShredEventReturnType = () => void
export type WatchShredEventReturnType = (() => void) & {
/** The managed subscription object, only present when managed: true */
subscription?: ManagedSubscription | undefined
}

/**
* Watches and returns emitted events that have been processed and confirmed as shreds
Expand All @@ -108,7 +117,7 @@ export type WatchShredEventReturnType = () => void
* @param parameters - {@link WatchShredEventParameters}
* @returns A function that can be used to unsubscribe from the event. {@link WatchShredEventReturnType}
*/
export function watchShredEvent<
export async function watchShredEvent<
chain extends Chain | undefined,
const abiEvent extends AbiEvent | undefined = undefined,
const abiEvents extends
Expand All @@ -131,8 +140,10 @@ export function watchShredEvent<
onError,
onLogs,
strict: strict_,
managed,
buffered,
}: WatchShredEventParameters<abiEvent, abiEvents, strict>,
): WatchShredEventReturnType {
): Promise<WatchShredEventReturnType> {
const transport_ = (() => {
if (client.transport.type === 'webSocket') return client.transport

Expand Down Expand Up @@ -226,5 +237,29 @@ export function watchShredEvent<
return () => unsubscribe()
}

return subscribeShredEvents()
// Handle managed subscriptions
if (managed) {
const manager = getSubscriptionManager()
const subscription = await manager.createManagedSubscription(client, {
address,
args,
event,
events,
onError,
onLogs,
strict: strict_,
buffered,
})

// Return enhanced unsubscribe with subscription property
const enhancedUnsubscribe = Object.assign(
() => subscription.unsubscribe(),
{ subscription },
) as WatchShredEventReturnType

return enhancedUnsubscribe
}

// Regular subscription (backward compatible)
return subscribeShredEvents() as WatchShredEventReturnType
}
Loading