From 94ae7568db6531e0b024cd24f13d54ab4bcd82c5 Mon Sep 17 00:00:00 2001
From: Alec Huang
Date: Thu, 21 Nov 2024 16:55:44 -0800
Subject: [PATCH 1/2] done
---
 .../internal/FDNOpenManyChannelsIT.java       | 14 ++++
 .../internal/IcebergOpenManyChannelsIT.java   | 17 +++++
 ...elsIT.java => OpenManyChannelsITBase.java} | 33 ++++++---
 ...estBigFilesIT.java => BigFilesITBase.java} | 73 +++++++++++++------
 .../streaming/internal/it/FDNBigFilesIT.java  | 20 +++++
 .../internal/it/FDNManyTablesIT.java          | 14 ++++
 .../internal/it/IcebergManyTablesIT.java      | 17 +++++
 ...anyTablesIT.java => ManyTablesITBase.java} | 27 +++++--
 8 files changed, 176 insertions(+), 39 deletions(-)
 create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/FDNOpenManyChannelsIT.java
 create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/IcebergOpenManyChannelsIT.java
 rename src/test/java/net/snowflake/ingest/streaming/internal/{OpenManyChannelsIT.java => OpenManyChannelsITBase.java} (87%)
 rename src/test/java/net/snowflake/ingest/streaming/internal/it/{StreamingIngestBigFilesIT.java => BigFilesITBase.java} (71%)
 create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java
 create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/it/FDNManyTablesIT.java
 create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergManyTablesIT.java
 rename src/test/java/net/snowflake/ingest/streaming/internal/it/{ManyTablesIT.java => ManyTablesITBase.java} (79%)

diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FDNOpenManyChannelsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/FDNOpenManyChannelsIT.java
new file mode 100644
index 000000000..5450c1973
--- /dev/null
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FDNOpenManyChannelsIT.java
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import org.junit.Before;
+
+public class FDNOpenManyChannelsIT extends OpenManyChannelsITBase {
+  @Before
+  public void before() throws Exception {
+    super.setUp(false);
+  }
+}
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergOpenManyChannelsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergOpenManyChannelsIT.java
new file mode 100644
index 000000000..74d542ec0
--- /dev/null
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergOpenManyChannelsIT.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import net.snowflake.ingest.IcebergIT;
+import org.junit.Before;
+import org.junit.experimental.categories.Category;
+
+@Category(IcebergIT.class)
+public class IcebergOpenManyChannelsIT extends OpenManyChannelsITBase {
+  @Before
+  public void before() throws Exception {
+    super.setUp(true);
+  }
+}
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsITBase.java
similarity index 87%
rename from src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsIT.java
rename to src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsITBase.java
index b27a76396..6deab44a8 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsITBase.java
@@ -1,6 +1,11 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
 package net.snowflake.ingest.streaming.internal;
 
 import static net.snowflake.ingest.utils.Constants.ROLE;
+import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING;
 
 import java.sql.Connection;
 import java.util.ArrayList;
@@ -22,13 +27,12 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
 /** Tries to open several thousand channels into the same table from multiple threads in parallel */
 @Ignore("Will be reimplemented in dew: SNOW-807102")
