Commit 9448f1c: "done"

sfc-gh-alhuang committed Nov 22, 2024
1 parent 59ce9e0

Showing 9 changed files with 203 additions and 39 deletions.
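Taken together, the changes below split three integration tests into abstract base classes with format-specific subclasses: OpenManyChannelsIT becomes OpenManyChannelsITBase, StreamingIngestBigFilesIT becomes BigFilesITBase, and ManyTablesIT becomes ManyTablesITBase. Each base takes an enableIcebergStreaming flag in its setup method, and new FDN*/Iceberg* subclasses pin that flag to false or true. A minimal sketch of the pattern, with hypothetical class names (JUnit 4):

// FormatITBase.java (hypothetical)
import org.junit.Before;
import org.junit.Test;

public abstract class FormatITBase {
  protected boolean enableIcebergStreaming;

  // Deliberately not annotated with @Before: each concrete subclass picks the mode.
  protected void setUp(boolean enableIcebergStreaming) throws Exception {
    this.enableIcebergStreaming = enableIcebergStreaming;
    // ... create a plain (FDN) or Iceberg table and a client accordingly
  }

  @Test
  public void sharedTestBody() {
    // shared assertions; logic branches on enableIcebergStreaming where needed
  }
}

// FdnExampleIT.java (hypothetical)
public class FdnExampleIT extends FormatITBase {
  @Before
  public void before() throws Exception {
    super.setUp(false);
  }
}

// IcebergExampleIT.java (hypothetical)
public class IcebergExampleIT extends FormatITBase {
  @Before
  public void before() throws Exception {
    super.setUp(true);
  }
}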
FDNOpenManyChannelsIT.java (new file)
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import org.junit.Before;

public class FDNOpenManyChannelsIT extends OpenManyChannelsITBase {
@Before
public void before() throws Exception {
super.setUp(false);
}
}
IcebergOpenManyChannelsIT.java (new file)
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.IcebergIT;
import org.junit.Before;
import org.junit.experimental.categories.Category;

@Category(IcebergIT.class)
public class IcebergOpenManyChannelsIT extends OpenManyChannelsITBase {
@Before
public void before() throws Exception {
super.setUp(true);
}
}
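The @Category(IcebergIT.class) annotation lets a build include or exclude all Iceberg suites as a group, for example through Surefire/Failsafe's groups and excludedGroups settings. The IcebergIT type itself is not part of this diff; in JUnit 4 a category is just a marker type, so it is plausibly no more than:

package net.snowflake.ingest;

// Plausible sketch of the category marker imported above; the real definition
// lives outside this diff. JUnit 4 category types carry no behavior.
public interface IcebergIT {}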
OpenManyChannelsITBase.java (renamed from OpenManyChannelsIT.java)
@@ -1,6 +1,11 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.utils.Constants.ROLE;
import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING;

import java.sql.Connection;
import java.util.ArrayList;
@@ -22,13 +27,12 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/** Tries to open several thousand channels into the same table from multiple threads in parallel */
@Ignore("Will be reimplemented in dew: SNOW-807102")
public class OpenManyChannelsIT {
public abstract class OpenManyChannelsITBase {
private static final int THREAD_COUNT = 20;
private static final int CHANNELS_PER_THREAD = 250;
private static final String SCHEMA_NAME = "PUBLIC";
@@ -40,18 +44,29 @@ public class OpenManyChannelsIT {

private SnowflakeStreamingIngestClient client;

@Before
public void setUp() throws Exception {
public void setUp(boolean enableIcebergStreaming) throws Exception {
databaseName =
String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", RandomStringUtils.randomNumeric(9));
conn = TestUtils.getConnection(true);
conn.createStatement().execute(String.format("create or replace database %s;", databaseName));
conn.createStatement()
.execute(
String.format(
"create or replace table %s.%s.%s (col int)",
databaseName, SCHEMA_NAME, TABLE_NAME));
if (enableIcebergStreaming) {
conn.createStatement()
.execute(
String.format(
"create or replace iceberg table %s.%s.%s (col int)\n"
+ "catalog = 'SNOWFLAKE'\n"
+ "external_volume = 'streaming_ingest'\n"
+ "base_location = 'SDK_IT/%s/%s';",
databaseName, SCHEMA_NAME, TABLE_NAME, databaseName, TABLE_NAME));
} else {
conn.createStatement()
.execute(
String.format(
"create or replace table %s.%s.%s (col int)",
databaseName, SCHEMA_NAME, TABLE_NAME));
}
Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
props.setProperty(ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
props.setProperty(ROLE, "ACCOUNTADMIN");
}
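For reference, a standalone sketch that renders the Iceberg branch of the DDL above with sample values. The database-name shape comes from setUp(); the table name is a hypothetical stand-in, since TABLE_NAME's value is outside the visible diff:

public class IcebergDdlPreview {
  public static void main(String[] args) {
    String databaseName = "SDK_DATATYPE_COMPATIBILITY_IT_123456789"; // shape from setUp()
    String schemaName = "PUBLIC"; // SCHEMA_NAME in the test
    String tableName = "T"; // hypothetical stand-in for TABLE_NAME
    System.out.println(
        String.format(
            "create or replace iceberg table %s.%s.%s (col int)\n"
                + "catalog = 'SNOWFLAKE'\n"
                + "external_volume = 'streaming_ingest'\n"
                + "base_location = 'SDK_IT/%s/%s';",
            databaseName, schemaName, tableName, databaseName, tableName));
  }
}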
BigFilesITBase.java (renamed from StreamingIngestBigFilesIT.java)
@@ -1,8 +1,13 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.it;

import static net.snowflake.ingest.TestUtils.verifyTableRowCount;
import static net.snowflake.ingest.utils.Constants.ROLE;
import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING;

import java.sql.Connection;
import java.sql.ResultSet;
@@ -17,16 +22,14 @@
import net.snowflake.ingest.utils.Constants;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

/** Ingests a large number of rows. */
@RunWith(Parameterized.class)
public class StreamingIngestBigFilesIT {
public abstract class BigFilesITBase {
private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA";

@@ -35,16 +38,15 @@ public class StreamingIngestBigFilesIT {
private SnowflakeStreamingIngestClientInternal<?> client;
private Connection jdbcConnection;
private String testDb;
private boolean enableIcebergStreaming;

@Parameters(name = "{index}: {0}")
public static Object[] compressionAlgorithms() {
return new Object[] {"GZIP", "ZSTD"};
}
@Parameter(0)
public String compressionAlgorithm;

@Parameter public String compressionAlgorithm;
@Parameter(1)
public Constants.IcebergSerializationPolicy icebergSerializationPolicy;

@Before
public void beforeAll() throws Exception {
public void beforeAll(boolean enableIcebergStreaming) throws Exception {
testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4);
// Create a streaming ingest client
jdbcConnection = TestUtils.getConnection(true);
@@ -66,9 +68,11 @@ public void beforeAll() throws Exception {
prop.setProperty(ROLE, "ACCOUNTADMIN");
}
prop.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
prop.setProperty(ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
client =
(SnowflakeStreamingIngestClientInternal<?>)
SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(prop).build();
this.enableIcebergStreaming = enableIcebergStreaming;
}

@After
@@ -166,20 +170,41 @@ private void ingestRandomRowsToTable(

private void createTableForTest(String tableName) {
try {
jdbcConnection
.createStatement()
.execute(
String.format(
"create or replace table %s (\n"
+ " num_2_1 NUMBER(2, 1),\n"
+ " num_4_2 NUMBER(4, 2),\n"
+ " num_9_4 NUMBER(9, 4),\n"
+ " num_18_7 NUMBER(18, 7),\n"
+ " num_38_15 NUMBER(38, 15),\n"
+ " num_float FLOAT,\n"
+ " str VARCHAR(256),\n"
+ " bin BINARY(256));",
tableName));
if (enableIcebergStreaming) {
jdbcConnection
.createStatement()
.execute(
String.format(
"create or replace iceberg table %s (\n"
+ " num_2_1 decimal(2, 1),\n"
+ " num_4_2 decimal(4, 2),\n"
+ " num_9_4 decimal(9, 4),\n"
+ " num_18_7 decimal(18, 7),\n"
+ " num_38_15 decimal(38, 15),\n"
+ " num_float float,\n"
+ " str string,\n"
+ " bin binary)\n"
+ "catalog = 'SNOWFLAKE'\n"
+ "external_volume = 'streaming_ingest'\n"
+ "base_location = 'SDK_IT/%s/%s'\n"
+ "storage_serialization_policy = %s;",
tableName, testDb, tableName, icebergSerializationPolicy.name()));
} else {
jdbcConnection
.createStatement()
.execute(
String.format(
"create or replace table %s (\n"
+ " num_2_1 NUMBER(2, 1),\n"
+ " num_4_2 NUMBER(4, 2),\n"
+ " num_9_4 NUMBER(9, 4),\n"
+ " num_18_7 NUMBER(18, 7),\n"
+ " num_38_15 NUMBER(38, 15),\n"
+ " num_float FLOAT,\n"
+ " str VARCHAR(256),\n"
+ " bin BINARY(256));",
tableName));
}
} catch (SQLException e) {
throw new RuntimeException("Cannot create table " + tableName, e);
}
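Note how the parameterization is arranged after this refactor: the @Parameter(0)/@Parameter(1) fields stay on the base class, while each concrete subclass supplies its own @Parameters rows (FDNBigFilesIT and IcebergBigFilesIT below). A minimal, self-contained illustration of that injection contract, using a hypothetical class:

import static org.junit.Assert.assertNotNull;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

// Each Object[] row produces one test instance: row[0] is injected into the
// @Parameter(0) field and row[1] into @Parameter(1). FDN-style rows pass null
// for the second slot because only the Iceberg runs read a serialization policy.
@RunWith(Parameterized.class)
public class ParameterInjectionExample {
  @Parameters(name = "compressionAlgorithm={0}")
  public static Object[][] rows() {
    return new Object[][] {{"GZIP", null}, {"ZSTD", null}};
  }

  @Parameter(0)
  public String compressionAlgorithm;

  @Parameter(1)
  public Object serializationPolicy; // null in FDN-style rows

  @Test
  public void fieldsAreInjected() {
    assertNotNull(compressionAlgorithm);
  }
}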
FDNBigFilesIT.java (new file)
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.it;

import org.junit.Before;
import org.junit.runners.Parameterized;

public class FDNBigFilesIT extends BigFilesITBase {
@Parameterized.Parameters(name = "compressionAlgorithm={0}")
public static Object[][] compressionAlgorithms() {
return new Object[][] {{"GZIP", null}, {"ZSTD", null}};
}

@Before
public void before() throws Exception {
super.beforeAll(false);
}
}
FDNManyTablesIT.java (new file)
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.it;

import org.junit.Before;

public class FDNManyTablesIT extends ManyTablesITBase {
@Before
public void before() throws Exception {
super.setUp(false);
}
}
IcebergBigFilesIT.java (new file)
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.it;

import net.snowflake.ingest.IcebergIT;
import net.snowflake.ingest.utils.Constants;
import org.junit.Before;
import org.junit.experimental.categories.Category;
import org.junit.runners.Parameterized;

@Category(IcebergIT.class)
public class IcebergBigFilesIT extends BigFilesITBase {
@Parameterized.Parameters(name = "icebergSerializationPolicy={1}")
public static Object[][] compressionAlgorithms() {
return new Object[][] {
{"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE},
{"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED}
};
}

@Before
public void before() throws Exception {
super.beforeAll(true);
}
}
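The Iceberg variant holds the compression algorithm fixed at ZSTD and varies the serialization policy instead. Only two policy values appear in this diff; a plausible sketch of the enum referenced as Constants.IcebergSerializationPolicy, whose real definition is elsewhere in the repository:

// Sketch only: the actual enum lives in net.snowflake.ingest.utils.Constants
// and may have additional members or behavior.
public enum IcebergSerializationPolicy {
  COMPATIBLE,
  OPTIMIZED
}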
IcebergManyTablesIT.java (new file)
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.it;

import net.snowflake.ingest.IcebergIT;
import org.junit.Before;
import org.junit.experimental.categories.Category;

@Category(IcebergIT.class)
public class IcebergManyTablesIT extends ManyTablesITBase {
@Before
public void before() throws Exception {
super.setUp(true);
}
}
ManyTablesITBase.java (renamed from ManyTablesIT.java)
@@ -21,14 +21,13 @@
import net.snowflake.ingest.utils.ParameterProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
* Verifies that ingestion works when we ingest into a large number of tables from the same client,
* and blobs and registration requests have to be cut so they don't contain a large number of chunks
*/
public class ManyTablesIT {
public abstract class ManyTablesITBase {

private static final int TABLES_COUNT = 20;
private static final int TOTAL_ROWS_COUNT = 200_000;
@@ -38,11 +37,13 @@ public class ManyTablesIT {
private SnowflakeStreamingIngestChannel[] channels;
private String[] offsetTokensPerChannel;

@Before
public void setUp() throws Exception {
public void setUp(boolean enableIcebergStreaming) throws Exception {
Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2);
if (!enableIcebergStreaming) {
props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2);
}
props.put(ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST, 2);
props.put(ParameterProvider.ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
props.setProperty(ROLE, "ACCOUNTADMIN");
}
@@ -57,7 +58,21 @@ public void setUp() throws Exception {
String[] tableNames = new String[TABLES_COUNT];
for (int i = 0; i < tableNames.length; i++) {
tableNames[i] = String.format("table_%d", i);
connection.createStatement().execute(String.format("create table table_%d(c int);", i));
if (enableIcebergStreaming) {
connection
.createStatement()
.execute(
String.format(
"create or replace iceberg table table_%s(c int)"
+ "catalog = 'SNOWFLAKE' "
+ "external_volume = 'streaming_ingest' "
+ "base_location = 'SDK_IT/%s/%s'",
i, dbName, tableNames[i]));
} else {
connection
.createStatement()
.execute(String.format("create or replace table table_%d(c int);", i));
}
channels[i] =
client.openChannel(
OpenChannelRequest.builder(String.format("channel-%d", i))
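One asymmetry worth noting in setUp() above: MAX_CHUNKS_IN_BLOB is forced down to 2 only for the FDN run, while MAX_CHUNKS_IN_REGISTRATION_REQUEST is always set. The diff does not explain why; a plausible reading is that Iceberg streaming cannot pack chunks from multiple tables into a single blob, so the blob-side limit is left at its default there and only registration requests are forced to split.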
