Commit

Changed connector to stop reading data if there are no updates within 1 second (#2)
* Changed connector to stop reading data if there are no updates within 1 second 
* Added batching of checkpoints
AdalbertMemSQL authored Nov 4, 2024
1 parent ebd18eb commit 6214533
Showing 6 changed files with 124 additions and 135 deletions.
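The diffs below implement the two items in the commit message. For orientation, the idle-timeout change boils down to running the blocking `ResultSet.next()` call on a worker thread and bounding the wait to one second; a minimal standalone sketch of that pattern (class and method names here are illustrative, not the connector's actual API) is:

```java
import java.sql.ResultSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: bound a blocking ResultSet.next() call to one second.
// If no row arrives in time, report false so the caller can stop reading.
class IdleBoundedReader {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    boolean nextWithinOneSecond(ResultSet rs) throws Exception {
        Future<Boolean> future = executor.submit(rs::next);
        try {
            return future.get(1, TimeUnit.SECONDS); // true: a row was fetched in time
        } catch (TimeoutException e) {
            future.cancel(true); // give up waiting; the read loop ends here
            return false;
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

The committed TimedResultSet class at the bottom of this diff follows the same approach and additionally cancels the in-flight OBSERVE query when it is closed.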
4 changes: 2 additions & 2 deletions README.md
@@ -23,7 +23,7 @@ gradle jar
3. Run the Jar

```
-java -jar build/libs/singlestore-fivetran-connector-0.0.1.jar
+java -jar build/libs/singlestore-fivetran-connector-0.0.2.jar
```

## Steps for running Java tests
@@ -108,7 +108,7 @@ CREATE TABLE t(a INT PRIMARY KEY, b INT);
wget -O src/main/proto/common.proto https://raw.githubusercontent.com/fivetran/fivetran_sdk/production/common.proto
wget -O src/main/proto/connector_sdk.proto https://raw.githubusercontent.com/fivetran/fivetran_sdk/production/connector_sdk.proto
gradle jar
-java -jar build/libs/singlestore-fivetran-connector-0.0.1.jar
+java -jar build/libs/singlestore-fivetran-connector-0.0.2.jar
```

6. Update the `./tester/configuration.json` file with your credentials
2 changes: 1 addition & 1 deletion build.gradle
@@ -19,7 +19,7 @@ repositories {
mavenCentral()
}

-version = '0.0.1'
+version = '0.0.2'

sourceCompatibility = 1.8
targetCompatibility = 1.8
@@ -268,15 +268,18 @@ public void observe(State state, Set<String> selectedColumns, ObserveConsumer co
.filter(Column::getPrimaryKey)
.collect(Collectors.toList());

-try (Statement stmt = getConnection().createStatement();
-ResultSet rs = stmt.executeQuery(
-String.format(
-"OBSERVE * FROM %s BEGIN AT (%s)",
-escapeTable(conf.database(), conf.table()),
-state.offsetsAsSQL()))) {
-while (rs.next()) {
+try (
+Statement stmt = getConnection().createStatement();
+TimedResultSet timedRS = TimedResultSet.from(stmt.executeQuery(String.format(
+"OBSERVE * FROM %s BEGIN AT (%s)",
+escapeTable(conf.database(), conf.table()),
+state.offsetsAsSQL())))
+) {
+ResultSet rs = timedRS.getResultSet();
+
+while (timedRS.next()) {
String operation = rs.getString("Type");
-Integer partition = rs.getInt("PartitionId");
+int partition = rs.getInt("PartitionId");
String offset = bytesToHex(rs.getBytes("Offset"));

if (operation.equals("Delete")) {
@@ -29,6 +29,8 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -188,13 +190,16 @@ private Set<String> getSelectedColumns(UpdateRequest request, SingleStoreConfigu
return new HashSet<>();
}

+int CHECKPOINT_BATCH_SIZE = 10_000;
+
@Override
public void update(UpdateRequest request, StreamObserver<UpdateResponse>
responseObserver) {
SingleStoreConfiguration configuration = new SingleStoreConfiguration(
request.getConfigurationMap());
SingleStoreConnection conn = new SingleStoreConnection(configuration);
Set<String> selectedColumns = getSelectedColumns(request, configuration);
+AtomicLong recordsRead = new AtomicLong();

try {
State state;
@@ -263,6 +268,21 @@ public void update(UpdateRequest request, StreamObserver<UpdateResponse>
}

state.setOffset(partition, offset);
+if (recordsRead.incrementAndGet() % CHECKPOINT_BATCH_SIZE == 0) {
+responseObserver.onNext(
+UpdateResponse.newBuilder()
+.setOperation(
+Operation.newBuilder()
+.setCheckpoint(
+Checkpoint.newBuilder()
+.setStateJson(state.toJson())
+.build())
+.build())
+.build());
+}
+});
+
+if (recordsRead.incrementAndGet() % CHECKPOINT_BATCH_SIZE != 0) {
responseObserver.onNext(
UpdateResponse.newBuilder()
.setOperation(
@@ -273,7 +293,7 @@ public void update(UpdateRequest request, StreamObserver<UpdateResponse>
.build())
.build())
.build());
-});
+}

responseObserver.onNext(UpdateResponse.newBuilder()
.setLogEntry(LogEntry.newBuilder()
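The hunk above replaces the per-record checkpoint with batched checkpointing: a checkpoint is emitted inside the observe callback only every CHECKPOINT_BATCH_SIZE (10,000) records, and one more is emitted after the loop finishes so the last partial batch is not lost. A stripped-down sketch of that control flow (the emitCheckpoint consumer is illustrative and stands in for building the Checkpoint/UpdateResponse and calling responseObserver.onNext):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Sketch only: emit a checkpoint every BATCH_SIZE processed records,
// then once more at the end if the latest records were not yet checkpointed.
class CheckpointBatcher {
    static final int BATCH_SIZE = 10_000;

    private final AtomicLong recordsRead = new AtomicLong();
    private final Consumer<String> emitCheckpoint; // e.g. wraps responseObserver.onNext(...)

    CheckpointBatcher(Consumer<String> emitCheckpoint) {
        this.emitCheckpoint = emitCheckpoint;
    }

    // Called for every change event, with the connector state serialized to JSON.
    void onRecord(String stateJson) {
        if (recordsRead.incrementAndGet() % BATCH_SIZE == 0) {
            emitCheckpoint.accept(stateJson);
        }
    }

    // Called once after the observe loop ends.
    void onStreamEnd(String stateJson) {
        if (recordsRead.get() % BATCH_SIZE != 0) {
            emitCheckpoint.accept(stateJson);
        }
    }
}
```

Batching keeps checkpoint traffic proportional to data volume instead of one checkpoint per row, while the trailing checkpoint keeps the stored offsets in line with everything that was delivered.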
53 changes: 53 additions & 0 deletions TimedResultSet.java (new file)
@@ -0,0 +1,53 @@
package com.singlestore.fivetran.connector;

import java.sql.ResultSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimedResultSet implements AutoCloseable {

private final ResultSet resultSet;
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

private TimedResultSet(ResultSet resultSet) {
this.resultSet = resultSet;
}

public static TimedResultSet from(ResultSet resultSet) {
return new TimedResultSet(resultSet);
}

@Override
public void close() {
try {
executor.shutdownNow();

if (!resultSet.isClosed()) {
((com.singlestore.jdbc.Connection) resultSet.getStatement()
.getConnection()).cancelCurrentQuery();
}
} catch (Exception ignored) {
}
}

public ResultSet getResultSet() {
return resultSet;
}

public Boolean next() throws InterruptedException, ExecutionException {
Future<Boolean> future = executor.submit(resultSet::next);
try {
// Get the result with a timeout of 1 second
return future.get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
return false;
}
}
}
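For reference, the observe() hunk earlier in this commit consumes the class roughly as follows; the connection setup and the query string here are placeholders:

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

// Sketch only: read the OBSERVE stream until it is exhausted or until
// next() times out after one second without new data.
class ObserveLoopExample {
    static void readUntilIdle(Connection connection, String observeQuery) throws Exception {
        try (Statement stmt = connection.createStatement();
             TimedResultSet timedRS = TimedResultSet.from(stmt.executeQuery(observeQuery))) {
            ResultSet rs = timedRS.getResultSet();
            while (timedRS.next()) { // false once a second passes with no rows
                String operation = rs.getString("Type");
                // ... process the change event and record its offset ...
            }
        } // close() shuts down the executor and cancels the still-running OBSERVE query
    }
}
```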

