import six
from six.moves.urllib.parse import urlparse

-from petastorm.gcsfs_helpers.gcsfs_wrapper import GCSFSWrapper
-from petastorm.hdfs.namenode import HdfsNamenodeResolver, HdfsConnector
-
logger = logging.getLogger(__name__)

@@ -36,168 +33,6 @@ def get_dataset_path(parsed_url):
    return parsed_url.path

-class FilesystemResolver(object):
-    """Resolves a dataset URL, makes a connection via pyarrow, and provides a filesystem object."""
-
-    def __init__(self, dataset_url, hadoop_configuration=None, connector=HdfsConnector,
-                 hdfs_driver='libhdfs3', user=None, s3_config_kwargs=None):
-        """
-        Given a dataset URL and an optional hadoop configuration, parse and interpret the URL to
-        instantiate a pyarrow filesystem.
-
-        Interpretation of the URL ``scheme://hostname:port/path`` occurs in the following order:
-
-        1. If there is no ``scheme``, the URL is no longer supported, so raise an exception!
-        2. If ``scheme`` is ``file``, use the local filesystem path.
-        3. If ``scheme`` is ``hdfs``:
-           a. Try the ``hostname`` as a namespace and attempt to connect to a name node.
-              1. If that doesn't work, try connecting directly to namenode ``hostname:port``.
-           b. If no host, connect to the default name node.
-        4. If ``scheme`` is ``s3``, use s3fs. The user must manually install s3fs before using S3.
-        5. If ``scheme`` is ``gs`` or ``gcs``, use gcsfs. The user must manually install gcsfs before using GCS.
-        6. Fail otherwise.
-
-        :param dataset_url: The HDFS URL or absolute path to the dataset
-        :param hadoop_configuration: an optional hadoop configuration
-        :param connector: the HDFS connector object to use (ONLY override for testing purposes)
-        :param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
-            ``libhdfs`` (Java through JNI) or ``libhdfs3`` (C++).
-        :param user: String denoting username when connecting to HDFS. None implies login user.
-        :param s3_config_kwargs: An optional dict passed through to ``s3fs.S3FileSystem`` as ``config_kwargs``
-            (only used for ``s3`` URLs).
-        """
-        # Cache both the original URL and the resolved, urlparsed dataset_url
-        self._dataset_url = dataset_url
-        self._parsed_dataset_url = None
-        # Cache the instantiated filesystem object
-        self._filesystem = None
-
-        if isinstance(self._dataset_url, six.string_types):
-            self._parsed_dataset_url = urlparse(self._dataset_url)
-        else:
-            self._parsed_dataset_url = self._dataset_url
-
-        if not self._parsed_dataset_url.scheme:
-            # Case 1
-            raise ValueError('ERROR! A scheme-less dataset url ({}) is no longer supported. '
-                             'Please prepend "file://" for local filesystem.'
-                             .format(self._parsed_dataset_url.geturl()))
-
-        elif self._parsed_dataset_url.scheme == 'file':
-            # Case 2: definitely local
-            self._filesystem = pyarrow.localfs
-            self._filesystem_factory = lambda: pyarrow.localfs
-
-        elif self._parsed_dataset_url.scheme == 'hdfs':
-
-            if hdfs_driver == 'libhdfs3':
-                # libhdfs3 does not do any namenode resolution itself so we do it manually. This is not necessary
-                # if using libhdfs
-
-                # Obtain singleton and force hadoop config evaluation
-                namenode_resolver = HdfsNamenodeResolver(hadoop_configuration)
-
-                # Since we can't tell for sure, first treat the URL as though it references a name service
-                if self._parsed_dataset_url.netloc:
-                    # Case 3a: Use the portion of netloc before any port, which doesn't get lowercased
-                    nameservice = self._parsed_dataset_url.netloc.split(':')[0]
-                    namenodes = namenode_resolver.resolve_hdfs_name_service(nameservice)
-                    if namenodes:
-                        self._filesystem = connector.connect_to_either_namenode(namenodes, user=user)
-                        self._filesystem_factory = lambda: connector.connect_to_either_namenode(namenodes, user=user)
-                    if self._filesystem is None:
-                        # Case 3a1: That didn't work; try the URL as a namenode host
-                        self._filesystem = connector.hdfs_connect_namenode(self._parsed_dataset_url, user=user)
-                        self._filesystem_factory = \
-                            lambda url=self._dataset_url, user=user: \
-                            connector.hdfs_connect_namenode(urlparse(url), user=user)
-                else:
-                    # Case 3b: No netloc, so let's try to connect to default namenode
-                    # HdfsNamenodeResolver will raise exception if it fails to connect.
-                    nameservice, namenodes = namenode_resolver.resolve_default_hdfs_service()
-                    filesystem = connector.connect_to_either_namenode(namenodes, user=user)
-                    self._filesystem_factory = lambda: connector.connect_to_either_namenode(namenodes, user=user)
-                    if filesystem is not None:
-                        # Properly replace the parsed dataset URL once default namenode is confirmed
-                        self._parsed_dataset_url = urlparse(
-                            'hdfs://{}{}'.format(nameservice, self._parsed_dataset_url.path))
-                        self._filesystem = filesystem
-            else:
-                self._filesystem = connector.hdfs_connect_namenode(self._parsed_dataset_url, hdfs_driver, user=user)
-                self._filesystem_factory = \
-                    lambda url=self._dataset_url, user=user: \
-                    connector.hdfs_connect_namenode(urlparse(url), hdfs_driver, user=user)
-
-        elif self._parsed_dataset_url.scheme in ('s3', 's3a', 's3n'):
-            # Case 4
-            # S3 support requires s3fs to be installed
-            try:
-                import s3fs
-            except ImportError:
-                raise ValueError('Must have s3fs installed in order to use datasets on s3. '
-                                 'Please install s3fs and try again.')
-
-            if not self._parsed_dataset_url.netloc:
-                raise ValueError('URLs must be of the form s3://bucket/path')
-
-            fs = s3fs.S3FileSystem(config_kwargs=s3_config_kwargs)
-            self._filesystem = fs
-            self._filesystem_factory = lambda: s3fs.S3FileSystem(
-                config_kwargs=s3_config_kwargs
-            )
-
-        elif self._parsed_dataset_url.scheme in ['gs', 'gcs']:
-            # Case 5
-            # GCS support requires gcsfs to be installed
-            try:
-                import gcsfs
-            except ImportError:
-                raise ValueError('Must have gcsfs installed in order to use datasets on GCS. '
-                                 'Please install gcsfs and try again.')
-
-            if not self._parsed_dataset_url.netloc:
-                raise ValueError('URLs must be of the form gs://bucket/path or gcs://bucket/path')
-
-            fs = gcsfs.GCSFileSystem()
-            self._filesystem = GCSFSWrapper(fs)
-            self._filesystem_factory = lambda: GCSFSWrapper(gcsfs.GCSFileSystem())
-
-        else:
-            # Case 6
-            raise ValueError('Unsupported scheme in dataset url {}. Currently, only "file", "hdfs", '
-                             '"s3", and "gs"/"gcs" are supported.'.format(self._parsed_dataset_url.scheme))
-
-    def parsed_dataset_url(self):
-        """
-        :return: The urlparse'd dataset_url
-        """
-        return self._parsed_dataset_url
-
-    def get_dataset_path(self):
-        """
-        The dataset path is different from the one in `_parsed_dataset_url` for some filesystems.
-        For example, s3fs expects the bucket name to be included in the path and doesn't support
-        paths that start with a `/`.
-        """
-        return get_dataset_path(self._parsed_dataset_url)
-
-    def filesystem(self):
-        """
-        :return: The pyarrow filesystem object
-        """
-        return self._filesystem
-
-    def filesystem_factory(self):
-        """
-        :return: A serializable function that can be used to recreate the filesystem
-            object; useful for providing access to the filesystem object on distributed
-            Spark executors.
-        """
-        return self._filesystem_factory
-
-    def __getstate__(self):
-        raise RuntimeError('Pickling FilesystemResolver is not supported as it may contain '
-                           'file-system instance objects that do not support pickling but have '
-                           'no anti-pickling protection')
-
-
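For context, a minimal usage sketch of the resolver removed above, as it worked before this change; the dataset URL and namenode address are hypothetical, and the final ParquetDataset call is just one way the resolved filesystem and path might be consumed:

import pyarrow.parquet as pq

resolver = FilesystemResolver('hdfs://namenode:8020/datasets/my_dataset', hdfs_driver='libhdfs')
fs = resolver.filesystem()                  # pyarrow filesystem object
path = resolver.get_dataset_path()          # path in the form the filesystem expects
fs_factory = resolver.filesystem_factory()  # picklable factory, usable on Spark executors
dataset = pq.ParquetDataset(path, filesystem=fs)
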
def get_filesystem_and_path_or_paths(url_or_urls, hdfs_driver='libhdfs3', s3_config_kwargs=None):
    """
    Given a url or url list, return a tuple ``(filesystem, path_or_paths)``
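With the resolver gone, URL handling is consolidated in the helper above. A minimal calling sketch, assuming a single URL yields a single path in the returned tuple (the local path here is hypothetical):

# Resolve the filesystem and dataset path for a local file:// URL
fs, path = get_filesystem_and_path_or_paths('file:///tmp/my_dataset')
print(type(fs).__name__, path)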