1
1
const fs = require ( 'fs' ) ;
2
+ const crypto = require ( 'crypto' ) ;
2
3
const { http, https } = require ( 'httpagent' ) ;
3
4
const { ObjectMD } = require ( 'arsenal' ) . models ;
4
5
5
- const AWS = require ( 'aws-sdk' ) ;
6
+ const { S3Client, ListObjectVersionsCommand, DeleteObjectsCommand } = require ( '@aws-sdk/client-s3' ) ;
7
+ const { NodeHttpHandler } = require ( '@aws-sdk/node-http-handler' ) ;
8
+ const { ConfiguredRetryStrategy } = require ( '@smithy/util-retry' ) ;
6
9
const { doWhilst, eachSeries, filterLimit } = require ( 'async' ) ;
7
10
8
11
const { Logger } = require ( 'werelogs' ) ;
9
12
10
13
const BackbeatClient = require ( './BackbeatClient' ) ;
11
14
const parseOlderThan = require ( './utils/parseOlderThan' ) ;
12
- const { safeListObjectVersions } = require ( './utils/safeList' ) ;
13
15
14
16
const log = new Logger ( 's3utils::cleanupNoncurrentVersions' ) ;
15
17
@@ -173,6 +175,33 @@ if (s3EndpointIsHttps) {
173
175
agent = new http . Agent ( { keepAlive : true } ) ;
174
176
}
175
177
178
// AWS SDK v3 S3 client used for the listing and batch-delete requests
// sent from this script.
const s3 = new S3Client({
    credentials: { accessKeyId: ACCESS_KEY, secretAccessKey: SECRET_KEY },
    endpoint: S3_ENDPOINT,
    region: 'us-east-1',
    forcePathStyle: true,
    tls: s3EndpointIsHttps,
    // The keep-alive agent created above is handed over for both
    // protocols; each request is given a 60s timeout.
    requestHandler: new NodeHttpHandler({
        httpAgent: agent,
        httpsAgent: agent,
        requestTimeout: 60000,
    }),
    retryStrategy: new ConfiguredRetryStrategy(
        AWS_SDK_REQUEST_RETRIES,
        retryCount => {
            // Exponential backoff capped at one minute between retries...
            const cappedDelayMs = Math.min(AWS_SDK_REQUEST_INITIAL_DELAY_MS * 2 ** retryCount, 60000);
            // ...scaled by a random factor in [0.9, 1.1) to add some jitter.
            const jitterFactor = 0.9 + Math.random() * 0.2;
            return cappedDelayMs * jitterFactor;
        },
    ),
});
204
+
176
205
const options = {
177
206
accessKeyId : ACCESS_KEY ,
178
207
secretAccessKey : SECRET_KEY ,
@@ -207,7 +236,6 @@ const s3Options = {
207
236
208
237
const opt = Object . assign ( options , s3Options ) ;
209
238
210
- const s3 = new AWS . S3 ( opt ) ;
211
239
const bb = new BackbeatClient ( opt ) ;
212
240
213
241
let nListed = 0 ;
@@ -244,13 +272,17 @@ const logProgressInterval = setInterval(
244
272
) ;
245
273
246
274
/**
 * List one page of object versions and delete markers of a bucket.
 *
 * Sends a ListObjectVersions request through the module-level `s3`
 * client, restricted to TARGET_PREFIX and limited to LISTING_LIMIT
 * entries, and reports the outcome through a node-style callback.
 *
 * @param {string} bucket - name of the bucket to list
 * @param {string} [VersionIdMarker] - passed as the request's
 *   VersionIdMarker
 * @param {string} [KeyMarker] - passed as the request's KeyMarker
 * @param {function} cb - called exactly once, as cb(err) on failure
 *   or cb(null, data) with the response on success
 * @returns {undefined}
 */
function _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) {
    const command = new ListObjectVersionsCommand({
        Bucket: bucket,
        MaxKeys: LISTING_LIMIT,
        Prefix: TARGET_PREFIX,
        KeyMarker,
        VersionIdMarker,
    });

    // Invoke the callback outside of the promise chain: with
    // `.then(...).catch(cb)`, an exception thrown by `cb` on the success
    // path would be caught by `.catch(cb)` and `cb` would be called a
    // second time, with that exception as a bogus listing error.
    s3.send(command).then(
        data => process.nextTick(cb, null, data),
        err => process.nextTick(cb, err),
    );
}
255
287
256
288
function _getMetadata ( bucket , key , versionId , cb ) {
@@ -297,13 +329,37 @@ function _doBatchDelete(bucket) {
297
329
batchDeleteInProgress = true ;
298
330
// multi object delete can delete max 1000 objects
299
331
const batchDeleteObjects = deleteQueue . splice ( 0 , 1000 ) ;
300
- const params = {
332
+ const command = new DeleteObjectsCommand ( {
301
333
Bucket : bucket ,
302
334
Delete : { Objects : batchDeleteObjects } ,
303
- } ;
304
- s3 . deleteObjects ( params , err => {
305
- if ( err ) {
306
- log . error ( 'batch delete error' , { error : err } ) ;
335
+ } ) ;
336
+
337
+ command . middlewareStack . add (
338
+ next => async args => {
339
+ if ( args . request . body ) {
340
+ const bodyContent = Buffer . from ( args . request . body ) ;
341
+ const md5Hash = crypto . createHash ( 'md5' ) . update ( bodyContent ) . digest ( 'base64' ) ;
342
+ // eslint-disable-next-line no-param-reassign
343
+ args . request . headers [ 'Content-MD5' ] = md5Hash ;
344
+ }
345
+ return await next ( args ) ;
346
+ } ,
347
+ {
348
+ step : 'build' ,
349
+ }
350
+ ) ;
351
+
352
+ s3 . send ( command )
353
+ . then ( ( ) => {
354
+ nDeleted += batchDeleteObjects . length ;
355
+ batchDeleteObjects . forEach ( v => log . info ( 'object deleted' , {
356
+ bucket,
357
+ key : v . Key ,
358
+ versionId : v . VersionId ,
359
+ } ) ) ;
360
+ } )
361
+ . catch ( err => {
362
+ log . error ( 'batch delete error' , { error : err } ) ;
307
363
nErrors += 1 ;
308
364
batchDeleteObjects . forEach (
309
365
v => log . error ( 'object may not be deleted' , {
@@ -312,29 +368,23 @@ function _doBatchDelete(bucket) {
312
368
versionId : v . VersionId ,
313
369
} ) ,
314
370
) ;
315
- } else {
316
- nDeleted += batchDeleteObjects . length ;
317
- batchDeleteObjects . forEach ( v => log . info ( 'object deleted' , {
318
- bucket,
319
- key : v . Key ,
320
- versionId : v . VersionId ,
321
- } ) ) ;
322
- }
323
- if ( batchDeleteOnDrain && deleteQueue . length <= 1000 ) {
324
- process . nextTick ( batchDeleteOnDrain ) ;
325
- batchDeleteOnDrain = null ;
326
- }
327
- if ( batchDeleteOnFullDrain && deleteQueue . length === 0 ) {
328
- process . nextTick ( batchDeleteOnFullDrain ) ;
329
- batchDeleteOnFullDrain = null ;
330
- }
331
- if ( deleteQueue . length > 0 ) {
332
- // there are more objects to delete, keep going
333
- _doBatchDelete ( bucket ) ;
334
- } else {
335
- batchDeleteInProgress = false ;
336
- }
337
- } ) ;
371
+ } )
372
+ . finally ( ( ) => {
373
+ if ( batchDeleteOnDrain && deleteQueue . length <= 1000 ) {
374
+ process . nextTick ( batchDeleteOnDrain ) ;
375
+ batchDeleteOnDrain = null ;
376
+ }
377
+ if ( batchDeleteOnFullDrain && deleteQueue . length === 0 ) {
378
+ process . nextTick ( batchDeleteOnFullDrain ) ;
379
+ batchDeleteOnFullDrain = null ;
380
+ }
381
+ if ( deleteQueue . length > 0 ) {
382
+ // there are more objects to delete, keep going
383
+ _doBatchDelete ( bucket ) ;
384
+ } else {
385
+ batchDeleteInProgress = false ;
386
+ }
387
+ } ) ;
338
388
}
339
389
340
390
function _triggerDeletes ( bucket , versionsToDelete , cb ) {
@@ -565,11 +615,13 @@ function triggerDeletesOnBucket(bucketName, cb) {
565
615
} ) ;
566
616
return done ( err ) ;
567
617
}
568
- nListed += data . Versions . length + data . DeleteMarkers . length ;
618
+ const versions = data . Versions || [ ] ;
619
+ const deleteMarkers = data . DeleteMarkers || [ ] ;
620
+ nListed += versions . length + deleteMarkers . length ;
569
621
const ret = _triggerDeletesOnEligibleObjects (
570
622
bucket ,
571
- data . Versions ,
572
- data . DeleteMarkers ,
623
+ versions ,
624
+ deleteMarkers ,
573
625
! data . IsTruncated ,
574
626
err => {
575
627
if ( err ) {
0 commit comments