Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload big io.Buffer to S3 #380

Closed
ivanhigueram opened this issue Nov 7, 2019 · 13 comments · Fixed by #796
Closed

Upload big io.Buffer to S3 #380

ivanhigueram opened this issue Nov 7, 2019 · 13 comments · Fixed by #796
Labels

Comments

@ivanhigueram
Copy link

ivanhigueram commented Nov 7, 2019

Problem description

I am requesting a set of files, zipping them, and then upload the zipped data to S3 using smart_open and a io.BytesIO() object. The size of the compressed data exceeds the 5 Gb S3 limit, and I know that in that case a multi-parts approach should be use (just like in boto3). I am using smart_open.s3.open() for doing this, but I do not completely understand how to configure the multi-part upload to avoid the EntityTooLarge error. I keep getting the error when using my code. Should I divide my file before hand or specify the number of parts? Checking the source code I don't see a num_parts option.

 (EntityTooLarge) when calling the UploadPart operation: Your proposed upload exceeds the maximum allowed size

My function is the following:

def stream_time_range_s3(start_date,
                         end_date,
                         aws_key,
                         aws_secret,
                         aws_bucket_name,
                         key,
                         max_workers,
                         delta):
    """
    Download individual month directory of .grd files to local directory.

    This function will download using the ftplib all the .grd files between the
    start_date and the end_date. All dates in the NOAA NARR server are
    stored following this order:
        data
        ├── year/month
            ├── year/month/day01
            ├── year/month/day02

    Here we download the monthly directory with the user-defined dates in the
    start and end dates. 

    Params:
        - start_year str: year to start download.
        - end_year str: year to stop download.
    """

    logger = logging.getLogger(__name__)

    if not isinstance(start_date, datetime):
        start_date = datetime.strptime(start_date, '%Y-%m-%d')
    else:
        ValueError(f'{start_date} is not in the correct format or not a valid type')


    session = boto3.Session(
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_secret
    )

    base_url = 'https://nomads.ncdc.noaa.gov/data/narr'
    time = ['0000', '0300', '0600', '0900', '1200', '1500', '1800', '2100']
 
    if delta is None:
        dates = datetime_range(start_date, end_date, {'days':1})
    else:
        dates = datetime_range(start_date, end_date, delta)

    urls_time_range = []
    for day, time in product(dates, time):
           file_name = f'narr-a_221_{day.strftime("%Y%m%d")}_{time}_000.grb'
           url = URL(base_url, day.strftime('%Y%m'), day.strftime('%Y%m%d'))
           urls_time_range.append(str(URL(url, file_name)))

    with multiprocessing.Pool(max_workers) as p:
        results = p.map(requests_to_s3, urls_time_range, chunksize=1)

        print('Finish download')
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, mode='w', compression=zipfile.ZIP_DEFLATED, compresslevel=1) as zf:
            for content_file_name, content_file_result in results:
                try:
                    zf.writestr(content_file_name,
                                content_file_result)
                except Exception as exc:
                    print(exc)

        print('Finish zipping  - Upload Start')
        with smart_open.s3.open(aws_bucket_name, key, 'wb', session=session) as so:
            so.write(buf.getvalue())

    return None

You can test the function by running:

from datetime import datetime

a = stream_time_range_s3(start_date=datetime(2012, 1, 1),
end_date=datetime(2012, 2, 1),
aws_key=aws_key,
delta=None,
aws_secret=aws_secret,
aws_bucket_name=bucket_name,
key='wind_2012_test_parts.zip',
max_workers=10)

Versions

Darwin-18.7.0-x86_64-i386-64bit
Python 3.7.1 (default, Feb 27 2019, 18:57:54)
[Clang 10.0.0 (clang-1000.10.44.4)]
smart_open 1.8.4
@mpenkov
Copy link
Collaborator

mpenkov commented Nov 9, 2019

Thanks for reporting this.

Could you simplify the example a bit? It's too long and requires external data (wind_2012_test_parts.zip).

From your description of the problem, it sounds like the following should reproduce your problem:

