Skip to content

Commit 1b5856f

Browse files
committed
network: resolve node endpoint when bucket isn't cached
1 parent d5c6f6e commit 1b5856f

File tree

2 files changed

+120
-44
lines changed

2 files changed

+120
-44
lines changed

src/network.js

Lines changed: 99 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ const loggerWS = require('debug')('kmjs:network:ws')
1010
const WS = require('./utils/websocket')
1111
const EventEmitter = require('eventemitter2')
1212
const km = require('./keymetrics')
13+
const async = require('async')
14+
15+
const BUFFERIZED = -1
1316

1417
module.exports = class NetworkWrapper {
1518
constructor (opts) {
@@ -59,63 +62,115 @@ module.exports = class NetworkWrapper {
5962
}
6063
}
6164

65+
/**
66+
* Resolve the endpoint of the node to make the request to
67+
* because each bucket might be on a different node
68+
* @param {String} bucketID the bucket id
69+
*
70+
* @return {Promise}
71+
*/
72+
_resolveBucketEndpoint (bucketID) {
73+
if (!bucketID) return Promise.reject(new Error(`Missing argument : bucketID`))
74+
return new Promise((resolve, reject) => {
75+
// try to resolve it from local cache
76+
const node = this._buckets
77+
.filter(bucket => bucket._id === bucketID)
78+
.map(bucket => bucket.node_cache)[0]
79+
// if found, return it
80+
if (node && node.endpoints) {
81+
return resolve(node.endpoints.web)
82+
}
83+
// otherwise we will need to resolve where the bucket is hosted
84+
this._axios.request({ url: `/api/bucket/${bucketID}`, method: 'GET' })
85+
.then((res) => {
86+
const bucket = res.data
87+
this._buckets.push(bucket)
88+
return resolve(bucket.node_cache.endpoints.web)
89+
}).catch(reject)
90+
})
91+
}
92+
6293
/**
6394
* Send a http request
6495
* @param {Object} opts
6596
* @param {String} [opts.method=GET] http method
6697
* @param {String} opts.url the full URL
6798
* @param {Object} [opts.data] body data
6899
* @param {Object} [opts.params] url params
100+
*
101+
* @return {Promise}
69102
*/
70103
request (httpOpts) {
71-
if (httpOpts.url.match(/bucket/)) {
72-
let bucketID = httpOpts.url.split('/')[3]
73-
let node = this._buckets.filter(bucket => bucket._id === bucketID).map(bucket => bucket.node_cache)[0]
74-
if (node && node.endpoints) {
75-
httpOpts.baseURL = node.endpoints.web
76-
}
77-
}
78-
79104
return new Promise((resolve, reject) => {
80-
if (this.authenticated === false && httpOpts.authentication === true) {
81-
loggerHttp(`Queued request to ${httpOpts.url}`)
82-
this._queue.push({
83-
resolve,
84-
reject,
85-
request: httpOpts
86-
})
87-
} else {
88-
loggerHttp(`Making request to ${httpOpts.url}`)
89-
this._axios.request(httpOpts)
90-
.then(resolve)
91-
.catch((error) => {
92-
let response = error.response
93-
// we only need to handle when code is 401 (which mean unauthenticated)
94-
if (response && response.status !== 401) return reject(response)
95-
loggerHttp(`Got unautenticated response, buffering request from now ...`)
96-
97-
// we tell the client to not send authenticated request anymore
98-
this.authenticated = false
99-
100-
loggerHttp(`Asking to the oauth flow to retrieve new tokens`)
101-
this.oauth_flow.retrieveTokens((err, data) => {
102-
// if it fail, we fail the whole request
103-
if (err) {
104-
loggerHttp(`Failed to retrieve new tokens : ${err.message || err}`)
105-
return reject(response)
106-
}
107-
// if its good, we try to update the tokens
108-
loggerHttp(`Succesfully retrieved new tokens`)
109-
this._updateTokens(null, data, (err, authenticated) => {
105+
async.series([
106+
// we need to verify that the baseURL is correct
107+
(next) => {
108+
if (!httpOpts.url.match(/bucket\/.+/)) return next()
109+
// parse the bucket id from URL
110+
let bucketID = httpOpts.url.split('/')[3]
111+
// we need to retrieve where to send the request depending on the backend
112+
this._resolveBucketEndpoint(bucketID)
113+
.then(endpoint => {
114+
httpOpts.baseURL = endpoint
115+
// then continue the flow
116+
return next()
117+
}).catch(next)
118+
},
119+
// verify that we don't need to buffer the request because authentication
120+
next => {
121+
if (this.authenticated === true || httpOpts.authentication === false) return next()
122+
123+
loggerHttp(`Queued request to ${httpOpts.url}`)
124+
this._queue.push({
125+
resolve,
126+
reject,
127+
request: httpOpts
128+
})
129+
// we need to stop the flow here
130+
return next(BUFFERIZED)
131+
},
132+
// if the request has not been bufferized, make the request
133+
next => {
134+
// super trick to transform a promise response to a callback
135+
const successNext = res => next(null, res)
136+
loggerHttp(`Making request to ${httpOpts.url}`)
137+
138+
this._axios.request(httpOpts)
139+
.then(successNext)
140+
.catch((error) => {
141+
let response = error.response
142+
// we only need to handle when code is 401 (which mean unauthenticated)
143+
if (response && response.status !== 401) return next(response)
144+
loggerHttp(`Got unautenticated response, buffering request from now ...`)
145+
146+
// we tell the client to not send authenticated request anymore
147+
this.authenticated = false
148+
149+
loggerHttp(`Asking to the oauth flow to retrieve new tokens`)
150+
this.oauth_flow.retrieveTokens((err, data) => {
110151
// if it fail, we fail the whole request
111-
if (err) return reject(response)
112-
// then we can rebuffer the request
113-
loggerHttp(`Re-buffering call to ${httpOpts.url} since authenticated now`)
114-
return this._axios.request(httpOpts).then(resolve)
152+
if (err) {
153+
loggerHttp(`Failed to retrieve new tokens : ${err.message || err}`)
154+
return next(response)
155+
}
156+
// if its good, we try to update the tokens
157+
loggerHttp(`Succesfully retrieved new tokens`)
158+
this._updateTokens(null, data, (err, authenticated) => {
159+
// if it fail, we fail the whole request
160+
if (err) return next(response)
161+
// then we can rebuffer the request
162+
loggerHttp(`Re-buffering call to ${httpOpts.url} since authenticated now`)
163+
return this._axios.request(httpOpts).then(successNext).catch(next)
164+
})
115165
})
116166
})
117-
})
118-
}
167+
}
168+
], (err, results) => {
169+
// if the flow is stoped because the request has been
170+
// buferred, we don't need to do anything
171+
if (err === BUFFERIZED) return
172+
return err ? reject(err) : resolve(results[2])
173+
})
119174
})
120175
}
121176

test/keymetrics.mocha.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
const Keymetrics = require('..')
66
const assert = require('assert')
7+
const async = require('async')
78

89
describe('Keymetrics Integration', () => {
910
let km = null
@@ -40,4 +41,24 @@ describe('Keymetrics Integration', () => {
4041
return done()
4142
}).catch(done)
4243
})
44+
45+
it('should retrieve some data from bucket', (done) => {
46+
let bucket
47+
async.series([
48+
next => {
49+
km.bucket.retrieveAll().then(res => {
50+
bucket = res.data[0]
51+
return next()
52+
}).catch(next)
53+
},
54+
next => {
55+
km.data.status.retrieve(bucket._id).then(res => {
56+
let status = res.data
57+
assert(res.status === 200)
58+
assert(status instanceof Array)
59+
return next()
60+
}).catch(next)
61+
}
62+
], done)
63+
})
4364
})

0 commit comments

Comments
 (0)