diff --git a/core/control-plane/control-plane-transfer-manager/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java b/core/control-plane/control-plane-transfer-manager/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java index 8346d38a15c..5cd1b3e4a8a 100644 --- a/core/control-plane/control-plane-transfer-manager/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java +++ b/core/control-plane/control-plane-transfer-manager/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java @@ -216,11 +216,11 @@ private boolean processInitial(TransferProcess process) { var provisioning = dataFlowManager.prepare(process, policy); if (provisioning.succeeded()) { var response = provisioning.getContent(); + process.setDataPlaneId(response.getDataPlaneId()); if (response.isProvisioning()) { - process.setDataPlaneId(response.getDataPlaneId()); process.transitionProvisioningRequested(); } else { - process.setDataPlaneId(null); + process.updateDestination(response.getDataAddress()); process.transitionRequesting(); } diff --git a/core/control-plane/control-plane-transfer-manager/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java b/core/control-plane/control-plane-transfer-manager/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java index 320199cb482..7adc7411121 100644 --- a/core/control-plane/control-plane-transfer-manager/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java +++ b/core/control-plane/control-plane-transfer-manager/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java @@ -842,8 +842,10 @@ void shouldTransitionToProvisioningRequested_whenProvisionThroughDataplaneSuccee @Test void shouldTransitionToRequesting_whenProvisionThroughDataplaneSucceedsButNoActualProvisionNeeded() { var dataPlaneId = UUID.randomUUID().toString(); + var dataDestination = DataAddress.Builder.newInstance().type("any").build(); var dataFlowResponse = DataFlowResponse.Builder.newInstance() .dataPlaneId(dataPlaneId) + .dataAddress(dataDestination) .provisioning(false) .build(); var transferProcess = createTransferProcess(INITIAL); @@ -863,7 +865,8 @@ void shouldTransitionToRequesting_whenProvisionThroughDataplaneSucceedsButNoActu verify(transferProcessStore).save(captor.capture()); var storedTransferProcess = captor.getValue(); assertThat(storedTransferProcess.getState()).isEqualTo(REQUESTING.code()); - assertThat(storedTransferProcess.getDataPlaneId()).isEqualTo(null); + assertThat(storedTransferProcess.getDataPlaneId()).isEqualTo(dataPlaneId); + assertThat(storedTransferProcess.getDataDestination()).isSameAs(dataDestination); }); } } diff --git a/core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/DataPlaneSelectorDefaultServicesExtension.java b/core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/DataPlaneSelectorDefaultServicesExtension.java index 2e9ad19efcb..5e60da15a35 100644 --- a/core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/DataPlaneSelectorDefaultServicesExtension.java +++ b/core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/DataPlaneSelectorDefaultServicesExtension.java @@ -14,6 +14,7 @@ package org.eclipse.edc.connector.dataplane.selector; +import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneAvailabilityChecker; import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore; import org.eclipse.edc.connector.dataplane.selector.spi.strategy.RandomSelectionStrategy; import org.eclipse.edc.connector.dataplane.selector.spi.strategy.SelectionStrategyRegistry; @@ -23,6 +24,7 @@ import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Provider; import org.eclipse.edc.spi.query.CriterionOperatorRegistry; +import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.system.ServiceExtension; import java.time.Clock; @@ -57,4 +59,9 @@ public SelectionStrategyRegistry selectionStrategyRegistry() { strategy.add(new RandomSelectionStrategy()); return strategy; } + + @Provider(isDefault = true) + public DataPlaneAvailabilityChecker dataPlaneAvailabilityChecker() { + return dataPlane -> StatusResult.success(); + } } diff --git a/core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/DataPlaneSelectorExtension.java b/core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/DataPlaneSelectorExtension.java index 1e26bb7ddd0..2a055195dac 100644 --- a/core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/DataPlaneSelectorExtension.java +++ b/core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/DataPlaneSelectorExtension.java @@ -17,7 +17,7 @@ import org.eclipse.edc.connector.dataplane.selector.manager.DataPlaneSelectorManagerImpl; import org.eclipse.edc.connector.dataplane.selector.service.EmbeddedDataPlaneSelectorService; import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; -import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; +import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneAvailabilityChecker; import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneSelectorManager; import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore; import org.eclipse.edc.connector.dataplane.selector.spi.strategy.SelectionStrategyRegistry; @@ -59,7 +59,7 @@ public class DataPlaneSelectorExtension implements ServiceExtension { @Inject private SelectionStrategyRegistry selectionStrategyRegistry; @Inject - private DataPlaneClientFactory clientFactory; + private DataPlaneAvailabilityChecker dataPlaneAvailabilityChecker; private DataPlaneSelectorManager manager; @@ -90,11 +90,11 @@ public DataPlaneSelectorManager dataPlaneSelectorManager(ServiceExtensionContext ); manager = DataPlaneSelectorManagerImpl.Builder.newInstance() - .clientFactory(clientFactory) .store(instanceStore) .monitor(context.getMonitor()) .configuration(configuration) .entityRetryProcessConfiguration(stateMachineConfiguration.entityRetryProcessConfiguration()) + .availabilityChecker(dataPlaneAvailabilityChecker) .build(); } return manager; diff --git a/core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/manager/DataPlaneSelectorManagerImpl.java b/core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/manager/DataPlaneSelectorManagerImpl.java index 4af7406db6d..eb59aea26e3 100644 --- a/core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/manager/DataPlaneSelectorManagerImpl.java +++ b/core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/manager/DataPlaneSelectorManagerImpl.java @@ -18,6 +18,7 @@ import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstanceStates; +import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneAvailabilityChecker; import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneSelectorManager; import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore; import org.eclipse.edc.spi.query.Criterion; @@ -39,6 +40,7 @@ public class DataPlaneSelectorManagerImpl extends AbstractStateEntityManager { - verifyNoInteractions(clientFactory); + verifyNoInteractions(availabilityChecker); verify(store, atLeast(2)).nextNotLeased(anyInt(), stateIs(AVAILABLE)); }); } @@ -164,9 +153,7 @@ void shouldRemainUnavailable_whenDataPlaneIsUnavailable() { var instance = DataPlaneInstance.Builder.newInstance().state(UNAVAILABLE.code()).url("http://any") .updatedAt(updatedAt.toEpochMilli()).build(); when(store.nextNotLeased(anyInt(), stateIs(UNAVAILABLE))).thenReturn(List.of(instance)).thenReturn(emptyList()); - DataPlaneClient dataPlaneClient = mock(); - when(clientFactory.createClient(any())).thenReturn(dataPlaneClient); - when(dataPlaneClient.checkAvailability()).thenReturn(StatusResult.failure(FATAL_ERROR)); + when(availabilityChecker.checkAvailability(any())).thenReturn(StatusResult.failure(FATAL_ERROR)); manager.start(); @@ -181,9 +168,7 @@ void shouldTransitionToAvailable_whenDataPlaneIsAvailable() { var instance = DataPlaneInstance.Builder.newInstance().state(UNAVAILABLE.code()).url("http://any") .updatedAt(updatedAt.toEpochMilli()).build(); when(store.nextNotLeased(anyInt(), stateIs(UNAVAILABLE))).thenReturn(List.of(instance)).thenReturn(emptyList()); - DataPlaneClient dataPlaneClient = mock(); - when(clientFactory.createClient(any())).thenReturn(dataPlaneClient); - when(dataPlaneClient.checkAvailability()).thenReturn(StatusResult.success()); + when(availabilityChecker.checkAvailability(any())).thenReturn(StatusResult.success()); manager.start(); @@ -198,14 +183,12 @@ void shouldNotCheckAvailability_whenCheckPeriodIsLowerThanConfiguredOne() { var instance = DataPlaneInstance.Builder.newInstance().state(UNAVAILABLE.code()).url("http://any") .updatedAt(updatedAt.toEpochMilli()).build(); when(store.nextNotLeased(anyInt(), stateIs(UNAVAILABLE))).thenReturn(List.of(instance)).thenReturn(emptyList()); - DataPlaneClient dataPlaneClient = mock(); - when(clientFactory.createClient(any())).thenReturn(dataPlaneClient); - when(dataPlaneClient.checkAvailability()).thenReturn(StatusResult.success()); + when(availabilityChecker.checkAvailability(any())).thenReturn(StatusResult.success()); manager.start(); await().untilAsserted(() -> { - verifyNoInteractions(clientFactory); + verifyNoInteractions(availabilityChecker); verify(store, atLeast(2)).nextNotLeased(anyInt(), stateIs(UNAVAILABLE)); }); } diff --git a/data-protocols/data-plane-signaling/build.gradle.kts b/data-protocols/data-plane-signaling/build.gradle.kts index a0d4aa5d2d9..6f462f1d82a 100644 --- a/data-protocols/data-plane-signaling/build.gradle.kts +++ b/data-protocols/data-plane-signaling/build.gradle.kts @@ -24,6 +24,7 @@ dependencies { api(project(":spi:common:transform-spi")) api(project(":spi:common:web-spi")) api(project(":spi:control-plane:contract-spi")) + api(project(":spi:control-plane:control-plane-spi")) api(project(":spi:control-plane:transfer-spi")) api(project(":spi:data-plane-selector:data-plane-selector-spi")) implementation(project(":core:common:lib:api-lib")) diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingClientExtension.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingClientExtension.java index 46083490613..e021aa3533c 100644 --- a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingClientExtension.java +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingClientExtension.java @@ -14,60 +14,42 @@ package org.eclipse.edc.signaling; -import jakarta.json.Json; -import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowProvisionMessageTransformer; -import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowStartMessageTransformer; -import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowSuspendMessageTransformer; -import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowTerminateMessageTransformer; -import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowResponseMessageTransformer; -import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; +import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneAvailabilityChecker; import org.eclipse.edc.http.spi.ControlApiHttpClient; -import org.eclipse.edc.jsonld.spi.JsonLd; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Provider; -import org.eclipse.edc.signaling.port.DataPlaneSignalingClient; +import org.eclipse.edc.signaling.port.ClientFactory; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.types.TypeManager; -import org.eclipse.edc.transform.spi.TypeTransformerRegistry; -import java.util.Map; - -import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.VOCAB; import static org.eclipse.edc.signaling.DataPlaneSignalingClientExtension.NAME; -import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; import static org.eclipse.edc.spi.constants.CoreConstants.JSON_LD; @Extension(NAME) public class DataPlaneSignalingClientExtension implements ServiceExtension { - public static final String NAME = "Data Plane Signaling client"; - private static final String CONTROL_CLIENT_SCOPE = "CONTROL_CLIENT_SCOPE"; + public static final String NAME = "Data Plane Signaling Availability Checker"; @Inject private ControlApiHttpClient httpClient; @Inject - private TypeTransformerRegistry transformerRegistry; - @Inject private TypeManager typeManager; - @Inject - private JsonLd jsonLd; - - @Provider - public DataPlaneClientFactory dataPlaneClientFactory() { - jsonLd.registerNamespace(VOCAB, EDC_NAMESPACE, CONTROL_CLIENT_SCOPE); - var factory = Json.createBuilderFactory(Map.of()); - var signalingApiTypeTransformerRegistry = transformerRegistry.forContext("signaling-api"); + private ClientFactory clientFactory; - signalingApiTypeTransformerRegistry.register(new JsonObjectFromDataFlowStartMessageTransformer(factory, typeManager, JSON_LD)); - signalingApiTypeTransformerRegistry.register(new JsonObjectFromDataFlowProvisionMessageTransformer(factory, typeManager, JSON_LD)); - signalingApiTypeTransformerRegistry.register(new JsonObjectFromDataFlowSuspendMessageTransformer(factory)); - signalingApiTypeTransformerRegistry.register(new JsonObjectFromDataFlowTerminateMessageTransformer(factory)); - signalingApiTypeTransformerRegistry.register(new JsonObjectToDataFlowResponseMessageTransformer()); + @Provider + public DataPlaneAvailabilityChecker dataPlaneAvailabilityChecker() { + var clientFactory = clientFactory(); + return dataPlane -> clientFactory.createClient(dataPlane).checkAvailability(); + } - return dataPlane -> new DataPlaneSignalingClient(dataPlane, httpClient, - () -> typeManager.getMapper(JSON_LD), signalingApiTypeTransformerRegistry, jsonLd, - CONTROL_CLIENT_SCOPE); + @Provider + public ClientFactory clientFactory() { + if (clientFactory == null) { + clientFactory = new ClientFactory(httpClient, () -> typeManager.getMapper(JSON_LD)); + } + return clientFactory; } + } diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingExtension.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingExtension.java index 6f3fbbefa73..ca57f4650b8 100644 --- a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingExtension.java +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingExtension.java @@ -14,14 +14,25 @@ package org.eclipse.edc.signaling; +import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessService; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager; import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; -import org.eclipse.edc.signaling.port.DataPlaneRegistrationApiController; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.signaling.logic.DataPlaneSignalingFlowController; +import org.eclipse.edc.signaling.port.ClientFactory; +import org.eclipse.edc.signaling.port.api.DataPlaneRegistrationApiController; +import org.eclipse.edc.signaling.port.api.DataPlaneTransferApiController; +import org.eclipse.edc.signaling.port.transformer.DataAddressToDspDataAddressTransformer; +import org.eclipse.edc.signaling.port.transformer.DataFlowResponseMessageToDataFlowResponseTransformer; +import org.eclipse.edc.signaling.port.transformer.DspDataAddressToDataAddressTransformer; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.transform.spi.TypeTransformerRegistry; import org.eclipse.edc.web.spi.WebService; import org.eclipse.edc.web.spi.configuration.ApiContext; +import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl; import static org.eclipse.edc.signaling.DataPlaneSignalingExtension.NAME; @@ -29,20 +40,39 @@ public class DataPlaneSignalingExtension implements ServiceExtension { public static final String NAME = "Data Plane Signaling"; + private static final String DEFAULT_DATAPLANE_SELECTOR_STRATEGY = "random"; + + @Setting( + description = "Defines strategy for Data Plane instance selection in case Data Plane is not embedded in current runtime", + defaultValue = DEFAULT_DATAPLANE_SELECTOR_STRATEGY, + key = "edc.dataplane.client.selector.strategy" + ) + private String selectionStrategy; @Inject - private WebService webService; + private TypeTransformerRegistry transformerRegistry; + @Inject + private DataFlowManager dataFlowManager; + @Inject + private ControlApiUrl controlApiUrl; @Inject private DataPlaneSelectorService dataPlaneSelectorService; - - @Override - public String name() { - return NAME; - } + @Inject + private WebService webService; + @Inject + private ClientFactory clientFactory; + @Inject + private TransferProcessService transferProcessService; @Override public void initialize(ServiceExtensionContext context) { webService.registerResource(ApiContext.CONTROL, new DataPlaneRegistrationApiController(dataPlaneSelectorService)); + webService.registerResource(ApiContext.CONTROL, new DataPlaneTransferApiController(transferProcessService)); + var typeTransformerRegistry = transformerRegistry.forContext("signaling-api"); + typeTransformerRegistry.register(new DataAddressToDspDataAddressTransformer()); + typeTransformerRegistry.register(new DataFlowResponseMessageToDataFlowResponseTransformer()); + typeTransformerRegistry.register(new DspDataAddressToDataAddressTransformer()); + dataFlowManager.register(10, new DataPlaneSignalingFlowController(controlApiUrl, dataPlaneSelectorService, selectionStrategy, typeTransformerRegistry, clientFactory)); } } diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/domain/DataFlowPrepareMessage.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/domain/DataFlowPrepareMessage.java new file mode 100644 index 00000000000..4df713fba3a --- /dev/null +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/domain/DataFlowPrepareMessage.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.domain; + +import java.net.URI; +import java.util.List; +import java.util.Map; + +public final class DataFlowPrepareMessage { + private String messageId; + private String participantId; + private String counterPartyId; + private String dataspaceContext; + private String processId; + private String agreementId; + private String datasetId; + private URI callbackAddress; + private String transferType; + private List labels; + private Map metadata; + + private DataFlowPrepareMessage() { + } + + public String getMessageId() { + return messageId; + } + + public String getParticipantId() { + return participantId; + } + + public String getCounterPartyId() { + return counterPartyId; + } + + public String getDataspaceContext() { + return dataspaceContext; + } + + public String getProcessId() { + return processId; + } + + public String getAgreementId() { + return agreementId; + } + + public String getDatasetId() { + return datasetId; + } + + public URI getCallbackAddress() { + return callbackAddress; + } + + public String getTransferType() { + return transferType; + } + + public List getLabels() { + return labels; + } + + public Map getMetadata() { + return metadata; + } + + public static class Builder { + + private final DataFlowPrepareMessage instance = new DataFlowPrepareMessage(); + + public static Builder newInstance() { + return new Builder(); + } + + private Builder() { + + } + + public DataFlowPrepareMessage build() { + return instance; + } + + public Builder messageId(String messageId) { + instance.messageId = messageId; + return this; + } + + public Builder participantId(String participantId) { + instance.participantId = participantId; + return this; + } + + public Builder counterPartyId(String counterPartyId) { + instance.counterPartyId = counterPartyId; + return this; + } + + public Builder dataspaceContext(String dataspaceContext) { + instance.dataspaceContext = dataspaceContext; + return this; + } + + public Builder processId(String processId) { + instance.processId = processId; + return this; + } + + public Builder agreementId(String agreementId) { + instance.agreementId = agreementId; + return this; + } + + public Builder datasetId(String datasetId) { + instance.datasetId = datasetId; + return this; + } + + public Builder callbackAddress(URI callbackAddress) { + instance.callbackAddress = callbackAddress; + return this; + } + + public Builder transferType(String transferType) { + instance.transferType = transferType; + return this; + } + + public Builder labels(List labels) { + instance.labels = labels; + return this; + } + + public Builder metadata(Map metadata) { + instance.metadata = metadata; + return this; + } + + } +} diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/domain/DataFlowResponseMessage.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/domain/DataFlowResponseMessage.java new file mode 100644 index 00000000000..d1262d78570 --- /dev/null +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/domain/DataFlowResponseMessage.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.domain; + +public final class DataFlowResponseMessage { + + private String dataplaneId; + private DspDataAddress dataAddress; + private String state; + private String error; + + private DataFlowResponseMessage() { + + } + + public String getDataplaneId() { + return dataplaneId; + } + + public DspDataAddress getDataAddress() { + return dataAddress; + } + + public String getState() { + return state; + } + + public String getError() { + return error; + } + + public static class Builder { + + private final DataFlowResponseMessage instance = new DataFlowResponseMessage(); + + public static Builder newInstance() { + return new Builder(); + } + + private Builder() { + + } + + public DataFlowResponseMessage build() { + return instance; + } + + + public Builder dataplaneId(String dataplaneId) { + instance.dataplaneId = dataplaneId; + return this; + } + + public Builder dataAddress(DspDataAddress dataAddress) { + instance.dataAddress = dataAddress; + return this; + } + + public Builder state(String state) { + instance.state = state; + return this; + } + + public Builder error(String error) { + instance.error = error; + return this; + } + } +} diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/domain/DataFlowStartMessage.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/domain/DataFlowStartMessage.java new file mode 100644 index 00000000000..12590ed379b --- /dev/null +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/domain/DataFlowStartMessage.java @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.domain; + +import java.net.URI; +import java.util.List; +import java.util.Map; + +public final class DataFlowStartMessage { + + private String messageId; + private String participantId; + private String counterPartyId; + private String dataspaceContext; + private String processId; + private String agreementId; + private String datasetId; + private URI callbackAddress; + private String transferType; + private DspDataAddress dataAddress; + private List labels; + private Map metadata; + + private DataFlowStartMessage() { + } + + public String getMessageId() { + return messageId; + } + + public String getParticipantId() { + return participantId; + } + + public String getCounterPartyId() { + return counterPartyId; + } + + public String getDataspaceContext() { + return dataspaceContext; + } + + public String getProcessId() { + return processId; + } + + public String getAgreementId() { + return agreementId; + } + + public String getDatasetId() { + return datasetId; + } + + public URI getCallbackAddress() { + return callbackAddress; + } + + public String getTransferType() { + return transferType; + } + + public DspDataAddress getDataAddress() { + return dataAddress; + } + + public List getLabels() { + return labels; + } + + public Map getMetadata() { + return metadata; + } + + public static class Builder { + + private final DataFlowStartMessage instance = new DataFlowStartMessage(); + + public static Builder newInstance() { + return new Builder(); + } + + private Builder() { + + } + + public DataFlowStartMessage build() { + return instance; + } + + public Builder messageId(String messageId) { + instance.messageId = messageId; + return this; + } + + public Builder participantId(String participantId) { + instance.participantId = participantId; + return this; + } + + public Builder counterPartyId(String counterPartyId) { + instance.counterPartyId = counterPartyId; + return this; + } + + public Builder dataspaceContext(String dataspaceContext) { + instance.dataspaceContext = dataspaceContext; + return this; + } + + public Builder processId(String processId) { + instance.processId = processId; + return this; + } + + public Builder agreementId(String agreementId) { + instance.agreementId = agreementId; + return this; + } + + public Builder datasetId(String datasetId) { + instance.datasetId = datasetId; + return this; + } + + public Builder callbackAddress(URI callbackAddress) { + instance.callbackAddress = callbackAddress; + return this; + } + + public Builder transferType(String transferType) { + instance.transferType = transferType; + return this; + } + + public Builder dataAddress(DspDataAddress dataAddress) { + instance.dataAddress = dataAddress; + return this; + } + + public Builder labels(List labels) { + instance.labels = labels; + return this; + } + + public Builder metadata(Map metadata) { + instance.metadata = metadata; + return this; + } + + } + +} diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/domain/DspDataAddress.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/domain/DspDataAddress.java new file mode 100644 index 00000000000..b6bdb672a37 --- /dev/null +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/domain/DspDataAddress.java @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.domain; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; + +import java.util.ArrayList; +import java.util.List; + +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; + +@JsonDeserialize(builder = DspDataAddress.Builder.class) +public final class DspDataAddress { + + public static final String DSP_DATA_ADDRESS_ENDPOINT = EDC_NAMESPACE + "endpoint"; + + @JsonProperty(TYPE) + private final String type = "DataAddress"; + private String endpointType; + private String endpoint; + private List endpointProperties = new ArrayList<>(); + + private DspDataAddress() { + + } + + public String getEndpointType() { + return endpointType; + } + + public String getEndpoint() { + return endpoint; + } + + public List getEndpointProperties() { + return endpointProperties; + } + + public static final class EndpointProperty { + @JsonProperty(TYPE) + private final String type = "EndpointProperty"; + private final String name; + private final String value; + + public EndpointProperty( + @JsonProperty("name") String name, + @JsonProperty("value") String value + ) { + this.name = name; + this.value = value; + } + + public String getName() { + return name; + } + + public String getValue() { + return value; + } + + } + + @JsonPOJOBuilder(withPrefix = "") + public static class Builder { + + private final DspDataAddress instance = new DspDataAddress(); + + @JsonCreator + public static Builder newInstance() { + return new Builder(); + } + + private Builder() { + + } + + public DspDataAddress build() { + return instance; + } + + public Builder endpointType(String endpointType) { + instance.endpointType = endpointType; + return this; + } + + public Builder endpoint(String endpoint) { + instance.endpoint = endpoint; + return this; + } + + public Builder endpointProperties(List endpointProperties) { + instance.endpointProperties = endpointProperties; + return this; + } + + public Builder property(String name, String value) { + instance.endpointProperties.add(new EndpointProperty(name, value)); + return this; + } + } +} diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowController.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowController.java new file mode 100644 index 00000000000..e441351d966 --- /dev/null +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowController.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.logic; + +import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowController; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; +import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; +import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; +import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.signaling.domain.DataFlowPrepareMessage; +import org.eclipse.edc.signaling.domain.DataFlowStartMessage; +import org.eclipse.edc.signaling.domain.DspDataAddress; +import org.eclipse.edc.signaling.port.ClientFactory; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.result.ServiceResult; +import org.eclipse.edc.transform.spi.TypeTransformerRegistry; +import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl; +import org.jetbrains.annotations.NotNull; + +import java.util.Collection; +import java.util.Set; +import java.util.UUID; + +import static java.util.Collections.emptySet; +import static java.util.stream.Collectors.toSet; +import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; + +/** + * Data plane signaling implementation of the DataFlowController + */ +public class DataPlaneSignalingFlowController implements DataFlowController { + + private final ControlApiUrl callbackUrl; + private final DataPlaneSelectorService selectorClient; + private final String selectionStrategy; + private final TypeTransformerRegistry typeTransformerRegistry; + private final ClientFactory clientFactory; + + public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, String selectionStrategy, + TypeTransformerRegistry typeTransformerRegistry, ClientFactory clientFactory) { + this.callbackUrl = callbackUrl; + this.selectorClient = selectorClient; + this.selectionStrategy = selectionStrategy; + this.typeTransformerRegistry = typeTransformerRegistry; + this.clientFactory = clientFactory; + } + + @Override + public boolean canHandle(TransferProcess transferProcess) { + return true; + } + + @Override + public StatusResult prepare(TransferProcess transferProcess, Policy policy) { + var selection = selectorClient.select(selectionStrategy, dataPlane -> dataPlane.getAllowedTransferTypes().contains(transferProcess.getTransferType())); + if (!selection.succeeded()) { + return StatusResult.failure(FATAL_ERROR, selection.getFailureDetail()); + } + + var builder = DataFlowPrepareMessage.Builder.newInstance() + .messageId(UUID.randomUUID().toString()) + .participantId(policy.getAssignee()) + .counterPartyId(policy.getAssigner()) + .dataspaceContext(transferProcess.getProtocol()) + .processId(transferProcess.getId()) + .agreementId(transferProcess.getContractId()) + .datasetId(transferProcess.getAssetId()) + .callbackAddress(callbackUrl.get()) + .transferType(transferProcess.getTransferType()); + + var dataplaneMetadata = transferProcess.getDataplaneMetadata(); + if (dataplaneMetadata != null) { + builder.labels(dataplaneMetadata.getLabels()); + builder.metadata(dataplaneMetadata.getProperties()); + } + + var message = builder.build(); + + return clientFactory.createClient(selection.getContent()) + .prepare(message) + .compose(response -> typeTransformerRegistry.transform(response, DataFlowResponse.class) + .flatMap(this::toStatusResult)); + } + + @Override + public @NotNull StatusResult start(TransferProcess transferProcess, Policy policy) { + var selection = selectorClient.select(selectionStrategy, dataPlane -> dataPlane.getAllowedTransferTypes().contains(transferProcess.getTransferType())); + if (!selection.succeeded()) { + return StatusResult.failure(FATAL_ERROR, selection.getFailureDetail()); + } + + var builder = DataFlowStartMessage.Builder.newInstance() + .messageId(UUID.randomUUID().toString()) + .participantId(policy.getAssignee()) + .counterPartyId(policy.getAssigner()) + .dataspaceContext(transferProcess.getProtocol()) + .processId(transferProcess.getId()) + .agreementId(transferProcess.getContractId()) + .datasetId(transferProcess.getAssetId()) + .callbackAddress(callbackUrl.get()) + .transferType(transferProcess.getTransferType()); + + var dataAddress = transferProcess.getDataDestination(); + if (dataAddress != null) { + var dspDataAddressTransformation = typeTransformerRegistry.transform(dataAddress, DspDataAddress.class); + if (dspDataAddressTransformation.failed()) { + return StatusResult.failure(FATAL_ERROR, dspDataAddressTransformation.getFailureDetail()); + } + builder.dataAddress(dspDataAddressTransformation.getContent()); + } + + var dataplaneMetadata = transferProcess.getDataplaneMetadata(); + if (dataplaneMetadata != null) { + builder.labels(dataplaneMetadata.getLabels()); + builder.metadata(dataplaneMetadata.getProperties()); + } + + var message = builder.build(); + + return clientFactory.createClient(selection.getContent()) + .start(message) + .compose(response -> typeTransformerRegistry.transform(response, DataFlowResponse.class) + .flatMap(this::toStatusResult)); + } + + @Override + public StatusResult suspend(TransferProcess transferProcess) { + return StatusResult.failure(FATAL_ERROR, "not implemented"); + } + + @Override + public StatusResult terminate(TransferProcess transferProcess) { + var dataPlaneId = transferProcess.getDataPlaneId(); + if (dataPlaneId == null) { + return StatusResult.success(); + } + + return selectorClient.findById(transferProcess.getDataPlaneId()) + .flatMap(this::toStatusResult) + .map(clientFactory::createClient) + .compose(client -> client.terminate(transferProcess.getId())); + } + + @Override + public Set transferTypesFor(Asset asset) { + return selectorClient.getAll().map(Collection::stream) + .map(it -> it.map(DataPlaneInstance::getAllowedTransferTypes) + .flatMap(Collection::stream).collect(toSet())) + .orElse(f -> emptySet()); + } + + private @NotNull StatusResult toStatusResult(ServiceResult r) { + if (r.succeeded()) { + return StatusResult.success(r.getContent()); + } else { + return StatusResult.failure(FATAL_ERROR, r.getFailureDetail()); + } + } + + private @NotNull StatusResult toStatusResult(Result it) { + if (it.succeeded()) { + return StatusResult.success(it.getContent()); + } else { + return StatusResult.failure(FATAL_ERROR, it.getFailureDetail()); + } + } +} diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/ClientFactory.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/ClientFactory.java new file mode 100644 index 00000000000..624eb269718 --- /dev/null +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/ClientFactory.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.port; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; +import org.eclipse.edc.http.spi.ControlApiHttpClient; + +import java.util.function.Supplier; + +public class ClientFactory { + + private final ControlApiHttpClient httpClient; + private final Supplier objectMapperSupplier; + + public ClientFactory(ControlApiHttpClient httpClient, Supplier objectMapperSupplier) { + this.httpClient = httpClient; + this.objectMapperSupplier = objectMapperSupplier; + } + + public DataPlaneSignalingClient createClient(DataPlaneInstance instance) { + return new DataPlaneSignalingClient(instance, httpClient, objectMapperSupplier); + } +} diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneSignalingClient.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneSignalingClient.java index 309543ae8cf..6c032618bd1 100644 --- a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneSignalingClient.java +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneSignalingClient.java @@ -16,22 +16,18 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.json.JsonObject; import okhttp3.MediaType; import okhttp3.Request; import okhttp3.RequestBody; -import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient; import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; import org.eclipse.edc.http.spi.ControlApiHttpClient; -import org.eclipse.edc.jsonld.spi.JsonLd; +import org.eclipse.edc.signaling.domain.DataFlowPrepareMessage; +import org.eclipse.edc.signaling.domain.DataFlowResponseMessage; +import org.eclipse.edc.signaling.domain.DataFlowStartMessage; import org.eclipse.edc.spi.response.ResponseStatus; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.result.ServiceFailure; -import org.eclipse.edc.spi.types.domain.transfer.DataFlowProvisionMessage; -import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; -import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; -import org.eclipse.edc.transform.spi.TypeTransformerRegistry; import org.jetbrains.annotations.NotNull; import java.io.IOException; @@ -43,34 +39,29 @@ /** * Client that implements the Data Plane Signaling spec */ -public class DataPlaneSignalingClient implements DataPlaneClient { +public class DataPlaneSignalingClient { private static final MediaType TYPE_JSON = MediaType.parse("application/json"); private final DataPlaneInstance dataPlane; private final ControlApiHttpClient httpClient; private final Supplier objectMapperSupplier; - private final TypeTransformerRegistry transformerRegistry; - private final JsonLd jsonLd; - private final String jsonLdScope; public DataPlaneSignalingClient(DataPlaneInstance dataPlane, ControlApiHttpClient httpClient, - Supplier objectMapperSupplier, TypeTransformerRegistry typeTransformerRegistry, - JsonLd jsonLd, String jsonLdScope) { + Supplier objectMapperSupplier) { this.dataPlane = dataPlane; this.httpClient = httpClient; this.objectMapperSupplier = objectMapperSupplier; - transformerRegistry = typeTransformerRegistry; - this.jsonLd = jsonLd; - this.jsonLdScope = jsonLdScope; } - @Override - public StatusResult prepare(DataFlowProvisionMessage request) { - return StatusResult.failure(ResponseStatus.FATAL_ERROR, "not implemented"); + public StatusResult prepare(DataFlowPrepareMessage request) { + var url = "%s/prepare".formatted(dataPlane.getUrl()); + return createRequestBuilder(request, url) + .compose(builder -> httpClient.request(builder) + .flatMap(result -> result.map(this::handleResponse) + .orElse(this::failedResult))); } - @Override public StatusResult start(DataFlowStartMessage request) { var url = "%s/start".formatted(dataPlane.getUrl()); return createRequestBuilder(request, url) @@ -79,17 +70,14 @@ public StatusResult start(DataFlowStartMessage request) .orElse(this::failedResult))); } - @Override public StatusResult suspend(String transferProcessId) { return StatusResult.failure(ResponseStatus.FATAL_ERROR, "not implemented"); } - @Override public StatusResult terminate(String transferProcessId) { - return StatusResult.failure(ResponseStatus.FATAL_ERROR, "not implemented"); + return StatusResult.success(); } - @Override public StatusResult checkAvailability() { var requestBuilder = new Request.Builder().get().url(dataPlane.getUrl() + "/"); return httpClient.request(requestBuilder) @@ -116,9 +104,7 @@ private StatusResult deserializeStartMessage(String res } private StatusResult createRequestBuilder(Object message, String url) { - return transformerRegistry.transform(message, JsonObject.class) - .compose(this::compact) - .compose(this::serializeMessage) + return this.serialize(message) .map(rawBody -> RequestBody.create(rawBody, TYPE_JSON)) .map(body -> new Request.Builder().post(body).url(url)) .flatMap(it -> { @@ -130,11 +116,7 @@ private StatusResult createRequestBuilder(Object message, Strin }); } - private Result compact(JsonObject object) { - return jsonLd.compact(object, jsonLdScope); - } - - private Result serializeMessage(Object message) { + private Result serialize(Object message) { try { return Result.success(objectMapperSupplier.get().writeValueAsString(message)); } catch (JsonProcessingException e) { diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneRegistrationApi.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneRegistrationApi.java similarity index 98% rename from data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneRegistrationApi.java rename to data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneRegistrationApi.java index 36ab8011b9a..f5776ba3bdc 100644 --- a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneRegistrationApi.java +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneRegistrationApi.java @@ -12,7 +12,7 @@ * */ -package org.eclipse.edc.signaling.port; +package org.eclipse.edc.signaling.port.api; import io.swagger.v3.oas.annotations.OpenAPIDefinition; import io.swagger.v3.oas.annotations.Operation; diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneRegistrationApiController.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneRegistrationApiController.java similarity index 98% rename from data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneRegistrationApiController.java rename to data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneRegistrationApiController.java index 2a11b19b74e..9a3149ef8fe 100644 --- a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneRegistrationApiController.java +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneRegistrationApiController.java @@ -12,7 +12,7 @@ * */ -package org.eclipse.edc.signaling.port; +package org.eclipse.edc.signaling.port.api; import jakarta.ws.rs.Consumes; import jakarta.ws.rs.DELETE; diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApi.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApi.java new file mode 100644 index 00000000000..10af438722a --- /dev/null +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApi.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.port.api; + +import io.swagger.v3.oas.annotations.OpenAPIDefinition; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.ArraySchema; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.ws.rs.core.Response; +import org.eclipse.edc.api.model.ApiCoreSchema; + +import static jakarta.ws.rs.HttpMethod.POST; + +@OpenAPIDefinition +@Tag(name = "Data Plane Transfer events") +public interface DataPlaneTransferApi { + + @Operation( + method = POST, + description = "Notify a completed transfer", + responses = { + @ApiResponse(responseCode = "200", description = "Completed notification delivered correctly"), + @ApiResponse(responseCode = "404", description = "Transfer process does not exist", + content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class)))) + } + ) + Response completed(String transferId); + + +} diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiController.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiController.java new file mode 100644 index 00000000000..61d20663e91 --- /dev/null +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiController.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.port.api; + +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.Response; +import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessService; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; + +import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; +import static org.eclipse.edc.web.spi.exception.ServiceResultHandler.exceptionMapper; + +@Path("/transfers") +@Produces(APPLICATION_JSON) +@Consumes(APPLICATION_JSON) +public class DataPlaneTransferApiController implements DataPlaneTransferApi { + + private final TransferProcessService transferProcessService; + + public DataPlaneTransferApiController(TransferProcessService transferProcessService) { + this.transferProcessService = transferProcessService; + } + + @Path("/{transferId}/dataflow/completed") + @POST + @Override + public Response completed(@PathParam("transferId") String transferId) { + transferProcessService.complete(transferId).orElseThrow(exceptionMapper(TransferProcess.class, transferId)); + return Response.ok().build(); + } + +} diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DataAddressToDspDataAddressTransformer.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DataAddressToDspDataAddressTransformer.java new file mode 100644 index 00000000000..b477a840d18 --- /dev/null +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DataAddressToDspDataAddressTransformer.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.port.transformer; + +import org.eclipse.edc.signaling.domain.DspDataAddress; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.transform.spi.TransformerContext; +import org.eclipse.edc.transform.spi.TypeTransformer; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.eclipse.edc.signaling.domain.DspDataAddress.DSP_DATA_ADDRESS_ENDPOINT; + +public class DataAddressToDspDataAddressTransformer implements TypeTransformer { + @Override + public Class getInputType() { + return DataAddress.class; + } + + @Override + public Class getOutputType() { + return DspDataAddress.class; + } + + @Override + public @Nullable DspDataAddress transform(@NotNull DataAddress dataAddress, @NotNull TransformerContext context) { + var builder = DspDataAddress.Builder + .newInstance() + .endpointType(dataAddress.getType()) + .endpoint(dataAddress.getStringProperty(DSP_DATA_ADDRESS_ENDPOINT)); + + dataAddress.getProperties().forEach((key, value) -> builder.property(key, value.toString())); + + return builder.build(); + } +} diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformer.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformer.java new file mode 100644 index 00000000000..8abd86ab2e5 --- /dev/null +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformer.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.port.transformer; + +import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; +import org.eclipse.edc.signaling.domain.DataFlowResponseMessage; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.transform.spi.TransformerContext; +import org.eclipse.edc.transform.spi.TypeTransformer; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class DataFlowResponseMessageToDataFlowResponseTransformer implements TypeTransformer { + + @Override + public Class getInputType() { + return DataFlowResponseMessage.class; + } + + @Override + public Class getOutputType() { + return DataFlowResponse.class; + } + + @Override + public @Nullable DataFlowResponse transform(@NotNull DataFlowResponseMessage dataFlowResponseMessage, @NotNull TransformerContext context) { + return DataFlowResponse.Builder.newInstance() + .dataAddress(context.transform(dataFlowResponseMessage.getDataAddress(), DataAddress.class)) + .dataPlaneId(dataFlowResponseMessage.getDataplaneId()) + .provisioning(false) + .build(); + } + +} diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DspDataAddressToDataAddressTransformer.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DspDataAddressToDataAddressTransformer.java new file mode 100644 index 00000000000..839d08a534a --- /dev/null +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DspDataAddressToDataAddressTransformer.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.port.transformer; + +import org.eclipse.edc.signaling.domain.DspDataAddress; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.transform.spi.TransformerContext; +import org.eclipse.edc.transform.spi.TypeTransformer; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.eclipse.edc.signaling.domain.DspDataAddress.DSP_DATA_ADDRESS_ENDPOINT; + +public class DspDataAddressToDataAddressTransformer implements TypeTransformer { + @Override + public Class getInputType() { + return DspDataAddress.class; + } + + @Override + public Class getOutputType() { + return DataAddress.class; + } + + @Override + public @Nullable DataAddress transform(@NotNull DspDataAddress dataAddress, @NotNull TransformerContext context) { + var builder = DataAddress.Builder.newInstance() + .type(dataAddress.getEndpointType()); + + dataAddress.getEndpointProperties().forEach(property -> builder.property(property.getName(), property.getValue())); + + return builder + .property(DSP_DATA_ADDRESS_ENDPOINT, dataAddress.getEndpoint()) + .build(); + } +} diff --git a/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/DataPlaneRegistrationApiControllerTest.java b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/DataPlaneRegistrationApiControllerTest.java index 724ae60ddad..54b11fe5b46 100644 --- a/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/DataPlaneRegistrationApiControllerTest.java +++ b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/DataPlaneRegistrationApiControllerTest.java @@ -18,7 +18,7 @@ import io.restassured.specification.RequestSpecification; import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; -import org.eclipse.edc.signaling.port.DataPlaneRegistrationApiController; +import org.eclipse.edc.signaling.port.api.DataPlaneRegistrationApiController; import org.eclipse.edc.spi.result.ServiceResult; import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase; import org.junit.jupiter.api.Nested; diff --git a/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/domain/DspDataAddressTest.java b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/domain/DspDataAddressTest.java new file mode 100644 index 00000000000..03e6646345c --- /dev/null +++ b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/domain/DspDataAddressTest.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.domain; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.edc.json.JacksonTypeManager; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class DspDataAddressTest { + + private final ObjectMapper mapper = new JacksonTypeManager().getMapper(); + + @Test + void serdes() throws JsonProcessingException { + var address = DspDataAddress.Builder.newInstance() + .endpoint("endpoint") + .endpointType("endpointType") + .property("key", "value") + .build(); + + var json = mapper.writeValueAsString(address); + + assertThat(json).contains("\"@type\":\"DataAddress\"").contains("\"@type\":\"EndpointProperty\""); + + var deserialized = mapper.readValue(json, DspDataAddress.class); + + assertThat(deserialized).usingRecursiveComparison().isEqualTo(address); + } +} diff --git a/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowControllerTest.java b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowControllerTest.java new file mode 100644 index 00000000000..21f5532da59 --- /dev/null +++ b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowControllerTest.java @@ -0,0 +1,278 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.logic; + +import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; +import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; +import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; +import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.signaling.domain.DataFlowPrepareMessage; +import org.eclipse.edc.signaling.domain.DataFlowResponseMessage; +import org.eclipse.edc.signaling.domain.DspDataAddress; +import org.eclipse.edc.signaling.port.ClientFactory; +import org.eclipse.edc.signaling.port.DataPlaneSignalingClient; +import org.eclipse.edc.spi.response.ResponseStatus; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.result.ServiceResult; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.transform.spi.TypeTransformerRegistry; +import org.jetbrains.annotations.NotNull; +import org.jspecify.annotations.NonNull; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +public class DataPlaneSignalingFlowControllerTest { + + private static final String HTTP_DATA_PULL = "HttpData-PULL"; + private final DataPlaneSignalingClient dataPlaneClient = mock(); + private final ClientFactory clientFactory = mock(); + private final DataPlaneSelectorService selectorService = mock(); + private final TypeTransformerRegistry typeTransformerRegistry = mock(); + + private final DataPlaneSignalingFlowController flowController = new DataPlaneSignalingFlowController( + () -> URI.create("http://localhost"), selectorService, + "random", typeTransformerRegistry, clientFactory); + + @Nested + class Prepare { + + @Test + void shouldCallPrepareOnDataPlane() { + var dataPlaneInstance = createDataPlaneInstance(); + var transferProcess = transferProcessBuilder().build(); + when(selectorService.select(any(), any())).thenReturn(ServiceResult.success(dataPlaneInstance)); + when(clientFactory.createClient(any())).thenReturn(dataPlaneClient); + var flowResponseMessage = DataFlowResponseMessage.Builder.newInstance() + .dataAddress(createDspDataAddress()) + .build(); + when(dataPlaneClient.prepare(any())).thenReturn(StatusResult.success(flowResponseMessage)); + when(typeTransformerRegistry.transform(isA(DataAddress.class), any())).thenReturn(Result.success(createDspDataAddress())); + var response = DataFlowResponse.Builder.newInstance().dataPlaneId("dataPlaneId").dataAddress(testDataAddress()).build(); + when(typeTransformerRegistry.transform(isA(DataFlowResponseMessage.class), any())).thenReturn(Result.success(response)); + + var result = flowController.prepare(transferProcess, policyBuilder().build()); + + assertThat(result).isSucceeded().isSameAs(response); + verify(dataPlaneClient).prepare(isA(DataFlowPrepareMessage.class)); + } + + @Test + void shouldReturnFailure_whenNoDataPlaneIsFound() { + var transferProcess = transferProcessBuilder().build(); + when(selectorService.select(any(), any())).thenReturn(ServiceResult.notFound("no data plane can provision this")); + + var result = flowController.prepare(transferProcess, policyBuilder().build()); + + assertThat(result).isFailed(); + verifyNoInteractions(clientFactory); + } + + } + + @Nested + class Start { + @Test + void shouldSelectAndCallStartOnDataplane() { + var policy = Policy.Builder.newInstance().assignee("participantId").build(); + var transferProcess = transferProcessBuilder() + .transferType(HTTP_DATA_PULL) + .contentDataAddress(testDataAddress()) + .build(); + var dataPlaneInstance = createDataPlaneInstance(); + when(selectorService.select(any(), any())).thenReturn(ServiceResult.success(dataPlaneInstance)); + when(clientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(typeTransformerRegistry.transform(isA(DataAddress.class), any())).thenReturn(Result.success(createDspDataAddress())); + var response = DataFlowResponse.Builder.newInstance().dataPlaneId("dataPlaneId").dataAddress(testDataAddress()).build(); + when(typeTransformerRegistry.transform(isA(DataFlowResponseMessage.class), any())).thenReturn(Result.success(response)); + when(dataPlaneClient.start(any())).thenReturn(StatusResult.success(DataFlowResponseMessage.Builder.newInstance() + .dataAddress(createDspDataAddress()) + .build())); + + var result = flowController.start(transferProcess, policy); + + assertThat(result).isSucceeded().isSameAs(response); + } + + @Test + void shouldFail_whenNoDataplaneSelected() { + var transferProcess = transferProcessBuilder() + .contentDataAddress(testDataAddress()) + .transferType(HTTP_DATA_PULL) + .build(); + + when(selectorService.select(any(), any())).thenReturn(ServiceResult.notFound("no dataplane found")); + + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); + + assertThat(result).isFailed(); + } + + @Test + void returnFailedResultIfTransferFails() { + var errorMsg = "error"; + var transferProcess = transferProcessBuilder() + .contentDataAddress(testDataAddress()) + .transferType(HTTP_DATA_PULL) + .build(); + + when(dataPlaneClient.start(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg)); + var dataPlaneInstance = createDataPlaneInstance(); + when(selectorService.select(any(), any())).thenReturn(ServiceResult.success(dataPlaneInstance)); + when(clientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(typeTransformerRegistry.transform(isA(DataAddress.class), any())).thenReturn(Result.success(createDspDataAddress())); + + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); + + verify(dataPlaneClient).start(any()); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains(errorMsg)); + } + } + + @Nested + class Terminate { + + @Test + void shouldCallTerminateOnTheRightDataPlane() { + var dataPlaneInstance = dataPlaneInstanceBuilder().id("dataPlaneId").build(); + var transferProcess = transferProcessBuilder() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .dataPlaneId("dataPlaneId") + .build(); + when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success()); + when(clientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.findById(any())).thenReturn(ServiceResult.success(dataPlaneInstance)); + + var result = flowController.terminate(transferProcess); + + assertThat(result).isSucceeded(); + verify(dataPlaneClient).terminate("transferProcessId"); + verify(clientFactory).createClient(dataPlaneInstance); + } + + @Test + void shouldFail_whenDataPlaneNotFound() { + var transferProcess = transferProcessBuilder() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .dataPlaneId("invalid") + .build(); + when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success()); + when(clientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.findById(any())).thenReturn(ServiceResult.notFound("not found")); + + var result = flowController.terminate(transferProcess); + + assertThat(result).isFailed().detail().isEqualTo("not found"); + } + + @Test + // a null dataPlaneId means that the flow has not been started so it can be considered as already terminated + void shouldReturnSuccess_whenDataPlaneIdIsNull() { + var transferProcess = transferProcessBuilder() + .id("transferProcessId") + .contentDataAddress(testDataAddress()) + .dataPlaneId(null) + .build(); + + var result = flowController.terminate(transferProcess); + + assertThat(result).isSucceeded(); + verifyNoInteractions(dataPlaneClient, clientFactory, selectorService); + } + } + + @Nested + class TransferTypes { + + @Test + void transferTypes_shouldReturnTypesForSpecifiedAsset() { + + var assetNoResponse = Asset.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build()).build(); + when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of( + dataPlaneInstanceBuilder().allowedTransferType("Custom-PUSH").build(), + dataPlaneInstanceBuilder().allowedTransferType("Custom-PULL").build() + ))); + + var transferTypes = flowController.transferTypesFor(assetNoResponse); + + assertThat(transferTypes).containsExactly("Custom-PUSH", "Custom-PULL"); + } + + @Test + void shouldReturnEmptyList_whenCannotGetDataplaneInstances() { + when(selectorService.getAll()).thenReturn(ServiceResult.unexpected("error")); + var asset = Asset.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build()).build(); + + var transferTypes = flowController.transferTypesFor(asset); + + assertThat(transferTypes).isEmpty(); + } + } + + @NotNull + private DataPlaneInstance.Builder dataPlaneInstanceBuilder() { + return DataPlaneInstance.Builder.newInstance().url("http://any"); + } + + private DataPlaneInstance createDataPlaneInstance() { + return dataPlaneInstanceBuilder().build(); + } + + private DataAddress testDataAddress() { + return DataAddress.Builder.newInstance().type("test-type").build(); + } + + private TransferProcess.Builder transferProcessBuilder() { + return TransferProcess.Builder.newInstance() + .correlationId(UUID.randomUUID().toString()) + .protocol("test-protocol") + .contractId(UUID.randomUUID().toString()) + .assetId(UUID.randomUUID().toString()) + .counterPartyAddress("test.connector.address") + .transferType("transferType") + .dataDestination(DataAddress.Builder.newInstance().type("test").build()); + } + + private Policy.Builder policyBuilder() { + return Policy.Builder.newInstance(); + } + + private DataAddress buildResponseChannel() { + return DataAddress.Builder.newInstance().type("Response").build(); + } + + private @NonNull DspDataAddress createDspDataAddress() { + return DspDataAddress.Builder.newInstance().build(); + } +} diff --git a/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiControllerTest.java b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiControllerTest.java new file mode 100644 index 00000000000..c5b33f53cf7 --- /dev/null +++ b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiControllerTest.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.port.api; + +import io.restassured.http.ContentType; +import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessService; +import org.eclipse.edc.spi.result.ServiceResult; +import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +import static io.restassured.RestAssured.given; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class DataPlaneTransferApiControllerTest extends RestControllerTestBase { + + private final TransferProcessService transferProcessService = mock(); + + @Nested + class Completed { + @Test + void shouldCallComplete() { + var transferId = UUID.randomUUID().toString(); + when(transferProcessService.complete(any())).thenReturn(ServiceResult.success()); + + given() + .port(port) + .contentType(ContentType.JSON) + .post("/transfers/{transferId}/dataflow/completed", transferId) + .then() + .log().ifValidationFails() + .statusCode(200); + + verify(transferProcessService).complete(transferId); + } + + @Test + void shouldReturnError_whenServiceCallFails() { + var transferId = UUID.randomUUID().toString(); + when(transferProcessService.complete(any())).thenReturn(ServiceResult.conflict("error")); + + given() + .port(port) + .contentType(ContentType.JSON) + .post("/transfers/{transferId}/dataflow/completed", transferId) + .then() + .log().ifValidationFails() + .statusCode(409); + } + } + + + @Override + protected Object controller() { + return new DataPlaneTransferApiController(transferProcessService); + } +} diff --git a/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DataAddressToDspDataAddressTransformerTest.java b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DataAddressToDspDataAddressTransformerTest.java new file mode 100644 index 00000000000..b7fb14f5683 --- /dev/null +++ b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DataAddressToDspDataAddressTransformerTest.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.port.transformer; + +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.signaling.domain.DspDataAddress.DSP_DATA_ADDRESS_ENDPOINT; +import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; +import static org.mockito.Mockito.mock; + +class DataAddressToDspDataAddressTransformerTest { + + private final DataAddressToDspDataAddressTransformer transformer = new DataAddressToDspDataAddressTransformer(); + + @Test + void shouldTransform() { + var dataAddress = DataAddress.Builder.newInstance() + .type("type") + .property(DSP_DATA_ADDRESS_ENDPOINT, "endpoint") + .property(EDC_NAMESPACE + "additionalProperty", "value") + .build(); + + var result = transformer.transform(dataAddress, mock()); + + assertThat(result).isNotNull(); + assertThat(result.getEndpointType()).isEqualTo("type"); + assertThat(result.getEndpoint()).isEqualTo("endpoint"); + assertThat(result.getEndpointProperties()).first().satisfies(property -> { + assertThat(property.getName()).isEqualTo(EDC_NAMESPACE + "additionalProperty"); + assertThat(property.getValue()).isEqualTo("value"); + }); + } +} diff --git a/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformerTest.java b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformerTest.java new file mode 100644 index 00000000000..b14f6282c93 --- /dev/null +++ b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformerTest.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.port.transformer; + +import org.eclipse.edc.signaling.domain.DataFlowResponseMessage; +import org.eclipse.edc.signaling.domain.DspDataAddress; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.transform.spi.TransformerContext; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class DataFlowResponseMessageToDataFlowResponseTransformerTest { + + private final DataFlowResponseMessageToDataFlowResponseTransformer transformer = new DataFlowResponseMessageToDataFlowResponseTransformer(); + private final TransformerContext context = mock(); + + @Test + void shouldTransform() { + var dataAddress = DataAddress.Builder.newInstance().type("any").build(); + when(context.transform(isA(DspDataAddress.class), any())).thenReturn(dataAddress); + var message = DataFlowResponseMessage.Builder.newInstance() + .dataplaneId("dataPlaneId") + .dataAddress(DspDataAddress.Builder.newInstance().build()) + .build(); + + var result = transformer.transform(message, context); + + assertThat(result).isNotNull(); + assertThat(result.getDataPlaneId()).isSameAs("dataPlaneId"); + assertThat(result.getDataAddress()).isSameAs(dataAddress); + } +} diff --git a/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DspDataAddressToDataAddressTransformerTest.java b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DspDataAddressToDataAddressTransformerTest.java new file mode 100644 index 00000000000..3b223e3975e --- /dev/null +++ b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DspDataAddressToDataAddressTransformerTest.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.signaling.port.transformer; + +import org.eclipse.edc.signaling.domain.DspDataAddress; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.signaling.domain.DspDataAddress.DSP_DATA_ADDRESS_ENDPOINT; +import static org.mockito.Mockito.mock; + +class DspDataAddressToDataAddressTransformerTest { + + private final DspDataAddressToDataAddressTransformer transformer = new DspDataAddressToDataAddressTransformer(); + + @Test + void shouldTransform() { + var dspDataAddress = DspDataAddress.Builder.newInstance().endpoint("endpoint").endpointType("type") + .property("key", "value").build(); + + var result = transformer.transform(dspDataAddress, mock()); + + assertThat(result).isNotNull(); + assertThat(result.getType()).isEqualTo("type"); + assertThat(result.getProperty(DSP_DATA_ADDRESS_ENDPOINT)).isEqualTo("endpoint"); + assertThat(result.getProperty("key")).isEqualTo("value"); + } +} diff --git a/extensions/common/api/control-api-configuration/src/main/resources/control-api-version.json b/extensions/common/api/control-api-configuration/src/main/resources/control-api-version.json index 8ba9fc97ea0..e66a44aed7c 100644 --- a/extensions/common/api/control-api-configuration/src/main/resources/control-api-version.json +++ b/extensions/common/api/control-api-configuration/src/main/resources/control-api-version.json @@ -1,8 +1,8 @@ [ { - "version": "2.1.1", + "version": "2.1.2", "urlPath": "/v1", - "lastUpdated": "2025-12-12T12:00:01Z", + "lastUpdated": "2025-12-22T12:00:01Z", "maturity": "stable" } ] diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/TransferDataPlaneSignalingExtension.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/TransferDataPlaneSignalingExtension.java index 25cd7178ef3..dd4b1c3b390 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/TransferDataPlaneSignalingExtension.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/TransferDataPlaneSignalingExtension.java @@ -14,7 +14,7 @@ package org.eclipse.edc.connector.controlplane.transfer.dataplane; -import org.eclipse.edc.connector.controlplane.transfer.dataplane.flow.DataPlaneSignalingFlowController; +import org.eclipse.edc.connector.controlplane.transfer.dataplane.flow.LegacyDataPlaneSignalingFlowController; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowPropertiesProvider; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; @@ -35,7 +35,7 @@ @Extension(NAME) public class TransferDataPlaneSignalingExtension implements ServiceExtension { - protected static final String NAME = "Transfer Data Plane Signaling Extension"; + protected static final String NAME = "Legacy Data Plane Signaling Extension"; private static final String DEFAULT_DATAPLANE_SELECTOR_STRATEGY = "random"; @@ -62,7 +62,7 @@ public class TransferDataPlaneSignalingExtension implements ServiceExtension { @Override public void initialize(ServiceExtensionContext context) { - var controller = new DataPlaneSignalingFlowController(callbackUrl, selectorService, getPropertiesProvider(), + var controller = new LegacyDataPlaneSignalingFlowController(callbackUrl, selectorService, getPropertiesProvider(), clientFactory, selectionStrategy, transferTypeParser); dataFlowManager.register(controller); } diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowController.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowController.java similarity index 95% rename from extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowController.java rename to extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowController.java index 4319a1be607..abfa9a9b002 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowController.java @@ -53,7 +53,7 @@ * @see Data plane signaling * @see Data plane signaling transfer type mapping */ -public class DataPlaneSignalingFlowController implements DataFlowController { +public class LegacyDataPlaneSignalingFlowController implements DataFlowController { private final ControlApiUrl callbackUrl; private final DataPlaneSelectorService selectorClient; @@ -62,9 +62,9 @@ public class DataPlaneSignalingFlowController implements DataFlowController { private final String selectionStrategy; private final TransferTypeParser transferTypeParser; - public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, - DataFlowPropertiesProvider propertiesProvider, DataPlaneClientFactory clientFactory, - String selectionStrategy, TransferTypeParser transferTypeParser) { + public LegacyDataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, + DataFlowPropertiesProvider propertiesProvider, DataPlaneClientFactory clientFactory, + String selectionStrategy, TransferTypeParser transferTypeParser) { this.callbackUrl = callbackUrl; this.selectorClient = selectorClient; this.propertiesProvider = propertiesProvider; diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/TransferDataPlaneSignalingExtensionTest.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/TransferDataPlaneSignalingExtensionTest.java index 431ea94f75d..98da6ff983a 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/TransferDataPlaneSignalingExtensionTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/TransferDataPlaneSignalingExtensionTest.java @@ -14,7 +14,7 @@ package org.eclipse.edc.connector.controlplane.transfer.dataplane; -import org.eclipse.edc.connector.controlplane.transfer.dataplane.flow.DataPlaneSignalingFlowController; +import org.eclipse.edc.connector.controlplane.transfer.dataplane.flow.LegacyDataPlaneSignalingFlowController; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager; import org.eclipse.edc.junit.extensions.DependencyInjectionExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; @@ -39,6 +39,6 @@ void setup(ServiceExtensionContext context) { @Test void initialize(ServiceExtensionContext context, TransferDataPlaneSignalingExtension extension) { extension.initialize(context); - verify(dataFlowManager).register(isA(DataPlaneSignalingFlowController.class)); + verify(dataFlowManager).register(isA(LegacyDataPlaneSignalingFlowController.class)); } } diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowControllerTest.java similarity index 99% rename from extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java rename to extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowControllerTest.java index 4f9f28ceaf5..6ede1ba37b8 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowControllerTest.java @@ -55,7 +55,7 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -public class DataPlaneSignalingFlowControllerTest { +public class LegacyDataPlaneSignalingFlowControllerTest { private static final String HTTP_DATA_PULL = "HttpData-PULL"; private final DataPlaneClient dataPlaneClient = mock(); @@ -64,7 +64,7 @@ public class DataPlaneSignalingFlowControllerTest { private final DataFlowPropertiesProvider propertiesProvider = mock(); private final TransferTypeParser transferTypeParser = mock(); - private final DataPlaneSignalingFlowController flowController = new DataPlaneSignalingFlowController( + private final LegacyDataPlaneSignalingFlowController flowController = new LegacyDataPlaneSignalingFlowController( () -> URI.create("http://localhost"), selectorService, propertiesProvider, dataPlaneClientFactory, "random", transferTypeParser); diff --git a/extensions/data-plane/data-plane-provision-http/src/test/java/org/eclipse/edc/connector/dataplane/provision/http/port/ProvisionHttpIntegrationTest.java b/extensions/data-plane/data-plane-provision-http/src/test/java/org/eclipse/edc/connector/dataplane/provision/http/port/ProvisionHttpIntegrationTest.java index 00c545321a6..f010e5c0eb3 100644 --- a/extensions/data-plane/data-plane-provision-http/src/test/java/org/eclipse/edc/connector/dataplane/provision/http/port/ProvisionHttpIntegrationTest.java +++ b/extensions/data-plane/data-plane-provision-http/src/test/java/org/eclipse/edc/connector/dataplane/provision/http/port/ProvisionHttpIntegrationTest.java @@ -33,7 +33,6 @@ import org.eclipse.edc.spi.system.configuration.ConfigFactory; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; -import org.eclipse.edc.spi.types.domain.transfer.FlowType; import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -68,6 +67,7 @@ import static org.eclipse.edc.http.client.testfixtures.HttpTestUtils.testHttpClient; import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PUSH; import static org.eclipse.edc.util.io.Ports.getFreePort; @ComponentTest @@ -122,7 +122,7 @@ void shouldProvision_andDeprovision(DataPlaneManager dataPlaneManager) { .property(EDC_NAMESPACE + "method", "POST") .build() ) - .transferType(new TransferType("HttpData", FlowType.PUSH)) + .transferType(new TransferType("HttpData", PUSH)) .callbackAddress(URI.create(controlPlane.baseUrl())) .build(); diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTransformExtension.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTransformExtension.java index ceb18f69440..fdd31cef24a 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTransformExtension.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTransformExtension.java @@ -39,7 +39,7 @@ @Extension(value = DataPlaneSignalingClientTransformExtension.NAME) public class DataPlaneSignalingClientTransformExtension implements ServiceExtension { - public static final String NAME = "Data Plane Signaling transform Client"; + public static final String NAME = "Legacy Data Plane Signaling transform Client"; @Inject private TypeTransformerRegistry transformerRegistry; diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/LegacyDataPlaneSignalingClientExtension.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/LegacyDataPlaneSignalingClientExtension.java index 0cd1a80c03c..da6f963fd2b 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/LegacyDataPlaneSignalingClientExtension.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/LegacyDataPlaneSignalingClientExtension.java @@ -16,12 +16,15 @@ import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient; import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; +import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; +import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneAvailabilityChecker; import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.http.spi.ControlApiHttpClient; import org.eclipse.edc.jsonld.spi.JsonLd; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; -import org.eclipse.edc.runtime.metamodel.annotation.Provider; +import org.eclipse.edc.runtime.metamodel.annotation.Provides; +import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.spi.types.TypeManager; @@ -41,8 +44,9 @@ * This extension provides an implementation of {@link DataPlaneClient} compliant with the data plane signaling protocol */ @Extension(value = LegacyDataPlaneSignalingClientExtension.NAME) +@Provides({ DataPlaneClientFactory.class, DataPlaneAvailabilityChecker.class }) public class LegacyDataPlaneSignalingClientExtension implements ServiceExtension { - public static final String NAME = "Data Plane Signaling Client"; + public static final String NAME = "Legacy Data Plane Signaling Client"; public static final String CONTROL_CLIENT_SCOPE = "CONTROL_CLIENT_SCOPE"; @Inject(required = false) @@ -61,8 +65,15 @@ public String name() { return NAME; } - @Provider - public DataPlaneClientFactory dataPlaneClientFactory(ServiceExtensionContext context) { + @Override + public void initialize(ServiceExtensionContext context) { + var dataPlaneClientFactory = dataPlaneClientFactory(context); + + context.registerService(DataPlaneClientFactory.class, dataPlaneClientFactory); + context.registerService(DataPlaneAvailabilityChecker.class, new LegacyDataPlaneAvailabilityChecker(dataPlaneClientFactory)); + } + + protected DataPlaneClientFactory dataPlaneClientFactory(ServiceExtensionContext context) { if (dataPlaneManager != null) { // Data plane manager is embedded in the current runtime @@ -80,6 +91,19 @@ public DataPlaneClientFactory dataPlaneClientFactory(ServiceExtensionContext con return instance -> new LegacyDataPlaneSignalingClient(httpClient, signalingApiTypeTransformerRegistry, jsonLd, CONTROL_CLIENT_SCOPE, typeManager, JSON_LD, instance); } + + private static class LegacyDataPlaneAvailabilityChecker implements DataPlaneAvailabilityChecker { + private final DataPlaneClientFactory dataPlaneClientFactory; + + LegacyDataPlaneAvailabilityChecker(DataPlaneClientFactory dataPlaneClientFactory) { + this.dataPlaneClientFactory = dataPlaneClientFactory; + } + + @Override + public StatusResult checkAvailability(DataPlaneInstance dataPlane) { + return dataPlaneClientFactory.createClient(dataPlane).checkAvailability(); + } + } } diff --git a/spi/data-plane-selector/data-plane-selector-spi/src/main/java/org/eclipse/edc/connector/dataplane/selector/spi/manager/DataPlaneAvailabilityChecker.java b/spi/data-plane-selector/data-plane-selector-spi/src/main/java/org/eclipse/edc/connector/dataplane/selector/spi/manager/DataPlaneAvailabilityChecker.java new file mode 100644 index 00000000000..c1bac58aa53 --- /dev/null +++ b/spi/data-plane-selector/data-plane-selector-spi/src/main/java/org/eclipse/edc/connector/dataplane/selector/spi/manager/DataPlaneAvailabilityChecker.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.selector.spi.manager; + +import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; +import org.eclipse.edc.spi.response.StatusResult; + +/** + * Check data plane availability + */ +public interface DataPlaneAvailabilityChecker { + + /** + * Check availability of the passed data plane instance + * + * @param dataPlane the data plane + * @return success if data plane is available, failure otherwise + */ + StatusResult checkAvailability(DataPlaneInstance dataPlane); + +} diff --git a/system-tests/e2e-transfer-test/control-plane/build.gradle.kts b/system-tests/e2e-transfer-test/control-plane/build.gradle.kts index 08a3c16e7e4..b027d42af5a 100644 --- a/system-tests/e2e-transfer-test/control-plane/build.gradle.kts +++ b/system-tests/e2e-transfer-test/control-plane/build.gradle.kts @@ -17,10 +17,7 @@ plugins { } dependencies { - implementation(project(":dist:bom:controlplane-base-bom")) { - exclude(group = "org.eclipse.edc", module = "data-plane-signaling-client") - - } + implementation(project(":dist:bom:controlplane-base-bom")) implementation(project(":extensions:common:iam:iam-mock")) } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java index 4c2dc711206..429e043c1e0 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java @@ -31,8 +31,7 @@ public interface Runtimes { interface ControlPlane { String[] MODULES = new String[]{ - ":system-tests:e2e-transfer-test:control-plane", - ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client" + ":system-tests:e2e-transfer-test:control-plane" }; String[] SIGNALING_MODULES = new String[]{ @@ -42,8 +41,7 @@ interface ControlPlane { String[] EMBEDDED_DP_MODULES = new String[]{ ":system-tests:e2e-transfer-test:control-plane", - ":system-tests:e2e-transfer-test:data-plane", - ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client" + ":system-tests:e2e-transfer-test:data-plane" }; String[] SQL_MODULES = new String[]{ @@ -69,6 +67,7 @@ static Config config(String participantId) { put("edc.negotiation.provider.state-machine.iteration-wait-millis", "50"); put("edc.transfer.state-machine.iteration-wait-millis", "50"); put("edc.data.plane.selector.state-machine.iteration-wait-millis", "100"); + put("edc.core.retry.retries.max", "1"); } }); } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferPushSignalingTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferPushSignalingTest.java index ccf501749e7..9ecf7e9388f 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferPushSignalingTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferPushSignalingTest.java @@ -36,11 +36,12 @@ import static org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset.EDC_ASSET_TYPE_TERM; import static org.eclipse.edc.connector.controlplane.test.system.utils.Participant.MANAGEMENT_V4; import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy; -import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETED; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.CONTEXT; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_CONNECTOR_MANAGEMENT_CONTEXT_V2; import static org.eclipse.edc.test.e2e.TransferEndToEndTestBase.CONSUMER_CP; +import static org.eclipse.edc.test.e2e.TransferEndToEndTestBase.CONSUMER_DP; import static org.eclipse.edc.test.e2e.TransferEndToEndTestBase.CONSUMER_ID; import static org.eclipse.edc.test.e2e.TransferEndToEndTestBase.PROVIDER_CP; import static org.eclipse.edc.test.e2e.TransferEndToEndTestBase.PROVIDER_DP; @@ -51,8 +52,8 @@ interface TransferPushSignalingTest { @Test - default void shouldRegisterDataPlaneThroughSignaling(@Runtime(PROVIDER_CP) TransferEndToEndParticipant provider, - @Runtime(CONSUMER_CP) TransferEndToEndParticipant consumer) { + default void shouldTransferFiniteDataWithPush(@Runtime(PROVIDER_CP) TransferEndToEndParticipant provider, + @Runtime(CONSUMER_CP) TransferEndToEndParticipant consumer) { provider.baseManagementRequest() .get("/dataplanes") .then() @@ -72,27 +73,30 @@ default void shouldRegisterDataPlaneThroughSignaling(@Runtime(PROVIDER_CP) Trans var transferProcessId = consumer.requestAssetFrom(assetId, provider) .withTransferType("Finite-PUSH").execute(); - consumer.awaitTransferToBeInState(transferProcessId, STARTED); + consumer.awaitTransferToBeInState(transferProcessId, COMPLETED); } @Nested @EndToEndTest class InMemory implements TransferPushSignalingTest { + static final Endpoints CONSUMER_ENDPOINTS = Runtimes.ControlPlane.ENDPOINTS.build(); + static final Endpoints PROVIDER_ENDPOINTS = Runtimes.ControlPlane.ENDPOINTS.build(); + @RegisterExtension + @Order(0) static final RuntimeExtension CONSUMER_CONTROL_PLANE = ComponentRuntimeExtension.Builder.newInstance() .name(CONSUMER_CP) .modules(Runtimes.ControlPlane.SIGNALING_MODULES) - .endpoints(Runtimes.ControlPlane.ENDPOINTS.build()) + .endpoints(CONSUMER_ENDPOINTS) .configurationProvider(() -> Runtimes.ControlPlane.config(CONSUMER_ID)) .paramProvider(TransferEndToEndParticipant.class, ctx -> TransferEndToEndParticipant.newInstance(ctx) .managementVersionBasePath(MANAGEMENT_V4) .build()) .build(); - static final Endpoints PROVIDER_ENDPOINTS = Runtimes.ControlPlane.ENDPOINTS.build(); - @RegisterExtension + @Order(0) static final RuntimeExtension PROVIDER_CONTROL_PLANE = ComponentRuntimeExtension.Builder.newInstance() .name(PROVIDER_CP) .modules(Runtimes.ControlPlane.SIGNALING_MODULES) @@ -104,6 +108,7 @@ class InMemory implements TransferPushSignalingTest { .build(); @RegisterExtension + @Order(1) static final RuntimeExtension PROVIDER_DATA_PLANE = ComponentRuntimeExtension.Builder.newInstance() .name(PROVIDER_DP) .modules(Runtimes.SignalingDataPlane.MODULES) @@ -111,6 +116,16 @@ class InMemory implements TransferPushSignalingTest { .configurationProvider(Runtimes.SignalingDataPlane::config) .configurationProvider(() -> Runtimes.ControlPlane.controlPlaneEndpointOf(PROVIDER_ENDPOINTS)) .build(); + + @RegisterExtension + @Order(1) + static final RuntimeExtension CONSUMER_DATA_PLANE = ComponentRuntimeExtension.Builder.newInstance() + .name(CONSUMER_DP) + .modules(Runtimes.SignalingDataPlane.MODULES) + .endpoints(Runtimes.SignalingDataPlane.ENDPOINTS.build()) + .configurationProvider(Runtimes.SignalingDataPlane::config) + .configurationProvider(() -> Runtimes.ControlPlane.controlPlaneEndpointOf(CONSUMER_ENDPOINTS)) + .build(); } @Nested @@ -131,12 +146,16 @@ class Postgres implements TransferPushSignalingTest { POSTGRESQL_EXTENSION.createDatabase(PROVIDER_DB); }; + static final Endpoints CONSUMER_ENDPOINTS = Runtimes.ControlPlane.ENDPOINTS.build(); + static final Endpoints PROVIDER_ENDPOINTS = Runtimes.ControlPlane.ENDPOINTS.build(); + @RegisterExtension + @Order(2) static final RuntimeExtension CONSUMER_CONTROL_PLANE = ComponentRuntimeExtension.Builder.newInstance() .name(CONSUMER_CP) .modules(Runtimes.ControlPlane.SIGNALING_MODULES) .modules(Runtimes.ControlPlane.SQL_MODULES) - .endpoints(Runtimes.ControlPlane.ENDPOINTS.build()) + .endpoints(CONSUMER_ENDPOINTS) .configurationProvider(() -> Runtimes.ControlPlane.config(CONSUMER_ID)) .configurationProvider(() -> POSTGRESQL_EXTENSION.configFor(CONSUMER_DB)) .paramProvider(TransferEndToEndParticipant.class, ctx -> TransferEndToEndParticipant.newInstance(ctx) @@ -144,9 +163,8 @@ class Postgres implements TransferPushSignalingTest { .build()) .build(); - static final Endpoints PROVIDER_ENDPOINTS = Runtimes.ControlPlane.ENDPOINTS.build(); - @RegisterExtension + @Order(2) static final RuntimeExtension PROVIDER_CONTROL_PLANE = ComponentRuntimeExtension.Builder.newInstance() .name(PROVIDER_CP) .modules(Runtimes.ControlPlane.SIGNALING_MODULES) @@ -160,6 +178,17 @@ class Postgres implements TransferPushSignalingTest { .build(); @RegisterExtension + @Order(3) + static final RuntimeExtension CONSUMER_DATA_PLANE = ComponentRuntimeExtension.Builder.newInstance() + .name(CONSUMER_DP) + .modules(Runtimes.SignalingDataPlane.MODULES) + .endpoints(Runtimes.SignalingDataPlane.ENDPOINTS.build()) + .configurationProvider(Runtimes.SignalingDataPlane::config) + .configurationProvider(() -> Runtimes.ControlPlane.controlPlaneEndpointOf(CONSUMER_ENDPOINTS)) + .build(); + + @RegisterExtension + @Order(3) static final RuntimeExtension PROVIDER_DATA_PLANE = ComponentRuntimeExtension.Builder.newInstance() .name(PROVIDER_DP) .modules(Runtimes.SignalingDataPlane.MODULES) diff --git a/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/ReceiveDataController.java b/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/ReceiveDataController.java new file mode 100644 index 00000000000..71fc01b3c82 --- /dev/null +++ b/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/ReceiveDataController.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.test.runtime.signaling; + +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import org.eclipse.edc.spi.monitor.Monitor; + +@Path("/") +public class ReceiveDataController { + + private final Monitor monitor; + + public ReceiveDataController(Monitor monitor) { + this.monitor = monitor; + } + + @POST + @Path("/receive") + public void receiveData(String data) { + monitor.info("Data received: " + data); + } + +} diff --git a/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/SignalingDataPlaneRuntimeExtension.java b/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/SignalingDataPlaneRuntimeExtension.java index 3d0f6257ec1..2ce7f825efa 100644 --- a/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/SignalingDataPlaneRuntimeExtension.java +++ b/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/SignalingDataPlaneRuntimeExtension.java @@ -14,15 +14,29 @@ package org.eclipse.edc.test.runtime.signaling; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import org.eclipse.dataplane.Dataplane; +import org.eclipse.dataplane.domain.DataAddress; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; +import org.eclipse.dataplane.logic.OnPrepare; import org.eclipse.dataplane.logic.OnStart; +import org.eclipse.dataplane.logic.OnTerminate; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Setting; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.web.spi.WebService; +import org.eclipse.edc.web.spi.exception.InvalidRequestException; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; + +import static java.util.Collections.emptyList; public class SignalingDataPlaneRuntimeExtension implements ServiceExtension { @@ -43,9 +57,12 @@ public void initialize(ServiceExtensionContext context) { dataplane = Dataplane.newInstance() .endpoint("http://localhost:%d%s/v1/dataflows".formatted(httpPort, httpPath)) .transferType("Finite-PUSH") + .onPrepare(new DataplaneOnPrepare()) .onStart(new DataplaneOnStart()) + .onTerminate(new DataplaneOnTerminate()) .build(); webService.registerResource(dataplane.controller()); + webService.registerResource(new ReceiveDataController(context.getMonitor())); } @Override @@ -54,7 +71,51 @@ public void start() { .orElseThrow(e -> new RuntimeException("Cannot register dataplane on controlplane", e)); } - private static class DataplaneOnStart implements OnStart { + private class DataplaneOnStart implements OnStart { + @Override + public Result action(DataFlow dataFlow) { + if (dataFlow.getDataAddress() == null) { + return Result.failure(new InvalidRequestException("DataAddress should not be null for PUSH transfers")); + } + var destinationUri = URI.create(dataFlow.getDataAddress().endpoint()); + var request = HttpRequest.newBuilder(destinationUri).POST(HttpRequest.BodyPublishers.ofString("test-data")).build(); + HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.discarding()) + .whenComplete((response, throwable) -> { + if (throwable == null) { + var statusCode = response.statusCode(); + if (statusCode >= 200 && statusCode < 300) { + notifyCompleted(dataFlow); + } else { + dataplane.notifyErrored(dataFlow.getId(), new RuntimeException("Destination endpoint responded with " + statusCode)); + } + } else { + dataplane.notifyErrored(dataFlow.getId(), throwable); + } + }); + + return Result.success(dataFlow); + } + + private void notifyCompleted(DataFlow dataFlow) { + var retryPolicy = RetryPolicy.builder().withMaxRetries(5).withDelay(Duration.ofSeconds(1)).build(); + Failsafe.with(retryPolicy).run(context -> { + if (dataplane.notifyCompleted(dataFlow.getId()).failed()) { + throw new RuntimeException("Notification failed"); + } + }); + } + } + + private class DataplaneOnPrepare implements OnPrepare { + @Override + public Result action(DataFlow dataFlow) { + var destination = new DataAddress("Finite-PUSH", "http", "http://localhost:%d%s/receive".formatted(httpPort, httpPath), emptyList()); + dataFlow.setDataAddress(destination); + return Result.success(dataFlow); + } + } + + private class DataplaneOnTerminate implements OnTerminate { @Override public Result action(DataFlow dataFlow) { return Result.success(dataFlow);