Skip to content

Commit 534c72e

Browse files
authored
[python] Fix OSSParam to access DLF (#6332)
1 parent b47ebe8 commit 534c72e

File tree

5 files changed

+80
-18
lines changed

5 files changed

+80
-18
lines changed

paimon-python/pypaimon/catalog/rest/rest_catalog.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
from typing import Any, Callable, Dict, List, Optional, Union
2020
from urllib.parse import urlparse
2121

22+
import pyarrow
23+
from packaging.version import parse
24+
2225
from pypaimon.api.api_response import GetTableResponse, PagedList
2326
from pypaimon.api.options import Options
2427
from pypaimon.api.rest_api import RESTApi
@@ -200,17 +203,17 @@ def to_table_metadata(self, db: str, response: GetTableResponse) -> TableMetadat
200203
uuid=response.get_id()
201204
)
202205

203-
def file_io_from_options(self, table_path: Path) -> FileIO:
204-
return FileIO(str(table_path), self.context.options.data)
206+
def file_io_from_options(self, table_path: str) -> FileIO:
207+
return FileIO(table_path, self.context.options.data)
205208

206-
def file_io_for_data(self, table_path: Path, identifier: Identifier):
209+
def file_io_for_data(self, table_path: str, identifier: Identifier):
207210
return RESTTokenFileIO(identifier, table_path, self.context.options.data) \
208211
if self.data_token_enabled else self.file_io_from_options(table_path)
209212

