1
- const { http } = require ( 'httpagent' ) ;
2
-
3
- const async = require ( 'async' ) ;
4
- const AWS = require ( 'aws-sdk' ) ;
1
+ const { http, https } = require ( 'httpagent' ) ;
2
+ const {
3
+ S3Client,
4
+ ListObjectVersionsCommand,
5
+ DeleteObjectsCommand,
6
+ ListMultipartUploadsCommand,
7
+ AbortMultipartUploadCommand,
8
+ } = require ( '@aws-sdk/client-s3' ) ;
9
+ const { NodeHttpHandler } = require ( '@aws-sdk/node-http-handler' ) ;
5
10
const { Logger } = require ( 'werelogs' ) ;
6
11
7
12
const log = new Logger ( 's3utils::emptyBucket' ) ;
@@ -29,36 +34,23 @@ if (!SECRET_KEY) {
29
34
}
30
35
const LISTING_LIMIT = 1000 ;
31
36
32
- AWS . config . update ( {
33
- accessKeyId : ACCESS_KEY ,
34
- secretAccessKey : SECRET_KEY ,
37
+ const s3 = new S3Client ( {
38
+ credentials : {
39
+ accessKeyId : ACCESS_KEY ,
40
+ secretAccessKey : SECRET_KEY ,
41
+ } ,
35
42
endpoint : ENDPOINT ,
36
43
region : 'us-east-1' ,
37
- sslEnabled : false ,
38
- s3ForcePathStyle : true ,
39
- apiVersions : { s3 : '2006-03-01' } ,
40
- signatureVersion : 'v4' ,
41
- signatureCache : false ,
44
+ forcePathStyle : true ,
45
+ requestHandler : new NodeHttpHandler ( {
46
+ httpAgent : new http . Agent ( { keepAlive : true } ) ,
47
+ httpsAgent : new https . Agent ( {
48
+ keepAlive : true ,
49
+ rejectUnauthorized : false
50
+ } ) ,
51
+ } ) ,
42
52
} ) ;
43
53
44
- const s3 = new AWS . S3 ( {
45
- httpOptions : {
46
- maxRetries : 0 ,
47
- timeout : 0 ,
48
- agent : new http . Agent ( { keepAlive : true } ) ,
49
- } ,
50
- } ) ;
51
-
52
- // list object versions
53
- function _listObjectVersions ( bucket , VersionIdMarker , KeyMarker , cb ) {
54
- return s3 . listObjectVersions ( {
55
- Bucket : bucket ,
56
- MaxKeys : LISTING_LIMIT ,
57
- VersionIdMarker,
58
- KeyMarker,
59
- } , cb ) ;
60
- }
61
-
62
54
// return object with key and version_id
63
55
function _getKeys ( keys ) {
64
56
return keys . map ( v => ( {
@@ -68,97 +60,91 @@ function _getKeys(keys) {
68
60
}
69
61
70
62
// delete all versions of an object
71
- function _deleteVersions ( bucket , objectsToDelete , cb ) {
72
- // multi object delete can delete max 1000 objects
63
+ async function _deleteVersions ( bucket , objectsToDelete ) {
73
64
const params = {
74
65
Bucket : bucket ,
75
66
Delete : { Objects : objectsToDelete } ,
76
67
} ;
77
- s3 . deleteObjects ( params , err => {
78
- if ( err ) {
79
- log . error ( 'batch delete err' , err ) ;
80
- return cb ( err ) ;
81
- }
68
+ const command = new DeleteObjectsCommand ( params ) ;
69
+ try {
70
+ await s3 . send ( command ) ;
82
71
objectsToDelete . forEach ( v => log . info ( `deleted key: ${ v . Key } ` ) ) ;
83
- return cb ( ) ;
84
- } ) ;
72
+ } catch ( err ) {
73
+ log . error ( 'batch delete err' , err ) ;
74
+ throw err ;
75
+ }
85
76
}
86
77
87
- function cleanupVersions ( bucket , cb ) {
78
+ async function cleanupVersions ( bucket ) {
88
79
let VersionIdMarker = null ;
89
80
let KeyMarker = null ;
90
- async . doWhilst (
91
- done => _listObjectVersions (
92
- bucket ,
81
+ let IsTruncated = true ;
82
+
83
+ while ( IsTruncated ) {
84
+ const data = await s3 . send ( new ListObjectVersionsCommand ( {
85
+ Bucket : bucket ,
86
+ MaxKeys : LISTING_LIMIT ,
93
87
VersionIdMarker,
94
88
KeyMarker,
95
- ( err , data ) => {
96
- if ( err ) {
97
- return done ( err ) ;
98
- }
99
- VersionIdMarker = data . NextVersionIdMarker ;
100
- KeyMarker = data . NextKeyMarker ;
101
- const keysToDelete = _getKeys ( data . Versions ) ;
102
- const markersToDelete = _getKeys ( data . DeleteMarkers ) ;
103
- return _deleteVersions (
104
- bucket ,
105
- keysToDelete . concat ( markersToDelete ) ,
106
- done ,
107
- ) ;
108
- } ,
109
- ) ,
110
- ( ) => {
111
- if ( VersionIdMarker || KeyMarker ) {
112
- return true ;
113
- }
114
- return false ;
115
- } ,
116
- cb ,
117
- ) ;
89
+ } ) ) ;
90
+
91
+ VersionIdMarker = data . NextVersionIdMarker ;
92
+ KeyMarker = data . NextKeyMarker ;
93
+ IsTruncated = data . IsTruncated ;
94
+ const keysToDelete = _getKeys ( data . Versions || [ ] ) ;
95
+ const markersToDelete = _getKeys ( data . DeleteMarkers || [ ] ) ;
96
+ const allObjectsToDelete = keysToDelete . concat ( markersToDelete ) ;
97
+
98
+ if ( allObjectsToDelete . length > 0 ) {
99
+ await _deleteVersions ( bucket , allObjectsToDelete ) ;
100
+ } else {
101
+ log . info ( `No objects to delete for bucket ${ bucket } ` ) ;
102
+ }
103
+ }
118
104
}
119
105
120
- function abortAllMultipartUploads ( bucket , cb ) {
121
- s3 . listMultipartUploads ( { Bucket : bucket } , ( err , res ) => {
122
- if ( err ) {
123
- return cb ( err ) ;
124
- }
125
- if ( ! res || ! res . Uploads ) {
126
- return cb ( ) ;
127
- }
128
- return async . mapLimit (
129
- res . Uploads ,
130
- 10 ,
131
- ( item , done ) => {
132
- const { Key, UploadId } = item ;
133
- const params = { Bucket : bucket , Key, UploadId } ;
134
- s3 . abortMultipartUpload ( params , done ) ;
135
- } ,
136
- cb ,
137
- ) ;
138
- } ) ;
106
+ async function abortAllMultipartUploads ( bucket ) {
107
+ const res = await s3 . send ( new ListMultipartUploadsCommand ( { Bucket : bucket } ) ) ;
108
+ log . info ( `Found ${ res . Uploads ? res . Uploads . length : 0 } multipart uploads to abort` ) ;
109
+
110
+ if ( ! res || ! res . Uploads || res . Uploads . length === 0 ) {
111
+ return ;
112
+ }
113
+
114
+ const CONCURRENCY = 10 ;
115
+ for ( let i = 0 ; i < res . Uploads . length ; i += CONCURRENCY ) {
116
+ const batch = res . Uploads . slice ( i , i + CONCURRENCY ) ;
117
+ const deleteMpuPromises = batch . map ( async item => {
118
+ const { Key, UploadId } = item ;
119
+ const params = { Bucket : bucket , Key, UploadId } ;
120
+ return await s3 . send ( new AbortMultipartUploadCommand ( params ) ) ;
121
+ } ) ;
122
+ await Promise . all ( deleteMpuPromises ) ;
123
+ }
139
124
}
140
125
141
- function _cleanupBucket ( bucket , cb ) {
142
- async . parallel ( [
143
- done => cleanupVersions ( bucket , done ) ,
144
- done => abortAllMultipartUploads ( bucket , done ) ,
145
- ] , err => {
146
- if ( err ) {
147
- log . error ( 'error occured deleting objects' , err ) ;
148
- return cb ( err ) ;
149
- }
126
+ async function _cleanupBucket ( bucket ) {
127
+ try {
128
+ await Promise . all ( [
129
+ cleanupVersions ( bucket ) ,
130
+ abortAllMultipartUploads ( bucket ) ,
131
+ ] ) ;
150
132
log . info ( `completed cleaning up of bucket: ${ bucket } ` ) ;
151
- return cb ( ) ;
152
- } ) ;
133
+ } catch ( err ) {
134
+ log . error ( 'error occured deleting objects' , err ) ;
135
+ throw err ;
136
+ }
153
137
}
154
138
155
- function cleanupBuckets ( buckets ) {
156
- async . mapLimit ( buckets , 1 , _cleanupBucket , err => {
157
- if ( err ) {
158
- return log . error ( 'error occured deleting objects' , err ) ;
139
+ async function cleanupBuckets ( buckets ) {
140
+ try {
141
+ for ( const bucket of buckets ) {
142
+ await _cleanupBucket ( bucket ) ;
159
143
}
160
- return log . info ( 'completed cleaning up the given buckets' ) ;
161
- } ) ;
144
+ log . info ( 'completed cleaning all buckets' ) ;
145
+ } catch ( err ) {
146
+ log . error ( 'error occured deleting objects' , err ) ;
147
+ }
162
148
}
163
149
164
150
cleanupBuckets ( BUCKETS ) ;
0 commit comments