Skip to content

Commit 0b5f8de

Browse files
migrate to sdk v3 for bucketVersionsStats
Issue: S3UTILS-200
1 parent c156b94 commit 0b5f8de

File tree

1 file changed

+101
-98
lines changed

1 file changed

+101
-98
lines changed

bucketVersionsStats.js

Lines changed: 101 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
const fs = require('fs');
22
const { http, https } = require('httpagent');
33

4-
const AWS = require('aws-sdk');
5-
const { doWhilst } = require('async');
4+
const { S3Client, ListObjectVersionsCommand } = require('@aws-sdk/client-s3');
5+
const { NodeHttpHandler } = require('@aws-sdk/node-http-handler');
6+
const { ConfiguredRetryStrategy } = require('@smithy/util-retry');
67

78
const { Logger } = require('werelogs');
89

910
const parseOlderThan = require('./utils/parseOlderThan');
10-
const { safeListObjectVersions } = require('./utils/safeList');
1111

1212
const log = new Logger('s3utils::bucketVersionsStats');
1313
const { ENDPOINT } = process.env;
@@ -97,38 +97,32 @@ if (s3EndpointIsHttps) {
9797
agent = new http.Agent({ keepAlive: true });
9898
}
9999

100-
const options = {
101-
accessKeyId: ACCESS_KEY,
102-
secretAccessKey: SECRET_KEY,
100+
const s3 = new S3Client({
101+
credentials: {
102+
accessKeyId: ACCESS_KEY,
103+
secretAccessKey: SECRET_KEY,
104+
},
103105
endpoint: ENDPOINT,
104106
region: 'us-east-1',
105-
sslEnabled: s3EndpointIsHttps,
106-
s3ForcePathStyle: true,
107-
apiVersions: { s3: '2006-03-01' },
108-
signatureVersion: 'v4',
109-
signatureCache: false,
110-
httpOptions: {
111-
timeout: 0,
112-
agent,
113-
},
114-
};
115-
/**
116-
* Options specific to s3 requests
117-
* `maxRetries` & `customBackoff` are set only to s3 requests
118-
* default aws sdk retry count is 3 with an exponential delay of 2^n * 30 ms
119-
*/
120-
const s3Options = {
121-
maxRetries: AWS_SDK_REQUEST_RETRIES,
122-
customBackoff: (retryCount, error) => {
123-
log.error('aws sdk request error', { error, retryCount });
124-
// retry with exponential backoff delay capped at 1mn max
125-
// between retries, and a little added jitter
126-
return Math.min(AWS_SDK_REQUEST_INITIAL_DELAY_MS
127-
* 2 ** retryCount, 60000)
128-
* (0.9 + Math.random() * 0.2);
129-
},
130-
};
131-
const s3 = new AWS.S3(Object.assign(options, s3Options));
107+
forcePathStyle: true,
108+
tls: s3EndpointIsHttps,
109+
requestHandler: new NodeHttpHandler({
110+
httpAgent: agent,
111+
httpsAgent: agent,
112+
requestTimeout: 60000,
113+
}),
114+
retryStrategy: new ConfiguredRetryStrategy(
115+
AWS_SDK_REQUEST_RETRIES,
116+
// eslint-disable-next-line arrow-body-style
117+
attempt => {
118+
// Custom backoff with exponential delay capped at 1mn max
119+
// between retries, and a little added jitter
120+
return Math.min(AWS_SDK_REQUEST_INITIAL_DELAY_MS
121+
* 2 ** attempt, 60000)
122+
* (0.9 + Math.random() * 0.2);
123+
}
124+
),
125+
});
132126