-public class OpenManyChannelsIT {
+public abstract class OpenManyChannelsITBase {
   private static final int THREAD_COUNT = 20;
   private static final int CHANNELS_PER_THREAD = 250;
   private static final String SCHEMA_NAME = "PUBLIC";
@@ -40,18 +44,29 @@ public class OpenManyChannelsIT {
 
   private SnowflakeStreamingIngestClient client;
 
-  @Before
-  public void setUp() throws Exception {
+  public void setUp(boolean enableIcebergStreaming) throws Exception {
     databaseName =
         String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", RandomStringUtils.randomNumeric(9));
     conn = TestUtils.getConnection(true);
     conn.createStatement().execute(String.format("create or replace database %s;", databaseName));
-    conn.createStatement()
-        .execute(
-            String.format(
-                "create or replace table %s.%s.%s (col int)",
-                databaseName, SCHEMA_NAME, TABLE_NAME));
+    if (enableIcebergStreaming) {
+      conn.createStatement()
+          .execute(
+              String.format(
+                  "create or replace iceberg table %s.%s.%s (col int)\n"
+                      + "catalog = 'SNOWFLAKE'\n"
+                      + "external_volume = 'streaming_ingest'\n"
+                      + "base_location = 'SDK_IT/%s/%s';",
+                  databaseName, SCHEMA_NAME, TABLE_NAME, databaseName, TABLE_NAME));
+    } else {
+      conn.createStatement()
+          .execute(
+              String.format(
+                  "create or replace table %s.%s.%s (col int)",
+                  databaseName, SCHEMA_NAME, TABLE_NAME));
+    }
     Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
+    props.setProperty(ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
     if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
       props.setProperty(ROLE, "ACCOUNTADMIN");
     }
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/StreamingIngestBigFilesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/BigFilesITBase.java
similarity index 71%
rename from src/test/java/net/snowflake/ingest/streaming/internal/it/StreamingIngestBigFilesIT.java
rename to src/test/java/net/snowflake/ingest/streaming/internal/it/BigFilesITBase.java
index 485ad1b33..4a78111fd 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/it/StreamingIngestBigFilesIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/BigFilesITBase.java
@@ -1,8 +1,13 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
 package net.snowflake.ingest.streaming.internal.it;
 
 import static net.snowflake.ingest.TestUtils.verifyTableRowCount;
 import static net.snowflake.ingest.utils.Constants.ROLE;
 import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
+import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -27,16 +32,14 @@ import net.snowflake.ingest.utils.Constants;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
 
 /** Ingests a large number of rows. */
 @RunWith(Parameterized.class)
-public class StreamingIngestBigFilesIT {
+public abstract class BigFilesITBase {
   private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
   private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA";
 
@@ -45,16 +48,15 @@ public class StreamingIngestBigFilesIT {
   private SnowflakeStreamingIngestClientInternal<?> client;
   private Connection jdbcConnection;
   private String testDb;
+  private boolean enableIcebergStreaming;
 
-  @Parameters(name = "{index}: {0}")
-  public static Object[] compressionAlgorithms() {
-    return new Object[] {"GZIP", "ZSTD"};
-  }
+  @Parameter(0)
+  public String compressionAlgorithm;
 
-  @Parameter public String compressionAlgorithm;
+  @Parameter(1)
+  public Constants.IcebergSerializationPolicy icebergSerializationPolicy;
 
-  @Before
-  public void beforeAll() throws Exception {
+  public void beforeAll(boolean enableIcebergStreaming) throws Exception {
     testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4);
     // Create a streaming ingest client
     jdbcConnection = TestUtils.getConnection(true);
@@ -76,9 +78,11 @@ public void beforeAll() throws Exception {
       prop.setProperty(ROLE, "ACCOUNTADMIN");
     }
     prop.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
+    prop.setProperty(ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
     client =
         (SnowflakeStreamingIngestClientInternal<?>)
             SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(prop).build();
+    this.enableIcebergStreaming = enableIcebergStreaming;
   }
 
   @After
@@ -176,20 +180,41 @@ private void ingestRandomRowsToTable(
 
   private void createTableForTest(String tableName) {
     try {
-      jdbcConnection
-          .createStatement()
-          .execute(
-              String.format(
-                  "create or replace table %s (\n"
-                      + " num_2_1 NUMBER(2, 1),\n"
-                      + " num_4_2 NUMBER(4, 2),\n"
-                      + " num_9_4 NUMBER(9, 4),\n"
-                      + " num_18_7 NUMBER(18, 7),\n"
-                      + " num_38_15 NUMBER(38, 15),\n"
-                      + " num_float FLOAT,\n"
-                      + " str VARCHAR(256),\n"
-                      + " bin BINARY(256));",
-                  tableName));
+      if (enableIcebergStreaming) {
+        jdbcConnection
+            .createStatement()
+            .execute(
+                String.format(
+                    "create or replace iceberg table %s (\n"
+                        + " num_2_1 decimal(2, 1),\n"
+                        + " num_4_2 decimal(4, 2),\n"
+                        + " num_9_4 decimal(9, 4),\n"
+                        + " num_18_7 decimal(18, 7),\n"
+                        + " num_38_15 decimal(38, 15),\n"
+                        + " num_float float,\n"
+                        + " str string,\n"
+                        + " bin binary)\n"
+                        + "catalog = 'SNOWFLAKE'\n"
+                        + "external_volume = 'streaming_ingest'\n"
+                        + "base_location = 'SDK_IT/%s/%s'\n"
+                        + "storage_serialization_policy = %s;",
+                    tableName, testDb, tableName, icebergSerializationPolicy.name()));
+      } else {
+        jdbcConnection
+            .createStatement()
+            .execute(
+                String.format(
+                    "create or replace table %s (\n"
+                        + " num_2_1 NUMBER(2, 1),\n"
+                        + " num_4_2 NUMBER(4, 2),\n"
+                        + " num_9_4 NUMBER(9, 4),\n"
+                        + " num_18_7 NUMBER(18, 7),\n"
+                        + " num_38_15 NUMBER(38, 15),\n"
+                        + " num_float FLOAT,\n"
+                        + " str VARCHAR(256),\n"
+                        + " bin BINARY(256));",
+                    tableName));
+      }
     } catch (SQLException e) {
       throw new RuntimeException("Cannot create table " + tableName, e);
     }
   }
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java
new file mode 100644
index 000000000..181e7e6f9
--- /dev/null
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal.it;
+
+import org.junit.Before;
+import org.junit.runners.Parameterized;
+
+public class FDNBigFilesIT extends BigFilesITBase {
+  @Parameterized.Parameters(name = "compressionAlgorithm={0}")
+  public static Object[][] compressionAlgorithms() {
+    return new Object[][] {{"GZIP", null}, {"ZSTD", null}};
+  }
+
+  @Before
+  public void before() throws Exception {
+    super.beforeAll(false);
+  }
+}
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNManyTablesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNManyTablesIT.java
new file mode 100644
index 000000000..e04ef79ae
--- /dev/null
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNManyTablesIT.java
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal.it;
+
+import org.junit.Before;
+
+public class FDNManyTablesIT extends ManyTablesITBase {
+  @Before
+  public void before() throws Exception {
+    super.setUp(false);
+  }
+}
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergManyTablesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergManyTablesIT.java
new file mode 100644
index 000000000..21561abfd
--- /dev/null
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergManyTablesIT.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal.it;
+
+import net.snowflake.ingest.IcebergIT;
+import org.junit.Before;
+import org.junit.experimental.categories.Category;
+
+@Category(IcebergIT.class)
+public class IcebergManyTablesIT extends ManyTablesITBase {
+  @Before
+  public void before() throws Exception {
+    super.setUp(true);
+  }
+}
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/ManyTablesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/ManyTablesITBase.java
similarity index 79%
rename from src/test/java/net/snowflake/ingest/streaming/internal/it/ManyTablesIT.java
rename to src/test/java/net/snowflake/ingest/streaming/internal/it/ManyTablesITBase.java
index db9d9a5e0..bb84ab36e 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/it/ManyTablesIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/ManyTablesITBase.java
@@ -21,14 +21,13 @@ import net.snowflake.ingest.utils.ParameterProvider;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 /**
  * Verifies that ingestion works when we ingest into a large number of tables from the same client
 * and blobs and registration requests have to be cut so they don't contain a large number of chunks
 */
-public class ManyTablesIT {
+public abstract class ManyTablesITBase {
  private static final int TABLES_COUNT = 20;
  private static final int TOTAL_ROWS_COUNT = 200_000;
 
@@ -38,11 +37,13 @@ public class ManyTablesIT {
  private SnowflakeStreamingIngestChannel[] channels;
  private String[] offsetTokensPerChannel;
 
-  @Before
-  public void setUp() throws Exception {
+  public void setUp(boolean enableIcebergStreaming) throws Exception {
     Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
-    props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2);
+    if (!enableIcebergStreaming) {
+      props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2);
+    }
     props.put(ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST, 2);
+    props.put(ParameterProvider.ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
     if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
       props.setProperty(ROLE, "ACCOUNTADMIN");
     }
@@ -57,7 +58,21 @@ public void setUp() throws Exception {
     String[] tableNames = new String[TABLES_COUNT];
     for (int i = 0; i < tableNames.length; i++) {
       tableNames[i] = String.format("table_%d", i);
-      connection.createStatement().execute(String.format("create table table_%d(c int);", i));
+      if (enableIcebergStreaming) {
+        connection
+            .createStatement()
+            .execute(
+                String.format(
+                    "create or replace iceberg table table_%s(c int) "
+                        + "catalog = 'SNOWFLAKE' "
+                        + "external_volume = 'streaming_ingest' "
+                        + "base_location = 'SDK_IT/%s/%s'",
+                    i, dbName, tableNames[i]));
+      } else {
+        connection
+            .createStatement()
+            .execute(String.format("create or replace table table_%d(c int);", i));
+      }
       channels[i] =
           client.openChannel(
               OpenChannelRequest.builder(String.format("channel-%d", i))

From 160c4dcc8a80cdd652d3fe772f8c531f61f1badc Mon Sep 17 00:00:00 2001
From: Alec Huang
Date: Mon, 25 Nov 2024 15:07:52 -0800
Subject: [PATCH 2/2] address comments
---
 .../streaming/internal/it/BigFilesITBase.java | 235 ------------------
 .../streaming/internal/it/FDNBigFilesIT.java  | 206 ++++++++++++++-
 2 files changed, 200 insertions(+), 241 deletions(-)
 delete mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/it/BigFilesITBase.java

diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/BigFilesITBase.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/BigFilesITBase.java
deleted file mode 100644
index 4a78111fd..000000000
--- a/src/test/java/net/snowflake/ingest/streaming/internal/it/BigFilesITBase.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
- */
-
-package net.snowflake.ingest.streaming.internal.it;
-
-import static net.snowflake.ingest.TestUtils.verifyTableRowCount;
-import static net.snowflake.ingest.utils.Constants.ROLE;
-import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
-import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import net.snowflake.ingest.TestUtils;
-import net.snowflake.ingest.streaming.OpenChannelRequest;
-import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
-import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
-import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
-import net.snowflake.ingest.utils.Constants;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-
-/** Ingests a large number of rows. */
-@RunWith(Parameterized.class)
-public abstract class BigFilesITBase {
-  private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
-  private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA";
-
-  private Properties prop;
-
-  private SnowflakeStreamingIngestClientInternal<?> client;
-  private Connection jdbcConnection;
-  private String testDb;
-  private boolean enableIcebergStreaming;
-
-  @Parameter(0)
-  public String compressionAlgorithm;
-
-  @Parameter(1)
-  public Constants.IcebergSerializationPolicy icebergSerializationPolicy;
-
-  public void beforeAll(boolean enableIcebergStreaming) throws Exception {
-    testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4);
-    // Create a streaming ingest client
-    jdbcConnection = TestUtils.getConnection(true);
-
-    jdbcConnection
-        .createStatement()
-        .execute(String.format("create or replace database %s;", testDb));
-    jdbcConnection
-        .createStatement()
-        .execute(String.format("create or replace schema %s.%s;", testDb, TEST_SCHEMA));
-    // Set timezone to UTC
-    jdbcConnection.createStatement().execute("alter session set timezone = 'UTC';");
-    jdbcConnection
-        .createStatement()
-        .execute(String.format("use warehouse %s", TestUtils.getWarehouse()));
-
-    prop = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
-    if (prop.getProperty(ROLE).equals("DEFAULT_ROLE")) {
-      prop.setProperty(ROLE, "ACCOUNTADMIN");
-    }
-    prop.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
-    prop.setProperty(ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
-    client =
-        (SnowflakeStreamingIngestClientInternal<?>)
-            SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(prop).build();
-    this.enableIcebergStreaming = enableIcebergStreaming;
-  }
-
-  @After
-  public void afterAll() throws Exception {
-    client.close();
-    jdbcConnection.createStatement().execute(String.format("drop database %s", testDb));
-  }
-
-  @Test
-  public void testManyRowsMultipleChannelsToMultipleTable()
-      throws SQLException, ExecutionException, InterruptedException {
-    String tableNamePrefix = "t_big_table_";
-
-    int numTables = 2;
-    int numChannels = 4; // channels are assigned round-robin to tables.
-    int batchSize = 10000;
-    int numBatches = 10; // number of rows PER CHANNEL is batchSize * numBatches
-    boolean isNullable = false;
-
-    Map<Integer, Integer> tableIdToNumChannels = new HashMap<>();
-    for (int i = 0; i < numChannels; i++) {
-      tableIdToNumChannels.put(
-          i % numTables, tableIdToNumChannels.getOrDefault(i % numTables, 0) + 1);
-    }
-    for (int i = 0; i < numTables; i++) {
-      String tableName = tableNamePrefix + i;
-      createTableForTest(tableName);
-    }
-
-    ingestRandomRowsToTable(
-        tableNamePrefix, numTables, numChannels, batchSize, numBatches, isNullable);
-
-    for (int i = 0; i < numTables; i++) {
-      int numChannelsToTable = tableIdToNumChannels.get(i);
-      verifyTableRowCount(
-          batchSize * numBatches * numChannelsToTable,
-          jdbcConnection,
-          testDb,
-          TEST_SCHEMA,
-          tableNamePrefix + i);
-
-      // select * to ensure scanning works
-      ResultSet result =
-          jdbcConnection
-              .createStatement()
-              .executeQuery(
-                  String.format(
-                      "select * from %s.%s.%s", testDb, TEST_SCHEMA, tableNamePrefix + i));
-      result.next();
-      Assert.assertNotNull(result.getString("STR"));
-    }
-  }
-
-  private void ingestRandomRowsToTable(
-      String tablePrefix,
-      int numTables,
-      int numChannels,
-      int batchSize,
-      int iterations,
-      boolean isNullable)
-      throws ExecutionException, InterruptedException {
-
-    final List<Map<String, Object>> rows = Collections.synchronizedList(new ArrayList<>());
-    for (int i = 0; i < batchSize; i++) {
-      Random r = new Random();
-      rows.add(TestUtils.getRandomRow(r, isNullable));
-    }
-
-    ExecutorService testThreadPool = Executors.newFixedThreadPool(numChannels);
-    CompletableFuture<?>[] futures = new CompletableFuture[numChannels];
-    List<SnowflakeStreamingIngestChannel> channelList =
-        Collections.synchronizedList(new ArrayList<>());
-    for (int i = 0; i < numChannels; i++) {
-      final String channelName = "CHANNEL" + i;
-      int finalI = i;
-      futures[i] =
-          CompletableFuture.runAsync(
-              () -> {
-                int targetTable = finalI % numTables;
-                SnowflakeStreamingIngestChannel channel =
-                    openChannel(tablePrefix + targetTable, channelName);
-                channelList.add(channel);
-                for (int val = 0; val < iterations; val++) {
-                  TestUtils.verifyInsertValidationResponse(
-                      channel.insertRows(rows, Integer.toString(val)));
-                }
-              },
-              testThreadPool);
-    }
-    CompletableFuture<Void> joined = CompletableFuture.allOf(futures);
-    joined.get();
-    channelList.forEach(channel -> TestUtils.waitChannelFlushed(channel, iterations));
-    testThreadPool.shutdown();
-  }
-
-  private void createTableForTest(String tableName) {
-    try {
-      if (enableIcebergStreaming) {
-        jdbcConnection
-            .createStatement()
-            .execute(
-                String.format(
-                    "create or replace iceberg table %s (\n"
-                        + " num_2_1 decimal(2, 1),\n"
-                        + " num_4_2 decimal(4, 2),\n"
-                        + " num_9_4 decimal(9, 4),\n"
-                        + " num_18_7 decimal(18, 7),\n"
-                        + " num_38_15 decimal(38, 15),\n"
-                        + " num_float float,\n"
-                        + " str string,\n"
-                        + " bin binary)\n"
-                        + "catalog = 'SNOWFLAKE'\n"
-                        + "external_volume = 'streaming_ingest'\n"
-                        + "base_location = 'SDK_IT/%s/%s'\n"
-                        + "storage_serialization_policy = %s;",
-                    tableName, testDb, tableName, icebergSerializationPolicy.name()));
-      } else {
-        jdbcConnection
-            .createStatement()
-            .execute(
-                String.format(
-                    "create or replace table %s (\n"
-                        + " num_2_1 NUMBER(2, 1),\n"
-                        + " num_4_2 NUMBER(4, 2),\n"
-                        + " num_9_4 NUMBER(9, 4),\n"
-                        + " num_18_7 NUMBER(18, 7),\n"
-                        + " num_38_15 NUMBER(38, 15),\n"
-                        + " num_float FLOAT,\n"
-                        + " str VARCHAR(256),\n"
-                        + " bin BINARY(256));",
-                    tableName));
-      }
-    } catch (SQLException e) {
-      throw new RuntimeException("Cannot create table " + tableName, e);
-    }
-  }
-
-  private SnowflakeStreamingIngestChannel openChannel(String tableName, String channelName) {
-    OpenChannelRequest request =
-        OpenChannelRequest.builder(channelName)
-            .setDBName(testDb)
-            .setSchemaName(TEST_SCHEMA)
-            .setTableName(tableName)
-            .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
-            .build();
-
-    // Open a streaming ingest channel from the given client
-    return client.openChannel(request);
-  }
-}
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java
index 181e7e6f9..df4ffa341 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java
@@ -4,17 +4,211 @@
 
 package net.snowflake.ingest.streaming.internal.it;
 
+import static net.snowflake.ingest.TestUtils.verifyTableRowCount;
+import static net.snowflake.ingest.utils.Constants.ROLE;
+import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import net.snowflake.ingest.TestUtils;
+import net.snowflake.ingest.streaming.OpenChannelRequest;
+import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
+import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
+import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
+import net.snowflake.ingest.utils.Constants;
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Ingests a large number of rows. */
+@RunWith(Parameterized.class)
+public class FDNBigFilesIT {
+  private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
+  private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA";
+
+  private Properties prop;
+
+  private SnowflakeStreamingIngestClientInternal<?> client;
+  private Connection jdbcConnection;
+  private String testDb;
 
-public class FDNBigFilesIT extends BigFilesITBase {
-  @Parameterized.Parameters(name = "compressionAlgorithm={0}")
-  public static Object[][] compressionAlgorithms() {
-    return new Object[][] {{"GZIP", null}, {"ZSTD", null}};
+  @Parameters(name = "{index}: {0}")
+  public static Object[] compressionAlgorithms() {
+    return new Object[] {"GZIP", "ZSTD"};
   }
 
+  @Parameter public String compressionAlgorithm;
+
   @Before
-  public void before() throws Exception {
-    super.beforeAll(false);
+  public void beforeAll() throws Exception {
+    testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4);
+    // Create a streaming ingest client
+    jdbcConnection = TestUtils.getConnection(true);
+
+    jdbcConnection
+        .createStatement()
+        .execute(String.format("create or replace database %s;", testDb));
+    jdbcConnection
+        .createStatement()
+        .execute(String.format("create or replace schema %s.%s;", testDb, TEST_SCHEMA));
+    // Set timezone to UTC
+    jdbcConnection.createStatement().execute("alter session set timezone = 'UTC';");
+    jdbcConnection
+        .createStatement()
+        .execute(String.format("use warehouse %s", TestUtils.getWarehouse()));
+
+    prop = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
+    if (prop.getProperty(ROLE).equals("DEFAULT_ROLE")) {
+      prop.setProperty(ROLE, "ACCOUNTADMIN");
+    }
+    prop.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
+    client =
+        (SnowflakeStreamingIngestClientInternal<?>)
+            SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(prop).build();
+  }
+
+  @After
+  public void afterAll() throws Exception {
+    client.close();
+    jdbcConnection.createStatement().execute(String.format("drop database %s", testDb));
+  }
+
+  @Test
+  public void testManyRowsMultipleChannelsToMultipleTable()
+      throws SQLException, ExecutionException, InterruptedException {
+    String tableNamePrefix = "t_big_table_";
+
+    int numTables = 2;
+    int numChannels = 4; // channels are assigned round-robin to tables.
+    int batchSize = 10000;
+    int numBatches = 10; // number of rows PER CHANNEL is batchSize * numBatches
+    boolean isNullable = false;
+
+    Map<Integer, Integer> tableIdToNumChannels = new HashMap<>();
+    for (int i = 0; i < numChannels; i++) {
+      tableIdToNumChannels.put(
+          i % numTables, tableIdToNumChannels.getOrDefault(i % numTables, 0) + 1);
+    }
+    for (int i = 0; i < numTables; i++) {
+      String tableName = tableNamePrefix + i;
+      createTableForTest(tableName);
+    }
+
+    ingestRandomRowsToTable(
+        tableNamePrefix, numTables, numChannels, batchSize, numBatches, isNullable);
+
+    for (int i = 0; i < numTables; i++) {
+      int numChannelsToTable = tableIdToNumChannels.get(i);
+      verifyTableRowCount(
+          batchSize * numBatches * numChannelsToTable,
+          jdbcConnection,
+          testDb,
+          TEST_SCHEMA,
+          tableNamePrefix + i);
+
+      // select * to ensure scanning works
+      ResultSet result =
+          jdbcConnection
+              .createStatement()
+              .executeQuery(
+                  String.format(
+                      "select * from %s.%s.%s", testDb, TEST_SCHEMA, tableNamePrefix + i));
+      result.next();
+      Assert.assertNotNull(result.getString("STR"));
+    }
+  }
+
+  private void ingestRandomRowsToTable(
+      String tablePrefix,
+      int numTables,
+      int numChannels,
+      int batchSize,
+      int iterations,
+      boolean isNullable)
+      throws ExecutionException, InterruptedException {
+
+    final List<Map<String, Object>> rows = Collections.synchronizedList(new ArrayList<>());
+    for (int i = 0; i < batchSize; i++) {
+      Random r = new Random();
+      rows.add(TestUtils.getRandomRow(r, isNullable));
+    }
+
+    ExecutorService testThreadPool = Executors.newFixedThreadPool(numChannels);
+    CompletableFuture<?>[] futures = new CompletableFuture[numChannels];
+    List<SnowflakeStreamingIngestChannel> channelList =
+        Collections.synchronizedList(new ArrayList<>());
+    for (int i = 0; i < numChannels; i++) {
+      final String channelName = "CHANNEL" + i;
+      int finalI = i;
+      futures[i] =
+          CompletableFuture.runAsync(
+              () -> {
+                int targetTable = finalI % numTables;
+                SnowflakeStreamingIngestChannel channel =
+                    openChannel(tablePrefix + targetTable, channelName);
+                channelList.add(channel);
+                for (int val = 0; val < iterations; val++) {
+                  TestUtils.verifyInsertValidationResponse(
+                      channel.insertRows(rows, Integer.toString(val)));
+                }
+              },
+              testThreadPool);
+    }
+    CompletableFuture<Void> joined = CompletableFuture.allOf(futures);
+    joined.get();
+    channelList.forEach(channel -> TestUtils.waitChannelFlushed(channel, iterations));
+    testThreadPool.shutdown();
+  }
+
+  private void createTableForTest(String tableName) {
+    try {
+      jdbcConnection
+          .createStatement()
+          .execute(
+              String.format(
+                  "create or replace table %s (\n"
+                      + " num_2_1 NUMBER(2, 1),\n"
+                      + " num_4_2 NUMBER(4, 2),\n"
+                      + " num_9_4 NUMBER(9, 4),\n"
+                      + " num_18_7 NUMBER(18, 7),\n"
+                      + " num_38_15 NUMBER(38, 15),\n"
+                      + " num_float FLOAT,\n"
+                      + " str VARCHAR(256),\n"
+                      + " bin BINARY(256));",
+                  tableName));
+    } catch (SQLException e) {
+      throw new RuntimeException("Cannot create table " + tableName, e);
+    }
+  }
+
+  private SnowflakeStreamingIngestChannel openChannel(String tableName, String channelName) {
+    OpenChannelRequest request =
+        OpenChannelRequest.builder(channelName)
+            .setDBName(testDb)
+            .setSchemaName(TEST_SCHEMA)
+            .setTableName(tableName)
+            .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
+            .build();
+
+    // Open a streaming ingest channel from the given client
+    return client.openChannel(request);
+  }
 }