Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve wait logic to a more elegant solution #1160 #1169

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.ImmutableContext;
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.Structure;
import dev.openfeature.sdk.exceptions.GeneralError;
import lombok.Getter;
import lombok.Setter;

/**
* Contains all fields we need to worry about locking, used as intrinsic lock
* for sync blocks.
*/
@Getter
public class EventsLock {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this class out of the FlagdProvider to ensure no private fields are accessed and to keep the size of the file down.
However, I feel like the name is not approrpaite anymore, so please feel free to suggest a better one. Also, this new class is strongly coupled to the FlagdProvider, if you have any suggestions to make it cleaner please let me know

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe FlagdProviderSyncState or similar could be an appropriate name - or just SyncState or SyncResource - but to be fair, i am also not hundred percent happy with my suggestions

@Setter
private volatile ProviderEvent previousEvent = null;

private volatile ImmutableStructure syncMetadata = new ImmutableStructure();
private volatile EvaluationContext enrichedContext = new ImmutableContext();
private volatile boolean initialized;
private volatile boolean isShutDown;

public void setSyncMetadata(Structure syncMetadata) {
this.syncMetadata = new ImmutableStructure(syncMetadata.asMap());
}

public void setEnrichedContext(EvaluationContext context) {
this.enrichedContext = new ImmutableContext(context.asMap());
}

/**
* With this method called, it is suggested that initialization has been completed. It will wake up all threads that
* wait for the initialization. Subsequent calls have no effect.
*
* @return true iff this was the first call to {@code initialize()}
*/
public synchronized boolean initialize() {
if (this.initialized) {
return false;
}
this.initialized = true;
this.notifyAll();
return true;
}

/**
* Blocks the calling thread until either {@link EventsLock#initialize()} or {@link EventsLock#shutdown()} is called
* or the deadline is exceeded, whatever happens first. If {@link EventsLock#initialize()} has been executed before
* {@code waitForInitialization(long)} is called, it will return instantly.
* If the deadline is exceeded, a GeneralError will be thrown.
* If {@link EventsLock#shutdown()} is called in the meantime, an {@link IllegalStateException} will be thrown.
* Otherwise, the method will return cleanly.
*
* @param deadline the maximum time in ms to wait
* @throws GeneralError when the deadline is exceeded before {@link EventsLock#initialize()} is called on this
* object
* @throws IllegalStateException when {@link EventsLock#shutdown()} is called or has been called on this object
*/
public void waitForInitialization(long deadline) {
long start = System.currentTimeMillis();
long end = start + deadline;
while (!initialized && !isShutDown) {
long now = System.currentTimeMillis();
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
if (now >= end) {
throw new GeneralError(String.format(
"Deadline exceeded. Condition did not complete within the %d ms deadline", deadline));
}
long remaining = end - now;
synchronized (this) {
if (initialized) { // might have changed in the meantime
return;
}
if (isShutDown) {
break;
}
try {
this.wait(remaining);
} catch (InterruptedException e) {
// try again. Leave the continue to make PMD happy
continue;
}
}
}
if (isShutDown) {
throw new IllegalStateException("Already shut down");
}
}

/**
* Signals a shutdown. Threads waiting for initialization will wake up and throw an {@link IllegalStateException}.
*/
public synchronized void shutdown() {
isShutDown = true;
this.notifyAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.EventProvider;
import dev.openfeature.sdk.Hook;
import dev.openfeature.sdk.ImmutableContext;
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Metadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEvent;
Expand Down Expand Up @@ -108,7 +105,9 @@ public FlagdProvider(final FlagdOptions options) {
gracePeriod = Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD;
hooks.add(new SyncMetadataHook(this::getEnrichedContext));
errorExecutor = Executors.newSingleThreadScheduledExecutor();
this.eventsLock.initialized = initialized;
if (initialized) {
this.eventsLock.initialize();
}
}

@Override
Expand All @@ -119,27 +118,26 @@ public List<Hook> getProviderHooks() {
@Override
public void initialize(EvaluationContext evaluationContext) throws Exception {
synchronized (eventsLock) {
if (eventsLock.initialized) {
if (eventsLock.isInitialized()) {
return;
}

flagResolver.init();
// block till ready - this works with deadline fine for rpc, but with in_process
// we also need to take parsing into the equation
// TODO: evaluate where we are losing time, so we can remove this magic number -
eventsLock.waitForInitialization(this.deadline * 2);
}
// block till ready - this works with deadline fine for rpc, but with in_process
// we also need to take parsing into the equation
// TODO: evaluate where we are losing time, so we can remove this magic number -
// follow up
// wait outside of the synchonrization or we'll deadlock
Util.busyWaitAndCheck(this.deadline * 2, () -> eventsLock.initialized);
}

@Override
public void shutdown() {
synchronized (eventsLock) {
if (!eventsLock.initialized) {
return;
}
try {
if (!eventsLock.isInitialized() || eventsLock.isShutDown()) {
return;
}

this.flagResolver.shutdown();
if (errorExecutor != null) {
errorExecutor.shutdownNow();
Expand All @@ -148,7 +146,7 @@ public void shutdown() {
} catch (Exception e) {
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
} finally {
eventsLock.initialized = false;
eventsLock.shutdown();
}
}
}
Expand Down Expand Up @@ -193,7 +191,7 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
* @return Object map representing sync metadata
*/
protected Structure getSyncMetadata() {
return new ImmutableStructure(eventsLock.syncMetadata.asMap());
return eventsLock.getSyncMetadata();
}

/**
Expand All @@ -202,17 +200,17 @@ protected Structure getSyncMetadata() {
* @return context
*/
EvaluationContext getEnrichedContext() {
return eventsLock.enrichedContext;
return eventsLock.getEnrichedContext();
}

@SuppressWarnings("checkstyle:fallthrough")
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {

synchronized (eventsLock) {
log.info("FlagdProviderEvent: {}", flagdProviderEvent);
eventsLock.syncMetadata = flagdProviderEvent.getSyncMetadata();
eventsLock.setSyncMetadata(flagdProviderEvent.getSyncMetadata());
if (flagdProviderEvent.getSyncMetadata() != null) {
eventsLock.enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata());
eventsLock.setEnrichedContext(contextEnricher.apply(flagdProviderEvent.getSyncMetadata()));
}

/*
Expand All @@ -226,21 +224,22 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
*/
switch (flagdProviderEvent.getEvent()) {
case PROVIDER_CONFIGURATION_CHANGED:
if (eventsLock.previousEvent == ProviderEvent.PROVIDER_READY) {
if (eventsLock.getPreviousEvent() == ProviderEvent.PROVIDER_READY) {
onConfigurationChanged(flagdProviderEvent);
break;
}
// intentional fall through, a not-ready change will trigger a ready.
case PROVIDER_READY:
onReady();
eventsLock.previousEvent = ProviderEvent.PROVIDER_READY;
eventsLock.setPreviousEvent(ProviderEvent.PROVIDER_READY);
break;

case PROVIDER_ERROR:
if (eventsLock.previousEvent != ProviderEvent.PROVIDER_ERROR) {
if (eventsLock.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) {
onError();
eventsLock.setPreviousEvent(ProviderEvent.PROVIDER_ERROR);
}
eventsLock.previousEvent = ProviderEvent.PROVIDER_ERROR;

break;
default:
log.info("Unknown event {}", flagdProviderEvent.getEvent());
Expand All @@ -256,8 +255,7 @@ private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
}

private void onReady() {
if (!eventsLock.initialized) {
eventsLock.initialized = true;
if (eventsLock.initialize()) {
log.info("initialized FlagdProvider");
}
if (errorTask != null && !errorTask.isCancelled()) {
Expand All @@ -282,7 +280,7 @@ private void onError() {
if (!errorExecutor.isShutdown()) {
errorTask = errorExecutor.schedule(
() -> {
if (eventsLock.previousEvent == ProviderEvent.PROVIDER_ERROR) {
if (eventsLock.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) {
log.debug(
"Provider did not reconnect successfully within {}s. Emit ERROR event...",
gracePeriod);
Expand All @@ -296,15 +294,4 @@ private void onError() {
TimeUnit.SECONDS);
}
}

/**
* Contains all fields we need to worry about locking, used as intrinsic lock
* for sync blocks.
*/
static class EventsLock {
volatile ProviderEvent previousEvent = null;
volatile Structure syncMetadata = new ImmutableStructure();
volatile boolean initialized = false;
volatile EvaluationContext enrichedContext = new ImmutableContext();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class InProcessResolver implements Resolver {
private final Storage flagStore;
private final Consumer<FlagdProviderEvent> onConnectionEvent;
private final Operator operator;
private final long deadline;
private final String scope;

/**
Expand All @@ -52,7 +51,6 @@ public class InProcessResolver implements Resolver {
*/
public InProcessResolver(FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
this.flagStore = new FlagStore(getConnector(options, onConnectionEvent));
this.deadline = options.getDeadline();
this.onConnectionEvent = onConnectionEvent;
this.operator = new Operator();
this.scope = options.getSelector();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.AdditionalAnswers.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
Expand Down Expand Up @@ -600,7 +601,8 @@ void contextEnrichment() throws Exception {
MutableStructure metadata = new MutableStructure();
metadata.add(key, val);
// given
final Function<Structure, EvaluationContext> mockEnricher = mock(Function.class);
final Function<Structure, EvaluationContext> enricher = structure -> new ImmutableContext(structure.asMap());
final Function<Structure, EvaluationContext> mockEnricher = mock(Function.class, delegatesTo(enricher));

// mock a resolver
try (MockedConstruction<InProcessResolver> mockResolver =
Expand Down
Loading
Loading