Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,11 @@ private boolean processInitial(TransferProcess process) {
var provisioning = dataFlowManager.prepare(process, policy);
if (provisioning.succeeded()) {
var response = provisioning.getContent();
process.setDataPlaneId(response.getDataPlaneId());
if (response.isProvisioning()) {
process.setDataPlaneId(response.getDataPlaneId());
process.transitionProvisioningRequested();
} else {
process.setDataPlaneId(null);
process.updateDestination(response.getDataAddress());
process.transitionRequesting();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -842,8 +842,10 @@ void shouldTransitionToProvisioningRequested_whenProvisionThroughDataplaneSuccee
@Test
void shouldTransitionToRequesting_whenProvisionThroughDataplaneSucceedsButNoActualProvisionNeeded() {
var dataPlaneId = UUID.randomUUID().toString();
var dataDestination = DataAddress.Builder.newInstance().type("any").build();
var dataFlowResponse = DataFlowResponse.Builder.newInstance()
.dataPlaneId(dataPlaneId)
.dataAddress(dataDestination)
.provisioning(false)
.build();
var transferProcess = createTransferProcess(INITIAL);
Expand All @@ -863,7 +865,8 @@ void shouldTransitionToRequesting_whenProvisionThroughDataplaneSucceedsButNoActu
verify(transferProcessStore).save(captor.capture());
var storedTransferProcess = captor.getValue();
assertThat(storedTransferProcess.getState()).isEqualTo(REQUESTING.code());
assertThat(storedTransferProcess.getDataPlaneId()).isEqualTo(null);
assertThat(storedTransferProcess.getDataPlaneId()).isEqualTo(dataPlaneId);
assertThat(storedTransferProcess.getDataDestination()).isSameAs(dataDestination);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.edc.connector.dataplane.selector;

import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneAvailabilityChecker;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.connector.dataplane.selector.spi.strategy.RandomSelectionStrategy;
import org.eclipse.edc.connector.dataplane.selector.spi.strategy.SelectionStrategyRegistry;
Expand All @@ -23,6 +24,7 @@
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.system.ServiceExtension;

import java.time.Clock;
Expand Down Expand Up @@ -57,4 +59,9 @@ public SelectionStrategyRegistry selectionStrategyRegistry() {
strategy.add(new RandomSelectionStrategy());
return strategy;
}

@Provider(isDefault = true)
public DataPlaneAvailabilityChecker dataPlaneAvailabilityChecker() {
return dataPlane -> StatusResult.success();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.eclipse.edc.connector.dataplane.selector.manager.DataPlaneSelectorManagerImpl;
import org.eclipse.edc.connector.dataplane.selector.service.EmbeddedDataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneAvailabilityChecker;
import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneSelectorManager;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.connector.dataplane.selector.spi.strategy.SelectionStrategyRegistry;
Expand Down Expand Up @@ -59,7 +59,7 @@ public class DataPlaneSelectorExtension implements ServiceExtension {
@Inject
private SelectionStrategyRegistry selectionStrategyRegistry;
@Inject
private DataPlaneClientFactory clientFactory;
private DataPlaneAvailabilityChecker dataPlaneAvailabilityChecker;

private DataPlaneSelectorManager manager;

Expand Down Expand Up @@ -90,11 +90,11 @@ public DataPlaneSelectorManager dataPlaneSelectorManager(ServiceExtensionContext
);

manager = DataPlaneSelectorManagerImpl.Builder.newInstance()
.clientFactory(clientFactory)
.store(instanceStore)
.monitor(context.getMonitor())
.configuration(configuration)
.entityRetryProcessConfiguration(stateMachineConfiguration.entityRetryProcessConfiguration())
.availabilityChecker(dataPlaneAvailabilityChecker)
.build();
}
return manager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstanceStates;
import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneAvailabilityChecker;
import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneSelectorManager;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.spi.query.Criterion;
Expand All @@ -39,6 +40,7 @@ public class DataPlaneSelectorManagerImpl extends AbstractStateEntityManager<Dat

private DataPlaneClientFactory clientFactory;
private Duration checkPeriod = Duration.ofMinutes(1);
private DataPlaneAvailabilityChecker availabilityChecker;

private DataPlaneSelectorManagerImpl() {
}
Expand All @@ -60,8 +62,7 @@ private boolean checkAvailability(DataPlaneInstance instance) {
}

private boolean availability(DataPlaneInstance instance) {
var client = clientFactory.createClient(instance);
var availability = client.checkAvailability();
var availability = availabilityChecker.checkAvailability(instance);
if (availability.succeeded()) {
instance.transitionToAvailable();
} else {
Expand Down Expand Up @@ -97,13 +98,13 @@ public Builder self() {
return this;
}

public Builder clientFactory(DataPlaneClientFactory clientFactory) {
manager.clientFactory = clientFactory;
public Builder checkPeriod(Duration checkPeriod) {
manager.checkPeriod = checkPeriod;
return this;
}

public Builder checkPeriod(Duration checkPeriod) {
manager.checkPeriod = checkPeriod;
public Builder availabilityChecker(DataPlaneAvailabilityChecker dataPlaneAvailabilityChecker) {
manager.availabilityChecker = dataPlaneAvailabilityChecker;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@

package org.eclipse.edc.connector.dataplane.selector.manager;

import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstanceStates;
import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneAvailabilityChecker;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.response.StatusResult;
Expand Down Expand Up @@ -52,14 +51,14 @@
class DataPlaneSelectorManagerImplTest {

private final DataPlaneInstanceStore store = mock();
private final DataPlaneClientFactory clientFactory = mock();
private final DataPlaneAvailabilityChecker availabilityChecker = mock();
private final Duration checkPeriod = Duration.of(10, SECONDS);
private final Instant now = Instant.now();
private final Clock clock = Clock.fixed(now, ZoneId.systemDefault());
private final DataPlaneSelectorManagerImpl manager = DataPlaneSelectorManagerImpl.Builder.newInstance()
.monitor(mock())
.store(store)
.clientFactory(clientFactory)
.availabilityChecker(availabilityChecker)
.checkPeriod(checkPeriod)
.clock(clock)
.build();
Expand All @@ -71,9 +70,7 @@ class Registered {
void shouldTransitionToAvailable_whenDataPlaneIsAvailable() {
var instance = DataPlaneInstance.Builder.newInstance().state(REGISTERED.code()).url("http://any").build();
when(store.nextNotLeased(anyInt(), stateIs(REGISTERED))).thenReturn(List.of(instance)).thenReturn(emptyList());
DataPlaneClient dataPlaneClient = mock();
when(clientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(dataPlaneClient.checkAvailability()).thenReturn(StatusResult.success());
when(availabilityChecker.checkAvailability(any())).thenReturn(StatusResult.success());

manager.start();

Expand All @@ -86,9 +83,7 @@ void shouldTransitionToAvailable_whenDataPlaneIsAvailable() {
void shouldTransitionToUnavailable_whenDataPlaneIsNotAvailable() {
var instance = DataPlaneInstance.Builder.newInstance().state(REGISTERED.code()).url("http://any").build();
when(store.nextNotLeased(anyInt(), stateIs(REGISTERED))).thenReturn(List.of(instance)).thenReturn(emptyList());
DataPlaneClient dataPlaneClient = mock();
when(clientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(dataPlaneClient.checkAvailability()).thenReturn(StatusResult.failure(FATAL_ERROR));
when(availabilityChecker.checkAvailability(any())).thenReturn(StatusResult.failure(FATAL_ERROR));

manager.start();

Expand All @@ -108,9 +103,7 @@ void shouldRemainAvailable_whenDataPlaneIsAvailable() {
var instance = DataPlaneInstance.Builder.newInstance().state(AVAILABLE.code()).url("http://any")
.updatedAt(updatedAt.toEpochMilli()).build();
when(store.nextNotLeased(anyInt(), stateIs(AVAILABLE))).thenReturn(List.of(instance)).thenReturn(emptyList());
DataPlaneClient dataPlaneClient = mock();
when(clientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(dataPlaneClient.checkAvailability()).thenReturn(StatusResult.success());
when(availabilityChecker.checkAvailability(any())).thenReturn(StatusResult.success());

manager.start();

Expand All @@ -125,9 +118,7 @@ void shouldTransitionToUnavailable_whenDataPlaneIsNotAvailable() {
var instance = DataPlaneInstance.Builder.newInstance().state(AVAILABLE.code()).url("http://any")
.updatedAt(updatedAt.toEpochMilli()).build();
when(store.nextNotLeased(anyInt(), stateIs(AVAILABLE))).thenReturn(List.of(instance)).thenReturn(emptyList());
DataPlaneClient dataPlaneClient = mock();
when(clientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(dataPlaneClient.checkAvailability()).thenReturn(StatusResult.failure(FATAL_ERROR));
when(availabilityChecker.checkAvailability(any())).thenReturn(StatusResult.failure(FATAL_ERROR));

manager.start();

Expand All @@ -142,14 +133,12 @@ void shouldNotCheckAvailability_whenCheckPeriodIsLowerThanConfiguredOne() {
var instance = DataPlaneInstance.Builder.newInstance().state(AVAILABLE.code()).url("http://any")
.updatedAt(updatedAt.toEpochMilli()).build();
when(store.nextNotLeased(anyInt(), stateIs(AVAILABLE))).thenReturn(List.of(instance)).thenReturn(emptyList());
DataPlaneClient dataPlaneClient = mock();
when(clientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(dataPlaneClient.checkAvailability()).thenReturn(StatusResult.success());
when(availabilityChecker.checkAvailability(any())).thenReturn(StatusResult.success());

manager.start();

await().untilAsserted(() -> {
verifyNoInteractions(clientFactory);
verifyNoInteractions(availabilityChecker);
verify(store, atLeast(2)).nextNotLeased(anyInt(), stateIs(AVAILABLE));
});
}
Expand All @@ -164,9 +153,7 @@ void shouldRemainUnavailable_whenDataPlaneIsUnavailable() {
var instance = DataPlaneInstance.Builder.newInstance().state(UNAVAILABLE.code()).url("http://any")
.updatedAt(updatedAt.toEpochMilli()).build();
when(store.nextNotLeased(anyInt(), stateIs(UNAVAILABLE))).thenReturn(List.of(instance)).thenReturn(emptyList());
DataPlaneClient dataPlaneClient = mock();
when(clientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(dataPlaneClient.checkAvailability()).thenReturn(StatusResult.failure(FATAL_ERROR));
when(availabilityChecker.checkAvailability(any())).thenReturn(StatusResult.failure(FATAL_ERROR));

manager.start();

Expand All @@ -181,9 +168,7 @@ void shouldTransitionToAvailable_whenDataPlaneIsAvailable() {
var instance = DataPlaneInstance.Builder.newInstance().state(UNAVAILABLE.code()).url("http://any")
.updatedAt(updatedAt.toEpochMilli()).build();
when(store.nextNotLeased(anyInt(), stateIs(UNAVAILABLE))).thenReturn(List.of(instance)).thenReturn(emptyList());
DataPlaneClient dataPlaneClient = mock();
when(clientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(dataPlaneClient.checkAvailability()).thenReturn(StatusResult.success());
when(availabilityChecker.checkAvailability(any())).thenReturn(StatusResult.success());

manager.start();

Expand All @@ -198,14 +183,12 @@ void shouldNotCheckAvailability_whenCheckPeriodIsLowerThanConfiguredOne() {
var instance = DataPlaneInstance.Builder.newInstance().state(UNAVAILABLE.code()).url("http://any")
.updatedAt(updatedAt.toEpochMilli()).build();
when(store.nextNotLeased(anyInt(), stateIs(UNAVAILABLE))).thenReturn(List.of(instance)).thenReturn(emptyList());
DataPlaneClient dataPlaneClient = mock();
when(clientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(dataPlaneClient.checkAvailability()).thenReturn(StatusResult.success());
when(availabilityChecker.checkAvailability(any())).thenReturn(StatusResult.success());

manager.start();

await().untilAsserted(() -> {
verifyNoInteractions(clientFactory);
verifyNoInteractions(availabilityChecker);
verify(store, atLeast(2)).nextNotLeased(anyInt(), stateIs(UNAVAILABLE));
});
}
Expand Down
1 change: 1 addition & 0 deletions data-protocols/data-plane-signaling/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
api(project(":spi:common:transform-spi"))
api(project(":spi:common:web-spi"))
api(project(":spi:control-plane:contract-spi"))
api(project(":spi:control-plane:control-plane-spi"))
api(project(":spi:control-plane:transfer-spi"))
api(project(":spi:data-plane-selector:data-plane-selector-spi"))
implementation(project(":core:common:lib:api-lib"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,60 +14,42 @@

package org.eclipse.edc.signaling;

import jakarta.json.Json;
import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowProvisionMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowStartMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowSuspendMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowTerminateMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowResponseMessageTransformer;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneAvailabilityChecker;
import org.eclipse.edc.http.spi.ControlApiHttpClient;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.signaling.port.DataPlaneSignalingClient;
import org.eclipse.edc.signaling.port.ClientFactory;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;

import java.util.Map;

import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.VOCAB;
import static org.eclipse.edc.signaling.DataPlaneSignalingClientExtension.NAME;
import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.edc.spi.constants.CoreConstants.JSON_LD;

@Extension(NAME)
public class DataPlaneSignalingClientExtension implements ServiceExtension {

public static final String NAME = "Data Plane Signaling client";
private static final String CONTROL_CLIENT_SCOPE = "CONTROL_CLIENT_SCOPE";
public static final String NAME = "Data Plane Signaling Availability Checker";

@Inject
private ControlApiHttpClient httpClient;
@Inject
private TypeTransformerRegistry transformerRegistry;
@Inject
private TypeManager typeManager;
@Inject
private JsonLd jsonLd;

@Provider
public DataPlaneClientFactory dataPlaneClientFactory() {
jsonLd.registerNamespace(VOCAB, EDC_NAMESPACE, CONTROL_CLIENT_SCOPE);

var factory = Json.createBuilderFactory(Map.of());
var signalingApiTypeTransformerRegistry = transformerRegistry.forContext("signaling-api");
private ClientFactory clientFactory;

signalingApiTypeTransformerRegistry.register(new JsonObjectFromDataFlowStartMessageTransformer(factory, typeManager, JSON_LD));
signalingApiTypeTransformerRegistry.register(new JsonObjectFromDataFlowProvisionMessageTransformer(factory, typeManager, JSON_LD));
signalingApiTypeTransformerRegistry.register(new JsonObjectFromDataFlowSuspendMessageTransformer(factory));
signalingApiTypeTransformerRegistry.register(new JsonObjectFromDataFlowTerminateMessageTransformer(factory));
signalingApiTypeTransformerRegistry.register(new JsonObjectToDataFlowResponseMessageTransformer());
@Provider
public DataPlaneAvailabilityChecker dataPlaneAvailabilityChecker() {
var clientFactory = clientFactory();
return dataPlane -> clientFactory.createClient(dataPlane).checkAvailability();
}

return dataPlane -> new DataPlaneSignalingClient(dataPlane, httpClient,
() -> typeManager.getMapper(JSON_LD), signalingApiTypeTransformerRegistry, jsonLd,
CONTROL_CLIENT_SCOPE);
@Provider
public ClientFactory clientFactory() {
if (clientFactory == null) {
clientFactory = new ClientFactory(httpClient, () -> typeManager.getMapper(JSON_LD));
}
return clientFactory;
}

}
Loading