From 9524d72505638e758c709e6976ae94937adf5643 Mon Sep 17 00:00:00 2001 From: sfc-gh-jfan Date: Wed, 27 Apr 2022 14:37:24 -0700 Subject: [PATCH 1/6] PRODSEC-1262 Whitesource Transition to Snyk --- .github/workflows/snyk-issue.yml | 25 +++++++++++++++++++++++++ .github/workflows/snyk-pr.yml | 29 +++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 .github/workflows/snyk-issue.yml create mode 100644 .github/workflows/snyk-pr.yml diff --git a/.github/workflows/snyk-issue.yml b/.github/workflows/snyk-issue.yml new file mode 100644 index 00000000..b54275bf --- /dev/null +++ b/.github/workflows/snyk-issue.yml @@ -0,0 +1,25 @@ +name: Snyk Issue + +on: + issues: + types: [opened, reopened] + +concurrency: snyk-issue + +jobs: + whitesource: + runs-on: ubuntu-latest + if: ${{ github.event.issue.user.login == 'sfc-gh-snyk-sca-sa' }} + steps: + - name: checkout action + uses: actions/checkout@v2 + with: + repository: snowflakedb/whitesource-actions + token: ${{ secrets.WHITESOURCE_ACTION_TOKEN }} + path: whitesource-actions + + - name: Jira Creation + uses: ./whitesource-actions/snyk-issue + with: + jira_token: ${{ secrets.JIRA_TOKEN }} + gh_token: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/snyk-pr.yml b/.github/workflows/snyk-pr.yml new file mode 100644 index 00000000..0fb62a81 --- /dev/null +++ b/.github/workflows/snyk-pr.yml @@ -0,0 +1,29 @@ +name: snyk-pr +on: + pull_request: + branches: + - master +jobs: + whitesource: + runs-on: ubuntu-latest + if: ${{ github.event.pull_request.user.login == 'sfc-gh-snyk-sca-sa' }} + steps: + - name: checkout + uses: actions/checkout@v2 + with: + ref: ${{ github.event.pull_request.head.ref }} + fetch-depth: 0 + + - name: checkout action + uses: actions/checkout@v2 + with: + repository: snowflakedb/whitesource-actions + token: ${{ secrets.WHITESOURCE_ACTION_TOKEN }} + path: whitesource-actions + + - name: PR + uses: ./whitesource-actions/snyk-pr + with: + jira_token: ${{ secrets.JIRA_TOKEN }} + gh_token: ${{ secrets.GITHUB_TOKEN }} + amend: false # true if you want the commit to be amended with the JIRA number \ No newline at end of file From b1a9a2b38930aea4379494e08c3a260166849b7d Mon Sep 17 00:00:00 2001 From: sfc-gh-jfan Date: Wed, 27 Apr 2022 16:21:01 -0700 Subject: [PATCH 2/6] minor update --- .github/workflows/snyk-issue.yml | 14 +++++++++----- .github/workflows/snyk-pr.yml | 4 +++- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/.github/workflows/snyk-issue.yml b/.github/workflows/snyk-issue.yml index b54275bf..9641ba37 100644 --- a/.github/workflows/snyk-issue.yml +++ b/.github/workflows/snyk-issue.yml @@ -1,15 +1,14 @@ name: Snyk Issue on: - issues: - types: [opened, reopened] + schedule: + - cron: '* */12 * * *' concurrency: snyk-issue jobs: whitesource: runs-on: ubuntu-latest - if: ${{ github.event.issue.user.login == 'sfc-gh-snyk-sca-sa' }} steps: - name: checkout action uses: actions/checkout@v2 @@ -17,9 +16,14 @@ jobs: repository: snowflakedb/whitesource-actions token: ${{ secrets.WHITESOURCE_ACTION_TOKEN }} path: whitesource-actions - + - name: set-env + run: echo "REPO=$(basename $GITHUB_REPOSITORY)" >> $GITHUB_ENV - name: Jira Creation uses: ./whitesource-actions/snyk-issue with: + snyk_org: 91a1c3d0-0729-4a98-8210-28087460f1e3 + snyk_token: ${{ secrets.SNYK_GITHUB_INTEGRATION_TOKEN }} jira_token: ${{ secrets.JIRA_TOKEN }} - gh_token: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + diff --git 
a/.github/workflows/snyk-pr.yml b/.github/workflows/snyk-pr.yml index 0fb62a81..02b5f173 100644 --- a/.github/workflows/snyk-pr.yml +++ b/.github/workflows/snyk-pr.yml @@ -23,7 +23,9 @@ jobs: - name: PR uses: ./whitesource-actions/snyk-pr + env: + PR_TITLE: ${{ github.event.pull_request.title }} with: jira_token: ${{ secrets.JIRA_TOKEN }} gh_token: ${{ secrets.GITHUB_TOKEN }} - amend: false # true if you want the commit to be amended with the JIRA number \ No newline at end of file + amend: false # true if you want the commit to be amended with the JIRA number From 4fd9f5f578c99ea9da3fd84c4e737e9841878ba0 Mon Sep 17 00:00:00 2001 From: sfc-gh-jfan Date: Wed, 4 May 2022 15:36:16 -0700 Subject: [PATCH 3/6] minor update --- .github/workflows/snyk-issue.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/snyk-issue.yml b/.github/workflows/snyk-issue.yml index 9641ba37..d2ef124d 100644 --- a/.github/workflows/snyk-issue.yml +++ b/.github/workflows/snyk-issue.yml @@ -21,7 +21,7 @@ jobs: - name: Jira Creation uses: ./whitesource-actions/snyk-issue with: - snyk_org: 91a1c3d0-0729-4a98-8210-28087460f1e3 + snyk_org: ${{ secrets.SNYK_ORG_ID }} snyk_token: ${{ secrets.SNYK_GITHUB_INTEGRATION_TOKEN }} jira_token: ${{ secrets.JIRA_TOKEN }} env: From 2ec9ae90f9e1c0da255ae00e77163aa51133e567 Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Tue, 21 Jun 2022 11:14:22 +0800 Subject: [PATCH 4/6] Support catalog for Spark Snowflake --- .../spark/snowflake/catalog/SfCatalog.scala | 161 ++++++++++++++++++ .../spark/snowflake/catalog/SfScan.scala | 35 ++++ .../snowflake/catalog/SfScanBuilder.scala | 56 ++++++ .../spark/snowflake/catalog/SfTable.scala | 49 ++++++ .../snowflake/catalog/SfWriterBuilder.scala | 29 ++++ 5 files changed, 330 insertions(+) create mode 100644 src/main/scala/net/snowflake/spark/snowflake/catalog/SfCatalog.scala create mode 100644 src/main/scala/net/snowflake/spark/snowflake/catalog/SfScan.scala create mode 100644 src/main/scala/net/snowflake/spark/snowflake/catalog/SfScanBuilder.scala create mode 100644 src/main/scala/net/snowflake/spark/snowflake/catalog/SfTable.scala create mode 100644 src/main/scala/net/snowflake/spark/snowflake/catalog/SfWriterBuilder.scala diff --git a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfCatalog.scala b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfCatalog.scala new file mode 100644 index 00000000..66f67c04 --- /dev/null +++ b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfCatalog.scala @@ -0,0 +1,161 @@ +package net.snowflake.spark.snowflake.catalog + +import net.snowflake.spark.snowflake.DefaultJDBCWrapper.DataBaseOperations +import net.snowflake.spark.snowflake.Parameters.{MergedParameters, PARAM_SF_DATABASE, PARAM_SF_DBTABLE, PARAM_SF_SCHEMA} +import net.snowflake.spark.snowflake.{DefaultJDBCWrapper, Parameters} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.sql.SQLException +import scala.collection.convert.ImplicitConversions.`map AsScala` +import scala.collection.mutable.ArrayBuilder + +class SfCatalog extends TableCatalog with Logging with SupportsNamespaces{ + var catalogName: String = null + var params: MergedParameters = _ + val 
jdbcWrapper = DefaultJDBCWrapper + + override def name(): String = { + require(catalogName != null, "The SfCatalog is not initialed") + catalogName + } + + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { + val map = options.asCaseSensitiveMap().toMap + // to pass the check + params = Parameters.mergeParameters(map + + (PARAM_SF_DATABASE -> "__invalid_database") + + (PARAM_SF_SCHEMA -> "__invalid_schema") + + (PARAM_SF_DBTABLE -> "__invalid_dbtable")) + catalogName = name + } + + override def listTables(namespace: Array[String]): Array[Identifier] = { + checkNamespace(namespace) + val catalog = if (namespace.length == 2) namespace(0) else null + val schemaPattern = if (namespace.length == 2) namespace(1) else null + val rs = DefaultJDBCWrapper.getConnector(params).getMetaData() + .getTables(catalog, schemaPattern, "%", Array("TABLE")) + new Iterator[Identifier] { + def hasNext = rs.next() + def next() = Identifier.of(namespace, rs.getString("TABLE_NAME")) + }.toArray + + } + + override def tableExists(ident: Identifier): Boolean = { + checkNamespace(ident.namespace()) + DefaultJDBCWrapper.tableExists(params, getFullTableName(ident)) + } + + override def dropTable(ident: Identifier): Boolean = { + checkNamespace(ident.namespace()) + val conn = DefaultJDBCWrapper.getConnector(params) + conn.dropTable(getFullTableName(ident)) + } + + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { + checkNamespace(oldIdent.namespace()) + val conn = DefaultJDBCWrapper.getConnector(params) + conn.renameTable(getFullTableName(newIdent), getFullTableName(newIdent)) + } + + override def loadTable(ident: Identifier): Table = { + checkNamespace(ident.namespace()) + val map = params.parameters + params = Parameters.mergeParameters(map + + (PARAM_SF_DBTABLE -> getTableName(ident)) + + (PARAM_SF_DATABASE -> getDatabase(ident)) + + (PARAM_SF_SCHEMA -> getSchema(ident))) + try { + SfTable(ident, jdbcWrapper, params) + } catch { + case _: SQLException => throw QueryCompilationErrors.noSuchTableError(ident) + } + } + + override def alterTable(ident: Identifier, changes: TableChange*): Table = ??? 
+ + override def namespaceExists(namespace: Array[String]): Boolean = namespace match { + case Array(catalog, schema) => + val rs = DefaultJDBCWrapper.getConnector(params).getMetaData().getSchemas(catalog, schema) + + while (rs.next()) { + val tableSchema = rs.getString("TABLE_SCHEM") + if (tableSchema == schema) return true + } + false + case _ => false + } + + override def listNamespaces(): Array[Array[String]] = { + val schemaBuilder = ArrayBuilder.make[Array[String]] + val rs = DefaultJDBCWrapper.getConnector(params).getMetaData().getSchemas() + while (rs.next()) { + schemaBuilder += Array(rs.getString("TABLE_SCHEM")) + } + schemaBuilder.result + } + + override def listNamespaces(namespace: Array[String]): Array[Array[String]] = { + namespace match { + case Array() => + listNamespaces() + case Array(_, _) if namespaceExists(namespace) => + Array() + case _ => + throw QueryCompilationErrors.noSuchNamespaceError(namespace) + } + } + + override def loadNamespaceMetadata(namespace: Array[String]): java.util.Map[String, String] = { + namespace match { + case Array(catalog, schema) => + if (!namespaceExists(namespace)) { + throw QueryCompilationErrors.noSuchNamespaceError(Array(catalog, schema)) + } + new java.util.HashMap[String, String]() + case _ => + throw QueryCompilationErrors.noSuchNamespaceError(namespace) + } + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: java.util.Map[String, String]): Table = ??? + + override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = ??? + + override def dropNamespace(namespace: Array[String]): Boolean = ??? + + private def checkNamespace(namespace: Array[String]): Unit = { + // a database and schema comprise a namespace in Snowflake + if (namespace.length != 2) { + throw QueryCompilationErrors.noSuchNamespaceError(namespace) + } + } + + override def createNamespace( + namespace: Array[String], + metadata: java.util.Map[String, String]): Unit = ??? 
+ + private def getTableName(ident: Identifier): String = { + (ident.name()) + } + private def getDatabase(ident: Identifier): String = { + (ident.namespace())(0) + } + private def getSchema(ident: Identifier): String = { + (ident.namespace())(1) + } + private def getFullTableName(ident: Identifier): String = { + (ident.namespace() :+ ident.name()).mkString(".") + + } +} diff --git a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScan.scala b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScan.scala new file mode 100644 index 00000000..cb80e4d1 --- /dev/null +++ b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScan.scala @@ -0,0 +1,35 @@ +package net.snowflake.spark.snowflake.catalog + +import net.snowflake.spark.snowflake.SnowflakeRelation +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.connector.read.V1Scan +import org.apache.spark.sql.sources.{BaseRelation, Filter, TableScan} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Row, SQLContext} + +case class SfScan( + relation: SnowflakeRelation, + prunedSchema: StructType, + pushedFilters: Array[Filter]) extends V1Scan { + + override def readSchema(): StructType = prunedSchema + + override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = { + new BaseRelation with TableScan { + override def sqlContext: SQLContext = context + override def schema: StructType = prunedSchema + override def needConversion: Boolean = relation.needConversion + override def buildScan(): RDD[Row] = { + val columnList = prunedSchema.map(_.name).toArray + relation.buildScan(columnList, pushedFilters) + } + }.asInstanceOf[T] + } + + override def description(): String = { + super.description() + ", prunedSchema: " + seqToString(prunedSchema) + + ", PushedFilters: " + seqToString(pushedFilters) + } + + private def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]") +} diff --git a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScanBuilder.scala b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScanBuilder.scala new file mode 100644 index 00000000..61585bae --- /dev/null +++ b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScanBuilder.scala @@ -0,0 +1,56 @@ +package net.snowflake.spark.snowflake.catalog + +import net.snowflake.spark.snowflake.{FilterPushdown, JDBCWrapper, SnowflakeRelation} +import net.snowflake.spark.snowflake.Parameters.MergedParameters +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.execution.datasources.PartitioningUtils +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType + +case class SfScanBuilder(session: SparkSession, + schema: StructType, + params: MergedParameters, + jdbcWrapper: JDBCWrapper) extends ScanBuilder + with SupportsPushDownFilters + with SupportsPushDownRequiredColumns + with Logging{ + private val isCaseSensitive = session.sessionState.conf.caseSensitiveAnalysis + + private var pushedFilter = Array.empty[Filter] + + private var finalSchema = schema + + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + val (pushed, unSupported) = filters.partition( + filter => + FilterPushdown + .buildFilterStatement( + schema, + filter, + true + ) + .isDefined + ) + this.pushedFilter = pushed + unSupported + } + + override def pushedFilters(): Array[Filter] = pushedFilter + + 
override def pruneColumns(requiredSchema: StructType): Unit = { + val requiredCols = requiredSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)) + .toSet + val fields = schema.fields.filter { field => + val colName = PartitioningUtils.getColName(field, isCaseSensitive) + requiredCols.contains(colName) + } + finalSchema = StructType(fields) + } + + override def build(): Scan = { + SfScan(SnowflakeRelation(jdbcWrapper, params, + Option(schema))(session.sqlContext), finalSchema, pushedFilters) + } +} diff --git a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfTable.scala b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfTable.scala new file mode 100644 index 00000000..ed551967 --- /dev/null +++ b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfTable.scala @@ -0,0 +1,49 @@ +package net.snowflake.spark.snowflake.catalog + +import net.snowflake.spark.snowflake.DefaultJDBCWrapper.DataBaseOperations +import net.snowflake.spark.snowflake.Parameters.MergedParameters +import net.snowflake.spark.snowflake.{DefaultJDBCWrapper, JDBCWrapper} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead, SupportsWrite, Table, TableCapability} +import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.sql.Connection +import java.util +import scala.collection.JavaConverters._ + +case class SfTable(ident: Identifier, + jdbcWrapper: JDBCWrapper, + params: MergedParameters) extends Table + with SupportsRead + with SupportsWrite + with Logging { + + override def name(): String = (ident.namespace() :+ ident.name()).mkString(".") + + override def schema(): StructType = { + val conn: Connection = DefaultJDBCWrapper.getConnector(params) + try { + conn.tableSchema(name, params) + } finally { + conn.close() + } + } + + override def capabilities(): util.Set[TableCapability] = { + Set(TableCapability.BATCH_READ, + TableCapability.V1_BATCH_WRITE, + TableCapability.TRUNCATE).asJava + } + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + SfScanBuilder(SparkSession.active, schema, params, jdbcWrapper) + } + + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { + SfWriterBuilder(jdbcWrapper, params) + } +} diff --git a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfWriterBuilder.scala b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfWriterBuilder.scala new file mode 100644 index 00000000..1a3faa0b --- /dev/null +++ b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfWriterBuilder.scala @@ -0,0 +1,29 @@ +package net.snowflake.spark.snowflake.catalog + +import net.snowflake.spark.snowflake.JDBCWrapper +import net.snowflake.spark.snowflake.Parameters.MergedParameters +import org.apache.spark.sql._ +import org.apache.spark.sql.connector.write._ +import org.apache.spark.sql.sources.InsertableRelation + +case class SfWriterBuilder(jdbcWrapper: JDBCWrapper, params: MergedParameters) extends WriteBuilder + with SupportsTruncate { + private var isTruncate = false + + override def truncate(): WriteBuilder = { + isTruncate = true + this + } + + override def build(): V1Write = new V1Write { + override def toInsertableRelation: InsertableRelation = (data: DataFrame, _: Boolean) => { + val saveMode = if (isTruncate) { + SaveMode.Overwrite + } 
else { + SaveMode.Append + } + val writer = new net.snowflake.spark.snowflake.SnowflakeWriter(jdbcWrapper) + writer.save(data.sqlContext, data, saveMode, params) + } + } +} \ No newline at end of file From 75b7ffb9bc0097f0b638d172ed668f783212dde6 Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Fri, 7 Oct 2022 21:18:35 +0800 Subject: [PATCH 5/6] Add test case --- .../spark/snowflake/SfCatalogSuite.scala | 57 +++++++ .../spark/snowflake/catalog/SfCatalog.scala | 149 ++++++++++++------ .../spark/snowflake/catalog/SfScan.scala | 13 +- .../snowflake/catalog/SfScanBuilder.scala | 58 ++++--- .../spark/snowflake/catalog/SfTable.scala | 29 ++-- .../snowflake/catalog/SfWriterBuilder.scala | 25 +-- .../snowflake/spark/snowflake/BaseTest.scala | 23 ++- 7 files changed, 245 insertions(+), 109 deletions(-) create mode 100644 src/it/scala/net/snowflake/spark/snowflake/SfCatalogSuite.scala diff --git a/src/it/scala/net/snowflake/spark/snowflake/SfCatalogSuite.scala b/src/it/scala/net/snowflake/spark/snowflake/SfCatalogSuite.scala new file mode 100644 index 00000000..8866512a --- /dev/null +++ b/src/it/scala/net/snowflake/spark/snowflake/SfCatalogSuite.scala @@ -0,0 +1,57 @@ +package net.snowflake.spark.snowflake + +import org.apache.spark.sql.Row +import org.apache.spark.{SparkConf, SparkContext} + +class SfCatalogSuite extends SnowflakeIntegrationSuite { + private val test_table2: String = s"test_table2_$randomSuffix" + + protected def defaultSfCatalogParams: Map[String, String] = Map( + "spark.sql.catalog.snowflake" -> "net.snowflake.spark.snowflake.catalog.SfCatalog", + "spark.sql.catalog.snowflake.sfURL" -> "account.snowflakecomputing.com:443", + "spark.sql.catalog.snowflake.sfUser" -> "username", + "spark.sql.catalog.snowflake.sfPassword" -> "password" + ) + + test("Test SfCatalog Params") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("SfCatalogSuite") + conf.setAll(defaultSfCatalogParams) + + val sc = SparkContext.getOrCreate(conf) + + assert( + sc.getConf + .get("spark.sql.catalog.snowflake") + .equals("net.snowflake.spark.snowflake.catalog.SfCatalog") + ) + + assert( + sc.getConf + .get("spark.sql.catalog.snowflake.sfURL") + .equals("account.snowflakecomputing.com:443") + ) + + assert( + sc.getConf + .get("spark.sql.catalog.snowflake.sfUser") + .equals("username") + ) + + assert( + sc.getConf + .get("spark.sql.catalog.snowflake.sfPassword") + .equals("password") + ) + + } + /* + test("Test SfCatalog working") { + val df = sparkSession.sql(s"select * from snowflake.$test_table2") + checkAnswer( + df.select("\"testint1\"", "\"!#TES#STRI?NG\"", "testint2"), + Seq(Row(1, "Unicode", 42), Row(2, "Mario", 3), Row(3, "Luigi", 42)) + ) + }*/ +} diff --git a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfCatalog.scala b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfCatalog.scala index 66f67c04..f2ea645b 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfCatalog.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfCatalog.scala @@ -1,10 +1,15 @@ package net.snowflake.spark.snowflake.catalog import net.snowflake.spark.snowflake.DefaultJDBCWrapper.DataBaseOperations -import net.snowflake.spark.snowflake.Parameters.{MergedParameters, PARAM_SF_DATABASE, PARAM_SF_DBTABLE, PARAM_SF_SCHEMA} +import net.snowflake.spark.snowflake.Parameters.{ + MergedParameters, + PARAM_SF_DATABASE, + PARAM_SF_DBTABLE, + PARAM_SF_SCHEMA +} import net.snowflake.spark.snowflake.{DefaultJDBCWrapper, Parameters} import org.apache.spark.internal.Logging -import 
org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.StructType @@ -14,7 +19,7 @@ import java.sql.SQLException import scala.collection.convert.ImplicitConversions.`map AsScala` import scala.collection.mutable.ArrayBuilder -class SfCatalog extends TableCatalog with Logging with SupportsNamespaces{ +class SfCatalog extends TableCatalog with Logging with SupportsNamespaces { var catalogName: String = null var params: MergedParameters = _ val jdbcWrapper = DefaultJDBCWrapper @@ -24,26 +29,33 @@ class SfCatalog extends TableCatalog with Logging with SupportsNamespaces{ catalogName } - override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { + override def initialize( + name: String, + options: CaseInsensitiveStringMap + ): Unit = { val map = options.asCaseSensitiveMap().toMap // to pass the check - params = Parameters.mergeParameters(map + - (PARAM_SF_DATABASE -> "__invalid_database") + - (PARAM_SF_SCHEMA -> "__invalid_schema") + - (PARAM_SF_DBTABLE -> "__invalid_dbtable")) + params = Parameters.mergeParameters( + map + + (PARAM_SF_DATABASE -> "__invalid_database") + + (PARAM_SF_SCHEMA -> "__invalid_schema") + + (PARAM_SF_DBTABLE -> "__invalid_dbtable") + ) catalogName = name } override def listTables(namespace: Array[String]): Array[Identifier] = { - checkNamespace(namespace) + checkNamespace(namespace) val catalog = if (namespace.length == 2) namespace(0) else null val schemaPattern = if (namespace.length == 2) namespace(1) else null - val rs = DefaultJDBCWrapper.getConnector(params).getMetaData() - .getTables(catalog, schemaPattern, "%", Array("TABLE")) - new Iterator[Identifier] { - def hasNext = rs.next() - def next() = Identifier.of(namespace, rs.getString("TABLE_NAME")) - }.toArray + val rs = DefaultJDBCWrapper + .getConnector(params) + .getMetaData() + .getTables(catalog, schemaPattern, "%", Array("TABLE")) + new Iterator[Identifier] { + def hasNext = rs.next() + def next() = Identifier.of(namespace, rs.getString("TABLE_NAME")) + }.toArray } @@ -67,41 +79,54 @@ class SfCatalog extends TableCatalog with Logging with SupportsNamespaces{ override def loadTable(ident: Identifier): Table = { checkNamespace(ident.namespace()) val map = params.parameters - params = Parameters.mergeParameters(map + - (PARAM_SF_DBTABLE -> getTableName(ident)) + - (PARAM_SF_DATABASE -> getDatabase(ident)) + - (PARAM_SF_SCHEMA -> getSchema(ident))) + params = Parameters.mergeParameters( + map + + (PARAM_SF_DBTABLE -> getTableName(ident)) + + (PARAM_SF_DATABASE -> getDatabase(ident)) + + (PARAM_SF_SCHEMA -> getSchema(ident)) + ) try { SfTable(ident, jdbcWrapper, params) } catch { - case _: SQLException => throw QueryCompilationErrors.noSuchTableError(ident) + case _: SQLException => + throw QueryCompilationErrors.noSuchTableError(ident) } } - override def alterTable(ident: Identifier, changes: TableChange*): Table = ??? 
- - override def namespaceExists(namespace: Array[String]): Boolean = namespace match { - case Array(catalog, schema) => - val rs = DefaultJDBCWrapper.getConnector(params).getMetaData().getSchemas(catalog, schema) - - while (rs.next()) { - val tableSchema = rs.getString("TABLE_SCHEM") - if (tableSchema == schema) return true - } - false - case _ => false + override def alterTable(ident: Identifier, changes: TableChange*): Table = { + throw new UnsupportedOperationException( + "SfCatalog does not support altering table operation" + ) } + override def namespaceExists(namespace: Array[String]): Boolean = + namespace match { + case Array(catalog, schema) => + val rs = DefaultJDBCWrapper + .getConnector(params) + .getMetaData() + .getSchemas(catalog, schema) + + while (rs.next()) { + val tableSchema = rs.getString("TABLE_SCHEM") + if (tableSchema == schema) return true + } + false + case _ => false + } + override def listNamespaces(): Array[Array[String]] = { - val schemaBuilder = ArrayBuilder.make[Array[String]] - val rs = DefaultJDBCWrapper.getConnector(params).getMetaData().getSchemas() - while (rs.next()) { - schemaBuilder += Array(rs.getString("TABLE_SCHEM")) - } - schemaBuilder.result + val schemaBuilder = ArrayBuilder.make[Array[String]] + val rs = DefaultJDBCWrapper.getConnector(params).getMetaData().getSchemas() + while (rs.next()) { + schemaBuilder += Array(rs.getString("TABLE_SCHEM")) + } + schemaBuilder.result } - override def listNamespaces(namespace: Array[String]): Array[Array[String]] = { + override def listNamespaces( + namespace: Array[String] + ): Array[Array[String]] = { namespace match { case Array() => listNamespaces() @@ -112,11 +137,15 @@ class SfCatalog extends TableCatalog with Logging with SupportsNamespaces{ } } - override def loadNamespaceMetadata(namespace: Array[String]): java.util.Map[String, String] = { + override def loadNamespaceMetadata( + namespace: Array[String] + ): java.util.Map[String, String] = { namespace match { case Array(catalog, schema) => if (!namespaceExists(namespace)) { - throw QueryCompilationErrors.noSuchNamespaceError(Array(catalog, schema)) + throw QueryCompilationErrors.noSuchNamespaceError( + Array(catalog, schema) + ) } new java.util.HashMap[String, String]() case _ => @@ -125,14 +154,31 @@ class SfCatalog extends TableCatalog with Logging with SupportsNamespaces{ } override def createTable( - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: java.util.Map[String, String]): Table = ??? + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: java.util.Map[String, String] + ): Table = { + throw new UnsupportedOperationException( + "SfCatalog does not support creating table operation" + ) + } - override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = ??? + override def alterNamespace( + namespace: Array[String], + changes: NamespaceChange* + ): Unit = { + throw new UnsupportedOperationException( + "SfCatalog does not support altering namespace operation" + ) - override def dropNamespace(namespace: Array[String]): Boolean = ??? 
+ } + + override def dropNamespace(namespace: Array[String]): Boolean = { + throw new UnsupportedOperationException( + "SfCatalog does not support dropping namespace operation" + ) + } private def checkNamespace(namespace: Array[String]): Unit = { // a database and schema comprise a namespace in Snowflake @@ -142,8 +188,13 @@ class SfCatalog extends TableCatalog with Logging with SupportsNamespaces{ } override def createNamespace( - namespace: Array[String], - metadata: java.util.Map[String, String]): Unit = ??? + namespace: Array[String], + metadata: java.util.Map[String, String] + ): Unit = { + throw new UnsupportedOperationException( + "SfCatalog does not support creating namespace operation" + ) + } private def getTableName(ident: Identifier): String = { (ident.name()) diff --git a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScan.scala b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScan.scala index cb80e4d1..ad054a86 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScan.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScan.scala @@ -8,13 +8,16 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext} case class SfScan( - relation: SnowflakeRelation, - prunedSchema: StructType, - pushedFilters: Array[Filter]) extends V1Scan { + relation: SnowflakeRelation, + prunedSchema: StructType, + pushedFilters: Array[Filter] +) extends V1Scan { override def readSchema(): StructType = prunedSchema - override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = { + override def toV1TableScan[T <: BaseRelation with TableScan]( + context: SQLContext + ): T = { new BaseRelation with TableScan { override def sqlContext: SQLContext = context override def schema: StructType = prunedSchema @@ -27,7 +30,7 @@ case class SfScan( } override def description(): String = { - super.description() + ", prunedSchema: " + seqToString(prunedSchema) + + super.description() + ", prunedSchema: " + seqToString(prunedSchema) + ", PushedFilters: " + seqToString(pushedFilters) } diff --git a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScanBuilder.scala b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScanBuilder.scala index 61585bae..e87ab7ea 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScanBuilder.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfScanBuilder.scala @@ -1,21 +1,32 @@ package net.snowflake.spark.snowflake.catalog -import net.snowflake.spark.snowflake.{FilterPushdown, JDBCWrapper, SnowflakeRelation} import net.snowflake.spark.snowflake.Parameters.MergedParameters +import net.snowflake.spark.snowflake.{ + FilterPushdown, + JDBCWrapper, + SnowflakeRelation +} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.connector.read.{ + Scan, + ScanBuilder, + SupportsPushDownFilters, + SupportsPushDownRequiredColumns +} import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -case class SfScanBuilder(session: SparkSession, - schema: StructType, - params: MergedParameters, - jdbcWrapper: JDBCWrapper) extends ScanBuilder - with SupportsPushDownFilters - with SupportsPushDownRequiredColumns - with Logging{ +case class SfScanBuilder( + session: SparkSession, + schema: 
StructType, + params: MergedParameters, + jdbcWrapper: JDBCWrapper +) extends ScanBuilder + with SupportsPushDownFilters + with SupportsPushDownRequiredColumns + with Logging { private val isCaseSensitive = session.sessionState.conf.caseSensitiveAnalysis private var pushedFilter = Array.empty[Filter] @@ -23,15 +34,14 @@ case class SfScanBuilder(session: SparkSession, private var finalSchema = schema override def pushFilters(filters: Array[Filter]): Array[Filter] = { - val (pushed, unSupported) = filters.partition( - filter => - FilterPushdown - .buildFilterStatement( - schema, - filter, - true - ) - .isDefined + val (pushed, unSupported) = filters.partition(filter => + FilterPushdown + .buildFilterStatement( + schema, + filter, + true + ) + .isDefined ) this.pushedFilter = pushed unSupported @@ -40,7 +50,8 @@ case class SfScanBuilder(session: SparkSession, override def pushedFilters(): Array[Filter] = pushedFilter override def pruneColumns(requiredSchema: StructType): Unit = { - val requiredCols = requiredSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)) + val requiredCols = requiredSchema.fields + .map(PartitioningUtils.getColName(_, isCaseSensitive)) .toSet val fields = schema.fields.filter { field => val colName = PartitioningUtils.getColName(field, isCaseSensitive) @@ -50,7 +61,12 @@ case class SfScanBuilder(session: SparkSession, } override def build(): Scan = { - SfScan(SnowflakeRelation(jdbcWrapper, params, - Option(schema))(session.sqlContext), finalSchema, pushedFilters) + SfScan( + SnowflakeRelation(jdbcWrapper, params, Option(schema))( + session.sqlContext + ), + finalSchema, + pushedFilters + ) } } diff --git a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfTable.scala b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfTable.scala index ed551967..e2d52c5e 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfTable.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfTable.scala @@ -5,7 +5,7 @@ import net.snowflake.spark.snowflake.Parameters.MergedParameters import net.snowflake.spark.snowflake.{DefaultJDBCWrapper, JDBCWrapper} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead, SupportsWrite, Table, TableCapability} +import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.types.StructType @@ -15,14 +15,17 @@ import java.sql.Connection import java.util import scala.collection.JavaConverters._ -case class SfTable(ident: Identifier, - jdbcWrapper: JDBCWrapper, - params: MergedParameters) extends Table - with SupportsRead - with SupportsWrite - with Logging { +case class SfTable( + ident: Identifier, + jdbcWrapper: JDBCWrapper, + params: MergedParameters +) extends Table + with SupportsRead + with SupportsWrite + with Logging { - override def name(): String = (ident.namespace() :+ ident.name()).mkString(".") + override def name(): String = + (ident.namespace() :+ ident.name()).mkString(".") override def schema(): StructType = { val conn: Connection = DefaultJDBCWrapper.getConnector(params) @@ -34,12 +37,16 @@ case class SfTable(ident: Identifier, } override def capabilities(): util.Set[TableCapability] = { - Set(TableCapability.BATCH_READ, + Set( + TableCapability.BATCH_READ, TableCapability.V1_BATCH_WRITE, - TableCapability.TRUNCATE).asJava + TableCapability.TRUNCATE + 
).asJava } - override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + override def newScanBuilder( + options: CaseInsensitiveStringMap + ): ScanBuilder = { SfScanBuilder(SparkSession.active, schema, params, jdbcWrapper) } diff --git a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfWriterBuilder.scala b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfWriterBuilder.scala index 1a3faa0b..68c689d4 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfWriterBuilder.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfWriterBuilder.scala @@ -6,8 +6,9 @@ import org.apache.spark.sql._ import org.apache.spark.sql.connector.write._ import org.apache.spark.sql.sources.InsertableRelation -case class SfWriterBuilder(jdbcWrapper: JDBCWrapper, params: MergedParameters) extends WriteBuilder - with SupportsTruncate { +case class SfWriterBuilder(jdbcWrapper: JDBCWrapper, params: MergedParameters) + extends WriteBuilder + with SupportsTruncate { private var isTruncate = false override def truncate(): WriteBuilder = { @@ -16,14 +17,16 @@ case class SfWriterBuilder(jdbcWrapper: JDBCWrapper, params: MergedParameters) e } override def build(): V1Write = new V1Write { - override def toInsertableRelation: InsertableRelation = (data: DataFrame, _: Boolean) => { - val saveMode = if (isTruncate) { - SaveMode.Overwrite - } else { - SaveMode.Append + override def toInsertableRelation: InsertableRelation = + (data: DataFrame, _: Boolean) => { + val saveMode = if (isTruncate) { + SaveMode.Overwrite + } else { + SaveMode.Append + } + val writer = + new net.snowflake.spark.snowflake.SnowflakeWriter(jdbcWrapper) + writer.save(data.sqlContext, data, saveMode, params) } - val writer = new net.snowflake.spark.snowflake.SnowflakeWriter(jdbcWrapper) - writer.save(data.sqlContext, data, saveMode, params) - } } -} \ No newline at end of file +} diff --git a/src/test/scala/net/snowflake/spark/snowflake/BaseTest.scala b/src/test/scala/net/snowflake/spark/snowflake/BaseTest.scala index d2087e1b..0be3819d 100644 --- a/src/test/scala/net/snowflake/spark/snowflake/BaseTest.scala +++ b/src/test/scala/net/snowflake/spark/snowflake/BaseTest.scala @@ -32,23 +32,22 @@ import org.mockito.Mockito import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} -/** - * Created by mzukowski on 8/9/16. +/** Created by mzukowski on 8/9/16. 
*/ private class TestContext extends SparkContext("local", "SnowflakeBaseTest") { - /** - * A text file containing fake unloaded Snowflake data of all supported types + /** A text file containing fake unloaded Snowflake data of all supported types */ val testData: String = new File( - "src/test/resources/snowflake_unload_data.txt").toURI.toString + "src/test/resources/snowflake_unload_data.txt" + ).toURI.toString override def newAPIHadoopFile[K, V, F <: InputFormat[K, V]]( - path: String, - fClass: Class[F], - kClass: Class[K], - vClass: Class[V], - conf: Configuration = hadoopConfiguration + path: String, + fClass: Class[F], + kClass: Class[K], + vClass: Class[V], + conf: Configuration = hadoopConfiguration ): RDD[(K, V)] = { super.newAPIHadoopFile[K, V, F](testData, fClass, kClass, vClass, conf) } @@ -60,8 +59,7 @@ class BaseTest with BeforeAndAfterAll with BeforeAndAfterEach { - /** - * Spark Context with Hadoop file overridden to point at our local test data file for this suite, + /** Spark Context with Hadoop file overridden to point at our local test data file for this suite, * no matter what temp directory was generated and requested. */ protected var sc: SparkContext = _ @@ -106,6 +104,7 @@ class BaseTest sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "test2") sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "test1") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "test2") + // Configure a mock S3 client so that we don't hit errors when trying to access AWS in tests. mockS3Client = Mockito.mock(classOf[AmazonS3Client], Mockito.RETURNS_SMART_NULLS) From be8d050d80f7b228b409c9975207d32f5f207d9c Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Fri, 28 Oct 2022 11:12:46 +0800 Subject: [PATCH 6/6] refactor code and add test --- .../spark/snowflake/catalog/SfCatalog.scala | 21 ++++++++++++------- .../spark/snowflake/SfCatalogSuite.scala | 17 ++++----------- 2 files changed, 18 insertions(+), 20 deletions(-) rename src/{it => test}/scala/net/snowflake/spark/snowflake/SfCatalogSuite.scala (71%) diff --git a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfCatalog.scala b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfCatalog.scala index f2ea645b..3b736351 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/catalog/SfCatalog.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/catalog/SfCatalog.scala @@ -9,9 +9,12 @@ import net.snowflake.spark.snowflake.Parameters.{ } import net.snowflake.spark.snowflake.{DefaultJDBCWrapper, Parameters} import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.analysis.{ + NoSuchNamespaceException, + NoSuchTableException +} import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -89,7 +92,8 @@ class SfCatalog extends TableCatalog with Logging with SupportsNamespaces { SfTable(ident, jdbcWrapper, params) } catch { case _: SQLException => - throw QueryCompilationErrors.noSuchTableError(ident) + throw new NoSuchTableException(ident) + } } @@ -133,7 +137,7 @@ class SfCatalog extends TableCatalog with Logging with SupportsNamespaces { case Array(_, _) if namespaceExists(namespace) => Array() case _ => - throw QueryCompilationErrors.noSuchNamespaceError(namespace) + throw new NoSuchNamespaceException(namespace) } } @@ -143,13 +147,13 @@ class SfCatalog extends TableCatalog with Logging with 
SupportsNamespaces { namespace match { case Array(catalog, schema) => if (!namespaceExists(namespace)) { - throw QueryCompilationErrors.noSuchNamespaceError( + throw new NoSuchNamespaceException( Array(catalog, schema) ) } new java.util.HashMap[String, String]() case _ => - throw QueryCompilationErrors.noSuchNamespaceError(namespace) + throw new NoSuchNamespaceException(namespace) } } @@ -174,7 +178,10 @@ class SfCatalog extends TableCatalog with Logging with SupportsNamespaces { } - override def dropNamespace(namespace: Array[String]): Boolean = { + override def dropNamespace( + namespace: Array[String], + cascade: Boolean + ): Boolean = { throw new UnsupportedOperationException( "SfCatalog does not support dropping namespace operation" ) @@ -183,7 +190,7 @@ class SfCatalog extends TableCatalog with Logging with SupportsNamespaces { private def checkNamespace(namespace: Array[String]): Unit = { // a database and schema comprise a namespace in Snowflake if (namespace.length != 2) { - throw QueryCompilationErrors.noSuchNamespaceError(namespace) + throw new NoSuchNamespaceException(namespace) } } diff --git a/src/it/scala/net/snowflake/spark/snowflake/SfCatalogSuite.scala b/src/test/scala/net/snowflake/spark/snowflake/SfCatalogSuite.scala similarity index 71% rename from src/it/scala/net/snowflake/spark/snowflake/SfCatalogSuite.scala rename to src/test/scala/net/snowflake/spark/snowflake/SfCatalogSuite.scala index 8866512a..ab9d4044 100644 --- a/src/it/scala/net/snowflake/spark/snowflake/SfCatalogSuite.scala +++ b/src/test/scala/net/snowflake/spark/snowflake/SfCatalogSuite.scala @@ -1,10 +1,9 @@ package net.snowflake.spark.snowflake -import org.apache.spark.sql.Row import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest.FunSuite -class SfCatalogSuite extends SnowflakeIntegrationSuite { - private val test_table2: String = s"test_table2_$randomSuffix" +class SfCatalogSuite extends FunSuite { protected def defaultSfCatalogParams: Map[String, String] = Map( "spark.sql.catalog.snowflake" -> "net.snowflake.spark.snowflake.catalog.SfCatalog", @@ -13,7 +12,7 @@ class SfCatalogSuite extends SnowflakeIntegrationSuite { "spark.sql.catalog.snowflake.sfPassword" -> "password" ) - test("Test SfCatalog Params") { + test("SfCatalog Params") { val conf = new SparkConf() .setMaster("local") .setAppName("SfCatalogSuite") @@ -44,14 +43,6 @@ class SfCatalogSuite extends SnowflakeIntegrationSuite { .get("spark.sql.catalog.snowflake.sfPassword") .equals("password") ) - } - /* - test("Test SfCatalog working") { - val df = sparkSession.sql(s"select * from snowflake.$test_table2") - checkAnswer( - df.select("\"testint1\"", "\"!#TES#STRI?NG\"", "testint2"), - Seq(Row(1, "Unicode", 42), Row(2, "Mario", 3), Row(3, "Luigi", 42)) - ) - }*/ + }
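
Reviewer note: the catalog introduced in patches 4–6 plugs into Spark's DataSource V2 catalog API, so once it is registered under a catalog name it can be exercised from plain Spark SQL. Below is a minimal sketch of that wiring, not part of the patches themselves: it reuses the `spark.sql.catalog.snowflake.*` keys (with the same placeholder credentials) that `SfCatalogSuite` sets via `SparkConf`, and the `MY_DB.PUBLIC.MY_TABLE` identifier is purely hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: registers SfCatalog under the catalog name "snowflake" and reads a table
// through a three-part identifier <database>.<schema>.<table>. The account URL,
// credentials, and MY_DB.PUBLIC.MY_TABLE are placeholders, not values from the patches.
object SfCatalogExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("SfCatalogExample")
      // Same configuration keys the SfCatalogSuite test sets on SparkConf.
      .config("spark.sql.catalog.snowflake",
              "net.snowflake.spark.snowflake.catalog.SfCatalog")
      .config("spark.sql.catalog.snowflake.sfURL", "account.snowflakecomputing.com:443")
      .config("spark.sql.catalog.snowflake.sfUser", "username")
      .config("spark.sql.catalog.snowflake.sfPassword", "password")
      .getOrCreate()

    // SfCatalog treats a namespace as <database>.<schema>, so tables are addressed
    // as snowflake.<database>.<schema>.<table>.
    spark.sql("SHOW NAMESPACES IN snowflake").show()
    spark.sql("SELECT * FROM snowflake.MY_DB.PUBLIC.MY_TABLE").show()

    spark.stop()
  }
}
```

Because `checkNamespace` requires exactly two namespace levels, a shorter identifier such as `snowflake.MY_TABLE` raises `NoSuchNamespaceException`. Reads are planned through `SfScanBuilder` (filter pushdown plus column pruning into `SfScan`), while writes go through `SfWriterBuilder`, which maps a truncate request to `SaveMode.Overwrite` and otherwise appends.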