diff --git a/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/GSFileSystem.java b/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/GSFileSystem.java
index afdc4704d3..dd729ed25d 100644
--- a/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/GSFileSystem.java
+++ b/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/GSFileSystem.java
@@ -18,7 +18,9 @@ package org.apache.fluss.fs.gs;
 
 import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.gs.token.GSImpersonatedTokenProvider;
 import org.apache.fluss.fs.hdfs.HadoopFileSystem;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -34,6 +36,8 @@ public class GSFileSystem extends HadoopFileSystem {
     private final String scheme;
     private final Configuration conf;
 
+    private volatile GSImpersonatedTokenProvider tokenProvider;
+
     /**
      * Creates a GSFileSystem based on the given Hadoop Google Cloud Storage file system. The given
      * Hadoop file system object is expected to be initialized already.
@@ -48,4 +52,16 @@ public GSFileSystem(
         this.scheme = scheme;
         this.conf = conf;
     }
+
+    @Override
+    public ObtainedSecurityToken obtainSecurityToken() {
+        if (tokenProvider == null) {
+            synchronized (this) {
+                if (tokenProvider == null) {
+                    tokenProvider = new GSImpersonatedTokenProvider(scheme, conf);
+                }
+            }
+        }
+        return tokenProvider.obtainSecurityToken();
+    }
 }
diff --git a/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/GSFileSystemPlugin.java b/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/GSFileSystemPlugin.java
index 6116834213..50685be18c 100644
--- a/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/GSFileSystemPlugin.java
+++ b/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/GSFileSystemPlugin.java
@@ -35,7 +35,7 @@ public class GSFileSystemPlugin implements FileSystemPlugin {
 
     private static final String[] FLUSS_CONFIG_PREFIXES = {"gs.", "fs.gs."};
 
-    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+    public static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
 
     @Override
     public String getScheme() {
diff --git a/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/token/GSImpersonatedAccessTokenProvider.java b/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/token/GSImpersonatedAccessTokenProvider.java
new file mode 100644
index 0000000000..af074417d3
--- /dev/null
+++ b/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/token/GSImpersonatedAccessTokenProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.gs.token;
+
+import org.apache.fluss.exception.FlussRuntimeException;
+
+import com.google.cloud.hadoop.util.AccessTokenProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/** Access token provider for GCS Hadoop filesystems, serving dynamic session credentials. */
+public class GSImpersonatedAccessTokenProvider implements AccessTokenProvider {
+
+    public static final String NAME = GSImpersonatedAccessTokenProvider.class.getName();
+
+    public static final String COMPONENT = "Dynamic session credentials for Fluss";
+
+    private Configuration configuration;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(GSImpersonatedAccessTokenProvider.class);
+
+    @Override
+    public AccessToken getAccessToken() {
+        AccessTokenProvider.AccessToken accessToken = GSImpersonatedTokenReceiver.getAccessToken();
+
+        if (accessToken == null) {
+            throw new FlussRuntimeException(
+                    GSImpersonatedAccessTokenProvider.COMPONENT + " not set");
+        }
+
+        LOG.debug("Providing session credentials");
+
+        return accessToken;
+    }
+
+    @Override
+    public void refresh() throws IOException {
+        // Intentionally blank. Credentials are updated by GSImpersonatedTokenReceiver.
+    }
+
+    @Override
+    public void setConf(Configuration configuration) {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return configuration;
+    }
+}
diff --git a/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/token/GSImpersonatedTokenProvider.java b/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/token/GSImpersonatedTokenProvider.java
new file mode 100644
index 0000000000..8ad790f724
--- /dev/null
+++ b/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/token/GSImpersonatedTokenProvider.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.gs.token;
+
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.ComputeEngineCredentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ImpersonatedCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static com.google.cloud.hadoop.util.HadoopCredentialConfiguration.SERVICE_ACCOUNT_JSON_KEYFILE_SUFFIX;
+import static org.apache.fluss.fs.gs.GSFileSystemPlugin.HADOOP_CONFIG_PREFIX;
+
+/** Impersonation token provider for GCS Hadoop filesystems. */
+public class GSImpersonatedTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GSImpersonatedTokenProvider.class);
+
+    private final String scheme;
+
+    private final GoogleCredentials googleCredentials;
+    private final String targetPrincipal;
+
+    // HADOOP_CONFIG_PREFIX already ends with a dot ("fs.gs."), so the suffix must not
+    // start with one; the resulting config key is "fs.gs.auth.type".
+    private static final String AUTH_TYPE_SUFFIX = "auth.type";
+
+    public GSImpersonatedTokenProvider(String scheme, Configuration conf) {
+        this.scheme = scheme;
+        Pair<String, GoogleCredentials> pair = extractProvider(conf);
+        googleCredentials = pair.getValue();
+        targetPrincipal = pair.getKey();
+    }
+
+    public ObtainedSecurityToken obtainSecurityToken() {
+        LOG.info("Obtaining session credentials token");
+
+        String scope = "https://www.googleapis.com/auth/cloud-platform";
+
+        List<String> scopes = new ArrayList<>();
+        scopes.add(scope);
+
+        ImpersonatedCredentials impersonatedCredentials =
+                ImpersonatedCredentials.newBuilder()
+                        .setSourceCredentials(googleCredentials)
+                        .setTargetPrincipal(targetPrincipal)
+                        .setScopes(scopes)
+                        .setLifetime(3600) // seconds, i.e. one hour
+                        .setDelegates(null)
+                        .build();
+
+        AccessToken accessToken;
+        try {
+            // getAccessToken() returns null until the credentials have been refreshed,
+            // so fetch the token explicitly.
+            accessToken = impersonatedCredentials.refreshAccessToken();
+        } catch (IOException e) {
+            throw new FlussRuntimeException("Failed to obtain impersonated access token", e);
+        }
+        LOG.info(
+                "Session credentials obtained successfully with expiration: {}",
+                accessToken.getExpirationTime());
+
+        return new ObtainedSecurityToken(
+                scheme,
+                toJson(accessToken),
+                accessToken.getExpirationTime().getTime(),
+                new HashMap<>());
+    }
+
+    private byte[] toJson(AccessToken accessToken) {
+        org.apache.fluss.fs.token.Credentials flussCredentials =
+                new org.apache.fluss.fs.token.Credentials(null, null, accessToken.getTokenValue());
+        return CredentialsJsonSerde.toJson(flussCredentials);
+    }
+
+    private static Pair<String, GoogleCredentials> extractProvider(Configuration conf) {
+        final String authType = conf.get(HADOOP_CONFIG_PREFIX + AUTH_TYPE_SUFFIX);
+        if ("COMPUTE_ENGINE".equals(authType)) {
+            ComputeEngineCredentials credentials = getComputeEngineCredentials();
+            return Pair.of(credentials.getAccount(), credentials);
+        } else if ("SERVICE_ACCOUNT_JSON_KEYFILE".equals(authType)) {
+            ServiceAccountCredentials credentials = getServiceAccountCredentials(conf);
+            return Pair.of(credentials.getAccount(), credentials);
+        } else if ("UNAUTHENTICATED".equals(authType)) {
+            // No source credentials exist to impersonate; fail fast instead of handing
+            // a null pair to the constructor.
+            throw new IllegalArgumentException(
+                    "Impersonated credentials cannot be obtained for UNAUTHENTICATED access");
+        } else {
+            throw new IllegalArgumentException("Unsupported authentication type: " + authType);
+        }
+    }
+
+    private static ComputeEngineCredentials getComputeEngineCredentials() {
+        ComputeEngineCredentials credentials = ComputeEngineCredentials.newBuilder().build();
+        // Resolve the account eagerly so that misconfiguration fails here rather than later.
+        credentials.getAccount();
+        return credentials;
+    }
+
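+    // Loads the service account credentials from the JSON keyfile configured through the
+    // gcs-connector keyfile setting (SERVICE_ACCOUNT_JSON_KEYFILE_SUFFIX under the "fs.gs"
+    // prefix).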
+    private static ServiceAccountCredentials getServiceAccountCredentials(Configuration conf) {
+        List<String> prefixes = new ArrayList<>();
+        // The gcs-connector suffixes carry a leading dot, so strip the trailing dot
+        // from the "fs.gs." prefix before combining them.
+        prefixes.add(HADOOP_CONFIG_PREFIX.substring(0, HADOOP_CONFIG_PREFIX.length() - 1));
+
+        String keyFile =
+                SERVICE_ACCOUNT_JSON_KEYFILE_SUFFIX.withPrefixes(prefixes).get(conf, conf::get);
+        try (FileInputStream fis = new FileInputStream(keyFile)) {
+            ServiceAccountCredentials accountCredentials =
+                    ServiceAccountCredentials.fromStream(fis);
+            // Resolve the account eagerly so that a broken keyfile fails here.
+            accountCredentials.getAccount();
+            return accountCredentials;
+        } catch (IOException e) {
+            throw new FlussRuntimeException(
+                    "Failed to read service account JSON keyfile: " + keyFile, e);
+        }
+    }
+}
diff --git a/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/token/GSImpersonatedTokenReceiver.java b/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/token/GSImpersonatedTokenReceiver.java
new file mode 100644
index 0000000000..20776acf0b
--- /dev/null
+++ b/fluss-filesystems/fluss-fs-gs/src/main/java/org/apache/fluss/fs/gs/token/GSImpersonatedTokenReceiver.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.gs.token;
+
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+import org.apache.fluss.fs.token.SecurityTokenReceiver;
+
+import com.google.cloud.hadoop.util.AccessTokenProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Security token receiver for GCS filesystem.
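+ *
+ * <p>Caches session tokens pushed from the coordinator and hands them to the Hadoop GCS
+ * connector through {@link GSImpersonatedAccessTokenProvider}.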
+ */
+public class GSImpersonatedTokenReceiver implements SecurityTokenReceiver {
+
+    public static final String PROVIDER_CONFIG_NAME = "fs.gs.auth.access.token.provider.impl";
+
+    private static final Logger LOG = LoggerFactory.getLogger(GSImpersonatedTokenReceiver.class);
+
+    static volatile AccessTokenProvider.AccessToken accessToken;
+
+    public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) {
+        LOG.info("Updating Hadoop configuration");
+
+        String providers = hadoopConfig.get(PROVIDER_CONFIG_NAME, "");
+
+        if (!providers.contains(GSImpersonatedAccessTokenProvider.NAME)) {
+            if (providers.isEmpty()) {
+                LOG.debug("Setting provider");
+                providers = GSImpersonatedAccessTokenProvider.NAME;
+            } else {
+                providers = GSImpersonatedAccessTokenProvider.NAME + "," + providers;
+                LOG.debug("Prepending provider, new providers value: {}", providers);
+            }
+
+            hadoopConfig.set(PROVIDER_CONFIG_NAME, providers);
+        } else {
+            LOG.debug("Provider already exists");
+        }
+
+        if (accessToken == null) {
+            throw new FlussRuntimeException(
+                    GSImpersonatedAccessTokenProvider.COMPONENT + " not set");
+        }
+
+        LOG.info("Updated Hadoop configuration successfully");
+    }
+
+    @Override
+    public String scheme() {
+        return "gs";
+    }
+
+    @Override
+    public void onNewTokensObtained(ObtainedSecurityToken token) throws Exception {
+        LOG.info("Updating session credentials");
+
+        byte[] tokenBytes = token.getToken();
+
+        Credentials credentials = CredentialsJsonSerde.fromJson(tokenBytes);
+
+        long validUntil =
+                token.getValidUntil()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "GCS session token is missing an expiration time"));
+
+        accessToken =
+                new AccessTokenProvider.AccessToken(credentials.getSecurityToken(), validUntil);
+
+        // GCS session tokens carry no access key id, so log the expiration instead.
+        LOG.info("Session credentials updated successfully, valid until: {}", validUntil);
+    }
+
+    public static AccessTokenProvider.AccessToken getAccessToken() {
+        return accessToken;
+    }
+}