Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed connector to stop reading data if there are no updates within 1 second #2

Merged
merged 4 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,18 @@ public void observe(State state, Set<String> selectedColumns, ObserveConsumer co
.filter(Column::getPrimaryKey)
.collect(Collectors.toList());

try (Statement stmt = getConnection().createStatement();
ResultSet rs = stmt.executeQuery(
String.format(
"OBSERVE * FROM %s BEGIN AT (%s)",
escapeTable(conf.database(), conf.table()),
state.offsetsAsSQL()))) {
while (rs.next()) {
try (
Statement stmt = getConnection().createStatement();
TimedResultSet timedRS = TimedResultSet.from(stmt.executeQuery(String.format(
"OBSERVE * FROM %s BEGIN AT (%s)",
escapeTable(conf.database(), conf.table()),
state.offsetsAsSQL())))
) {
ResultSet rs = timedRS.getResultSet();

while (timedRS.next()) {
String operation = rs.getString("Type");
Integer partition = rs.getInt("PartitionId");
int partition = rs.getInt("PartitionId");
String offset = bytesToHex(rs.getBytes("Offset"));

if (operation.equals("Delete")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -188,13 +190,16 @@ private Set<String> getSelectedColumns(UpdateRequest request, SingleStoreConfigu
return new HashSet<>();
}

int CHECKPOINT_BATCH_SIZE = 10_000;

@Override
public void update(UpdateRequest request, StreamObserver<UpdateResponse>
responseObserver) {
SingleStoreConfiguration configuration = new SingleStoreConfiguration(
request.getConfigurationMap());
SingleStoreConnection conn = new SingleStoreConnection(configuration);
Set<String> selectedColumns = getSelectedColumns(request, configuration);
AtomicLong recordsRead = new AtomicLong();

try {
State state;
Expand Down Expand Up @@ -263,6 +268,21 @@ public void update(UpdateRequest request, StreamObserver<UpdateResponse>
}

state.setOffset(partition, offset);
if (recordsRead.incrementAndGet() % CHECKPOINT_BATCH_SIZE == 0) {
responseObserver.onNext(
UpdateResponse.newBuilder()
.setOperation(
Operation.newBuilder()
.setCheckpoint(
Checkpoint.newBuilder()
.setStateJson(state.toJson())
.build())
.build())
.build());
}
});

if (recordsRead.incrementAndGet() % CHECKPOINT_BATCH_SIZE != 0) {
responseObserver.onNext(
UpdateResponse.newBuilder()
.setOperation(
Expand All @@ -273,7 +293,7 @@ public void update(UpdateRequest request, StreamObserver<UpdateResponse>
.build())
.build())
.build());
});
}

responseObserver.onNext(UpdateResponse.newBuilder()
.setLogEntry(LogEntry.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.singlestore.fivetran.connector;

import java.sql.ResultSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimedResultSet implements AutoCloseable {

private final ResultSet resultSet;
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

private TimedResultSet(ResultSet resultSet) {
this.resultSet = resultSet;
}

public static TimedResultSet from(ResultSet resultSet) {
return new TimedResultSet(resultSet);
}

@Override
public void close() {
try {
executor.shutdownNow();

if (!resultSet.isClosed()) {
((com.singlestore.jdbc.Connection) resultSet.getStatement()
.getConnection()).cancelCurrentQuery();
}
} catch (Exception ignored) {
}
}

public ResultSet getResultSet() {
return resultSet;
}

public Boolean next() throws InterruptedException, ExecutionException {
Future<Boolean> future = executor.submit(resultSet::next);
try {
// Get the result with a timeout of 1 second
return future.get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -403,28 +403,7 @@ public void observe() throws Exception {
stmt.execute("DROP TABLE IF EXISTS observe");
stmt.execute(
"CREATE TABLE observe (a INT, b INT, PRIMARY KEY(a));");
}

final Exception[] observeException = new Exception[1];
SingleStoreConnection observeConn = new SingleStoreConnection(conf);
List<Record> records = new ArrayList<>();
State state = new State(8);
Thread t = new Thread(() -> {
try {
observeConn.observe(state, null, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
state.setOffset(partition, offset);
}
});
} catch (Exception e) {
observeException[0] = e;
}
});
t.start();

try (Statement stmt = conn.getConnection().createStatement()) {
for (int i = 0; i < 10; i++) {
stmt.execute(String.format("INSERT INTO observe VALUES(%d, 1)", i));
}
Expand All @@ -436,12 +415,15 @@ public void observe() throws Exception {
}
}

Thread.sleep(1000);
((com.singlestore.jdbc.Connection) observeConn.getConnection()).cancelCurrentQuery();
Thread.sleep(1000);
t.interrupt();

assertTrue(observeException[0].getMessage().contains("Query execution was interrupted"));
List<Record> records = new ArrayList<>();
State state = new State(8);
conn.observe(state, null, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
state.setOffset(partition, offset);
}
});

records.sort((r1, r2) -> {
if (!r1.operation.equals(r2.operation)) {
Expand Down Expand Up @@ -469,22 +451,6 @@ public void observe() throws Exception {
assertEquals((Integer) 2, records.get(i).row.get("b").getInt());
}

records.clear();
t = new Thread(() -> {
try {
observeConn.observe(state, null, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
state.setOffset(partition, offset);
}
});
} catch (Exception e) {
observeException[0] = e;
}
});
t.start();

try (Statement stmt = conn.getConnection().createStatement()) {
for (int i = 0; i < 10; i++) {
stmt.execute(String.format("INSERT INTO observe VALUES(%d, 3)", i));
Expand All @@ -497,12 +463,14 @@ public void observe() throws Exception {
}
}

Thread.sleep(1000);
((com.singlestore.jdbc.Connection) observeConn.getConnection()).cancelCurrentQuery();
Thread.sleep(1000);
t.interrupt();

assertTrue(observeException[0].getMessage().contains("Query execution was interrupted"));
records.clear();
conn.observe(state, null, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
state.setOffset(partition, offset);
}
});

records.sort((r1, r2) -> {
if (!r1.operation.equals(r2.operation)) {
Expand Down Expand Up @@ -587,26 +555,7 @@ public void observeAllTypes() throws Exception {
+ " unique key(intColumn),\n"
+ " shard key(intColumn)\n"
+ " );");
}

final Exception[] observeException = new Exception[1];
SingleStoreConnection observeConn = new SingleStoreConnection(conf);
List<Record> records = new ArrayList<>();
Thread t = new Thread(() -> {
try {
observeConn.observe(new State(8), null, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
}
});
} catch (Exception e) {
observeException[0] = e;
}
});
t.start();

try (Statement stmt = conn.getConnection().createStatement()) {
stmt.execute("INSERT INTO `observeAllTypes` VALUES (\n" +
"TRUE, " + // boolColumn
"TRUE, " + // booleanColumn
Expand Down Expand Up @@ -704,12 +653,13 @@ public void observeAllTypes() throws Exception {
);
}

Thread.sleep(1000);
((com.singlestore.jdbc.Connection) observeConn.getConnection()).cancelCurrentQuery();
Thread.sleep(1000);
t.interrupt();

assertTrue(observeException[0].getMessage().contains("Query execution was interrupted"));
List<Record> records = new ArrayList<>();
conn.observe(new State(8), null, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
}
});

assertEquals(2, records.size());
records.sort(Comparator.comparingInt(r -> r.row.get("intColumn").getInt()));
Expand Down Expand Up @@ -899,17 +849,12 @@ public void observeVectorJson() throws Exception {
stmt.execute("DROP TABLE IF EXISTS observeVectorJson");
stmt.execute("CREATE TABLE observeVectorJson(a VECTOR(2, I32))");
stmt.execute("INSERT INTO observeVectorJson VALUES ('[1, 2]')");
stmt.execute("SET vector_type_project_format = 'JSON'");
}

final Exception[] observeException = new Exception[1];
SingleStoreConnection observeConn = new SingleStoreConnection(conf);
List<Record> records = new ArrayList<>();

try (Statement stmt = observeConn.getConnection().createStatement()) {
stmt.execute("SET vector_type_project_format = 'JSON'");
}

SchemaList schemaList = observeConn.getSchema();
SchemaList schemaList = conn.getSchema();
List<Schema> schemas = schemaList.getSchemasList();
assertEquals(1, schemas.size());

Expand All @@ -929,25 +874,12 @@ public void observeVectorJson() throws Exception {
assertEquals("a", column.getName());
assertEquals(DataType.JSON, column.getType());

Thread t = new Thread(() -> {
try {
observeConn.observe(new State(8), null, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
}
});
} catch (Exception e) {
observeException[0] = e;
conn.observe(new State(8), null, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
}
});
t.start();
Thread.sleep(1000);
((com.singlestore.jdbc.Connection) observeConn.getConnection()).cancelCurrentQuery();
Thread.sleep(1000);
t.interrupt();

assertTrue(observeException[0].getMessage().contains("Query execution was interrupted"));

assertEquals(1, records.size());
// TODO: at the moment, OBSERVE returns wrong values when `vector_type_project_format` is JSON
Expand All @@ -968,39 +900,20 @@ public void observeFilter() throws Exception {
stmt.execute("DROP TABLE IF EXISTS observeFilter");
stmt.execute(
"CREATE TABLE observeFilter (a INT, b INT, PRIMARY KEY(a));");
stmt.execute("INSERT INTO observeFilter VALUES(1, 1)");
}

final Exception[] observeException = new Exception[1];
SingleStoreConnection observeConn = new SingleStoreConnection(conf);
List<Record> records = new ArrayList<>();
State state = new State(8);
Set<String> selectedColumns = new HashSet<>();
selectedColumns.add("a");
Thread t = new Thread(() -> {
try {
observeConn.observe(state, selectedColumns, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
state.setOffset(partition, offset);
}
});
} catch (Exception e) {
observeException[0] = e;
conn.observe(state, selectedColumns, (operation, partition, offset, row) -> {
if (operation.equals("Delete") || operation.equals("Update") || operation.equals(
"Insert")) {
records.add(new Record(operation, row));
state.setOffset(partition, offset);
}
});
t.start();

try (Statement stmt = conn.getConnection().createStatement()) {
stmt.execute("INSERT INTO observeFilter VALUES(1, 1)");
}

Thread.sleep(1000);
((com.singlestore.jdbc.Connection) observeConn.getConnection()).cancelCurrentQuery();
Thread.sleep(1000);
t.interrupt();

assertTrue(observeException[0].getMessage().contains("Query execution was interrupted"));

assertEquals("Insert", records.get(0).operation);
assertEquals(1, records.get(0).row.get("a").getInt());
Expand Down
Loading