Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public ServiceResult<Void> execute(Request.Builder requestBuilder) {
public ServiceResult<String> request(Request.Builder requestBuilder) {
authenticationProvider.authenticationHeaders().forEach(requestBuilder::header);
try (
var response = httpClient.execute(requestBuilder.build(), List.of(retryWhenStatusIsNotIn(200, 204)));
var response = httpClient.execute(requestBuilder.build(), List.of(retryWhenStatusIsNotIn(200, 202, 204)));
var responseBody = response.body();
) {
if (response.isSuccessful()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ private boolean processInitial(TransferProcess process) {
} else {
var response = provisioning.getContent();
process.setDataPlaneId(response.getDataPlaneId());
if (response.isProvisioning()) {
process.transitionProvisioningRequested();
if (response.isAsync()) {
process.transitionPreparationRequested();
observable.invokeForEach(l -> l.preparationRequested(process));
} else {
process.updateDestination(response.getDataAddress());
process.transitionRequesting();
Expand Down Expand Up @@ -296,7 +297,7 @@ private boolean startTransferFlow(TransferProcess process, Consumer<TransferProc
return entityRetryProcessFactory.retryProcessor(process)
.doProcess(result("Start DataFlow", (t, c) -> dataFlowController.start(process, policy)))
.doProcess(futureResult("Dispatch TransferRequestMessage to: " + process.getCounterPartyAddress(), (t, dataFlowResponse) -> {
if (dataFlowResponse.isProvisioning()) {
if (dataFlowResponse.isAsync()) {
return completedFuture(StatusResult.success(dataFlowResponse));
}
var messageBuilder = TransferStartMessage.Builder.newInstance().dataAddress(dataFlowResponse.getDataAddress());
Expand All @@ -305,7 +306,7 @@ private boolean startTransferFlow(TransferProcess process, Consumer<TransferProc
}))
.onSuccess((t, dataFlowResponse) -> {
t.setDataPlaneId(dataFlowResponse.getDataPlaneId());
if (dataFlowResponse.isProvisioning()) {
if (dataFlowResponse.isAsync()) {
process.transitionStartupRequested();
update(t);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETING;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETING_REQUESTED;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.INITIAL;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PROVISIONING_REQUESTED;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PREPARATION_REQUESTED;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.REQUESTED;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.REQUESTING;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.RESUMED;
Expand Down Expand Up @@ -490,11 +490,11 @@ void shouldTransitionToTerminated_whenNoPolicyFound() {
}

@Test
void shouldTransitionToProvisioningRequested_whenProvisionThroughDataplaneSucceeds() {
void shouldTransitionToPreparationRequested_whenProvisionThroughDataplaneSucceeds() {
var dataPlaneId = UUID.randomUUID().toString();
var dataFlowResponse = DataFlowResponse.Builder.newInstance()
.dataPlaneId(dataPlaneId)
.provisioning(true)
.async(true)
.build();
var transferProcess = createTransferProcess(INITIAL);
when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code())))
Expand All @@ -506,10 +506,11 @@ void shouldTransitionToProvisioningRequested_whenProvisionThroughDataplaneSuccee

await().untilAsserted(() -> {
verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString());
verify(listener).preparationRequested(transferProcess);
var captor = ArgumentCaptor.forClass(TransferProcess.class);
verify(transferProcessStore).save(captor.capture());
var storedTransferProcess = captor.getValue();
assertThat(storedTransferProcess.getState()).isEqualTo(PROVISIONING_REQUESTED.code());
assertThat(storedTransferProcess.getState()).isEqualTo(PREPARATION_REQUESTED.code());
assertThat(storedTransferProcess.getDataPlaneId()).isEqualTo(dataPlaneId);
});
}
Expand All @@ -521,7 +522,7 @@ void shouldTransitionToRequesting_whenProvisionThroughDataplaneSucceedsButNoActu
var dataFlowResponse = DataFlowResponse.Builder.newInstance()
.dataPlaneId(dataPlaneId)
.dataAddress(dataDestination)
.provisioning(false)
.async(false)
.build();
var transferProcess = createTransferProcess(INITIAL);
when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code())))
Expand Down Expand Up @@ -699,7 +700,7 @@ void shouldNotSendMessageAndTransitionToStartupRequested_whenAsynchronousDataPla
when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STARTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList());
when(transferProcessStore.findById(process.getId())).thenReturn(process);
when(dataFlowController.start(any(), any())).thenReturn(StatusResult.success(
dataFlowResponseBuilder().provisioning(true).dataPlaneId("dataPlaneId").build()));
dataFlowResponseBuilder().async(true).dataPlaneId("dataPlaneId").build()));

manager.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessDeprovisioningRequested;
import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessEvent;
import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessInitiated;
import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessPreparationRequested;
import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessProvisioned;
import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessProvisioningRequested;
import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessRequested;
import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessSuspended;
Expand Down Expand Up @@ -49,8 +49,8 @@ public void initiated(TransferProcess process) {
}

@Override
public void provisioningRequested(TransferProcess process) {
var event = withBaseProperties(TransferProcessProvisioningRequested.Builder.newInstance(), process)
public void preparationRequested(TransferProcess process) {
var event = withBaseProperties(TransferProcessPreparationRequested.Builder.newInstance(), process)
.build();

eventRouter.publish(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess.Type.CONSUMER;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess.Type.PROVIDER;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PROVISIONING_REQUESTED;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PREPARATION_REQUESTED;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.REQUESTING;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTING;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTUP_REQUESTED;
Expand All @@ -48,7 +48,7 @@ void shouldTransitionProvisioned_whenConsumer() {
var newDestination = DataAddress.Builder.newInstance().type("new").build();
var originalDestination = DataAddress.Builder.newInstance().type("original").build();
var command = new NotifyPreparedCommand("test-id", newDestination);
var entity = TransferProcess.Builder.newInstance().state(PROVISIONING_REQUESTED.code()).type(CONSUMER).dataDestination(originalDestination).build();
var entity = TransferProcess.Builder.newInstance().state(PREPARATION_REQUESTED.code()).type(CONSUMER).dataDestination(originalDestination).build();

var result = handler.modify(entity, command);

Expand All @@ -61,7 +61,7 @@ void shouldTransitionProvisioned_whenConsumer() {
void shouldNotUpdateDestination_whenItIsMissing() {
var command = new NotifyPreparedCommand("test-id", null);
var originalDestination = DataAddress.Builder.newInstance().type("original").build();
var entity = TransferProcess.Builder.newInstance().state(PROVISIONING_REQUESTED.code()).dataDestination(originalDestination).build();
var entity = TransferProcess.Builder.newInstance().state(PREPARATION_REQUESTED.code()).dataDestination(originalDestination).build();

var result = handler.modify(entity, command);

Expand All @@ -87,7 +87,7 @@ void shouldTransitionToStarting_whenProvider() {
@Test
void postAction_shouldCallProvisioned() {
var command = new NotifyPreparedCommand("test-id", null);
var entity = TransferProcess.Builder.newInstance().state(PROVISIONING_REQUESTED.code()).build();
var entity = TransferProcess.Builder.newInstance().state(PREPARATION_REQUESTED.code()).build();
var listener = mock(TransferProcessListener.class);
observable.registerListener(listener);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.signaling.port.api.DataPlaneRegistrationApiController;
import org.eclipse.edc.signaling.port.api.DataPlaneTransferApiController;
import org.eclipse.edc.signaling.port.transformer.DataAddressToDspDataAddressTransformer;
import org.eclipse.edc.signaling.port.transformer.DataFlowResponseMessageToDataFlowResponseTransformer;
import org.eclipse.edc.signaling.port.transformer.DspDataAddressToDataAddressTransformer;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.web.spi.WebService;
import org.eclipse.edc.web.spi.configuration.ApiContext;

Expand All @@ -38,6 +42,8 @@ public class DataPlaneSignalingApiExtension implements ServiceExtension {
private WebService webService;
@Inject
private TransferProcessService transferProcessService;
@Inject
private TypeTransformerRegistry transformerRegistry;

@Override
public String name() {
Expand All @@ -46,8 +52,12 @@ public String name() {

@Override
public void initialize(ServiceExtensionContext context) {
var typeTransformerRegistry = transformerRegistry.forContext("signaling-api");
typeTransformerRegistry.register(new DataAddressToDspDataAddressTransformer());
typeTransformerRegistry.register(new DataFlowResponseMessageToDataFlowResponseTransformer());
typeTransformerRegistry.register(new DspDataAddressToDataAddressTransformer());
webService.registerResource(ApiContext.CONTROL, new DataPlaneRegistrationApiController(dataPlaneSelectorService));
webService.registerResource(ApiContext.CONTROL, new DataPlaneTransferApiController(transferProcessService));
webService.registerResource(ApiContext.CONTROL, new DataPlaneTransferApiController(transferProcessService, typeTransformerRegistry));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ private StatusResult<Void> sendMessage(String flowId, String name, Object messag

private StatusResult<DataFlowResponseMessage> handleResponse(String responseBody) {
return Optional.ofNullable(responseBody)
.map(this::deserializeStartMessage)
.map(this::deserializeResponseMessage)
.orElseGet(() -> StatusResult.failure(FATAL_ERROR, "Body missing"));
}

private StatusResult<DataFlowResponseMessage> deserializeStartMessage(String responseBody) {
private StatusResult<DataFlowResponseMessage> deserializeResponseMessage(String responseBody) {
try {
var message = objectMapperSupplier.get().readValue(responseBody, DataFlowResponseMessage.class);
return StatusResult.success(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,25 @@
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.ws.rs.core.Response;
import org.eclipse.edc.api.model.ApiCoreSchema;
import org.eclipse.edc.signaling.domain.DataFlowResponseMessage;

import static jakarta.ws.rs.HttpMethod.POST;

@OpenAPIDefinition
@Tag(name = "Data Plane Transfer events")
public interface DataPlaneTransferApi {

@Operation(
method = POST,
description = "Notify a prepared transfer",
responses = {
@ApiResponse(responseCode = "200", description = "Prepared notification delivered correctly"),
@ApiResponse(responseCode = "404", description = "Transfer process does not exist",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class))))
}
)
Response prepared(String transferId, DataFlowResponseMessage message);

@Operation(
method = POST,
description = "Notify a completed transfer",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,43 @@
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Response;
import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.command.NotifyPreparedCommand;
import org.eclipse.edc.signaling.domain.DataFlowPrepareMessage;
import org.eclipse.edc.signaling.domain.DataFlowResponseMessage;
import org.eclipse.edc.spi.result.ServiceResult;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
import static org.eclipse.edc.web.spi.exception.ServiceResultHandler.exceptionMapper;
import static org.eclipse.edc.web.spi.exception.ServiceResultHandler.mapToException;

@Path("/transfers")
@Produces(APPLICATION_JSON)
@Consumes(APPLICATION_JSON)
public class DataPlaneTransferApiController implements DataPlaneTransferApi {

private final TransferProcessService transferProcessService;
private final TypeTransformerRegistry typeTransformerRegistry;

public DataPlaneTransferApiController(TransferProcessService transferProcessService) {
public DataPlaneTransferApiController(TransferProcessService transferProcessService, TypeTransformerRegistry typeTransformerRegistry) {
this.transferProcessService = transferProcessService;
this.typeTransformerRegistry = typeTransformerRegistry;
}

@Path("/{transferId}/dataflow/prepared")
@POST
@Override
public Response prepared(@PathParam("transferId") String transferId, DataFlowResponseMessage message) {
typeTransformerRegistry.transform(message, DataFlowResponse.class)
.map(ServiceResult::success)
.orElse(failure -> ServiceResult.badRequest(failure.getMessages()))
.map(response -> new NotifyPreparedCommand(transferId, response.getDataAddress()))
.compose(transferProcessService::notifyPrepared)
.orElseThrow(f -> mapToException(f, DataFlowPrepareMessage.class));

return Response.ok().build();
}

@Path("/{transferId}/dataflow/completed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public Class<DataFlowResponse> getOutputType() {
return DataFlowResponse.Builder.newInstance()
.dataAddress(context.transform(dataFlowResponseMessage.getDataAddress(), DataAddress.class))
.dataPlaneId(dataFlowResponseMessage.getDataplaneId())
.provisioning(false)
.async(dataFlowResponseMessage.getState().endsWith("ING"))
.build();
}

Expand Down
Loading