From 992636449d2e2568438049bc81e2f18809cef962 Mon Sep 17 00:00:00 2001 From: Yoni Date: Tue, 10 Oct 2023 10:38:10 +0300 Subject: [PATCH] Metaclient: Drop Hadoop 2 support (#6740) --- .github/workflows/esti.yaml | 36 ++--- Makefile | 2 +- clients/spark/README.md | 24 +--- clients/spark/build.sbt | 133 +++++------------- .../clients/conditional/S3ClientBuilder.scala | 61 -------- .../clients/conditional/S3ClientBuilder.scala | 69 --------- .../treeverse/clients/S3ClientBuilder.scala | 76 +++++++++- .../io/treeverse/clients/StorageClients.scala | 2 +- .../io/treeverse/clients/DumpSSTable.scala | 2 +- clients/spark/project/types.scala | 9 -- design/accepted/export-functionality.md | 2 +- docs/howto/garbage-collection/gc.md | 15 -- docs/reference/spark-client.md | 34 ++--- 13 files changed, 129 insertions(+), 336 deletions(-) delete mode 100644 clients/spark/core/src/main/hadoop2/scala/io/treeverse/clients/conditional/S3ClientBuilder.scala delete mode 100644 clients/spark/core/src/main/hadoop3/scala/io/treeverse/clients/conditional/S3ClientBuilder.scala delete mode 100644 clients/spark/project/types.scala diff --git a/.github/workflows/esti.yaml b/.github/workflows/esti.yaml index a265758d613..f200647c3c3 100644 --- a/.github/workflows/esti.yaml +++ b/.github/workflows/esti.yaml @@ -622,13 +622,6 @@ jobs: name: Build metadata client for Spark 3.x runs-on: ubuntu-latest-8-cores needs: check-secrets - strategy: - matrix: - spark: - - project-variable: core3 - project-suffix: "-301" - - project-variable: core - project-suffix: "" env: TAG: ${{ needs.deploy-image.outputs.tag }} REPO: ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.us-east-1.amazonaws.com @@ -640,7 +633,7 @@ jobs: id: restore-cache with: path: ${{ github.workspace }}/test/spark/metaclient - key: metadata-client-${{ matrix.spark.project-variable }}-${{ hashFiles('./clients/spark/**') }} + key: metadata-client-core-${{ hashFiles('./clients/spark/**') }} - uses: actions/setup-java@v3 if: steps.restore-cache.outputs.cache-hit != 'true' @@ -653,7 +646,7 @@ jobs: if: steps.restore-cache.outputs.cache-hit != 'true' working-directory: clients/spark run: | - sbt 'set ${{ matrix.spark.project-variable }} / assembly / test := {}' lakefs-spark-client${{ matrix.spark.project-suffix }}/assembly + sbt 'set assembly / test := {}' assembly - name: Prepare Metaclient location for export if: steps.restore-cache.outputs.cache-hit != 'true' @@ -663,23 +656,14 @@ jobs: working-directory: clients/spark run: | mkdir -p ${{ github.workspace }}/test/spark/metaclient - cp target/core${{ matrix.spark.project-suffix }}/scala-2.12/lakefs-spark-client${{ matrix.spark.project-suffix }}-assembly*.jar ${{ github.workspace }}/test/spark/metaclient/spark-assembly-${{ matrix.spark.project-variable }}.jar + cp target/core/scala-2.12/lakefs-spark-client-assembly*.jar ${{ github.workspace }}/test/spark/metaclient/spark-assembly-core.jar metadata-client-export-spark3: name: Test lakeFS metadata client export with Spark 3.x needs: [deploy-image, build-spark3-metadata-client] runs-on: ubuntu-20.04 - strategy: - matrix: - spark: - - version: 3.2.1 - project: "core" - - version: 3.1.2 - project: "core3" - - version: 3.0.2 - project: "core3" env: - SPARK_TAG: ${{ matrix.spark.version }} + SPARK_TAG: 3.2.1 REPO: ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.us-east-1.amazonaws.com TAG: ${{ needs.deploy-image.outputs.tag }} steps: @@ -690,7 +674,7 @@ jobs: id: restore-cache with: path: ${{ github.workspace }}/test/spark/metaclient - key: metadata-client-${{ matrix.spark.project }}-${{ 
hashFiles('./clients/spark/**') }} + key: metadata-client-core-${{ hashFiles('./clients/spark/**') }} - name: Generate uniquifying value id: unique @@ -709,21 +693,21 @@ jobs: LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.ESTI_AWS_SECRET_ACCESS_KEY }} - name: Copy repository ref - run: aws s3 cp --recursive s3://esti-system-testing-data/golden-files/gc-test-data s3://esti-system-testing/${{ github.run_number }}-spark${{ matrix.spark.version }}-metaclient/exporter/${{ steps.unique.outputs.value }} + run: aws s3 cp --recursive s3://esti-system-testing-data/golden-files/gc-test-data s3://esti-system-testing/${{ github.run_number }}-spark3.2.1-metaclient/exporter/${{ steps.unique.outputs.value }} - name: Setup Exporter tests env: - STORAGE_NAMESPACE: s3://esti-system-testing/${{ github.run_number }}-spark${{ matrix.spark.version }}-metaclient/exporter/${{ steps.unique.outputs.value }} + STORAGE_NAMESPACE: s3://esti-system-testing/${{ github.run_number }}-spark3.2.1-metaclient/exporter/${{ steps.unique.outputs.value }} REPOSITORY: test-data-exporter working-directory: test/spark run: ./setup-exporter-test.sh - name: Test Exporter with Spark 3.x env: - STORAGE_NAMESPACE: s3://esti-system-testing/${{ github.run_number }}-spark${{ matrix.spark.version }}-metaclient/exporter/${{ steps.unique.outputs.value }} + STORAGE_NAMESPACE: s3://esti-system-testing/${{ github.run_number }}-spark3.2.1-metaclient/exporter/${{ steps.unique.outputs.value }} REPOSITORY: test-data-exporter - CLIENT_JAR: ${{ github.workspace }}/test/spark/metaclient/spark-assembly-${{ matrix.spark.project }}.jar - EXPORT_LOCATION: s3://esti-system-testing/${{ github.run_number }}-spark${{ matrix.spark.version }}-client-export/${{ steps.unique.outputs.value }} + CLIENT_JAR: ${{ github.workspace }}/test/spark/metaclient/spark-assembly-core.jar + EXPORT_LOCATION: s3://esti-system-testing/${{ github.run_number }}-spark3.2.1-client-export/${{ steps.unique.outputs.value }} working-directory: test/spark run: ./run-exporter-test.sh diff --git a/Makefile b/Makefile index 931a2804187..dc6e8642b3e 100644 --- a/Makefile +++ b/Makefile @@ -312,7 +312,7 @@ proto: tools ## Build proto (Protocol Buffers) files $(PROTOC) --proto_path=pkg/kv/kvtest --go_out=pkg/kv/kvtest --go_opt=paths=source_relative test_model.proto publish-scala: ## sbt publish spark client jars to nexus and s3 bucket - cd clients/spark && sbt assembly && sbt s3Upload && sbt publishSigned + cd clients/spark && sbt assembly && sbt s3Upload && sbt "project root" publishSigned aws s3 cp --recursive --acl public-read $(CLIENT_JARS_BUCKET) $(CLIENT_JARS_BUCKET) --metadata-directive REPLACE publish-lakefsfs-test: ## sbt publish spark lakefsfs test jars to s3 bucket diff --git a/clients/spark/README.md b/clients/spark/README.md index cfee4fc41d1..503ac6b9940 100644 --- a/clients/spark/README.md +++ b/clients/spark/README.md @@ -13,35 +13,17 @@ _Please note that starting version 0.9.0, Spark 2 is not supported with the lake ### Uber-jar The Uber-Jar can be found on a public S3 location: - -It should be used when running into conflicting dependencies on environments like EMR, Databricks, etc. 
- -For Spark for Hadoop 3: http://treeverse-clients-us-east.s3-website-us-east-1.amazonaws.com/lakefs-spark-client/${CLIENT_VERSION}/lakefs-spark-client-assembly-${CLIENT_VERSION}.jar -For Spark for Hadoop 2 (deprecated): -http://treeverse-clients-us-east.s3-website-us-east-1.amazonaws.com/lakefs-spark-client-301/${CLIENT_VERSION}/lakefs-spark-client-301-assembly-${CLIENT_VERSION}.jar - - ### Maven -Otherwise, the client can be included using Maven coordinates: - -For Spark for Hadoop 3: -``` -io.lakefs:lakefs-spark-client_2.12: -``` -[See available versions](https://mvnrepository.com/artifact/io.lakefs/lakefs-spark-client_2.12). - -For Spark for Hadoop 2 (deprecated): ``` -io.lakefs:lakefs-spark-client-301_2.12: +io.lakefs:lakefs-spark-client_2.12:${CLIENT_VERSION} ``` -[See available versions](https://mvnrepository.com/artifact/io.lakefs/lakefs-spark-client-301_2.12). ## Usage Examples ### Export using spark-submit -Replace `` below with the latest version available. See available versions for [Spark for Hadoop 3](https://mvnrepository.com/artifact/io.lakefs/lakefs-spark-client_2.12) or [Spark for Hadoop 2](https://mvnrepository.com/artifact/io.lakefs/lakefs-spark-client-301_2.12) (deprecated). +Replace `` below with the latest version available. See [available versions](https://mvnrepository.com/artifact/io.lakefs/lakefs-spark-client_2.12). ``` CLIENT_VERSION=0.10.0 @@ -58,7 +40,7 @@ spark-submit --conf spark.hadoop.lakefs.api.url=https://lakefs.example.com/api/v ### Export using spark-submit (uber-jar) -Replace `` below with the latest version available. See available versions for [Spark for Hadoop 3](https://mvnrepository.com/artifact/io.lakefs/lakefs-spark-client_2.12) or [Spark for Hadoop 2](https://mvnrepository.com/artifact/io.lakefs/lakefs-spark-client-301_2.12) (deprecated). +Replace `` below with the latest version available. See [available versions](https://mvnrepository.com/artifact/io.lakefs/lakefs-spark-client_2.12). 
``` CLIENT_VERSION=0.10.0 diff --git a/clients/spark/build.sbt b/clients/spark/build.sbt index 0f91f2113d9..16e20d0ef64 100644 --- a/clients/spark/build.sbt +++ b/clients/spark/build.sbt @@ -1,102 +1,54 @@ -import build.BuildType - -lazy val baseName = "lakefs-spark" lazy val projectVersion = "0.10.0" lazy val hadoopVersion = "3.2.1" ThisBuild / isSnapshot := false ThisBuild / scalaVersion := "2.12.12" -def settingsToCompileIn(dir: String, flavour: String = "") = { - lazy val allSettings = Seq( +def settingsToCompileIn(dir: String) = { + Seq( Compile / scalaSource := (ThisBuild / baseDirectory).value / dir / "src" / "main" / "scala", Test / scalaSource := (ThisBuild / baseDirectory).value / dir / "src" / "test" / "scala", Compile / resourceDirectory := (ThisBuild / baseDirectory).value / dir / "src" / "main" / "resources", Compile / PB.includePaths += (Compile / resourceDirectory).value, Compile / PB.protoSources += (Compile / resourceDirectory).value ) - lazy val flavourSettings = - if (flavour != "") - Seq( - Compile / unmanagedSourceDirectories += (ThisBuild / baseDirectory).value / dir / "src" / "main" / flavour / "scala" - ) - else - Seq() - allSettings ++ flavourSettings } -def generateCoreProject(buildType: BuildType) = { - Project(s"${baseName}-client${buildType.suffix}", file("core")) - .settings( - sharedSettings, - if (buildType.hadoopFlavour == "hadoop2") hadoop2ShadingSettings - else hadoop3ShadingSettings, - s3UploadSettings, - settingsToCompileIn("core", buildType.hadoopFlavour), - semanticdbEnabled := true, // enable SemanticDB - semanticdbVersion := scalafixSemanticdb.revision, - scalacOptions += "-Ywarn-unused-import", - Compile / PB.targets := Seq( - scalapb.gen() -> (Compile / sourceManaged).value / "scalapb" - ), - libraryDependencies ++= getSharedLibraryDependencies(buildType) - ++ getLibraryDependenciesByHadoopFlavour(buildType.hadoopFlavour), - testFrameworks += new TestFramework("org.scalameter.ScalaMeterFramework"), - Test / logBuffered := false, - // Uncomment to get accurate benchmarks with just "sbt test". - // Otherwise tell sbt to - // "testOnly io.treeverse.clients.ReadSSTableBenchmark" - // (or similar). 
- // - // Test / parallelExecution := false, - - // Uncomment to get (very) full stacktraces in test: - // Test / testOptions += Tests.Argument("-oF"), - target := file(s"target/core${buildType.suffix}/"), - buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion), - buildInfoPackage := "io.treeverse.clients" - ) - .enablePlugins(S3Plugin, BuildInfoPlugin) -} -def generateExamplesProject(buildType: BuildType) = - Project(s"${baseName}-examples${buildType.suffix}", file(s"examples")) - .settings( - sharedSettings, - settingsToCompileIn("examples", buildType.hadoopFlavour), - semanticdbEnabled := true, // enable SemanticDB - semanticdbVersion := scalafixSemanticdb.revision, - scalacOptions += "-Ywarn-unused-import", - libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-sql" % buildType.sparkVersion % "provided", - "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.194" - ), - assembly / mainClass := Some("io.treeverse.examples.List"), - target := file(s"target/examples${buildType.suffix}/"), - run / fork := false // https://stackoverflow.com/questions/44298847/sbt-spark-fork-in-run - ) - -lazy val spark3Type = - new BuildType("-301", "3.0.1", "0.10.11", "hadoop2", "hadoop2-2.0.1") - -// EMR-6.5.0 beta, managed GC -lazy val spark312Type = - new BuildType("-312-hadoop3", "3.1.2", "0.10.11", "hadoop3", "hadoop3-2.0.1") - -lazy val coreType = - new BuildType("", "3.1.2", "0.10.11", "hadoop3", "hadoop3-2.0.1") -lazy val core = generateCoreProject(coreType) -lazy val core3 = generateCoreProject(spark3Type) -lazy val core312 = generateCoreProject(spark312Type) -lazy val examples3 = generateExamplesProject(spark3Type).dependsOn(core3) -lazy val examples312 = generateExamplesProject(spark312Type).dependsOn(core312) - -lazy val root = - (project in file(".")).aggregate(core, core3, core312, examples3, examples312) +lazy val root = (project in file("core")) + .settings( + name := "lakefs-spark-client", + sharedSettings, + hadoop3ShadingSettings, + s3UploadSettings, + settingsToCompileIn("core"), + semanticdbEnabled := true, // enable SemanticDB + semanticdbVersion := scalafixSemanticdb.revision, + scalacOptions += "-Ywarn-unused-import", + Compile / PB.targets := Seq( + scalapb.gen() -> (Compile / sourceManaged).value / "scalapb" + ), + libraryDependencies ++= getSharedLibraryDependencies(), + testFrameworks += new TestFramework("org.scalameter.ScalaMeterFramework"), + Test / logBuffered := false, + // Uncomment to get accurate benchmarks with just "sbt test". + // Otherwise tell sbt to + // "testOnly io.treeverse.clients.ReadSSTableBenchmark" + // (or similar). 
+ // + // Test / parallelExecution := false, + + // Uncomment to get (very) full stacktraces in test: + // Test / testOptions += Tests.Argument("-oF"), + buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion), + buildInfoPackage := "io.treeverse.clients", + target := file(s"target/core/") + ) + .enablePlugins(S3Plugin, BuildInfoPlugin) -def getSharedLibraryDependencies(buildType: BuildType): Seq[ModuleID] = { +def getSharedLibraryDependencies(): Seq[ModuleID] = { Seq( "io.lakefs" % "api-client" % "0.91.0", - "org.apache.spark" %% "spark-sql" % buildType.sparkVersion % "provided", + "org.apache.spark" %% "spark-sql" % "3.1.2" % "provided", "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf", "org.apache.hadoop" % "hadoop-aws" % hadoopVersion % "provided", "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided", @@ -107,6 +59,7 @@ def getSharedLibraryDependencies(buildType: BuildType): Seq[ModuleID] = { "com.azure" % "azure-storage-blob" % "12.9.0", "com.azure" % "azure-storage-blob-batch" % "12.7.0", "com.azure" % "azure-identity" % "1.2.0", + "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.194" % "provided", // Snappy is JNI :-(. However it does claim to work with // ClassLoaders, and (even more importantly!) using a preloaded JNI // version will probably continue to work because the C language API @@ -129,17 +82,6 @@ def getSharedLibraryDependencies(buildType: BuildType): Seq[ModuleID] = { ) } -def getLibraryDependenciesByHadoopFlavour(hadoopFlavour: String): Seq[ModuleID] = { - if (hadoopFlavour == "hadoop2") { - // hadoop-aws provides AWS SDK at version >= 1.7.4. So declare this - // version, but ask to use whatever is provided so we do not - // override what it selects. 
- Seq("com.amazonaws" % "aws-java-sdk-bundle" % "1.12.194") - } else { - Seq("com.amazonaws" % "aws-java-sdk-bundle" % "1.12.194" % "provided") - } -} - def rename(prefix: String) = ShadeRule.rename(prefix -> "io.lakefs.spark.shade.@0") // We are using the default sbt assembly merge strategy https://github.com/sbt/sbt-assembly#merge-strategy with a change @@ -178,10 +120,7 @@ lazy val sharedShadeRules = Seq( rename("reactor.util.**").inAll ) -lazy val hadoop2ShadeRules = sharedShadeRules ++ Seq(rename("com.amazonaws.**").inAll) lazy val hadoop3ShadeRules = sharedShadeRules - -lazy val hadoop2ShadingSettings = assembly / assemblyShadeRules := hadoop2ShadeRules lazy val hadoop3ShadingSettings = assembly / assemblyShadeRules := hadoop3ShadeRules // Upload assembly jars to S3 @@ -194,8 +133,6 @@ lazy val s3UploadSettings = Seq( s3Upload / s3Progress := true ) -// Don't publish root project -root / publish / skip := true lazy val commonSettings = Seq( version := projectVersion, diff --git a/clients/spark/core/src/main/hadoop2/scala/io/treeverse/clients/conditional/S3ClientBuilder.scala b/clients/spark/core/src/main/hadoop2/scala/io/treeverse/clients/conditional/S3ClientBuilder.scala deleted file mode 100644 index 7820f751317..00000000000 --- a/clients/spark/core/src/main/hadoop2/scala/io/treeverse/clients/conditional/S3ClientBuilder.scala +++ /dev/null @@ -1,61 +0,0 @@ -package io.treeverse.clients.conditional - -import com.amazonaws.ClientConfiguration -import com.amazonaws.retry.{PredefinedBackoffStrategies, RetryPolicy} -import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} -import io.treeverse.clients.S3RetryDeleteObjectsCondition -import io.treeverse.clients.StorageUtils.S3.createAndValidateS3Client -import org.apache.hadoop.conf.Configuration -import org.slf4j.{Logger, LoggerFactory} - -object S3ClientBuilder extends io.treeverse.clients.S3ClientBuilder { - val logger: Logger = LoggerFactory.getLogger(getClass.toString + "[hadoop2]") - - def build(hc: Configuration, bucket: String, region: String, numRetries: Int): AmazonS3 = { - import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials} - import io.treeverse.clients.LakeFSContext - import org.apache.hadoop.fs.s3a.Constants - - val minBackoffMsecs = hc.getInt(LakeFSContext.LAKEFS_CONF_GC_S3_MIN_BACKOFF_SECONDS, - LakeFSContext.DEFAULT_LAKEFS_CONF_GC_S3_MIN_BACKOFF_SECONDS - ) * 1000 - val maxBackoffMsecs = hc.getInt(LakeFSContext.LAKEFS_CONF_GC_S3_MAX_BACKOFF_SECONDS, - LakeFSContext.DEFAULT_LAKEFS_CONF_GC_S3_MAX_BACKOFF_SECONDS - ) * 1000 - - val backoffStrategy = - new PredefinedBackoffStrategies.FullJitterBackoffStrategy(minBackoffMsecs, maxBackoffMsecs) - val retryPolicy = - new RetryPolicy(new S3RetryDeleteObjectsCondition(), backoffStrategy, numRetries, true) - val configuration = new ClientConfiguration() - .withRetryPolicy(retryPolicy) - .withThrottledRetries(true) - val s3Endpoint = hc.get(Constants.ENDPOINT, null) - - val credentialsProvider = - if (hc.get(Constants.ACCESS_KEY) != null) { - logger.info( - "Use access key ID {} {}", - hc.get(Constants.ACCESS_KEY): Any, - if (hc.get(Constants.SECRET_KEY) == null) "(missing secret key)" else "secret key ******" - ) - Some( - new AWSStaticCredentialsProvider( - new BasicAWSCredentials(hc.get(Constants.ACCESS_KEY), hc.get(Constants.SECRET_KEY)) - ) - ) - } else None - - val builder = AmazonS3ClientBuilder - .standard() - .withPathStyleAccessEnabled(hc.getBoolean(S3A_PATH_STYLE_ACCESS, true)) - - createAndValidateS3Client(configuration, - 
credentialsProvider, - builder, - s3Endpoint, - region, - bucket - ) - } -} diff --git a/clients/spark/core/src/main/hadoop3/scala/io/treeverse/clients/conditional/S3ClientBuilder.scala b/clients/spark/core/src/main/hadoop3/scala/io/treeverse/clients/conditional/S3ClientBuilder.scala deleted file mode 100644 index 84c6096af23..00000000000 --- a/clients/spark/core/src/main/hadoop3/scala/io/treeverse/clients/conditional/S3ClientBuilder.scala +++ /dev/null @@ -1,69 +0,0 @@ -package io.treeverse.clients.conditional - -import com.amazonaws.ClientConfiguration -import com.amazonaws.retry.{PredefinedBackoffStrategies, RetryPolicy} -import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} -import io.treeverse.clients.S3RetryDeleteObjectsCondition -import io.treeverse.clients.StorageUtils.S3.createAndValidateS3Client -import org.apache.hadoop.conf.Configuration -import org.slf4j.{Logger, LoggerFactory} - -object S3ClientBuilder extends io.treeverse.clients.S3ClientBuilder { - val logger: Logger = LoggerFactory.getLogger(getClass.toString + "[hadoop3]") - - def build(hc: Configuration, bucket: String, region: String, numRetries: Int): AmazonS3 = { - import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider - import com.amazonaws.auth.{BasicAWSCredentials, AWSStaticCredentialsProvider} - import io.treeverse.clients.LakeFSContext - import org.apache.hadoop.fs.s3a.Constants - - val minBackoffMsecs = hc.getInt(LakeFSContext.LAKEFS_CONF_GC_S3_MIN_BACKOFF_SECONDS, - LakeFSContext.DEFAULT_LAKEFS_CONF_GC_S3_MIN_BACKOFF_SECONDS - ) * 1000 - val maxBackoffMsecs = hc.getInt(LakeFSContext.LAKEFS_CONF_GC_S3_MAX_BACKOFF_SECONDS, - LakeFSContext.DEFAULT_LAKEFS_CONF_GC_S3_MAX_BACKOFF_SECONDS - ) * 1000 - - val backoffStrategy = - new PredefinedBackoffStrategies.FullJitterBackoffStrategy(minBackoffMsecs, maxBackoffMsecs) - val retryPolicy = - new RetryPolicy(new S3RetryDeleteObjectsCondition(), backoffStrategy, numRetries, true) - val configuration = new ClientConfiguration() - .withRetryPolicy(retryPolicy) - .withThrottledRetries(true) - val s3Endpoint = hc.get(Constants.ENDPOINT, null) - - // TODO(ariels): Support different per-bucket configuration methods. - // Possibly pre-generate a FileSystem to access the desired bucket, - // and query for its credentials provider. And cache them, in case - // some objects live in different buckets. 
- val credentialsProvider = - if (hc.get(Constants.AWS_CREDENTIALS_PROVIDER) == AssumedRoleCredentialProvider.NAME) { - logger.info("Use configured AssumedRoleCredentialProvider for bucket {}", bucket) - Some(new AssumedRoleCredentialProvider(new java.net.URI("s3a://" + bucket), hc)) - } else if (hc.get(Constants.ACCESS_KEY) != null) { - logger.info( - "Use access key ID {} {}", - hc.get(Constants.ACCESS_KEY): Any, - if (hc.get(Constants.SECRET_KEY) == null) "(missing secret key)" else "secret key ******" - ) - Some( - new AWSStaticCredentialsProvider( - new BasicAWSCredentials(hc.get(Constants.ACCESS_KEY), hc.get(Constants.SECRET_KEY)) - ) - ) - } else None - - val builder = AmazonS3ClientBuilder - .standard() - .withPathStyleAccessEnabled(hc.getBoolean(S3A_PATH_STYLE_ACCESS, true)) - - createAndValidateS3Client(configuration, - credentialsProvider, - builder, - s3Endpoint, - region, - bucket - ) - } -} diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/S3ClientBuilder.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/S3ClientBuilder.scala index 4638dba822d..9554e238f6c 100644 --- a/clients/spark/core/src/main/scala/io/treeverse/clients/S3ClientBuilder.scala +++ b/clients/spark/core/src/main/scala/io/treeverse/clients/S3ClientBuilder.scala @@ -1,14 +1,16 @@ package io.treeverse.clients -import org.apache.hadoop.conf.Configuration +import com.amazonaws.ClientConfiguration +import com.amazonaws.retry.PredefinedBackoffStrategies +import com.amazonaws.retry.RetryPolicy import com.amazonaws.services.s3.AmazonS3 +import com.amazonaws.services.s3.AmazonS3ClientBuilder +import org.apache.hadoop.conf.Configuration +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import io.treeverse.clients.StorageUtils.S3.createAndValidateS3Client -/** Interface to build an S3 client. The object - * io.treeverse.clients.conditional.S3ClientBuilder -- conditionally - * defined in a separate file according to the supported Hadoop version -- - * implements this trait. (Scala requires companion objects to be defined - * in the same file, so it cannot be a companion.) 
- */ trait S3ClientBuilder extends Serializable { /** Name of property from which S3A fetches whether to use path-style S3 @@ -30,3 +32,63 @@ trait S3ClientBuilder extends Serializable { */ def build(hc: Configuration, bucket: String, region: String, numRetries: Int): AmazonS3 } + +object S3ClientBuilder extends S3ClientBuilder { + val logger: Logger = LoggerFactory.getLogger(getClass.toString + "[hadoop3]") + + def build(hc: Configuration, bucket: String, region: String, numRetries: Int): AmazonS3 = { + import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider + import com.amazonaws.auth.{BasicAWSCredentials, AWSStaticCredentialsProvider} + import io.treeverse.clients.LakeFSContext + import org.apache.hadoop.fs.s3a.Constants + + val minBackoffMsecs = hc.getInt(LakeFSContext.LAKEFS_CONF_GC_S3_MIN_BACKOFF_SECONDS, + LakeFSContext.DEFAULT_LAKEFS_CONF_GC_S3_MIN_BACKOFF_SECONDS + ) * 1000 + val maxBackoffMsecs = hc.getInt(LakeFSContext.LAKEFS_CONF_GC_S3_MAX_BACKOFF_SECONDS, + LakeFSContext.DEFAULT_LAKEFS_CONF_GC_S3_MAX_BACKOFF_SECONDS + ) * 1000 + + val backoffStrategy = + new PredefinedBackoffStrategies.FullJitterBackoffStrategy(minBackoffMsecs, maxBackoffMsecs) + val retryPolicy = + new RetryPolicy(new S3RetryDeleteObjectsCondition(), backoffStrategy, numRetries, true) + val configuration = new ClientConfiguration() + .withRetryPolicy(retryPolicy) + .withThrottledRetries(true) + val s3Endpoint = hc.get(Constants.ENDPOINT, null) + + // TODO(ariels): Support different per-bucket configuration methods. + // Possibly pre-generate a FileSystem to access the desired bucket, + // and query for its credentials provider. And cache them, in case + // some objects live in different buckets. + val credentialsProvider = + if (hc.get(Constants.AWS_CREDENTIALS_PROVIDER) == AssumedRoleCredentialProvider.NAME) { + logger.info("Use configured AssumedRoleCredentialProvider for bucket {}", bucket) + Some(new AssumedRoleCredentialProvider(new java.net.URI("s3a://" + bucket), hc)) + } else if (hc.get(Constants.ACCESS_KEY) != null) { + logger.info( + "Use access key ID {} {}", + hc.get(Constants.ACCESS_KEY): Any, + if (hc.get(Constants.SECRET_KEY) == null) "(missing secret key)" else "secret key ******" + ) + Some( + new AWSStaticCredentialsProvider( + new BasicAWSCredentials(hc.get(Constants.ACCESS_KEY), hc.get(Constants.SECRET_KEY)) + ) + ) + } else None + + val builder = AmazonS3ClientBuilder + .standard() + .withPathStyleAccessEnabled(hc.getBoolean(S3A_PATH_STYLE_ACCESS, true)) + + createAndValidateS3Client(configuration, + credentialsProvider, + builder, + s3Endpoint, + region, + bucket + ) + } +} diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/StorageClients.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/StorageClients.scala index c58f1873c17..d90616fed13 100644 --- a/clients/spark/core/src/main/scala/io/treeverse/clients/StorageClients.scala +++ b/clients/spark/core/src/main/scala/io/treeverse/clients/StorageClients.scala @@ -20,7 +20,7 @@ object StorageClients { with Serializable { private val storageNSURI: URI = new URI(storageNamespace) private val bucket: String = storageNSURI.getHost - @transient lazy val s3Client: AmazonS3 = io.treeverse.clients.conditional.S3ClientBuilder + @transient lazy val s3Client: AmazonS3 = io.treeverse.clients.S3ClientBuilder .build(config.configuration, bucket, region, retries) } diff --git a/clients/spark/core/src/test/scala/io/treeverse/clients/DumpSSTable.scala 
b/clients/spark/core/src/test/scala/io/treeverse/clients/DumpSSTable.scala index 49a8e8d8662..8ce5b221ad6 100644 --- a/clients/spark/core/src/test/scala/io/treeverse/clients/DumpSSTable.scala +++ b/clients/spark/core/src/test/scala/io/treeverse/clients/DumpSSTable.scala @@ -1,6 +1,6 @@ // Standalone reader for SSTables. -// Usage from SBT: "lakefs-spark-client-312-hadoop3/test:run /path/to/table.sst" +// Usage from SBT: "lakefs-spark-client/test:run /path/to/table.sst" package io.treeverse.clients diff --git a/clients/spark/project/types.scala b/clients/spark/project/types.scala deleted file mode 100644 index 06b3d251e12..00000000000 --- a/clients/spark/project/types.scala +++ /dev/null @@ -1,9 +0,0 @@ -package build - -class BuildType( - val suffix: String, - val sparkVersion: String, - val scalapbVersion: String, - val hadoopFlavour: String, // If set, a directory of additional sources to compile - val gcpConnectorVersion: String -) diff --git a/design/accepted/export-functionality.md b/design/accepted/export-functionality.md index 05bd2360dee..90990254858 100644 --- a/design/accepted/export-functionality.md +++ b/design/accepted/export-functionality.md @@ -27,7 +27,7 @@ Usage example: docker run lakefs-export --conf spark.hadoop.lakefs.api.url=https:///api/v1 \ --conf spark.hadoop.lakefs.api.access_key= \ --conf spark.hadoop.lakefs.api.secret_key= \ ---packages io.lakefs:lakefs-spark-client-301_2.12:0.1.0 \ +--packages io.lakefs:lakefs-spark-client_2.12:0.1.0 \ --class io.treeverse.clients.Main export-app example-repo s3://example-bucket/prefix \ --branch=example-branch ``` diff --git a/docs/howto/garbage-collection/gc.md b/docs/howto/garbage-collection/gc.md index 37a60226d05..8cbd8e0a424 100644 --- a/docs/howto/garbage-collection/gc.md +++ b/docs/howto/garbage-collection/gc.md @@ -109,7 +109,6 @@ To run the job, use the following `spark-submit` command (or using your preferre
@@ -126,20 +125,6 @@ spark-submit --class io.treeverse.gc.GarbageCollection \ example-repo us-east-1 ```
-
- ```bash -spark-submit --class io.treeverse.gc.GarbageCollection \ - --packages org.apache.hadoop:hadoop-aws:2.7.7 \ - -c spark.hadoop.lakefs.api.url=https://lakefs.example.com:8000/api/v1 \ - -c spark.hadoop.lakefs.api.access_key= \ - -c spark.hadoop.lakefs.api.secret_key= \ - -c spark.hadoop.fs.s3a.access.key= \ - -c spark.hadoop.fs.s3a.secret.key= \ - http://treeverse-clients-us-east.s3-website-us-east-1.amazonaws.com/lakefs-spark-client-301/0.10.0/lakefs-spark-client-301-assembly-0.10.0.jar \ - example-repo us-east-1 - ``` -
-
If you want to access your storage using the account key: diff --git a/docs/reference/spark-client.md b/docs/reference/spark-client.md index eb1017d9239..29333affd45 100644 --- a/docs/reference/spark-client.md +++ b/docs/reference/spark-client.md @@ -21,34 +21,16 @@ Please note that Spark 2 is no longer supported with the lakeFS metadata client. Start Spark Shell / PySpark with the `--packages` flag: -
- -
- This client is compiled for Spark 3.0.1 with Hadoop 2 and tested with it, but can work for - higher versions. - - ```bash - spark-shell --packages io.lakefs:lakefs-spark-client-301_2.12:0.10.0 - ``` +This client is compiled for Spark 3.1.2 with Hadoop 3.2.1, but can work for other Spark +versions and higher Hadoop versions. - Alternatively an assembled jar is available on S3, at - `s3://treeverse-clients-us-east/lakefs-spark-client-301/0.10.0/lakefs-spark-client-301-assembly-0.10.0.jar` -
+```bash +spark-shell --packages io.lakefs:lakefs-spark-client_2.12:0.10.0 +``` -
- This client is compiled for Spark 3.1.2 with Hadoop 3.2.1, but can work for other Spark - versions and higher Hadoop versions. - - ```bash - spark-shell --packages io.lakefs:lakefs-spark-client_2.12:0.10.0 - ``` - - Alternatively an assembled jar is available on S3, at - `s3://treeverse-clients-us-east/lakefs-spark-client/0.10.0/lakefs-spark-client-assembly-0.10.0.jar` -
+Alternatively an assembled jar is available on S3, at +`s3://treeverse-clients-us-east/lakefs-spark-client/0.10.0/lakefs-spark-client-assembly-0.10.0.jar` +
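For instance, a minimal sketch of starting the shell with the assembled jar instead of `--packages` (the local jar path, the endpoint, and the credential values are placeholders; the `spark.hadoop.lakefs.api.*` keys mirror the spark-submit examples elsewhere in this change):

```bash
# Sketch only: assumes the assembly jar above has already been downloaded locally.
spark-shell --jars ./lakefs-spark-client-assembly-0.10.0.jar \
  --conf spark.hadoop.lakefs.api.url=https://lakefs.example.com/api/v1 \
  --conf spark.hadoop.lakefs.api.access_key=<LAKEFS_ACCESS_KEY_ID> \
  --conf spark.hadoop.lakefs.api.secret_key=<LAKEFS_SECRET_ACCESS_KEY>
```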
## Configuration