diff --git a/src/include/s3fs.hpp b/src/include/s3fs.hpp
index 7381268..449d010 100644
--- a/src/include/s3fs.hpp
+++ b/src/include/s3fs.hpp
@@ -225,6 +225,7 @@ class S3FileSystem : public HTTPFileSystem {
 		return false;
 	}
 	void RemoveFile(const string &filename, optional_ptr<FileOpener> opener = nullptr) override;
+	void RemoveFiles(const vector<string> &filenames, optional_ptr<FileOpener> opener = nullptr) override;
 	void RemoveDirectory(const string &directory, optional_ptr<FileOpener> opener = nullptr) override;
 	void FileSync(FileHandle &handle) override;
 	void Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) override;
diff --git a/src/s3fs.cpp b/src/s3fs.cpp
index a7b6a8d..62f39eb 100644
--- a/src/s3fs.cpp
+++ b/src/s3fs.cpp
@@ -14,6 +14,8 @@
 #endif
 
 #include "duckdb/common/string_util.hpp"
+#include "duckdb/common/crypto/md5.hpp"
+#include "duckdb/common/types/blob.hpp"
 #include "duckdb/function/scalar/string_common.hpp"
 #include "duckdb/main/secret/secret_manager.hpp"
 #include "duckdb/storage/buffer_manager.hpp"
@@ -958,21 +960,114 @@ void S3FileSystem::RemoveFile(const string &path, optional_ptr<FileOpener> opene
 	}
 }
 
+// Forward declaration for FindTagContents (defined later in file)
+optional_idx FindTagContents(const string &response, const string &tag, idx_t cur_pos, string &result);
+
+void S3FileSystem::RemoveFiles(const vector<string> &paths, optional_ptr<FileOpener> opener) {
+	if (paths.empty()) {
+		return;
+	}
+
+	struct BucketUrlInfo {
+		string prefix;
+		string http_proto;
+		string host;
+		string path;
+		S3AuthParams auth_params;
+	};
+
+	unordered_map<string, vector<string>> keys_by_bucket;
+	unordered_map<string, BucketUrlInfo> url_info_by_bucket;
+
+	for (auto &path : paths) {
+		FileOpenerInfo info = {path};
+		S3AuthParams auth_params = S3AuthParams::ReadFrom(opener, info);
+		auto parsed_url = S3UrlParse(path, auth_params);
+		ReadQueryParams(parsed_url.query_param, auth_params);
+
+		const string &bucket = parsed_url.bucket;
+		if (keys_by_bucket.find(bucket) == keys_by_bucket.end()) {
+			string bucket_path = parsed_url.path.substr(0, parsed_url.path.length() - parsed_url.key.length() - 1);
+			if (bucket_path.empty()) {
+				bucket_path = "/";
+			}
+			url_info_by_bucket[bucket] = {parsed_url.prefix, parsed_url.http_proto, parsed_url.host, bucket_path,
+			                              auth_params};
+		}
+
+		keys_by_bucket[bucket].push_back(parsed_url.key);
+	}
+
+	constexpr idx_t MAX_KEYS_PER_REQUEST = 1000;
+
+	for (auto &bucket_entry : keys_by_bucket) {
+		const string &bucket = bucket_entry.first;
+		const vector<string> &keys = bucket_entry.second;
+		const auto &url_info = url_info_by_bucket[bucket];
+
+		for (idx_t batch_start = 0; batch_start < keys.size(); batch_start += MAX_KEYS_PER_REQUEST) {
+			idx_t batch_end = MinValue(batch_start + MAX_KEYS_PER_REQUEST, keys.size());
+
+			std::stringstream xml_body;
+			xml_body << "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
+			xml_body << "<Delete xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">";
+
+			for (idx_t i = batch_start; i < batch_end; i++) {
+				xml_body << "<Object><Key>" << keys[i] << "</Key></Object>";
+			}
+
+			xml_body << "<Quiet>true</Quiet>";
+			xml_body << "</Delete>";
+
+			string body = xml_body.str();
+
+			MD5Context md5_context;
+			md5_context.Add(body);
+			data_t md5_hash[MD5Context::MD5_HASH_LENGTH_BINARY];
+			md5_context.Finish(md5_hash);
+
+			string_t md5_blob(const_char_ptr_cast(md5_hash), MD5Context::MD5_HASH_LENGTH_BINARY);
+			string content_md5 = Blob::ToBase64(md5_blob);
+
+			const string http_query_param_for_sig = "delete=";
+			const string http_query_param_for_url = "delete";
+			auto payload_hash = GetPayloadHash(const_cast<char *>(body.data()), body.length());
+
+			auto headers = CreateS3Header(url_info.path, http_query_param_for_sig, url_info.host, "s3", "POST",
+			                              url_info.auth_params, "", "", payload_hash, "");
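+			// S3's multi-object delete (POST with the "delete" subresource) requires a Content-MD5
+			// header containing the base64-encoded binary MD5 digest of the XML request body.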
+			headers["Content-MD5"] = content_md5;
+			headers["Content-Type"] = "application/xml";
+
+			string http_url = url_info.http_proto + url_info.host + S3FileSystem::UrlEncode(url_info.path) + "?" +
+			                  http_query_param_for_url;
+			string bucket_url = url_info.prefix + bucket + "/";
+			auto handle = OpenFile(bucket_url, FileFlags::FILE_FLAGS_READ, opener);
+
+			string result;
+			auto res = HTTPFileSystem::PostRequest(*handle, http_url, headers, result, const_cast<char *>(body.data()),
+			                                       body.length());
+
+			if (res->status != HTTPStatusCode::OK_200) {
+				throw IOException("Failed to remove files: HTTP %d (%s)\n%s", static_cast<int>(res->status),
+				                  res->GetError(), result);
+			}
+
+			idx_t cur_pos = 0;
+			string error_content;
+			auto error_pos = FindTagContents(result, "Error", cur_pos, error_content);
+			if (error_pos.IsValid()) {
+				throw IOException("Failed to remove files: %s", error_content);
+			}
+		}
+	}
+}
+
 void S3FileSystem::RemoveDirectory(const string &path, optional_ptr<FileOpener> opener) {
+	vector<string> files_to_remove;
 	ListFiles(
-	    path,
-	    [&](const string &file, bool is_dir) {
-		    try {
-			    this->RemoveFile(file, opener);
-		    } catch (IOException &e) {
-			    string errmsg(e.what());
-			    if (errmsg.find("No such file or directory") != std::string::npos) {
-				    return;
-			    }
-			    throw;
-		    }
-	    },
-	    opener.get());
+	    path, [&](const string &file, bool is_dir) { files_to_remove.push_back(file); }, opener.get());
+
+	RemoveFiles(files_to_remove, opener);
 }
 
 void S3FileSystem::FileSync(FileHandle &handle) {
diff --git a/test/sql/copy/s3/s3_remove_files.test b/test/sql/copy/s3/s3_remove_files.test
new file mode 100644
index 0000000..ad12c81
--- /dev/null
+++ b/test/sql/copy/s3/s3_remove_files.test
@@ -0,0 +1,76 @@
+# name: test/sql/copy/s3/s3_remove_files.test
+# description: Test RemoveFiles functionality via COPY OVERWRITE on S3
+# group: [s3]
+
+require parquet
+
+require httpfs
+
+require-env S3_TEST_SERVER_AVAILABLE 1
+
+require-env AWS_DEFAULT_REGION
+
+require-env AWS_ACCESS_KEY_ID
+
+require-env AWS_SECRET_ACCESS_KEY
+
+require-env DUCKDB_S3_ENDPOINT
+
+require-env DUCKDB_S3_USE_SSL
+
+# override the default behaviour of skipping HTTP errors and connection failures: this test fails on connection issues
+set ignore_error_messages
+
+foreach url_style path vhost
+
+statement ok
+SET s3_url_style='${url_style}'
+
+# Step 1: Clean up any leftover files from previous test runs (OVERWRITE clears the directory)
+# and create the initial partitioned files (part_col=1, then part_col=2)
+statement ok
+COPY (SELECT 1 as part_col, 'initial_1' as value) TO 's3://test-bucket/remove_files_test' (FORMAT PARQUET, PARTITION_BY (part_col), OVERWRITE);
+
+statement ok
+COPY (SELECT 2 as part_col, 'initial_2' as value) TO 's3://test-bucket/remove_files_test' (FORMAT PARQUET, PARTITION_BY (part_col), OVERWRITE_OR_IGNORE);
+
+# Step 2: Verify both partitions exist
+query I
+SELECT count(*) FROM glob('s3://test-bucket/remove_files_test/**/*.parquet')
+----
+2
+
+query IT
+SELECT part_col, value FROM 's3://test-bucket/remove_files_test/**/*.parquet' ORDER BY part_col
+----
+1	initial_1
+2	initial_2
+
+# Step 3: OVERWRITE with completely different data (part_col=99)
+# This should delete all existing files via RemoveFiles and create new ones
+statement ok
+COPY (SELECT 99 as part_col, 'new_data' as value) TO 's3://test-bucket/remove_files_test' (FORMAT PARQUET, PARTITION_BY (part_col), OVERWRITE);
+
+# Step 4: Verify OLD partitions are removed
+query I
+SELECT count(*) FROM glob('s3://test-bucket/remove_files_test/part_col=1/*.parquet')
+----
+0
+
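+# part_col=2 was written with OVERWRITE_OR_IGNORE, so the OVERWRITE above must remove it as well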
+query I
+SELECT count(*) FROM glob('s3://test-bucket/remove_files_test/part_col=2/*.parquet')
+----
+0
+
+# Step 5: Verify only NEW partition exists
+query I
+SELECT count(*) FROM glob('s3://test-bucket/remove_files_test/**/*.parquet')
+----
+1
+
+query IT
+SELECT part_col, value FROM 's3://test-bucket/remove_files_test/**/*.parquet'
+----
+99	new_data
+
+endloop