Skip to content

Commit

Permalink
More simple thread factory
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Sep 2, 2024
1 parent a72ad6c commit acd2da9
Showing 1 changed file with 7 additions and 15 deletions.
22 changes: 7 additions & 15 deletions src/main/java/tech/ydb/importer/YdbImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.importer.config.ImporterConfig;
import tech.ydb.importer.config.JdomHelper;
import static tech.ydb.importer.config.JdomHelper.isBlank;
import tech.ydb.importer.source.AnyTableLister;
import tech.ydb.importer.source.SourceCP;
import tech.ydb.importer.source.TableMapList;
Expand Down Expand Up @@ -77,7 +77,7 @@ public AnyTableLister getTableLister() {

public void run() throws Exception {
String jdbcClassName = config.getSource().getClassName();
if (!isBlank(jdbcClassName)) {
if (!JdomHelper.isBlank(jdbcClassName)) {
LOG.info("Loading driver class {}", jdbcClassName);
Class.forName(config.getSource().getClassName());
}
Expand Down Expand Up @@ -139,8 +139,7 @@ public void run() throws Exception {
}

private ExecutorService makeWorkers() {
return Executors.newFixedThreadPool(config.getWorkers().getPoolSize(),
new WorkerFactory(this));
return Executors.newFixedThreadPool(config.getWorkers().getPoolSize(), new WorkerFactory());
}

private void retrieveSourceMetadata(List<TableDecision> tables, ExecutorService workers)
Expand Down Expand Up @@ -293,25 +292,18 @@ public static String getVersion() {
}

public static final class WorkerFactory implements ThreadFactory {

private final YdbImporter owner;
private int counter = 0;

public WorkerFactory(YdbImporter owner) {
this.owner = owner;
}
private final AtomicInteger counter = new AtomicInteger();

@Override
public Thread newThread(Runnable r) {
int workerId = counter.getAndIncrement();
final Thread t = new Thread(() -> {
BlobSaver.initCounter(counter);
BlobSaver.initCounter(workerId);
r.run();
}, "YdbImporter-worker-" + counter);
}, "YdbImporter-worker-" + workerId);
t.setDaemon(false);
++counter;
return t;
}

}

}

0 comments on commit acd2da9

Please sign in to comment.