Merge branch 'dev' into issue-955
rasswanth-s authored Feb 6, 2024
2 parents cb79f7c + 2c89ae2 commit b25a01f
Showing 7 changed files with 144 additions and 32 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/cd-syft.yml
@@ -25,6 +25,12 @@ on:
- TEST_PYPI
- REAL_AND_TEST_PYPI

# Prevents concurrent runs of the same workflow
# while the previous run is still in progress
concurrency:
group: "CD - Syft"
cancel-in-progress: false

jobs:
release-checks:
if: github.repository == 'OpenMined/PySyft' # don't run on forks
3 changes: 2 additions & 1 deletion packages/grid/default.env
@@ -26,7 +26,7 @@ DOCKER_IMAGE_TRAEFIK=traefik
TRAEFIK_VERSION=v2.10
REDIS_VERSION=6.2
RABBITMQ_VERSION=3
SEAWEEDFS_VERSION=3.59
SEAWEEDFS_VERSION=3.62
DOCKER_IMAGE_SEAWEEDFS=openmined/grid-seaweedfs
VERSION=latest
VERSION_HASH=unknown
@@ -73,6 +73,7 @@ S3_REGION="us-east-1"
S3_PRESIGNED_TIMEOUT_SECS=1800
S3_VOLUME_SIZE_MB=1024


# Jax
JAX_ENABLE_X64=True

5 changes: 3 additions & 2 deletions packages/grid/seaweedfs/seaweedfs.dockerfile
@@ -1,14 +1,15 @@
ARG SEAWEEDFS_VERSION

FROM chrislusf/seaweedfs:${SEAWEEDFS_VERSION}
FROM chrislusf/seaweedfs:${SEAWEEDFS_VERSION}_large_disk

WORKDIR /

RUN apk update && \
apk add --no-cache python3 py3-pip ca-certificates bash

COPY requirements.txt app.py /
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir --break-system-packages -r requirements.txt


COPY --chmod=755 start.sh mount_command.sh /

14 changes: 14 additions & 0 deletions packages/hagrid/hagrid/cli.py
@@ -460,6 +460,13 @@ def clean(location: str) -> None:
type=str,
help="Set root password for s3 blob storage",
)
@click.option(
"--set-volume-size-limit-mb",
default=1024,
required=False,
type=click.IntRange(1024, 50000),
help="Set the volume size limit (in MBs)",
)
def launch(args: TypeTuple[str], **kwargs: Any) -> None:
verb = get_launch_verb()
try:
@@ -1258,6 +1265,7 @@ def create_launch_cmd(
if parsed_kwargs["use_blob_storage"]:
parsed_kwargs["set_s3_username"] = kwargs["set_s3_username"]
parsed_kwargs["set_s3_password"] = kwargs["set_s3_password"]
parsed_kwargs["set_volume_size_limit_mb"] = kwargs["set_volume_size_limit_mb"]

parsed_kwargs["node_count"] = (
int(kwargs["node_count"]) if "node_count" in kwargs else 1
@@ -2262,6 +2270,12 @@ def create_launch_docker_cmd(
if "set_s3_password" in kwargs and kwargs["set_s3_password"] is not None:
envs["S3_ROOT_PWD"] = kwargs["set_s3_password"]

if (
"set_volume_size_limit_mb" in kwargs
and kwargs["set_volume_size_limit_mb"] is not None
):
envs["S3_VOLUME_SIZE_MB"] = kwargs["set_volume_size_limit_mb"]

if "release" in kwargs:
envs["RELEASE"] = kwargs["release"]

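A minimal sketch of how the new flag is expected to flow from the CLI into the container environment. Only the option definition and the S3_VOLUME_SIZE_MB mapping come from this diff; the standalone command below is illustrative, not part of the commit.

# Illustrative sketch: a stripped-down click command wiring
# --set-volume-size-limit-mb into the S3_VOLUME_SIZE_MB env var,
# mirroring the hagrid launch changes above.
from typing import Any, Dict

import click


@click.command()
@click.option(
    "--set-volume-size-limit-mb",
    default=1024,
    required=False,
    type=click.IntRange(1024, 50000),  # same bounds as the new hagrid option
    help="Set the volume size limit (in MBs)",
)
def launch(set_volume_size_limit_mb: int) -> None:
    envs: Dict[str, Any] = {}
    if set_volume_size_limit_mb is not None:
        # hagrid forwards this value to the seaweedfs container environment
        envs["S3_VOLUME_SIZE_MB"] = set_volume_size_limit_mb
    click.echo(envs)


if __name__ == "__main__":
    launch()

In practice this would be invoked as something like `hagrid launch ... --set-volume-size-limit-mb 2048` (the remaining launch arguments are outside this diff).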
3 changes: 2 additions & 1 deletion packages/syft/src/syft/client/domain_client.py
@@ -191,9 +191,10 @@ def upload_files(

try:
result = []
for file in tqdm(expanded_file_list):
for file in expanded_file_list:
if not isinstance(file, BlobFile):
file = BlobFile(path=file, file_name=file.name)
print("Uploading", file.file_name)
if not file.uploaded:
file.upload_to_blobstorage(self)
result.append(file)
7 changes: 7 additions & 0 deletions packages/syft/src/syft/protocol/protocol_version.json
@@ -1182,6 +1182,13 @@
"hash": "1f32d94b75b0a6b4e86cec93d94aa905738219e3e7e75f51dd335ee832a6ed3e",
"action": "remove"
}
},
"SeaweedFSBlobDeposit": {
"2": {
"version": 2,
"hash": "07d84a95324d95d9c868cd7d1c33c908f77aa468671d76c144586aab672bcbb5",
"action": "add"
}
}
}
}
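This registers version 2 of SeaweedFSBlobDeposit; the schema gains a size field, and the seaweedfs.py diff below adds migrations between the two versions. A minimal sketch of what those transforms amount to, under the assumption that drop and make_set_default operate on the serialized field mapping:

# Illustrative only -- the real migrations are the @migrate-decorated functions
# added in seaweedfs.py below; this spells out the intended field changes.
from typing import Any, Dict


def upgrade_v1_to_v2(v1_fields: Dict[str, Any]) -> Dict[str, Any]:
    # make_set_default("size", 1): add the new field with a placeholder default
    v2_fields = dict(v1_fields)
    v2_fields.setdefault("size", 1)
    return v2_fields


def downgrade_v2_to_v1(v2_fields: Dict[str, Any]) -> Dict[str, Any]:
    # drop(["size"]): remove the field that the v1 schema does not know about
    return {k: v for k, v in v2_fields.items() if k != "size"}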
138 changes: 110 additions & 28 deletions packages/syft/src/syft/store/blob_storage/seaweedfs.py
@@ -1,8 +1,9 @@
# stdlib
from io import BytesIO
import math
from queue import Queue
import threading
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Type
@@ -14,6 +15,7 @@
from botocore.client import ClientError as BotoClientError
from botocore.client import Config
import requests
from tqdm import tqdm
from typing_extensions import Self

# relative
@@ -33,27 +35,33 @@
from ...types.blob_storage import SeaweedSecureFilePathLocation
from ...types.blob_storage import SecureFilePathLocation
from ...types.grid_url import GridURL
from ...types.syft_migration import migrate
from ...types.syft_object import SYFT_OBJECT_VERSION_1
from ...types.syft_object import SYFT_OBJECT_VERSION_2
from ...types.transforms import drop
from ...types.transforms import make_set_default
from ...util.constants import DEFAULT_TIMEOUT

WRITE_EXPIRATION_TIME = 900 # seconds
DEFAULT_CHUNK_SIZE = 1024**3 # 1 GB
DEFAULT_FILE_PART_SIZE = (1024**3) * 5 # 5GB
DEFAULT_UPLOAD_CHUNK_SIZE = 819200


def _byte_chunks(bytes: BytesIO, size: int) -> Generator[bytes, None, None]:
while True:
try:
yield bytes.read(size)
except BlockingIOError:
return
@serializable()
class SeaweedFSBlobDepositV1(BlobDeposit):
__canonical_name__ = "SeaweedFSBlobDeposit"
__version__ = SYFT_OBJECT_VERSION_1

urls: List[GridURL]


@serializable()
class SeaweedFSBlobDeposit(BlobDeposit):
__canonical_name__ = "SeaweedFSBlobDeposit"
__version__ = SYFT_OBJECT_VERSION_1
__version__ = SYFT_OBJECT_VERSION_2

urls: List[GridURL]
size: int

def write(self, data: BytesIO) -> Union[SyftSuccess, SyftError]:
# relative
@@ -68,24 +76,83 @@ def write(self, data: BytesIO) -> Union[SyftSuccess, SyftError]:

try:
no_lines = 0
for part_no, (byte_chunk, url) in enumerate(
zip(_byte_chunks(data, DEFAULT_CHUNK_SIZE), self.urls),
start=1,
):
no_lines += byte_chunk.count(b"\n")
if api is not None:
blob_url = api.connection.to_blob_route(
url.url_path, host=url.host_or_ip
# this loops over the parts; we have multiple parts to allow for
# concurrent uploads of a single file (we are currently not using that).
# a part may, for instance, be 5GB;
# parts are then split into chunks which are MBs (order of magnitude)
part_size = math.ceil(self.size / len(self.urls))
chunk_size = DEFAULT_UPLOAD_CHUNK_SIZE

# this is the total nr of chunks in all parts
total_iterations = math.ceil(part_size / chunk_size) * len(self.urls)

with tqdm(
total=total_iterations,
desc=f"Uploading progress", # noqa
) as pbar:
for part_no, url in enumerate(
self.urls,
start=1,
):
if api is not None:
blob_url = api.connection.to_blob_route(
url.url_path, host=url.host_or_ip
)
else:
blob_url = url

# read a chunk until we have read part_size
class PartGenerator:
def __init__(self):
self.no_lines = 0

def async_generator(self, chunk_size=DEFAULT_UPLOAD_CHUNK_SIZE):
item_queue: Queue = Queue()
threading.Thread(
target=self.add_chunks_to_queue,
kwargs={"queue": item_queue, "chunk_size": chunk_size},
daemon=True,
).start()
item = item_queue.get()
while item != 0:
yield item
pbar.update(1)
item = item_queue.get()

def add_chunks_to_queue(
self, queue, chunk_size=DEFAULT_UPLOAD_CHUNK_SIZE
):
"""Creates a data geneator for the part"""
n = 0

while n * chunk_size <= part_size:
try:
chunk = data.read(chunk_size)
self.no_lines += chunk.count(b"\n")
n += 1
queue.put(chunk)
except BlockingIOError:
# if end of file, stop
queue.put(0)
# if end of part, stop
queue.put(0)

gen = PartGenerator()

response = requests.put(
url=str(blob_url),
data=gen.async_generator(chunk_size),
timeout=DEFAULT_TIMEOUT,
stream=True,
)
else:
blob_url = url
response = requests.put(
url=str(blob_url), data=byte_chunk, timeout=DEFAULT_TIMEOUT
)
response.raise_for_status()
etag = response.headers["ETag"]
etags.append({"ETag": etag, "PartNumber": part_no})

response.raise_for_status()
no_lines += gen.no_lines
etag = response.headers["ETag"]
etags.append({"ETag": etag, "PartNumber": part_no})

except requests.RequestException as e:
print(e)
return SyftError(message=str(e))

mark_write_complete_method = from_api_or_context(
@@ -98,6 +165,20 @@ def write(self, data: BytesIO) -> Union[SyftSuccess, SyftError]:
)


@migrate(SeaweedFSBlobDeposit, SeaweedFSBlobDepositV1)
def downgrade_seaweedblobdeposit_v2_to_v1():
return [
drop(["size"]),
]


@migrate(SeaweedFSBlobDepositV1, SeaweedFSBlobDeposit)
def upgrade_seaweedblobdeposit_v1_to_v2():
return [
make_set_default("size", 1),
]


@serializable()
class SeaweedFSClientConfig(BlobStorageClientConfig):
host: str
@@ -188,7 +269,7 @@ def allocate(
)

def write(self, obj: BlobStorageEntry) -> BlobDeposit:
total_parts = math.ceil(obj.file_size / DEFAULT_CHUNK_SIZE)
total_parts = math.ceil(obj.file_size / DEFAULT_FILE_PART_SIZE)

urls = [
GridURL.from_url(
@@ -205,8 +286,9 @@ def write(self, obj: BlobStorageEntry) -> BlobDeposit:
)
for i in range(total_parts)
]

return SeaweedFSBlobDeposit(blob_storage_entry_id=obj.id, urls=urls)
return SeaweedFSBlobDeposit(
blob_storage_entry_id=obj.id, urls=urls, size=obj.file_size
)

def complete_multipart_upload(
self,
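For reference, a worked sketch of the part/chunk arithmetic used in write() above. Only the two constants come from this diff; the file size and the computed numbers are illustrative.

# Illustrative arithmetic for the chunked upload in SeaweedFSBlobDeposit.write:
# a file is split across one presigned URL per 5 GB "part", and each part is
# streamed in ~800 KB chunks; tqdm's total is the chunk count across all parts.
import math

DEFAULT_FILE_PART_SIZE = (1024**3) * 5  # 5 GB per part (one URL per part)
DEFAULT_UPLOAD_CHUNK_SIZE = 819200      # ~800 KB read from the BytesIO at a time

file_size = 7 * 1024**3                 # a hypothetical 7 GB blob
total_parts = math.ceil(file_size / DEFAULT_FILE_PART_SIZE)         # 2 urls
part_size = math.ceil(file_size / total_parts)                      # 3758096384 bytes
total_iterations = math.ceil(part_size / DEFAULT_UPLOAD_CHUNK_SIZE) * total_parts
print(total_parts, part_size, total_iterations)                     # 2 3758096384 9176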
