SNOW-1813772 Enable old IT suites for iceberg tables #917

Closed · wants to merge 2 commits · changes shown from all commits

net/snowflake/ingest/streaming/internal/FDNOpenManyChannelsIT.java (new file)
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import org.junit.Before;

public class FDNOpenManyChannelsIT extends OpenManyChannelsITBase {
@Before
public void before() throws Exception {
super.setUp(false);
}
}

net/snowflake/ingest/streaming/internal/IcebergOpenManyChannelsIT.java (new file)
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.IcebergIT;
import org.junit.Before;
import org.junit.experimental.categories.Category;

@Category(IcebergIT.class)
public class IcebergOpenManyChannelsIT extends OpenManyChannelsITBase {
@Before
public void before() throws Exception {
super.setUp(true);
}
}
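
Both concrete subclasses follow the same pattern: the FDN variant calls super.setUp(false), while the Iceberg variant additionally carries @Category(IcebergIT.class) and calls super.setUp(true), so the Iceberg suites can be selected or excluded as a group by the JUnit 4 Categories runner. A hypothetical suite (not part of this PR; the suite name and class list are illustrative) that runs only the Iceberg-tagged tests could look like this:

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.IcebergIT;
import org.junit.experimental.categories.Categories;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

// Hypothetical suite: the Categories runner keeps only tests annotated with @Category(IcebergIT.class).
@RunWith(Categories.class)
@Categories.IncludeCategory(IcebergIT.class)
@Suite.SuiteClasses({IcebergOpenManyChannelsIT.class})
public class IcebergOnlyITSuite {}

Build configuration can also include or exclude this category directly through the test runner's category filters, without a suite class.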

net/snowflake/ingest/streaming/internal/OpenManyChannelsIT.java → OpenManyChannelsITBase.java (renamed)
@@ -1,6 +1,11 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.utils.Constants.ROLE;
import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING;

import java.sql.Connection;
import java.util.ArrayList;
@@ -22,13 +27,12 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/** Tries to open several thousand channels into the same table from multiple threads in parallel */
@Ignore("Will be reimplemented in dew: SNOW-807102")
public class OpenManyChannelsIT {
public abstract class OpenManyChannelsITBase {
private static final int THREAD_COUNT = 20;
private static final int CHANNELS_PER_THREAD = 250;
private static final String SCHEMA_NAME = "PUBLIC";
@@ -40,18 +44,29 @@ public class OpenManyChannelsIT {

private SnowflakeStreamingIngestClient client;

@Before
public void setUp() throws Exception {
public void setUp(boolean enableIcebergStreaming) throws Exception {
databaseName =
String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", RandomStringUtils.randomNumeric(9));
conn = TestUtils.getConnection(true);
conn.createStatement().execute(String.format("create or replace database %s;", databaseName));
conn.createStatement()
.execute(
String.format(
"create or replace table %s.%s.%s (col int)",
databaseName, SCHEMA_NAME, TABLE_NAME));
if (enableIcebergStreaming) {
conn.createStatement()
.execute(
String.format(
"create or replace iceberg table %s.%s.%s (col int)\n"
+ "catalog = 'SNOWFLAKE'\n"
+ "external_volume = 'streaming_ingest'\n"
+ "base_location = 'SDK_IT/%s/%s';",
databaseName, SCHEMA_NAME, TABLE_NAME, databaseName, TABLE_NAME));
} else {
conn.createStatement()
.execute(
String.format(
"create or replace table %s.%s.%s (col int)",
databaseName, SCHEMA_NAME, TABLE_NAME));
}
Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
props.setProperty(ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
props.setProperty(ROLE, "ACCOUNTADMIN");
}
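The refactored base class drops its own @Before; each subclass now calls super.setUp(...) with the desired mode, and the flag is forwarded to the client through the ENABLE_ICEBERG_STREAMING parameter set on the test Properties. The remainder of setUp, where the client is built from those properties, is collapsed in the diff. A minimal sketch of how such a client is typically constructed, assuming the SDK's SnowflakeStreamingIngestClientFactory builder and an arbitrary client name, is:

import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.ParameterProvider;

// Sketch only, not part of the PR: build a streaming ingest client from the prepared Properties.
public class ClientSetupSketch {
  public static SnowflakeStreamingIngestClient build(Properties props, boolean enableIcebergStreaming) {
    // Same toggle the test sets above; true routes ingestion through the Iceberg code path.
    props.setProperty(
        ParameterProvider.ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
    return SnowflakeStreamingIngestClientFactory.builder("OPEN_MANY_CHANNELS_IT_CLIENT")
        .setProperties(props)
        .build();
  }
}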

net/snowflake/ingest/streaming/internal/it/StreamingIngestBigFilesIT.java → FDNBigFilesIT.java (renamed)
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.it;

import static net.snowflake.ingest.TestUtils.verifyTableRowCount;
@@ -36,7 +40,7 @@

/** Ingest large amount of rows. */
@RunWith(Parameterized.class)
public class StreamingIngestBigFilesIT {
public class FDNBigFilesIT {
private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA";

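Only the class rename from StreamingIngestBigFilesIT to FDNBigFilesIT is visible here; the @Parameters factory that feeds @RunWith(Parameterized.class) is collapsed. For readers unfamiliar with the runner, a generic JUnit 4 parameterized test (illustrative only; these are not this PR's actual parameters) looks like:

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Illustrative only: FDNBigFilesIT's real parameters are collapsed in the diff above.
@RunWith(Parameterized.class)
public class ParameterizedSketchIT {
  @Parameterized.Parameters(name = "compression={0}")
  public static Object[] parameters() {
    return new Object[] {"GZIP", "ZSTD"};
  }

  @Parameterized.Parameter public String compression;

  @Test
  public void parameterIsInjected() {
    Assert.assertNotNull(compression);
  }
}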

net/snowflake/ingest/streaming/internal/it/FDNManyTablesIT.java (new file)
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.it;

import org.junit.Before;

public class FDNManyTablesIT extends ManyTablesITBase {
@Before
public void before() throws Exception {
super.setUp(false);
}
}

net/snowflake/ingest/streaming/internal/it/IcebergManyTablesIT.java (new file)
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.it;

import net.snowflake.ingest.IcebergIT;
import org.junit.Before;
import org.junit.experimental.categories.Category;

@Category(IcebergIT.class)
public class IcebergManyTablesIT extends ManyTablesITBase {
@Before
public void before() throws Exception {
super.setUp(true);
}
}

net/snowflake/ingest/streaming/internal/it/ManyTablesIT.java → ManyTablesITBase.java (renamed)
@@ -21,14 +21,13 @@
import net.snowflake.ingest.utils.ParameterProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
* Verifies that ingestion works when we ingest into a large number of tables from the same client
* and blobs and registration requests have to be cut, so they don't contain a large number of chunks
*/
public class ManyTablesIT {
public abstract class ManyTablesITBase {

private static final int TABLES_COUNT = 20;
private static final int TOTAL_ROWS_COUNT = 200_000;
@@ -38,11 +37,13 @@ public class ManyTablesIT {
private SnowflakeStreamingIngestChannel[] channels;
private String[] offsetTokensPerChannel;

@Before
public void setUp() throws Exception {
public void setUp(boolean enableIcebergStreaming) throws Exception {
Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2);
if (!enableIcebergStreaming) {
props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2);
}
props.put(ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST, 2);
props.put(ParameterProvider.ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
props.setProperty(ROLE, "ACCOUNTADMIN");
}
@@ -57,7 +58,21 @@ public void setUp() throws Exception {
String[] tableNames = new String[TABLES_COUNT];
for (int i = 0; i < tableNames.length; i++) {
tableNames[i] = String.format("table_%d", i);
connection.createStatement().execute(String.format("create table table_%d(c int);", i));
if (enableIcebergStreaming) {
connection
.createStatement()
.execute(
String.format(
"create or replace iceberg table table_%s(c int)"
+ "catalog = 'SNOWFLAKE' "
+ "external_volume = 'streaming_ingest' "
+ "base_location = 'SDK_IT/%s/%s'",
i, dbName, tableNames[i]));
} else {
connection
.createStatement()
.execute(String.format("create or replace table table_%d(c int);", i));
}
channels[i] =
client.openChannel(
OpenChannelRequest.builder(String.format("channel-%d", i))
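
The channel-opening loop is partially visible above (one channel per table via OpenChannelRequest.builder), and the rest of the test, which ingests TOTAL_ROWS_COUNT rows across those channels, is collapsed. A minimal, hypothetical helper (not part of the PR) showing how a single row would be pushed through one of these channels; the column name c matches the tables created above:

import java.util.Collections;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

// Hypothetical helper: ingest one value into the single INT column "c" and fail fast on errors.
public class SingleRowIngestSketch {
  static void ingestOne(SnowflakeStreamingIngestChannel channel, int value, String offsetToken) {
    Map<String, Object> row = Collections.singletonMap("c", value);
    InsertValidationResponse response = channel.insertRow(row, offsetToken);
    if (response.hasErrors()) {
      // Surface the first validation error; the SDK's SFException is unchecked, so no throws clause is needed.
      throw response.getInsertErrors().get(0).getException();
    }
  }
}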