Skip to content

Commit 25a632b

Browse files
committed
wip
Signed-off-by: Attila Mészáros <[email protected]>
1 parent 863ecde commit 25a632b

File tree

4 files changed

+28
-15
lines changed

4 files changed

+28
-15
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public synchronized void start() {
8181
}
8282
}
8383

84-
public void eventReceived(
84+
public synchronized void eventReceived(
8585
ResourceAction action,
8686
T resource,
8787
T oldResource,
@@ -90,11 +90,12 @@ public void eventReceived(
9090
try {
9191
if (log.isDebugEnabled()) {
9292
log.debug(
93-
"Event received for resource: {} version: {} uuid: {} action: {}",
93+
"Event received for resource: {} version: {} uuid: {} action: {} filter event: {}",
9494
ResourceID.fromResource(resource),
9595
getVersion(resource),
9696
resource.getMetadata().getUid(),
97-
action);
97+
action,
98+
filterEvent);
9899
log.trace("Event Old resource: {},\n new resource: {}", oldResource, resource);
99100
}
100101
MDCUtils.addResourceInfo(resource);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,12 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
9696
log.debug("Update and cache: {}", id);
9797
}
9898
try {
99-
temporaryResourceCache.startModifying(id);
99+
temporaryResourceCache.startEventFilterModifying(id);
100100
var updated = updateMethod.apply(resourceToUpdate);
101101
handleRecentResourceUpdate(id, updated, resourceToUpdate);
102102
return updated;
103103
} finally {
104-
temporaryResourceCache.doneModifying(id);
104+
temporaryResourceCache.doneEventFilterModifying(id);
105105
}
106106
}
107107

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.Map;
1919
import java.util.Optional;
20+
import java.util.Set;
2021
import java.util.concurrent.ConcurrentHashMap;
2122
import java.util.concurrent.locks.ReentrantLock;
2223

@@ -55,13 +56,14 @@ public class TemporaryResourceCache<T extends HasMetadata> {
5556
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
5657
private final boolean comparableResourceVersions;
5758
private final Map<ResourceID, ReentrantLock> activelyModifying = new ConcurrentHashMap<>();
59+
private final Set<ResourceID> skipFiltering = ConcurrentHashMap.newKeySet();
5860
private String latestResourceVersion;
5961

6062
public TemporaryResourceCache(boolean comparableResourceVersions) {
6163
this.comparableResourceVersions = comparableResourceVersions;
6264
}
6365

64-
public void startModifying(ResourceID id) {
66+
public void startEventFilterModifying(ResourceID id) {
6567
if (!comparableResourceVersions) {
6668
return;
6769
}
@@ -78,7 +80,7 @@ public void startModifying(ResourceID id) {
7880
.lock();
7981
}
8082

81-
public void doneModifying(ResourceID id) {
83+
public void doneEventFilterModifying(ResourceID id) {
8284
if (!comparableResourceVersions) {
8385
return;
8486
}
@@ -102,35 +104,38 @@ public boolean onAddOrUpdateEvent(T resource) {
102104
}
103105

104106
private boolean onEvent(T resource, boolean unknownState) {
105-
ReentrantLock lock = activelyModifying.get(ResourceID.fromResource(resource));
107+
var resourceId = ResourceID.fromResource(resource);
108+
ReentrantLock lock = activelyModifying.get(resourceId);
106109
if (lock != null) {
107110
lock.lock(); // wait for the modification to finish
108111
lock.unlock(); // simply unlock as the event is guaranteed after the modification
109112
}
110-
boolean[] known = new boolean[1];
113+
boolean[] filter = new boolean[1];
111114
synchronized (this) {
112115
if (!unknownState) {
113116
latestResourceVersion = resource.getMetadata().getResourceVersion();
114117
}
115118
cache.computeIfPresent(
116-
ResourceID.fromResource(resource),
119+
resourceId,
117120
(id, cached) -> {
118121
boolean remove = unknownState;
119122
if (!unknownState) {
120123
int comp = ReconcileUtils.compareResourceVersions(resource, cached);
121124
if (comp >= 0) {
122125
remove = true;
123126
}
124-
if (comp <= 0) {
125-
known[0] = true;
127+
if (comp < 0) {
128+
filter[0] = true;
129+
} else {
130+
filter[0] = skipFiltering.remove(resourceId);
126131
}
127132
}
128133
if (remove) {
129134
return null;
130135
}
131136
return cached;
132137
});
133-
return known[0];
138+
return filter[0];
134139
}
135140
}
136141

@@ -178,10 +183,17 @@ public synchronized void putResource(T newResource) {
178183
newResource.getMetadata().getResourceVersion(),
179184
resourceId);
180185
cache.put(resourceId, newResource);
186+
if (!isFilteringModification(resourceId)) {
187+
skipFiltering.add(resourceId);
188+
}
181189
}
182190
}
183191

184192
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
185193
return Optional.ofNullable(cache.get(resourceID));
186194
}
195+
196+
private boolean isFilteringModification(ResourceID resourceId) {
197+
return activelyModifying.containsKey(resourceId);
198+
}
187199
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,15 @@ void nonComparableResourceVersionsDisables() {
122122
void lockedEventBeforePut() throws Exception {
123123
var testResource = testResource();
124124

125-
temporaryResourceCache.startModifying(ResourceID.fromResource(testResource));
125+
temporaryResourceCache.startEventFilterModifying(ResourceID.fromResource(testResource));
126126

127127
ExecutorService ex = Executors.newSingleThreadExecutor();
128128
try {
129129
var result = ex.submit(() -> temporaryResourceCache.onAddOrUpdateEvent(testResource));
130130

131131
temporaryResourceCache.putResource(testResource);
132132
assertThat(result.isDone()).isFalse();
133-
temporaryResourceCache.doneModifying(ResourceID.fromResource(testResource));
133+
temporaryResourceCache.doneEventFilterModifying(ResourceID.fromResource(testResource));
134134
assertThat(result.get(10, TimeUnit.SECONDS)).isTrue();
135135
} finally {
136136
ex.shutdownNow();

0 commit comments

Comments
 (0)