diff --git a/S3/Config.py b/S3/Config.py
index 9a520ab..ebce45c 100644
--- a/S3/Config.py
+++ b/S3/Config.py
@@ -77,6 +77,9 @@ class Config(object):
     parallel = False
     workers = 10
     follow_symlinks=False
+    select_dir = False
+    max_retries = 5
+    retry_delay = 3
 
     ## Creating a singleton
     def __new__(self, configfile = None):
diff --git a/S3/S3.py b/S3/S3.py
index d277648..c87493f 100644
--- a/S3/S3.py
+++ b/S3/S3.py
@@ -47,7 +47,7 @@ def update_timestamp(self):
     def format_param_str(self):
         """
         Format URL parameters from self.params and returns
-        ?parm1=val1&parm2=val2 or an empty string if there 
+        ?parm1=val1&parm2=val2 or an empty string if there
         are no parameters. Output of this function should
         be appended directly to self.resource['uri']
         """
@@ -119,9 +119,6 @@ class S3(object):
     ## S3 sometimes sends HTTP-307 response
     redir_map = {}
 
-    ## Maximum attempts of re-issuing failed requests
-    _max_retries = 5
-
     def __init__(self, config):
         self.config = config
@@ -458,9 +455,11 @@ def create_request(self, operation, uri = None, bucket = None, object = None, he
 
     def _fail_wait(self, retries):
         # Wait a few seconds. The more it fails the more we wait.
-        return (self._max_retries - retries + 1) * 3
+        return (self.config.max_retries - retries + 1) * self.config.retry_delay
 
-    def send_request(self, request, body = None, retries = _max_retries):
+    def send_request(self, request, body = None, retries = -1):
+        if retries == -1:
+            retries = self.config.max_retries
         method_string, resource, headers = request.get_triplet()
         debug("Processing request, please wait...")
         if not headers.has_key('content-length'):
@@ -479,8 +478,11 @@ def send_request(self, request, body = None, retries = _max_retries):
         except Exception, e:
             if retries:
                 warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
-                warning("Waiting %d sec..." % self._fail_wait(retries))
-                time.sleep(self._fail_wait(retries))
+
+                if self._fail_wait(retries) > 0:
+                    warning("Waiting %d sec..." % self._fail_wait(retries))
+                    time.sleep(self._fail_wait(retries))
+
                 return self.send_request(request, body, retries - 1)
             else:
                 raise S3RequestError("Request failed for: %s" % resource['uri'])
@@ -498,8 +500,11 @@ def send_request(self, request, body = None, retries = _max_retries):
             if retries:
                 warning(u"Retrying failed request: %s" % resource['uri'])
                 warning(unicode(e))
-                warning("Waiting %d sec..." % self._fail_wait(retries))
-                time.sleep(self._fail_wait(retries))
+
+                if self._fail_wait(retries) > 0:
+                    warning("Waiting %d sec..." % self._fail_wait(retries))
+                    time.sleep(self._fail_wait(retries))
+
                 return self.send_request(request, body, retries - 1)
             else:
                 raise e
@@ -509,7 +514,9 @@ def send_request(self, request, body = None, retries = _max_retries):
 
         return response
 
-    def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
+    def send_file(self, request, file, labels, throttle = 0, retries = -1):
+        if retries == -1:
+            retries = self.config.max_retries
         method_string, resource, headers = request.get_triplet()
         size_left = size_total = headers.get("content-length")
         if self.config.progress_meter:
@@ -529,8 +536,12 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
                 progress.done("failed")
             if retries:
                 warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
-                warning("Waiting %d sec..." % self._fail_wait(retries))
-                time.sleep(self._fail_wait(retries))
+
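+                # Skip the wait entirely when retry_delay is set to 0.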
+                if self._fail_wait(retries) > 0:
+                    warning("Waiting %d sec..." % self._fail_wait(retries))
+                    time.sleep(self._fail_wait(retries))
+
                 # Connection error -> same throttle value
                 return self.send_file(request, file, labels, throttle, retries - 1)
             else:
@@ -562,12 +572,15 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
             if self.config.progress_meter:
                 progress.done("failed")
             if retries:
-                if retries < self._max_retries:
+                if retries < self.config.max_retries:
                     throttle = throttle and throttle * 5 or 0.01
                 warning("Upload failed: %s (%s)" % (resource['uri'], e))
                 warning("Retrying on lower speed (throttle=%0.2f)" % throttle)
-                warning("Waiting %d sec..." % self._fail_wait(retries))
-                time.sleep(self._fail_wait(retries))
+
+                if self._fail_wait(retries) > 0:
+                    warning("Waiting %d sec..." % self._fail_wait(retries))
+                    time.sleep(self._fail_wait(retries))
+
                 # Connection error -> same throttle value
                 return self.send_file(request, file, labels, throttle, retries - 1)
             else:
@@ -612,8 +625,11 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
         if try_retry:
             if retries:
                 warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
-                warning("Waiting %d sec..." % self._fail_wait(retries))
-                time.sleep(self._fail_wait(retries))
+
+                if self._fail_wait(retries) > 0:
+                    warning("Waiting %d sec..." % self._fail_wait(retries))
+                    time.sleep(self._fail_wait(retries))
+
                 return self.send_file(request, file, labels, throttle, retries - 1)
             else:
                 warning("Too many failures. Giving up on '%s'" % (file.name))
@@ -634,7 +650,9 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
 
         return response
 
-    def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
+    def recv_file(self, request, stream, labels, start_position = 0, retries = -1):
+        if retries == -1:
+            retries = self.config.max_retries
         method_string, resource, headers = request.get_triplet()
         if self.config.progress_meter:
             progress = self.config.progress_class(labels, 0)
@@ -662,8 +680,11 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
                 progress.done("failed")
             if retries:
                 warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
-                warning("Waiting %d sec..." % self._fail_wait(retries))
-                time.sleep(self._fail_wait(retries))
+
+                if self._fail_wait(retries) > 0:
+                    warning("Waiting %d sec..." % self._fail_wait(retries))
+                    time.sleep(self._fail_wait(retries))
+
                 # Connection error -> same throttle value
                 return self.recv_file(request, stream, labels, start_position, retries - 1)
             else:
@@ -711,8 +732,13 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
                 progress.done("failed")
             if retries:
                 warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
-                warning("Waiting %d sec..." % self._fail_wait(retries))
-                time.sleep(self._fail_wait(retries))
+
+                if self._fail_wait(retries) > 0:
+                    warning("Waiting %d sec..." % self._fail_wait(retries))
+                    time.sleep(self._fail_wait(retries))
+
                 # Connection error -> same throttle value
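+                # Resume from current_position rather than start_position, so
+                # bytes already written to the stream are not fetched again.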
                 return self.recv_file(request, stream, labels, current_position, retries - 1)
             else:
diff --git a/s3cmd b/s3cmd
index 6942ab5..1b254f9 100755
--- a/s3cmd
+++ b/s3cmd
@@ -127,7 +127,7 @@ def cmd_ls(args):
     if len(args) > 0:
         uri = S3Uri(args[0])
         if uri.type == "s3" and uri.has_bucket():
-            subcmd_bucket_list(s3, uri)
+            subcmd_bucket_list(s3, uri, cfg.select_dir)
             return
     subcmd_buckets_list_all(s3)
 
@@ -149,7 +149,7 @@ def subcmd_buckets_list_all(s3):
             bucket["Name"],
             ))
 
-def subcmd_bucket_list(s3, uri):
+def subcmd_bucket_list(s3, uri, select_dir = False):
     bucket = uri.bucket()
     prefix = uri.object()
 
@@ -165,28 +165,31 @@ def subcmd_bucket_list(s3, uri):
         else:
             raise
 
-    if cfg.list_md5:
+    if select_dir:
+        format_string = u"%(uri)s"
+    elif cfg.list_md5:
         format_string = u"%(timestamp)16s %(size)9s%(coeff)1s %(md5)32s %(uri)s"
     else:
         format_string = u"%(timestamp)16s %(size)9s%(coeff)1s %(uri)s"
 
     for prefix in response['common_prefixes']:
         output(format_string % {
             "timestamp": "",
             "size": "DIR",
             "coeff": "",
             "md5": "",
             "uri": uri.compose_uri(bucket, prefix["Prefix"])})
 
-    for object in response["list"]:
-        size, size_coeff = formatSize(object["Size"], Config().human_readable_sizes)
-        output(format_string % {
-            "timestamp": formatDateTime(object["LastModified"]),
-            "size" : str(size),
-            "coeff": size_coeff,
-            "md5" : object['ETag'].strip('"'),
-            "uri": uri.compose_uri(bucket, object["Key"]),
-            })
+    if not select_dir:
+        for object in response["list"]:
+            size, size_coeff = formatSize(object["Size"], Config().human_readable_sizes)
+            output(format_string % {
+                "timestamp": formatDateTime(object["LastModified"]),
+                "size" : str(size),
+                "coeff": size_coeff,
+                "md5" : object['ETag'].strip('"'),
+                "uri": uri.compose_uri(bucket, object["Key"]),
+                })
 
 def cmd_bucket_create(args):
     s3 = S3(Config())
@@ -381,7 +392,7 @@ def cmd_object_put(args):
 
     #Necessary to ensure KeyboardInterrupt can actually kill
     #Otherwise Queue.join() blocks until all queue elements have completed
-    while threading.active_count() > 1:
+    while threading.activeCount() > 1:
         time.sleep(.1)
 
     q.join()
@@ -532,7 +543,7 @@ def cmd_object_get(args):
 
     #Necessary to ensure KeyboardInterrupt can actually kill
     #Otherwise Queue.join() blocks until all queue elements have completed
-    while threading.active_count() > 1:
+    while threading.activeCount() > 1:
         time.sleep(.1)
 
     q.join()
@@ -689,28 +700,74 @@ def subcmd_cp_mv(args, process_fce, action_str, message):
         warning(u"Exitting now because of --dry-run")
         return
 
-    seq = 0
-    for key in remote_list:
-        seq += 1
-        seq_label = "[%d of %d]" % (seq, remote_count)
+    if cfg.parallel and len(remote_list) > 1:
+        #Disabling progress meter for parallel copies.
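+        #(several workers printing progress at once would interleave their output)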
+        cfg.progress_meter = False
+        #Initialize Queue
+        global q
+        q = Queue.Queue()
+
+        seq = 0
+        for key in remote_list:
+            seq += 1
+            seq_label = "[%d of %d]" % (seq, remote_count)
 
-        item = remote_list[key]
-        src_uri = S3Uri(item['object_uri_str'])
-        dst_uri = S3Uri(item['dest_name'])
+            item = remote_list[key]
+            src_uri = S3Uri(item['object_uri_str'])
+            dst_uri = S3Uri(item['dest_name'])
+            extra_headers = copy(cfg.extra_headers)
+            q.put([src_uri,dst_uri,extra_headers,process_fce,message,seq_label])
 
-        extra_headers = copy(cfg.extra_headers)
-        response = process_fce(src_uri, dst_uri, extra_headers)
-        output(message % { "src" : src_uri, "dst" : dst_uri })
-        if Config().acl_public:
-            info(u"Public URL is: %s" % dst_uri.public_url())
+        for i in range(cfg.workers):
+            t = threading.Thread(target=cp_mv_worker)
+            t.daemon = True
+            t.start()
+
+        #Necessary to ensure KeyboardInterrupt can actually kill
+        #Otherwise Queue.join() blocks until all queue elements have completed
+        while threading.activeCount() > 1:
+            time.sleep(.1)
+
+        q.join()
+    else:
+        seq = 0
+        for key in remote_list:
+            seq += 1
+            seq_label = "[%d of %d]" % (seq, remote_count)
+
+            item = remote_list[key]
+            src_uri = S3Uri(item['object_uri_str'])
+            dst_uri = S3Uri(item['dest_name'])
+
+            extra_headers = copy(cfg.extra_headers)
+            response = process_fce(src_uri, dst_uri, extra_headers)
+            output(message % { "src" : src_uri, "dst" : dst_uri, "seq_label" : seq_label})
+            if Config().acl_public:
+                info(u"Public URL is: %s" % dst_uri.public_url())
+
+def cp_mv_worker():
+    while True:
+        try:
+            (src_uri,dst_uri,extra_headers,process_fce,message,seq_label) = q.get_nowait()
+        except Queue.Empty:
+            return
+        try:
+            response = process_fce(src_uri, dst_uri, extra_headers)
+            output(message % { "src" : src_uri, "dst" : dst_uri , "seq_label" : seq_label})
+            if Config().acl_public:
+                info(u"Public URL is: %s" % dst_uri.public_url())
+        except Exception, e:
+            report_exception(e)
+        q.task_done()
 
 def cmd_cp(args):
     s3 = S3(Config())
-    subcmd_cp_mv(args, s3.object_copy, "copy", "File %(src)s copied to %(dst)s")
+    subcmd_cp_mv(args, s3.object_copy, "copy", "File %(src)s copied to %(dst)s %(seq_label)s")
 
 def cmd_mv(args):
     s3 = S3(Config())
-    subcmd_cp_mv(args, s3.object_move, "move", "File %(src)s moved to %(dst)s")
+    subcmd_cp_mv(args, s3.object_move, "move", "File %(src)s moved to %(dst)s %(seq_label)s")
 
 def cmd_info(args):
     s3 = S3(Config())
@@ -866,7 +923,7 @@ def _filelist_filter_exclude_include(src_list):
             debug(u"PASS: %s" % (file))
     return src_list, exclude_list
 
-def _compare_filelists(src_list, dst_list, src_is_local_and_dst_is_remote):
+def _compare_filelists(src_list, dst_list, src_is_local_and_dst_is_remote, src_and_dst_remote = False):
     info(u"Verifying attributes...")
     cfg = Config()
     exists_list = SortedDict(ignore_case = False)
@@ -895,7 +952,12 @@ def _compare_filelists(src_list, dst_list, src_is_local_and_dst_is_remote):
             if attribs_match and 'md5' in cfg.sync_checks:
                 ## ... same size, check MD5
-                if src_is_local_and_dst_is_remote:
+                if src_and_dst_remote:
+                    src_md5 = src_list[file]['md5']
+                    dst_md5 = dst_list[file]['md5']
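+                    # Both sides come from remote listings, so the ETag-derived
+                    # MD5s can be compared directly; nothing local to hash.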
+                elif src_is_local_and_dst_is_remote:
                     src_md5 = Utils.hash_file_md5(src_list[file]['full_name'])
                     dst_md5 = dst_list[file]['md5']
                 else:
                     src_md5 = src_list[file]['md5']
@@ -917,6 +977,84 @@ def _compare_filelists(src_list, dst_list, src_is_local_and_dst_is_remote):
 
     return src_list, dst_list, exists_list
 
+def cmd_sync_remote2remote(args):
+    s3 = S3(Config())
+
+    destination_base = args[-1]
+    dest_list = fetch_remote_list(destination_base, recursive = True, require_attribs = True)
+    source_list = fetch_remote_list(args[:-1], recursive = True, require_attribs = True)
+
+    dest_count = len(dest_list)
+    source_count = len(source_list)
+
+    info(u"Found %d source files, %d destination files" % (source_count, dest_count))
+
+    source_list, exclude_list = _filelist_filter_exclude_include(source_list)
+
+    source_list, dest_list, existing_list = _compare_filelists(source_list, dest_list, False, True)
+
+    dest_count = len(dest_list)
+    source_count = len(source_list)
+
+    info(u"Summary: %d files to copy, %d files to delete" % (source_count, dest_count))
+
+    if cfg.recursive:
+        if not destination_base.endswith("/"):
+            destination_base += "/"
+        for key in source_list:
+            source_list[key]['dest_name'] = destination_base + key
+    else:
+        key = source_list.keys()[0]
+        if destination_base.endswith("/"):
+            source_list[key]['dest_name'] = destination_base + key
+        else:
+            source_list[key]['dest_name'] = destination_base
+
+    if cfg.parallel and len(source_list) > 1:
+        #Disabling progress meter for parallel copies.
+        cfg.progress_meter = False
+        #Initialize Queue
+        global q
+        q = Queue.Queue()
+
+        seq = 0
+        for key in source_list:
+            seq += 1
+            seq_label = "[%d of %d]" % (seq, source_count)
+
+            item = source_list[key]
+            src_uri = S3Uri(item['object_uri_str'])
+            dst_uri = S3Uri(item['dest_name'])
+            extra_headers = copy(cfg.extra_headers)
+            q.put([src_uri,dst_uri,extra_headers,s3.object_copy,"File %(src)s copied to %(dst)s %(seq_label)s",seq_label])
+
+        for i in range(cfg.workers):
+            t = threading.Thread(target=cp_mv_worker)
+            t.daemon = True
+            t.start()
+
+        #Necessary to ensure KeyboardInterrupt can actually kill
+        #Otherwise Queue.join() blocks until all queue elements have completed
+        while threading.activeCount() > 1:
+            time.sleep(.1)
+
+        q.join()
+    else:
+        seq = 0
+        for key in source_list:
+            seq += 1
+            seq_label = "[%d of %d]" % (seq, source_count)
+
+            item = source_list[key]
+            src_uri = S3Uri(item['object_uri_str'])
+            dst_uri = S3Uri(item['dest_name'])
+
+            extra_headers = copy(cfg.extra_headers)
+            response = s3.object_copy(src_uri, dst_uri, extra_headers)
+            output("File %(src)s copied to %(dst)s %(seq_label)s" % { "src" : src_uri, "dst" : dst_uri, "seq_label" : seq_label })
+            if Config().acl_public:
+                info(u"Public URL is: %s" % dst_uri.public_url())
+
 def cmd_sync_remote2local(args):
     s3 = S3(Config())
 
@@ -997,7 +1135,7 @@ def cmd_sync_remote2local(args):
 
     #Necessary to ensure KeyboardInterrupt can actually kill
     #Otherwise Queue.join() blocks until all queue elements have completed
-    while threading.active_count() > 1:
+    while threading.activeCount() > 1:
         time.sleep(.1)
 
     q.join()
@@ -1191,7 +1329,7 @@ def cmd_sync_local2remote(args):
 
     #Necessary to ensure KeyboardInterrupt can actually kill
    #Otherwise Queue.join() blocks until all queue elements have completed
-    while threading.active_count() > 1:
+    while threading.activeCount() > 1:
         time.sleep(.1)
 
     q.join()
@@ -1279,7 +1417,11 @@ def cmd_sync(args):
         return cmd_sync_local2remote(args)
     if S3Uri(args[0]).type == "s3" and S3Uri(args[-1]).type == "file":
         return cmd_sync_remote2local(args)
-    raise ParameterError("Invalid source/destination: '%s'" % "' '".join(args))
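+    # Both endpoints are S3 URIs: sync bucket to bucket using server-side
+    # copies, with no download/upload round trip through the local machine.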
+    if S3Uri(args[0]).type == "s3" and S3Uri(args[-1]).type == "s3":
+        return cmd_sync_remote2remote(args)
+    raise ParameterError("Invalid source/destination: '%s'" % "' '".join(args))
 
 def cmd_setacl(args):
     def _update_acl(uri, seq_label = ""):
@@ -1829,6 +1969,10 @@ def main():
     optparser.add_option(      "--parallel", dest="parallel", action="store_true", help="Download and upload files in parallel.")
     optparser.add_option(      "--workers", dest="workers", default=10, help="Sets the number of workers to run for uploading and downloading files (can only be used in conjunction with the --parallel argument)")
+    optparser.add_option(      "--directory", dest="select_dir", action="store_true", default=False, help="List only directories (common prefixes), one URI per line (only for [ls]).")
+    optparser.add_option(      "--max-retries", dest="max_retries", type="int", action="store", default=5, help="Number of times to retry a failed GET or PUT before giving up.")
+    optparser.add_option(      "--retry-delay", dest="retry_delay", type="int", action="store", default=3, help="Base delay in seconds between retries of a failed GET or PUT; the wait grows with each attempt (0 disables waiting).")
+
     optparser.set_usage(optparser.usage + " COMMAND [parameters]")
     optparser.set_description('S3cmd is a tool for managing objects in '+
                 'Amazon S3 storage. It allows for making and removing '+
diff --git a/testsuite/etc/brokenlink.png b/testsuite/etc/brokenlink.png
index 3e7b628..e69de29 120000
--- a/testsuite/etc/brokenlink.png
+++ b/testsuite/etc/brokenlink.png
@@ -1 +0,0 @@
-no-such-image.png
\ No newline at end of file
diff --git a/testsuite/etc/linked.png b/testsuite/etc/linked.png
index bd36e71..e69de29 120000
--- a/testsuite/etc/linked.png
+++ b/testsuite/etc/linked.png
@@ -1 +0,0 @@
-logo.png
\ No newline at end of file
diff --git a/testsuite/etc/linked1.png b/testsuite/etc/linked1.png
index bd36e71..e69de29 120000
--- a/testsuite/etc/linked1.png
+++ b/testsuite/etc/linked1.png
@@ -1 +0,0 @@
-logo.png
\ No newline at end of file
diff --git a/testsuite/etc/more/linked-dir b/testsuite/etc/more/linked-dir
index 8c3482d..e69de29 120000
--- a/testsuite/etc/more/linked-dir
+++ b/testsuite/etc/more/linked-dir
@@ -1 +0,0 @@
-../../etc
\ No newline at end of file