diff --git a/delta-lake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormatUtils.scala b/delta-lake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormatUtils.scala index 101a82da830..1ade53b21b9 100644 --- a/delta-lake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormatUtils.scala +++ b/delta-lake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormatUtils.scala @@ -17,8 +17,8 @@ package com.nvidia.spark.rapids.delta import ai.rapids.cudf.{ColumnVector => CudfColumnVector, Scalar, Table} +import com.nvidia.spark.rapids.{GpuColumnVector, GpuMetric} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} -import com.nvidia.spark.rapids.GpuColumnVector import org.roaringbitmap.longlong.{PeekableLongIterator, Roaring64Bitmap} import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType} @@ -53,7 +53,9 @@ object GpuDeltaParquetFileFormatUtils { schema: StructType, delVector: Option[Roaring64Bitmap], input: Iterator[ColumnarBatch], - maxBatchSize: Int): Iterator[ColumnarBatch] = { + maxBatchSize: Int, + delVectorScatterTimeMetric: GpuMetric + ): Iterator[ColumnarBatch] = { val metadataRowIndexCol = schema.fieldNames.indexOf(METADATA_ROW_IDX_COL) val delRowIdx = schema.fieldNames.indexOf(METADATA_ROW_DEL_COL) if (metadataRowIndexCol == -1 && delRowIdx == -1) { @@ -74,20 +76,31 @@ object GpuDeltaParquetFileFormatUtils { Some(delRowIdx) } val newBatch = addMetadataColumns(rowIdxCol, delRowIdx2, delVector,maxBatchSize, - rowIndex, batch) + rowIndex, batch, delVectorScatterTimeMetric) rowIndex += batch.numRows() newBatch } } } + private def createFalseTable(numRows: Int): Table = { + withResource(Scalar.fromBool(false)) { s => + withResource(CudfColumnVector.fromScalar(s, numRows)) { c => + new Table(c) + } + } + } + + private def addMetadataColumns( rowIdxPos: Option[Int], delRowIdx: Option[Int], delVec: Option[Roaring64Bitmap], maxBatchSize: Int, rowIdxStart: Long, - batch: ColumnarBatch): ColumnarBatch = { + batch: ColumnarBatch, + delVectorScatterTimeMetric: GpuMetric, + ): ColumnarBatch = { val rowIdxCol = rowIdxPos.map { _ => withResource(Scalar.fromLong(rowIdxStart)) { start => GpuColumnVector.from(CudfColumnVector.sequence(start, batch.numRows()), @@ -98,30 +111,26 @@ object GpuDeltaParquetFileFormatUtils { closeOnExcept(rowIdxCol) { rowIdxCol => val delVecCol = delVec.map { delVec => - withResource(Scalar.fromBool(false)) { s => - withResource(CudfColumnVector.fromScalar(s, batch.numRows())) { c => - var table = new Table(c) - val posIter = new RoaringBitmapIterator( - delVec.getLongIteratorFrom(rowIdxStart), - rowIdxStart, - rowIdxStart + batch.numRows(), - ).grouped(Math.min(maxBatchSize, batch.numRows())) - - for (posChunk <- posIter) { - withResource(CudfColumnVector.fromLongs(posChunk: _*)) { poses => - withResource(Scalar.fromBool(true)) { s => - table = withResource(table) { _ => + delVectorScatterTimeMetric.ns { + val table = new RoaringBitmapIterator( + delVec.getLongIteratorFrom(rowIdxStart), + rowIdxStart, + rowIdxStart + batch.numRows()) + .grouped(Math.min(maxBatchSize, batch.numRows())) + .foldLeft(createFalseTable(batch.numRows())){ (table, posChunk) => + withResource(table) { _ => + withResource(CudfColumnVector.fromLongs(posChunk: _*)) { poses => + withResource(Scalar.fromBool(true)) { s => Table.scatter(Array(s), poses, table) } } } } - withResource(table) { _ => - GpuColumnVector.from(table.getColumn(0).incRefCount(), - METADATA_ROW_DEL_FIELD.dataType) - } - } + withResource(table) { _ => + GpuColumnVector.from(table.getColumn(0).incRefCount(), + METADATA_ROW_DEL_FIELD.dataType) + } } } diff --git a/delta-lake/delta-24x/src/main/scala/com/nvidia/spark/rapids/delta/delta24x/GpuDelta24xParquetFileFormat.scala b/delta-lake/delta-24x/src/main/scala/com/nvidia/spark/rapids/delta/delta24x/GpuDelta24xParquetFileFormat.scala index ef579d78e6f..77891864537 100644 --- a/delta-lake/delta-24x/src/main/scala/com/nvidia/spark/rapids/delta/delta24x/GpuDelta24xParquetFileFormat.scala +++ b/delta-lake/delta-24x/src/main/scala/com/nvidia/spark/rapids/delta/delta24x/GpuDelta24xParquetFileFormat.scala @@ -88,15 +88,22 @@ case class GpuDelta24xParquetFileFormat( val maxDelVecScatterBatchSize = RapidsConf .DELTA_LOW_SHUFFLE_MERGE_SCATTER_DEL_VECTOR_BATCH_SIZE .get(sparkSession.sessionState.conf) + val delVecScatterTimeMetric = metrics(GpuMetric.DELETION_VECTOR_SCATTER_TIME) + val delVecSizeMetric = metrics(GpuMetric.DELETION_VECTOR_SIZE) + (file: PartitionedFile) => { val input = dataReader(file) val dv = delVecs.flatMap(_.value.get(new URI(file.filePath.toString()))) - .map(dv => RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner) + .map { dv => + delVecSizeMetric += dv.descriptor.inlineData.length + RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner + } addMetadataColumnToIterator(prepareSchema(requiredSchema), dv, input.asInstanceOf[Iterator[ColumnarBatch]], - maxDelVecScatterBatchSize) + maxDelVecScatterBatchSize, + delVecScatterTimeMetric) .asInstanceOf[Iterator[InternalRow]] } } diff --git a/delta-lake/delta-spark341db/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormat.scala b/delta-lake/delta-spark341db/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormat.scala index 604ed826397..e109b81f1e5 100644 --- a/delta-lake/delta-spark341db/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormat.scala +++ b/delta-lake/delta-spark341db/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormat.scala @@ -85,14 +85,21 @@ case class GpuDeltaParquetFileFormat( .DELTA_LOW_SHUFFLE_MERGE_SCATTER_DEL_VECTOR_BATCH_SIZE .get(sparkSession.sessionState.conf) + val delVecScatterTimeMetric = metrics(GpuMetric.DELETION_VECTOR_SCATTER_TIME) + val delVecSizeMetric = metrics(GpuMetric.DELETION_VECTOR_SIZE) + (file: PartitionedFile) => { val input = dataReader(file) val dv = delVecs.flatMap(_.value.get(new URI(file.filePath.toString()))) - .map(dv => RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner) + .map { dv => + delVecSizeMetric += dv.descriptor.inlineData.length + RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner + } addMetadataColumnToIterator(prepareSchema(requiredSchema), dv, input.asInstanceOf[Iterator[ColumnarBatch]], - maxDelVecScatterBatchSize + maxDelVecScatterBatchSize, + delVecScatterTimeMetric ).asInstanceOf[Iterator[InternalRow]] } } diff --git a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_no_rdma b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_no_rdma index fe5c64b1dfc..b78a25feed7 100644 --- a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_no_rdma +++ b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_no_rdma @@ -24,7 +24,7 @@ # - ROCKY_VER: Rocky Linux OS version ARG CUDA_VER=11.8.0 -ARG UCX_VER=1.16.0 +ARG UCX_VER=1.17.0 ARG UCX_CUDA_VER=11 ARG UCX_ARCH=x86_64 ARG ROCKY_VER=8 diff --git a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_rdma b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_rdma index f88c4212a92..7e15218bf5a 100644 --- a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_rdma +++ b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_rdma @@ -24,7 +24,7 @@ # - ROCKY_VER: Rocky Linux OS version ARG CUDA_VER=11.8.0 -ARG UCX_VER=1.16.0 +ARG UCX_VER=1.17.0 ARG UCX_CUDA_VER=11 ARG UCX_ARCH=x86_64 ARG ROCKY_VER=8 diff --git a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_no_rdma b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_no_rdma index 792e7848e56..8bb01f7b1ff 100644 --- a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_no_rdma +++ b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_no_rdma @@ -25,7 +25,7 @@ # ARG CUDA_VER=11.8.0 -ARG UCX_VER=1.16.0 +ARG UCX_VER=1.17.0 ARG UCX_CUDA_VER=11 ARG UCX_ARCH=x86_64 ARG UBUNTU_VER=20.04 diff --git a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_rdma b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_rdma index 42014c67251..044e4b7411c 100644 --- a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_rdma +++ b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_rdma @@ -35,7 +35,7 @@ ARG RDMA_CORE_VERSION=32.1 ARG CUDA_VER=11.8.0 -ARG UCX_VER=1.16.0 +ARG UCX_VER=1.17.0 ARG UCX_CUDA_VER=11 ARG UCX_ARCH=x86_64 ARG UBUNTU_VER=20.04 diff --git a/integration_tests/requirements.txt b/integration_tests/requirements.txt index 570d01f5119..602f6b4943b 100644 --- a/integration_tests/requirements.txt +++ b/integration_tests/requirements.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,4 +17,5 @@ pandas pyarrow pytest-xdist >= 2.0.0 findspark -fastparquet == 0.8.3 \ No newline at end of file +fastparquet == 0.8.3 ; python_version == '3.8' +fastparquet == 2024.5.0 ; python_version >= '3.9' diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index 4b0fc2827f4..bdfb7d6e35c 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -30,8 +30,6 @@ def fastparquet_unavailable(): return False except ImportError: return True - except ValueError: # TODO: remove when https://github.com/NVIDIA/spark-rapids/issues/11070 is fixed - return True rebase_write_corrected_conf = { diff --git a/integration_tests/src/main/python/hive_parquet_write_test.py b/integration_tests/src/main/python/hive_parquet_write_test.py index d824bf7f4a0..7dc10d64e49 100644 --- a/integration_tests/src/main/python/hive_parquet_write_test.py +++ b/integration_tests/src/main/python/hive_parquet_write_test.py @@ -192,6 +192,7 @@ def write_to_hive_sql(spark, output_table): _write_to_hive_conf) +@allow_non_gpu(*non_utc_allow) @pytest.mark.skipif(is_before_spark_330() or (is_databricks_runtime() and not is_databricks122_or_later()), reason="InsertIntoHiveTable supports bucketed write since Spark 330") def test_insert_hive_bucketed_table(spark_tmp_table_factory): diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 7928f6a5a5b..e21ba622f46 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -1492,3 +1492,14 @@ def test_parquet_column_name_with_dots(spark_tmp_path, reader_confs): assert_gpu_and_cpu_are_equal_collect(lambda spark: reader(spark).selectExpr("`a.b`"), conf=all_confs) assert_gpu_and_cpu_are_equal_collect(lambda spark: reader(spark).selectExpr("`a.b`.`c.d.e`.`f.g`"), conf=all_confs) + +def test_parquet_partition_batch_row_count_only_splitting(spark_tmp_path): + data_path = spark_tmp_path + "/PARQUET_DATA" + def setup_table(spark): + spark.range(1000).withColumn("p", f.lit("x")).coalesce(1)\ + .write\ + .partitionBy("p")\ + .parquet(data_path) + with_cpu_session(lambda spark: setup_table(spark)) + assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.read.parquet(data_path).select("p"), + conf={"spark.rapids.sql.columnSizeBytes": "100"}) diff --git a/integration_tests/src/main/python/subquery_test.py b/integration_tests/src/main/python/subquery_test.py index e6d641d4212..5fb73ff2367 100644 --- a/integration_tests/src/main/python/subquery_test.py +++ b/integration_tests/src/main/python/subquery_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2022, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,122 +13,185 @@ # limitations under the License. import pytest -from asserts import assert_gpu_and_cpu_are_equal_sql +from asserts import assert_gpu_and_cpu_are_equal_sql, assert_gpu_and_cpu_error +from conftest import spark_tmp_table_factory from data_gen import * from marks import * @ignore_order(local=True) @pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn) -def test_scalar_subquery_basics(data_gen): +def test_scalar_subquery_basics(spark_tmp_table_factory, data_gen): # Fix num_slices at 1 to make sure that first/last returns same results under CPU and GPU. + table_name = spark_tmp_table_factory.get() assert_gpu_and_cpu_are_equal_sql( lambda spark: gen_df(spark, [('a', data_gen)], num_slices=1), - 'table', - '''select a, (select last(a) from table) - from table - where a > (select first(a) from table) + table_name, + f'''select a, (select last(a) from {table_name}) + from {table_name} + where a > (select first(a) from {table_name}) ''') + @ignore_order(local=True) @pytest.mark.parametrize('basic_gen', all_basic_gens, ids=idfn) -def test_scalar_subquery_struct(basic_gen): +def test_scalar_subquery_struct(spark_tmp_table_factory, basic_gen): # single-level struct gen = [('ss', StructGen([['a', basic_gen], ['b', basic_gen]]))] + table_name = spark_tmp_table_factory.get() assert_gpu_and_cpu_are_equal_sql( # Fix num_slices at 1 to make sure that first/last returns same results under CPU and GPU. lambda spark: gen_df(spark, gen, num_slices=1), - 'table', - '''select ss, (select last(ss) from table) - from table - where (select first(ss) from table).b > ss.a + table_name, + f'''select ss, (select last(ss) from {table_name}) + from {table_name} + where (select first(ss) from {table_name}).b > ss.a ''') + # nested struct gen = [('ss', StructGen([['child', StructGen([['c0', basic_gen]])]]))] assert_gpu_and_cpu_are_equal_sql( # Fix num_slices at 1 to make sure that first/last returns same results under CPU and GPU. lambda spark: gen_df(spark, gen, num_slices=1), - 'table', - '''select ss, (select last(ss) from table) - from table - where (select first(ss) from table)['child']['c0'] > ss.child.c0 + table_name, + f'''select ss, (select last(ss) from {table_name}) + from {table_name} + where (select first(ss) from {table_name})['child']['c0'] > ss.child.c0 ''') + # struct of array - gen = [('ss', StructGen([['arr', ArrayGen(basic_gen)]]))] + # Note: The test query accesses the first two elements of the array. The datagen is set up + # to generate arrays of a minimum of two elements. Otherwise, the test will fail in ANSI mode. + # No meaningful test coverage is lost. Accessing invalid indices of arrays is already tested + # as part of array_test.py::test_array_item_ansi_fail_invalid_index. + gen = [('ss', StructGen([['arr', ArrayGen(basic_gen, min_length=2)]]))] assert_gpu_and_cpu_are_equal_sql( # Fix num_slices at 1 to make sure that first/last returns same results under CPU and GPU. lambda spark: gen_df(spark, gen, length=100, num_slices=1), - 'table', - '''select sort_array(ss.arr), sort_array((select last(ss) from table)['arr']) - from table - where (select first(ss) from table).arr[0] > ss.arr[1] + table_name, + f'''select sort_array(ss.arr), sort_array((select last(ss) from {table_name})['arr']) + from {table_name} + where (select first(ss) from {table_name}).arr[0] > ss.arr[1] ''') + @ignore_order(local=True) +@pytest.mark.parametrize('is_ansi_enabled', [False, True]) @pytest.mark.parametrize('basic_gen', all_basic_gens, ids=idfn) -def test_scalar_subquery_array(basic_gen): +def test_scalar_subquery_array(spark_tmp_table_factory, is_ansi_enabled, basic_gen): + """ + For this test, all the array inputs are sized so that ArrayIndexOutOfBounds conditions are + avoided. This is to ensure that the tests don't fail with exceptions in ANSI mode. + Note that no meaningful test coverage is lost here. ArrayIndexOutOfBounds exceptions are + already tested as part of array_test.py::test_array_item_ansi_fail_invalid_index. + """ + conf = {'spark.sql.ansi.enabled': is_ansi_enabled} + table_name = spark_tmp_table_factory.get() + # single-level array + test_array_gen = ArrayGen(basic_gen, min_length=1 if is_ansi_enabled else 0) assert_gpu_and_cpu_are_equal_sql( # Fix num_slices at 1 to make sure that first/last returns same results under CPU and GPU. - lambda spark: gen_df(spark, [('arr', ArrayGen(basic_gen))], num_slices=1), - 'table', - '''select sort_array(arr), - sort_array((select last(arr) from table)) - from table - where (select first(arr) from table)[0] > arr[0] - ''') + lambda spark: gen_df(spark, [('arr', test_array_gen)], num_slices=1), + table_name, + f'''select sort_array(arr), + sort_array((select last(arr) from {table_name})) + from {table_name} + where (select first(arr) from {table_name})[0] > arr[0] + ''', + conf=conf) + # nested array + test_array_gen = ArrayGen(ArrayGen(basic_gen, min_length=2 if is_ansi_enabled else 0), + min_length=11 if is_ansi_enabled else 0) assert_gpu_and_cpu_are_equal_sql( # Fix num_slices at 1 to make sure that first/last returns same results under CPU and GPU. - lambda spark: gen_df(spark, [('arr', ArrayGen(ArrayGen(basic_gen)))] - , length=100 - , num_slices=1), - 'table', - '''select sort_array(arr[10]), - sort_array((select last(arr) from table)[10]) - from table - where (select first(arr) from table)[0][1] > arr[0][1] - ''') + lambda spark: gen_df(spark, [('arr', test_array_gen)], length=100, num_slices=1), + table_name, + f'''select sort_array(arr[10]), + sort_array((select last(arr) from {table_name})[10]) + from {table_name} + where (select first(arr) from {table_name})[0][1] > arr[0][1] + ''', + conf=conf) + # array of struct + test_array_gen = ArrayGen(StructGen([['a', basic_gen]]), min_length=11 if is_ansi_enabled else 0) assert_gpu_and_cpu_are_equal_sql( # Fix num_slices at 1 to make sure that first/last returns same results under CPU and GPU. - lambda spark: gen_df(spark, [('arr', ArrayGen(StructGen([['a', basic_gen]])))] - , length=100 - , num_slices=1), - 'table', - '''select arr[10].a, (select last(arr) from table)[10].a - from table - where (select first(arr) from table)[0].a > arr[0].a - ''') + lambda spark: gen_df(spark, [('arr', test_array_gen)], length=100, num_slices=1), + table_name, + f'''select arr[10].a, (select last(arr) from {table_name})[10].a + from {table_name} + where (select first(arr) from {table_name})[0].a > arr[0].a + ''', + conf=conf) + + +@ignore_order(local=True) +def test_scalar_subquery_array_ansi_mode_failures(spark_tmp_table_factory): + """ + This tests the case where the array scalar returned from a subquery might be indexed into + with an out-of-range index value. With ANSI mode enabled, an exception is expected. + + A more thorough test for invalid indices is done in array_test.py::test_array_item_ansi_fail_invalid_index, + and is out of the scope of this test. + """ + table_name = spark_tmp_table_factory.get() + + def test_function(spark): + # Fix num_slices at 1 to make sure that first/last returns same results under CPU and GPU. + df = gen_df(spark, [('arr', ArrayGen(long_gen))], num_slices=1) + df.createOrReplaceTempView(table_name) + query = f''' + SELECT SORT_ARRAY(arr), + SORT_ARRAY((SELECT LAST(arr) FROM {table_name})) + FROM {table_name} + WHERE (SELECT CAST(ARRAY() AS ARRAY))[0] > arr[0] + ''' + return spark.sql(query) + + assert_gpu_and_cpu_error( + lambda spark: test_function(spark).collect(), + conf=ansi_enabled_conf, + error_message='ArrayIndexOutOfBoundsException') + @ignore_order(local=True) -def test_scalar_subquery_map(): +def test_scalar_subquery_map(spark_tmp_table_factory): + # Note: For this test, all the array inputs are sized so that ArrayIndexOutOfBounds conditions are + # avoided. This is to ensure that the tests don't fail with exceptions in ANSI mode. + # Note that no meaningful test coverage is lost here. ArrayIndexOutOfBounds exceptions are + # already tested as part of array_test.py::test_array_item_ansi_fail_invalid_index. + table_name = spark_tmp_table_factory.get() map_gen = map_string_string_gen[0] assert_gpu_and_cpu_are_equal_sql( # Fix num_slices at 1 to make sure that first/last returns same results under CPU and GPU. lambda spark: gen_df(spark, [('kv', map_gen)], length=100, num_slices=1), - 'table', - '''select kv['key_0'], - (select first(kv) from table)['key_1'], - (select last(kv) from table)['key_2'] - from table + table_name, + f'''select kv['key_0'], + (select first(kv) from {table_name})['key_1'], + (select last(kv) from {table_name})['key_2'] + from {table_name} ''') + # array of map assert_gpu_and_cpu_are_equal_sql( # Fix num_slices at 1 to make sure that first/last returns same results under CPU and GPU. - lambda spark: gen_df(spark, [('arr', ArrayGen(map_gen))], length=100, num_slices=1), - 'table', - '''select arr[0]['key_0'], - (select first(arr) from table)[0]['key_1'], - (select last(arr[0]) from table)['key_2'] - from table + lambda spark: gen_df(spark, [('arr', ArrayGen(map_gen, min_length=1))], length=100, num_slices=1), + table_name, + f'''select arr[0]['key_0'], + (select first(arr) from {table_name})[0]['key_1'], + (select last(arr[0]) from {table_name})['key_2'] + from {table_name} ''') + # struct of map assert_gpu_and_cpu_are_equal_sql( # Fix num_slices at 1 to make sure that first/last returns same results under CPU and GPU. lambda spark: gen_df(spark, [('ss', StructGen([['kv', map_gen]]))], length=100, num_slices=1), - 'table', - '''select ss['kv']['key_0'], - (select first(ss) from table)['kv']['key_1'], - (select last(ss.kv) from table)['key_2'] - from table + table_name, + f'''select ss['kv']['key_0'], + (select first(ss) from {table_name})['kv']['key_1'], + (select last(ss.kv) from {table_name})['key_2'] + from {table_name} ''') diff --git a/jenkins/Dockerfile-blossom.integration.rocky b/jenkins/Dockerfile-blossom.integration.rocky index 749d29ae58c..e3d83adf85e 100644 --- a/jenkins/Dockerfile-blossom.integration.rocky +++ b/jenkins/Dockerfile-blossom.integration.rocky @@ -57,7 +57,7 @@ RUN export CUDA_VER=`echo ${CUDA_VER} | cut -d '.' -f 1,2` && \ conda install -y -c conda-forge sre_yield && \ conda clean -ay # install pytest plugins for xdist parallel run -RUN python -m pip install findspark pytest-xdist pytest-order fastparquet==0.8.3 +RUN python -m pip install findspark pytest-xdist pytest-order fastparquet==2024.5.0 # Set default java as 1.8.0 ENV JAVA_HOME "/usr/lib/jvm/java-1.8.0-openjdk" diff --git a/jenkins/Dockerfile-blossom.integration.ubuntu b/jenkins/Dockerfile-blossom.integration.ubuntu index 1dd81ca68ed..6ac7a514361 100644 --- a/jenkins/Dockerfile-blossom.integration.ubuntu +++ b/jenkins/Dockerfile-blossom.integration.ubuntu @@ -1,5 +1,5 @@ # -# Copyright (c) 2020-2023, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -69,7 +69,7 @@ RUN export CUDA_VER=`echo ${CUDA_VER} | cut -d '.' -f 1,2` && \ conda install -y -c conda-forge sre_yield && \ conda clean -ay # install pytest plugins for xdist parallel run -RUN python -m pip install findspark pytest-xdist pytest-order fastparquet==0.8.3 +RUN python -m pip install findspark pytest-xdist pytest-order fastparquet==2024.5.0 RUN apt install -y inetutils-ping expect diff --git a/jenkins/Dockerfile-blossom.ubuntu b/jenkins/Dockerfile-blossom.ubuntu index 755dd3eeaa0..c5802b122af 100644 --- a/jenkins/Dockerfile-blossom.ubuntu +++ b/jenkins/Dockerfile-blossom.ubuntu @@ -21,7 +21,7 @@ # Arguments: # CUDA_VER=[11.X.Y,12.X.Y] # UBUNTU_VER=[20.04,22.0.4] -# UCX_VER=1.16.0 +# UCX_VER=1.17.0 # TARGETPLATFORM=[linux/amd64,linux/arm64] # ARCH=[amd64,arm64] # UCX_ARCH=[x86_64,aarch64] @@ -29,7 +29,7 @@ ARG CUDA_VER=11.8.0 ARG UBUNTU_VER=20.04 -ARG UCX_VER=1.16.0 +ARG UCX_VER=1.17.0 ARG TARGETPLATFORM=linux/amd64 # multi-platform build with: docker buildx build --platform linux/arm64,linux/amd64 on either amd64 or arm64 host # check available official arm-based docker images at https://hub.docker.com/r/nvidia/cuda/tags (OS/ARCH) @@ -61,7 +61,7 @@ RUN update-java-alternatives --set /usr/lib/jvm/java-1.8.0-openjdk-${ARCH} RUN ln -sfn /usr/bin/python3.9 /usr/bin/python RUN ln -sfn /usr/bin/python3.9 /usr/bin/python3 -RUN python -m pip install pytest sre_yield requests pandas pyarrow findspark pytest-xdist pre-commit pytest-order fastparquet==0.8.3 +RUN python -m pip install pytest sre_yield requests pandas pyarrow findspark pytest-xdist pre-commit pytest-order fastparquet==2024.5.0 RUN UCX_CUDA_VER=`echo ${CUDA_VER} | cut -d '.' -f1` && \ mkdir -p /tmp/ucx && \ diff --git a/jenkins/databricks/setup.sh b/jenkins/databricks/setup.sh index 63e71d3a25d..b0c5c5c65bd 100755 --- a/jenkins/databricks/setup.sh +++ b/jenkins/databricks/setup.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -52,5 +52,5 @@ PYTHON_SITE_PACKAGES="$HOME/.local/lib/${PYTHON_VERSION}/site-packages" $PYSPARK_PYTHON -m pip install --target $PYTHON_SITE_PACKAGES pytest sre_yield requests pandas pyarrow findspark pytest-xdist pytest-order # Install fastparquet (and numpy as its dependency). -$PYSPARK_PYTHON -m pip install --target $PYTHON_SITE_PACKAGES numpy -$PYSPARK_PYTHON -m pip install --target $PYTHON_SITE_PACKAGES fastparquet==0.8.3 +echo -e "fastparquet==0.8.3;python_version=='3.8'\nfastparquet==2024.5.0;python_version>='3.9'" > fastparquet.txt +$PYSPARK_PYTHON -m pip install --target $PYTHON_SITE_PACKAGES -r fastparquet.txt diff --git a/pom.xml b/pom.xml index 48ec2cca3de..6a3e94d7983 100644 --- a/pom.xml +++ b/pom.xml @@ -810,7 +810,7 @@ https://github.com/openjdk/jdk17/blob/4afbcaf55383ec2f5da53282a1547bac3d099e9d/src/jdk.compiler/share/classes/com/sun/tools/javac/resources/compiler.properties#L1993-L1994 --> -Xlint:all,-serial,-path,-try,-processing|-Werror - 1.16.0 + 1.17.0 1.0.6 ${ucx.baseVersion} diff --git a/scala2.13/pom.xml b/scala2.13/pom.xml index 8a8de83b89b..476cb07a6e5 100644 --- a/scala2.13/pom.xml +++ b/scala2.13/pom.xml @@ -810,7 +810,7 @@ https://github.com/openjdk/jdk17/blob/4afbcaf55383ec2f5da53282a1547bac3d099e9d/src/jdk.compiler/share/classes/com/sun/tools/javac/resources/compiler.properties#L1993-L1994 --> -Xlint:all,-serial,-path,-try,-processing|-Werror - 1.16.0 + 1.17.0 1.0.6 ${ucx.baseVersion} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BatchWithPartitionData.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BatchWithPartitionData.scala index f6429ddd709..d0f61d884d7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BatchWithPartitionData.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BatchWithPartitionData.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -413,6 +413,13 @@ object BatchWithPartitionDataUtils { if (splitIndices.isEmpty) { Seq(SpillableColumnarBatch(GpuColumnVector.incRefCounts(input), SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + } else if (input.numCols() == 0) { + // calculate the number of rows for each batch based on the split indices + val batchRows = + splitIndices.head +: splitIndices.zip(splitIndices.tail :+ input.numRows()).map { + case (startRow, endRow) => endRow - startRow + } + batchRows.map(numRows => new JustRowsColumnarBatch(numRows)) } else { val schema = GpuColumnVector.extractTypes(input) val splitInput = withResource(GpuColumnVector.from(input)) { table => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala index 21d2de6ad68..64b08162557 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -149,30 +149,24 @@ object DumpUtils extends Logging { class ParquetDumper(private val outputStream: OutputStream, table: Table) extends HostBufferConsumer with AutoCloseable { private[this] val tempBuffer = new Array[Byte](128 * 1024) - private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]() def this(path: String, table: Table) = { this(new FileOutputStream(path), table) } - val tableWriter: TableWriter = { + private lazy val tableWriter: TableWriter = { // avoid anything conversion, just dump as it is val builder = ParquetDumper.parquetWriterOptionsFromTable(ParquetWriterOptions.builder(), table) .withCompressionType(ParquetDumper.COMPRESS_TYPE) Table.writeParquetChunked(builder.build(), this) } - override - def handleBuffer(buffer: HostMemoryBuffer, len: Long): Unit = - buffers += Tuple2(buffer, len) - - def writeBufferedData(): Unit = { - ColumnarOutputWriter.writeBufferedData(buffers, tempBuffer, outputStream) - } + override def handleBuffer(buffer: HostMemoryBuffer, len: Long): Unit = + ColumnarOutputWriter.writeBufferedData(mutable.Queue((buffer, len)), tempBuffer, + outputStream) def writeTable(table: Table): Unit = { tableWriter.write(table) - writeBufferedData() } /** @@ -181,7 +175,6 @@ class ParquetDumper(private val outputStream: OutputStream, table: Table) extend */ def close(): Unit = { tableWriter.close() - writeBufferedData() outputStream.close() } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index e93ac40b5bd..9b60cd1efca 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -83,6 +83,8 @@ object GpuMetric extends Logging { val FILECACHE_DATA_RANGE_MISSES_SIZE = "filecacheDataRangeMissesSize" val FILECACHE_FOOTER_READ_TIME = "filecacheFooterReadTime" val FILECACHE_DATA_RANGE_READ_TIME = "filecacheDataRangeReadTime" + val DELETION_VECTOR_SCATTER_TIME = "deletionVectorScatterTime" + val DELETION_VECTOR_SIZE = "deletionVectorSize" // Metric Descriptions. val DESCRIPTION_BUFFER_TIME = "buffer time" @@ -117,6 +119,8 @@ object GpuMetric extends Logging { val DESCRIPTION_FILECACHE_DATA_RANGE_MISSES_SIZE = "cached data misses size" val DESCRIPTION_FILECACHE_FOOTER_READ_TIME = "cached footer read time" val DESCRIPTION_FILECACHE_DATA_RANGE_READ_TIME = "cached data read time" + val DESCRIPTION_DELETION_VECTOR_SCATTER_TIME = "deletion vector scatter time" + val DESCRIPTION_DELETION_VECTOR_SIZE = "deletion vector size" def unwrap(input: GpuMetric): SQLMetric = input match { case w :WrappedGpuMetric => w.sqlMetric diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala index a326006ab83..7195580ba69 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala @@ -424,7 +424,10 @@ case class GpuFileSourceScanExec( "filesSize" -> createSizeMetric(ESSENTIAL_LEVEL, "size of files read"), GPU_DECODE_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_GPU_DECODE_TIME), BUFFER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_BUFFER_TIME), - FILTER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_FILTER_TIME) + FILTER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_FILTER_TIME), + DELETION_VECTOR_SCATTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, + DESCRIPTION_DELETION_VECTOR_SCATTER_TIME), + DELETION_VECTOR_SIZE -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_DELETION_VECTOR_SIZE) ) ++ fileCacheMetrics ++ { relation.fileFormat match { case _: GpuReadParquetFileFormat | _: GpuOrcFileFormat =>