diff --git a/README.md b/README.md index 63035b6..332be14 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ gradle jar 3. Run the Jar ``` -java -jar build/libs/singlestore-fivetran-connector-0.0.1.jar +java -jar build/libs/singlestore-fivetran-connector-0.0.2.jar ``` ## Steps for running Java tests @@ -108,7 +108,7 @@ CREATE TABLE t(a INT PRIMARY KEY, b INT); wget -O src/main/proto/common.proto https://raw.githubusercontent.com/fivetran/fivetran_sdk/production/common.proto wget -O src/main/proto/connector_sdk.proto https://raw.githubusercontent.com/fivetran/fivetran_sdk/production/connector_sdk.proto gradle jar -java -jar build/libs/singlestore-fivetran-connector-0.0.1.jar +java -jar build/libs/singlestore-fivetran-connector-0.0.2.jar ``` 6. Update the `./tester/configuration.json` file with your credentials diff --git a/build.gradle b/build.gradle index cf8008c..8f96fa8 100644 --- a/build.gradle +++ b/build.gradle @@ -19,7 +19,7 @@ repositories { mavenCentral() } -version = '0.0.1' +version = '0.0.2' sourceCompatibility = 1.8 targetCompatibility = 1.8 diff --git a/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnection.java b/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnection.java index e0824a7..b326499 100644 --- a/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnection.java +++ b/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnection.java @@ -268,15 +268,18 @@ public void observe(State state, Set selectedColumns, ObserveConsumer co .filter(Column::getPrimaryKey) .collect(Collectors.toList()); - try (Statement stmt = getConnection().createStatement(); - ResultSet rs = stmt.executeQuery( - String.format( - "OBSERVE * FROM %s BEGIN AT (%s)", - escapeTable(conf.database(), conf.table()), - state.offsetsAsSQL()))) { - while (rs.next()) { + try ( + Statement stmt = getConnection().createStatement(); + TimedResultSet timedRS = TimedResultSet.from(stmt.executeQuery(String.format( + "OBSERVE * FROM %s BEGIN AT (%s)", + escapeTable(conf.database(), conf.table()), + state.offsetsAsSQL()))) + ) { + ResultSet rs = timedRS.getResultSet(); + + while (timedRS.next()) { String operation = rs.getString("Type"); - Integer partition = rs.getInt("PartitionId"); + int partition = rs.getInt("PartitionId"); String offset = bytesToHex(rs.getBytes("Offset")); if (operation.equals("Delete")) { diff --git a/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnectorServiceImpl.java b/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnectorServiceImpl.java index 193d1ec..a88702f 100644 --- a/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnectorServiceImpl.java +++ b/src/main/java/com/singlestore/fivetran/connector/SingleStoreConnectorServiceImpl.java @@ -29,6 +29,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -188,6 +190,8 @@ private Set getSelectedColumns(UpdateRequest request, SingleStoreConfigu return new HashSet<>(); } + int CHECKPOINT_BATCH_SIZE = 10_000; + @Override public void update(UpdateRequest request, StreamObserver responseObserver) { @@ -195,6 +199,7 @@ public void update(UpdateRequest request, StreamObserver request.getConfigurationMap()); SingleStoreConnection conn = new SingleStoreConnection(configuration); Set selectedColumns = getSelectedColumns(request, configuration); + AtomicLong recordsRead = new AtomicLong(); try { State state; @@ -263,6 +268,21 @@ public void update(UpdateRequest request, StreamObserver } state.setOffset(partition, offset); + if (recordsRead.incrementAndGet() % CHECKPOINT_BATCH_SIZE == 0) { + responseObserver.onNext( + UpdateResponse.newBuilder() + .setOperation( + Operation.newBuilder() + .setCheckpoint( + Checkpoint.newBuilder() + .setStateJson(state.toJson()) + .build()) + .build()) + .build()); + } + }); + + if (recordsRead.incrementAndGet() % CHECKPOINT_BATCH_SIZE != 0) { responseObserver.onNext( UpdateResponse.newBuilder() .setOperation( @@ -273,7 +293,7 @@ public void update(UpdateRequest request, StreamObserver .build()) .build()) .build()); - }); + } responseObserver.onNext(UpdateResponse.newBuilder() .setLogEntry(LogEntry.newBuilder() diff --git a/src/main/java/com/singlestore/fivetran/connector/TimedResultSet.java b/src/main/java/com/singlestore/fivetran/connector/TimedResultSet.java new file mode 100644 index 0000000..5d402f0 --- /dev/null +++ b/src/main/java/com/singlestore/fivetran/connector/TimedResultSet.java @@ -0,0 +1,53 @@ +package com.singlestore.fivetran.connector; + +import java.sql.ResultSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TimedResultSet implements AutoCloseable { + + private final ResultSet resultSet; + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + private TimedResultSet(ResultSet resultSet) { + this.resultSet = resultSet; + } + + public static TimedResultSet from(ResultSet resultSet) { + return new TimedResultSet(resultSet); + } + + @Override + public void close() { + try { + executor.shutdownNow(); + + if (!resultSet.isClosed()) { + ((com.singlestore.jdbc.Connection) resultSet.getStatement() + .getConnection()).cancelCurrentQuery(); + } + } catch (Exception ignored) { + } + } + + public ResultSet getResultSet() { + return resultSet; + } + + public Boolean next() throws InterruptedException, ExecutionException { + Future future = executor.submit(resultSet::next); + try { + // Get the result with a timeout of 1 second + return future.get(1, TimeUnit.SECONDS); + } catch (TimeoutException e) { + future.cancel(true); + return false; + } + } +} diff --git a/src/test/java/com/singlestore/fivetran/connector/SingleStoreConnectionTest.java b/src/test/java/com/singlestore/fivetran/connector/SingleStoreConnectionTest.java index d94da97..a70d842 100644 --- a/src/test/java/com/singlestore/fivetran/connector/SingleStoreConnectionTest.java +++ b/src/test/java/com/singlestore/fivetran/connector/SingleStoreConnectionTest.java @@ -403,28 +403,7 @@ public void observe() throws Exception { stmt.execute("DROP TABLE IF EXISTS observe"); stmt.execute( "CREATE TABLE observe (a INT, b INT, PRIMARY KEY(a));"); - } - - final Exception[] observeException = new Exception[1]; - SingleStoreConnection observeConn = new SingleStoreConnection(conf); - List records = new ArrayList<>(); - State state = new State(8); - Thread t = new Thread(() -> { - try { - observeConn.observe(state, null, (operation, partition, offset, row) -> { - if (operation.equals("Delete") || operation.equals("Update") || operation.equals( - "Insert")) { - records.add(new Record(operation, row)); - state.setOffset(partition, offset); - } - }); - } catch (Exception e) { - observeException[0] = e; - } - }); - t.start(); - try (Statement stmt = conn.getConnection().createStatement()) { for (int i = 0; i < 10; i++) { stmt.execute(String.format("INSERT INTO observe VALUES(%d, 1)", i)); } @@ -436,12 +415,15 @@ public void observe() throws Exception { } } - Thread.sleep(1000); - ((com.singlestore.jdbc.Connection) observeConn.getConnection()).cancelCurrentQuery(); - Thread.sleep(1000); - t.interrupt(); - - assertTrue(observeException[0].getMessage().contains("Query execution was interrupted")); + List records = new ArrayList<>(); + State state = new State(8); + conn.observe(state, null, (operation, partition, offset, row) -> { + if (operation.equals("Delete") || operation.equals("Update") || operation.equals( + "Insert")) { + records.add(new Record(operation, row)); + state.setOffset(partition, offset); + } + }); records.sort((r1, r2) -> { if (!r1.operation.equals(r2.operation)) { @@ -469,22 +451,6 @@ public void observe() throws Exception { assertEquals((Integer) 2, records.get(i).row.get("b").getInt()); } - records.clear(); - t = new Thread(() -> { - try { - observeConn.observe(state, null, (operation, partition, offset, row) -> { - if (operation.equals("Delete") || operation.equals("Update") || operation.equals( - "Insert")) { - records.add(new Record(operation, row)); - state.setOffset(partition, offset); - } - }); - } catch (Exception e) { - observeException[0] = e; - } - }); - t.start(); - try (Statement stmt = conn.getConnection().createStatement()) { for (int i = 0; i < 10; i++) { stmt.execute(String.format("INSERT INTO observe VALUES(%d, 3)", i)); @@ -497,12 +463,14 @@ public void observe() throws Exception { } } - Thread.sleep(1000); - ((com.singlestore.jdbc.Connection) observeConn.getConnection()).cancelCurrentQuery(); - Thread.sleep(1000); - t.interrupt(); - - assertTrue(observeException[0].getMessage().contains("Query execution was interrupted")); + records.clear(); + conn.observe(state, null, (operation, partition, offset, row) -> { + if (operation.equals("Delete") || operation.equals("Update") || operation.equals( + "Insert")) { + records.add(new Record(operation, row)); + state.setOffset(partition, offset); + } + }); records.sort((r1, r2) -> { if (!r1.operation.equals(r2.operation)) { @@ -587,26 +555,7 @@ public void observeAllTypes() throws Exception { + " unique key(intColumn),\n" + " shard key(intColumn)\n" + " );"); - } - final Exception[] observeException = new Exception[1]; - SingleStoreConnection observeConn = new SingleStoreConnection(conf); - List records = new ArrayList<>(); - Thread t = new Thread(() -> { - try { - observeConn.observe(new State(8), null, (operation, partition, offset, row) -> { - if (operation.equals("Delete") || operation.equals("Update") || operation.equals( - "Insert")) { - records.add(new Record(operation, row)); - } - }); - } catch (Exception e) { - observeException[0] = e; - } - }); - t.start(); - - try (Statement stmt = conn.getConnection().createStatement()) { stmt.execute("INSERT INTO `observeAllTypes` VALUES (\n" + "TRUE, " + // boolColumn "TRUE, " + // booleanColumn @@ -704,12 +653,13 @@ public void observeAllTypes() throws Exception { ); } - Thread.sleep(1000); - ((com.singlestore.jdbc.Connection) observeConn.getConnection()).cancelCurrentQuery(); - Thread.sleep(1000); - t.interrupt(); - - assertTrue(observeException[0].getMessage().contains("Query execution was interrupted")); + List records = new ArrayList<>(); + conn.observe(new State(8), null, (operation, partition, offset, row) -> { + if (operation.equals("Delete") || operation.equals("Update") || operation.equals( + "Insert")) { + records.add(new Record(operation, row)); + } + }); assertEquals(2, records.size()); records.sort(Comparator.comparingInt(r -> r.row.get("intColumn").getInt())); @@ -899,17 +849,12 @@ public void observeVectorJson() throws Exception { stmt.execute("DROP TABLE IF EXISTS observeVectorJson"); stmt.execute("CREATE TABLE observeVectorJson(a VECTOR(2, I32))"); stmt.execute("INSERT INTO observeVectorJson VALUES ('[1, 2]')"); + stmt.execute("SET vector_type_project_format = 'JSON'"); } - final Exception[] observeException = new Exception[1]; - SingleStoreConnection observeConn = new SingleStoreConnection(conf); List records = new ArrayList<>(); - try (Statement stmt = observeConn.getConnection().createStatement()) { - stmt.execute("SET vector_type_project_format = 'JSON'"); - } - - SchemaList schemaList = observeConn.getSchema(); + SchemaList schemaList = conn.getSchema(); List schemas = schemaList.getSchemasList(); assertEquals(1, schemas.size()); @@ -929,25 +874,12 @@ public void observeVectorJson() throws Exception { assertEquals("a", column.getName()); assertEquals(DataType.JSON, column.getType()); - Thread t = new Thread(() -> { - try { - observeConn.observe(new State(8), null, (operation, partition, offset, row) -> { - if (operation.equals("Delete") || operation.equals("Update") || operation.equals( - "Insert")) { - records.add(new Record(operation, row)); - } - }); - } catch (Exception e) { - observeException[0] = e; + conn.observe(new State(8), null, (operation, partition, offset, row) -> { + if (operation.equals("Delete") || operation.equals("Update") || operation.equals( + "Insert")) { + records.add(new Record(operation, row)); } }); - t.start(); - Thread.sleep(1000); - ((com.singlestore.jdbc.Connection) observeConn.getConnection()).cancelCurrentQuery(); - Thread.sleep(1000); - t.interrupt(); - - assertTrue(observeException[0].getMessage().contains("Query execution was interrupted")); assertEquals(1, records.size()); // TODO: at the moment, OBSERVE returns wrong values when `vector_type_project_format` is JSON @@ -968,39 +900,20 @@ public void observeFilter() throws Exception { stmt.execute("DROP TABLE IF EXISTS observeFilter"); stmt.execute( "CREATE TABLE observeFilter (a INT, b INT, PRIMARY KEY(a));"); + stmt.execute("INSERT INTO observeFilter VALUES(1, 1)"); } - final Exception[] observeException = new Exception[1]; - SingleStoreConnection observeConn = new SingleStoreConnection(conf); List records = new ArrayList<>(); State state = new State(8); Set selectedColumns = new HashSet<>(); selectedColumns.add("a"); - Thread t = new Thread(() -> { - try { - observeConn.observe(state, selectedColumns, (operation, partition, offset, row) -> { - if (operation.equals("Delete") || operation.equals("Update") || operation.equals( - "Insert")) { - records.add(new Record(operation, row)); - state.setOffset(partition, offset); - } - }); - } catch (Exception e) { - observeException[0] = e; + conn.observe(state, selectedColumns, (operation, partition, offset, row) -> { + if (operation.equals("Delete") || operation.equals("Update") || operation.equals( + "Insert")) { + records.add(new Record(operation, row)); + state.setOffset(partition, offset); } }); - t.start(); - - try (Statement stmt = conn.getConnection().createStatement()) { - stmt.execute("INSERT INTO observeFilter VALUES(1, 1)"); - } - - Thread.sleep(1000); - ((com.singlestore.jdbc.Connection) observeConn.getConnection()).cancelCurrentQuery(); - Thread.sleep(1000); - t.interrupt(); - - assertTrue(observeException[0].getMessage().contains("Query execution was interrupted")); assertEquals("Insert", records.get(0).operation); assertEquals(1, records.get(0).row.get("a").getInt());