Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ public class Fate<T> {
private static final Logger log = LoggerFactory.getLogger(Fate.class);

private final FateStore<T> store;
private final ScheduledFuture<?> fatePoolsWatcherFuture;
private ScheduledFuture<?> fatePoolsWatcherFuture;
private final AtomicInteger needMoreThreadsWarnCount = new AtomicInteger(0);
private final ExecutorService deadResCleanerExecutor;
private ExecutorService deadResCleanerExecutor;
private final FateConfiguration<T> fateConfig;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final FateConfiguration<T> fateConfig;
private final FateConfiguration<T> fateThreadsConfig;

along with other suggestion


private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);
public static final Duration INITIAL_DELAY = Duration.ofSeconds(3);
Expand Down Expand Up @@ -247,6 +248,10 @@ public void run() {
}
}

private record FateConfiguration<T>(T environment, boolean runDeadResCleaner,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private record FateConfiguration<T>(T environment, boolean runDeadResCleaner,
private record FateThreadsConfiguration<T>(T environment, boolean runDeadResCleaner,

A bit more descriptive

AccumuloConfiguration conf, ScheduledThreadPoolExecutor genSchedExecutor) {
}

/**
* Creates a Fault-tolerant executor for the given store type.
*
Expand All @@ -258,6 +263,18 @@ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf,
ScheduledThreadPoolExecutor genSchedExecutor) {
this.store = FateLogger.wrap(store, toLogStrFunc, false);
this.fateConfig =
new FateConfiguration<>(environment, runDeadResCleaner, conf, genSchedExecutor);

}

public void start() {
log.info("Start {} FATE", store.type());
keepRunning.set(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
keepRunning.set(true);

Already set to true at initialization

final var environment = fateConfig.environment;
final var runDeadResCleaner = fateConfig.runDeadResCleaner;
final var conf = fateConfig.conf;
final var genSchedExecutor = fateConfig.genSchedExecutor;

fatePoolsWatcherFuture =
genSchedExecutor.scheduleWithFixedDelay(new FatePoolsWatcher(environment, conf),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,7 @@ protected Fate<Manager> initializeFateInstance(ServerContext context, FateStore<

final Fate<Manager> fateInstance = new Fate<>(this, store, true, TraceRepo::toLogString,
getConfiguration(), context.getScheduledExecutor());
fateInstance.start();

var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), this::getSteadyTime);
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public FastFate(T environment, FateStore<T> store, boolean runDeadResCleaner,
Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
super(environment, store, runDeadResCleaner, toLogStrFunc, conf,
new ScheduledThreadPoolExecutor(2));
start();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,12 @@ private void waitFor(FateStore<FeoTestEnv> store, FateId txid) throws Exception
}

protected Fate<FeoTestEnv> initializeFate(AccumuloClient client, FateStore<FeoTestEnv> store) {
return new Fate<>(new FeoTestEnv(client), store, false, r -> r + "",
Fate<FeoTestEnv> fate = new Fate<>(new FeoTestEnv(client), store, false, r -> r + "",
FateTestUtil.updateFateConfig(new ConfigurationCopy(), 1, "AllFateOps"),
new ScheduledThreadPoolExecutor(2));

fate.start();
return fate;
}

private static Entry<FateId,String> toIdStep(Entry<Key,Value> e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,9 +554,12 @@ private void submitDeferred(Fate<TestEnv> fate, ServerContext sctx, Set<FateId>
}

protected Fate<TestEnv> initializeFate(FateStore<TestEnv> store) {
return new Fate<>(new TestEnv(), store, false, r -> r + "",
Fate<TestEnv> fate = new Fate<>(new TestEnv(), store, false, r -> r + "",
FateTestUtil.updateFateConfig(new ConfigurationCopy(), 1, "AllFateOps"),
new ScheduledThreadPoolExecutor(2));

fate.start();
return fate;
}

protected abstract TStatus getTxStatus(ServerContext sctx, FateId fateId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -917,8 +917,10 @@ protected FastFate<LatchTestEnv> initFateWithDeadResCleaner(FateStore<LatchTestE
}

protected Fate<LatchTestEnv> initFateNoDeadResCleaner(FateStore<LatchTestEnv> store) {
return new Fate<>(new LatchTestEnv(), store, false, Object::toString,
Fate<LatchTestEnv> fate = new Fate<>(new LatchTestEnv(), store, false, Object::toString,
DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2));
fate.start();
return fate;
}

private boolean wordIsTStatus(String word) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public FlakyFate(T environment, FateStore<T> store, Function<Repo<T>,String> toL
fateExecutors.add(new FlakyFateExecutor<>(this, environment, poolConfig.getKey(),
poolConfig.getValue().getValue(), poolConfig.getValue().getKey()));
}
start();
}

private static class FlakyFateExecutor<T> extends FateExecutor<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,10 @@ private void testMultipleFateInstances(TestStoreFactory<SleepingTestEnv> testSto

Fate<SleepingTestEnv> fate1 = new Fate<>(testEnv1, store1, true, Object::toString,
DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2));
fate1.start();
Fate<SleepingTestEnv> fate2 = new Fate<>(testEnv2, store2, false, Object::toString,
DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2));
fate2.start();

try {
for (int i = 0; i < numFateIds; i++) {
Expand Down Expand Up @@ -361,6 +363,7 @@ private void testDeadReservationsCleanup(TestStoreFactory<LatchTestEnv> testStor
// fate1.
fate2 = new Fate<>(testEnv2, store2, false, Object::toString, config,
new ScheduledThreadPoolExecutor(2));
fate2.start();

// Wait for the "dead" reservations to be deleted and picked up again (reserved using
// fate2/store2/lock2 now).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public SlowFateSplit(T environment, FateStore<T> store, Function<Repo<T>,String>
fateExecutors.add(new SlowFateSplitExecutor(this, environment, poolConfig.getKey(),
poolConfig.getValue().getValue(), poolConfig.getValue().getKey()));
}
start();
}

private class SlowFateSplitExecutor extends FateExecutor<T> {
Expand Down