Skip to content

Commit

Permalink
[IMP] fs_storage: add the retry mechanisim to the _file_open method a…
Browse files Browse the repository at this point in the history
…s well
  • Loading branch information
AaronHForgeFlow committed Feb 10, 2025
1 parent d031e8f commit 619e271
Showing 1 changed file with 98 additions and 65 deletions.
163 changes: 98 additions & 65 deletions fs_attachment/models/ir_attachment.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,15 @@ def __enter__(self) -> io.IOBase:
self._file_open()
return self._file

def _file_open(self) -> io.IOBase:
def _get_retry_params(self):
"""Retrieve retry parameters from the filestore configuration."""
read_retry_attempts = getattr(
self.attachment.fs_storage_id, "read_retry_attempts", 3
)
read_retry_delay = getattr(self.attachment.fs_storage_id, "read_retry_delay", 1)
return read_retry_attempts, read_retry_delay

def _file_open(self):
"""Open the attachment content as a file-like object
This method will initialize the following attributes:
Expand All @@ -1023,70 +1031,95 @@ def _file_open(self) -> io.IOBase:
* _new_store_fname: the new store_fname if the file is
opened for a new version.
"""
new_store_fname = None
if (
self._is_open_for_read
or (self._is_open_for_modify and not self.new_version)
or self._is_stored_in_db
):
if self.attachment._is_file_from_a_storage(self.attachment.store_fname):
fs, _storage, fname = self.attachment._get_fs_parts()
filepath = fname
filesystem = fs
elif self.attachment.store_fname:
filepath = self.attachment._full_path(self.attachment.store_fname)
filesystem = fsspec.filesystem("file")
else:
filepath = f"{self.attachment.id}"
filesystem = fsspec.filesystem("memory")
if "a" in self.mode or self._is_open_for_read:
filesystem.pipe_file(filepath, self.attachment.db_datas)
the_file = filesystem.open(
filepath,
mode=self.mode,
block_size=self.block_size,
cache_options=self.cache_options,
compression=self.compression,
**self.kwargs,
)
else:
# mode='w' and new_version=True and storage != 'db'
# We must create a new file with a new name. If we are in an
# append mode, we must copy the content of the old file (or create
# the new one by copy of the old one).
# to not break the storage plugin mechanism, we'll use the
# _file_write method to create the new empty file with a random
# content and checksum to avoid collision.
content = self._gen_random_content()
checksum = self.attachment._compute_checksum(content)
new_store_fname = self.attachment.with_context(
attachment_res_model=self.attachment.res_model,
attachment_res_field=self.attachment.res_field,
)._file_write(content, checksum)
if self.attachment._is_file_from_a_storage(new_store_fname):
(
filesystem,
_storage,
new_filepath,
) = self.attachment._fs_parse_store_fname(new_store_fname)
_fs, _storage, old_filepath = self.attachment._get_fs_parts()
else:
new_filepath = self.attachment._full_path(new_store_fname)
old_filepath = self.attachment._full_path(self.attachment.store_fname)
filesystem = fsspec.filesystem("file")
if "a" in self.mode:
filesystem.cp_file(old_filepath, new_filepath)
the_file = filesystem.open(
new_filepath,
mode=self.mode,
block_size=self.block_size,
cache_options=self.cache_options,
compression=self.compression,
**self.kwargs,
)
self._filesystem = filesystem
self._new_store_fname = new_store_fname
self._file = the_file

max_attempts, delay = self._get_retry_params()

@retry(stop=stop_after_attempt(max_attempts), wait=wait_fixed(delay))
def _open_file():
if self._file:
return # Avoid re-opening if already set

try:
new_store_fname = None
if (
self._is_open_for_read
or (self._is_open_for_modify and not self.new_version)
or self._is_stored_in_db
):
if self.attachment._is_file_from_a_storage(
self.attachment.store_fname
):
fs, _storage, fname = self.attachment._get_fs_parts()
filepath = fname
filesystem = fs
elif self.attachment.store_fname:
filepath = self.attachment._full_path(
self.attachment.store_fname
)
filesystem = fsspec.filesystem("file")
else:
filepath = f"{self.attachment.id}"
filesystem = fsspec.filesystem("memory")
if "a" in self.mode or self._is_open_for_read:
filesystem.pipe_file(filepath, self.attachment.db_datas)
the_file = filesystem.open(
filepath,
mode=self.mode,
block_size=self.block_size,
cache_options=self.cache_options,
compression=self.compression,
**self.kwargs,
)
else:
# mode='w' and new_version=True and storage != 'db'
# We must create a new file with a new name. If we are in an
# append mode, we must copy the content of the old file (or create
# the new one by copy of the old one).
# to not break the storage plugin mechanism, we'll use the
# _file_write method to create the new empty file with a random
# content and checksum to avoid collision.
content = self._gen_random_content()
checksum = self.attachment._compute_checksum(content)
new_store_fname = self.attachment.with_context(
attachment_res_model=self.attachment.res_model,
attachment_res_field=self.attachment.res_field,
)._file_write(content, checksum)

if self.attachment._is_file_from_a_storage(new_store_fname):
(
filesystem,
_storage,
new_filepath,
) = self.attachment._fs_parse_store_fname(new_store_fname)
_fs, _storage, old_filepath = self.attachment._get_fs_parts()
else:
new_filepath = self.attachment._full_path(new_store_fname)
old_filepath = self.attachment._full_path(
self.attachment.store_fname
)
filesystem = fsspec.filesystem("file")

if "a" in self.mode:
filesystem.cp_file(old_filepath, new_filepath)

the_file = filesystem.open(
new_filepath,
mode=self.mode,
block_size=self.block_size,
cache_options=self.cache_options,
compression=self.compression,
**self.kwargs,
)

self._filesystem = filesystem
self._new_store_fname = new_store_fname
self._file = the_file

except Exception as e:
_logger.error(f"File open failed, retrying... Error: {e}")
raise

_open_file()

def _gen_random_content(self, size=256):
"""Generate a random content of size bytes"""
Expand Down

0 comments on commit 619e271

Please sign in to comment.