
[Bug]: LakeFS does not support the S3 ListMultipartUploads API #8422

Open
gschmutz opened this issue Dec 14, 2024 · 2 comments
Labels
bug Something isn't working contributor

Comments

@gschmutz

gschmutz commented Dec 14, 2024

What happened?

I'm trying to use Apache NiFi together with LakeFS backed by S3 storage on MinIO. To write to LakeFS, I have configured the standard PutS3 processor, and I can write objects to LakeFS. However, I also get an error on the PutS3 processor in NiFi, which is a bit "distracting" and misleading.

I am currently in the process of developing and contributing LakeFS support for Apache NiFi, so it would be great to have the ListMultipartUploads API supported.

Here is the stack trace from Apache NiFi:

nifi2-1  | 2024-12-13 15:05:36,096 ERROR [Timer-Driven Process Thread-1] o.a.nifi.processors.aws.s3.PutS3Object PutS3Object[id=fb0d70a0-ec72-3e07-0f5c-e92027c09cff] Error checking S3 Multipart Upload list for demo
nifi2-1  | com.amazonaws.services.s3.model.AmazonS3Exception: This operation is not supported in LakeFS (Service: Amazon S3; Status Code: 405; Error Code: ERRLakeFSNotSupported; Request ID: 7e99337b-03f6-4d36-ad8e-04e5953d4d68; S3 Extended Request ID: F3D5600CFAEC9006; Proxy: null)
nifi2-1  | 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1912)
nifi2-1  | 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1450)
nifi2-1  | 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1419)
nifi2-1  | 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1183)
nifi2-1  | 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:838)
nifi2-1  | 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:805)
nifi2-1  | 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:779)
nifi2-1  | 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735)
nifi2-1  | 	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717)
nifi2-1  | 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581)
nifi2-1  | 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559)
nifi2-1  | 	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5590)
nifi2-1  | 	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5537)
nifi2-1  | 	at com.amazonaws.services.s3.AmazonS3Client.listMultipartUploads(AmazonS3Client.java:3873)
nifi2-1  | 	at org.apache.nifi.processors.aws.s3.PutS3Object.getS3AgeoffListAndAgeoffLocalState(PutS3Object.java:889)
nifi2-1  | 	at org.apache.nifi.processors.aws.s3.PutS3Object.ageoffS3Uploads(PutS3Object.java:873)
nifi2-1  | 	at org.apache.nifi.processors.aws.s3.PutS3Object.onTrigger(PutS3Object.java:519)
nifi2-1  | 	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
nifi2-1  | 	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1274)
nifi2-1  | 	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:244)
nifi2-1  | 	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
nifi2-1  | 	at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
nifi2-1  | 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
nifi2-1  | 	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
nifi2-1  | 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
nifi2-1  | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
nifi2-1  | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
nifi2-1  | 	at java.base/java.lang.Thread.run(Thread.java:1583)

It is caused when NiFi invokes the AmazonS3Client.listMultipartUploads() method. After doing some additional tests, I realized that ListParts (which you support) is a different API from ListMultipartUploads.
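For context, the failing call comes from PutS3Object's multipart "age-off" pass, which lists in-progress uploads in the bucket and aborts stale ones. A rough sketch of that pattern (illustrative only; the function names are mine, not NiFi's actual implementation):

```python
import time

def is_stale(initiated_epoch: float, now_epoch: float, max_age_s: float) -> bool:
    """True if an upload started more than max_age_s seconds ago."""
    return (now_epoch - initiated_epoch) > max_age_s

def age_off_uploads(s3_client, bucket: str, max_age_s: float):
    """List in-progress multipart uploads and abort the stale ones.
    The list_multipart_uploads call here is the one lakeFS rejects with 405."""
    resp = s3_client.list_multipart_uploads(Bucket=bucket)
    now = time.time()
    aborted = []
    for upload in resp.get("Uploads", []):
        # boto3 returns "Initiated" as a datetime; convert to epoch seconds
        if is_stale(upload["Initiated"].timestamp(), now, max_age_s):
            s3_client.abort_multipart_upload(
                Bucket=bucket, Key=upload["Key"], UploadId=upload["UploadId"])
            aborted.append(upload["UploadId"])
    return aborted
```

Because lakeFS rejects the initial listing call, the whole age-off pass fails before any abort can happen, which is exactly the error logged above.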

Below is the documentation of the tests I have done, along with some Python code to reproduce the error:


Problem with ListMultipartUploads

LakeFS does not implement the ListMultipartUploads API but "only" the ListParts API.
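The two operations also differ at the HTTP level: ListParts is a GET on an object key with an uploadId query parameter, while ListMultipartUploads is a GET on the bucket itself with the uploads query parameter. A minimal sketch to tell them apart (illustrative only, not lakeFS gateway code):

```python
# Illustrative only: classify an S3 listing request by its query parameters,
# following the S3 REST API conventions.
def classify_s3_list_call(query_params: dict) -> str:
    if "uploads" in query_params:
        # GET /{bucket}?uploads -> all in-progress multipart uploads in the bucket
        return "ListMultipartUploads"
    if "uploadId" in query_params:
        # GET /{bucket}/{key}?uploadId=... -> the parts of ONE specific upload
        return "ListParts"
    return "other"

print(classify_s3_list_call({"uploads": ""}))     # -> ListMultipartUploads
print(classify_s3_list_call({"uploadId": "..."})) # -> ListParts
```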

Against LakeFS with AWS S3 backend

My lakefs setup is exposed on port 28220.

guido.schmutz@AMAXDKFVW0HYY ~/w/platys-test> aws s3api --endpoint-url http://192.168.1.129:28220 list-parts --bucket demo --key main/mpu --upload-id SkGl58QIBUa.R0lvR3dbiUrmEDaGfCMhA9lxmgz.sXUhArNBKdtyryHsFE12erxy1oLWH4zGCp65QC_KHYvzzh4D08MS1Xuz_hugPHgnQZjmD6CJWo6UqphAcz6ha13K
{
    "Parts": [
        {
            "PartNumber": 1,
            "LastModified": "2024-12-14T07:59:08+00:00",
            "ETag": "d3b07384d113edec49eaa6238ad5ff00",
            "Size": 4
        }
    ],
    "ChecksumAlgorithm": null,
    "Initiator": null,
    "Owner": null,
    "StorageClass": null
}
guido.schmutz@AMAXDKFVW0HYY ~/w/platys-test> echo foo > temp_body.txt
guido.schmutz@AMAXDKFVW0HYY ~/w/platys-test> aws s3api --endpoint-url http://192.168.1.129:28220 upload-part --bucket demo --key main/mpu --upload-id SkGl58QIBUa.R0lvR3dbiUrmEDaGfCMhA9lxmgz.sXUhArNBKdtyryHsFE12erxy1oLWH4zGCp65QC_KHYvzzh4D08MS1Xuz_hugPHgnQZjmD6CJWo6UqphAcz6ha13K --part-number 1 --body temp_body.txt
{
    "ServerSideEncryption": "AES256",
    "ETag": "\"d3b07384d113edec49eaa6238ad5ff00\""
}
aws s3api --endpoint-url http://192.168.1.129:28220 list-parts --bucket demo --key main/mpu --upload-id SkGl58QIBUa.R0lvR3dbiUrmEDaGfCMhA9lxmgz.sXUhArNBKdtyryHsFE12erxy1oLWH4zGCp65QC_KHYvzzh4D08MS1Xuz_hugPHgnQZjmD6CJWo6UqphAcz6ha13K
{
    "Parts": [
        {
            "PartNumber": 1,
            "LastModified": "2024-12-14T07:59:08+00:00",
            "ETag": "d3b07384d113edec49eaa6238ad5ff00",
            "Size": 4
        }
    ],
    "ChecksumAlgorithm": null,
    "Initiator": null,
    "Owner": null,
    "StorageClass": null
}
This is using the ListParts API.

Against AWS S3 directly (the lakeFS storage backend)

guido.schmutz@AMAXDKFVW0HYY ~/w/platys-test [254]> aws s3api create-multipart-upload --bucket gschmutz-lakefs --key main/mpu
{
    "ServerSideEncryption": "AES256",
    "Bucket": "gschmutz-lakefs",
    "Key": "main/mpu",
    "UploadId": "jDemmVS6ehIHetSwp.64YMXF_55gUsWTdHlqgrlynFcJgSjFsIpRAAsoZX61cCjGW8WvtekeTJU989M7mU6o7Rhu9hLrCd6IskH0kkI914olV4guT5escT4NaC9NBnOw"
}
guido.schmutz@AMAXDKFVW0HYY ~/w/platys-test> aws s3api upload-part --bucket gschmutz-lakefs  --key main/mpu --upload-id jDemmVS6ehIHetSwp.64YMXF_55gUsWTdHlqgrlynFcJgSjFsIpRAAsoZX61cCjGW8WvtekeTJU989M7mU6o7Rhu9hLrCd6IskH0kkI914olV4guT5escT4NaC9NBnOw --part-number 1 --body temp_body.txt
{
    "ServerSideEncryption": "AES256",
    "ETag": "\"d3b07384d113edec49eaa6238ad5ff00\""
}
guido.schmutz@AMAXDKFVW0HYY ~/w/platys-test> aws s3api list-parts --bucket gschmutz-lakefs  --key main/mpu --upload-id jDemmVS6ehIHetSwp.64YMXF_55gUsWTdHlqgrlynFcJgSjFsIpRAAsoZX61cCjGW8WvtekeTJU989M7mU6o7Rhu9hLrCd6IskH0kkI914olV4guT5escT4NaC9NBnOw
{
    "Parts": [
        {
            "PartNumber": 1,
            "LastModified": "2024-12-14T08:06:19+00:00",
            "ETag": "\"d3b07384d113edec49eaa6238ad5ff00\"",
            "Size": 4
        }
    ],
    "ChecksumAlgorithm": null,
    "Initiator": {
        "ID": "arn:aws:iam::663559919114:user/s3user",
        "DisplayName": "s3user"
    },
    "Owner": {
        "ID": "c484141daec5bf65c3e44cd50a2dfe98b3f7982ca41bcffb402e4077e90cdf2a"
    },
    "StorageClass": "STANDARD"
}

Testing list_multipart_uploads against LakeFS with AWS S3 backend

pip install boto3
import boto3

# Create an S3 client
s3_client = boto3.client('s3', endpoint_url='http://192.168.1.129:28220')  # Use endpoint_url for custom S3-like services like MinIO

# Bucket name
bucket_name = 'demo'

try:
    # Call list_multipart_uploads
    response = s3_client.list_multipart_uploads(Bucket=bucket_name)

    # Print the uploads
    if 'Uploads' in response:
        print(f"Multipart uploads in bucket '{bucket_name}':")
        for upload in response['Uploads']:
            print(f"Key: {upload['Key']}, Upload ID: {upload['UploadId']}")
    else:
        print(f"No multipart uploads found in bucket '{bucket_name}'.")

except Exception as e:
    print(f"Error listing multipart uploads: {e}")

Running it in Jupyter produces the error

Error listing multipart uploads: An error occurred (ERRLakeFSNotSupported) when calling the ListMultipartUploads operation: This operation is not supported in LakeFS

Testing list_multipart_uploads directly against AWS S3

pip install boto3
import boto3

# Create an S3 client (no endpoint_url, so this talks directly to AWS S3)
s3_client = boto3.client('s3')

# Bucket name
bucket_name = 'gschmutz-lakefs'

try:
    # Call list_multipart_uploads
    response = s3_client.list_multipart_uploads(Bucket=bucket_name)

    # Print the uploads
    if 'Uploads' in response:
        print(f"Multipart uploads in bucket '{bucket_name}':")
        for upload in response['Uploads']:
            print(f"Key: {upload['Key']}, Upload ID: {upload['UploadId']}")
    else:
        print(f"No multipart uploads found in bucket '{bucket_name}'.")

except Exception as e:
    print(f"Error listing multipart uploads: {e}")

Running it in Jupyter produces the following result:

Multipart uploads in bucket 'gschmutz-lakefs':
Key: lakefs/data/gbosm0m84jhc72i1jlk0/ctejj8u84jhc72i1jll0, Upload ID: SkGl58QIBUa.R0lvR3dbiUrmEDaGfCMhA9lxmgz.sXUhArNBKdtyryHsFE12erxy1oLWH4zGCp65QC_KHYvzzh4D08MS1Xuz_hugPHgnQZjmD6CJWo6UqphAcz6ha13K
Key: main/mpu, Upload ID: jDemmVS6ehIHetSwp.64YMXF_55gUsWTdHlqgrlynFcJgSjFsIpRAAsoZX61cCjGW8WvtekeTJU989M7mU6o7Rhu9hLrCd6IskH0kkI914olV4guT5escT4NaC9NBnOw

We can see that both multipart uploads created in the two tests above (one against LakeFS and one directly against S3) are returned.

Expected behavior

LakeFS should invoke the ListMultipartUploads API on the underlying S3 backend and return the in-progress multipart uploads in a bucket to the caller.

lakeFS version

1.44.0

How lakeFS is installed

Docker Compose with LakeFS running against AWS S3 storage backend

Affected clients

n.a.

Relevant log output

No response

Contact details

[email protected]

@gschmutz gschmutz added bug Something isn't working contributor labels Dec 14, 2024
@arielshaqed
Contributor

Hi @gschmutz ,

Thanks for the in-depth conversations and especially for this issue report! It is extremely well explained and detailed.

IIUC you are making lakeFS work with Apache Nifi, and this issue will help?

@gschmutz
Author

gschmutz commented Dec 16, 2024

Hi @arielshaqed,

yes, I have written some custom NiFi processors to support some of the LakeFS operations, such as CreateBranch, Commit, Merge, and DeleteBranch, so that the standard GetS3 and PutS3 functionality of Apache NiFi can be used to work with LakeFS. I'm currently testing and documenting it. I also want to add "sensors" similar to the support provided by the lakeFS Airflow Provider.