133127
const stats = {
134128
current: {
@@ -147,10 +141,17 @@ let VersionIdMarker;
147141
function _logProgress(message) {
148142
const loggedStats = {
149143
total: {
150-
count: BigInt(stats.current.count + stats.noncurrent.count),
151-
size: BigInt(stats.current.size + stats.noncurrent.size),
144+
count: String(stats.current.count + stats.noncurrent.count),
145+
size: String(stats.current.size + stats.noncurrent.size),
146+
},
147+
current: {
148+
count: String(stats.current.count),
149+
size: String(stats.current.size),
150+
},
151+
noncurrent: {
152+
count: String(stats.noncurrent.count),
153+
size: String(stats.noncurrent.size),
152154
},
153-
...stats,
154155
};
155156
log.info(message, {
156157
bucket: BUCKET,
@@ -166,67 +167,65 @@ const logProgressInterval = setInterval(
166167
LOG_PROGRESS_INTERVAL_MS,
167168
);
168169

169-
function _listObjectVersions(bucket, KeyMarker, VersionIdMarker, cb) {
170-
return safeListObjectVersions(s3, {
171-
Bucket: bucket,
172-
MaxKeys: LISTING_LIMIT,
173-
Prefix: TARGET_PREFIX,
174-
KeyMarker,
175-
VersionIdMarker,
176-
}, cb);
177-
}
178-
179-
180-
function listBucket(bucket, cb) {
170+
async function listBucket(bucket) {
181171
let NextKeyMarker = KEY_MARKER;
182172
let NextVersionIdMarker = VERSION_ID_MARKER;
183-
return doWhilst(
184-
done => {
185-
KeyMarker = NextKeyMarker;
186-
VersionIdMarker = NextVersionIdMarker;
187-
_listObjectVersions(bucket, KeyMarker, VersionIdMarker, (err, data) => {
188-
if (err) {
189-
log.error('error listing object versions', {
190-
error: err,
191-
});
192-
return done(err);
193-
}
194-
for (const version of data.Versions) {
195-
if (_OLDER_THAN_TIMESTAMP) {
196-
const parsed = new Date(version.LastModified);
197-
if (Number.isNaN(parsed.getTime()) || parsed > _OLDER_THAN_TIMESTAMP) {
198-
continue;
199-
}
200-
}
201-
const statObj = version.IsLatest ? stats.current : stats.noncurrent;
202-
statObj.count += 1n;
203-
statObj.size += version.Size || 0n;
204-
if (VERBOSE) {
205-
log.info('version info', {
206-
bucket: BUCKET,
207-
key: version.Key,
208-
versionId: version.VersionId,
209-
isLatest: version.IsLatest,
210-
lastModified: version.LastModified,
211-
size: version.Size,
212-
});
173+
174+
while (true) {
175+
KeyMarker = NextKeyMarker;
176+
VersionIdMarker = NextVersionIdMarker;
177+
178+
const command = new ListObjectVersionsCommand({
179+
Bucket: bucket,
180+
MaxKeys: LISTING_LIMIT,
181+
Prefix: TARGET_PREFIX,
182+
KeyMarker,
183+
VersionIdMarker,
184+
});
185+
186+
try {
187+
const data = await s3.send(command);
188+
const versions = data.Versions || [];
189+
for (const version of versions) {
190+
if (_OLDER_THAN_TIMESTAMP) {
191+
const parsed = new Date(version.LastModified);
192+
if (Number.isNaN(parsed.getTime()) || parsed > _OLDER_THAN_TIMESTAMP) {
193+
continue;
213194
}
214195
}
215-
NextKeyMarker = data.NextKeyMarker;
216-
NextVersionIdMarker = data.NextVersionIdMarker;
217-
return done();
218-
});
219-
},
220-
() => {
221-
if (NextKeyMarker || NextVersionIdMarker) {
222-
return true;
196+
const statObj = version.IsLatest ? stats.current : stats.noncurrent;
197+
statObj.count += 1n;
198+
statObj.size += BigInt(version.Size || 0);
199+
if (VERBOSE) {
200+
log.info('version info', {
201+
bucket: BUCKET,
202+
key: version.Key,
203+
versionId: version.VersionId,
204+
isLatest: version.IsLatest,
205+
lastModified: version.LastModified,
206+
size: version.Size,
207+
});
208+
}
223209
}
224-
KeyMarker = undefined;
225-
VersionIdMarker = undefined;
226-
return false;
227-
},
228-
cb,
229-
);
210+
211+
NextKeyMarker = data.NextKeyMarker;
212+
NextVersionIdMarker = data.NextVersionIdMarker;
213+
214+
if (!NextKeyMarker && !NextVersionIdMarker) {
215+
break;
216+
}
217+
} catch (error) {
218+
log.error('error listing object versions', {
219+
bucket: bucket,
220+
keyMarker: KeyMarker,
221+
versionIdMarker: VersionIdMarker,
222+
error: error,
223+
errorName: error.name,
224+
errorMessage: error.message,
225+
});
226+
throw error;
227+
}
228+
}
230229
}
231230

232231
function shutdown(exitCode) {
@@ -235,20 +234,24 @@ function shutdown(exitCode) {
235234
process.exit(exitCode);
236235
}
237236

238-
listBucket(BUCKET, err => {
239-
if (err) {
237+
async function main() {
238+
try {
239+
await listBucket(BUCKET);
240+
_logProgress('final summary');
241+
shutdown(0);
242+
} catch (error) {
240243
log.error('error during execution', {
241244
bucket: BUCKET,
242245
KeyMarker,
243246
VersionIdMarker,
247+
error,
244248
});
245249
_logProgress('summary after error');
246250
shutdown(1);
247-
} else {
248-
_logProgress('final summary');
249-
shutdown(0);
250251
}
251-
});
252+
}
253+
254+
main();
252255

253256
function stop() {
254257
log.warn('stopping execution');

0 commit comments

Comments
 (0)