From fbdac82980c94d7065f92de37da598c8e4d2b387 Mon Sep 17 00:00:00 2001
From: Timofey Durakov
Date: Tue, 25 Jan 2022 15:29:26 +0400
Subject: [PATCH 1/7] Bumped versions of PyGithub and PyJWT for the changelog
 generator

---
 tools/changelog-generator/requirements.txt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/changelog-generator/requirements.txt b/tools/changelog-generator/requirements.txt
index 0f259df1f0e..bd7e42367ed 100644
--- a/tools/changelog-generator/requirements.txt
+++ b/tools/changelog-generator/requirements.txt
@@ -7,8 +7,8 @@ GitPython==2.1.11
 idna==2.8
 Jinja2==2.11.3
 MarkupSafe==1.1.1
-PyGithub==1.43.7
-PyJWT==1.7.1
+PyGithub==1.55
+PyJWT==2.0.0
 requests==2.22.0
 smmap2==2.0.5
 urllib3==1.25.9

From ac7fd6218801f51032746e59fe3af4f3bac32486 Mon Sep 17 00:00:00 2001
From: Nikita Rydanov
Date: Fri, 25 Feb 2022 16:33:55 +0400
Subject: [PATCH 2/7] Each request now has a shared outer correlation-id and a
 unique inner correlation-id

---
 .../org/openkilda/northbound/service/impl/FlowServiceImpl.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/service/impl/FlowServiceImpl.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/service/impl/FlowServiceImpl.java
index acc50034a61..cc958564394 100644
--- a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/service/impl/FlowServiceImpl.java
+++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/service/impl/FlowServiceImpl.java
@@ -375,11 +375,12 @@ public CompletableFuture<List<FlowResponsePayload>> deleteAllFlows() {
         logger.warn("Delete all flows request");
         // TODO: Need a getFlowIDs .. since that is all we need
         CompletableFuture<List<FlowResponsePayload>> getFlowsStage = this.getAllFlows();
+        final String correlationId = RequestCorrelationId.getId();

         getFlowsStage.thenApply(flows -> {
             List<CompletableFuture<?>> deletionRequests = new ArrayList<>();
             for (int i = 0; i < flows.size(); i++) {
-                String requestId = idFactory.produceChained(String.valueOf(i));
+                String requestId = idFactory.produceChained(correlationId);
                 FlowResponsePayload flow = flows.get(i);
                 if (flow.getYFlowId() != null) {
                     // Skip y-sub-flows.
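The change in PATCH 2/7 captures the outer correlation-id once (RequestCorrelationId.getId()) before the asynchronous fan-out and passes it to idFactory.produceChained(...) for every per-flow deletion request, so all inner requests share the same outer id while each one still receives its own chained id. Below is a minimal, self-contained Java sketch of that idea; the produceChained helper and the id format shown here are illustrative stand-ins, not the actual idFactory implementation.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;

    // Sketch only: one outer correlation-id shared by a whole "delete all flows"
    // call, with a unique inner id chained onto it for every per-flow request.
    public class CorrelationIdChainingSketch {

        // Stand-in for idFactory.produceChained(...): appends a fresh unique
        // suffix to the shared outer correlation-id.
        static String produceChained(String outerCorrelationId) {
            return outerCorrelationId + " : " + UUID.randomUUID();
        }

        public static void main(String[] args) {
            // Captured once, outside the per-flow loop (the patch captures it
            // before the async callback so the lambda reuses the same value).
            String outerCorrelationId = UUID.randomUUID().toString();

            List<String> flowIds = Arrays.asList("flow-1", "flow-2", "flow-3");
            List<String> innerIds = new ArrayList<>();
            for (String flowId : flowIds) {
                String requestId = produceChained(outerCorrelationId);
                innerIds.add(requestId);
                System.out.println("delete " + flowId + " -> " + requestId);
            }

            // Every inner id starts with the shared outer id but is still unique.
            boolean allShareOuter = innerIds.stream()
                    .allMatch(id -> id.startsWith(outerCorrelationId));
            System.out.println("all requests share the outer id: " + allShareOuter);
        }
    }

Chaining keeps the fan-out traceable: every inner request can be correlated back to the originating API call through its shared outer part, while each inner id still identifies exactly one deletion.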
From 083c3d25771b8da4d933ba935818a7785f45f29b Mon Sep 17 00:00:00 2001 From: Nikita Rydanov Date: Thu, 17 Mar 2022 17:10:09 +0400 Subject: [PATCH 3/7] Removed duplicated addChangedFields call + unignored test --- .../wfm/topology/nbworker/services/FlowOperationsService.java | 4 ++-- .../functionaltests/spec/flows/PartialUpdateSpec.groovy | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src-java/nbworker-topology/nbworker-storm-topology/src/main/java/org/openkilda/wfm/topology/nbworker/services/FlowOperationsService.java b/src-java/nbworker-topology/nbworker-storm-topology/src/main/java/org/openkilda/wfm/topology/nbworker/services/FlowOperationsService.java index 15d8e0bdae9..067727257dd 100644 --- a/src-java/nbworker-topology/nbworker-storm-topology/src/main/java/org/openkilda/wfm/topology/nbworker/services/FlowOperationsService.java +++ b/src-java/nbworker-topology/nbworker-storm-topology/src/main/java/org/openkilda/wfm/topology/nbworker/services/FlowOperationsService.java @@ -386,9 +386,9 @@ public Flow updateFlow(FlowOperationsCarrier carrier, FlowPatch flowPatch) throw Flow updatedFlow = updateFlowResult.getUpdatedFlow(); if (updateFlowResult.isNeedUpdateFlow()) { FlowRequest flowRequest = RequestedFlowMapper.INSTANCE.toFlowRequest(updatedFlow); - addChangedFields(flowRequest, flowPatch); + FlowRequest changedRequest = addChangedFields(flowRequest, flowPatch); flowDashboardLogger.onFlowPatchUpdate(RequestedFlowMapper.INSTANCE.toFlow(flowRequest)); - carrier.sendUpdateRequest(addChangedFields(flowRequest, flowPatch)); + carrier.sendUpdateRequest(changedRequest); } else { flowDashboardLogger.onFlowPatchUpdate(updatedFlow); carrier.sendNorthboundResponse(buildFlowResponse(updatedFlow)); diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/PartialUpdateSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/PartialUpdateSpec.groovy index 0fd99297fa5..716f331f443 100644 --- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/PartialUpdateSpec.groovy +++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/PartialUpdateSpec.groovy @@ -768,7 +768,6 @@ class PartialUpdateSpec extends HealthCheckSpecification { @Tidy @Tags(LOW_PRIORITY) - @Ignore("https://github.com/telstra/open-kilda/issues/4411") def "Able to update vlanId via partialUpdate in case vlanId==0 and innerVlanId!=0"() { given: "A default flow" assumeTrue(useMultitable, "Multi table is not enabled in kilda configuration") From 5f2893eb68ca062dd16a001858cee26151bb6657 Mon Sep 17 00:00:00 2001 From: violetta shakirova Date: Thu, 17 Mar 2022 19:54:52 +0400 Subject: [PATCH 4/7] Fixed lag creation with duplicate port numbers Closes: #4644 --- .../command/grpc/CreateOrUpdateLogicalPortRequest.java | 6 +++--- .../network/storm/bolt/bfd/worker/BfdWorker.java | 2 +- .../northbound/dto/v2/switches/LagPortRequest.java | 4 ++-- .../northbound/converter/LagPortMapperTest.java | 3 ++- .../swmanager/request/CreateLagPortRequest.java | 4 ++-- .../swmanager/request/UpdateLagPortRequest.java | 4 ++-- .../messaging/swmanager/response/LagPortResponse.java | 4 ++-- .../swmanager/request/CreateLagPortRequestTest.java | 4 ++-- .../swmanager/response/LagPortResponseTest.java | 4 ++-- .../wfm/topology/switchmanager/fsm/CreateLagPortFsm.java | 3 +-- .../wfm/topology/switchmanager/fsm/DeleteLagPortFsm.java | 4 ++-- .../service/handler/LagPortUpdateHandler.java | 
4 ++-- .../switchmanager/service/impl/CommandBuilderImpl.java | 3 ++- .../switchmanager/service/UpdateLagPortServiceTest.java | 9 ++++----- .../service/handler/LagPortUpdateHandlerTest.java | 4 ++-- 15 files changed, 31 insertions(+), 31 deletions(-) diff --git a/src-java/grpc-speaker/grpc-api/src/main/java/org/openkilda/messaging/command/grpc/CreateOrUpdateLogicalPortRequest.java b/src-java/grpc-speaker/grpc-api/src/main/java/org/openkilda/messaging/command/grpc/CreateOrUpdateLogicalPortRequest.java index 3d65c850386..06bf1a1d4f4 100644 --- a/src-java/grpc-speaker/grpc-api/src/main/java/org/openkilda/messaging/command/grpc/CreateOrUpdateLogicalPortRequest.java +++ b/src-java/grpc-speaker/grpc-api/src/main/java/org/openkilda/messaging/command/grpc/CreateOrUpdateLogicalPortRequest.java @@ -22,14 +22,14 @@ import lombok.Data; import lombok.EqualsAndHashCode; -import java.util.List; +import java.util.Set; @Data @EqualsAndHashCode(callSuper = true) public class CreateOrUpdateLogicalPortRequest extends GrpcBaseRequest { @JsonProperty("port_numbers") - private List portNumbers; + private Set portNumbers; @JsonProperty("logical_port_number") private int logicalPortNumber; @@ -39,7 +39,7 @@ public class CreateOrUpdateLogicalPortRequest extends GrpcBaseRequest { @JsonCreator public CreateOrUpdateLogicalPortRequest(@JsonProperty("address") String address, - @JsonProperty("port_numbers") List portNumbers, + @JsonProperty("port_numbers") Set portNumbers, @JsonProperty("logical_port_number") int logicalPortNumber, @JsonProperty("type") LogicalPortType type) { super(address); diff --git a/src-java/network-topology/network-storm-topology/src/main/java/org/openkilda/wfm/topology/network/storm/bolt/bfd/worker/BfdWorker.java b/src-java/network-topology/network-storm-topology/src/main/java/org/openkilda/wfm/topology/network/storm/bolt/bfd/worker/BfdWorker.java index 55b47ed6206..039cf6ce422 100644 --- a/src-java/network-topology/network-storm-topology/src/main/java/org/openkilda/wfm/topology/network/storm/bolt/bfd/worker/BfdWorker.java +++ b/src-java/network-topology/network-storm-topology/src/main/java/org/openkilda/wfm/topology/network/storm/bolt/bfd/worker/BfdWorker.java @@ -150,7 +150,7 @@ public void processPortCreateRequest(String requestId, Endpoint logical, int phy } CreateOrUpdateLogicalPortRequest request = new CreateOrUpdateLogicalPortRequest( - address.get(), Collections.singletonList(physicalPortNumber), logical.getPortNumber(), + address.get(), Collections.singleton(physicalPortNumber), logical.getPortNumber(), LogicalPortType.BFD); emit(STREAM_GRPC_ID, getCurrentTuple(), makeGrpcTuple(requestId, request)); } diff --git a/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/switches/LagPortRequest.java b/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/switches/LagPortRequest.java index c4110321eed..2bcca8dc78b 100644 --- a/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/switches/LagPortRequest.java +++ b/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/switches/LagPortRequest.java @@ -21,12 +21,12 @@ import lombok.Data; import lombok.NoArgsConstructor; -import java.util.List; +import java.util.Set; @Data @NoArgsConstructor @AllArgsConstructor @JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class) public class LagPortRequest { - private List portNumbers; + private Set portNumbers; } diff --git 
a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/LagPortMapperTest.java b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/LagPortMapperTest.java index 30f90fed59e..71a5f6115c4 100644 --- a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/LagPortMapperTest.java +++ b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/LagPortMapperTest.java @@ -21,6 +21,7 @@ import org.openkilda.messaging.swmanager.response.LagPortResponse; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -51,7 +52,7 @@ public void mapLagPortDtoTest() { @Test public void mapLagResponseTest() { LagPortResponse response = new LagPortResponse(LOGICAL_PORT_NUMBER_1, - Lists.newArrayList(PHYSICAL_PORT_NUMBER_1, PHYSICAL_PORT_NUMBER_2)); + Sets.newHashSet(PHYSICAL_PORT_NUMBER_1, PHYSICAL_PORT_NUMBER_2)); org.openkilda.northbound.dto.v2.switches.LagPortResponse dto = lagMapper.map(response); assertEquals(LOGICAL_PORT_NUMBER_1, dto.getLogicalPortNumber()); diff --git a/src-java/swmanager-topology/swmanager-messaging/src/main/java/org/openkilda/messaging/swmanager/request/CreateLagPortRequest.java b/src-java/swmanager-topology/swmanager-messaging/src/main/java/org/openkilda/messaging/swmanager/request/CreateLagPortRequest.java index 12d7b007cbc..ba797f991ed 100644 --- a/src-java/swmanager-topology/swmanager-messaging/src/main/java/org/openkilda/messaging/swmanager/request/CreateLagPortRequest.java +++ b/src-java/swmanager-topology/swmanager-messaging/src/main/java/org/openkilda/messaging/swmanager/request/CreateLagPortRequest.java @@ -21,12 +21,12 @@ import lombok.EqualsAndHashCode; import lombok.Value; -import java.util.List; +import java.util.Set; @Value @EqualsAndHashCode(callSuper = false) public class CreateLagPortRequest extends CommandData { SwitchId switchId; - List portNumbers; + Set portNumbers; } diff --git a/src-java/swmanager-topology/swmanager-messaging/src/main/java/org/openkilda/messaging/swmanager/request/UpdateLagPortRequest.java b/src-java/swmanager-topology/swmanager-messaging/src/main/java/org/openkilda/messaging/swmanager/request/UpdateLagPortRequest.java index 3a28c559d08..b7ff5058100 100644 --- a/src-java/swmanager-topology/swmanager-messaging/src/main/java/org/openkilda/messaging/swmanager/request/UpdateLagPortRequest.java +++ b/src-java/swmanager-topology/swmanager-messaging/src/main/java/org/openkilda/messaging/swmanager/request/UpdateLagPortRequest.java @@ -21,12 +21,12 @@ import lombok.EqualsAndHashCode; import lombok.Value; -import java.util.List; +import java.util.Set; @Value @EqualsAndHashCode(callSuper = false) public class UpdateLagPortRequest extends CommandData { SwitchId switchId; int logicalPortNumber; - List targetPorts; + Set targetPorts; } diff --git a/src-java/swmanager-topology/swmanager-messaging/src/main/java/org/openkilda/messaging/swmanager/response/LagPortResponse.java b/src-java/swmanager-topology/swmanager-messaging/src/main/java/org/openkilda/messaging/swmanager/response/LagPortResponse.java index 75fc8e44d18..9779eed9e57 100644 --- a/src-java/swmanager-topology/swmanager-messaging/src/main/java/org/openkilda/messaging/swmanager/response/LagPortResponse.java +++ 
b/src-java/swmanager-topology/swmanager-messaging/src/main/java/org/openkilda/messaging/swmanager/response/LagPortResponse.java @@ -23,7 +23,7 @@ import lombok.EqualsAndHashCode; import lombok.Value; -import java.util.List; +import java.util.Set; @Value @EqualsAndHashCode(callSuper = false) @@ -31,5 +31,5 @@ @JsonInclude(JsonInclude.Include.NON_NULL) public class LagPortResponse extends InfoData { int logicalPortNumber; - List portNumbers; + Set portNumbers; } diff --git a/src-java/swmanager-topology/swmanager-messaging/src/test/java/org/openkilda/messaging/swmanager/request/CreateLagPortRequestTest.java b/src-java/swmanager-topology/swmanager-messaging/src/test/java/org/openkilda/messaging/swmanager/request/CreateLagPortRequestTest.java index 16aa5270cb2..b98ba31f9bf 100644 --- a/src-java/swmanager-topology/swmanager-messaging/src/test/java/org/openkilda/messaging/swmanager/request/CreateLagPortRequestTest.java +++ b/src-java/swmanager-topology/swmanager-messaging/src/test/java/org/openkilda/messaging/swmanager/request/CreateLagPortRequestTest.java @@ -19,7 +19,7 @@ import org.openkilda.messaging.command.CommandMessage; import org.openkilda.model.SwitchId; -import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; @@ -39,6 +39,6 @@ public void serializeLoop() throws Exception { } public static CreateLagPortRequest makeRequest() { - return new CreateLagPortRequest(new SwitchId(1), Lists.newArrayList(1, 2, 3)); + return new CreateLagPortRequest(new SwitchId(1), Sets.newHashSet(1, 2, 3)); } } diff --git a/src-java/swmanager-topology/swmanager-messaging/src/test/java/org/openkilda/messaging/swmanager/response/LagPortResponseTest.java b/src-java/swmanager-topology/swmanager-messaging/src/test/java/org/openkilda/messaging/swmanager/response/LagPortResponseTest.java index c2194911dbf..cf24ccce9c7 100644 --- a/src-java/swmanager-topology/swmanager-messaging/src/test/java/org/openkilda/messaging/swmanager/response/LagPortResponseTest.java +++ b/src-java/swmanager-topology/swmanager-messaging/src/test/java/org/openkilda/messaging/swmanager/response/LagPortResponseTest.java @@ -18,7 +18,7 @@ import org.openkilda.messaging.StringSerializer; import org.openkilda.messaging.info.InfoMessage; -import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; @@ -38,6 +38,6 @@ public void serializeLoop() throws Exception { } public static LagPortResponse makeResponse() { - return new LagPortResponse(1, Lists.newArrayList(1, 2, 3)); + return new LagPortResponse(1, Sets.newHashSet(1, 2, 3)); } } diff --git a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/fsm/CreateLagPortFsm.java b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/fsm/CreateLagPortFsm.java index 63a09f4a3e9..b87d85c45b2 100644 --- a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/fsm/CreateLagPortFsm.java +++ b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/fsm/CreateLagPortFsm.java @@ -51,7 +51,6 @@ import org.squirrelframework.foundation.fsm.StateMachineStatus; import org.squirrelframework.foundation.fsm.impl.AbstractStateMachine; -import java.util.ArrayList; import java.util.HashSet; @Slf4j @@ -140,7 +139,7 @@ void lagInstalled(CreateLagState from, CreateLagState to, 
CreateLagEvent event, void finishedEnter(CreateLagState from, CreateLagState to, CreateLagEvent event, CreateLagContext context) { LagPortResponse response = new LagPortResponse( - grpcRequest.getLogicalPortNumber(), new ArrayList<>(grpcRequest.getPortNumbers())); + grpcRequest.getLogicalPortNumber(), grpcRequest.getPortNumbers()); InfoMessage message = new InfoMessage(response, System.currentTimeMillis(), key); carrier.response(key, message); } diff --git a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/fsm/DeleteLagPortFsm.java b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/fsm/DeleteLagPortFsm.java index 3e1ad93d7cf..c8b52b1d386 100644 --- a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/fsm/DeleteLagPortFsm.java +++ b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/fsm/DeleteLagPortFsm.java @@ -156,12 +156,12 @@ void finishedEnter(DeleteLagState from, DeleteLagState to, DeleteLagEvent event, if (removedLagPort != null) { response = new LagPortResponse( removedLagPort.getLogicalPortNumber(), removedLagPort.getPhysicalPorts().stream() - .map(PhysicalPort::getPortNumber).collect(Collectors.toList())); + .map(PhysicalPort::getPortNumber).collect(Collectors.toSet())); } else { // dummy response entity // TODO(surabujin): weird behaviour, can we be more correct? - response = new LagPortResponse(request.getLogicalPortNumber(), Collections.emptyList()); + response = new LagPortResponse(request.getLogicalPortNumber(), Collections.emptySet()); } InfoMessage message = new InfoMessage(response, System.currentTimeMillis(), key); diff --git a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/handler/LagPortUpdateHandler.java b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/handler/LagPortUpdateHandler.java index 17bea073936..3f9689c5388 100644 --- a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/handler/LagPortUpdateHandler.java +++ b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/handler/LagPortUpdateHandler.java @@ -172,14 +172,14 @@ private MessageCookie newMessageCookie() { private CreateOrUpdateLogicalPortRequest newGrpcRequest(Set targetPorts) { String address = operationService.getSwitchIpAddress(goal.getSwitchId()); return new CreateOrUpdateLogicalPortRequest( - address, new ArrayList<>(targetPorts), goal.getLogicalPortNumber(), LogicalPortType.LAG); + address, targetPorts, goal.getLogicalPortNumber(), LogicalPortType.LAG); } public String formatLagPortReference() { return String.format("LAG logical port #%d on %s", goal.getLogicalPortNumber(), goal.getSwitchId()); } - private static String formatTargetPorts(List origin) { + private static String formatTargetPorts(Set origin) { List ports = new ArrayList<>(origin); Collections.sort(ports); return ports.stream().map(Object::toString).collect(Collectors.joining(", ")); diff --git a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/impl/CommandBuilderImpl.java 
b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/impl/CommandBuilderImpl.java index ae6770c24be..79da81dd8c5 100644 --- a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/impl/CommandBuilderImpl.java +++ b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/impl/CommandBuilderImpl.java @@ -78,6 +78,7 @@ import com.fasterxml.uuid.Generators; import com.fasterxml.uuid.NoArgGenerator; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.math.NumberUtils; @@ -474,7 +475,7 @@ public List buildLogicalPortInstallCommands( List requests = new ArrayList<>(); for (LogicalPortInfoEntry port : missingLogicalPorts) { requests.add(new CreateOrUpdateLogicalPortRequest( - ipAddress, port.getPhysicalPorts(), port.getLogicalPortNumber(), + ipAddress, Sets.newHashSet(port.getPhysicalPorts()), port.getLogicalPortNumber(), LogicalPortMapper.INSTANCE.map(port.getType()))); } return requests; diff --git a/src-java/swmanager-topology/swmanager-storm-topology/src/test/java/org/openkilda/wfm/topology/switchmanager/service/UpdateLagPortServiceTest.java b/src-java/swmanager-topology/swmanager-storm-topology/src/test/java/org/openkilda/wfm/topology/switchmanager/service/UpdateLagPortServiceTest.java index eeb939c269d..25f8efc1a68 100644 --- a/src-java/swmanager-topology/swmanager-storm-topology/src/test/java/org/openkilda/wfm/topology/switchmanager/service/UpdateLagPortServiceTest.java +++ b/src-java/swmanager-topology/swmanager-storm-topology/src/test/java/org/openkilda/wfm/topology/switchmanager/service/UpdateLagPortServiceTest.java @@ -24,6 +24,7 @@ import org.openkilda.wfm.topology.switchmanager.error.SwitchNotFoundException; import org.openkilda.wfm.topology.switchmanager.service.handler.LagPortUpdateHandler; +import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -31,8 +32,6 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import java.util.Arrays; - @RunWith(MockitoJUnitRunner.class) public class UpdateLagPortServiceTest { @Mock @@ -56,13 +55,13 @@ public void testKeepHandlerOnRequestKeyCollision() { Assert.assertFalse(subject.activeHandlers.containsKey(requestKey)); UpdateLagPortRequest request = new UpdateLagPortRequest( - new SwitchId(1), (int) config.getPoolConfig().getIdMinimum(), Arrays.asList(1, 2, 3)); + new SwitchId(1), (int) config.getPoolConfig().getIdMinimum(), Sets.newHashSet(1, 2, 3)); subject.update(requestKey, request); LagPortUpdateHandler origin = subject.activeHandlers.get(requestKey); Assert.assertNotNull(origin); UpdateLagPortRequest request2 = new UpdateLagPortRequest( - new SwitchId(2), (int) config.getPoolConfig().getIdMinimum(), Arrays.asList(1, 2, 3)); + new SwitchId(2), (int) config.getPoolConfig().getIdMinimum(), Sets.newHashSet(1, 2, 3)); Assert.assertThrows(InconsistentDataException.class, () -> subject.update(requestKey, request2)); Assert.assertSame(origin, subject.activeHandlers.get(requestKey)); } @@ -77,7 +76,7 @@ public void testHandlerRemoveOnException() { String requestKey = "test-key"; UpdateLagPortRequest request = new UpdateLagPortRequest( - switchId, (int) config.getPoolConfig().getIdMinimum(), Arrays.asList(1, 2, 3)); + switchId, (int) 
config.getPoolConfig().getIdMinimum(), Sets.newHashSet(1, 2, 3)); subject.update(requestKey, request); Mockito.verify(carrier).errorResponse( Mockito.eq(requestKey), Mockito.eq(ErrorType.NOT_FOUND), Mockito.anyString(), Mockito.anyString()); diff --git a/src-java/swmanager-topology/swmanager-storm-topology/src/test/java/org/openkilda/wfm/topology/switchmanager/service/handler/LagPortUpdateHandlerTest.java b/src-java/swmanager-topology/swmanager-storm-topology/src/test/java/org/openkilda/wfm/topology/switchmanager/service/handler/LagPortUpdateHandlerTest.java index 736930423ba..9771bfc17dc 100644 --- a/src-java/swmanager-topology/swmanager-storm-topology/src/test/java/org/openkilda/wfm/topology/switchmanager/service/handler/LagPortUpdateHandlerTest.java +++ b/src-java/swmanager-topology/swmanager-storm-topology/src/test/java/org/openkilda/wfm/topology/switchmanager/service/handler/LagPortUpdateHandlerTest.java @@ -31,6 +31,7 @@ import org.openkilda.wfm.topology.switchmanager.service.SwitchManagerCarrier; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,7 +40,6 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -173,6 +173,6 @@ private MessageCookie verifyStartHandler( } private UpdateLagPortRequest newRequest() { - return new UpdateLagPortRequest(new SwitchId(1), 2001, Arrays.asList(1, 2, 3)); + return new UpdateLagPortRequest(new SwitchId(1), 2001, Sets.newHashSet(1, 2, 3)); } } From ac2db6a2dc88a7cde627b6ae5c2c3621165431a2 Mon Sep 17 00:00:00 2001 From: Sergey Nikitin Date: Sat, 19 Mar 2022 00:11:14 +0400 Subject: [PATCH 5/7] Added HistoryTopology to save flow history events --- .../history-topology.topology_definition.toml | 5 + .../history-topology/history-topology.tmpl | 15 +++ confd/vars/main.yaml | 1 + docker/wfm/Dockerfile | 5 + docker/wfm/app/Makefile | 6 +- .../zookeeper/bin/create-zknode-and-start.sh | 8 ++ .../wfm/share/history/model/FlowDumpData.java | 6 +- .../history/model/FlowHistoryHolder.java | 8 +- .../wfm/topology/flowhs/FlowHsTopology.java | 47 +++++----- .../topology/flowhs/FlowHsTopologyConfig.java | 4 + .../flowhs/bolts/FlowCreateHubBolt.java | 11 ++- .../flowhs/bolts/FlowDeleteHubBolt.java | 11 ++- .../bolts/FlowMirrorPointCreateHubBolt.java | 12 ++- .../bolts/FlowMirrorPointDeleteHubBolt.java | 12 ++- .../flowhs/bolts/FlowPathSwapHubBolt.java | 11 ++- .../flowhs/bolts/FlowRerouteHubBolt.java | 11 ++- .../bolts/FlowSwapEndpointsHubBolt.java | 13 ++- .../flowhs/bolts/FlowUpdateHubBolt.java | 11 ++- .../flowhs/bolts/YFlowCreateHubBolt.java | 11 ++- .../flowhs/bolts/YFlowDeleteHubBolt.java | 11 ++- .../flowhs/bolts/YFlowRerouteHubBolt.java | 11 ++- .../flowhs/bolts/YFlowUpdateHubBolt.java | 11 ++- src-java/history-topology/README.md | 9 ++ .../history-storm-topology/.gitignore | 1 + .../history-storm-topology/build.gradle | 39 ++++++++ .../checkstyle/checkstyle-suppressions.xml | 9 ++ .../wfm/topology/history/HistoryTopology.java | 94 +++++++++++++++++++ .../history/HistoryTopologyConfig.java | 28 ++++++ .../topology/history/bolts}/HistoryBolt.java | 49 +++++----- .../resources/history-topology.yaml.example | 8 ++ .../openkilda/config/KafkaTopicsConfig.java | 4 + .../model/history/FlowEventDump.java | 14 +-- .../history/HibernateFlowEventDump.java | 6 +- .../ferma/frames/FlowEventDumpFrame.java | 10 +- src-java/settings.gradle | 2 + 35 files 
changed, 381 insertions(+), 133 deletions(-) create mode 100644 confd/conf.d/history-topology.topology_definition.toml create mode 100644 confd/templates/history-topology/history-topology.tmpl create mode 100644 src-java/history-topology/README.md create mode 100644 src-java/history-topology/history-storm-topology/.gitignore create mode 100644 src-java/history-topology/history-storm-topology/build.gradle create mode 100644 src-java/history-topology/history-storm-topology/src/checkstyle/checkstyle-suppressions.xml create mode 100644 src-java/history-topology/history-storm-topology/src/main/java/org/openkilda/wfm/topology/history/HistoryTopology.java create mode 100644 src-java/history-topology/history-storm-topology/src/main/java/org/openkilda/wfm/topology/history/HistoryTopologyConfig.java rename src-java/{base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/history/bolt => history-topology/history-storm-topology/src/main/java/org/openkilda/wfm/topology/history/bolts}/HistoryBolt.java (52%) create mode 100644 src-java/history-topology/history-storm-topology/src/main/resources/history-topology.yaml.example diff --git a/confd/conf.d/history-topology.topology_definition.toml b/confd/conf.d/history-topology.topology_definition.toml new file mode 100644 index 00000000000..4ee1e40d45f --- /dev/null +++ b/confd/conf.d/history-topology.topology_definition.toml @@ -0,0 +1,5 @@ +[template] +src = "history-topology/history-topology.tmpl" +dest = "src-java/history-topology/history-storm-topology/src/main/resources/history-topology.yaml" +keys = [ "/" ] +mode = "0644" diff --git a/confd/templates/history-topology/history-topology.tmpl b/confd/templates/history-topology/history-topology.tmpl new file mode 100644 index 00000000000..e7ecbceab49 --- /dev/null +++ b/confd/templates/history-topology/history-topology.tmpl @@ -0,0 +1,15 @@ +# topology configuration +config: + topology.parallelism: {{ getv "/kilda_storm_history_parallelism" }} + topology.workers: {{ getv "/kilda_storm_parallelism_workers_count" }} + topology.spouts.parallelism: 1 + +# spout definitions +spouts: + - id: "zookeeper.spout" + parallelism: 1 + +# bolt definitions +bolts: + - id: "zookeeper.bolt" + parallelism: 1 diff --git a/confd/vars/main.yaml b/confd/vars/main.yaml index affd2477549..f8d0bdc5f10 100644 --- a/confd/vars/main.yaml +++ b/confd/vars/main.yaml @@ -171,6 +171,7 @@ kilda_storm_flow_hs_parallelism: 2 kilda_storm_flow_hs_reroute_hub_count_multiplier: 2 kilda_storm_flowhs_workers: 1 kilda_storm_parallelism_workers_count: 1 +kilda_storm_history_parallelism: 2 kilda_storm_flow_monitoring_parallelism: 2 diff --git a/docker/wfm/Dockerfile b/docker/wfm/Dockerfile index bdb606330fc..c535a1c2613 100644 --- a/docker/wfm/Dockerfile +++ b/docker/wfm/Dockerfile @@ -60,6 +60,11 @@ COPY BUILD/flowmonitoring-storm-topology/build.gradle /app/flowmonitor COPY BUILD/flowmonitoring-storm-topology/resources/flowmonitoring-topology.yaml /app/flowmonitoring-storm-topology/topology-definition.yaml +#history +COPY BUILD/history-storm-topology/libs/* /app/history-storm-topology/libs/ +COPY BUILD/history-storm-topology/build.gradle /app/history-storm-topology/ +COPY BUILD/history-storm-topology/resources/history-topology.yaml /app/history-storm-topology/topology-definition.yaml + #isllatency COPY BUILD/isllatency-storm-topology/libs/* /app/isllatency-storm-topology/libs/ diff --git a/docker/wfm/app/Makefile b/docker/wfm/app/Makefile index 3f9a054563c..02f5fdb4849 100644 --- a/docker/wfm/app/Makefile +++ b/docker/wfm/app/Makefile 
@@ -1,8 +1,8 @@ # WFM topology management -TOPOLOGY := network flowhs flowmonitoring stats isllatency opentsdb portstate nbworker ping reroute swmanager floodlightrouter connecteddevices server42-control -BLUE_TOPOLOGY := network flowhs flowmonitoring stats isllatency opentsdb portstate nbworker ping reroute swmanager floodlightrouter connecteddevices server42-control -GREEN_TOPOLOGY := network flowhs flowmonitoring stats isllatency opentsdb portstate nbworker ping reroute swmanager floodlightrouter connecteddevices server42-control +TOPOLOGY := network flowhs flowmonitoring stats isllatency opentsdb portstate nbworker ping reroute swmanager floodlightrouter connecteddevices server42-control history +BLUE_TOPOLOGY := network flowhs flowmonitoring stats isllatency opentsdb portstate nbworker ping reroute swmanager floodlightrouter connecteddevices server42-control history +GREEN_TOPOLOGY := network flowhs flowmonitoring stats isllatency opentsdb portstate nbworker ping reroute swmanager floodlightrouter connecteddevices server42-control history STORM := /opt/storm/bin/storm diff --git a/docker/zookeeper/bin/create-zknode-and-start.sh b/docker/zookeeper/bin/create-zknode-and-start.sh index 58e063f39fd..d8a802b90f7 100755 --- a/docker/zookeeper/bin/create-zknode-and-start.sh +++ b/docker/zookeeper/bin/create-zknode-and-start.sh @@ -76,6 +76,14 @@ done /opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/connecteddevices/blue/expected_state "" /opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/connecteddevices/blue/build-version "v3r\$i0n" +# add history topology zkNodes +/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/history "" +/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/history/blue "" +/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/history/blue/signal "START" +/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/history/blue/state "" +/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/history/blue/expected_state "" +/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/history/blue/build-version "v3r\$i0n" + # add flowhs topology zkNodes /opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/flowhs "" /opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/flowhs/blue "" diff --git a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/history/model/FlowDumpData.java b/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/history/model/FlowDumpData.java index 6deeb8cf5b1..f466603febc 100644 --- a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/history/model/FlowDumpData.java +++ b/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/history/model/FlowDumpData.java @@ -20,7 +20,7 @@ import org.openkilda.model.MeterId; import org.openkilda.model.PathComputationStrategy; import org.openkilda.model.SwitchId; -import org.openkilda.model.cookie.Cookie; +import org.openkilda.model.cookie.FlowSegmentCookie; import lombok.AllArgsConstructor; import lombok.Builder; @@ -44,8 +44,8 @@ public class FlowDumpData implements Serializable { private int destinationVlan; private int sourceInnerVlan; private int destinationInnerVlan; - private Cookie forwardCookie; - private Cookie reverseCookie; + private FlowSegmentCookie forwardCookie; + private 
FlowSegmentCookie reverseCookie; private MeterId forwardMeterId; private MeterId reverseMeterId; private String forwardPath; diff --git a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/history/model/FlowHistoryHolder.java b/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/history/model/FlowHistoryHolder.java index 58561eee267..5c6b49d4388 100644 --- a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/history/model/FlowHistoryHolder.java +++ b/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/history/model/FlowHistoryHolder.java @@ -15,14 +15,16 @@ package org.openkilda.wfm.share.history.model; +import org.openkilda.messaging.info.InfoData; + import lombok.Builder; +import lombok.EqualsAndHashCode; import lombok.Value; -import java.io.Serializable; - @Value @Builder -public class FlowHistoryHolder implements Serializable { +@EqualsAndHashCode(callSuper = false) +public class FlowHistoryHolder extends InfoData { private final String taskId; private final FlowDumpData flowDumpData; diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/FlowHsTopology.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/FlowHsTopology.java index 3206852ac61..7706c000626 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/FlowHsTopology.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/FlowHsTopology.java @@ -52,8 +52,8 @@ import org.openkilda.rulemanager.RuleManagerConfig; import org.openkilda.wfm.LaunchEnvironment; import org.openkilda.wfm.kafka.AbstractMessageSerializer; +import org.openkilda.wfm.kafka.MessageSerializer; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.hubandspoke.CoordinatorBolt; import org.openkilda.wfm.share.hubandspoke.CoordinatorSpout; import org.openkilda.wfm.share.hubandspoke.HubBolt; @@ -161,7 +161,7 @@ public StormTopology createTopology() { flowMonitoringTopologyOutput(tb); statsTopologyOutput(tb); - history(tb, persistenceManager); + history(tb); zkSpout(tb); zkBolt(tb); @@ -1028,28 +1028,25 @@ private void statsTopologyOutput(TopologyBuilder topologyBuilder) { Stream.HUB_TO_STATS_TOPOLOGY_SENDER.name()); } - private void history(TopologyBuilder topologyBuilder, PersistenceManager persistenceManager) { - HistoryBolt historyBolt = new HistoryBolt(persistenceManager); - Fields grouping = HistoryBolt.newInputGroupingFields(); - declareBolt(topologyBuilder, historyBolt, ComponentId.HISTORY_BOLT.name()) - .fieldsGrouping(ComponentId.FLOW_CREATE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping) - .fieldsGrouping(ComponentId.FLOW_UPDATE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping) - .fieldsGrouping(ComponentId.FLOW_REROUTE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping) - .fieldsGrouping(ComponentId.FLOW_DELETE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping) - .fieldsGrouping(ComponentId.FLOW_PATH_SWAP_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping) - .fieldsGrouping(ComponentId.FLOW_SWAP_ENDPOINTS_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping) - .fieldsGrouping( - ComponentId.FLOW_CREATE_MIRROR_POINT_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping) - .fieldsGrouping( - 
ComponentId.FLOW_DELETE_MIRROR_POINT_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping) - .fieldsGrouping( - ComponentId.YFLOW_CREATE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping) - .fieldsGrouping( - ComponentId.YFLOW_UPDATE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping) - .fieldsGrouping( - ComponentId.YFLOW_REROUTE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping) - .fieldsGrouping( - ComponentId.YFLOW_DELETE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping); + private void history(TopologyBuilder topologyBuilder) { + KafkaBolt kafkaBolt = makeKafkaBolt( + topologyConfig.getKafkaHistoryTopic(), MessageSerializer.class); + declareBolt(topologyBuilder, kafkaBolt, ComponentId.HISTORY_BOLT.name()) + .shuffleGrouping(ComponentId.FLOW_CREATE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.FLOW_UPDATE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.FLOW_REROUTE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.FLOW_DELETE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.FLOW_PATH_SWAP_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.FLOW_SWAP_ENDPOINTS_HUB.name(), + Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.FLOW_CREATE_MIRROR_POINT_HUB.name(), + Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.FLOW_DELETE_MIRROR_POINT_HUB.name(), + Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.YFLOW_CREATE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.YFLOW_UPDATE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.YFLOW_REROUTE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.YFLOW_DELETE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name()); } @Override @@ -1158,7 +1155,7 @@ public enum Stream { ROUTER_TO_YFLOW_VALIDATION_HUB, HUB_TO_SPEAKER_WORKER, - HUB_TO_HISTORY_BOLT, + HUB_TO_HISTORY_TOPOLOGY_SENDER, HUB_TO_METRICS_BOLT, SPEAKER_WORKER_TO_HUB_CREATE, diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/FlowHsTopologyConfig.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/FlowHsTopologyConfig.java index 380f995ce6b..60d37678af3 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/FlowHsTopologyConfig.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/FlowHsTopologyConfig.java @@ -51,6 +51,10 @@ default String getKafkaPingTopic() { return getKafkaTopics().getPingTopic(); } + default String getKafkaHistoryTopic() { + return getKafkaTopics().getTopoHistoryTopic(); + } + default String getKafkaFlowHsServer42StormNotifyTopic() { return getKafkaTopics().getFlowHsServer42StormNotifyTopic(); } diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowCreateHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowCreateHubBolt.java index c376370e7a6..990f43cbb01 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowCreateHubBolt.java +++ 
b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowCreateHubBolt.java @@ -16,7 +16,7 @@ package org.openkilda.wfm.topology.flowhs.bolts; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_FLOW_MONITORING_TOPOLOGY_SENDER; -import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_METRICS_BOLT; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_PING_SENDER; @@ -44,7 +44,6 @@ import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; import org.openkilda.wfm.share.flow.resources.FlowResourcesManager; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.hubandspoke.HubBolt; import org.openkilda.wfm.share.utils.KeyProvider; @@ -164,8 +163,10 @@ public void sendNorthboundResponse(@NonNull Message message) { @Override public void sendHistoryUpdate(@NonNull FlowHistoryHolder historyHolder) { - emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple( - historyHolder, getCommandContext())); + InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(), + getCommandContext().getCorrelationId()); + emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(historyHolder.getTaskId(), message)); } @Override @@ -221,7 +222,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); - declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS); + declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_PING_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_SERVER42_CONTROL_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowDeleteHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowDeleteHubBolt.java index d8dbbbabd13..4ca605c7fdd 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowDeleteHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowDeleteHubBolt.java @@ -16,7 +16,7 @@ package org.openkilda.wfm.topology.flowhs.bolts; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_FLOW_MONITORING_TOPOLOGY_SENDER; -import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_METRICS_BOLT; import static 
org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_PING_SENDER; @@ -41,7 +41,6 @@ import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; import org.openkilda.wfm.share.flow.resources.FlowResourcesManager; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.hubandspoke.HubBolt; import org.openkilda.wfm.share.utils.KeyProvider; @@ -150,8 +149,10 @@ public void sendNorthboundResponse(@NonNull Message message) { @Override public void sendHistoryUpdate(@NonNull FlowHistoryHolder historyHolder) { - emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple( - historyHolder, getCommandContext())); + InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(), + getCommandContext().getCorrelationId()); + emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(historyHolder.getTaskId(), message)); } @Override @@ -209,7 +210,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); - declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS); + declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_PING_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_SERVER42_CONTROL_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointCreateHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointCreateHubBolt.java index 07b8965ad81..7eaf6ebf2fa 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointCreateHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointCreateHubBolt.java @@ -15,7 +15,7 @@ package org.openkilda.wfm.topology.flowhs.bolts; -import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_SPEAKER_WORKER; import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_PAYLOAD; @@ -25,6 +25,7 @@ import org.openkilda.floodlight.api.response.SpeakerFlowSegmentResponse; import org.openkilda.messaging.Message; import org.openkilda.messaging.command.flow.FlowMirrorPointCreateRequest; +import org.openkilda.messaging.info.InfoMessage; import org.openkilda.pce.AvailableNetworkFactory; import org.openkilda.pce.PathComputer; import org.openkilda.pce.PathComputerConfig; @@ -33,7 +34,6 @@ import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; import 
org.openkilda.wfm.share.flow.resources.FlowResourcesManager; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.hubandspoke.HubBolt; import org.openkilda.wfm.share.utils.KeyProvider; @@ -140,8 +140,10 @@ public void sendNorthboundResponse(Message message) { @Override public void sendHistoryUpdate(FlowHistoryHolder historyHolder) { - emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple( - historyHolder, getCommandContext())); + InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(), + getCommandContext().getCorrelationId()); + emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(historyHolder.getTaskId(), message)); } @Override @@ -161,7 +163,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); - declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS); + declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), new Fields(ZooKeeperBolt.FIELD_ID_STATE, ZooKeeperBolt.FIELD_ID_CONTEXT)); } diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointDeleteHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointDeleteHubBolt.java index 09eecd882d6..f47883b96ff 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointDeleteHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointDeleteHubBolt.java @@ -15,7 +15,7 @@ package org.openkilda.wfm.topology.flowhs.bolts; -import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_SPEAKER_WORKER; import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_PAYLOAD; @@ -25,11 +25,11 @@ import org.openkilda.floodlight.api.response.SpeakerFlowSegmentResponse; import org.openkilda.messaging.Message; import org.openkilda.messaging.command.flow.FlowMirrorPointDeleteRequest; +import org.openkilda.messaging.info.InfoMessage; import org.openkilda.persistence.PersistenceManager; import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; import org.openkilda.wfm.share.flow.resources.FlowResourcesManager; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.hubandspoke.HubBolt; import org.openkilda.wfm.share.utils.KeyProvider; @@ -127,8 +127,10 @@ public void sendNorthboundResponse(Message message) { @Override public void sendHistoryUpdate(FlowHistoryHolder historyHolder) { - emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple( - historyHolder, 
getCommandContext())); + InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(), + getCommandContext().getCorrelationId()); + emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(historyHolder.getTaskId(), message)); } @Override @@ -148,7 +150,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); - declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS); + declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), new Fields(ZooKeeperBolt.FIELD_ID_STATE, ZooKeeperBolt.FIELD_ID_CONTEXT)); } diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowPathSwapHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowPathSwapHubBolt.java index eebafdc93c5..52357a473a0 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowPathSwapHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowPathSwapHubBolt.java @@ -16,7 +16,7 @@ package org.openkilda.wfm.topology.flowhs.bolts; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_FLOW_MONITORING_TOPOLOGY_SENDER; -import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_METRICS_BOLT; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_PING_SENDER; @@ -41,7 +41,6 @@ import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; import org.openkilda.wfm.share.flow.resources.FlowResourcesManager; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.hubandspoke.HubBolt; import org.openkilda.wfm.share.utils.KeyProvider; @@ -164,8 +163,10 @@ public void sendNorthboundResponse(Message message) { @Override public void sendHistoryUpdate(FlowHistoryHolder historyHolder) { - emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple( - historyHolder, getCommandContext())); + InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(), + getCommandContext().getCorrelationId()); + emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(historyHolder.getTaskId(), message)); } @Override @@ -190,7 +191,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); - declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS); + declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); 
declarer.declareStream(HUB_TO_PING_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_REROUTE_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowRerouteHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowRerouteHubBolt.java index 163838c33a9..f2b54009bb3 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowRerouteHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowRerouteHubBolt.java @@ -16,7 +16,7 @@ package org.openkilda.wfm.topology.flowhs.bolts; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_FLOW_MONITORING_TOPOLOGY_SENDER; -import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_METRICS_BOLT; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_PING_SENDER; @@ -46,7 +46,6 @@ import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; import org.openkilda.wfm.share.flow.resources.FlowResourcesManager; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.hubandspoke.HubBolt; import org.openkilda.wfm.share.utils.KeyProvider; @@ -165,8 +164,10 @@ public void sendNorthboundResponse(@NonNull Message message) { @Override public void sendHistoryUpdate(@NonNull FlowHistoryHolder historyHolder) { - emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple( - historyHolder, getCommandContext())); + InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(), + getCommandContext().getCorrelationId()); + emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(historyHolder.getTaskId(), message)); } @Override @@ -229,7 +230,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_REROUTE_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); - declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS); + declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_PING_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), new Fields(ZooKeeperBolt.FIELD_ID_STATE, ZooKeeperBolt.FIELD_ID_CONTEXT)); diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowSwapEndpointsHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowSwapEndpointsHubBolt.java index 008018f7b2f..8cb903ce9ff 100644 --- 
a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowSwapEndpointsHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowSwapEndpointsHubBolt.java @@ -15,7 +15,7 @@ package org.openkilda.wfm.topology.flowhs.bolts; -import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.SWAP_ENDPOINTS_HUB_TO_ROUTER_BOLT; import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_KEY; @@ -28,10 +28,10 @@ import org.openkilda.messaging.command.CommandMessage; import org.openkilda.messaging.command.flow.FlowRequest; import org.openkilda.messaging.command.flow.SwapFlowEndpointRequest; +import org.openkilda.messaging.info.InfoMessage; import org.openkilda.persistence.PersistenceManager; import org.openkilda.wfm.CommandContext; import org.openkilda.wfm.error.PipelineException; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.hubandspoke.HubBolt; import org.openkilda.wfm.share.utils.KeyProvider; @@ -40,6 +40,7 @@ import org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream; import org.openkilda.wfm.topology.flowhs.service.FlowSwapEndpointsHubCarrier; import org.openkilda.wfm.topology.flowhs.service.FlowSwapEndpointsHubService; +import org.openkilda.wfm.topology.utils.MessageKafkaTranslator; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; @@ -124,8 +125,10 @@ public void sendSpeakerRequest(SpeakerRequest command) { @Override public void sendHistoryUpdate(FlowHistoryHolder historyHolder) { - emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple( - historyHolder, getCommandContext())); + InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(), + getCommandContext().getCorrelationId()); + emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(historyHolder.getTaskId(), message)); } @Override @@ -144,7 +147,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(SWAP_ENDPOINTS_HUB_TO_ROUTER_BOLT.name(), STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), STREAM_FIELDS); - declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS); + declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), new Fields(ZooKeeperBolt.FIELD_ID_STATE, ZooKeeperBolt.FIELD_ID_CONTEXT)); } diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowUpdateHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowUpdateHubBolt.java index c305e58d9c0..30142554a48 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowUpdateHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowUpdateHubBolt.java @@ -16,7 +16,7 @@ package 
org.openkilda.wfm.topology.flowhs.bolts; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_FLOW_MONITORING_TOPOLOGY_SENDER; -import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_METRICS_BOLT; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_PING_SENDER; @@ -50,7 +50,6 @@ import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; import org.openkilda.wfm.share.flow.resources.FlowResourcesManager; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.hubandspoke.HubBolt; import org.openkilda.wfm.share.utils.KeyProvider; @@ -245,8 +244,10 @@ public void sendHubSwapEndpointsResponse(@NonNull Message message) { @Override public void sendHistoryUpdate(@NonNull FlowHistoryHolder historyHolder) { - emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple( - historyHolder, getCommandContext())); + InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(), + getCommandContext().getCorrelationId()); + emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(historyHolder.getTaskId(), message)); } @Override @@ -267,7 +268,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(UPDATE_HUB_TO_SWAP_ENDPOINTS_HUB.name(), MessageKafkaTranslator.STREAM_FIELDS); - declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS); + declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_PING_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_SERVER42_CONTROL_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowCreateHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowCreateHubBolt.java index 4c563dabe35..03759b822bf 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowCreateHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowCreateHubBolt.java @@ -16,7 +16,7 @@ package org.openkilda.wfm.topology.flowhs.bolts; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_FLOW_MONITORING_TOPOLOGY_SENDER; -import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_METRICS_BOLT; import static 
org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_PING_SENDER; @@ -47,7 +47,6 @@ import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; import org.openkilda.wfm.share.flow.resources.FlowResourcesManager; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.hubandspoke.HubBolt; import org.openkilda.wfm.share.utils.KeyProvider; @@ -199,8 +198,10 @@ public void sendNorthboundResponse(@NonNull Message message) { @Override public void sendHistoryUpdate(@NonNull FlowHistoryHolder historyHolder) { - emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple( - historyHolder, getCommandContext())); + InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(), + getCommandContext().getCorrelationId()); + emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(historyHolder.getTaskId(), message)); } @Override @@ -256,7 +257,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); - declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS); + declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_PING_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_SERVER42_CONTROL_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowDeleteHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowDeleteHubBolt.java index a991d50707e..e6704ddd6e2 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowDeleteHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowDeleteHubBolt.java @@ -16,7 +16,7 @@ package org.openkilda.wfm.topology.flowhs.bolts; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_FLOW_MONITORING_TOPOLOGY_SENDER; -import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_METRICS_BOLT; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_PING_SENDER; @@ -44,7 +44,6 @@ import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; import org.openkilda.wfm.share.flow.resources.FlowResourcesManager; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.hubandspoke.HubBolt; import org.openkilda.wfm.share.utils.KeyProvider; @@ -169,8 +168,10 @@ public void 
sendNorthboundResponse(@NonNull Message message) { @Override public void sendHistoryUpdate(@NonNull FlowHistoryHolder historyHolder) { - emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple( - historyHolder, getCommandContext())); + InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(), + getCommandContext().getCorrelationId()); + emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(historyHolder.getTaskId(), message)); } @Override @@ -227,7 +228,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); - declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS); + declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_PING_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_SERVER42_CONTROL_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowRerouteHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowRerouteHubBolt.java index 03deef49d65..1da99659505 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowRerouteHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowRerouteHubBolt.java @@ -16,7 +16,7 @@ package org.openkilda.wfm.topology.flowhs.bolts; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_FLOW_MONITORING_TOPOLOGY_SENDER; -import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_METRICS_BOLT; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_PING_SENDER; @@ -50,7 +50,6 @@ import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; import org.openkilda.wfm.share.flow.resources.FlowResourcesManager; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.hubandspoke.HubBolt; import org.openkilda.wfm.share.utils.KeyProvider; @@ -192,8 +191,10 @@ public void sendNorthboundResponse(@NonNull Message message) { @Override public void sendHistoryUpdate(@NonNull FlowHistoryHolder historyHolder) { - emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple( - historyHolder, getCommandContext())); + InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(), + getCommandContext().getCorrelationId()); + emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(historyHolder.getTaskId(), message)); } @Override @@ -266,7 +267,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { 
declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); - declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS); + declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_PING_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_SERVER42_CONTROL_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowUpdateHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowUpdateHubBolt.java index 73877c388ee..343ea253009 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowUpdateHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/YFlowUpdateHubBolt.java @@ -16,7 +16,7 @@ package org.openkilda.wfm.topology.flowhs.bolts; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_FLOW_MONITORING_TOPOLOGY_SENDER; -import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_METRICS_BOLT; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_PING_SENDER; @@ -51,7 +51,6 @@ import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; import org.openkilda.wfm.share.flow.resources.FlowResourcesManager; -import org.openkilda.wfm.share.history.bolt.HistoryBolt; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.hubandspoke.HubBolt; import org.openkilda.wfm.share.utils.KeyProvider; @@ -206,8 +205,10 @@ public void sendNorthboundResponse(@NonNull Message message) { @Override public void sendHistoryUpdate(@NonNull FlowHistoryHolder historyHolder) { - emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple( - historyHolder, getCommandContext())); + InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(), + getCommandContext().getCorrelationId()); + emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(historyHolder.getTaskId(), message)); } @Override @@ -268,7 +269,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); - declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS); + declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_PING_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_SERVER42_CONTROL_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), diff --git 
a/src-java/history-topology/README.md b/src-java/history-topology/README.md
new file mode 100644
index 00000000000..d6fa8fadeb9
--- /dev/null
+++ b/src-java/history-topology/README.md
@@ -0,0 +1,9 @@
+# WFM (WorkFlow Manager) - History Topology
+
+This sub-project holds the Storm topology "History" which is used
+to store history data into the database.
+
+## Deployment, Configuration, Testing and Debugging tips
+
+The guidelines, recommendations and commands from `base-topology/README.md`
+are applicable to this topology.
diff --git a/src-java/history-topology/history-storm-topology/.gitignore b/src-java/history-topology/history-storm-topology/.gitignore
new file mode 100644
index 00000000000..15f8249c1bf
--- /dev/null
+++ b/src-java/history-topology/history-storm-topology/.gitignore
@@ -0,0 +1 @@
+/src/main/resources/history-topology.yaml
diff --git a/src-java/history-topology/history-storm-topology/build.gradle b/src-java/history-topology/history-storm-topology/build.gradle
new file mode 100644
index 00000000000..59a01e8dfcb
--- /dev/null
+++ b/src-java/history-topology/history-storm-topology/build.gradle
@@ -0,0 +1,39 @@
+plugins {
+    id 'io.freefair.aspectj.post-compile-weaving'
+    id 'com.github.johnrengelman.shadow'
+}
+
+compileJava.ajc.options.compilerArgs += '-showWeaveInfo'
+compileJava.ajc.options.compilerArgs += '-verbose'
+
+description = 'History Storm Topology'
+dependencies {
+    implementation project(':base-storm-topology')
+    implementation project(':blue-green')
+    runtimeOnly project(path: ':base-storm-topology', configuration: 'releaseArtifacts')
+    runtimeOnly project(':kilda-persistence-orientdb')
+    runtimeOnly project(':kilda-persistence-hibernate')
+
+    compileOnly('org.apache.storm:storm-core')
+
+    compileOnly 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok-mapstruct-binding'
+}
+
+jar {
+    manifest {
+        attributes 'Main-Class': 'org.openkilda.wfm.topology.history.HistoryTopology'
+    }
+
+    classifier = 'original'
+}
+
+shadowJar {
+    classifier = ''
+    mergeServiceFiles()
+}
+
+artifacts {
+    archives shadowJar
+}
diff --git a/src-java/history-topology/history-storm-topology/src/checkstyle/checkstyle-suppressions.xml b/src-java/history-topology/history-storm-topology/src/checkstyle/checkstyle-suppressions.xml
new file mode 100644
index 00000000000..2a127580be3
--- /dev/null
+++ b/src-java/history-topology/history-storm-topology/src/checkstyle/checkstyle-suppressions.xml
@@ -0,0 +1,9 @@
+ + + + + + + \ No newline at end of file
diff --git a/src-java/history-topology/history-storm-topology/src/main/java/org/openkilda/wfm/topology/history/HistoryTopology.java b/src-java/history-topology/history-storm-topology/src/main/java/org/openkilda/wfm/topology/history/HistoryTopology.java
new file mode 100644
index 00000000000..ee7ce81b3b5
--- /dev/null
+++ b/src-java/history-topology/history-storm-topology/src/main/java/org/openkilda/wfm/topology/history/HistoryTopology.java
@@ -0,0 +1,94 @@
+/* Copyright 2022 Telstra Open Source
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.wfm.topology.history; + +import org.openkilda.persistence.PersistenceManager; +import org.openkilda.wfm.LaunchEnvironment; +import org.openkilda.wfm.share.zk.ZkStreams; +import org.openkilda.wfm.share.zk.ZooKeeperBolt; +import org.openkilda.wfm.share.zk.ZooKeeperSpout; +import org.openkilda.wfm.topology.AbstractTopology; +import org.openkilda.wfm.topology.history.bolts.HistoryBolt; +import org.openkilda.wfm.topology.utils.KafkaRecordTranslator; + +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; + +public class HistoryTopology extends AbstractTopology { + public static final String HISTORY_SPOUT_ID = "history-spout"; + public static final String HISTORY_BOLT_ID = "history-bolt"; + + public HistoryTopology(LaunchEnvironment env) { + super(env, "history-topology", HistoryTopologyConfig.class); + } + + @Override + public StormTopology createTopology() { + TopologyBuilder builder = new TopologyBuilder(); + PersistenceManager persistenceManager = new PersistenceManager(configurationProvider); + + createZkSpout(builder); + + createSpout(builder); + createHistoryBolt(builder, persistenceManager); + + createZkBolt(builder); + + return builder.createTopology(); + } + + private void createZkSpout(TopologyBuilder builder) { + ZooKeeperSpout zooKeeperSpout = new ZooKeeperSpout(getConfig().getBlueGreenMode(), getZkTopoName(), + getZookeeperConfig()); + declareSpout(builder, zooKeeperSpout, ZooKeeperSpout.SPOUT_ID); + } + + private void createHistoryBolt(TopologyBuilder builder, PersistenceManager persistenceManager) { + HistoryBolt historyBolt = new HistoryBolt(persistenceManager, ZooKeeperSpout.SPOUT_ID); + declareBolt(builder, historyBolt, HISTORY_BOLT_ID) + .fieldsGrouping(HISTORY_SPOUT_ID, new Fields(KafkaRecordTranslator.FIELD_ID_KEY)) + .allGrouping(ZooKeeperSpout.SPOUT_ID); + } + + private void createSpout(TopologyBuilder builder) { + declareKafkaSpout(builder, topologyConfig.getKafkaTopoHistoryTopic(), HISTORY_SPOUT_ID); + } + + private void createZkBolt(TopologyBuilder builder) { + ZooKeeperBolt zooKeeperBolt = new ZooKeeperBolt(getConfig().getBlueGreenMode(), getZkTopoName(), + getZookeeperConfig(), getBoltInstancesCount(HISTORY_BOLT_ID)); + declareBolt(builder, zooKeeperBolt, ZooKeeperBolt.BOLT_ID) + .allGrouping(HISTORY_BOLT_ID, ZkStreams.ZK.toString()); + } + + @Override + protected String getZkTopoName() { + return "history"; + } + + /** + * Main method to run topology. + */ + public static void main(String[] args) { + try { + LaunchEnvironment env = new LaunchEnvironment(args); + (new HistoryTopology(env)).setup(); + } catch (Exception e) { + System.exit(handleLaunchException(e)); + } + } +} diff --git a/src-java/history-topology/history-storm-topology/src/main/java/org/openkilda/wfm/topology/history/HistoryTopologyConfig.java b/src-java/history-topology/history-storm-topology/src/main/java/org/openkilda/wfm/topology/history/HistoryTopologyConfig.java new file mode 100644 index 00000000000..e7f3cacc9da --- /dev/null +++ b/src-java/history-topology/history-storm-topology/src/main/java/org/openkilda/wfm/topology/history/HistoryTopologyConfig.java @@ -0,0 +1,28 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.wfm.topology.history; + +import org.openkilda.wfm.topology.AbstractTopologyConfig; + +import com.sabre.oss.conf4j.annotation.Configuration; + +@Configuration +public interface HistoryTopologyConfig extends AbstractTopologyConfig { + + default String getKafkaTopoHistoryTopic() { + return getKafkaTopics().getTopoHistoryTopic(); + } +} diff --git a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/history/bolt/HistoryBolt.java b/src-java/history-topology/history-storm-topology/src/main/java/org/openkilda/wfm/topology/history/bolts/HistoryBolt.java similarity index 52% rename from src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/history/bolt/HistoryBolt.java rename to src-java/history-topology/history-storm-topology/src/main/java/org/openkilda/wfm/topology/history/bolts/HistoryBolt.java index 24277531ada..1a77722e25f 100644 --- a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/history/bolt/HistoryBolt.java +++ b/src-java/history-topology/history-storm-topology/src/main/java/org/openkilda/wfm/topology/history/bolts/HistoryBolt.java @@ -1,4 +1,4 @@ -/* Copyright 2019 Telstra Open Source +/* Copyright 2022 Telstra Open Source * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,36 +13,29 @@ * limitations under the License. 
*/ -package org.openkilda.wfm.share.history.bolt; +package org.openkilda.wfm.topology.history.bolts; +import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_PAYLOAD; + +import org.openkilda.messaging.Message; +import org.openkilda.messaging.info.InfoData; +import org.openkilda.messaging.info.InfoMessage; import org.openkilda.persistence.PersistenceManager; import org.openkilda.wfm.AbstractBolt; -import org.openkilda.wfm.CommandContext; import org.openkilda.wfm.share.history.model.FlowHistoryHolder; import org.openkilda.wfm.share.history.service.HistoryService; +import org.openkilda.wfm.share.zk.ZkStreams; +import org.openkilda.wfm.share.zk.ZooKeeperBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; public class HistoryBolt extends AbstractBolt { - public static final String FIELD_ID_PAYLOAD = "payload"; - public static final String FIELD_ID_TASK_ID = "task-id"; - public static final Fields INPUT_FIELDS = new Fields(FIELD_ID_PAYLOAD, FIELD_ID_TASK_ID, FIELD_ID_CONTEXT); - private transient HistoryService historyService; - public static Fields newInputGroupingFields() { - return new Fields(FIELD_ID_TASK_ID); - } - - public static Values newInputTuple(FlowHistoryHolder payload, CommandContext context) { - return new Values(payload, payload.getTaskId(), context); - } - - public HistoryBolt(PersistenceManager persistenceManager) { - super(persistenceManager); + public HistoryBolt(PersistenceManager persistenceManager, String lifeCycleEventSourceComponent) { + super(persistenceManager, lifeCycleEventSourceComponent); } @Override @@ -52,16 +45,24 @@ protected void init() { @Override protected void handleInput(Tuple input) throws Exception { - Object payload = input.getValueByField(FIELD_ID_PAYLOAD); - if (payload instanceof FlowHistoryHolder) { - historyService.store((FlowHistoryHolder) payload); - } else { - log.error("Skip undefined payload: {}", payload); + if (active) { + Message message = pullValue(input, FIELD_ID_PAYLOAD, Message.class); + if (message instanceof InfoMessage) { + InfoData payload = ((InfoMessage) message).getData(); + if (payload instanceof FlowHistoryHolder) { + historyService.store((FlowHistoryHolder) payload); + } else { + unhandledInput(input); + } + } else { + unhandledInput(input); + } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - + declarer.declareStream(ZkStreams.ZK.toString(), new Fields(ZooKeeperBolt.FIELD_ID_STATE, + ZooKeeperBolt.FIELD_ID_CONTEXT)); } } diff --git a/src-java/history-topology/history-storm-topology/src/main/resources/history-topology.yaml.example b/src-java/history-topology/history-storm-topology/src/main/resources/history-topology.yaml.example new file mode 100644 index 00000000000..ed561d46f15 --- /dev/null +++ b/src-java/history-topology/history-storm-topology/src/main/resources/history-topology.yaml.example @@ -0,0 +1,8 @@ +# topology configuration +# config: + +# spout definitions +# spouts: + +# bolt definitions +# bolts: diff --git a/src-java/kilda-configuration/src/main/java/org/openkilda/config/KafkaTopicsConfig.java b/src-java/kilda-configuration/src/main/java/org/openkilda/config/KafkaTopicsConfig.java index cc864bf3441..800902047b2 100644 --- a/src-java/kilda-configuration/src/main/java/org/openkilda/config/KafkaTopicsConfig.java +++ b/src-java/kilda-configuration/src/main/java/org/openkilda/config/KafkaTopicsConfig.java @@ -185,6 +185,10 @@ public 
interface KafkaTopicsConfig { @Default("kilda.floodlight.connected.devices.priv") String getTopoConnectedDevicesRegionTopic(); + @Key("topo.history.storm") + @Default("kilda.topo.history.storm.priv") + String getTopoHistoryTopic(); + // TODO(surabujin): check usage @Key("grpc.response") @Default("kilda.grpc.response.priv") diff --git a/src-java/kilda-model/src/main/java/org/openkilda/model/history/FlowEventDump.java b/src-java/kilda-model/src/main/java/org/openkilda/model/history/FlowEventDump.java index 034710f3ad4..e0437ad4a32 100644 --- a/src-java/kilda-model/src/main/java/org/openkilda/model/history/FlowEventDump.java +++ b/src-java/kilda-model/src/main/java/org/openkilda/model/history/FlowEventDump.java @@ -21,7 +21,7 @@ import org.openkilda.model.MeterId; import org.openkilda.model.PathComputationStrategy; import org.openkilda.model.SwitchId; -import org.openkilda.model.cookie.Cookie; +import org.openkilda.model.cookie.FlowSegmentCookie; import com.esotericsoftware.kryo.DefaultSerializer; import com.esotericsoftware.kryo.serializers.BeanSerializer; @@ -151,13 +151,13 @@ public interface FlowEventDumpData { void setIgnoreBandwidth(boolean ignoreBandwidth); - Cookie getForwardCookie(); + FlowSegmentCookie getForwardCookie(); - void setForwardCookie(Cookie forwardCookie); + void setForwardCookie(FlowSegmentCookie forwardCookie); - Cookie getReverseCookie(); + FlowSegmentCookie getReverseCookie(); - void setReverseCookie(Cookie reverseCookie); + void setReverseCookie(FlowSegmentCookie reverseCookie); SwitchId getSourceSwitch(); @@ -264,8 +264,8 @@ static final class FlowEventDumpDataImpl implements FlowEventDumpData, Serializa String type; long bandwidth; boolean ignoreBandwidth; - Cookie forwardCookie; - Cookie reverseCookie; + FlowSegmentCookie forwardCookie; + FlowSegmentCookie reverseCookie; SwitchId sourceSwitch; SwitchId destinationSwitch; int sourcePort; diff --git a/src-java/kilda-persistence-hibernate/src/main/java/org/openkilda/persistence/hibernate/entities/history/HibernateFlowEventDump.java b/src-java/kilda-persistence-hibernate/src/main/java/org/openkilda/persistence/hibernate/entities/history/HibernateFlowEventDump.java index b156e2c01c0..653a0ede37a 100644 --- a/src-java/kilda-persistence-hibernate/src/main/java/org/openkilda/persistence/hibernate/entities/history/HibernateFlowEventDump.java +++ b/src-java/kilda-persistence-hibernate/src/main/java/org/openkilda/persistence/hibernate/entities/history/HibernateFlowEventDump.java @@ -20,7 +20,7 @@ import org.openkilda.model.MeterId; import org.openkilda.model.PathComputationStrategy; import org.openkilda.model.SwitchId; -import org.openkilda.model.cookie.Cookie; +import org.openkilda.model.cookie.FlowSegmentCookie; import org.openkilda.model.history.FlowEventDump.FlowEventDumpData; import org.openkilda.persistence.hibernate.entities.EntityBase; import org.openkilda.persistence.hibernate.entities.JsonPayloadBase; @@ -107,8 +107,8 @@ public static class FlowEventDumpUnstructured extends JsonPayloadBase { private String flowId; private long bandwidth; private boolean ignoreBandwidth; - private Cookie forwardCookie; - private Cookie reverseCookie; + private FlowSegmentCookie forwardCookie; + private FlowSegmentCookie reverseCookie; private SwitchId sourceSwitch; private int sourcePort; diff --git a/src-java/kilda-persistence-tinkerpop/src/main/java/org/openkilda/persistence/ferma/frames/FlowEventDumpFrame.java b/src-java/kilda-persistence-tinkerpop/src/main/java/org/openkilda/persistence/ferma/frames/FlowEventDumpFrame.java 
index c286cf7c049..7a82f6ef1a8 100644 --- a/src-java/kilda-persistence-tinkerpop/src/main/java/org/openkilda/persistence/ferma/frames/FlowEventDumpFrame.java +++ b/src-java/kilda-persistence-tinkerpop/src/main/java/org/openkilda/persistence/ferma/frames/FlowEventDumpFrame.java @@ -20,7 +20,7 @@ import org.openkilda.model.MeterId; import org.openkilda.model.PathComputationStrategy; import org.openkilda.model.SwitchId; -import org.openkilda.model.cookie.Cookie; +import org.openkilda.model.cookie.FlowSegmentCookie; import org.openkilda.model.history.FlowEventDump.FlowEventDumpData; import org.openkilda.persistence.ferma.frames.converters.Convert; import org.openkilda.persistence.ferma.frames.converters.FlowEncapsulationTypeConverter; @@ -80,22 +80,22 @@ public abstract class FlowEventDumpFrame extends KildaBaseVertexFrame implements @Override @Property("forward_cookie") @Convert(FlowSegmentCookieConverter.class) - public abstract Cookie getForwardCookie(); + public abstract FlowSegmentCookie getForwardCookie(); @Override @Property("forward_cookie") @Convert(FlowSegmentCookieConverter.class) - public abstract void setForwardCookie(Cookie forwardCookie); + public abstract void setForwardCookie(FlowSegmentCookie forwardCookie); @Override @Property("reverse_cookie") @Convert(FlowSegmentCookieConverter.class) - public abstract Cookie getReverseCookie(); + public abstract FlowSegmentCookie getReverseCookie(); @Override @Property("reverse_cookie") @Convert(FlowSegmentCookieConverter.class) - public abstract void setReverseCookie(Cookie reverseCookie); + public abstract void setReverseCookie(FlowSegmentCookie reverseCookie); @Override @Property("src_switch") diff --git a/src-java/settings.gradle b/src-java/settings.gradle index cc96df0830d..939b31d11f7 100644 --- a/src-java/settings.gradle +++ b/src-java/settings.gradle @@ -15,6 +15,7 @@ include 'flowmonitoring-storm-topology' include 'flowmonitoring-messaging' include 'grpc-api' include 'grpc-service' +include 'history-storm-topology' include 'isllatency-messaging' include 'isllatency-storm-topology' include 'kilda-configuration' @@ -70,6 +71,7 @@ project(':flowmonitoring-storm-topology').projectDir = "$rootDir/flowmonitoring- project(':flowmonitoring-messaging').projectDir = "$rootDir/flowmonitoring-topology/flowmonitoring-messaging" as File project(':grpc-api').projectDir = "$rootDir/grpc-speaker/grpc-api" as File project(':grpc-service').projectDir = "$rootDir/grpc-speaker/grpc-service" as File +project(':history-storm-topology').projectDir = "$rootDir/history-topology/history-storm-topology" as File project(':isllatency-messaging').projectDir = "$rootDir/isllatency-topology/isllatency-messaging" as File project(':isllatency-storm-topology').projectDir = "$rootDir/isllatency-topology/isllatency-storm-topology" as File project(':nbworker-messaging').projectDir = "$rootDir/nbworker-topology/nbworker-messaging" as File From 1530d3d8da060c956bc69e09ab08bbe0d8af85e8 Mon Sep 17 00:00:00 2001 From: Violetta Shakirova Date: Mon, 21 Mar 2022 19:47:20 +0400 Subject: [PATCH 6/7] GRPC stub: return errors on invalid requests Close #4728 --- src-python/grpc-stub/server.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src-python/grpc-stub/server.py b/src-python/grpc-stub/server.py index f3e3759259c..48729c430c0 100644 --- a/src-python/grpc-stub/server.py +++ b/src-python/grpc-stub/server.py @@ -47,6 +47,16 @@ def SetLoginDetails(self, request, context): return noviflow_pb2.CliReply(reply_status=0) def SetConfigLogicalPort(self, request, 
context):
+        if request.logicalportno not in range(100, 63488):
+            return noviflow_pb2.CliReply(reply_status=185)
+
+        if len(request.portno) > len(set(request.portno)):
+            return noviflow_pb2.CliReply(reply_status=190)
+
+        for port in request.portno:
+            if port not in range(1, 100):
+                return noviflow_pb2.CliReply(reply_status=31)
+
         if request.logicalportno in self.storage.logical_ports:
             port = self.storage.logical_ports[request.logicalportno]
             if port.type != request.logicalporttype:
From 3a609de2728198d60be6b82153ed61cf2cad8bf4 Mon Sep 17 00:00:00 2001
From: Dmitry Poltavets
Date: Wed, 23 Mar 2022 10:37:39 +0400
Subject: [PATCH 7/7] Updated changelog according to release 1.118.0

---
 CHANGELOG.md | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e9151794a4b..3c648ec8fda 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,27 @@
 # Changelog

+## v1.118.0 (24/03/2022)
+
+### Features:
+- [#4730](https://github.com/telstra/open-kilda/pull/4730) Added HistoryTopology to save flow history events
+- [#4733](https://github.com/telstra/open-kilda/pull/4733) GRPC stub: return errors on invalid requests (Issue: [#4728](https://github.com/telstra/open-kilda/issues/4728))
+
+### Bug Fixes:
+- [#4724](https://github.com/telstra/open-kilda/pull/4724) Fixed inner vlan change in case of partial update of default flow (Issue: [#4411](https://github.com/telstra/open-kilda/issues/4411)) [**storm-topologies**]
+- [#4725](https://github.com/telstra/open-kilda/pull/4725) Fixed lag creation with duplicate port numbers (Issue: [#4644](https://github.com/telstra/open-kilda/issues/4644)) [**storm-topologies**]
+
+### Improvements:
+- [#4679](https://github.com/telstra/open-kilda/pull/4679) Bumped version of github api and jwt for changelog generator
+- [#4715](https://github.com/telstra/open-kilda/pull/4715) DeleteAllFlowsV1 endpoint correlation-id [**northbound**]
+
+
+For the complete list of changes, check out [the commit log](https://github.com/telstra/open-kilda/compare/v1.117.0...v1.118.0).
+
+### Affected Components:
+grpc, nbworker, history, nb, flow-hs, swmanager
+
+---
+
 ## v1.117.0 (21/03/2022)

 ### Features:
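The patch series above moves flow history persistence behind Kafka: the flow-hs hub bolts wrap each FlowHistoryHolder in an InfoMessage and publish it to the topic configured by `topo.history.storm` (default `kilda.topo.history.storm.priv`), where the new history topology stores it. The stand-alone consumer below is an illustrative sketch only, not part of the patches; the broker address, group id, and plain string deserialization are assumptions made for the example.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class HistoryTopicTail {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
        props.put("group.id", "history-topic-tail");         // assumption: any unused consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Default topic name taken from the KafkaTopicsConfig addition in this patch series.
        String topic = "kilda.topo.history.storm.priv";

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Each record value should carry a serialized InfoMessage wrapping a FlowHistoryHolder.
                    System.out.printf("key=%s value=%s%n", record.key(), record.value());
                }
            }
        }
    }
}

Records should appear on this topic whenever a flow create, update, reroute, or delete operation runs, which gives a quick way to confirm the new wiring before checking the history tables.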