1 change: 1 addition & 0 deletions src/include/s3fs.hpp
@@ -225,6 +225,7 @@ class S3FileSystem : public HTTPFileSystem {
return false;
}
void RemoveFile(const string &filename, optional_ptr<FileOpener> opener = nullptr) override;
void RemoveFiles(const vector<string> &filenames, optional_ptr<FileOpener> opener = nullptr) override;
void RemoveDirectory(const string &directory, optional_ptr<FileOpener> opener = nullptr) override;
void FileSync(FileHandle &handle) override;
void Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) override;
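For context, here is a minimal sketch of how extension code could call the new batched override; the helper name, include path, and example keys are illustrative assumptions, and only the RemoveFiles signature above comes from this change.

#include "s3fs.hpp" // assumed include path within the httpfs extension

using namespace duckdb;

// Hypothetical helper: remove a known set of objects with one batched call
// rather than one DELETE request per key.
static void DropPartitionFiles(S3FileSystem &fs, optional_ptr<FileOpener> opener) {
	vector<string> paths = {
	    "s3://test-bucket/remove_files_test/part_col=1/data_0.parquet", // illustrative keys
	    "s3://test-bucket/remove_files_test/part_col=2/data_0.parquet",
	};
	fs.RemoveFiles(paths, opener); // groups keys by bucket and issues batched DeleteObjects requests
}
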
121 changes: 108 additions & 13 deletions src/s3fs.cpp
@@ -14,6 +14,8 @@
#endif

#include "duckdb/common/string_util.hpp"
#include "duckdb/common/crypto/md5.hpp"
#include "duckdb/common/types/blob.hpp"
#include "duckdb/function/scalar/string_common.hpp"
#include "duckdb/main/secret/secret_manager.hpp"
#include "duckdb/storage/buffer_manager.hpp"
@@ -958,21 +960,114 @@ void S3FileSystem::RemoveFile(const string &path, optional_ptr<FileOpener> opener) {
}
}

// Forward declaration for FindTagContents (defined later in file)
optional_idx FindTagContents(const string &response, const string &tag, idx_t cur_pos, string &result);

void S3FileSystem::RemoveFiles(const vector<string> &paths, optional_ptr<FileOpener> opener) {
if (paths.empty()) {
return;
}

struct BucketUrlInfo {
string prefix;
string http_proto;
string host;
string path;
S3AuthParams auth_params;
};

unordered_map<string, vector<string>> keys_by_bucket;
unordered_map<string, BucketUrlInfo> url_info_by_bucket;

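// Keys are grouped by bucket: S3's multi-object delete operates on one bucket per request, so the
// URL components and auth parameters for each bucket are captured from the first path seen for it.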
for (auto &path : paths) {
FileOpenerInfo info = {path};
S3AuthParams auth_params = S3AuthParams::ReadFrom(opener, info);
auto parsed_url = S3UrlParse(path, auth_params);
ReadQueryParams(parsed_url.query_param, auth_params);

const string &bucket = parsed_url.bucket;
if (keys_by_bucket.find(bucket) == keys_by_bucket.end()) {
string bucket_path = parsed_url.path.substr(0, parsed_url.path.length() - parsed_url.key.length() - 1);
if (bucket_path.empty()) {
bucket_path = "/";
}
url_info_by_bucket[bucket] = {parsed_url.prefix, parsed_url.http_proto, parsed_url.host, bucket_path,
auth_params};
}

keys_by_bucket[bucket].push_back(parsed_url.key);
}

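// S3 accepts at most 1000 keys per multi-object delete request, so larger sets are sent in batches.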
constexpr idx_t MAX_KEYS_PER_REQUEST = 1000;

for (auto &bucket_entry : keys_by_bucket) {
const string &bucket = bucket_entry.first;
const vector<string> &keys = bucket_entry.second;
const auto &url_info = url_info_by_bucket[bucket];

for (idx_t batch_start = 0; batch_start < keys.size(); batch_start += MAX_KEYS_PER_REQUEST) {
idx_t batch_end = MinValue<idx_t>(batch_start + MAX_KEYS_PER_REQUEST, keys.size());

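// Build the DeleteObjects request body: one <Object><Key>...</Key></Object> entry per key, with
// <Quiet>true</Quiet> so the response only lists keys that failed to delete.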
std::stringstream xml_body;
xml_body << "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
xml_body << "<Delete xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">";

for (idx_t i = batch_start; i < batch_end; i++) {
xml_body << "<Object><Key>" << keys[i] << "</Key></Object>";
}

xml_body << "<Quiet>true</Quiet>";
xml_body << "</Delete>";

string body = xml_body.str();

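// The multi-object delete API requires a Content-MD5 header, so hash the XML body and base64-encode it.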
MD5Context md5_context;
md5_context.Add(body);
data_t md5_hash[MD5Context::MD5_HASH_LENGTH_BINARY];
md5_context.Finish(md5_hash);

string_t md5_blob(const_char_ptr_cast(md5_hash), MD5Context::MD5_HASH_LENGTH_BINARY);
string content_md5 = Blob::ToBase64(md5_blob);

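// For SigV4 signing, the valueless "delete" query parameter is canonicalized as "delete=", while the
// request URL itself uses the bare "?delete" form.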
const string http_query_param_for_sig = "delete=";
const string http_query_param_for_url = "delete";
auto payload_hash = GetPayloadHash(const_cast<char *>(body.data()), body.length());

auto headers = CreateS3Header(url_info.path, http_query_param_for_sig, url_info.host, "s3", "POST",
url_info.auth_params, "", "", payload_hash, "");
headers["Content-MD5"] = content_md5;
headers["Content-Type"] = "application/xml";

string http_url = url_info.http_proto + url_info.host + S3FileSystem::UrlEncode(url_info.path) + "?" +
http_query_param_for_url;
string bucket_url = url_info.prefix + bucket + "/";
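// Open a handle on the bucket itself; the signed POST below is issued through this handle.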
auto handle = OpenFile(bucket_url, FileFlags::FILE_FLAGS_READ, opener);

string result;
auto res = HTTPFileSystem::PostRequest(*handle, http_url, headers, result, const_cast<char *>(body.data()),
body.length());

if (res->status != HTTPStatusCode::OK_200) {
throw IOException("Failed to remove files: HTTP %d (%s)\n%s", static_cast<int>(res->status),
res->GetError(), result);
}

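// An HTTP 200 can still carry per-key failures in the body (quiet mode reports only errors), so scan
// the response for an <Error> element and surface its contents.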
idx_t cur_pos = 0;
string error_content;
auto error_pos = FindTagContents(result, "Error", cur_pos, error_content);
if (error_pos.IsValid()) {
throw IOException("Failed to remove files: %s", error_content);
}
}
}
}
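
// To make the request and response handling above concrete, the constants below sketch the body that
// the XML-building loop produces for a single key, and a representative quiet-mode error response that
// the <Error> scan would surface. The key, code, and message values are illustrative, not part of this change.
static const char *kExampleDeleteBody =
    R"(<?xml version="1.0" encoding="UTF-8"?>)"
    R"(<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">)"
    R"(<Object><Key>part_col=1/data_0.parquet</Key></Object>)"
    R"(<Quiet>true</Quiet></Delete>)";

// A response shaped like this still arrives with HTTP 200, which is why the body is scanned for <Error>.
static const char *kExampleErrorResponse =
    R"(<?xml version="1.0" encoding="UTF-8"?>)"
    R"(<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">)"
    R"(<Error><Key>part_col=1/data_0.parquet</Key><Code>AccessDenied</Code>)"
    R"(<Message>Access Denied</Message></Error></DeleteResult>)";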

void S3FileSystem::RemoveDirectory(const string &path, optional_ptr<FileOpener> opener) {
vector<string> files_to_remove;
ListFiles(
path,
[&](const string &file, bool is_dir) {
try {
this->RemoveFile(file, opener);
} catch (IOException &e) {
string errmsg(e.what());
if (errmsg.find("No such file or directory") != std::string::npos) {
return;
}
throw;
}
},
opener.get());
path, [&](const string &file, bool is_dir) { files_to_remove.push_back(file); }, opener.get());

RemoveFiles(files_to_remove, opener);
}

void S3FileSystem::FileSync(FileHandle &handle) {
76 changes: 76 additions & 0 deletions test/sql/copy/s3/s3_remove_files.test
@@ -0,0 +1,76 @@
# name: test/sql/copy/s3/s3_remove_files.test
# description: Test RemoveFiles functionality via COPY OVERWRITE on S3
# group: [s3]

require parquet

require httpfs

require-env S3_TEST_SERVER_AVAILABLE 1

require-env AWS_DEFAULT_REGION

require-env AWS_ACCESS_KEY_ID

require-env AWS_SECRET_ACCESS_KEY

require-env DUCKDB_S3_ENDPOINT

require-env DUCKDB_S3_USE_SSL

# override the default behaviour of skipping HTTP errors and connection failures: this test fails on connection issues
set ignore_error_messages

foreach url_style path vhost

statement ok
SET s3_url_style='${url_style}'

# Step 1: Clean up any leftover files from previous test runs (OVERWRITE clears the directory)
# and create the initial partitions: part_col=1 here, then part_col=2 via OVERWRITE_OR_IGNORE below
statement ok
COPY (SELECT 1 as part_col, 'initial_1' as value) TO 's3://test-bucket/remove_files_test' (FORMAT PARQUET, PARTITION_BY (part_col), OVERWRITE);

statement ok
COPY (SELECT 2 as part_col, 'initial_2' as value) TO 's3://test-bucket/remove_files_test' (FORMAT PARQUET, PARTITION_BY (part_col), OVERWRITE_OR_IGNORE);

# Step 2: Verify both partitions exist
query I
SELECT count(*) FROM glob('s3://test-bucket/remove_files_test/**/*.parquet')
----
2

query IT
SELECT part_col, value FROM 's3://test-bucket/remove_files_test/**/*.parquet' ORDER BY part_col
----
1 initial_1
2 initial_2

# Step 3: OVERWRITE with completely different data (part_col=99)
# This should delete all existing files via RemoveFiles and create new ones
statement ok
COPY (SELECT 99 as part_col, 'new_data' as value) TO 's3://test-bucket/remove_files_test' (FORMAT PARQUET, PARTITION_BY (part_col), OVERWRITE);

# Step 4: Verify OLD partitions are removed
query I
SELECT count(*) FROM glob('s3://test-bucket/remove_files_test/part_col=1/*.parquet')
----
0

query I
SELECT count(*) FROM glob('s3://test-bucket/remove_files_test/part_col=2/*.parquet')
----
0

# Step 5: Verify only NEW partition exists
query I
SELECT count(*) FROM glob('s3://test-bucket/remove_files_test/**/*.parquet')
----
1

query IT
SELECT part_col, value FROM 's3://test-bucket/remove_files_test/**/*.parquet'
----
99 new_data

endloop