Skip to content

Commit

Permalink
Merge pull request #928 from permaweb/jfrain99/add-backoff-gateway-sc…
Browse files Browse the repository at this point in the history
…heduler-utils

perf(scheduler-utils): add exponential backoff to gateway requests
  • Loading branch information
jfrain99 authored Aug 1, 2024
2 parents 69bc1d9 + aa1ad02 commit 4efe8a2
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 21 deletions.
6 changes: 5 additions & 1 deletion scheduler-utils/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ specify those coonfigurations by providing their values `connect`. You can curre
- The `GRAPHQL_URL`
- The In-Memory `cacheSize`
- Following Redirects `followRedirects`, a boolean that optimizes scheduler routing if `true`
- `GRAPHQL_MAX_RETRIES`, the maximum amount of retries for failed gateway queries
- `GRAPHQL_RETRY_BACKOFF`, the initial delay for a gateway query retry. Doubled for each successive retry

> If you'd like to use no In-Memory Cache, and load the record from chain every time, then set the `cacheSize` to `0`
Expand All @@ -108,7 +110,9 @@ import { connect } from "@permaweb/ao-scheduler-utils";
const { validate, locate, raw } = connect({
GRAPHQL_URL: "...",
cacheSize: 1000,
followRedirects: true
followRedirects: true,
GRAPHQL_MAX_RETRIES: 0,
GRAPHQL_RETRY_BACKOFF: 300
});
```

Expand Down
28 changes: 16 additions & 12 deletions scheduler-utils/src/client/gateway.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { defaultTo, find, juxt, path, pipe, prop, propEq } from 'ramda'

import { InvalidSchedulerLocationError, SchedulerTagNotFoundError, TransactionNotFoundError } from '../err.js'
import { backoff, okRes } from '../utils.js'

const URL_TAG = 'Url'
const TTL_TAG = 'Time-To-Live'
Expand All @@ -22,20 +23,23 @@ const findTransactionTags = (err) => pipe(
defaultTo([])
)

function gatewayWith ({ fetch, GRAPHQL_URL }) {
function gatewayWith ({ fetch, GRAPHQL_URL, GRAPHQL_MAX_RETRIES = 0, GRAPHQL_RETRY_BACKOFF = 300 }) {
return async ({ query, variables }) => {
return fetch(GRAPHQL_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query, variables })
})
.then((res) => res.json())
return backoff(
() => fetch(GRAPHQL_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query, variables })
})
.then(okRes)
.then((res) => res.json()),
{ maxRetries: GRAPHQL_MAX_RETRIES, delay: GRAPHQL_RETRY_BACKOFF })
}
}

export function loadProcessSchedulerWith ({ fetch, GRAPHQL_URL }) {
const gateway = gatewayWith({ fetch, GRAPHQL_URL })
const loadScheduler = loadSchedulerWith({ fetch, GRAPHQL_URL })
export function loadProcessSchedulerWith ({ fetch, GRAPHQL_URL, GRAPHQL_MAX_RETRIES, GRAPHQL_RETRY_BACKOFF }) {
const gateway = gatewayWith({ fetch, GRAPHQL_URL, GRAPHQL_MAX_RETRIES, GRAPHQL_RETRY_BACKOFF })
const loadScheduler = loadSchedulerWith({ fetch, GRAPHQL_URL, GRAPHQL_MAX_RETRIES, GRAPHQL_RETRY_BACKOFF })

const GET_TRANSACTIONS_QUERY = `
query GetTransactions ($transactionIds: [ID!]!) {
Expand Down Expand Up @@ -64,8 +68,8 @@ export function loadProcessSchedulerWith ({ fetch, GRAPHQL_URL }) {
}
}