import io
import smart_open
with open('some_large_file.bin', 'rb') as fin:
    with smart_open.open('s3://bucket/key.bin', 'wb') as fout:
        buf = fin.read(10e9) # read 10GiB into memory, oof
        fout.write(buf)

Can you confirm whether the above reproduces your problem? If not, let's look into reducing your original example, it's a bit too much for me to look at.

@davidparks21
Copy link

davidparks21 commented Mar 25, 2021

I can confirm that I'm encountering this error when trying to upload a file over 5GB via fout.write(buf) as you've stated in the simplified example.

This stack overflow article appears to explain the cause: https://stackoverflow.com/questions/26319815/entitytoolarge-error-when-uploading-a-5g-file-to-amazon-s3

@mpenkov
Copy link
Collaborator

mpenkov commented Mar 25, 2021

@davidparks21 Thank you for confirming the problem.

I think we can resolve the issue by ensuring that a single write call never puts more than 5GB. If there is more data, then subsequent write calls should handle it.

Are you able to make a PR?

@davidparks21
Copy link

Oh, so just raise an exception when one fout.write(buff) is called with a buff > 5GB?

That would be an easy solution to deal with for me. I think I could do a PR for that. There was one other small thing I wanted to do a PR for too, so this would probably get me off my butt to do both.

@piskvorky
Copy link
Owner

piskvorky commented Mar 25, 2021

so just raise an exception when one fout.write(buff) is called with a buff > 5GB?

smart_open's promise is to handle large uploads (and downloads) transparently. So instead of raising an exception, isn't it better to split the chunk into multipart pieces, each smaller than 5GB?

IIRC smart_open is already handling multipart uploads transparently under the hood, so this should be no different.

@JamalRahman
Copy link

JamalRahman commented Oct 21, 2021

I have a similar issue with trying to stream/write large files to S3 via pickle.dump(obj, fout, protocol=4) as smart_open is trying to upload parts that are each greater than 5GB.

Is this still a 'needs-info' or is the problem understood?

@mpenkov
Copy link
Collaborator

mpenkov commented Oct 21, 2021

I think we understand the problem, now we "just" need to fix it.

@mpenkov mpenkov added bug and removed need-info labels Oct 21, 2021
@piskvorky
Copy link
Owner

@JamalRahman @pythric @ivanhigueram could you help with a fix, prepare the PR?

@tweglinski
Copy link

@mpenkov is it still an open issue? I found this when considering smart_open to upload 5TB file to S3.
@davidparks21 how did you resolve the problem in your case? Could you share some code snippet?

@davidparks21
Copy link

I started working on a solution, but it wasn't a very trivial change the way it's currently coded. I ran out of time and abandoned the effort back when I posted. I'm not sure about the current status, but my solution was to simply chunk the calls to file.write(n) which was pretty trivial in my use case.

@jakkdl
Copy link
Contributor

jakkdl commented Jan 26, 2024

I've been chugging away at this, and finally hit upon a solution where it wouldn't need to make any extra copies to the buffer of the data - which would be a significant improvement when dealing with the size of files we're talking about, but unfortunately boto/boto3#3423 stopped me from such a perfect solution.

I'll be opening a PR soon with a compromise solution though, but if my PR to botocore is accepted and it's released it'll open up not needing to buffer the data at all before sending (unless smaller than min-size-upload writes are involved).

@leeprevost
Copy link

I think I'm running into a very similar problem:

src = 's3://commoncrawl/projects/hyperlinkgraph/cc-main-2017-feb-mar-apr-hostgraph/vertices.txt.gz'
dst_file = src.rstrip(".gz") + ".bz2"
tp = {'client': s3client, 'min_part_size': 2 * 1024**2}
with open(src, 'r', transport_params=tp) as f:
        with open(dst_file, 'w', transport_params=tp) as g:
            g.write(f.read())

I seem to be running out of memory on a small core machine with plenty of /tmp space. So, thinking I need to buffer the write/read? I though this was handled with the tp?

@leeprevost
Copy link

I seem to be running out of memory on a small core machine with plenty of /tmp space. So, thinking I need to buffer the write/read? I though this was handled with the tp?

Solved this. I was missing the write iterator.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

8 participants