Skip to content

Commit 1eba229

Browse files
committed
Support using Weka via weka:// so we can use separate environment variables for AWS S3.
1 parent a7b5de5 commit 1eba229

File tree

4 files changed

+75
-0
lines changed

4 files changed

+75
-0
lines changed

rslp/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
"""rslearn_projects: Ai2 projects built on top of rslearn."""
2+
3+
import rslp.utils.fs # noqa: F401 (imported but unused)

rslp/transforms/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""rslp transforms.
2+
3+
These transforms should be ones that are not general enough to include in rslearn, but
4+
still relevant across multiple rslp projects.
5+
6+
If it is project-specific, it should go in rslp/[project_name]/train.py or similar.
7+
"""

rslp/utils/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""rslp utilities.
2+
3+
These utilities should be ones that are not general enough to include in rslearn, but
4+
still relevant across multiple rslp projects.
5+
6+
If it is project-specific, it should go in rslp/[project_name]/util.py or similar.
7+
"""

rslp/utils/fs.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
"""fsspec-related utilities."""
2+
3+
import os
4+
from typing import Any
5+
6+
import fsspec
7+
import upath.registry
8+
from s3fs import S3FileSystem
9+
from upath._flavour import WrappedFileSystemFlavour
10+
from upath.implementations.cloud import CloudPath
11+
12+
13+
class WekaFileSystem(S3FileSystem):
14+
"""fsspec FileSystem implementation for Weka.
15+
16+
This way we can still provide keys through environment variables, but use different
17+
environment variables for Weka.
18+
"""
19+
20+
protocol = ("weka",)
21+
22+
def __init__(self, **kwargs: dict[str, Any]):
23+
"""Create a new WekaFileSystem.
24+
25+
Args:
26+
kwargs: see S3FileSystem.
27+
"""
28+
super().__init__(
29+
key=os.environ["WEKA_ACCESS_KEY_ID"],
30+
secret=os.environ["WEKA_SECRET_ACCESS_KEY"],
31+
endpoint_url=os.environ["WEKA_ENDPOINT_URL"],
32+
**kwargs,
33+
)
34+
35+
36+
class WekaPath(CloudPath):
37+
"""UPath implementation for Weka."""
38+
39+
__slots__ = ()
40+
41+
def __init__(
42+
self, *args: list[Any], protocol: str | None = None, **storage_options: Any
43+
) -> None:
44+
"""Create a new WekaPath.
45+
46+
Args:
47+
args: see CloudPath.
48+
protocol: the protocol name, should be "weka".
49+
storage_options: filesystem options.
50+
"""
51+
super().__init__(*args, protocol=protocol, **storage_options)
52+
if not self.drive and len(self.parts) > 1:
53+
raise ValueError("non key-like path provided (bucket/container missing)")
54+
55+
56+
fsspec.register_implementation("weka", WekaFileSystem)
57+
upath.registry.register_implementation("weka", WekaPath)
58+
WrappedFileSystemFlavour.protocol_config["netloc_is_anchor"].add("weka")
59+
WrappedFileSystemFlavour.protocol_config["supports_empty_parts"].add("weka")

0 commit comments

Comments
 (0)