Skip to content

Commit

Permalink
update mutation code (#1453)
Browse files Browse the repository at this point in the history
  • Loading branch information
prabhaarya authored Feb 24, 2025
1 parent e29364e commit d284969
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 40 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,30 @@


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{DataFrame, Row}

object SparkPipelineExample {
def createSparkSession(appName: String): SparkSession = {
SparkSession
.builder()
.appName(appName)
.config("spark.master", "local") // You can adjust this based on your cluster setup
.config("spark.executor.memory", "8g")
.config("spark.driver.memory", "4g")
.config("spark.executor.instances", "13")
.config("spark.executor.cores", "2")
.getOrCreate()
}

def readDataFrameFromBigQuery(spark: SparkSession, prj: String, bqDataset: String, bqTable: String): DataFrame = {
val old_df = spark.read.format("bigquery").load(s"$prj.$bqDataset.$bqTable")
old_df.createOrReplaceTempView("old_df")
spark.sql("SELECT * FROM old_df")
}

def executePipeline(prj: String, bqDataset: String, bqTable: String, inst: String, db: String, tbl: String): Unit = {
val spark = SpannerUtils.createSparkSession("spark-spanner-demo")
val df = SpannerUtils.readDataFrameFromBigQuery(spark, prj, bqDataset, bqTable)
val spark = createSparkSession("spark-spanner-demo")
val df = readDataFrameFromBigQuery(spark, prj, bqDataset, bqTable)

val spannerMutations = new SpannerMutations(prj, inst, db, tbl)
spannerMutations.write(df)
Expand Down

0 comments on commit d284969

Please sign in to comment.