210213
def load_table(self,
211214
identifier: Identifier,
212-
internal_file_io: Callable[[Path], Any],
213-
external_file_io: Callable[[Path], Any],
215+
internal_file_io: Callable[[str], Any],
216+
external_file_io: Callable[[str], Any],
214217
metadata_loader: Callable[[Identifier], TableMetadata],
215218
) -> FileStoreTable:
216219
metadata = metadata_loader(identifier)
@@ -223,9 +226,12 @@ def load_table(self,
223226
supports_version_management=True # REST catalogs support version management
224227
)
225228
path_parsed = urlparse(schema.options.get(CoreOptions.PATH))
226-
path = Path(path_parsed.path) if path_parsed.scheme is None else Path(schema.options.get(CoreOptions.PATH))
227-
table_path = path_parsed.netloc + "/" + path_parsed.path \
228-
if path_parsed.scheme == "file" else path_parsed.path[1:]
229+
path = path_parsed.path if path_parsed.scheme is None else schema.options.get(CoreOptions.PATH)
230+
if path_parsed.scheme == "file":
231+
table_path = path_parsed.path
232+
else:
233+
table_path = path_parsed.netloc + path_parsed.path \
234+
if parse(pyarrow.__version__) >= parse("7.0.0") else path_parsed.path[1:]
229235
table = self.create(data_file_io(path),
230236
Path(table_path),
231237
schema,

paimon-python/pypaimon/catalog/rest/rest_token_file_io.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,20 @@
3131

3232
class RESTTokenFileIO(FileIO):
3333

34-
def __init__(self, identifier: Identifier, path: Path,
34+
def __init__(self, identifier: Identifier, path: str,
3535
catalog_options: Optional[dict] = None):
3636
self.identifier = identifier
3737
self.path = path
3838
self.token: Optional[RESTToken] = None
3939
self.api_instance: Optional[RESTApi] = None
4040
self.lock = threading.Lock()
4141
self.log = logging.getLogger(__name__)
42-
super().__init__(str(path), catalog_options)
42+
super().__init__(path, catalog_options)
4343

44-
def _initialize_oss_fs(self) -> FileSystem:
44+
def _initialize_oss_fs(self, path) -> FileSystem:
4545
self.try_to_refresh_token()
4646
self.properties.update(self.token.token)
47-
return super()._initialize_oss_fs()
47+
return super()._initialize_oss_fs(path)
4848

4949
def new_output_stream(self, path: Path):
5050
return self.filesystem.open_output_stream(str(path))

paimon-python/pypaimon/common/config.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ class OssOptions:
2121
OSS_SECURITY_TOKEN = "fs.oss.securityToken"
2222
OSS_ENDPOINT = "fs.oss.endpoint"
2323
OSS_REGION = "fs.oss.region"
24-
OSS_BUCKET = "fs.oss.bucket"
2524

2625

2726
class S3Options:

paimon-python/pypaimon/common/file_io.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@
3131

3232

3333
class FileIO:
34-
def __init__(self, warehouse: str, catalog_options: dict):
34+
def __init__(self, path: str, catalog_options: dict):
3535
self.properties = catalog_options
3636
self.logger = logging.getLogger(__name__)
37-
scheme, netloc, path = self.parse_location(warehouse)
37+
scheme, netloc, _ = self.parse_location(path)
3838
if scheme in {"oss"}:
39-
self.filesystem = self._initialize_oss_fs()
39+
self.filesystem = self._initialize_oss_fs(path)
4040
elif scheme in {"s3", "s3a", "s3n"}:
4141
self.filesystem = self._initialize_s3_fs()
4242
elif scheme in {"hdfs", "viewfs"}:
@@ -56,7 +56,29 @@ def parse_location(location: str):
5656
else:
5757
return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
5858

59-
def _initialize_oss_fs(self) -> FileSystem:
59+
def _extract_oss_bucket(self, location) -> str:
60+
uri = urlparse(location)
61+
if uri.scheme and uri.scheme != "oss":
62+
raise ValueError("Not an OSS URI: {}".format(location))
63+
64+
netloc = uri.netloc or ""
65+
# parse oss://access_id:secret_key@Endpoint/bucket/path/to/object
66+
if (getattr(uri, "username", None) or getattr(uri, "password", None)) or ("@" in netloc):
67+
first_segment = uri.path.lstrip("/").split("/", 1)[0]
68+
if not first_segment:
69+
raise ValueError("Invalid OSS URI without bucket: {}".format(location))
70+
return first_segment
71+
72+
# parse oss://bucket/... or oss://bucket.endpoint/...
73+
host = getattr(uri, "hostname", None) or netloc
74+
if not host:
75+
raise ValueError("Invalid OSS URI without host: {}".format(location))
76+
bucket = host.split(".", 1)[0]
77+
if not bucket:
78+
raise ValueError("Invalid OSS URI without bucket: {}".format(location))
79+
return bucket
80+
81+
def _initialize_oss_fs(self, path) -> FileSystem:
6082
from pyarrow.fs import S3FileSystem
6183

6284
client_kwargs = {
@@ -71,7 +93,8 @@ def _initialize_oss_fs(self) -> FileSystem:
7193
client_kwargs['force_virtual_addressing'] = True
7294
client_kwargs['endpoint_override'] = self.properties.get(OssOptions.OSS_ENDPOINT)
7395
else:
74-
client_kwargs['endpoint_override'] = (self.properties.get(OssOptions.OSS_BUCKET) + "." +
96+
oss_bucket = self._extract_oss_bucket(path)
97+
client_kwargs['endpoint_override'] = (oss_bucket + "." +
7598
self.properties.get(OssOptions.OSS_ENDPOINT))
7699

77100
return S3FileSystem(**client_kwargs)

paimon-python/pypaimon/tests/py36/ao_simple_test.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@
1515
See the License for the specific language governing permissions and
1616
limitations under the License.
1717
"""
18+
from unittest.mock import patch
19+
1820
import pyarrow as pa
1921

2022
from pypaimon import Schema
2123
from pypaimon.catalog.catalog_exception import TableNotExistException, TableAlreadyExistException, \
2224
DatabaseNotExistException, DatabaseAlreadyExistException
25+
from pypaimon.common.config import OssOptions
26+
from pypaimon.common.file_io import FileIO
2327
from pypaimon.tests.py36.pyarrow_compat import table_sort_by
2428
from pypaimon.tests.rest.rest_base_test import RESTBaseTest
2529

@@ -385,3 +389,33 @@ def test_create_drop_database_table(self):
385389
self.rest_catalog.drop_database("db1", True)
386390
except DatabaseNotExistException:
387391
self.fail("drop_database with ignore_if_exists=True should not raise DatabaseNotExistException")
392+
393+
def test_initialize_oss_fs_pyarrow_lt_7(self):
394+
props = {
395+
OssOptions.OSS_ACCESS_KEY_ID: "AKID",
396+
OssOptions.OSS_ACCESS_KEY_SECRET: "SECRET",
397+
OssOptions.OSS_SECURITY_TOKEN: "TOKEN",
398+
OssOptions.OSS_REGION: "cn-hangzhou",
399+
OssOptions.OSS_ENDPOINT: "oss-cn-hangzhou.aliyuncs.com",
400+
}
401+
402+
with patch("pypaimon.common.file_io.pyarrow.__version__", "6.0.0"), \
403+
patch("pyarrow.fs.S3FileSystem") as mock_s3fs:
404+
FileIO("oss://oss-bucket/paimon-database/paimon-table", props)
405+
mock_s3fs.assert_called_once_with(access_key="AKID",
406+
secret_key="SECRET",
407+
session_token="TOKEN",
408+
region="cn-hangzhou",
409+
endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT])
410+
FileIO("oss://oss-bucket.endpoint/paimon-database/paimon-table", props)
411+
mock_s3fs.assert_called_with(access_key="AKID",
412+
secret_key="SECRET",
413+
session_token="TOKEN",
414+
region="cn-hangzhou",
415+
endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT])
416+
FileIO("oss://access_id:secret_key@Endpoint/oss-bucket/paimon-database/paimon-table", props)
417+
mock_s3fs.assert_called_with(access_key="AKID",
418+
secret_key="SECRET",
419+
session_token="TOKEN",
420+
region="cn-hangzhou",
421+
endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT])

0 commit comments

Comments
 (0)