#!/usr/bin/python3
"""
Simple S3 manager
:created: 04-19-2016
:updated: 04-21-2016
:author: Jacopo Scrinzi <[email protected]>
"""
import argparse
import configparser
import os
import sys

import boto3


def get_s3_client(credential):
"""
Get S3 boto3 client
    :param credential: str (path to the AWS credentials file)
:return: boto3.client
"""
if "~" in credential:
credential = credential.replace("~", os.path.expanduser("~"))
config = configparser.ConfigParser()
read = config.read(credential)
if len(read) == 0:
raise Exception("ERROR: AWS credential file not found in {}".format(
credential))
if "default" not in config.sections():
raise Exception("ERROR: [default] section not found in {}".format(
credential))
default = dict(config["default"])
return boto3.client("s3", **default)
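# The [default] options are passed straight to boto3.client("s3", ...), so the
# keys must match boto3 client parameter names (e.g. aws_access_key_id,
# aws_secret_access_key, optionally region_name). A minimal illustrative file
# with placeholder values:
#
#   [default]
#   aws_access_key_id = AKIAEXAMPLEKEY
#   aws_secret_access_key = examplesecretkey
#   region_name = eu-west-1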


def bucket_exists(bucket_name, s3_client):
    """
    Check if bucket exists on Amazon S3
    :param bucket_name: str
    :param s3_client: boto3.client
:return: bool
"""
buckets = s3_client.list_buckets()
for bucket in buckets["Buckets"]:
if bucket["Name"] == bucket_name:
return True
return False


def bucket_content(bucket_name, s3_client):
    """
    Get S3 bucket content
    :param bucket_name: str
    :param s3_client: boto3.client
    :return: list
    """
bucket = s3_client.list_objects(Bucket=bucket_name)
content = []
if "Contents" not in bucket:
return content
for key in bucket["Contents"]:
content.append(key["Key"])
return content
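# Note: list_objects returns at most 1000 keys per response, so bucket_content
# (and the checks built on it) only see the first page of a large bucket; a
# paginator or list_objects_v2 with a ContinuationToken loop would be needed
# to list everything.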


def file_check(filename, bucket_name, s3_client):
"""
Check file exists in S3 bucket
:param filename: str
:param bucket_name: str
    :param s3_client: boto3.client
:return: bool
"""
content = bucket_content(bucket_name, s3_client)
if filename not in content:
return False
return True


def upload_file(s3_client, bucket_name, bucket_file, host_file, force=False):
"""
Upload File to Amazon S3
:param s3_client: boto3.client
:param bucket_name: str
:param bucket_file: str
:param host_file: str
:param force: bool
:return: bool
"""
if not bucket_exists(bucket_name, s3_client):
s3_client.create_bucket(Bucket=bucket_name)
if not bucket_exists(bucket_name, s3_client):
sys.stderr.write("ERROR: Unable to create bucket: {}\n".format(
bucket_name))
return False
if not os.path.isfile(host_file):
sys.stderr.write("ERROR: {} not found\n".format(host_file))
return False
if file_check(bucket_file, bucket_name, s3_client):
if force is not True:
sys.stderr.write("ERROR: {} already present\n".format(bucket_file))
return False
s3_client.upload_file(host_file, bucket_name, bucket_file)
if not file_check(bucket_file, bucket_name, s3_client):
sys.stderr.write("ERROR: Unable to upload {}\n".format(host_file))
return False
sys.stdout.write("File: {} uploaded successfully to {} in bucket\n".
format(host_file, bucket_file, bucket_name))
return True


def download_file(s3_client, bucket_name, bucket_file, host_file, force=False):
"""
Download file from Amazon S3
:param s3_client: boto3.client
:param bucket_name: str
:param bucket_file: str
:param host_file: str
:param force: bool
:return: bool
"""
dst_dir = "/".join(host_file.split("/")[:-1])
if dst_dir:
if not os.path.exists(dst_dir):
os.makedirs(dst_dir)
if not file_check(bucket_file, bucket_name, s3_client):
sys.stderr.write("ERROR: {} not found in bucket: {}\n".format(
bucket_file, bucket_name))
return False
if os.path.isfile(host_file):
if force is not True:
sys.stderr.write("ERROR: {} already exists\n".format(host_file))
return False
s3_client.download_file(bucket_name, bucket_file, host_file)
if not os.path.isfile(host_file):
sys.stderr.write(
"ERROR: Unable to download {} to {} from bucket: {}\n".format(
bucket_file, host_file, bucket_name))
return False
sys.stdout.write("File:{} downloaded successfully to {}\n".
format(bucket_file, host_file))
return True


def delete_file(s3_client, bucket_name, bucket_file):
"""
Delete file from S3 bucket
:param s3_client: boto3.client
:param bucket_name: str
:param bucket_file: str
:return: bool
"""
if not bucket_exists(bucket_name, s3_client):
sys.stderr.write("ERROR: Unable to find bucket {}\n".
format(bucket_name))
return False
if not file_check(bucket_file, bucket_name, s3_client):
sys.stderr.write("ERROR: Unable to find {} in bucket {}\n".
format(bucket_file, bucket_name))
return False
s3_client.delete_object(Bucket=bucket_name, Key=bucket_file)
if file_check(bucket_file, bucket_name, s3_client):
sys.stderr.write("ERROR: Unable to delete {} in bucket {}\n".
format(bucket_file, bucket_name))
return False
sys.stdout.write("File: {} deleted successfully\n".format(bucket_file))
return True


def delete_all_files(s3_client, bucket_name):
    """
    Delete all files in S3 Bucket
    :param s3_client: boto3.client
    :param bucket_name: str
    :return: bool
    """
    content = bucket_content(bucket_name, s3_client)
    success = True
    for bucket_file in content:
        # Track failures so the caller (and exit_code) gets a real boolean
        # instead of None.
        if not delete_file(s3_client, bucket_name, bucket_file):
            success = False
    return success


def delete_bucket(s3_client, bucket_name, force=False):
"""
Delete S3 Bucket
:param s3_client: boto3.client
:param bucket_name: str
:param force: bool
:return: bool
"""
if not bucket_exists(bucket_name, s3_client):
sys.stdout.write("ERROR: {} does not exists\n".format(bucket_name))
return False
content = bucket_content(bucket_name, s3_client)
if content:
if force is not True:
sys.stdout.write("ERROR: {} not empty\n".format(bucket_name))
return False
delete_all_files(s3_client, bucket_name)
s3_client.delete_bucket(Bucket=bucket_name)
if bucket_exists(bucket_name, s3_client):
sys.stderr.write("ERROR: unable to delete bucket {}\n".
format(bucket_name))
return False
sys.stdout.write("Bucket:{} deleted successfully\n".format(bucket_name))
return True


def exit_code(arg):
"""
Exit with the right exit code depending on the arg passed
:param arg: bool
:return: void
"""
if arg is True:
sys.exit(0)
sys.exit(1)


def main(parser):
"""
Main function
:param parser: argparse
"""
args = parser.parse_args()
try:
s3 = get_s3_client(args.credential)
except Exception as e:
sys.stderr.write("{}\n".format(str(e)))
sys.exit(1)
if args.subparser_name == "download":
exit_code(download_file(
s3, args.bucket_name, args.bucket_file, args.host_file, args.f))
elif args.subparser_name == "upload":
exit_code(upload_file(
s3, args.bucket_name, args.bucket_file, args.host_file, args.f))
elif args.subparser_name == "delete-file":
exit_code(delete_file(s3, args.bucket_name, args.bucket_file))
elif args.subparser_name == "delete-all-files":
exit_code(delete_all_files(s3, args.bucket_name))
elif args.subparser_name == "delete-bucket":
exit_code(delete_bucket(s3, args.bucket_name, args.f))
else:
parser.print_help()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="S3 Manager", description="Amazon S3 manager")
subparsers = parser.add_subparsers(
dest="subparser_name", help="sub-command help")
parser_download = subparsers.add_parser(
"download", help="Download file from S3 bucket")
parser_upload = subparsers.add_parser(
"upload", help="Upload file to S3 bucket")
parser_delete_file = subparsers.add_parser(
"delete-file", help="Delete file in S3 bucket")
parser_delete_allfiles = subparsers.add_parser(
"delete-all-files", help="Delete all files in S3 bucket")
parser_delete_bucket = subparsers.add_parser(
"delete-bucket", help="Delete S3 bucket")
parser_download.add_argument("-f", action='store_true', help="Force")
parser_download.add_argument("bucket_name", help="S3 bucket name")
parser_download.add_argument(
"bucket_file", help="Full path file in Bucket")
parser_download.add_argument("host_file", help="Full path file on host")
parser_upload.add_argument("-f", action='store_true', help="Force")
parser_upload.add_argument("bucket_name", help="S3 bucket name")
parser_upload.add_argument("bucket_file", help="Full path file in Bucket")
parser_upload.add_argument("host_file", help="Full path file on host")
parser_delete_file.add_argument("bucket_name", help="S3 bucket name")
parser_delete_file.add_argument(
"bucket_file", help="Full path file in Bucket")
parser_delete_allfiles.add_argument("bucket_name", help="S3 bucket name")
parser_delete_bucket.add_argument("-f", action='store_true', help="Force")
parser_delete_bucket.add_argument("bucket_name", help="S3 bucket name")
parser.add_argument(
"-c", "--credential", default="~/.aws/credentials",
help="Path AWS credential file")
main(parser)