diff --git a/CHANGELOG.md b/CHANGELOG.md index d0a150898ac..f1689c2ab64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,31 @@ # Changelog +## v1.113.0 (25/01/2022) + +### Features: +- [#4651](https://github.com/telstra/open-kilda/pull/4651) Y Flow Ping: API [**northbound**][**storm-topologies**] +- [#4663](https://github.com/telstra/open-kilda/pull/4663) Y Flow Ping: manual ping [**storm-topologies**] + +### Bug Fixes: +- [#4664](https://github.com/telstra/open-kilda/pull/4664) Changed Kafka and Zookeeper ports in unit tests (Issue: [#4529](https://github.com/telstra/open-kilda/issues/4529)) [**tests**] +- [#4669](https://github.com/telstra/open-kilda/pull/4669) Fix `deepCopy` method for YFlow + +### Improvements: +- [#4672](https://github.com/telstra/open-kilda/pull/4672) [test] ignore latency related fields in partUpdateSpec [**tests**] +- [#4535](https://github.com/telstra/open-kilda/pull/4535) Cleanup mappers code [**northbound**][**storm-topologies**] +- [#4659](https://github.com/telstra/open-kilda/pull/4659) [test] fix FlowRulesSpec(find sharedRule) [**tests**] +- [#4662](https://github.com/telstra/open-kilda/pull/4662) [test] fix "cleanup" in partialUpdateSpec [**tests**] +- [#4665](https://github.com/telstra/open-kilda/pull/4665) Remove unused classes from Ping topology [**storm-topologies**] +- [#4670](https://github.com/telstra/open-kilda/pull/4670) [test] check that flow is up after updating in swapEndpSpec [**tests**] + + +For the complete list of changes, check out [the commit log](https://github.com/telstra/open-kilda/compare/v1.112.0...v1.113.0). + +### Affected Components: +flow-hs, flow-monitor, nb, ping, nbworker, swmanager + +--- + ## v1.112.0 (12/01/2022) ### Features: diff --git a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/bolt/KafkaDecoder.java b/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/bolt/KafkaDecoder.java deleted file mode 100644 index e425e09a120..00000000000 --- a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/bolt/KafkaDecoder.java +++ /dev/null @@ -1,74 +0,0 @@ -/* Copyright 2018 Telstra Open Source - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.openkilda.wfm.share.bolt; - -import org.openkilda.messaging.Message; -import org.openkilda.messaging.Utils; -import org.openkilda.wfm.AbstractBolt; -import org.openkilda.wfm.CommandContext; -import org.openkilda.wfm.error.JsonDecodeException; -import org.openkilda.wfm.error.PipelineException; -import org.openkilda.wfm.topology.utils.KafkaRecordTranslator; - -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; - -import java.io.IOException; - -/** - * Bolt which transforms received json into message object and command context with correlation id. - * - * @deprecated this decoder is redundant because deserialization may be performed by - * {@link org.openkilda.wfm.kafka.MessageDeserializer}. - */ -@Deprecated -public abstract class KafkaDecoder extends AbstractBolt { - public static final String FIELD_ID_INPUT = "input"; - - public static final Fields STREAM_FIELDS = new Fields(FIELD_ID_INPUT, FIELD_ID_CONTEXT); - - @Override - protected void handleInput(Tuple input) throws Exception { - String json = pullPayload(input); - Message message = decode(json); - CommandContext commandContext = new CommandContext(message); - - Values output = new Values(message, commandContext); - getOutput().emit(input, output); - } - - private String pullPayload(Tuple input) throws PipelineException { - return pullValue(input, KafkaRecordTranslator.FIELD_ID_PAYLOAD, String.class); - } - - private Message decode(String json) throws JsonDecodeException { - Message value; - - try { - value = Utils.MAPPER.readValue(json, Message.class); - } catch (IOException e) { - throw new JsonDecodeException(Message.class, json, e); - } - return value; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer outputManager) { - outputManager.declare(STREAM_FIELDS); - } -} diff --git a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/FlowMapper.java b/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/FlowMapper.java index 1b179315346..e16e0cc4753 100644 --- a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/FlowMapper.java +++ b/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/FlowMapper.java @@ -50,7 +50,7 @@ /** * Convert {@link Flow} to {@link FlowDto} and back. */ -@Mapper(uses = {FlowPathMapper.class, DetectConnectedDevicesMapper.class}, imports = {FlowStatusDetails.class}) +@Mapper(uses = {FlowPathMapper.class, DetectConnectedDevicesMapper.class}) public abstract class FlowMapper { public static final FlowMapper INSTANCE = Mappers.getMapper(FlowMapper.class); @@ -61,15 +61,12 @@ public abstract class FlowMapper { @Mapping(source = "destPort", target = "destinationPort") @Mapping(source = "destVlan", target = "destinationVlan") @Mapping(source = "destInnerVlan", target = "destinationInnerVlan") - @Mapping(target = "sourceSwitch", expression = "java(flow.getSrcSwitchId())") - @Mapping(target = "destinationSwitch", expression = "java(flow.getDestSwitchId())") + @Mapping(source = "srcSwitchId", target = "sourceSwitch") + @Mapping(source = "destSwitchId", target = "destinationSwitch") @Mapping(source = "status", target = "state") @Mapping(source = "timeModify", target = "lastUpdated") @Mapping(source = "timeCreate", target = "createdTime") - @Mapping(target = "flowStatusDetails", - expression = "java(flow.isAllocateProtectedPath() ? " - + "new FlowStatusDetails(flow.getMainFlowPrioritizedPathsStatus(), " - + "flow.getProtectedFlowPrioritizedPathsStatus()) : null)") + @Mapping(target = "flowStatusDetails", source = "flow") @Mapping(target = "cookie", ignore = true) @Mapping(target = "meterId", ignore = true) @Mapping(target = "transitEncapsulationId", ignore = true) @@ -274,6 +271,17 @@ private FlowPath buildPath(FlowDto flowDto) { .build(); } + /** + * Extract {@link FlowStatusDetails} from {@link Flow}. + */ + public FlowStatusDetails buildStatusDetails(Flow flow) { + if (flow != null && flow.isAllocateProtectedPath()) { + return new FlowStatusDetails( + flow.getMainFlowPrioritizedPathsStatus(), flow.getProtectedFlowPrioritizedPathsStatus()); + } + return null; + } + /** * Builds a flow from swap flow dto. * diff --git a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/HistoryMapper.java b/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/HistoryMapper.java index 4d36f402e47..fe54b4f1dfb 100644 --- a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/HistoryMapper.java +++ b/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/HistoryMapper.java @@ -105,8 +105,8 @@ public FlowDumpPayload map(FlowEventDump dump) { @Mapping(source = "reverse.meterId", target = "reverseMeterId") @Mapping(source = "forward.status", target = "forwardStatus") @Mapping(source = "reverse.status", target = "reverseStatus") - @Mapping(target = "forwardPath", expression = "java(mapPath(forward))") - @Mapping(target = "reversePath", expression = "java(mapPath(reverse))") + @Mapping(source = "forward", target = "forwardPath") + @Mapping(source = "reverse", target = "reversePath") @Mapping(source = "dumpType", target = "dumpType") public abstract FlowDumpData map(Flow flow, FlowPath forward, FlowPath reverse, DumpType dumpType); diff --git a/src-java/base-topology/base-storm-topology/src/test/java/org/openkilda/wfm/config/KafkaConfig.java b/src-java/base-topology/base-storm-topology/src/test/java/org/openkilda/wfm/config/KafkaConfig.java index a54a5f48ef5..87bc7b85721 100644 --- a/src-java/base-topology/base-storm-topology/src/test/java/org/openkilda/wfm/config/KafkaConfig.java +++ b/src-java/base-topology/base-storm-topology/src/test/java/org/openkilda/wfm/config/KafkaConfig.java @@ -22,8 +22,8 @@ @Key("kafka") public interface KafkaConfig { // To be able to run tests in parallel we must use different ports for kafka hosts in each test - int STATS_TOPOLOGY_TEST_KAFKA_PORT = 2182; - int ISL_LATENCY_TOPOLOGY_TEST_KAFKA_PORT = 2183; + int STATS_TOPOLOGY_TEST_KAFKA_PORT = 2188; + int ISL_LATENCY_TOPOLOGY_TEST_KAFKA_PORT = 2189; @Key("hosts") String getHosts(); diff --git a/src-java/base-topology/base-storm-topology/src/test/java/org/openkilda/wfm/config/ZookeeperConfig.java b/src-java/base-topology/base-storm-topology/src/test/java/org/openkilda/wfm/config/ZookeeperConfig.java index 86ba511ca28..e34324c3e52 100644 --- a/src-java/base-topology/base-storm-topology/src/test/java/org/openkilda/wfm/config/ZookeeperConfig.java +++ b/src-java/base-topology/base-storm-topology/src/test/java/org/openkilda/wfm/config/ZookeeperConfig.java @@ -23,8 +23,8 @@ @Key("zookeeper") public interface ZookeeperConfig { // To be able to run tests in parallel we must use different ports for zookeeper hosts in each test - int STATS_TOPOLOGY_TEST_ZOOKEEPER_PORT = 9093; - int ISL_LATENCY_TOPOLOGY_TEST_ZOOKEEPER_PORT = 9094; + int STATS_TOPOLOGY_TEST_ZOOKEEPER_PORT = 9098; + int ISL_LATENCY_TOPOLOGY_TEST_ZOOKEEPER_PORT = 9099; @Key("hosts") String getHosts(); diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/mapper/RequestedFlowMapper.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/mapper/RequestedFlowMapper.java index 630abe7d99d..e1e87b78a83 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/mapper/RequestedFlowMapper.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/mapper/RequestedFlowMapper.java @@ -15,6 +15,8 @@ package org.openkilda.wfm.topology.flowhs.mapper; +import org.openkilda.adapter.FlowDestAdapter; +import org.openkilda.adapter.FlowSourceAdapter; import org.openkilda.messaging.command.flow.FlowRequest; import org.openkilda.messaging.model.DetectConnectedDevicesDto; import org.openkilda.messaging.model.SwapFlowDto; @@ -23,10 +25,12 @@ import org.openkilda.model.FlowEncapsulationType; import org.openkilda.model.FlowEndpoint; import org.openkilda.model.PathComputationStrategy; +import org.openkilda.model.Switch; import org.openkilda.model.SwitchId; import org.openkilda.server42.control.messaging.flowrtt.ActivateFlowMonitoringInfoData; import org.openkilda.wfm.topology.flowhs.model.RequestedFlow; +import lombok.NonNull; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.factory.Mappers; @@ -51,23 +55,16 @@ public RequestedFlow toRequestedFlow(FlowRequest request) { /** * Convert {@link Flow} to {@link RequestedFlow}. */ - @Mapping(source = "flowId", target = "flowId") - @Mapping(target = "srcSwitch", expression = "java(flow.getSrcSwitchId())") - @Mapping(source = "srcPort", target = "srcPort") - @Mapping(source = "srcVlan", target = "srcVlan") - @Mapping(target = "destSwitch", expression = "java(flow.getDestSwitchId())") - @Mapping(source = "destPort", target = "destPort") - @Mapping(source = "destVlan", target = "destVlan") + @Mapping(source = "srcSwitchId", target = "srcSwitch") + @Mapping(source = "destSwitchId", target = "destSwitch") @Mapping(source = "encapsulationType", target = "flowEncapsulationType") @Mapping(target = "diverseFlowId", ignore = true) @Mapping(target = "affinityFlowId", ignore = true) - @Mapping(target = "loopSwitchId", source = "loopSwitchId") public abstract RequestedFlow toRequestedFlow(Flow flow); /** * Convert {@link SwapFlowDto} to {@link RequestedFlow}. */ - @Mapping(source = "flowId", target = "flowId") @Mapping(source = "sourceSwitch", target = "srcSwitch") @Mapping(source = "sourcePort", target = "srcPort") @Mapping(source = "sourceVlan", target = "srcVlan") @@ -97,26 +94,29 @@ public RequestedFlow toRequestedFlow(FlowRequest request) { /** * Convert {@link RequestedFlow} to {@link Flow}. */ - @Mapping(source = "flowId", target = "flowId") - @Mapping(target = "srcSwitch", - expression = "java(org.openkilda.model.Switch.builder().switchId(requestedFlow.getSrcSwitch()).build())") - @Mapping(source = "srcPort", target = "srcPort") - @Mapping(source = "srcVlan", target = "srcVlan") - @Mapping(target = "destSwitch", - expression = "java(org.openkilda.model.Switch.builder().switchId(requestedFlow.getDestSwitch()).build())") - @Mapping(source = "destPort", target = "destPort") - @Mapping(source = "destVlan", target = "destVlan") @Mapping(source = "flowEncapsulationType", target = "encapsulationType") @Mapping(target = "diverseGroupId", ignore = true) @Mapping(target = "affinityGroupId", ignore = true) @Mapping(target = "status", ignore = true) @Mapping(target = "statusInfo", ignore = true) @Mapping(target = "targetPathComputationStrategy", ignore = true) - @Mapping(target = "loopSwitchId", source = "loopSwitchId") @Mapping(target = "yFlowId", ignore = true) @Mapping(target = "yFlow", ignore = true) public abstract Flow toFlow(RequestedFlow requestedFlow); + /** + * Convert {@link Flow} to {@link FlowRequest}. + */ + public FlowRequest toFlowRequest(@NonNull Flow flow) { + FlowRequest request = generatedMap(flow); + request.setSource(new FlowSourceAdapter(flow).getEndpoint()); + request.setDestination(new FlowDestAdapter(flow).getEndpoint()); + if (flow.getPathComputationStrategy() != null) { + request.setPathComputationStrategy(flow.getPathComputationStrategy().toString().toLowerCase()); + } + return request; + } + @Mapping(source = "encapsulationType", target = "flowEncapsulationType") @Mapping(target = "srcSwitch", ignore = true) @Mapping(target = "srcPort", ignore = true) @@ -126,9 +126,22 @@ public RequestedFlow toRequestedFlow(FlowRequest request) { @Mapping(target = "destPort", ignore = true) @Mapping(target = "destVlan", ignore = true) @Mapping(target = "destInnerVlan", ignore = true) - @Mapping(target = "loopSwitchId", source = "loopSwitchId") protected abstract RequestedFlow generatedMap(FlowRequest request); + /** + * Convert {@link Flow} to {@link FlowRequest}. + */ + @Mapping(target = "pathComputationStrategy", ignore = true) + @Mapping(target = "source", ignore = true) + @Mapping(target = "destination", ignore = true) + @Mapping(target = "transitEncapsulationId", ignore = true) + @Mapping(target = "diverseFlowId", ignore = true) + @Mapping(target = "affinityFlowId", ignore = true) + @Mapping(target = "type", ignore = true) + @Mapping(target = "bulkUpdateFlowIds", ignore = true) + @Mapping(target = "doNotRevert", ignore = true) + protected abstract FlowRequest generatedMap(Flow flow); + public abstract FlowEncapsulationType map(org.openkilda.messaging.payload.flow.FlowEncapsulationType source); /** @@ -165,37 +178,6 @@ public PathComputationStrategy mapComputationStrategy(String raw) { return PathComputationStrategy.valueOf(raw.toUpperCase()); } - /** - * Convert {@link Flow} to {@link FlowRequest}. - */ - @Mapping(target = "flowId", source = "flowId") - @Mapping(target = "source", expression = "java(new FlowEndpoint(flow.getSrcSwitchId(), " - + "flow.getSrcPort(), flow.getSrcVlan(), flow.getSrcInnerVlan()))") - @Mapping(target = "destination", expression = "java(new FlowEndpoint(flow.getDestSwitchId(), " - + "flow.getDestPort(), flow.getDestVlan(), flow.getDestInnerVlan()))") - @Mapping(target = "encapsulationType", source = "encapsulationType") - @Mapping(target = "pathComputationStrategy", - expression = "java(java.util.Optional.ofNullable(flow.getPathComputationStrategy())" - + ".map(pcs -> pcs.toString().toLowerCase())" - + ".orElse(null))") - @Mapping(target = "bandwidth", source = "bandwidth") - @Mapping(target = "ignoreBandwidth", source = "ignoreBandwidth") - @Mapping(target = "periodicPings", source = "periodicPings") - @Mapping(target = "allocateProtectedPath", source = "allocateProtectedPath") - @Mapping(target = "description", source = "description") - @Mapping(target = "maxLatency", source = "maxLatency") - @Mapping(target = "priority", source = "priority") - @Mapping(target = "pinned", source = "pinned") - @Mapping(target = "detectConnectedDevices", source = "detectConnectedDevices") - @Mapping(target = "loopSwitchId", source = "loopSwitchId") - @Mapping(target = "transitEncapsulationId", ignore = true) - @Mapping(target = "diverseFlowId", ignore = true) - @Mapping(target = "affinityFlowId", ignore = true) - @Mapping(target = "type", ignore = true) - @Mapping(target = "bulkUpdateFlowIds", ignore = true) - @Mapping(target = "doNotRevert", ignore = true) - public abstract FlowRequest toFlowRequest(Flow flow); - /** * Convert {@link DetectConnectedDevices} to {@link DetectConnectedDevicesDto}. */ @@ -204,7 +186,6 @@ public PathComputationStrategy mapComputationStrategy(String raw) { /** * Convert {@link RequestedFlow} to {@link ActivateFlowMonitoringInfoData}. */ - @Mapping(target = "flowId", source = "flowId") @Mapping(target = "source.datapath", source = "srcSwitch") @Mapping(target = "source.portNumber", source = "srcPort") @Mapping(target = "source.vlanId", source = "srcVlan") @@ -215,6 +196,10 @@ public PathComputationStrategy mapComputationStrategy(String raw) { @Mapping(target = "destination.innerVlanId", source = "destInnerVlan") public abstract ActivateFlowMonitoringInfoData toActivateFlowMonitoringInfoData(RequestedFlow flow); + protected Switch newSwitch(SwitchId switchId) { + return Switch.builder().switchId(switchId).build(); + } + public SwitchId mapSwitchId(String value) { return value == null ? null : new SwitchId(value); } diff --git a/src-java/flowmonitoring-topology/flowmonitoring-storm-topology/src/main/java/org/openkilda/wfm/topology/flowmonitoring/mapper/FlowMapper.java b/src-java/flowmonitoring-topology/flowmonitoring-storm-topology/src/main/java/org/openkilda/wfm/topology/flowmonitoring/mapper/FlowMapper.java index fa0ad6f783e..89d5b177f38 100644 --- a/src-java/flowmonitoring-topology/flowmonitoring-storm-topology/src/main/java/org/openkilda/wfm/topology/flowmonitoring/mapper/FlowMapper.java +++ b/src-java/flowmonitoring-topology/flowmonitoring-storm-topology/src/main/java/org/openkilda/wfm/topology/flowmonitoring/mapper/FlowMapper.java @@ -20,13 +20,14 @@ import org.openkilda.messaging.payload.flow.PathNodePayload; import org.openkilda.model.Flow; import org.openkilda.model.PathSegment; +import org.openkilda.wfm.share.utils.TimestampHelper; import org.openkilda.wfm.topology.flowmonitoring.model.FlowPathLatency; import org.openkilda.wfm.topology.flowmonitoring.model.FlowState; import org.openkilda.wfm.topology.flowmonitoring.model.Link; +import lombok.NonNull; import org.mapstruct.Mapper; import org.mapstruct.Mapping; -import org.mapstruct.ReportingPolicy; import org.mapstruct.factory.Mappers; import java.util.ArrayList; @@ -34,22 +35,26 @@ import java.util.List; import java.util.stream.Collectors; -@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE) -public interface FlowMapper { - FlowMapper INSTANCE = Mappers.getMapper(FlowMapper.class); +@Mapper +public abstract class FlowMapper { + public static final FlowMapper INSTANCE = Mappers.getMapper(FlowMapper.class); @Mapping(target = "forwardPath", source = "forwardPath.segments") @Mapping(target = "reversePath", source = "reversePath.segments") - FlowState toFlowState(Flow flow); + @Mapping(target = "forwardPathLatency", ignore = true) + @Mapping(target = "reversePathLatency", ignore = true) + public abstract FlowState toFlowState(Flow flow); @Mapping(target = "forwardPath", source = "info.flowPath.forwardPath") @Mapping(target = "reversePath", source = "info.flowPath.reversePath") - FlowState toFlowState(UpdateFlowCommand info); + @Mapping(target = "forwardPathLatency", ignore = true) + @Mapping(target = "reversePathLatency", ignore = true) + public abstract FlowState toFlowState(UpdateFlowCommand info); /** * Convert flow path representation. */ - default List toLinks(List pathSegments) { + protected List toLinks(List pathSegments) { if (pathSegments == null) { return Collections.emptyList(); } @@ -59,7 +64,7 @@ default List toLinks(List pathSegments) { /** * Convert flow path representation. */ - default List pathNodesToLinks(List pathSegments) { + protected List pathNodesToLinks(List pathSegments) { List links = new ArrayList<>(); for (int i = 0; i < pathSegments.size() - 1; i++) { links.add(Link.builder() @@ -72,11 +77,15 @@ default List pathNodesToLinks(List pathSegments) { return links; } - Link toLink(PathSegment pathSegment); + public abstract Link toLink(PathSegment pathSegment); - @Mapping(target = "timestamp", expression = "java(org.openkilda.wfm.share.utils.TimestampHelper" - + ".noviflowTimestampToInstant(data.getT1()))") - @Mapping(target = "latency", expression = "java(org.openkilda.wfm.share.utils.TimestampHelper" - + ".noviflowTimestampsToDuration(data.getT0(), data.getT1()))") - FlowPathLatency toFlowPathLatency(FlowRttStatsData data); + /** + * Convert {@link FlowRttStatsData} to {@link FlowPathLatency}. + */ + public FlowPathLatency toFlowPathLatency(@NonNull FlowRttStatsData data) { + FlowPathLatency latency = new FlowPathLatency(); + latency.setTimestamp(TimestampHelper.noviflowTimestampToInstant(data.getT1())); + latency.setLatency(TimestampHelper.noviflowTimestampsToDuration(data.getT0(), data.getT1())); + return latency; + } } diff --git a/src-java/kilda-model/src/main/java/org/openkilda/model/YFlow.java b/src-java/kilda-model/src/main/java/org/openkilda/model/YFlow.java index dc20ee85e0e..6ac748b3edc 100644 --- a/src-java/kilda-model/src/main/java/org/openkilda/model/YFlow.java +++ b/src-java/kilda-model/src/main/java/org/openkilda/model/YFlow.java @@ -64,6 +64,8 @@ public class YFlow implements CompositeDataEntity { */ private YFlow() { data = new YFlowDataImpl(); + // The reference is used to link sub-flows back to the y-flow. See {@link #setSubFlows(Set)}. + ((YFlowDataImpl) data).yFlow = this; } /** @@ -94,7 +96,7 @@ public YFlow(@NonNull String yFlowId, @NonNull SharedEndpoint sharedEndpoint, .sharedEndpointMeterId(sharedEndpointMeterId); data = builder.build(); - // The reference is used to link sub-flows back to the y-flow. See {@link #setSegments(List)}. + // The reference is used to link sub-flows back to the y-flow. See {@link #setSubFlows(Set)}. ((YFlowDataImpl) data).yFlow = this; } @@ -355,6 +357,7 @@ public interface YFlowCloner { */ default YFlowData deepCopy(YFlowData source, YFlow targetFlow) { YFlowDataImpl result = new YFlowDataImpl(); + result.yFlow = targetFlow; copyWithoutSubFlows(source, result); result.setSubFlows(source.getSubFlows().stream() .map(subFlow -> new YSubFlow(subFlow, targetFlow)) diff --git a/src-java/nbworker-topology/nbworker-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/RequestedFlowMapper.java b/src-java/nbworker-topology/nbworker-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/RequestedFlowMapper.java index c08f8493ccf..1e70afb9ea3 100644 --- a/src-java/nbworker-topology/nbworker-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/RequestedFlowMapper.java +++ b/src-java/nbworker-topology/nbworker-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/RequestedFlowMapper.java @@ -15,20 +15,21 @@ package org.openkilda.wfm.share.mappers; +import org.openkilda.adapter.FlowDestAdapter; +import org.openkilda.adapter.FlowSourceAdapter; import org.openkilda.messaging.command.flow.FlowRequest; import org.openkilda.model.Flow; -import org.openkilda.model.FlowEndpoint; import org.openkilda.model.PathComputationStrategy; import org.openkilda.model.Switch; import org.openkilda.model.SwitchId; +import lombok.NonNull; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.factory.Mappers; -import java.util.Optional; - -@Mapper(imports = {FlowEndpoint.class, Switch.class, Optional.class, PathComputationStrategy.class}) +// TODO(surabujin): merge with org.openkilda.wfm.topology.flowhs.mapper.RequestedFlowMapper +@Mapper public abstract class RequestedFlowMapper { public static final RequestedFlowMapper INSTANCE = Mappers.getMapper(RequestedFlowMapper.class); @@ -36,16 +37,36 @@ public abstract class RequestedFlowMapper { /** * Convert {@link Flow} to {@link FlowRequest}. */ + public FlowRequest toFlowRequest(Flow flow) { + FlowRequest request = generatedMap(flow); + request.setSource(new FlowSourceAdapter(flow).getEndpoint()); + request.setDestination(new FlowDestAdapter(flow).getEndpoint()); + return request; + } + + /** + * Convert {@link FlowRequest} to {@link Flow}. + */ + @Mapping(target = "srcSwitch", source = "source.switchId") + @Mapping(target = "srcPort", source = "source.portNumber") + @Mapping(target = "srcVlan", source = "source.outerVlanId") + @Mapping(target = "srcInnerVlan", source = "source.innerVlanId") + @Mapping(target = "destSwitch", source = "destination.switchId") + @Mapping(target = "destPort", source = "destination.portNumber") + @Mapping(target = "destVlan", source = "destination.outerVlanId") + @Mapping(target = "destInnerVlan", source = "destination.innerVlanId") + @Mapping(target = "detectConnectedDevices", source = "detectConnectedDevices") + @Mapping(target = "diverseGroupId", ignore = true) + @Mapping(target = "affinityGroupId", ignore = true) + @Mapping(target = "statusInfo", ignore = true) + @Mapping(target = "targetPathComputationStrategy", ignore = true) + @Mapping(target = "status", ignore = true) + @Mapping(target = "yFlowId", ignore = true) + @Mapping(target = "yFlow", ignore = true) + public abstract Flow toFlow(@NonNull FlowRequest request); + @Mapping(target = "flowId", source = "flowId") - @Mapping(target = "source", expression = "java(new FlowEndpoint(flow.getSrcSwitchId(), " - + "flow.getSrcPort(), flow.getSrcVlan(), flow.getSrcInnerVlan()))") - @Mapping(target = "destination", expression = "java(new FlowEndpoint(flow.getDestSwitchId(), " - + "flow.getDestPort(), flow.getDestVlan(), flow.getDestInnerVlan()))") @Mapping(target = "encapsulationType", source = "encapsulationType") - @Mapping(target = "pathComputationStrategy", - expression = "java(java.util.Optional.ofNullable(flow.getPathComputationStrategy())" - + ".map(pcs -> pcs.toString().toLowerCase())" - + ".orElse(null))") @Mapping(target = "bandwidth", source = "bandwidth") @Mapping(target = "ignoreBandwidth", source = "ignoreBandwidth") @Mapping(target = "periodicPings", source = "periodicPings") @@ -55,47 +76,36 @@ public abstract class RequestedFlowMapper { @Mapping(target = "priority", source = "priority") @Mapping(target = "pinned", source = "pinned") @Mapping(target = "detectConnectedDevices", source = "detectConnectedDevices") + @Mapping(target = "loopSwitchId", source = "loopSwitchId") + @Mapping(target = "source", ignore = true) + @Mapping(target = "destination", ignore = true) @Mapping(target = "transitEncapsulationId", ignore = true) @Mapping(target = "diverseFlowId", ignore = true) @Mapping(target = "affinityFlowId", ignore = true) @Mapping(target = "type", ignore = true) @Mapping(target = "bulkUpdateFlowIds", ignore = true) @Mapping(target = "doNotRevert", ignore = true) - public abstract FlowRequest toFlowRequest(Flow flow); - - /** - * Convert {@link FlowRequest} to {@link Flow}. - */ - @Mapping(target = "srcSwitch", expression = "java(java.util.Optional.ofNullable(request.getSource())" - + ".map(FlowEndpoint::getSwitchId).map(id -> Switch.builder().switchId(id).build()).orElse(null))") - @Mapping(target = "srcPort", expression = "java(java.util.Optional.ofNullable(request.getSource())" - + ".map(FlowEndpoint::getPortNumber).orElse(null))") - @Mapping(target = "srcVlan", expression = "java(java.util.Optional.ofNullable(request.getSource())" - + ".map(FlowEndpoint::getOuterVlanId).orElse(null))") - @Mapping(target = "srcInnerVlan", expression = "java(java.util.Optional.ofNullable(request.getSource())" - + ".map(FlowEndpoint::getInnerVlanId).orElse(null))") - @Mapping(target = "destSwitch", expression = "java(java.util.Optional.ofNullable(request.getDestination())" - + ".map(FlowEndpoint::getSwitchId).map(id -> Switch.builder().switchId(id).build()).orElse(null))") - @Mapping(target = "destPort", expression = "java(java.util.Optional.ofNullable(request.getDestination())" - + ".map(FlowEndpoint::getPortNumber).orElse(null))") - @Mapping(target = "destVlan", expression = "java(java.util.Optional.ofNullable(request.getDestination())" - + ".map(FlowEndpoint::getOuterVlanId).orElse(null))") - @Mapping(target = "destInnerVlan", expression = "java(java.util.Optional.ofNullable(request.getDestination())" - + ".map(FlowEndpoint::getInnerVlanId).orElse(null))") - @Mapping(target = "pathComputationStrategy", - expression = "java(Optional.ofNullable(request.getPathComputationStrategy())" - + ".map(String::toUpperCase).map(PathComputationStrategy::valueOf).orElse(null))") - @Mapping(target = "detectConnectedDevices", source = "detectConnectedDevices") - @Mapping(target = "diverseGroupId", ignore = true) - @Mapping(target = "affinityGroupId", ignore = true) - @Mapping(target = "statusInfo", ignore = true) - @Mapping(target = "targetPathComputationStrategy", ignore = true) - @Mapping(target = "status", ignore = true) - @Mapping(target = "yFlowId", ignore = true) - @Mapping(target = "yFlow", ignore = true) - public abstract Flow toFlow(FlowRequest request); + protected abstract FlowRequest generatedMap(Flow flow); public SwitchId map(String value) { return value == null ? null : new SwitchId(value); } + + protected String mapPathComputationStrategy(PathComputationStrategy strategy) { + if (strategy == null) { + return null; + } + return strategy.toString().toLowerCase(); + } + + protected PathComputationStrategy mapPathComputationStrategy(String raw) { + if (raw == null) { + return null; + } + return PathComputationStrategy.valueOf(raw.toUpperCase()); + } + + protected Switch newSwitch(SwitchId switchId) { + return Switch.builder().switchId(switchId).build(); + } } diff --git a/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/yflows/SubFlowPingPayload.java b/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/yflows/SubFlowPingPayload.java new file mode 100644 index 00000000000..f58750b79f5 --- /dev/null +++ b/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/yflows/SubFlowPingPayload.java @@ -0,0 +1,32 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.northbound.dto.v2.yflows; + +import com.fasterxml.jackson.databind.PropertyNamingStrategy.SnakeCaseStrategy; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +@AllArgsConstructor +@JsonNaming(value = SnakeCaseStrategy.class) +public class SubFlowPingPayload { + String flowId; + UniSubFlowPingPayload forward; + UniSubFlowPingPayload reverse; +} diff --git a/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/yflows/UniSubFlowPingPayload.java b/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/yflows/UniSubFlowPingPayload.java new file mode 100644 index 00000000000..80061aeb759 --- /dev/null +++ b/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/yflows/UniSubFlowPingPayload.java @@ -0,0 +1,32 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.northbound.dto.v2.yflows; + +import com.fasterxml.jackson.databind.PropertyNamingStrategy.SnakeCaseStrategy; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +@AllArgsConstructor +@JsonNaming(value = SnakeCaseStrategy.class) +public class UniSubFlowPingPayload { + private boolean pingSuccess; + private String error; + private long latency; +} diff --git a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/InputDecoder.java b/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/yflows/YFlowPingPayload.java similarity index 56% rename from src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/InputDecoder.java rename to src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/yflows/YFlowPingPayload.java index 202e20644f0..1322fe5b06d 100644 --- a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/InputDecoder.java +++ b/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/yflows/YFlowPingPayload.java @@ -1,4 +1,4 @@ -/* Copyright 2018 Telstra Open Source +/* Copyright 2022 Telstra Open Source * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,10 +13,21 @@ * limitations under the License. */ -package org.openkilda.wfm.topology.ping.bolt; +package org.openkilda.northbound.dto.v2.yflows; -import org.openkilda.wfm.share.bolt.KafkaDecoder; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; -public class InputDecoder extends KafkaDecoder { - public static final String BOLT_ID = ComponentId.INPUT_DECODER.toString(); +import javax.validation.constraints.PositiveOrZero; + +@Data +@Builder +@AllArgsConstructor +public class YFlowPingPayload { + + @PositiveOrZero(message = "timeoutMillis can't be negative") + @JsonProperty("timeout") + int timeoutMillis; } diff --git a/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/yflows/YFlowPingResult.java b/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/yflows/YFlowPingResult.java new file mode 100644 index 00000000000..7de0eff9373 --- /dev/null +++ b/src-java/northbound-service/northbound-api/src/main/java/org/openkilda/northbound/dto/v2/yflows/YFlowPingResult.java @@ -0,0 +1,37 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.northbound.dto.v2.yflows; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.PropertyNamingStrategy.SnakeCaseStrategy; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +import java.util.List; + +@Data +@Builder +@AllArgsConstructor +@JsonNaming(SnakeCaseStrategy.class) +public class YFlowPingResult { + @JsonProperty("y_flow_id") + String yFlowId; + boolean pingSuccess; + String error; + List subFlows; +} diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/controller/v2/YFlowControllerV2.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/controller/v2/YFlowControllerV2.java index d5d41662223..0c986aeb898 100644 --- a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/controller/v2/YFlowControllerV2.java +++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/controller/v2/YFlowControllerV2.java @@ -24,6 +24,8 @@ import org.openkilda.northbound.dto.v2.yflows.YFlowDump; import org.openkilda.northbound.dto.v2.yflows.YFlowPatchPayload; import org.openkilda.northbound.dto.v2.yflows.YFlowPaths; +import org.openkilda.northbound.dto.v2.yflows.YFlowPingPayload; +import org.openkilda.northbound.dto.v2.yflows.YFlowPingResult; import org.openkilda.northbound.dto.v2.yflows.YFlowRerouteResult; import org.openkilda.northbound.dto.v2.yflows.YFlowSyncResult; import org.openkilda.northbound.dto.v2.yflows.YFlowUpdatePayload; @@ -133,4 +135,15 @@ public CompletableFuture validateYFlow(@PathVariable("y_f public CompletableFuture synchronizeYFlow(@PathVariable("y_flow_id") String yFlowId) { return flowService.synchronizeYFlow(yFlowId); } + + @ApiOperation( + value = "Verify flow - using special network packet that is being routed in the same way as client traffic", + response = YFlowPingResult.class) + @PostMapping(path = "/{y_flow_id}/ping") + @ResponseStatus(HttpStatus.OK) + public CompletableFuture pingYFlow( + @RequestBody YFlowPingPayload payload, + @PathVariable("y_flow_id") String yFlowId) { + return flowService.pingYFlow(yFlowId, payload); + } } diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/FlowMapper.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/FlowMapper.java index c247fed3ace..a2dec624090 100644 --- a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/FlowMapper.java +++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/FlowMapper.java @@ -30,7 +30,6 @@ import org.openkilda.messaging.model.FlowPatch; import org.openkilda.messaging.model.MirrorPointStatusDto; import org.openkilda.messaging.model.PatchEndpoint; -import org.openkilda.messaging.model.Ping; import org.openkilda.messaging.model.SwapFlowDto; import org.openkilda.messaging.nbtopology.response.FlowLoopDto; import org.openkilda.messaging.nbtopology.response.FlowMirrorPointsDumpResponse; @@ -68,18 +67,13 @@ import org.openkilda.northbound.dto.v2.flows.PathStatus; import org.openkilda.northbound.dto.v2.flows.SwapFlowPayload; -import com.google.common.collect.Sets; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.MappingTarget; -import java.time.format.DateTimeFormatter; - -@Mapper(componentModel = "spring", - uses = {FlowEncapsulationTypeMapper.class, FlowStatusMapper.class, PathComputationStrategyMapper.class, - InstantMapper.class}, - imports = {FlowEndpointPayload.class, FlowEndpointV2.class, DetectConnectedDevicesPayload.class, - DetectConnectedDevicesV2.class, DetectConnectedDevicesDto.class, Sets.class, DateTimeFormatter.class}) +@Mapper(componentModel = "spring", uses = { + FlowEncapsulationTypeMapper.class, FlowStatusMapper.class, PathComputationStrategyMapper.class, + KildaTypeMapper.class, TimeMapper.class, PingMapper.class}) public abstract class FlowMapper { /** * Map {@link FlowDto} into {@link FlowPayload}. @@ -122,8 +116,7 @@ public FlowResponseV2 toFlowResponseV2(FlowDto flowDto) { } @Mapping(target = "flowId", ignore = true) - @Mapping(target = "maxLatency", - expression = "java(flowPatchDto.getMaxLatency() != null ? flowPatchDto.getMaxLatency() * 1000000L : null)") + @Mapping(target = "maxLatency", qualifiedByName = "timeMillisToNanos") @Mapping(target = "priority", source = "priority") @Mapping(target = "periodicPings", source = "periodicPings") @Mapping(target = "targetPathComputationStrategy", source = "targetPathComputationStrategy") @@ -145,38 +138,25 @@ public FlowResponseV2 toFlowResponseV2(FlowDto flowDto) { @Mapping(target = "flowId", ignore = true) @Mapping(target = "bandwidth", source = "maximumBandwidth") @Mapping(target = "allocateProtectedPath", source = "allocateProtectedPath") - @Mapping(target = "maxLatency", - expression = "java(flowPatchDto.getMaxLatency() != null ? flowPatchDto.getMaxLatency() * 1000000L : null)") - @Mapping(target = "maxLatencyTier2", - expression = "java(flowPatchDto.getMaxLatencyTier2() != null ? " - + "flowPatchDto.getMaxLatencyTier2() * 1000000L : null)") + @Mapping(target = "maxLatency", qualifiedByName = "timeMillisToNanos") + @Mapping(target = "maxLatencyTier2", qualifiedByName = "timeMillisToNanos") @Mapping(target = "priority", source = "priority") @Mapping(target = "periodicPings", source = "periodicPings") @Mapping(target = "diverseFlowId", source = "diverseFlowId") public abstract FlowPatch toFlowPatch(FlowPatchV2 flowPatchDto); - @Mapping(target = "trackLldpConnectedDevices", - expression = "java(flowPatchEndpoint.getDetectConnectedDevices() != null ? " - + "flowPatchEndpoint.getDetectConnectedDevices().isLldp() : null)") - @Mapping(target = "trackArpConnectedDevices", - expression = "java(flowPatchEndpoint.getDetectConnectedDevices() != null ? " - + "flowPatchEndpoint.getDetectConnectedDevices().isArp() : null)") + @Mapping(target = "trackLldpConnectedDevices", source = "detectConnectedDevices.lldp") + @Mapping(target = "trackArpConnectedDevices", source = "detectConnectedDevices.arp") public abstract PatchEndpoint toPatchEndpoint(FlowPatchEndpoint flowPatchEndpoint); @Mapping(target = "bandwidth", source = "maximumBandwidth") - @Mapping(target = "detectConnectedDevices", expression = "java(new DetectConnectedDevicesDto(" - + "request.getSource().getDetectConnectedDevices().isLldp(), " - + "request.getSource().getDetectConnectedDevices().isArp(), " - + "request.getDestination().getDetectConnectedDevices().isLldp(), " - + "request.getDestination().getDetectConnectedDevices().isArp()))") + @Mapping(target = "detectConnectedDevices", source = "request") @Mapping(target = "transitEncapsulationId", ignore = true) @Mapping(target = "type", ignore = true) @Mapping(target = "bulkUpdateFlowIds", ignore = true) @Mapping(target = "doNotRevert", ignore = true) - @Mapping(target = "maxLatency", - expression = "java(request.getMaxLatency() != null ? request.getMaxLatency() * 1000000L : null)") - @Mapping(target = "maxLatencyTier2", - expression = "java(request.getMaxLatencyTier2() != null ? request.getMaxLatencyTier2() * 1000000L : null)") + @Mapping(target = "maxLatency", qualifiedByName = "timeMillisToNanos") + @Mapping(target = "maxLatencyTier2", qualifiedByName = "timeMillisToNanos") @Mapping(target = "loopSwitchId", ignore = true) public abstract FlowRequest toFlowRequest(FlowRequestV2 request); @@ -189,8 +169,7 @@ public FlowResponseV2 toFlowResponseV2(FlowDto flowDto) { @Mapping(target = "doNotRevert", ignore = true) @Mapping(target = "diverseFlowId", ignore = true) @Mapping(target = "affinityFlowId", ignore = true) - @Mapping(target = "maxLatency", - expression = "java(payload.getMaxLatency() != null ? payload.getMaxLatency() * 1000000L : null)") + @Mapping(target = "maxLatency", qualifiedByName = "timeMillisToNanos") @Mapping(target = "maxLatencyTier2", ignore = true) @Mapping(target = "loopSwitchId", ignore = true) @Mapping(target = "strictBandwidth", ignore = true) @@ -300,12 +279,12 @@ public SwapFlowPayload toSwapOutput(FlowDto flowDto) { return result; } - @Mapping(target = "sourceSwitch", expression = "java(request.getSource().getSwitchId())") - @Mapping(target = "destinationSwitch", expression = "java(request.getDestination().getSwitchId())") - @Mapping(target = "sourcePort", expression = "java(request.getSource().getPortNumber())") - @Mapping(target = "destinationPort", expression = "java(request.getDestination().getPortNumber())") - @Mapping(target = "sourceVlan", expression = "java(request.getSource().getVlanId())") - @Mapping(target = "destinationVlan", expression = "java(request.getDestination().getVlanId())") + @Mapping(target = "sourceSwitch", source = "source.switchId") + @Mapping(target = "destinationSwitch", source = "destination.switchId") + @Mapping(target = "sourcePort", source = "source.portNumber") + @Mapping(target = "destinationPort", source = "destination.portNumber") + @Mapping(target = "sourceVlan", source = "source.vlanId") + @Mapping(target = "destinationVlan", source = "destination.vlanId") public abstract SwapFlowDto toSwapFlowDto(SwapFlowPayload request); public abstract FlowValidationDto toFlowValidationDto(FlowValidationResponse response); @@ -317,10 +296,8 @@ public SwapFlowPayload toSwapOutput(FlowDto flowDto) { @Mapping(target = "diverseWith", source = "f.diverseWith") @Mapping(target = "source", source = "source") @Mapping(target = "destination", source = "destination") - @Mapping(target = "maxLatency", - expression = "java(f.getMaxLatency() != null ? f.getMaxLatency() / 1000000L : null)") - @Mapping(target = "maxLatencyTier2", - expression = "java(f.getMaxLatencyTier2() != null ? f.getMaxLatencyTier2() / 1000000L : null)") + @Mapping(target = "maxLatency", qualifiedByName = "timeNanosToMillis") + @Mapping(target = "maxLatencyTier2", qualifiedByName = "timeNanosToMillis") @Mapping(target = "loopSwitchId", source = "f.loopSwitchId") @Mapping(target = "forwardPathLatencyNs", source = "f.forwardLatency") @Mapping(target = "reversePathLatencyNs", source = "f.reverseLatency") @@ -330,8 +307,7 @@ public SwapFlowPayload toSwapOutput(FlowDto flowDto) { @Mapping(target = "id", source = "flowId") @Mapping(target = "maximumBandwidth", source = "bandwidth") @Mapping(target = "ignoreBandwidth", source = "ignoreBandwidth") - @Mapping(target = "maxLatency", - expression = "java(f.getMaxLatency() != null ? f.getMaxLatency() / 1000000L : null)") + @Mapping(target = "maxLatency", qualifiedByName = "timeNanosToMillis") @Mapping(target = "status", source = "state") @Mapping(target = "created", source = "createdTime") @Mapping(target = "pinned", source = "pinned") @@ -346,8 +322,7 @@ public SwapFlowPayload toSwapOutput(FlowDto flowDto) { @Mapping(target = "destination", ignore = true) @Mapping(target = "created", ignore = true) @Mapping(target = "status", ignore = true) - @Mapping(target = "maxLatency", - expression = "java(f.getMaxLatency() != null ? f.getMaxLatency() / 1000000L : null)") + @Mapping(target = "maxLatency", qualifiedByName = "timeNanosToMillis") protected abstract void generatedFlowResponsePayloadMap(@MappingTarget FlowResponsePayload target, FlowDto f); @Mapping(target = "flowId", source = "flowId") @@ -402,44 +377,13 @@ protected FlowEndpointPayload flowDtoDestinationFlowResponsePayload(FlowDto f) { @Mapping(target = "protectedPath", source = "protectedFlowPathStatus") public abstract PathStatus map(FlowStatusDetails flowStatusDetails); - /** - * Translate Java's error code(enum) into human readable string. - */ - public String getPingError(Ping.Errors error) { - if (error == null) { - return null; - } - - String message; - switch (error) { - case TIMEOUT: - message = "No ping for reasonable time"; - break; - case WRITE_FAILURE: - message = "Can't send ping"; - break; - case NOT_CAPABLE: - message = "Can't ping - at least one of endpoints are not capable to catch pings."; - break; - case SOURCE_NOT_AVAILABLE: - case DEST_NOT_AVAILABLE: - message = "Can't ping - at least one of endpoints are unavailable"; - break; - default: - message = error.toString(); - } - - return message; - } - - @Mapping(target = "switchId", expression = "java(new org.openkilda.model.SwitchId(payload.getSwitchId()))") public abstract FlowLoopResponse toFlowLoopResponse(FlowLoopDto payload); @Mapping(target = "flowId", source = "payload.flowId") @Mapping(target = "switchId", source = "payload.loopSwitchId") public abstract FlowLoopResponse toFlowLoopResponse(FlowResponse response); - @Mapping(target = "timestamp", source = "entry.statusChangeTimestamp") + @Mapping(target = "timestamp", source = "statusChangeTimestamp") public abstract FlowHistoryStatus toFlowHistoryStatus(FlowStatusTimestampsEntry entry); public abstract FlowMirrorPointCreateRequest toFlowMirrorPointCreateRequest(String flowId, @@ -477,4 +421,14 @@ public FlowEndpointV2 mapFlowEndpointV2(FlowEndpoint input) { } public abstract MirrorPointStatus toMirrorPointStatus(MirrorPointStatusDto dto); + + @Mapping(target = "srcLldp", source = "source.detectConnectedDevices.lldp") + @Mapping(target = "srcArp", source = "source.detectConnectedDevices.arp") + @Mapping(target = "dstLldp", source = "destination.detectConnectedDevices.lldp") + @Mapping(target = "dstArp", source = "destination.detectConnectedDevices.arp") + @Mapping(target = "srcSwitchLldp", ignore = true) + @Mapping(target = "srcSwitchArp", ignore = true) + @Mapping(target = "dstSwitchLldp", ignore = true) + @Mapping(target = "dstSwitchArp", ignore = true) + public abstract DetectConnectedDevicesDto toDetectConnectedDevicesDto(FlowRequestV2 request); } diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/KildaTypeMapper.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/KildaTypeMapper.java new file mode 100644 index 00000000000..dc59685d37f --- /dev/null +++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/KildaTypeMapper.java @@ -0,0 +1,64 @@ +/* Copyright 2021 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.northbound.converter; + +import org.openkilda.model.MacAddress; +import org.openkilda.model.SwitchId; + +import org.mapstruct.Mapper; + +@Mapper(componentModel = "spring") +public abstract class KildaTypeMapper { + /** + * Convert {@link String} into {@link SwitchId}. + */ + public SwitchId mapSwitchId(String switchId) { + if (switchId == null) { + return null; + } + return new SwitchId(switchId); + } + + /** + * Convert {@link SwitchId} into {@link String}. + */ + public String mapSwitchId(SwitchId switchId) { + if (switchId == null) { + return null; + } + return switchId.toString(); + } + + /** + * Convert {@link String} into {@link MacAddress}. + */ + public MacAddress mapMacAddress(String macAddress) { + if (macAddress == null) { + return null; + } + return new MacAddress(macAddress); + } + + /** + * Convert {@link MacAddress} into {@link String}. + */ + public String mapMacAddress(MacAddress macAddress) { + if (macAddress == null) { + return null; + } + return macAddress.toString(); + } +} diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/LinkMapper.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/LinkMapper.java index 0f5bd2bc6df..49af0cd9205 100644 --- a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/LinkMapper.java +++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/LinkMapper.java @@ -21,7 +21,6 @@ import org.openkilda.messaging.nbtopology.request.BfdPropertiesReadRequest; import org.openkilda.messaging.nbtopology.request.BfdPropertiesWriteRequest; import org.openkilda.messaging.nbtopology.response.BfdPropertiesResponse; -import org.openkilda.model.SwitchId; import org.openkilda.northbound.dto.v1.links.LinkDto; import org.openkilda.northbound.dto.v1.links.PathDto; import org.openkilda.northbound.dto.v2.links.BfdPropertiesByEndpoint; @@ -31,10 +30,9 @@ import org.mapstruct.Mapping; import org.mapstruct.MappingTarget; -import java.time.Duration; import java.util.Arrays; -@Mapper(componentModel = "spring") +@Mapper(componentModel = "spring", uses = {KildaTypeMapper.class, TimeMapper.class}) public abstract class LinkMapper { public abstract PathDto map(PathNode data); @@ -88,22 +86,4 @@ public LinkDto mapResponse(IslInfoData source) { @Mapping(target = "path", ignore = true) protected abstract void generatedMap(@MappingTarget LinkDto target, IslInfoData source); - - public Duration mapMillis(long millis) { - return Duration.ofMillis(millis); - } - - /** - * Convert {@link Duration} into {@link long}. - */ - public long mapMillis(Duration millis) { - if (millis == null) { - return 0; - } - return millis.toMillis(); - } - - public String toSwithId(SwitchId switchId) { - return switchId.toString(); - } } diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/PingMapper.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/PingMapper.java new file mode 100644 index 00000000000..045ef563bb7 --- /dev/null +++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/PingMapper.java @@ -0,0 +1,54 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.northbound.converter; + +import org.openkilda.messaging.model.Ping; + +import org.mapstruct.Mapper; + +@Mapper(componentModel = "spring") +public abstract class PingMapper { + + public static final String TIMEOUT_ERROR_MESSAGE = "No ping for reasonable time"; + public static final String WRITE_ERROR_MESSAGE = "Can't send ping"; + public static final String NOT_CAPABLE_ERROR_MESSAGE = + "Can't ping - at least one of endpoints are not capable to catch pings."; + public static final String ENDPOINT_NOT_AVAILABLE_ERROR_MESSAGE = + "Can't ping - at least one of endpoints are unavailable"; + + /** + * Translate Java's error code(enum) into human readable string. + */ + public String getPingError(Ping.Errors error) { + if (error == null) { + return null; + } + + switch (error) { + case TIMEOUT: + return TIMEOUT_ERROR_MESSAGE; + case WRITE_FAILURE: + return WRITE_ERROR_MESSAGE; + case NOT_CAPABLE: + return NOT_CAPABLE_ERROR_MESSAGE; + case SOURCE_NOT_AVAILABLE: + case DEST_NOT_AVAILABLE: + return ENDPOINT_NOT_AVAILABLE_ERROR_MESSAGE; + default: + return error.toString(); + } + } +} diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/SwitchMapper.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/SwitchMapper.java index 3b54149829d..ffcd6395a4e 100644 --- a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/SwitchMapper.java +++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/SwitchMapper.java @@ -36,7 +36,6 @@ import org.openkilda.messaging.model.SwitchPatch; import org.openkilda.messaging.payload.history.PortHistoryPayload; import org.openkilda.model.IpSocketAddress; -import org.openkilda.model.MacAddress; import org.openkilda.model.Switch; import org.openkilda.model.SwitchStatus; import org.openkilda.northbound.dto.v1.switches.GroupInfoDto; @@ -54,7 +53,6 @@ import org.openkilda.northbound.dto.v1.switches.RulesValidationDto; import org.openkilda.northbound.dto.v1.switches.RulesValidationResult; import org.openkilda.northbound.dto.v1.switches.SwitchDto; -import org.openkilda.northbound.dto.v1.switches.SwitchLocationDto; import org.openkilda.northbound.dto.v1.switches.SwitchPropertiesDto; import org.openkilda.northbound.dto.v1.switches.SwitchSyncResult; import org.openkilda.northbound.dto.v1.switches.SwitchValidationResult; @@ -68,13 +66,11 @@ import org.mapstruct.Mapper; import org.mapstruct.Mapping; -import java.util.Date; import java.util.List; import java.util.stream.Collectors; -@Mapper(componentModel = "spring", - uses = {FlowMapper.class, InstantMapper.class}, - imports = {Date.class, MacAddress.class, SwitchLocationDto.class, SwitchLocationDtoV2.class}) +@Mapper(componentModel = "spring", uses = { + FlowMapper.class, KildaTypeMapper.class, TimeMapper.class, FlowEncapsulationTypeMapper.class}) public abstract class SwitchMapper { @Mapping(source = "ofDescriptionManufacturer", target = "manufacturer") @@ -84,9 +80,8 @@ public abstract class SwitchMapper { @Mapping(source = "status", target = "state") @Mapping(source = "socketAddress.address", target = "address") @Mapping(source = "socketAddress.port", target = "port") - @Mapping(target = "location", expression = "java(new SwitchLocationDto(" - + "data.getLatitude(), data.getLongitude(), data.getStreet(), data.getCity(), data.getCountry()))") - public abstract SwitchDto toSwitchDto(Switch data); + @Mapping(target = "location", source = "input") + public abstract SwitchDto toSwitchDto(Switch input); /** * Convert {@link SwitchStatus} to {@link String} representation. @@ -168,25 +163,13 @@ public SwitchChangeType convertStatus(SwitchStatus status) { public abstract MeterMisconfiguredInfoDto toMeterMisconfiguredInfoDto(MeterMisconfiguredInfoEntry data); - @Mapping(target = "supportedTransitEncapsulation", - expression = "java(entry.getSupportedTransitEncapsulation().stream()" - + ".map(e -> e.toString().toLowerCase()).collect(java.util.stream.Collectors.toList()))") - @Mapping(target = "server42MacAddress", expression = "java(entry.getServer42MacAddress() == null ? null " - + ": entry.getServer42MacAddress().toString())") public abstract SwitchPropertiesDto map(org.openkilda.messaging.model.SwitchPropertiesDto entry); - @Mapping(target = "supportedTransitEncapsulation", - expression = "java(entry.getSupportedTransitEncapsulation() == null ? null : " - + "entry.getSupportedTransitEncapsulation().stream()" - + ".map(e-> org.openkilda.messaging.payload.flow.FlowEncapsulationType.valueOf(e.toUpperCase()))" - + ".collect(java.util.stream.Collectors.toSet()))") - @Mapping(target = "server42MacAddress", expression = "java(entry.getServer42MacAddress() == null ? null " - + ": new MacAddress(entry.getServer42MacAddress()))") public abstract org.openkilda.messaging.model.SwitchPropertiesDto map(SwitchPropertiesDto entry); @Mapping(source = "upEventsCount", target = "upCount") @Mapping(source = "downEventsCount", target = "downCount") - @Mapping(target = "date", expression = "java(Date.from(response.getTime()))") + @Mapping(source = "time", target = "date") public abstract PortHistoryResponse map(PortHistoryPayload response); @Mapping(source = "ofDescriptionManufacturer", target = "manufacturer") @@ -196,9 +179,8 @@ public SwitchChangeType convertStatus(SwitchStatus status) { @Mapping(source = "status", target = "state") @Mapping(source = "socketAddress.address", target = "address") @Mapping(source = "socketAddress.port", target = "port") - @Mapping(target = "location", expression = "java(new SwitchLocationDtoV2(" - + "data.getLatitude(), data.getLongitude(), data.getStreet(), data.getCity(), data.getCountry()))") - public abstract SwitchDtoV2 map(Switch data); + @Mapping(target = "location", source = "input") + public abstract SwitchDtoV2 map(Switch input); public abstract SwitchPatch map(SwitchPatchDto data); diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/TimeMapper.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/TimeMapper.java new file mode 100644 index 00000000000..e931423b77b --- /dev/null +++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/TimeMapper.java @@ -0,0 +1,84 @@ +/* Copyright 2021 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.northbound.converter; + +import org.mapstruct.Mapper; +import org.mapstruct.Named; + +import java.time.Duration; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.TimeUnit; + +@Mapper(componentModel = "spring") +public abstract class TimeMapper { + /** + * Convert {@link String} to {@link Instant}. + */ + public Instant mapInstant(String value) { + if (value == null) { + return null; + } + + return Instant.parse(value); + } + + /** + * Format {@link Instant} into NB API representation. + */ + public String mapInstant(Instant timestamp) { + if (timestamp == null) { + return null; + } + return DateTimeFormatter.ISO_INSTANT.format(timestamp); + } + + /** + * Convert milliseconds into nanoseconds. + */ + @Named("timeMillisToNanos") + public Long timeMillisToNanos(Long millis) { + if (millis == null) { + return null; + } + return TimeUnit.MILLISECONDS.toNanos(millis); + } + + /** + * Convert nanoseconds into milliseconds. + */ + @Named("timeNanosToMillis") + public Long timeNanosToMillis(Long nanos) { + if (nanos == null) { + return null; + } + return TimeUnit.NANOSECONDS.toMillis(nanos); + } + + public Duration mapMillis(long millis) { + return Duration.ofMillis(millis); + } + + /** + * Convert {@link Duration} into {@link long}. + */ + public long mapMillis(Duration millis) { + if (millis == null) { + return 0; + } + return millis.toMillis(); + } +} diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/YFlowMapper.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/YFlowMapper.java index a02abb477ec..4cf1b32b680 100644 --- a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/YFlowMapper.java +++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/YFlowMapper.java @@ -28,6 +28,7 @@ import org.openkilda.messaging.command.yflow.YFlowValidationResponse; import org.openkilda.messaging.info.event.PathInfoData; import org.openkilda.messaging.info.event.PathNode; +import org.openkilda.messaging.info.flow.YFlowPingResponse; import org.openkilda.messaging.payload.flow.FlowEndpointPayload; import org.openkilda.model.FlowEndpoint; import org.openkilda.northbound.dto.v2.flows.FlowEndpointV2; @@ -42,6 +43,7 @@ import org.openkilda.northbound.dto.v2.yflows.YFlowPatchPayload; import org.openkilda.northbound.dto.v2.yflows.YFlowPath; import org.openkilda.northbound.dto.v2.yflows.YFlowPaths; +import org.openkilda.northbound.dto.v2.yflows.YFlowPingResult; import org.openkilda.northbound.dto.v2.yflows.YFlowRerouteResult; import org.openkilda.northbound.dto.v2.yflows.YFlowSharedEndpoint; import org.openkilda.northbound.dto.v2.yflows.YFlowSharedEndpointEncapsulation; @@ -56,7 +58,7 @@ @Mapper(componentModel = "spring", uses = {FlowEncapsulationTypeMapper.class, FlowStatusMapper.class, PathComputationStrategyMapper.class, - InstantMapper.class}, + TimeMapper.class, PingMapper.class}, imports = {FlowEndpointPayload.class, FlowEndpointV2.class}) public abstract class YFlowMapper { @@ -138,4 +140,7 @@ public YFlowPath toYFlowPath(PathInfoData path) { public abstract YFlowValidationResult toValidationResult(YFlowValidationResponse source); public abstract YFlowSyncResult toSyncResult(YFlowRerouteResponse source); + + @Mapping(target = "yFlowId", source = "YFlowId") + public abstract YFlowPingResult toPingResult(YFlowPingResponse source); } diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/service/YFlowService.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/service/YFlowService.java index 44775fe15bc..e00b3159ca5 100644 --- a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/service/YFlowService.java +++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/service/YFlowService.java @@ -21,6 +21,8 @@ import org.openkilda.northbound.dto.v2.yflows.YFlowDump; import org.openkilda.northbound.dto.v2.yflows.YFlowPatchPayload; import org.openkilda.northbound.dto.v2.yflows.YFlowPaths; +import org.openkilda.northbound.dto.v2.yflows.YFlowPingPayload; +import org.openkilda.northbound.dto.v2.yflows.YFlowPingResult; import org.openkilda.northbound.dto.v2.yflows.YFlowRerouteResult; import org.openkilda.northbound.dto.v2.yflows.YFlowSyncResult; import org.openkilda.northbound.dto.v2.yflows.YFlowUpdatePayload; @@ -53,4 +55,6 @@ public interface YFlowService { CompletableFuture validateYFlow(String yFlowId); CompletableFuture synchronizeYFlow(String yFlowId); + + CompletableFuture pingYFlow(String yFlowId, YFlowPingPayload payload); } diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/service/impl/YFlowServiceImpl.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/service/impl/YFlowServiceImpl.java index 21aa6bf96fc..163c9325243 100644 --- a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/service/impl/YFlowServiceImpl.java +++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/service/impl/YFlowServiceImpl.java @@ -16,6 +16,7 @@ package org.openkilda.northbound.service.impl; import org.openkilda.messaging.command.CommandMessage; +import org.openkilda.messaging.command.flow.YFlowPingRequest; import org.openkilda.messaging.command.yflow.SubFlowsReadRequest; import org.openkilda.messaging.command.yflow.SubFlowsResponse; import org.openkilda.messaging.command.yflow.YFlowDeleteRequest; @@ -33,6 +34,7 @@ import org.openkilda.messaging.command.yflow.YFlowsDumpRequest; import org.openkilda.messaging.error.ErrorType; import org.openkilda.messaging.error.MessageException; +import org.openkilda.messaging.info.flow.YFlowPingResponse; import org.openkilda.northbound.converter.YFlowMapper; import org.openkilda.northbound.dto.v2.yflows.SubFlowsDump; import org.openkilda.northbound.dto.v2.yflows.YFlow; @@ -40,6 +42,8 @@ import org.openkilda.northbound.dto.v2.yflows.YFlowDump; import org.openkilda.northbound.dto.v2.yflows.YFlowPatchPayload; import org.openkilda.northbound.dto.v2.yflows.YFlowPaths; +import org.openkilda.northbound.dto.v2.yflows.YFlowPingPayload; +import org.openkilda.northbound.dto.v2.yflows.YFlowPingResult; import org.openkilda.northbound.dto.v2.yflows.YFlowRerouteResult; import org.openkilda.northbound.dto.v2.yflows.YFlowSyncResult; import org.openkilda.northbound.dto.v2.yflows.YFlowUpdatePayload; @@ -68,6 +72,9 @@ public class YFlowServiceImpl implements YFlowService { @Value("#{kafkaTopicsConfig.getTopoRerouteTopic()}") private String rerouteTopic; + @Value("#{kafkaTopicsConfig.getPingTopic()}") + private String pingTopic; + @Autowired private MessagingChannel messagingChannel; @@ -224,4 +231,14 @@ public CompletableFuture synchronizeYFlow(String yFlowId) { .thenApply(YFlowRerouteResponse.class::cast) .thenApply(flowMapper::toSyncResult); } + + @Override + public CompletableFuture pingYFlow(String yFlowId, YFlowPingPayload payload) { + log.debug("Processing y-flow ping: {} {}", yFlowId, payload); + CommandMessage command = new CommandMessage(new YFlowPingRequest(yFlowId, payload.getTimeoutMillis()), + System.currentTimeMillis(), RequestCorrelationId.getId()); + return messagingChannel.sendAndGet(pingTopic, command) + .thenApply(YFlowPingResponse.class::cast) + .thenApply(flowMapper::toPingResult); + } } diff --git a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/FlowMapperTest.java b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/FlowMapperTest.java index 43dfc3ddce1..131e70ebaa7 100644 --- a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/FlowMapperTest.java +++ b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/FlowMapperTest.java @@ -18,12 +18,18 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.openkilda.northbound.converter.PingMapper.TIMEOUT_ERROR_MESSAGE; import org.openkilda.messaging.command.flow.FlowMirrorPointCreateRequest; import org.openkilda.messaging.command.flow.FlowRequest; import org.openkilda.messaging.command.flow.FlowRequest.Type; import org.openkilda.messaging.info.flow.FlowMirrorPointResponse; +import org.openkilda.messaging.info.flow.FlowPingResponse; +import org.openkilda.messaging.info.flow.UniFlowPingResponse; import org.openkilda.messaging.model.FlowPatch; +import org.openkilda.messaging.model.Ping.Errors; +import org.openkilda.messaging.model.PingMeters; import org.openkilda.messaging.nbtopology.response.FlowMirrorPointsDumpResponse; import org.openkilda.messaging.nbtopology.response.FlowMirrorPointsDumpResponse.FlowMirrorPoint; import org.openkilda.messaging.payload.flow.DetectConnectedDevicesPayload; @@ -35,6 +41,7 @@ import org.openkilda.model.FlowEndpoint; import org.openkilda.model.SwitchId; import org.openkilda.northbound.dto.v1.flows.FlowPatchDto; +import org.openkilda.northbound.dto.v1.flows.PingOutput; import org.openkilda.northbound.dto.v2.flows.DetectConnectedDevicesV2; import org.openkilda.northbound.dto.v2.flows.FlowEndpointV2; import org.openkilda.northbound.dto.v2.flows.FlowMirrorPointPayload; @@ -49,7 +56,7 @@ import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; import org.springframework.test.context.junit4.SpringRunner; import java.util.List; @@ -108,6 +115,7 @@ public class FlowMapperTest { private static final String MIRROR_POINT_DIRECTION_B = "reverse"; private static final long MS_TO_NS_MULTIPLIER = 1000000L; + public static final String ERROR_MESSAGE = "Error"; @Autowired private FlowMapper flowMapper; @@ -386,31 +394,28 @@ public void testFlowMirrorPointsDumpResponseToFlowMirrorPointsResponseV2() { assertEquals(secondPoint.getSinkEndpoint().getInnerVlanId(), secondPayload.getSinkEndpoint().getInnerVlanId()); } + @Test + public void testPingOutput() { + FlowPingResponse response = new FlowPingResponse( + FLOW_ID, new UniFlowPingResponse(false, Errors.TIMEOUT, null, null), + new UniFlowPingResponse(true, null, new PingMeters(1, 2, 3), null), ERROR_MESSAGE); + PingOutput output = flowMapper.toPingOutput(response); + + assertEquals(response.getFlowId(), output.getFlowId()); + assertEquals(response.getError(), output.getError()); + + assertEquals(response.getForward().isPingSuccess(), output.getForward().isPingSuccess()); + assertEquals(0, output.getForward().getLatency()); + assertEquals(TIMEOUT_ERROR_MESSAGE, output.getForward().getError()); + + assertEquals(response.getReverse().isPingSuccess(), output.getReverse().isPingSuccess()); + assertEquals(1, output.getReverse().getLatency()); + assertNull(output.getReverse().getError()); + } + @TestConfiguration + @ComponentScan({"org.openkilda.northbound.converter"}) static class Config { - @Bean - public InstantMapper instantMapper() { - return new InstantMapperImpl(); - } - - @Bean - public PathComputationStrategyMapper pathComputationStrategyMapper() { - return new PathComputationStrategyMapperImpl(); - } - - @Bean - public FlowStatusMapper flowStatusMapper() { - return new FlowStatusMapperImpl(); - } - - @Bean - public FlowEncapsulationTypeMapper flowEncapsulationTypeMapper() { - return new FlowEncapsulationTypeMapperImpl(); - } - - @Bean - public FlowMapper flowMapper() { - return new FlowMapperImpl(); - } + // nothing to define here } } diff --git a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/LagPortMapperTest.java b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/LagPortMapperTest.java index 827cae7a8fc..1a3b636c48b 100644 --- a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/LagPortMapperTest.java +++ b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/LagPortMapperTest.java @@ -22,14 +22,20 @@ import com.google.common.collect.Lists; import org.junit.Test; -import org.mapstruct.factory.Mappers; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.junit4.SpringRunner; +@RunWith(SpringRunner.class) public class LagPortMapperTest { public static final int LOGICAL_PORT_NUMBER_1 = 2021; public static final int PHYSICAL_PORT_NUMBER_1 = 1; public static final int PHYSICAL_PORT_NUMBER_2 = 2; - private final LagPortMapper lagMapper = Mappers.getMapper(LagPortMapper.class); + @Autowired + private LagPortMapper lagMapper; @Test public void mapLagPortDtoTest() { @@ -52,4 +58,10 @@ public void mapLagResponseTest() { assertEquals(PHYSICAL_PORT_NUMBER_1, dto.getPortNumbers().get(0).intValue()); assertEquals(PHYSICAL_PORT_NUMBER_2, dto.getPortNumbers().get(1).intValue()); } + + @TestConfiguration + @ComponentScan({"org.openkilda.northbound.converter"}) + static class Config { + // nothing to define here + } } diff --git a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/LinkMapperTest.java b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/LinkMapperTest.java index c916194c4f6..620771f3f15 100644 --- a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/LinkMapperTest.java +++ b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/LinkMapperTest.java @@ -28,8 +28,13 @@ import org.openkilda.northbound.dto.v1.links.PathDto; import org.junit.Test; -import org.mapstruct.factory.Mappers; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.junit4.SpringRunner; +@RunWith(SpringRunner.class) public class LinkMapperTest { private static final SwitchId SWITCH_ID_1 = new SwitchId(1); private static final SwitchId SWITCH_ID_2 = new SwitchId(2); @@ -46,7 +51,8 @@ public class LinkMapperTest { public static final String BFD_SESSION_STATUS = "UP"; public static final long PACKET_ID = 10L; - private final LinkMapper linkMapper = Mappers.getMapper(LinkMapper.class); + @Autowired + private LinkMapper linkMapper; @Test public void testMapResponse() { @@ -96,4 +102,10 @@ private void assertPathNode(PathNode expected, PathDto actual) { assertEquals(expected.getSwitchId().toString(), actual.getSwitchId()); assertEquals(expected.getPortNo(), actual.getPortNo()); } + + @TestConfiguration + @ComponentScan({"org.openkilda.northbound.converter"}) + static class Config { + // nothing to define here + } } diff --git a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/SwitchMapperTest.java b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/SwitchMapperTest.java index a803995dac6..61db5ef2a16 100644 --- a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/SwitchMapperTest.java +++ b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/SwitchMapperTest.java @@ -38,10 +38,15 @@ import org.openkilda.northbound.dto.v2.switches.SwitchPatchDto; import org.junit.Test; -import org.mapstruct.factory.Mappers; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.junit4.SpringRunner; import java.util.Arrays; +@RunWith(SpringRunner.class) public class SwitchMapperTest { public static final int LOGICAL_PORT_NUMBER_1 = 1; public static final int LOGICAL_PORT_NUMBER_2 = 2; @@ -53,7 +58,9 @@ public class SwitchMapperTest { public static final int PHYSICAL_PORT_4 = 4; public static final int PHYSICAL_PORT_5 = 5; public static final int PHYSICAL_PORT_6 = 6; - private SwitchMapper switchMapper = Mappers.getMapper(SwitchMapper.class); + + @Autowired + private SwitchMapper switchMapper; @Test public void testSwitchPropertiesDto() { @@ -234,4 +241,10 @@ private Switch getSwitch() { .country("France") .build(); } + + @TestConfiguration + @ComponentScan({"org.openkilda.northbound.converter"}) + static class Config { + // nothing to define here + } } diff --git a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/YFlowMapperTest.java b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/YFlowMapperTest.java new file mode 100644 index 00000000000..d877ba08a14 --- /dev/null +++ b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/converter/YFlowMapperTest.java @@ -0,0 +1,95 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.northbound.converter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.openkilda.messaging.info.flow.SubFlowPingPayload; +import org.openkilda.messaging.info.flow.UniSubFlowPingPayload; +import org.openkilda.messaging.info.flow.YFlowPingResponse; +import org.openkilda.messaging.model.Ping.Errors; +import org.openkilda.northbound.dto.v2.yflows.YFlowPingResult; + +import com.google.common.collect.Lists; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +public class YFlowMapperTest { + public static final String ERROR_MESSAGE = "Error"; + public static final String Y_FLOW_ID = "y_flow_id"; + public static final String SUB_FLOW_1 = "flow_1"; + public static final String SUB_FLOW_2 = "flow_2"; + + @Autowired + private YFlowMapper mapper; + + @Test + public void pingResultTest() { + YFlowPingResponse response = new YFlowPingResponse(Y_FLOW_ID, false, ERROR_MESSAGE, Lists.newArrayList( + new SubFlowPingPayload(SUB_FLOW_1, + new UniSubFlowPingPayload(true, null, 1), + new UniSubFlowPingPayload(false, Errors.TIMEOUT, 2)), + new SubFlowPingPayload(SUB_FLOW_2, + new UniSubFlowPingPayload(false, Errors.DEST_NOT_AVAILABLE, 3), + new UniSubFlowPingPayload(true, null, 4)))); + + YFlowPingResult result = mapper.toPingResult(response); + assertEquals(response.getYFlowId(), result.getYFlowId()); + assertEquals(response.isPingSuccess(), result.isPingSuccess()); + assertEquals(response.getError(), result.getError()); + assertEquals(response.getSubFlows().size(), result.getSubFlows().size()); + assertSubFlowPingPayload(response.getSubFlows().get(0), result.getSubFlows().get(0)); + } + + private void assertSubFlowPingPayload( + SubFlowPingPayload expected, org.openkilda.northbound.dto.v2.yflows.SubFlowPingPayload actual) { + assertEquals(expected.getFlowId(), actual.getFlowId()); + assertUniSubFlowPingPayload(expected.getForward(), actual.getForward()); + assertUniSubFlowPingPayload(expected.getReverse(), actual.getReverse()); + } + + private void assertUniSubFlowPingPayload( + UniSubFlowPingPayload expected, org.openkilda.northbound.dto.v2.yflows.UniSubFlowPingPayload actual) { + assertEquals(expected.isPingSuccess(), actual.isPingSuccess()); + assertEquals(expected.getLatency(), actual.getLatency()); + assertError(expected, actual); + } + + private void assertError( + UniSubFlowPingPayload expected, org.openkilda.northbound.dto.v2.yflows.UniSubFlowPingPayload actual) { + if (expected.getError() == null) { + assertNull(actual.getError()); + } else if (Errors.TIMEOUT.equals(expected.getError())) { + assertEquals(PingMapper.TIMEOUT_ERROR_MESSAGE, actual.getError()); + } else if (Errors.DEST_NOT_AVAILABLE.equals(expected.getError())) { + assertEquals(PingMapper.ENDPOINT_NOT_AVAILABLE_ERROR_MESSAGE, actual.getError()); + } else { + throw new AssertionError(String.format("Unknown error type: %s", expected.getError())); + } + } + + @TestConfiguration + @ComponentScan({"org.openkilda.northbound.converter"}) + static class Config { + // nothing to define here + } +} diff --git a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/service/SwitchServiceTest.java b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/service/SwitchServiceTest.java index be42cacbf9c..0f0fdf2b856 100644 --- a/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/service/SwitchServiceTest.java +++ b/src-java/northbound-service/northbound/src/test/java/org/openkilda/northbound/service/SwitchServiceTest.java @@ -29,24 +29,6 @@ import org.openkilda.model.SwitchId; import org.openkilda.northbound.MessageExchanger; import org.openkilda.northbound.config.KafkaConfig; -import org.openkilda.northbound.converter.ConnectedDeviceMapper; -import org.openkilda.northbound.converter.ConnectedDeviceMapperImpl; -import org.openkilda.northbound.converter.FlowEncapsulationTypeMapper; -import org.openkilda.northbound.converter.FlowEncapsulationTypeMapperImpl; -import org.openkilda.northbound.converter.FlowMapper; -import org.openkilda.northbound.converter.FlowMapperImpl; -import org.openkilda.northbound.converter.FlowStatusMapper; -import org.openkilda.northbound.converter.FlowStatusMapperImpl; -import org.openkilda.northbound.converter.InstantMapper; -import org.openkilda.northbound.converter.InstantMapperImpl; -import org.openkilda.northbound.converter.LagPortMapper; -import org.openkilda.northbound.converter.LagPortMapperImpl; -import org.openkilda.northbound.converter.PathComputationStrategyMapper; -import org.openkilda.northbound.converter.PathComputationStrategyMapperImpl; -import org.openkilda.northbound.converter.PortPropertiesMapper; -import org.openkilda.northbound.converter.PortPropertiesMapperImpl; -import org.openkilda.northbound.converter.SwitchMapper; -import org.openkilda.northbound.converter.SwitchMapperImpl; import org.openkilda.northbound.dto.v1.switches.RulesSyncDto; import org.openkilda.northbound.dto.v1.switches.RulesSyncResult; import org.openkilda.northbound.dto.v1.switches.SwitchSyncResult; @@ -60,6 +42,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.PropertySource; import org.springframework.test.context.junit4.SpringRunner; @@ -147,6 +130,7 @@ private MeterInfoEntry getMeterInfo(Long cookie) { @TestConfiguration @Import(KafkaConfig.class) + @ComponentScan({"org.openkilda.northbound.converter"}) @PropertySource({"classpath:northbound.properties"}) static class Config { @Bean @@ -158,51 +142,5 @@ public MessagingChannel messagingChannel() { public SwitchService switchService(MessagingChannel messagingChannel) { return new SwitchServiceImpl(messagingChannel); } - - @Bean - public SwitchMapper switchMapper() { - return new SwitchMapperImpl(); - } - - @Bean - public ConnectedDeviceMapper connectedDeviceMapper() { - return new ConnectedDeviceMapperImpl(); - } - - @Bean - public PortPropertiesMapper portPropertiesMapper() { - return new PortPropertiesMapperImpl(); - } - - @Bean - public InstantMapper instantMapper() { - return new InstantMapperImpl(); - } - - @Bean - public PathComputationStrategyMapper pathComputationStrategyMapper() { - return new PathComputationStrategyMapperImpl(); - } - - @Bean - public FlowStatusMapper flowStatusMapper() { - return new FlowStatusMapperImpl(); - } - - @Bean - public FlowEncapsulationTypeMapper flowEncapsulationTypeMapper() { - return new FlowEncapsulationTypeMapperImpl(); - } - - @Bean - public FlowMapper flowMapper() { - return new FlowMapperImpl(); - } - - @Bean - public LagPortMapper lagMapper() { - return new LagPortMapperImpl(); - } } - } diff --git a/src-java/ping-topology/ping-messaging/src/main/java/org/openkilda/messaging/command/flow/YFlowPingRequest.java b/src-java/ping-topology/ping-messaging/src/main/java/org/openkilda/messaging/command/flow/YFlowPingRequest.java new file mode 100644 index 00000000000..b3413d4ccfe --- /dev/null +++ b/src-java/ping-topology/ping-messaging/src/main/java/org/openkilda/messaging/command/flow/YFlowPingRequest.java @@ -0,0 +1,37 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.messaging.command.flow; + +import org.openkilda.messaging.command.CommandData; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.PropertyNamingStrategy.SnakeCaseStrategy; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@Builder +@AllArgsConstructor +@EqualsAndHashCode(callSuper = false) +@JsonNaming(SnakeCaseStrategy.class) +public class YFlowPingRequest extends CommandData { + @JsonProperty("y_flow_id") + String yFlowId; + long timeout; +} diff --git a/src-java/ping-topology/ping-messaging/src/main/java/org/openkilda/messaging/info/flow/SubFlowPingPayload.java b/src-java/ping-topology/ping-messaging/src/main/java/org/openkilda/messaging/info/flow/SubFlowPingPayload.java new file mode 100644 index 00000000000..74ffea65fb6 --- /dev/null +++ b/src-java/ping-topology/ping-messaging/src/main/java/org/openkilda/messaging/info/flow/SubFlowPingPayload.java @@ -0,0 +1,32 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.messaging.info.flow; + +import com.fasterxml.jackson.databind.PropertyNamingStrategy.SnakeCaseStrategy; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +@AllArgsConstructor +@JsonNaming(value = SnakeCaseStrategy.class) +public class SubFlowPingPayload { + String flowId; + UniSubFlowPingPayload forward; + UniSubFlowPingPayload reverse; +} diff --git a/src-java/ping-topology/ping-messaging/src/main/java/org/openkilda/messaging/info/flow/UniSubFlowPingPayload.java b/src-java/ping-topology/ping-messaging/src/main/java/org/openkilda/messaging/info/flow/UniSubFlowPingPayload.java new file mode 100644 index 00000000000..5424f6a911b --- /dev/null +++ b/src-java/ping-topology/ping-messaging/src/main/java/org/openkilda/messaging/info/flow/UniSubFlowPingPayload.java @@ -0,0 +1,38 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.messaging.info.flow; + +import org.openkilda.messaging.model.Ping.Errors; + +import com.fasterxml.jackson.databind.PropertyNamingStrategy.SnakeCaseStrategy; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +@AllArgsConstructor +@JsonNaming(value = SnakeCaseStrategy.class) +public class UniSubFlowPingPayload { + private boolean pingSuccess; + private Errors error; + private long latency; + + public UniSubFlowPingPayload(Errors error, long latency) { + this(error == null, error, latency); + } +} diff --git a/src-java/ping-topology/ping-messaging/src/main/java/org/openkilda/messaging/info/flow/YFlowPingResponse.java b/src-java/ping-topology/ping-messaging/src/main/java/org/openkilda/messaging/info/flow/YFlowPingResponse.java new file mode 100644 index 00000000000..cfd584a71a9 --- /dev/null +++ b/src-java/ping-topology/ping-messaging/src/main/java/org/openkilda/messaging/info/flow/YFlowPingResponse.java @@ -0,0 +1,63 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.messaging.info.flow; + +import org.openkilda.messaging.info.InfoData; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.PropertyNamingStrategy.SnakeCaseStrategy; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.List; + +@Data +@Builder +@AllArgsConstructor +@EqualsAndHashCode(callSuper = false) +@JsonNaming(SnakeCaseStrategy.class) +public class YFlowPingResponse extends InfoData { + @JsonProperty("y_flow_id") + String yFlowId; + boolean pingSuccess; + String error; + List subFlows; + + public YFlowPingResponse(String yFlowId, String error, List subFlows) { + this.yFlowId = yFlowId; + this.error = error; + this.subFlows = subFlows; + this.pingSuccess = checkPingSuccess(error, subFlows); + } + + @JsonProperty + private boolean checkPingSuccess(String error, List subFlows) { + if (error != null) { + return false; + } + if (subFlows != null) { + for (SubFlowPingPayload subFlow : subFlows) { + if (!subFlow.getForward().isPingSuccess() || !subFlow.getReverse().isPingSuccess()) { + return false; + } + } + } + return true; + } +} diff --git a/src-java/ping-topology/ping-messaging/src/test/java/org/openkilda/messaging/command/flow/YFlowPingRequestTest.java b/src-java/ping-topology/ping-messaging/src/test/java/org/openkilda/messaging/command/flow/YFlowPingRequestTest.java new file mode 100644 index 00000000000..860cb38d4ed --- /dev/null +++ b/src-java/ping-topology/ping-messaging/src/test/java/org/openkilda/messaging/command/flow/YFlowPingRequestTest.java @@ -0,0 +1,40 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.messaging.command.flow; + +import static org.junit.Assert.assertEquals; + +import org.openkilda.messaging.StringSerializer; +import org.openkilda.messaging.command.CommandData; +import org.openkilda.messaging.command.CommandMessage; + +import org.junit.Test; + +public class YFlowPingRequestTest { + StringSerializer serializer = new StringSerializer(); + + @Test + public void serializeLoop() throws Exception { + YFlowPingRequest origin = new YFlowPingRequest("yFlowId", 5000); + CommandMessage wrapper = new CommandMessage(origin, System.currentTimeMillis(), getClass().getSimpleName()); + + serializer.serialize(wrapper); + CommandMessage decodedWrapper = (CommandMessage) serializer.deserialize(); + CommandData decoded = decodedWrapper.getData(); + + assertEquals(origin, decoded); + } +} diff --git a/src-java/ping-topology/ping-messaging/src/test/java/org/openkilda/messaging/info/flow/YFlowPingResponseTest.java b/src-java/ping-topology/ping-messaging/src/test/java/org/openkilda/messaging/info/flow/YFlowPingResponseTest.java new file mode 100644 index 00000000000..dd085eca08d --- /dev/null +++ b/src-java/ping-topology/ping-messaging/src/test/java/org/openkilda/messaging/info/flow/YFlowPingResponseTest.java @@ -0,0 +1,53 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.messaging.info.flow; + +import static org.junit.Assert.assertEquals; + +import org.openkilda.messaging.StringSerializer; +import org.openkilda.messaging.info.InfoData; +import org.openkilda.messaging.info.InfoMessage; +import org.openkilda.messaging.model.Ping.Errors; + +import com.google.common.collect.Lists; +import org.junit.Test; + +public class YFlowPingResponseTest { + private static final String Y_FLOW_ID = "y_flow_id1"; + private static final String ERROR_MESSAGE = "ERROR"; + private static final String SUB_FLOW_1 = "flow_1"; + private static final String SUB_FLOW_2 = "flow_2"; + StringSerializer serializer = new StringSerializer(); + + @Test + public void serializeLoop() throws Exception { + YFlowPingResponse origin = new YFlowPingResponse(Y_FLOW_ID, false, ERROR_MESSAGE, Lists.newArrayList( + new SubFlowPingPayload(SUB_FLOW_1, + new UniSubFlowPingPayload(true, null, 1), + new UniSubFlowPingPayload(false, Errors.TIMEOUT, 2)), + new SubFlowPingPayload(SUB_FLOW_2, + new UniSubFlowPingPayload(false, Errors.DEST_NOT_AVAILABLE, 3), + new UniSubFlowPingPayload(true, null, 4)))); + + InfoMessage wrapper = new InfoMessage(origin, System.currentTimeMillis(), getClass().getSimpleName()); + + serializer.serialize(wrapper); + InfoMessage decodedWrapper = (InfoMessage) serializer.deserialize(); + InfoData decoded = decodedWrapper.getData(); + + assertEquals(origin, decoded); + } +} diff --git a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/PingTopology.java b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/PingTopology.java index 18e7cbe9c41..d8c42e28c58 100644 --- a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/PingTopology.java +++ b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/PingTopology.java @@ -146,6 +146,7 @@ private void flowFetcher(TopologyBuilder topology) { // NOTE(tdurakov): global grouping is responsible for proper handling parallelism of 2 .globalGrouping(TickDeduplicator.BOLT_ID, TickDeduplicator.STREAM_PING_ID) .shuffleGrouping(InputRouter.BOLT_ID, InputRouter.STREAM_ON_DEMAND_REQUEST_ID) + .shuffleGrouping(InputRouter.BOLT_ID, InputRouter.STREAM_ON_DEMAND_Y_FLOW_REQUEST_ID) .allGrouping(InputRouter.BOLT_ID, InputRouter.STREAM_PERIODIC_PING_UPDATE_REQUEST_ID); } @@ -269,6 +270,7 @@ private void northboundEncoder(TopologyBuilder topology) { NorthboundEncoder bolt = new NorthboundEncoder(); declareBolt(topology, bolt, NorthboundEncoder.BOLT_ID) .shuffleGrouping(FlowFetcher.BOLT_ID, FlowFetcher.STREAM_ON_DEMAND_RESPONSE_ID) + .shuffleGrouping(FlowFetcher.BOLT_ID, FlowFetcher.STREAM_ON_DEMAND_Y_FLOW_RESPONSE_ID) .shuffleGrouping(OnDemandResultManager.BOLT_ID); KafkaBolt output = buildKafkaBolt(topologyConfig.getKafkaNorthboundTopic()); diff --git a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/FlowFetcher.java b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/FlowFetcher.java index f965d9cd98b..0c20e574959 100644 --- a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/FlowFetcher.java +++ b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/FlowFetcher.java @@ -15,19 +15,27 @@ package org.openkilda.wfm.topology.ping.bolt; +import static java.lang.String.format; + import org.openkilda.messaging.Utils; import org.openkilda.messaging.command.flow.FlowPingRequest; import org.openkilda.messaging.command.flow.PeriodicPingCommand; +import org.openkilda.messaging.command.flow.YFlowPingRequest; import org.openkilda.messaging.info.flow.FlowPingResponse; +import org.openkilda.messaging.info.flow.YFlowPingResponse; import org.openkilda.model.Flow; import org.openkilda.model.FlowTransitEncapsulation; +import org.openkilda.model.YFlow; +import org.openkilda.model.YSubFlow; import org.openkilda.persistence.PersistenceManager; import org.openkilda.persistence.context.PersistenceContextRequired; import org.openkilda.persistence.repositories.FlowRepository; +import org.openkilda.persistence.repositories.YFlowRepository; import org.openkilda.wfm.CommandContext; import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; import org.openkilda.wfm.share.flow.resources.FlowResourcesManager; +import org.openkilda.wfm.topology.ping.model.GroupId; import org.openkilda.wfm.topology.ping.model.PingContext; import org.openkilda.wfm.topology.ping.model.PingContext.Kinds; @@ -40,6 +48,8 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -61,10 +71,13 @@ public class FlowFetcher extends Abstract { public static final Fields STREAM_ON_DEMAND_RESPONSE_FIELDS = new Fields( FIELD_ID_ON_DEMAND_RESPONSE, FIELD_ID_CONTEXT); public static final String STREAM_ON_DEMAND_RESPONSE_ID = "on_demand_response"; + public static final String STREAM_ON_DEMAND_Y_FLOW_RESPONSE_ID = "on_demand_y_flow_response"; + public static final int DIRECTION_COUNT_PER_FLOW = 2; // forward and reverse private final FlowResourcesConfig flowResourcesConfig; private transient FlowResourcesManager flowResourcesManager; private transient FlowRepository flowRepository; + private transient YFlowRepository yFlowRepository; private Set flowsSet; private long periodicPingCacheExpiryInterval; private long lastPeriodicPingCacheRefresh; @@ -86,6 +99,8 @@ protected void handleInput(Tuple input) throws Exception { String sourceStream = input.getSourceStreamId(); if (InputRouter.STREAM_ON_DEMAND_REQUEST_ID.equals(sourceStream)) { handleOnDemandRequest(input); + } else if (InputRouter.STREAM_ON_DEMAND_Y_FLOW_REQUEST_ID.equals(sourceStream)) { + handleOnDemandYFlowRequest(input); } else if (InputRouter.STREAM_PERIODIC_PING_UPDATE_REQUEST_ID.equals(sourceStream)) { updatePeriodicPingHeap(input); } @@ -157,6 +172,7 @@ private void handleOnDemandRequest(Tuple input) throws PipelineException { Optional transitEncapsulation = getTransitEncapsulation(flow); if (transitEncapsulation.isPresent()) { PingContext pingContext = PingContext.builder() + .group(new GroupId(DIRECTION_COUNT_PER_FLOW)) .kind(Kinds.ON_DEMAND) .flow(flow) .transitEncapsulation(transitEncapsulation.get()) @@ -164,16 +180,65 @@ private void handleOnDemandRequest(Tuple input) throws PipelineException { .build(); emit(input, pingContext, pullContext(input)); } else { - emitOnDemandResponse(input, request, String.format( + emitOnDemandResponse(input, request, format( "Encapsulation resource not found for flow %s", request.getFlowId())); } } else { - emitOnDemandResponse(input, request, String.format( + emitOnDemandResponse(input, request, format( "Flow %s should not be one switch flow", request.getFlowId())); } } else { - emitOnDemandResponse(input, request, String.format( - "Flow %s does not exist", request.getFlowId())); + emitOnDemandResponse(input, request, format("Flow %s does not exist", request.getFlowId())); + } + } + + private void handleOnDemandYFlowRequest(Tuple input) throws PipelineException { + log.debug("Handle on demand ping request"); + YFlowPingRequest request = pullOnDemandYFlowRequest(input); + + Optional optionalYFlow = yFlowRepository.findById(request.getYFlowId()); + + if (!optionalYFlow.isPresent()) { + emitOnDemandYFlowResponse(input, request, format("YFlow %s does not exist", request.getYFlowId())); + return; + } + + YFlow yFlow = optionalYFlow.get(); + Set subFlows = yFlow.getSubFlows(); + + if (subFlows.isEmpty()) { + emitOnDemandYFlowResponse(input, request, format("YFlow %s has no sub flows", request.getYFlowId())); + return; + } + + GroupId groupId = new GroupId(subFlows.size() * DIRECTION_COUNT_PER_FLOW); + List subFlowPingRequests = new ArrayList<>(); + + for (YSubFlow subFlow : subFlows) { + Flow flow = subFlow.getFlow(); + flow.getPaths(); // Load paths to use in PingProducer + flowRepository.detach(flow); + + Optional transitEncapsulation = getTransitEncapsulation(flow); + if (transitEncapsulation.isPresent()) { + subFlowPingRequests.add(PingContext.builder() + .group(groupId) + .kind(Kinds.ON_DEMAND_Y_FLOW) + .flow(flow) + .yFlowId(yFlow.getYFlowId()) + .transitEncapsulation(transitEncapsulation.get()) + .timeout(request.getTimeout()) + .build()); + } else { + emitOnDemandYFlowResponse(input, request, format( + "Encapsulation resource not found for sub flow %s of YFlow %s", + subFlow.getSubFlowId(), yFlow.getYFlowId())); + return; + } + } + CommandContext commandContext = pullContext(input); + for (PingContext pingContext : subFlowPingRequests) { + emit(input, pingContext, commandContext); } } @@ -187,7 +252,7 @@ private Optional getFlowWithTransitEncapsulation(F private Optional getTransitEncapsulation(Flow flow) { return flowResourcesManager.getEncapsulationResources(flow.getForwardPathId(), - flow.getReversePathId(), flow.getEncapsulationType()) + flow.getReversePathId(), flow.getEncapsulationType()) .map(resources -> new FlowTransitEncapsulation(resources.getTransitEncapsulationId(), resources.getEncapsulationType())); @@ -205,6 +270,12 @@ private void emitOnDemandResponse(Tuple input, FlowPingRequest request, String e getOutput().emit(STREAM_ON_DEMAND_RESPONSE_ID, input, output); } + private void emitOnDemandYFlowResponse(Tuple input, YFlowPingRequest request, String errorMessage) + throws PipelineException { + YFlowPingResponse response = new YFlowPingResponse(request.getYFlowId(), false, errorMessage, null); + getOutput().emit(STREAM_ON_DEMAND_Y_FLOW_RESPONSE_ID, input, new Values(response, pullContext(input))); + } + private void emitCacheExpire(Tuple input, CommandContext commandContext, Set flows) { OutputCollector collector = getOutput(); flowsSet.removeAll(flows); @@ -218,6 +289,10 @@ private FlowPingRequest pullOnDemandRequest(Tuple input) throws PipelineExceptio return pullValue(input, InputRouter.FIELD_ID_PING_REQUEST, FlowPingRequest.class); } + private YFlowPingRequest pullOnDemandYFlowRequest(Tuple input) throws PipelineException { + return pullValue(input, InputRouter.FIELD_ID_PING_REQUEST, YFlowPingRequest.class); + } + private PeriodicPingCommand pullPeriodicPingRequest(Tuple input) throws PipelineException { return pullValue(input, InputRouter.FIELD_ID_PING_REQUEST, PeriodicPingCommand.class); } @@ -227,6 +302,7 @@ public void declareOutputFields(OutputFieldsDeclarer outputManager) { outputManager.declare(STREAM_FIELDS); outputManager.declareStream(STREAM_EXPIRE_CACHE_ID, STREAM_EXPIRE_CACHE_FIELDS); outputManager.declareStream(STREAM_ON_DEMAND_RESPONSE_ID, STREAM_ON_DEMAND_RESPONSE_FIELDS); + outputManager.declareStream(STREAM_ON_DEMAND_Y_FLOW_RESPONSE_ID, STREAM_ON_DEMAND_RESPONSE_FIELDS); } @Override @@ -235,6 +311,7 @@ public void init() { super.init(); flowRepository = persistenceManager.getRepositoryFactory().createFlowRepository(); + yFlowRepository = persistenceManager.getRepositoryFactory().createYFlowRepository(); flowResourcesManager = new FlowResourcesManager(persistenceManager, flowResourcesConfig); try { refreshHeap(null, false); diff --git a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/GroupCollector.java b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/GroupCollector.java index 20bc5b81bd0..2413a3e2280 100644 --- a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/GroupCollector.java +++ b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/GroupCollector.java @@ -80,8 +80,12 @@ private void expire(Tuple input) { private void collect(Tuple input) throws Exception { CollectorDescriptor descriptor = saveCurrentRecord(input); if (descriptor.isCompleted()) { - Group group = descriptor.makeGroup(); - emitGroup(input, group); + try { + Group group = descriptor.makeGroup(); + emitGroup(input, group); + } catch (IllegalStateException e) { + throw new WorkflowException(this, input, e.toString()); + } } } diff --git a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/InputRouter.java b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/InputRouter.java index ebec4e96cac..5796d112378 100644 --- a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/InputRouter.java +++ b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/InputRouter.java @@ -22,6 +22,7 @@ import org.openkilda.messaging.command.CommandMessage; import org.openkilda.messaging.command.flow.FlowPingRequest; import org.openkilda.messaging.command.flow.PeriodicPingCommand; +import org.openkilda.messaging.command.flow.YFlowPingRequest; import org.openkilda.messaging.floodlight.response.PingResponse; import org.openkilda.messaging.info.InfoData; import org.openkilda.messaging.info.InfoMessage; @@ -47,6 +48,7 @@ public class InputRouter extends Abstract { public static final Fields STREAM_PING_REQUEST_FIELDS = new Fields( FIELD_ID_PING_REQUEST, FIELD_ID_CONTEXT); public static final String STREAM_ON_DEMAND_REQUEST_ID = "ping_request"; + public static final String STREAM_ON_DEMAND_Y_FLOW_REQUEST_ID = "ing_request_y_flow"; public static final String STREAM_PERIODIC_PING_UPDATE_REQUEST_ID = "periodic_ping_request"; public static final Fields STREAM_ZOOKEEPER_FIELDS = new Fields(ZooKeeperBolt.FIELD_ID_STATE, @@ -86,6 +88,8 @@ private void routeCommandMessage(Tuple input, CommandMessage message) throws Pip final CommandData data = message.getData(); if (data instanceof FlowPingRequest) { emit(input, new Values(data), STREAM_ON_DEMAND_REQUEST_ID); + } else if (data instanceof YFlowPingRequest) { + emit(input, new Values(data), STREAM_ON_DEMAND_Y_FLOW_REQUEST_ID); } else if (data instanceof PeriodicPingCommand) { emit(input, new Values(data), STREAM_PERIODIC_PING_UPDATE_REQUEST_ID); } else { @@ -106,6 +110,7 @@ private Message pullInput(Tuple input) throws PipelineException { public void declareOutputFields(OutputFieldsDeclarer outputManager) { outputManager.declareStream(STREAM_SPEAKER_PING_RESPONSE_ID, STREAM_SPEAKER_PING_RESPONSE_FIELDS); outputManager.declareStream(STREAM_ON_DEMAND_REQUEST_ID, STREAM_PING_REQUEST_FIELDS); + outputManager.declareStream(STREAM_ON_DEMAND_Y_FLOW_REQUEST_ID, STREAM_PING_REQUEST_FIELDS); outputManager.declareStream(STREAM_PERIODIC_PING_UPDATE_REQUEST_ID, STREAM_PING_REQUEST_FIELDS); outputManager.declareStream(STREAM_ZOOKEEPER, STREAM_ZOOKEEPER_FIELDS); } diff --git a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/OnDemandResultManager.java b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/OnDemandResultManager.java index 7bf17df98b4..8eb9dd5ca63 100644 --- a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/OnDemandResultManager.java +++ b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/OnDemandResultManager.java @@ -15,11 +15,21 @@ package org.openkilda.wfm.topology.ping.bolt; +import static java.lang.String.format; +import static org.openkilda.messaging.model.FlowDirection.FORWARD; +import static org.openkilda.messaging.model.FlowDirection.REVERSE; + +import org.openkilda.messaging.info.InfoData; import org.openkilda.messaging.info.flow.FlowPingResponse; import org.openkilda.messaging.info.flow.FlowPingResponse.FlowPingResponseBuilder; +import org.openkilda.messaging.info.flow.SubFlowPingPayload; import org.openkilda.messaging.info.flow.UniFlowPingResponse; +import org.openkilda.messaging.info.flow.UniSubFlowPingPayload; +import org.openkilda.messaging.info.flow.YFlowPingResponse; +import org.openkilda.messaging.model.FlowDirection; import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.topology.ping.model.Group; +import org.openkilda.wfm.topology.ping.model.Group.Type; import org.openkilda.wfm.topology.ping.model.PingContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -28,8 +38,12 @@ import org.apache.storm.tuple.Values; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; public class OnDemandResultManager extends ResultManager { public static final String BOLT_ID = ComponentId.ON_DEMAND_RESULT_MANAGER.toString(); @@ -45,12 +59,21 @@ protected void handle(Tuple input, PingContext pingContext) throws PipelineExcep @Override protected void handleGroup(Tuple input) throws PipelineException { final Group group = pullPingGroup(input); - FlowPingResponse response; + if (group.getType() == Type.FLOW) { + handleFlowResponse(input, group); + } else if (group.getType() == Type.Y_FLOW) { + handleYFlowResponse(input, group); + } else { + unhandledInput(input); + } + } + + private void handleFlowResponse(Tuple input, Group group) throws PipelineException { try { - response = collectResults(group); + FlowPingResponse response = collectResults(group); emit(input, response); } catch (IllegalArgumentException e) { - for (FlowPingResponse errorResponse : produceErrors(group, e.getMessage())) { + for (FlowPingResponse errorResponse : produceFlowErrors(group, e.getMessage())) { emit(input, errorResponse); } } @@ -71,14 +94,13 @@ private FlowPingResponse collectResults(Group group) { builder.reverse(makeResponse(entry)); break; default: - throw new IllegalArgumentException(String.format( - "Unsupported %s.%s value", + throw new IllegalArgumentException(format("Unsupported %s.%s value", entry.getDirection().getClass().getName(), entry.getDirection())); } } if (idSet.size() != 1) { - throw new IllegalArgumentException(String.format( + throw new IllegalArgumentException(format( "Expect exact one flow id in pings group response, got - \"%s\"", String.join("\", \"", idSet))); } builder.flowId(idSet.iterator().next()); @@ -86,7 +108,7 @@ private FlowPingResponse collectResults(Group group) { return builder.build(); } - private List produceErrors(Group group, String errorMessage) { + private List produceFlowErrors(Group group, String errorMessage) { HashSet seen = new HashSet<>(); ArrayList results = new ArrayList<>(); for (PingContext pingContext : group.getRecords()) { @@ -108,7 +130,78 @@ private UniFlowPingResponse makeResponse(PingContext pingContext) { return new UniFlowPingResponse(pingContext.getPing(), pingContext.getMeters(), pingContext.getError()); } - private void emit(Tuple input, FlowPingResponse response) throws PipelineException { + private void handleYFlowResponse(Tuple input, Group group) throws PipelineException { + try { + YFlowPingResponse response = buildYFlowPingResponse(group); + emit(input, response); + } catch (IllegalArgumentException e) { + String yFlowId = group.getRecords().stream().map(PingContext::getYFlowId).filter(Objects::nonNull) + .findFirst().orElse(null); + YFlowPingResponse errorResponse = new YFlowPingResponse(yFlowId, e.getMessage(), null); + emit(input, errorResponse); + } + } + + private YFlowPingResponse buildYFlowPingResponse(Group group) { + Map subFlowMap = new HashMap<>(); + Set yFlowIdSet = new HashSet<>(); + + for (PingContext record : group.getRecords()) { + if (record.getYFlowId() == null) { + throw new IllegalArgumentException(format("Ping report %s has no yFlowId", record)); + } + yFlowIdSet.add(record.getYFlowId()); + SubFlowPingPayload subFlow = subFlowMap.computeIfAbsent(record.getFlowId(), + mappingFunction -> new SubFlowPingPayload(record.getFlowId(), null, null)); + + switch (record.getDirection()) { + case FORWARD: + validateSubFlow(subFlow.getForward(), FORWARD, group, record, subFlow.getFlowId()); + subFlow.setForward(makeSubFlowPayload(record)); + break; + case REVERSE: + validateSubFlow(subFlow.getReverse(), REVERSE, group, record, subFlow.getFlowId()); + subFlow.setReverse(makeSubFlowPayload(record)); + break; + default: + throw new IllegalArgumentException(format("Unsupported %s.%s value", + record.getDirection().getClass().getName(), record.getDirection())); + } + } + + if (subFlowMap.size() * 2 != group.getRecords().size()) { + throw new IllegalArgumentException(format( + "Expect %d unique Flow IDs in ping group responses, but got responses about %d Flows: %s", + group.getRecords().size() / 2, subFlowMap.size(), subFlowMap.keySet())); + } + + if (yFlowIdSet.size() != 1) { + throw new IllegalArgumentException(format( + "Expect exact one Y Flow id in pings group response, got - %s", yFlowIdSet)); + } + + return new YFlowPingResponse(yFlowIdSet.iterator().next(), null, new ArrayList<>(subFlowMap.values())); + } + + private void validateSubFlow(UniSubFlowPingPayload existPayloadPayload, FlowDirection direction, + Group group, PingContext pingContext, String flowId) { + if (existPayloadPayload != null) { + throw new IllegalArgumentException( + format("Ping Group %s has 2 ping reports for %s direction of Flow %s. " + + "First report %s, Second report %s", group, direction, flowId, + existPayloadPayload, pingContext)); + } + } + + private UniSubFlowPingPayload makeSubFlowPayload(PingContext pingContext) { + long latency = 0; + if (pingContext.getMeters() != null) { + latency = pingContext.getMeters().getNetworkLatency(); + } + return new UniSubFlowPingPayload(pingContext.getError(), latency); + } + + private void emit(Tuple input, InfoData response) throws PipelineException { Values output = new Values(response, pullContext(input)); getOutput().emit(input, output); } diff --git a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/PingProducer.java b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/PingProducer.java index fbd997e0ad0..341a2ef275f 100644 --- a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/PingProducer.java +++ b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/PingProducer.java @@ -28,7 +28,6 @@ import org.openkilda.model.PathSegment; import org.openkilda.wfm.CommandContext; import org.openkilda.wfm.error.PipelineException; -import org.openkilda.wfm.topology.ping.model.GroupId; import org.openkilda.wfm.topology.ping.model.PingContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -48,17 +47,15 @@ protected void handleInput(Tuple input) throws Exception { PingContext pingContext = pullPingContext(input); // TODO(surabujin): add one switch flow filter - GroupId group = new GroupId(2); - emit(input, produce(pingContext, group, FlowDirection.FORWARD)); - emit(input, produce(pingContext, group, FlowDirection.REVERSE)); + emit(input, produce(pingContext, FlowDirection.FORWARD)); + emit(input, produce(pingContext, FlowDirection.REVERSE)); } - private PingContext produce(PingContext pingContext, GroupId group, FlowDirection direction) { + private PingContext produce(PingContext pingContext, FlowDirection direction) { Ping ping = buildPing(pingContext, direction); return pingContext.toBuilder() .ping(ping) .direction(direction) - .group(group) .build(); } diff --git a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/ResultDispatcher.java b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/ResultDispatcher.java index 8844359abcc..f866f3cffa8 100644 --- a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/ResultDispatcher.java +++ b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/ResultDispatcher.java @@ -41,21 +41,17 @@ protected void handleInput(Tuple input) throws Exception { } private String dispatch(PingContext pingContext) { - String value; final Kinds kind = pingContext.getKind(); switch (kind) { case PERIODIC: - value = STREAM_PERIODIC_ID; - break; + return STREAM_PERIODIC_ID; case ON_DEMAND: - value = STREAM_MANUAL_ID; - break; - + case ON_DEMAND_Y_FLOW: + return STREAM_MANUAL_ID; default: throw new IllegalArgumentException(String.format( "Unsupported value %s.%s", kind.getClass().getName(), kind)); } - return value; } @Override diff --git a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/model/CollectorDescriptor.java b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/model/CollectorDescriptor.java index dddb8650aac..d94ba52ea6e 100644 --- a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/model/CollectorDescriptor.java +++ b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/model/CollectorDescriptor.java @@ -15,6 +15,9 @@ package org.openkilda.wfm.topology.ping.model; +import org.openkilda.wfm.topology.ping.model.Group.Type; +import org.openkilda.wfm.topology.ping.model.PingContext.Kinds; + import lombok.Data; import lombok.EqualsAndHashCode; @@ -22,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; @Data @EqualsAndHashCode(callSuper = true) @@ -49,7 +53,26 @@ public int add(PingContext pingContext) { } public Group makeGroup() { - return new Group(getGroupId(), getRecords()); + return new Group(getGroupId(), getType(), getRecords()); + } + + private Group.Type getType() { + List kinds = records.stream().map(PingContext::getKind).distinct().collect(Collectors.toList()); + if (kinds.isEmpty()) { + return null; + } + if (kinds.size() > 1) { + throw new IllegalStateException(String.format("GroupId %s contains records about different ping types: %s. " + + "Full record objects: %s", groupId, kinds, records)); + } + switch (kinds.get(0)) { + case ON_DEMAND: + return Type.FLOW; + case ON_DEMAND_Y_FLOW: + return Type.Y_FLOW; + default: + return null; + } } public boolean isCompleted() { diff --git a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/model/Group.java b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/model/Group.java index dbbb224651d..46257f755fa 100644 --- a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/model/Group.java +++ b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/model/Group.java @@ -23,11 +23,18 @@ @Value public class Group implements Serializable { - private GroupId id; - private List records; + GroupId id; + Type type; + List records; - public Group(GroupId id, List records) { + public Group(GroupId id, Type type, List records) { this.id = id; + this.type = type; this.records = ImmutableList.copyOf(records); } + + public enum Type { + FLOW, + Y_FLOW + } } diff --git a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/model/PingContext.java b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/model/PingContext.java index f6cc34fc889..f3038e8b11a 100644 --- a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/model/PingContext.java +++ b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/model/PingContext.java @@ -35,12 +35,14 @@ public class PingContext implements Serializable { public enum Kinds { PERIODIC, - ON_DEMAND + ON_DEMAND, + ON_DEMAND_Y_FLOW } private Kinds kind; private GroupId group; + private String yFlowId; private Flow flow; private FlowTransitEncapsulation transitEncapsulation; private FlowDirection direction; diff --git a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/mappers/LogicalPortMapper.java b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/mappers/LogicalPortMapper.java index cc39f3b32b6..44869b08fa8 100644 --- a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/mappers/LogicalPortMapper.java +++ b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/mappers/LogicalPortMapper.java @@ -27,7 +27,7 @@ import java.util.Optional; -@Mapper(imports = {LogicalPortType.class}) +@Mapper public abstract class LogicalPortMapper { public static final LogicalPortMapper INSTANCE = Mappers.getMapper(LogicalPortMapper.class); @@ -36,7 +36,7 @@ public abstract class LogicalPortMapper { @Mapping(target = "actual", ignore = true) public abstract LogicalPortInfoEntry map(LogicalPort logicalPort); - @Mapping(target = "type", expression = "java(LogicalPortType.LAG)") + @Mapping(target = "type", constant = "LAG") @Mapping(target = "expected", ignore = true) @Mapping(target = "actual", ignore = true) public abstract LogicalPortInfoEntry map(LagLogicalPort logicalPort); diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/PartialUpdateSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/PartialUpdateSpec.groovy index cc258879ab7..0fd99297fa5 100644 --- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/PartialUpdateSpec.groovy +++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/PartialUpdateSpec.groovy @@ -374,10 +374,11 @@ class PartialUpdateSpec extends HealthCheckSpecification { // } // } // } -// def dstSwitchesAreFine = true + def dstSwitchesAreFine = false cleanup: flow && flowHelperV2.deleteFlow(flow.flowId) + !dstSwitchesAreFine && [dstSwitch, newDstSwitch]*.dpId.each { northbound.synchronizeSwitch(it, true) } } @Tidy @@ -555,6 +556,9 @@ class PartialUpdateSpec extends HealthCheckSpecification { expect northboundV2.getFlow(flow.flowId), sameBeanAs(flowBeforeUpdate) .ignoring("lastUpdated") .ignoring("diverseWith") + .ignoring("latencyLastModifiedTime") + .ignoring("forwardPathLatencyNs") + .ignoring("reversePathLatencyNs") and: "Flow rules have not been reinstalled" northbound.getSwitchRules(swPair.src.dpId).flowEntries*.cookie.containsAll(originalCookies) @@ -589,6 +593,9 @@ class PartialUpdateSpec extends HealthCheckSpecification { then: "Flow is left intact" expect northboundV2.getFlow(flow.flowId), sameBeanAs(flowBeforeUpdate) .ignoring("lastUpdated") + .ignoring("latencyLastModifiedTime") + .ignoring("forwardPathLatencyNs") + .ignoring("reversePathLatencyNs") and: "Flow rules have not been reinstalled" northbound.getSwitchRules(swPair.src.dpId).flowEntries*.cookie.containsAll(originalCookies) diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/SwapEndpointSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/SwapEndpointSpec.groovy index 5516fd62c41..f4bee090869 100644 --- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/SwapEndpointSpec.groovy +++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/SwapEndpointSpec.groovy @@ -6,6 +6,7 @@ import static org.openkilda.functionaltests.extension.tags.Tag.HARDWARE import static org.openkilda.functionaltests.extension.tags.Tag.LOW_PRIORITY import static org.openkilda.functionaltests.helpers.FlowHistoryConstants.REROUTE_ACTION import static org.openkilda.functionaltests.helpers.FlowHistoryConstants.REROUTE_FAIL +import static org.openkilda.testing.Constants.FLOW_CRUD_TIMEOUT import static org.openkilda.testing.Constants.NON_EXISTENT_FLOW_ID import static org.openkilda.testing.Constants.PATH_INSTALLATION_TIME import static org.openkilda.testing.Constants.RULES_DELETION_TIME @@ -783,6 +784,7 @@ switches"() { def flow1Isl = pathHelper.getInvolvedIsls(flow1Path)[0] def flow1IslMaxBw = islUtils.getIslInfo(flow1Isl).get().maxBandwidth northbound.updateFlow(flow1.id, flow1.tap { it.maximumBandwidth = flow1IslMaxBw }) + Wrappers.wait(FLOW_CRUD_TIMEOUT) { assert northbound.getFlowStatus(flow1.id).status == FlowState.UP } and: "Break all alternative paths for the first flow" def altPaths = flow1SwitchPair.paths.findAll { it != flow1Path } @@ -876,6 +878,7 @@ switches"() { def flow1Isl = pathHelper.getInvolvedIsls(flow1Path)[0] def flow1IslMaxBw = islUtils.getIslInfo(flow1Isl).get().maxBandwidth northbound.updateFlow(flow1.id, flow1.tap { it.maximumBandwidth = flow1IslMaxBw }) + Wrappers.wait(FLOW_CRUD_TIMEOUT) { assert northbound.getFlowStatus(flow1.id).status == FlowState.UP } and: "Break all alternative paths for the first flow" def altPaths = flow1SwitchPair.paths.findAll { it != flow1Path } @@ -967,6 +970,7 @@ switches"() { def flow1Isl = pathHelper.getInvolvedIsls(flow1Path)[0] def flow1IslMaxBw = islUtils.getIslInfo(flow1Isl).get().maxBandwidth northbound.updateFlow(flow1.id, flow1.tap { it.maximumBandwidth = flow1IslMaxBw }) + Wrappers.wait(FLOW_CRUD_TIMEOUT) { assert northbound.getFlowStatus(flow1.id).status == FlowState.UP } and: "Break all alternative paths for the first flow" def altPaths = flow1SwitchPair.paths.findAll { it != flow1Path } diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/FlowRulesSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/FlowRulesSpec.groovy index 4777f064698..32d9fab3305 100644 --- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/FlowRulesSpec.groovy +++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/FlowRulesSpec.groovy @@ -721,8 +721,10 @@ class FlowRulesSpec extends HealthCheckSpecification { def flowInfo = database.getFlow(flow.flowId) def flowRulesSrcSw = getFlowRules(srcSwitch) def flowRulesDstSw = getFlowRules(dstSwitch) - def sharedRuleSrcSw = flowRulesSrcSw.find { new Cookie(it.cookie).getType() == CookieType.SHARED_OF_FLOW }.cookie - def sharedRuleDstSw = flowRulesDstSw.find { new Cookie(it.cookie).getType() == CookieType.SHARED_OF_FLOW }.cookie + def sharedRuleSrcSw = flowRulesSrcSw.find { new Cookie(it.cookie).getType() == CookieType.SHARED_OF_FLOW && + it.match.inPort.toInteger() == flow.source.portNumber }.cookie + def sharedRuleDstSw = flowRulesDstSw.find { new Cookie(it.cookie).getType() == CookieType.SHARED_OF_FLOW && + it.match.inPort.toInteger() == flow.destination.portNumber }.cookie def ingressSrcSw = flowInfo.forwardPath.cookie.value def egressSrcSw = flowInfo.reversePath.cookie.value @@ -796,8 +798,10 @@ class FlowRulesSpec extends HealthCheckSpecification { rulesAfterRerouteSrcSw = getFlowRules(srcSwitch) rulesAfterRerouteDstSw = getFlowRules(dstSwitch) //system doesn't reinstall shared rule - assert rulesAfterRerouteSrcSw.find { new Cookie(it.cookie).getType() == CookieType.SHARED_OF_FLOW }.cookie == sharedRuleSrcSw - assert rulesAfterRerouteDstSw.find { new Cookie(it.cookie).getType() == CookieType.SHARED_OF_FLOW }.cookie == sharedRuleDstSw + assert rulesAfterRerouteSrcSw.find { new Cookie(it.cookie).getType() == CookieType.SHARED_OF_FLOW && + it.match.inPort.toInteger() == flow.source.portNumber }.cookie == sharedRuleSrcSw + assert rulesAfterRerouteDstSw.find { new Cookie(it.cookie).getType() == CookieType.SHARED_OF_FLOW && + it.match.inPort.toInteger() == flow.destination.portNumber }.cookie == sharedRuleDstSw rulesAfterRerouteSrcSw.findAll { new Cookie(it.cookie).getType() != CookieType.SHARED_OF_FLOW }.cookie.each { assert !(it in [ingressSrcSw, egressSrcSw]) }