diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FDNOpenManyChannelsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/FDNOpenManyChannelsIT.java new file mode 100644 index 000000000..5450c1973 --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FDNOpenManyChannelsIT.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import org.junit.Before; + +public class FDNOpenManyChannelsIT extends OpenManyChannelsITBase { + @Before + public void before() throws Exception { + super.setUp(false); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergOpenManyChannelsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergOpenManyChannelsIT.java new file mode 100644 index 000000000..74d542ec0 --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergOpenManyChannelsIT.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import net.snowflake.ingest.IcebergIT; +import org.junit.Before; +import org.junit.experimental.categories.Category; + +@Category(IcebergIT.class) +public class IcebergOpenManyChannelsIT extends OpenManyChannelsITBase { + @Before + public void before() throws Exception { + super.setUp(true); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsITBase.java similarity index 87% rename from src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsIT.java rename to src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsITBase.java index b27a76396..6deab44a8 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsITBase.java @@ -1,6 +1,11 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.ROLE; +import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING; import java.sql.Connection; import java.util.ArrayList; @@ -22,13 +27,12 @@ import org.apache.commons.lang3.RandomStringUtils; import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Ignore; import org.junit.Test; /** Tries to open several thousand channels into the same table from multiple threads in parallel */ @Ignore("Will be reimplemented in dew: SNOW-807102") -public class OpenManyChannelsIT { +public abstract class OpenManyChannelsITBase { private static final int THREAD_COUNT = 20; private static final int CHANNELS_PER_THREAD = 250; private static final String SCHEMA_NAME = "PUBLIC"; @@ -40,18 +44,29 @@ public class OpenManyChannelsIT { private SnowflakeStreamingIngestClient client; - @Before - public void setUp() throws Exception { + public void setUp(boolean enableIcebergStreaming) throws Exception { databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", RandomStringUtils.randomNumeric(9)); conn = TestUtils.getConnection(true); conn.createStatement().execute(String.format("create or replace database %s;", databaseName)); - conn.createStatement() - .execute( - String.format( - "create or replace table %s.%s.%s (col int)", - databaseName, SCHEMA_NAME, TABLE_NAME)); + if (enableIcebergStreaming) { + conn.createStatement() + .execute( + String.format( + "create or replace iceberg table %s.%s.%s (col int)\n" + + "catalog = 'SNOWFLAKE'\n" + + "external_volume = 'streaming_ingest'\n" + + "base_location = 'SDK_IT/%s/%s';", + databaseName, SCHEMA_NAME, TABLE_NAME, databaseName, TABLE_NAME)); + } else { + conn.createStatement() + .execute( + String.format( + "create or replace table %s.%s.%s (col int)", + databaseName, SCHEMA_NAME, TABLE_NAME)); + } Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false); + props.setProperty(ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming)); if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) { props.setProperty(ROLE, "ACCOUNTADMIN"); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/StreamingIngestBigFilesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java similarity index 98% rename from src/test/java/net/snowflake/ingest/streaming/internal/it/StreamingIngestBigFilesIT.java rename to src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java index 485ad1b33..df4ffa341 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/it/StreamingIngestBigFilesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal.it; import static net.snowflake.ingest.TestUtils.verifyTableRowCount; @@ -36,7 +40,7 @@ /** Ingest large amount of rows. */ @RunWith(Parameterized.class) -public class StreamingIngestBigFilesIT { +public class FDNBigFilesIT { private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB"; private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA"; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNManyTablesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNManyTablesIT.java new file mode 100644 index 000000000..e04ef79ae --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNManyTablesIT.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal.it; + +import org.junit.Before; + +public class FDNManyTablesIT extends ManyTablesITBase { + @Before + public void before() throws Exception { + super.setUp(false); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergManyTablesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergManyTablesIT.java new file mode 100644 index 000000000..21561abfd --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergManyTablesIT.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal.it; + +import net.snowflake.ingest.IcebergIT; +import org.junit.Before; +import org.junit.experimental.categories.Category; + +@Category(IcebergIT.class) +public class IcebergManyTablesIT extends ManyTablesITBase { + @Before + public void before() throws Exception { + super.setUp(true); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/ManyTablesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/ManyTablesITBase.java similarity index 79% rename from src/test/java/net/snowflake/ingest/streaming/internal/it/ManyTablesIT.java rename to src/test/java/net/snowflake/ingest/streaming/internal/it/ManyTablesITBase.java index db9d9a5e0..bb84ab36e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/it/ManyTablesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/ManyTablesITBase.java @@ -21,14 +21,13 @@ import net.snowflake.ingest.utils.ParameterProvider; import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; /** * Verified that ingestion work when we ingest into large number of tables from the same client and * blobs and registration requests have to be cut, so they don't contain large number of chunks */ -public class ManyTablesIT { +public abstract class ManyTablesITBase { private static final int TABLES_COUNT = 20; private static final int TOTAL_ROWS_COUNT = 200_000; @@ -38,11 +37,13 @@ public class ManyTablesIT { private SnowflakeStreamingIngestChannel[] channels; private String[] offsetTokensPerChannel; - @Before - public void setUp() throws Exception { + public void setUp(boolean enableIcebergStreaming) throws Exception { Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false); - props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2); + if (!enableIcebergStreaming) { + props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2); + } props.put(ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST, 2); + props.put(ParameterProvider.ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming)); if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) { props.setProperty(ROLE, "ACCOUNTADMIN"); } @@ -57,7 +58,21 @@ public void setUp() throws Exception { String[] tableNames = new String[TABLES_COUNT]; for (int i = 0; i < tableNames.length; i++) { tableNames[i] = String.format("table_%d", i); - connection.createStatement().execute(String.format("create table table_%d(c int);", i)); + if (enableIcebergStreaming) { + connection + .createStatement() + .execute( + String.format( + "create or replace iceberg table table_%s(c int)" + + "catalog = 'SNOWFLAKE' " + + "external_volume = 'streaming_ingest' " + + "base_location = 'SDK_IT/%s/%s'", + i, dbName, tableNames[i])); + } else { + connection + .createStatement() + .execute(String.format("create or replace table table_%d(c int);", i)); + } channels[i] = client.openChannel( OpenChannelRequest.builder(String.format("channel-%d", i))