export function loadSchedulerWith ({ fetch, GRAPHQL_URL }) {
const gateway = gatewayWith({ fetch, GRAPHQL_URL })
export function loadSchedulerWith ({ fetch, GRAPHQL_URL, GRAPHQL_MAX_RETRIES, GRAPHQL_RETRY_BACKOFF }) {
const gateway = gatewayWith({ fetch, GRAPHQL_URL, GRAPHQL_MAX_RETRIES, GRAPHQL_RETRY_BACKOFF })

const GET_SCHEDULER_LOCATION = `
query GetSchedulerLocation ($owner: String!) {
Expand Down
4 changes: 3 additions & 1 deletion scheduler-utils/src/index.browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ export * from './index.common.js'
const GRAPHQL_URL = globalThis.GRAPHQL_URL || undefined
const CACHE_SIZE = globalThis.SCHEDULER_UTILS_CACHE_SIZE || undefined
const FOLLOW_REDIRECTS = globalThis.SCHEDULER_UTILS_FOLLOW_REDIRECTS === 'true' || undefined
const GRAPHQL_MAX_RETRIES = globalThis.GRAPHQL_MAX_RETRIES || 0
const GRAPHQL_RETRY_BACKOFF = globalThis.GRAPHQL_RETRY_BACKOFF || 300

const { locate, validate, raw } = connect({ GRAPHQL_URL, cacheSize: CACHE_SIZE, followRedirects: FOLLOW_REDIRECTS })
const { locate, validate, raw } = connect({ GRAPHQL_URL, cacheSize: CACHE_SIZE, followRedirects: FOLLOW_REDIRECTS, GRAPHQL_MAX_RETRIES, GRAPHQL_RETRY_BACKOFF })

export { locate, validate, raw }
13 changes: 8 additions & 5 deletions scheduler-utils/src/index.common.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import { validateWith } from './validate.js'
export * from './err.js'

const DEFAULT_GRAPHQL_URL = 'https://arweave.net/graphql'

const DEFAULT_GRAPHQL_MAX_RETRIES = 0
const DEFAULT_GRAPHQL_RETRY_BACKOFF = 300
/**
* @typedef ConnectParams
* @property {number} [cacheSize] - the size of the internal LRU cache
Expand All @@ -21,17 +22,19 @@ const DEFAULT_GRAPHQL_URL = 'https://arweave.net/graphql'
* - a GRAPHQL_URL. Defaults to https://arweave.net/graphql
* - a cache size for the internal LRU cache. Defaults to 100
* - whether or not to follow redirects when locating a scheduler. Defaults to false
* - a max amount of retries on gateway queries. Defaults to 0
* - retry delay for retries on gateway queries. Defaults to 300 ms
*
* If either value is not provided, a default will be used.
* If any value is not provided, a default will be used.
* Invoking connect() with no parameters or an empty object is functionally equivalent
* to using the top-lvl exports
*
* @param {ConnectParams} [params]
*/
export function connect ({ cacheSize = 100, GRAPHQL_URL = DEFAULT_GRAPHQL_URL, followRedirects = false } = {}) {
export function connect ({ cacheSize = 100, GRAPHQL_URL = DEFAULT_GRAPHQL_URL, followRedirects = false, GRAPHQL_MAX_RETRIES = DEFAULT_GRAPHQL_MAX_RETRIES, GRAPHQL_RETRY_BACKOFF = DEFAULT_GRAPHQL_RETRY_BACKOFF } = {}) {
const _cache = InMemoryClient.createLruCache({ size: cacheSize })

const loadScheduler = GatewayClient.loadSchedulerWith({ fetch, GRAPHQL_URL })
const loadScheduler = GatewayClient.loadSchedulerWith({ fetch, GRAPHQL_URL, GRAPHQL_MAX_RETRIES, GRAPHQL_RETRY_BACKOFF })
const cache = {
getByProcess: InMemoryClient.getByProcessWith({ cache: _cache }),
getByOwner: InMemoryClient.getByOwnerWith({ cache: _cache }),
Expand All @@ -42,7 +45,7 @@ export function connect ({ cacheSize = 100, GRAPHQL_URL = DEFAULT_GRAPHQL_URL, f
* Locate the scheduler for the given process.
*/
const locate = locateWith({
loadProcessScheduler: GatewayClient.loadProcessSchedulerWith({ fetch, GRAPHQL_URL }),
loadProcessScheduler: GatewayClient.loadProcessSchedulerWith({ fetch, GRAPHQL_URL, GRAPHQL_MAX_RETRIES, GRAPHQL_RETRY_BACKOFF }),
loadScheduler,
cache,
followRedirects,
Expand Down
4 changes: 3 additions & 1 deletion scheduler-utils/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ export * from './index.common.js'
const GRAPHQL_URL = process.env.GRAPHQL_URL || undefined
const CACHE_SIZE = process.env.SCHEDULER_UTILS_CACHE_SIZE || undefined
const FOLLOW_REDIRECTS = process.env.SCHEDULER_UTILS_FOLLOW_REDIRECTS === 'true' || undefined
const GRAPHQL_MAX_RETRIES = process.env.GRAPHQL_MAX_RETRIES || 0
const GRAPHQL_RETRY_BACKOFF = process.env.GRAPHQL_RETRY_BACKOFF || 300

const { locate, validate, raw } = connect({ GRAPHQL_URL, cacheSize: CACHE_SIZE, followRedirects: FOLLOW_REDIRECTS })
const { locate, validate, raw } = connect({ GRAPHQL_URL, cacheSize: CACHE_SIZE, followRedirects: FOLLOW_REDIRECTS, GRAPHQL_MAX_RETRIES, GRAPHQL_RETRY_BACKOFF })

export { locate, validate, raw }
53 changes: 53 additions & 0 deletions scheduler-utils/src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,56 @@ export function trimSlash (str = '') {
str = str.trim()
return str.endsWith('/') ? trimSlash(str.slice(0, -1)) : str
}

/**
* A function that, given a function, will immediately invoke it,
* then retry it on errors, using an exponential backoff.
*
* If the final retry fails, then the overall Promise is rejected
* with that error
*
* @param {function} fn - the function to be called
* @param {{ maxRetries: number, delay: number }} param1 - the number of total retries and increased delay for each try
*/
export const backoff = (
fn,
{ maxRetries = 0, delay = 300 }
) => {
/**
* Recursive function that recurses with exponential backoff
*/
const action = (retry, delay) => {
return Promise.resolve()
.then(fn)
.catch((err) => {
// Reached max number of retries
if (retry >= maxRetries) {
return Promise.reject(err)
}

/**
* increment the retry count Retry with an exponential backoff
*/
const newRetry = retry + 1
const newDelay = delay + delay
/**
* Retry in {delay} milliseconds
*/
return new Promise((resolve) => setTimeout(resolve, delay))
.then(() => action(newRetry, newDelay))
})
}

return action(0, delay)
}

/**
* Checks if a response is OK. Otherwise, throw response.
*
* @param {Response} res - The response to check
* @returns
*/
export const okRes = (res) => {
if (res.ok) return res
throw res
}
74 changes: 73 additions & 1 deletion scheduler-utils/src/utils.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, test } from 'node:test'
import * as assert from 'node:assert'
import { trimSlash } from './utils.js'
import { backoff, trimSlash } from './utils.js'

describe('trimSlash', () => {
test('should remove trailing slash from url', () => {
Expand All @@ -10,3 +10,75 @@ describe('trimSlash', () => {
assert.equal(resultWithoutTrailingSlash, 'https://foo.bar')
})
})

describe('backoff', () => {
function isPromise (obj) {
return (
!!obj &&
(typeof obj === 'object' || typeof obj === 'function') &&
typeof obj.then === 'function'
)
}

test('should return a promise', () => {
assert.ok(isPromise(
backoff(() => Promise.resolve(''), {
maxRetries: 0,
delay: 0
})
))

assert.ok(isPromise(
backoff(() => '', {
maxRetries: 0,
delay: 0
})
))
})

test('should not retry calling the function', async () => {
let count = 0
const fn = () => count++
await backoff(fn, {
maxRetries: 0,
delay: 0
})

assert.equal(count, 1)
})

test('should retry calling the function', async () => {
let count = 0
const fn = () => {
count++
return count
? Promise.resolve('foo')
// eslint-disable-next-line
: Promise.reject('bar')
}

const res = await backoff(fn, {
maxRetries: 1,
delay: 0
})

assert.equal(res, 'foo')
})

test('should bubble the error if all retries are unsuccessful', async () => {
let count = 0
const fn = () => {
count++
// eslint-disable-next-line
return Promise.reject('bar')
}

await backoff(fn, {
maxRetries: 2,
delay: 0
}).catch(err => {
assert.equal(err, 'bar')
assert.equal(count, 3)
})
})
})

0 comments on commit 4efe8a2

Please sign in to comment.