Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve wait logic to a more elegant solution #1160 #1169

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@
<!-- this can be overriden in child POMs to support specific SDK requirements -->
<groupId>dev.openfeature</groupId>
<artifactId>sdk</artifactId>
<!-- 1.14 <= v < 2.0 (excluding 2.0 pre-releases)-->
<version>[1.14,1.99999)</version>
<!-- 1.14.1 <= v < 2.0 (excluding 2.0 pre-releases)-->
<version>[1.14.1,1.99999)</version>
<!-- use the version provided at runtime -->
<scope>provided</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.EventProvider;
import dev.openfeature.sdk.Hook;
import dev.openfeature.sdk.ImmutableContext;
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Metadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEvent;
Expand All @@ -37,7 +34,7 @@ public class FlagdProvider extends EventProvider {
private static final String FLAGD_PROVIDER = "flagd";
private final Resolver flagResolver;
private final List<Hook> hooks = new ArrayList<>();
private final EventsLock eventsLock = new EventsLock();
private final FlagdProviderSyncResources syncResources = new FlagdProviderSyncResources();

/**
* An executor service responsible for emitting
Expand Down Expand Up @@ -109,7 +106,9 @@ public FlagdProvider(final FlagdOptions options) {
gracePeriod = Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD;
hooks.add(new SyncMetadataHook(this::getEnrichedContext));
errorExecutor = Executors.newSingleThreadScheduledExecutor();
this.eventsLock.initialized = initialized;
if (initialized) {
this.syncResources.initialize();
}
}

@Override
Expand All @@ -119,28 +118,28 @@ public List<Hook> getProviderHooks() {

@Override
public void initialize(EvaluationContext evaluationContext) throws Exception {
synchronized (eventsLock) {
if (eventsLock.initialized) {
log.info("called initialize");
synchronized (syncResources) {
if (syncResources.isInitialized()) {
return;
}

flagResolver.init();
// block till ready - this works with deadline fine for rpc, but with in_process
// we also need to take parsing into the equation
// TODO: evaluate where we are losing time, so we can remove this magic number -
syncResources.waitForInitialization(this.deadline * 2);
}
// block till ready - this works with deadline fine for rpc, but with in_process
// we also need to take parsing into the equation
// TODO: evaluate where we are losing time, so we can remove this magic number -
// follow up
// wait outside of the synchonrization or we'll deadlock
Util.busyWaitAndCheck(this.deadline * 2, () -> eventsLock.initialized);
}

@Override
public void shutdown() {
synchronized (eventsLock) {
if (!eventsLock.initialized) {
return;
}
synchronized (syncResources) {
try {
if (!syncResources.isInitialized() || syncResources.isShutDown()) {
return;
}

this.flagResolver.shutdown();
if (errorExecutor != null) {
errorExecutor.shutdownNow();
Expand All @@ -149,7 +148,7 @@ public void shutdown() {
} catch (Exception e) {
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
} finally {
eventsLock.initialized = false;
syncResources.shutdown();
}
}
}
Expand Down Expand Up @@ -194,7 +193,7 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
* @return Object map representing sync metadata
*/
protected Structure getSyncMetadata() {
return new ImmutableStructure(eventsLock.syncMetadata.asMap());
return syncResources.getSyncMetadata();
}

/**
Expand All @@ -203,17 +202,15 @@ protected Structure getSyncMetadata() {
* @return context
*/
EvaluationContext getEnrichedContext() {
return eventsLock.enrichedContext;
return syncResources.getEnrichedContext();
}

@SuppressWarnings("checkstyle:fallthrough")
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {

synchronized (eventsLock) {
log.info("FlagdProviderEvent: {}", flagdProviderEvent.getEvent());
eventsLock.syncMetadata = flagdProviderEvent.getSyncMetadata();
log.info("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent());
synchronized (syncResources) {
syncResources.setSyncMetadata(flagdProviderEvent.getSyncMetadata());
if (flagdProviderEvent.getSyncMetadata() != null) {
eventsLock.enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata());
syncResources.setEnrichedContext(contextEnricher.apply(flagdProviderEvent.getSyncMetadata()));
}

/*
Expand All @@ -227,22 +224,26 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
*/
switch (flagdProviderEvent.getEvent()) {
case PROVIDER_CONFIGURATION_CHANGED:
if (eventsLock.previousEvent == ProviderEvent.PROVIDER_READY) {
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_READY) {
onConfigurationChanged(flagdProviderEvent);
break;
}
// intentional fall through, a not-ready change will trigger a ready.
onReady();
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY);
break;

case PROVIDER_READY:
onReady();
eventsLock.previousEvent = ProviderEvent.PROVIDER_READY;
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY);
break;

case PROVIDER_ERROR:
if (eventsLock.previousEvent != ProviderEvent.PROVIDER_ERROR) {
if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) {
onError();
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR);
}
eventsLock.previousEvent = ProviderEvent.PROVIDER_ERROR;
break;

default:
log.info("Unknown event {}", flagdProviderEvent.getEvent());
}
Expand All @@ -257,8 +258,7 @@ private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
}

