diff --git a/core/common/lib/http-lib/src/main/java/org/eclipse/edc/http/client/ControlApiHttpClientImpl.java b/core/common/lib/http-lib/src/main/java/org/eclipse/edc/http/client/ControlApiHttpClientImpl.java index e10101e03b..5123e50f6e 100644 --- a/core/common/lib/http-lib/src/main/java/org/eclipse/edc/http/client/ControlApiHttpClientImpl.java +++ b/core/common/lib/http-lib/src/main/java/org/eclipse/edc/http/client/ControlApiHttpClientImpl.java @@ -47,7 +47,7 @@ public ServiceResult execute(Request.Builder requestBuilder) { public ServiceResult request(Request.Builder requestBuilder) { authenticationProvider.authenticationHeaders().forEach(requestBuilder::header); try ( - var response = httpClient.execute(requestBuilder.build(), List.of(retryWhenStatusIsNotIn(200, 204))); + var response = httpClient.execute(requestBuilder.build(), List.of(retryWhenStatusIsNotIn(200, 202, 204))); var responseBody = response.body(); ) { if (response.isSuccessful()) { diff --git a/core/control-plane/control-plane-transfer-manager/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java b/core/control-plane/control-plane-transfer-manager/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java index 0bf4902fa9..464139c9a0 100644 --- a/core/control-plane/control-plane-transfer-manager/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java +++ b/core/control-plane/control-plane-transfer-manager/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java @@ -197,8 +197,9 @@ private boolean processInitial(TransferProcess process) { } else { var response = provisioning.getContent(); process.setDataPlaneId(response.getDataPlaneId()); - if (response.isProvisioning()) { - process.transitionProvisioningRequested(); + if (response.isAsync()) { + process.transitionPreparationRequested(); + observable.invokeForEach(l -> l.preparationRequested(process)); } else { process.updateDestination(response.getDataAddress()); process.transitionRequesting(); @@ -296,7 +297,7 @@ private boolean startTransferFlow(TransferProcess process, Consumer dataFlowController.start(process, policy))) .doProcess(futureResult("Dispatch TransferRequestMessage to: " + process.getCounterPartyAddress(), (t, dataFlowResponse) -> { - if (dataFlowResponse.isProvisioning()) { + if (dataFlowResponse.isAsync()) { return completedFuture(StatusResult.success(dataFlowResponse)); } var messageBuilder = TransferStartMessage.Builder.newInstance().dataAddress(dataFlowResponse.getDataAddress()); @@ -305,7 +306,7 @@ private boolean startTransferFlow(TransferProcess process, Consumer { t.setDataPlaneId(dataFlowResponse.getDataPlaneId()); - if (dataFlowResponse.isProvisioning()) { + if (dataFlowResponse.isAsync()) { process.transitionStartupRequested(); update(t); diff --git a/core/control-plane/control-plane-transfer-manager/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java b/core/control-plane/control-plane-transfer-manager/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java index 16e7f52ab0..de10b413b3 100644 --- a/core/control-plane/control-plane-transfer-manager/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java +++ b/core/control-plane/control-plane-transfer-manager/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java @@ -75,7 +75,7 @@ import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETING_REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.INITIAL; -import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PROVISIONING_REQUESTED; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PREPARATION_REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.REQUESTING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.RESUMED; @@ -490,11 +490,11 @@ void shouldTransitionToTerminated_whenNoPolicyFound() { } @Test - void shouldTransitionToProvisioningRequested_whenProvisionThroughDataplaneSucceeds() { + void shouldTransitionToPreparationRequested_whenProvisionThroughDataplaneSucceeds() { var dataPlaneId = UUID.randomUUID().toString(); var dataFlowResponse = DataFlowResponse.Builder.newInstance() .dataPlaneId(dataPlaneId) - .provisioning(true) + .async(true) .build(); var transferProcess = createTransferProcess(INITIAL); when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))) @@ -506,10 +506,11 @@ void shouldTransitionToProvisioningRequested_whenProvisionThroughDataplaneSuccee await().untilAsserted(() -> { verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString()); + verify(listener).preparationRequested(transferProcess); var captor = ArgumentCaptor.forClass(TransferProcess.class); verify(transferProcessStore).save(captor.capture()); var storedTransferProcess = captor.getValue(); - assertThat(storedTransferProcess.getState()).isEqualTo(PROVISIONING_REQUESTED.code()); + assertThat(storedTransferProcess.getState()).isEqualTo(PREPARATION_REQUESTED.code()); assertThat(storedTransferProcess.getDataPlaneId()).isEqualTo(dataPlaneId); }); } @@ -521,7 +522,7 @@ void shouldTransitionToRequesting_whenProvisionThroughDataplaneSucceedsButNoActu var dataFlowResponse = DataFlowResponse.Builder.newInstance() .dataPlaneId(dataPlaneId) .dataAddress(dataDestination) - .provisioning(false) + .async(false) .build(); var transferProcess = createTransferProcess(INITIAL); when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))) @@ -699,7 +700,7 @@ void shouldNotSendMessageAndTransitionToStartupRequested_whenAsynchronousDataPla when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STARTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process); when(dataFlowController.start(any(), any())).thenReturn(StatusResult.success( - dataFlowResponseBuilder().provisioning(true).dataPlaneId("dataPlaneId").build())); + dataFlowResponseBuilder().async(true).dataPlaneId("dataPlaneId").build())); manager.start(); diff --git a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/listener/TransferProcessEventListener.java b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/listener/TransferProcessEventListener.java index 3ff91dae32..fc0bb0c118 100644 --- a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/listener/TransferProcessEventListener.java +++ b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/listener/TransferProcessEventListener.java @@ -19,8 +19,8 @@ import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessDeprovisioningRequested; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessEvent; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessInitiated; +import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessPreparationRequested; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessProvisioned; -import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessProvisioningRequested; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessRequested; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessStarted; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessSuspended; @@ -49,8 +49,8 @@ public void initiated(TransferProcess process) { } @Override - public void provisioningRequested(TransferProcess process) { - var event = withBaseProperties(TransferProcessProvisioningRequested.Builder.newInstance(), process) + public void preparationRequested(TransferProcess process) { + var event = withBaseProperties(TransferProcessPreparationRequested.Builder.newInstance(), process) .build(); eventRouter.publish(event); diff --git a/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/command/handlers/NotifyPreparedCommandHandlerTest.java b/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/command/handlers/NotifyPreparedCommandHandlerTest.java index 985f171b2e..0562810ca6 100644 --- a/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/command/handlers/NotifyPreparedCommandHandlerTest.java +++ b/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/command/handlers/NotifyPreparedCommandHandlerTest.java @@ -26,7 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess.Type.CONSUMER; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess.Type.PROVIDER; -import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PROVISIONING_REQUESTED; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PREPARATION_REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.REQUESTING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTUP_REQUESTED; @@ -48,7 +48,7 @@ void shouldTransitionProvisioned_whenConsumer() { var newDestination = DataAddress.Builder.newInstance().type("new").build(); var originalDestination = DataAddress.Builder.newInstance().type("original").build(); var command = new NotifyPreparedCommand("test-id", newDestination); - var entity = TransferProcess.Builder.newInstance().state(PROVISIONING_REQUESTED.code()).type(CONSUMER).dataDestination(originalDestination).build(); + var entity = TransferProcess.Builder.newInstance().state(PREPARATION_REQUESTED.code()).type(CONSUMER).dataDestination(originalDestination).build(); var result = handler.modify(entity, command); @@ -61,7 +61,7 @@ void shouldTransitionProvisioned_whenConsumer() { void shouldNotUpdateDestination_whenItIsMissing() { var command = new NotifyPreparedCommand("test-id", null); var originalDestination = DataAddress.Builder.newInstance().type("original").build(); - var entity = TransferProcess.Builder.newInstance().state(PROVISIONING_REQUESTED.code()).dataDestination(originalDestination).build(); + var entity = TransferProcess.Builder.newInstance().state(PREPARATION_REQUESTED.code()).dataDestination(originalDestination).build(); var result = handler.modify(entity, command); @@ -87,7 +87,7 @@ void shouldTransitionToStarting_whenProvider() { @Test void postAction_shouldCallProvisioned() { var command = new NotifyPreparedCommand("test-id", null); - var entity = TransferProcess.Builder.newInstance().state(PROVISIONING_REQUESTED.code()).build(); + var entity = TransferProcess.Builder.newInstance().state(PREPARATION_REQUESTED.code()).build(); var listener = mock(TransferProcessListener.class); observable.registerListener(listener); diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingApiExtension.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingApiExtension.java index 2a38480497..9ab469749f 100644 --- a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingApiExtension.java +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingApiExtension.java @@ -20,8 +20,12 @@ import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.signaling.port.api.DataPlaneRegistrationApiController; import org.eclipse.edc.signaling.port.api.DataPlaneTransferApiController; +import org.eclipse.edc.signaling.port.transformer.DataAddressToDspDataAddressTransformer; +import org.eclipse.edc.signaling.port.transformer.DataFlowResponseMessageToDataFlowResponseTransformer; +import org.eclipse.edc.signaling.port.transformer.DspDataAddressToDataAddressTransformer; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.transform.spi.TypeTransformerRegistry; import org.eclipse.edc.web.spi.WebService; import org.eclipse.edc.web.spi.configuration.ApiContext; @@ -38,6 +42,8 @@ public class DataPlaneSignalingApiExtension implements ServiceExtension { private WebService webService; @Inject private TransferProcessService transferProcessService; + @Inject + private TypeTransformerRegistry transformerRegistry; @Override public String name() { @@ -46,8 +52,12 @@ public String name() { @Override public void initialize(ServiceExtensionContext context) { + var typeTransformerRegistry = transformerRegistry.forContext("signaling-api"); + typeTransformerRegistry.register(new DataAddressToDspDataAddressTransformer()); + typeTransformerRegistry.register(new DataFlowResponseMessageToDataFlowResponseTransformer()); + typeTransformerRegistry.register(new DspDataAddressToDataAddressTransformer()); webService.registerResource(ApiContext.CONTROL, new DataPlaneRegistrationApiController(dataPlaneSelectorService)); - webService.registerResource(ApiContext.CONTROL, new DataPlaneTransferApiController(transferProcessService)); + webService.registerResource(ApiContext.CONTROL, new DataPlaneTransferApiController(transferProcessService, typeTransformerRegistry)); } diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneSignalingClient.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneSignalingClient.java index ef556fc994..d23cc1ccdb 100644 --- a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneSignalingClient.java +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/DataPlaneSignalingClient.java @@ -104,11 +104,11 @@ private StatusResult sendMessage(String flowId, String name, Object messag private StatusResult handleResponse(String responseBody) { return Optional.ofNullable(responseBody) - .map(this::deserializeStartMessage) + .map(this::deserializeResponseMessage) .orElseGet(() -> StatusResult.failure(FATAL_ERROR, "Body missing")); } - private StatusResult deserializeStartMessage(String responseBody) { + private StatusResult deserializeResponseMessage(String responseBody) { try { var message = objectMapperSupplier.get().readValue(responseBody, DataFlowResponseMessage.class); return StatusResult.success(message); diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApi.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApi.java index 10af438722..54c2680a0a 100644 --- a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApi.java +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApi.java @@ -23,6 +23,7 @@ import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.ws.rs.core.Response; import org.eclipse.edc.api.model.ApiCoreSchema; +import org.eclipse.edc.signaling.domain.DataFlowResponseMessage; import static jakarta.ws.rs.HttpMethod.POST; @@ -30,6 +31,17 @@ @Tag(name = "Data Plane Transfer events") public interface DataPlaneTransferApi { + @Operation( + method = POST, + description = "Notify a prepared transfer", + responses = { + @ApiResponse(responseCode = "200", description = "Prepared notification delivered correctly"), + @ApiResponse(responseCode = "404", description = "Transfer process does not exist", + content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class)))) + } + ) + Response prepared(String transferId, DataFlowResponseMessage message); + @Operation( method = POST, description = "Notify a completed transfer", diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiController.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiController.java index 61d20663e9..ba6b8d4f7b 100644 --- a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiController.java +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiController.java @@ -21,10 +21,17 @@ import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.Response; import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessService; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.command.NotifyPreparedCommand; +import org.eclipse.edc.signaling.domain.DataFlowPrepareMessage; +import org.eclipse.edc.signaling.domain.DataFlowResponseMessage; +import org.eclipse.edc.spi.result.ServiceResult; +import org.eclipse.edc.transform.spi.TypeTransformerRegistry; import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; import static org.eclipse.edc.web.spi.exception.ServiceResultHandler.exceptionMapper; +import static org.eclipse.edc.web.spi.exception.ServiceResultHandler.mapToException; @Path("/transfers") @Produces(APPLICATION_JSON) @@ -32,9 +39,25 @@ public class DataPlaneTransferApiController implements DataPlaneTransferApi { private final TransferProcessService transferProcessService; + private final TypeTransformerRegistry typeTransformerRegistry; - public DataPlaneTransferApiController(TransferProcessService transferProcessService) { + public DataPlaneTransferApiController(TransferProcessService transferProcessService, TypeTransformerRegistry typeTransformerRegistry) { this.transferProcessService = transferProcessService; + this.typeTransformerRegistry = typeTransformerRegistry; + } + + @Path("/{transferId}/dataflow/prepared") + @POST + @Override + public Response prepared(@PathParam("transferId") String transferId, DataFlowResponseMessage message) { + typeTransformerRegistry.transform(message, DataFlowResponse.class) + .map(ServiceResult::success) + .orElse(failure -> ServiceResult.badRequest(failure.getMessages())) + .map(response -> new NotifyPreparedCommand(transferId, response.getDataAddress())) + .compose(transferProcessService::notifyPrepared) + .orElseThrow(f -> mapToException(f, DataFlowPrepareMessage.class)); + + return Response.ok().build(); } @Path("/{transferId}/dataflow/completed") diff --git a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformer.java b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformer.java index 8abd86ab2e..876bd43d85 100644 --- a/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformer.java +++ b/data-protocols/data-plane-signaling/src/main/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformer.java @@ -39,7 +39,7 @@ public Class getOutputType() { return DataFlowResponse.Builder.newInstance() .dataAddress(context.transform(dataFlowResponseMessage.getDataAddress(), DataAddress.class)) .dataPlaneId(dataFlowResponseMessage.getDataplaneId()) - .provisioning(false) + .async(dataFlowResponseMessage.getState().endsWith("ING")) .build(); } diff --git a/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiControllerTest.java b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiControllerTest.java index c5b33f53cf..bbab15c3c8 100644 --- a/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiControllerTest.java +++ b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiControllerTest.java @@ -16,7 +16,12 @@ import io.restassured.http.ContentType; import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessService; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; +import org.eclipse.edc.signaling.domain.DataFlowResponseMessage; +import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.result.ServiceResult; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.transform.spi.TypeTransformerRegistry; import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -25,6 +30,8 @@ import static io.restassured.RestAssured.given; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -32,6 +39,69 @@ class DataPlaneTransferApiControllerTest extends RestControllerTestBase { private final TransferProcessService transferProcessService = mock(); + private final TypeTransformerRegistry typeTransformerRegistry = mock(); + + @Nested + class Prepared { + @Test + void shouldCallNotifyPrepared() { + var transferId = UUID.randomUUID().toString(); + var message = DataFlowResponseMessage.Builder.newInstance().build(); + var dataAddress = DataAddress.Builder.newInstance().type("test").build(); + var dataFlowResponse = DataFlowResponse.Builder.newInstance().dataAddress(dataAddress).build(); + + when(typeTransformerRegistry.transform(any(), eq(DataFlowResponse.class))).thenReturn(Result.success(dataFlowResponse)); + when(transferProcessService.notifyPrepared(any())).thenReturn(ServiceResult.success()); + + given() + .port(port) + .contentType(ContentType.JSON) + .body(message) + .post("/transfers/{transferId}/dataflow/prepared", transferId) + .then() + .log().ifValidationFails() + .statusCode(200); + + verify(typeTransformerRegistry).transform(any(DataFlowResponseMessage.class), eq(DataFlowResponse.class)); + verify(transferProcessService).notifyPrepared(argThat(c -> c != null && + c.getEntityId().equals(transferId) && + c.getDataAddress().equals(dataAddress))); + } + + @Test + void shouldReturnBadRequest_whenTransformationFails() { + var transferId = UUID.randomUUID().toString(); + var message = DataFlowResponseMessage.Builder.newInstance().build(); + when(typeTransformerRegistry.transform(any(), eq(DataFlowResponse.class))).thenReturn(Result.failure("error")); + + given() + .port(port) + .contentType(ContentType.JSON) + .body(message) + .post("/transfers/{transferId}/dataflow/prepared", transferId) + .then() + .log().ifValidationFails() + .statusCode(400); + } + + @Test + void shouldReturnConflict_whenServiceCallFails() { + var transferId = UUID.randomUUID().toString(); + var message = DataFlowResponseMessage.Builder.newInstance().build(); + var dataFlowResponse = DataFlowResponse.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("test").build()).build(); + when(typeTransformerRegistry.transform(any(), eq(DataFlowResponse.class))).thenReturn(Result.success(dataFlowResponse)); + when(transferProcessService.notifyPrepared(any())).thenReturn(ServiceResult.conflict("error")); + + given() + .port(port) + .contentType(ContentType.JSON) + .body(message) + .post("/transfers/{transferId}/dataflow/prepared", transferId) + .then() + .log().ifValidationFails() + .statusCode(409); + } + } @Nested class Completed { @@ -69,6 +139,6 @@ void shouldReturnError_whenServiceCallFails() { @Override protected Object controller() { - return new DataPlaneTransferApiController(transferProcessService); + return new DataPlaneTransferApiController(transferProcessService, typeTransformerRegistry); } } diff --git a/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformerTest.java b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformerTest.java index b14f6282c9..2975709547 100644 --- a/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformerTest.java +++ b/data-protocols/data-plane-signaling/src/test/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformerTest.java @@ -38,6 +38,7 @@ void shouldTransform() { var message = DataFlowResponseMessage.Builder.newInstance() .dataplaneId("dataPlaneId") .dataAddress(DspDataAddress.Builder.newInstance().build()) + .state("STARTED") .build(); var result = transformer.transform(message, context); @@ -45,5 +46,22 @@ void shouldTransform() { assertThat(result).isNotNull(); assertThat(result.getDataPlaneId()).isSameAs("dataPlaneId"); assertThat(result.getDataAddress()).isSameAs(dataAddress); + assertThat(result.isAsync()).isFalse(); + } + + @Test + void shouldBeAsync_whenStateEndsWithIng() { + var dataAddress = DataAddress.Builder.newInstance().type("any").build(); + when(context.transform(isA(DspDataAddress.class), any())).thenReturn(dataAddress); + var message = DataFlowResponseMessage.Builder.newInstance() + .dataplaneId("dataPlaneId") + .dataAddress(DspDataAddress.Builder.newInstance().build()) + .state("STARTING") + .build(); + + var result = transformer.transform(message, context); + + assertThat(result).isNotNull(); + assertThat(result.isAsync()).isTrue(); } } diff --git a/data-protocols/dsp/dsp-lib/dsp-transfer-process-lib/dsp-transfer-process-transform-lib/src/main/java/org/eclipse/edc/protocol/dsp/transferprocess/transform/type/from/JsonObjectFromTransferProcessTransformer.java b/data-protocols/dsp/dsp-lib/dsp-transfer-process-lib/dsp-transfer-process-transform-lib/src/main/java/org/eclipse/edc/protocol/dsp/transferprocess/transform/type/from/JsonObjectFromTransferProcessTransformer.java index 419588e82c..a19db2f42d 100644 --- a/data-protocols/dsp/dsp-lib/dsp-transfer-process-lib/dsp-transfer-process-transform-lib/src/main/java/org/eclipse/edc/protocol/dsp/transferprocess/transform/type/from/JsonObjectFromTransferProcessTransformer.java +++ b/data-protocols/dsp/dsp-lib/dsp-transfer-process-lib/dsp-transfer-process-transform-lib/src/main/java/org/eclipse/edc/protocol/dsp/transferprocess/transform/type/from/JsonObjectFromTransferProcessTransformer.java @@ -75,7 +75,7 @@ private String state(Integer state, String errorDetails, TransformerContext cont } return switch (transferProcessState) { - case INITIAL, REQUESTING, REQUESTED, PROVISIONING, PROVISIONING_REQUESTED, PROVISIONED, STARTUP_REQUESTED -> + case INITIAL, REQUESTING, REQUESTED, PROVISIONING, PREPARATION_REQUESTED, PROVISIONED, STARTUP_REQUESTED -> forNamespace(DSPACE_VALUE_TRANSFER_STATE_REQUESTED_TERM); case STARTING, SUSPENDING_REQUESTED, STARTED -> forNamespace(DSPACE_VALUE_TRANSFER_STATE_STARTED_TERM); case SUSPENDING, SUSPENDED, RESUMING, RESUMED -> diff --git a/extensions/common/api/control-api-configuration/src/main/resources/control-api-version.json b/extensions/common/api/control-api-configuration/src/main/resources/control-api-version.json index e66a44aed7..5a9d807301 100644 --- a/extensions/common/api/control-api-configuration/src/main/resources/control-api-version.json +++ b/extensions/common/api/control-api-configuration/src/main/resources/control-api-version.json @@ -1,8 +1,8 @@ [ { - "version": "2.1.2", + "version": "2.1.3", "urlPath": "/v1", - "lastUpdated": "2025-12-22T12:00:01Z", + "lastUpdated": "2026-01-27T12:00:01Z", "maturity": "stable" } ] diff --git a/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/controlplane/edr/store/receiver/EndpointDataReferenceStoreReceiverTest.java b/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/controlplane/edr/store/receiver/EndpointDataReferenceStoreReceiverTest.java index 064e6bb4a0..3ace6ef14d 100644 --- a/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/controlplane/edr/store/receiver/EndpointDataReferenceStoreReceiverTest.java +++ b/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/controlplane/edr/store/receiver/EndpointDataReferenceStoreReceiverTest.java @@ -22,8 +22,8 @@ import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessDeprovisioningRequested; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessEvent; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessInitiated; +import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessPreparationRequested; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessProvisioned; -import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessProvisioningRequested; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessRequested; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessStarted; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessSuspended; @@ -241,7 +241,7 @@ public Stream provideArguments(ExtensionContext context) { TransferProcessDeprovisioningRequested.Builder.newInstance(), TransferProcessInitiated.Builder.newInstance(), TransferProcessProvisioned.Builder.newInstance(), - TransferProcessProvisioningRequested.Builder.newInstance(), + TransferProcessPreparationRequested.Builder.newInstance(), TransferProcessRequested.Builder.newInstance().transferProcessId("id") ); diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowController.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowController.java index a174e1cd51..ff6fcc367d 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowController.java @@ -238,7 +238,7 @@ private DataFlowResponse toResponse(DataFlowResponseMessage it, DataPlaneInstanc return DataFlowResponse.Builder.newInstance() .dataAddress(it.getDataAddress()) .dataPlaneId(dataPlaneInstance.getId()) - .provisioning(it.isProvisioning()) + .async(it.isProvisioning()) .build(); } diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowControllerTest.java index 3f81059187..f3a1922b96 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/LegacyDataPlaneSignalingFlowControllerTest.java @@ -120,7 +120,7 @@ void shouldCallPrepareOnDataPlane() { assertThat(result).isSucceeded().satisfies(dataFlowResponse -> { assertThat(dataFlowResponse.getDataPlaneId()).isEqualTo(dataPlaneInstance.getId()); assertThat(dataFlowResponse.getDataAddress()).isSameAs(newDestinationAddress); - assertThat(dataFlowResponse.isProvisioning()).isTrue(); + assertThat(dataFlowResponse.isAsync()).isTrue(); }); verify(dataPlaneClient).prepare(isA(DataFlowProvisionMessage.class)); } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 401b111ad7..0d777db57e 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -9,7 +9,7 @@ atomikos = "6.0.0" awaitility = "4.2.2" bouncyCastle-jdk18on = "1.83" cloudEvents = "4.0.1" -dataplane-sdk = "0.0.3-SNAPSHOT" +dataplane-sdk = "0.0.5-SNAPSHOT" edc = "0.16.0-SNAPSHOT" failsafe = "3.3.2" h2 = "2.4.240" diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessDeprovisioningRequested.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessDeprovisioningRequested.java index 942397eddf..d6c210de82 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessDeprovisioningRequested.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessDeprovisioningRequested.java @@ -20,8 +20,11 @@ /** * This event is raised when the TransferProcess has been requested for deprovisioning. + * + * @deprecated will be removed soon. */ @JsonDeserialize(builder = TransferProcessDeprovisioningRequested.Builder.class) +@Deprecated(since = "0.16.0") public class TransferProcessDeprovisioningRequested extends TransferProcessEvent { private TransferProcessDeprovisioningRequested() { diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessProvisioningRequested.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessPreparationRequested.java similarity index 74% rename from spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessProvisioningRequested.java rename to spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessPreparationRequested.java index 90831177a4..c455df8bf8 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessProvisioningRequested.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessPreparationRequested.java @@ -19,24 +19,24 @@ import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; /** - * This event is raised when the TransferProcess has been requested for provisioning. + * This event is raised when the TransferProcess has been requested for preparation. */ -@JsonDeserialize(builder = TransferProcessProvisioningRequested.Builder.class) -public class TransferProcessProvisioningRequested extends TransferProcessEvent { +@JsonDeserialize(builder = TransferProcessPreparationRequested.Builder.class) +public class TransferProcessPreparationRequested extends TransferProcessEvent { - private TransferProcessProvisioningRequested() { + private TransferProcessPreparationRequested() { } @Override public String name() { - return "transfer.process.provisioningRequested"; + return "transfer.process.preparationRequested"; } @JsonPOJOBuilder(withPrefix = "") - public static class Builder extends TransferProcessEvent.Builder { + public static class Builder extends TransferProcessEvent.Builder { private Builder() { - super(new TransferProcessProvisioningRequested()); + super(new TransferProcessPreparationRequested()); } @JsonCreator diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/observe/TransferProcessListener.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/observe/TransferProcessListener.java index 34de6cfd33..23c524296d 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/observe/TransferProcessListener.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/observe/TransferProcessListener.java @@ -37,11 +37,11 @@ default void initiated(TransferProcess process) { } /** - * Called after an asynchronous provisioning for a {@link TransferProcess} was requested. + * Called after an asynchronous preparation for a {@link TransferProcess} was requested. * - * @param process the transfer process that has been requested for provisioning. + * @param process the transfer process that has been requested for preparation. */ - default void provisioningRequested(TransferProcess process) { + default void preparationRequested(TransferProcess process) { } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/DataFlowResponse.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/DataFlowResponse.java index 3c8dc9fa19..bfe58c9a0c 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/DataFlowResponse.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/DataFlowResponse.java @@ -24,7 +24,7 @@ public class DataFlowResponse { private DataAddress dataAddress; private String dataPlaneId; - private boolean provisioning; + private boolean async; private DataFlowResponse() { } @@ -37,8 +37,8 @@ public String getDataPlaneId() { return dataPlaneId; } - public boolean isProvisioning() { - return provisioning; + public boolean isAsync() { + return async; } public static class Builder { @@ -63,8 +63,8 @@ public Builder dataPlaneId(String dataPlaneId) { return this; } - public Builder provisioning(boolean provisioning) { - response.provisioning = provisioning; + public Builder async(boolean provisioning) { + response.async = provisioning; return this; } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java index 4bc0d5dcfb..56d6722d29 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java @@ -48,9 +48,9 @@ import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETING_REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.INITIAL; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PREPARATION_REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PROVISIONED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PROVISIONING; -import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PROVISIONING_REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.REQUESTING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.RESUMED; @@ -159,15 +159,15 @@ public void protocolMessageReceived(String id) { protocolMessages.addReceived(id); } - public void transitionProvisioningRequested() { - transition(PROVISIONING_REQUESTED, PROVISIONING, INITIAL); + public void transitionPreparationRequested() { + transition(PREPARATION_REQUESTED, PROVISIONING, INITIAL); } public void transitionRequesting() { if (Type.PROVIDER == type) { throw new IllegalStateException("Provider processes have no REQUESTING state"); } - transition(REQUESTING, INITIAL, PROVISIONED, PROVISIONING_REQUESTED, REQUESTING); + transition(REQUESTING, INITIAL, PROVISIONED, PREPARATION_REQUESTED, REQUESTING); } public void transitionRequested() { @@ -213,7 +213,7 @@ public void transitionCompleted() { } public boolean canBeTerminated() { - return currentStateIsOneOf(INITIAL, PROVISIONING, PROVISIONING_REQUESTED, PROVISIONED, REQUESTING, REQUESTED, + return currentStateIsOneOf(INITIAL, PROVISIONING, PREPARATION_REQUESTED, PROVISIONED, REQUESTING, REQUESTED, STARTING, STARTUP_REQUESTED, STARTED, COMPLETING, COMPLETING_REQUESTED, SUSPENDING, SUSPENDING_REQUESTED, SUSPENDED, RESUMING, TERMINATING, TERMINATING_REQUESTED); } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessStates.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessStates.java index c4687dc9a5..011d508f7a 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessStates.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessStates.java @@ -25,7 +25,7 @@ public enum TransferProcessStates { INITIAL(100), @Deprecated(since = "0.16.0") PROVISIONING(200), - PROVISIONING_REQUESTED(250), + PREPARATION_REQUESTED(250), @Deprecated(since = "0.16.0") PROVISIONED(300), REQUESTING(400), diff --git a/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessEventTest.java b/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessEventTest.java index 6c371e2c1e..c81e0fdb8a 100644 --- a/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessEventTest.java +++ b/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/event/TransferProcessEventTest.java @@ -70,7 +70,7 @@ public Stream provideArguments(ExtensionContext context) { TransferProcessTerminated.Builder.newInstance().reason("any reason"), TransferProcessInitiated.Builder.newInstance(), TransferProcessProvisioned.Builder.newInstance(), - TransferProcessProvisioningRequested.Builder.newInstance(), + TransferProcessPreparationRequested.Builder.newInstance(), TransferProcessRequested.Builder.newInstance().transferProcessId("id") ); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/dataplane/DataPlaneSignalingClient.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/dataplane/DataPlaneSignalingClient.java index 7e41d1d2c4..02d254b0fc 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/dataplane/DataPlaneSignalingClient.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/dataplane/DataPlaneSignalingClient.java @@ -14,10 +14,10 @@ package org.eclipse.edc.test.e2e.dataplane; -import io.restassured.RestAssured; import io.restassured.http.ContentType; import org.eclipse.edc.junit.extensions.ComponentRuntimeContext; +import static io.restassured.RestAssured.given; import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; @@ -31,7 +31,7 @@ public DataPlaneSignalingClient(ComponentRuntimeContext context) { public void awaitFlowToBe(String flowId, String status) { await().untilAsserted(() -> { var uri = context.getEndpoint("default").get(); - RestAssured.given() + given() .baseUri(uri.toString()) .get("/v1/dataflows/{flowId}/status", flowId) .then() @@ -41,4 +41,15 @@ public void awaitFlowToBe(String flowId, String status) { .body("state", equalTo(status)); }); } + + public void completePreparation(String flowId) { + var uri = context.getEndpoint("default").get(); + + given() + .baseUri(uri.toString()) + .post("/control/flows/{flowId}/complete-preparation", flowId) + .then() + .log().ifValidationFails() + .statusCode(204); + } } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferSignalingEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferSignalingEndToEndTest.java index b3ed91a1ac..42fcf5b95f 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferSignalingEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferSignalingEndToEndTest.java @@ -38,6 +38,7 @@ import static org.eclipse.edc.connector.controlplane.test.system.utils.Participant.MANAGEMENT_V4; import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETED; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PREPARATION_REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATED; @@ -161,8 +162,7 @@ default void shouldSuspendAndResumeFromProvider(@Runtime(PROVIDER_CP) TransferEn consumerDataPlane.awaitFlowToBe(consumerTransferProcessId, "STARTED"); providerDataPlane.awaitFlowToBe(providerTransferProcessId, "STARTED"); } - - + @Test default void shouldSuspendAndResumeFromConsumer(@Runtime(PROVIDER_CP) TransferEndToEndParticipant provider, @Runtime(CONSUMER_CP) TransferEndToEndParticipant consumer, @@ -194,6 +194,26 @@ default void shouldSuspendAndResumeFromConsumer(@Runtime(PROVIDER_CP) TransferEn providerDataPlane.awaitFlowToBe(providerTransferProcessId, "STARTED"); } + @Test + default void shouldSupportAsyncPreparation(@Runtime(PROVIDER_CP) TransferEndToEndParticipant provider, + @Runtime(CONSUMER_CP) TransferEndToEndParticipant consumer, + @Runtime(PROVIDER_DP) DataPlaneSignalingClient providerDataPlane, + @Runtime(CONSUMER_DP) DataPlaneSignalingClient consumerDataPlane) { + var assetId = createOffer(provider); + var consumerTransferProcessId = consumer.requestAssetFrom(assetId, provider) + .withTransferType("AsyncPrepare-PUSH").execute(); + + consumer.awaitTransferToBeInState(consumerTransferProcessId, PREPARATION_REQUESTED); + consumerDataPlane.awaitFlowToBe(consumerTransferProcessId, "PREPARING"); + + consumerDataPlane.completePreparation(consumerTransferProcessId); + + consumer.awaitTransferToBeInState(consumerTransferProcessId, COMPLETED); + var providerTransferProcessId = provider.getTransferProcessIdGivenCounterPartyOne(consumerTransferProcessId); + consumerDataPlane.awaitFlowToBe(consumerTransferProcessId, "COMPLETED"); + providerDataPlane.awaitFlowToBe(providerTransferProcessId, "COMPLETED"); + } + private String createOffer(TransferEndToEndParticipant provider) { var createAssetRequestBody = createObjectBuilder() .add(CONTEXT, createArrayBuilder().add(EDC_CONNECTOR_MANAGEMENT_CONTEXT_V2)) diff --git a/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/ControlController.java b/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/ControlController.java new file mode 100644 index 0000000000..64f74ad01a --- /dev/null +++ b/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/ControlController.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2025 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.edc.test.runtime.signaling; + +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import org.eclipse.dataplane.Dataplane; +import org.eclipse.dataplane.domain.DataAddress; +import org.eclipse.dataplane.domain.Result; +import org.eclipse.edc.spi.monitor.Monitor; + +import static java.util.Collections.emptyList; + +@Path("/control") +public class ControlController { + + private final Monitor monitor; + private final Dataplane dataplane; + private final SignalingDataPlaneRuntimeExtension.ApiConfiguration apiConfiguration; + + public ControlController(Monitor monitor, Dataplane dataplane, SignalingDataPlaneRuntimeExtension.ApiConfiguration apiConfiguration) { + this.monitor = monitor; + this.dataplane = dataplane; + this.apiConfiguration = apiConfiguration; + } + + @POST + @Path("/flows/{flowId}/complete-preparation") + public void completePreparation(@PathParam("flowId") String flowId) { + dataplane.notifyPrepared(flowId, dataFlow -> { + dataFlow.setDataAddress(new DataAddress("http", apiConfiguration.receiveDataEndpoint(), emptyList())); + return Result.success(dataFlow); + }).orElseThrow(RuntimeException::new); + } + + @GET + @Path("/source") + public String dataSource() { + monitor.info("Data requested"); + return "data"; + } + +} diff --git a/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/ReceiveDataController.java b/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/DataController.java similarity index 74% rename from system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/ReceiveDataController.java rename to system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/DataController.java index 71fc01b3c8..816dd0aaa8 100644 --- a/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/ReceiveDataController.java +++ b/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/DataController.java @@ -14,16 +14,17 @@ package org.eclipse.edc.test.runtime.signaling; +import jakarta.ws.rs.GET; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import org.eclipse.edc.spi.monitor.Monitor; -@Path("/") -public class ReceiveDataController { +@Path("/data") +public class DataController { private final Monitor monitor; - public ReceiveDataController(Monitor monitor) { + public DataController(Monitor monitor) { this.monitor = monitor; } @@ -33,4 +34,11 @@ public void receiveData(String data) { monitor.info("Data received: " + data); } + @GET + @Path("/source") + public String dataSource() { + monitor.info("Data requested"); + return "data"; + } + } diff --git a/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/SignalingDataPlaneRuntimeExtension.java b/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/SignalingDataPlaneRuntimeExtension.java index 97ed2217cc..c4c58790e7 100644 --- a/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/SignalingDataPlaneRuntimeExtension.java +++ b/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/SignalingDataPlaneRuntimeExtension.java @@ -23,8 +23,10 @@ import org.eclipse.dataplane.logic.OnPrepare; import org.eclipse.dataplane.logic.OnStart; import org.eclipse.dataplane.logic.OnStarted; +import org.eclipse.edc.runtime.metamodel.annotation.Configuration; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.runtime.metamodel.annotation.Settings; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; @@ -51,10 +53,8 @@ public class SignalingDataPlaneRuntimeExtension implements ServiceExtension { @Setting(key = "signaling.dataplane.controlplane.endpoint") private String controlplaneEndpoint; - @Setting(key = "web.http.port") - private int httpPort; - @Setting(key = "web.http.path") - private String httpPath; + @Configuration + private ApiConfiguration apiConfiguration; @Inject private WebService webService; @@ -67,11 +67,12 @@ public class SignalingDataPlaneRuntimeExtension implements ServiceExtension { @Override public void initialize(ServiceExtensionContext context) { dataplane = Dataplane.newInstance() - .endpoint("http://localhost:%d%s/v1/dataflows".formatted(httpPort, httpPath)) + .endpoint(apiConfiguration.dataFlowEndpoint()) .transferType("Finite-PUSH") .transferType("Finite-PULL") .transferType("NonFinite-PUSH") .transferType("NonFinite-PULL") + .transferType("AsyncPrepare-PUSH") .onPrepare(new DataplaneOnPrepare()) .onStart(new DataplaneOnStart()) .onStarted(new DataplaneOnStarted()) @@ -80,8 +81,8 @@ public void initialize(ServiceExtensionContext context) { .onTerminate(this::stopDataFlow) .build(); webService.registerResource(dataplane.controller()); - webService.registerResource(new ReceiveDataController(monitor)); - webService.registerResource(new SourceDataController(monitor)); + webService.registerResource(new DataController(monitor)); + webService.registerResource(new ControlController(monitor, dataplane, apiConfiguration)); } @Override @@ -100,30 +101,25 @@ public Result action(DataFlow dataFlow) { return Result.failure(new InvalidRequestException("DataAddress should not be null for PUSH transfers")); } - var future = Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { - var destinationUri = URI.create(dataFlow.getDataAddress().endpoint()); - var request = HttpRequest.newBuilder(destinationUri).POST(HttpRequest.BodyPublishers.ofString("test-data")).build(); - HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.discarding()); - }, 0, 200, TimeUnit.MILLISECONDS); + var future = Executors.newSingleThreadScheduledExecutor() + .scheduleAtFixedRate(() -> pushData(dataFlow), 0, 200, TimeUnit.MILLISECONDS); ongoingNonFiniteTransfers.put(dataFlow.getId(), future); return Result.success(dataFlow); } - case "Finite-PUSH" -> { + case "Finite-PUSH", "AsyncPrepare-PUSH" -> { if (dataFlow.getDataAddress() == null) { return Result.failure(new InvalidRequestException("DataAddress should not be null for PUSH transfers")); } - var destinationUri = URI.create(dataFlow.getDataAddress().endpoint()); - var request = HttpRequest.newBuilder(destinationUri).POST(HttpRequest.BodyPublishers.ofString("test-data")).build(); - HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.discarding()) + pushData(dataFlow) .whenComplete((response, throwable) -> notifyCompletion(dataFlow, response, throwable)); return Result.success(dataFlow); } case "NonFinite-PULL", "Finite-PULL" -> { - var dataAddress = new DataAddress(dataFlow.getTransferType(), "http", "http://localhost:%d%s/source".formatted(httpPort, httpPath), emptyList()); + var dataAddress = new DataAddress(dataFlow.getTransferType(), "http", apiConfiguration.dataSourceEndpoint(), emptyList()); dataFlow.setDataAddress(dataAddress); return Result.success(dataFlow); } @@ -133,6 +129,12 @@ public Result action(DataFlow dataFlow) { } } + private CompletableFuture> pushData(DataFlow dataFlow) { + var destinationUri = URI.create(dataFlow.getDataAddress().endpoint()); + var request = HttpRequest.newBuilder(destinationUri).POST(HttpRequest.BodyPublishers.ofString("test-data")).build(); + return HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.discarding()); + } + } private class DataplaneOnStarted implements OnStarted { @@ -178,7 +180,11 @@ private CompletableFuture> requestData(DataFlow dataFlow, U private class DataplaneOnPrepare implements OnPrepare { @Override public Result action(DataFlow dataFlow) { - var destination = new DataAddress("Finite-PUSH", "http", "http://localhost:%d%s/receive".formatted(httpPort, httpPath), emptyList()); + if (dataFlow.getTransferType().startsWith("AsyncPrepare-")) { + dataFlow.transitionToPreparing(); + return Result.success(dataFlow); + } + var destination = new DataAddress("Finite-PUSH", "http", apiConfiguration.receiveDataEndpoint(), emptyList()); dataFlow.setDataAddress(destination); return Result.success(dataFlow); } @@ -216,4 +222,23 @@ private void notifyCompleted(DataFlow dataFlow) { } }); } + + @Settings + public record ApiConfiguration( + @Setting(key = "web.http.path") String path, + @Setting(key = "web.http.port") int port + ) { + + public String receiveDataEndpoint() { + return "http://localhost:%d%s/data/receive".formatted(port, path); + } + + public String dataSourceEndpoint() { + return "http://localhost:%d%s/data/source".formatted(port, path); + } + + public String dataFlowEndpoint() { + return "http://localhost:%d%s/v1/dataflows".formatted(port, path); + } + } } diff --git a/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/SourceDataController.java b/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/SourceDataController.java deleted file mode 100644 index 930f444988..0000000000 --- a/system-tests/e2e-transfer-test/signaling-data-plane/src/main/java/org/eclipse/edc/test/runtime/signaling/SourceDataController.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2026 Think-it GmbH - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Think-it GmbH - initial API and implementation - * - */ - -package org.eclipse.edc.test.runtime.signaling; - -import jakarta.ws.rs.GET; -import jakarta.ws.rs.Path; -import org.eclipse.edc.spi.monitor.Monitor; - -@Path("/") -public class SourceDataController { - private final Monitor monitor; - - public SourceDataController(Monitor monitor) { - this.monitor = monitor; - } - - @GET - @Path("/source") - public String dataSource() { - monitor.info("Data requested"); - return "data"; - } -}