-
Notifications
You must be signed in to change notification settings - Fork 468
moved fate thread creation out of the constructor #5953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you run
mvn clean verify -Dspotbugs.skip -Dtest=skip -Dit.test=org.apache.accumulo.test.fate.**
to verify that all fate tests still pass?
|
I've run the command and it says that all test pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, just some minor changes
|
|
||
| public void start() { | ||
| log.info("Start {} FATE", store.type()); | ||
| keepRunning.set(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| keepRunning.set(true); |
Already set to true at initialization
| } | ||
| } | ||
|
|
||
| private record FateConfiguration<T>(T environment, boolean runDeadResCleaner, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private record FateConfiguration<T>(T environment, boolean runDeadResCleaner, | |
| private record FateThreadsConfiguration<T>(T environment, boolean runDeadResCleaner, |
A bit more descriptive
| private final AtomicInteger needMoreThreadsWarnCount = new AtomicInteger(0); | ||
| private final ExecutorService deadResCleanerExecutor; | ||
| private ExecutorService deadResCleanerExecutor; | ||
| private final FateConfiguration<T> fateConfig; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private final FateConfiguration<T> fateConfig; | |
| private final FateConfiguration<T> fateThreadsConfig; |
along with other suggestion
| public FastFate(T environment, FateStore<T> store, boolean runDeadResCleaner, | ||
| Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) { | ||
| super(environment, store, runDeadResCleaner, toLogStrFunc, conf, | ||
| new ScheduledThreadPoolExecutor(2)); | ||
| start(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency, should call start after the FastFate is created:
var fastFate = new FastFate(...);
fastFate.start();
Is a bit more confusing having some tests:
var fate = new Fate(...);
fate.start();
and others:
var fastFate = new FastFate(...);
| public FlakyFate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStrFunc, | ||
| AccumuloConfiguration conf) { | ||
| super(environment, store, false, toLogStrFunc, conf, new ScheduledThreadPoolExecutor(2)); | ||
| for (var poolConfig : getPoolConfigurations(conf, getStore().type()).entrySet()) { | ||
| fateExecutors.add(new FlakyFateExecutor<>(this, environment, poolConfig.getKey(), | ||
| poolConfig.getValue().getValue(), poolConfig.getValue().getKey())); | ||
| } | ||
| start(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same thing here
| public SlowFateSplit(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStrFunc, | ||
| AccumuloConfiguration conf) { | ||
| super(environment, store, false, toLogStrFunc, conf, new ScheduledThreadPoolExecutor(2)); | ||
| for (var poolConfig : getPoolConfigurations(conf, getStore().type()).entrySet()) { | ||
| fateExecutors.add(new SlowFateSplitExecutor(this, environment, poolConfig.getKey(), | ||
| poolConfig.getValue().getValue(), poolConfig.getValue().getKey())); | ||
| } | ||
| start(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same thing here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a few new things have to be considered with these changes. Specifically what should the behavior be in all combinations of calling .start() and .shutdown().
For example, if we do the following:
var fate = new Fate<>(...);
fate.start();
fate.start();
I think this will spin up new sets of internal resources (watcher, cleaner) and replace the old ones without the old ones being shut down. It might be best to require that a Fate object be shut down before calling .start() on it if we even want to allow for reuse of these objects. I am not sure.
Another scenario we need to consider is the following:
var fate = new Fate<>(...);
fate.shutdown();
Will probably get some NPE on resources that are created in the .start method now.
Seems like we need a new AtomicBoolean started now to track the started state and handle these cases properly.
closes issue #5828
Moved Fate threads outside of the constructor and into a new start() method. Created a new FateConfig class to handle instantiations of variables being pass into the constructor and updated Fate calls to handle the new start method.