private void onReady() {
if (!eventsLock.initialized) {
eventsLock.initialized = true;
if (syncResources.initialize()) {
log.info("initialized FlagdProvider");
}
if (errorTask != null && !errorTask.isCancelled()) {
Expand All @@ -283,7 +283,7 @@ private void onError() {
if (!errorExecutor.isShutdown()) {
errorTask = errorExecutor.schedule(
() -> {
if (eventsLock.previousEvent == ProviderEvent.PROVIDER_ERROR) {
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) {
log.debug(
"Provider did not reconnect successfully within {}s. Emit ERROR event...",
gracePeriod);
Expand All @@ -297,15 +297,4 @@ private void onError() {
TimeUnit.SECONDS);
}
}

/**
* Contains all fields we need to worry about locking, used as intrinsic lock
* for sync blocks.
*/
static class EventsLock {
volatile ProviderEvent previousEvent = null;
volatile Structure syncMetadata = new ImmutableStructure();
volatile boolean initialized = false;
volatile EvaluationContext enrichedContext = new ImmutableContext();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.ImmutableContext;
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.Structure;
import dev.openfeature.sdk.exceptions.GeneralError;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
* Contains all fields we need to worry about locking, used as intrinsic lock
* for sync blocks in the {@link FlagdProvider}.
*/
@Slf4j
@Getter
class FlagdProviderSyncResources {
@Setter
private volatile ProviderEvent previousEvent = null;

private volatile ImmutableStructure syncMetadata = new ImmutableStructure();
private volatile EvaluationContext enrichedContext = new ImmutableContext();
private volatile boolean initialized;
private volatile boolean isShutDown;

public void setSyncMetadata(Structure syncMetadata) {
this.syncMetadata = new ImmutableStructure(syncMetadata.asMap());
}

public void setEnrichedContext(EvaluationContext context) {
this.enrichedContext = new ImmutableContext(context.asMap());
}

/**
* With this method called, it is suggested that initialization has been completed. It will wake up all threads that
* wait for the initialization. Subsequent calls have no effect.
*
* @return true iff this was the first call to {@code initialize()}
*/
public synchronized boolean initialize() {
log.info("initialize in wait");
if (this.initialized) {
return false;
}
this.initialized = true;
this.notifyAll();
log.info("notified all " + this.toString());

return true;
}

/**
* Blocks the calling thread until either {@link FlagdProviderSyncResources#initialize()} or
* {@link FlagdProviderSyncResources#shutdown()} is called or the deadline is exceeded, whatever happens first. If
* {@link FlagdProviderSyncResources#initialize()} has been executed before {@code waitForInitialization(long)} is
* called, it will return instantly. If the deadline is exceeded, a GeneralError will be thrown.
* If {@link FlagdProviderSyncResources#shutdown()} is called in the meantime, an {@link IllegalStateException} will
* be thrown. Otherwise, the method will return cleanly.
*
* @param deadline the maximum time in ms to wait
* @throws GeneralError when the deadline is exceeded before
* {@link FlagdProviderSyncResources#initialize()} is called on this object
* @throws IllegalStateException when {@link FlagdProviderSyncResources#shutdown()} is called or has been called on
* this object
*/
public void waitForInitialization(long deadline) {
log.info("wait for init " + this);
long start = System.currentTimeMillis();
long end = start + deadline;
while (!initialized && !isShutDown) {
long now = System.currentTimeMillis();
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
if (now >= end) {
throw new GeneralError(String.format(
"Deadline exceeded. Condition did not complete within the %d ms deadline", deadline));
}
long remaining = end - now;
synchronized (this) {
if (initialized) { // might have changed in the meantime
log.info("post wait for init in loop");
return;
}
if (isShutDown) {
break;
}
log.info("waiting for " + remaining + " at " + System.currentTimeMillis());
try {
this.wait(remaining);
} catch (InterruptedException e) {
// try again. Leave the continue to make PMD happy
continue;
}
log.info("waiting ended at " + System.currentTimeMillis());
}
}
if (isShutDown) {
throw new IllegalStateException("Already shut down");
}
log.info("post wait for init");
}

/**
* Signals a shutdown. Threads waiting for initialization will wake up and throw an {@link IllegalStateException}.
*/
public synchronized void shutdown() {
log.info("shutdown in wait");
isShutDown = true;
this.notifyAll();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class InProcessResolver implements Resolver {
private final Storage flagStore;
private final Consumer<FlagdProviderEvent> onConnectionEvent;
private final Operator operator;
private final long deadline;
private final String scope;

/**
Expand All @@ -52,7 +51,6 @@ public class InProcessResolver implements Resolver {
*/
public InProcessResolver(FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
this.flagStore = new FlagStore(getConnector(options, onConnectionEvent));
this.deadline = options.getDeadline();
this.onConnectionEvent = onConnectionEvent;
this.operator = new Operator();
this.scope = options.getSelector();
Expand All @@ -70,10 +68,12 @@ public void init() throws Exception {
flagStore.getStateQueue().take();
switch (storageStateChange.getStorageState()) {
case OK:
log.info("onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
onConnectionEvent.accept(new FlagdProviderEvent(
ProviderEvent.PROVIDER_CONFIGURATION_CHANGED,
storageStateChange.getChangedFlagsKeys(),
storageStateChange.getSyncMetadata()));
log.info("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
break;
case ERROR:
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR));
Expand Down
Loading
Loading