From 76ba994ec82f7b4b082f3e76b88c5f1fef60cb18 Mon Sep 17 00:00:00 2001 From: "nikolay.mikutskiy" Date: Tue, 19 Jul 2022 11:55:47 +0300 Subject: [PATCH 01/15] Fixed path filtering by maxLatency; --- .../messaging/info/network/Path.java | 7 +- .../messaging/payload/network/PathDto.java | 11 +- .../wfm/share/mappers/PathMapper.java | 4 +- .../src/main/java/org/openkilda/pce/Path.java | 2 + .../BestWeightAndShortestPathFinder.java | 82 +++++---- .../org/openkilda/pce/finder/PathFinder.java | 9 +- .../pce/impl/InMemoryPathComputer.java | 25 ++- .../pce/model/FindOneDirectionPathResult.java | 2 + .../BestWeightAndShortestPathFinderTest.java | 8 +- .../pce/finder/FilteringByMaxWeightTest.java | 171 ++++++++++++++++++ .../impl/InMemoryPathComputerBaseTest.java | 10 +- ...atencyPathComputationStrategyBaseTest.java | 29 ++- .../northbound/converter/PathMapper.java | 2 +- 13 files changed, 305 insertions(+), 57 deletions(-) create mode 100644 src-java/kilda-pce/src/test/java/org/openkilda/pce/finder/FilteringByMaxWeightTest.java diff --git a/src-java/base-topology/base-messaging/src/main/java/org/openkilda/messaging/info/network/Path.java b/src-java/base-topology/base-messaging/src/main/java/org/openkilda/messaging/info/network/Path.java index cb8b0eb4787..3e315d1fea3 100644 --- a/src-java/base-topology/base-messaging/src/main/java/org/openkilda/messaging/info/network/Path.java +++ b/src-java/base-topology/base-messaging/src/main/java/org/openkilda/messaging/info/network/Path.java @@ -37,11 +37,16 @@ public class Path implements Serializable { @JsonProperty("nodes") private List nodes; + @JsonProperty("is_backup_path") + private Boolean isBackupPath; + public Path(@JsonProperty("bandwidth") Long bandwidth, @JsonProperty("latency") Duration latency, - @JsonProperty("nodes") List nodes) { + @JsonProperty("nodes") List nodes, + @JsonProperty("is_backup_path") Boolean isBackupPath) { this.bandwidth = bandwidth; this.latency = latency; this.nodes = nodes; + this.isBackupPath = isBackupPath; } } diff --git a/src-java/base-topology/base-messaging/src/main/java/org/openkilda/messaging/payload/network/PathDto.java b/src-java/base-topology/base-messaging/src/main/java/org/openkilda/messaging/payload/network/PathDto.java index 17c8a2e050f..d6084efa10b 100644 --- a/src-java/base-topology/base-messaging/src/main/java/org/openkilda/messaging/payload/network/PathDto.java +++ b/src-java/base-topology/base-messaging/src/main/java/org/openkilda/messaging/payload/network/PathDto.java @@ -43,8 +43,11 @@ public class PathDto { @JsonProperty("nodes") private List nodes; - public PathDto(Long bandwidth, Duration latency, List nodes) { - this(bandwidth, latency.toNanos(), latency.toNanos(), latency.toMillis(), nodes); + @JsonProperty("is_backup_path") + private Boolean isBackupPath; + + public PathDto(Long bandwidth, Duration latency, List nodes, Boolean isBackupPath) { + this(bandwidth, latency.toNanos(), latency.toNanos(), latency.toMillis(), nodes, isBackupPath); } @JsonCreator @@ -52,11 +55,13 @@ public PathDto(@JsonProperty("bandwidth") Long bandwidth, @JsonProperty("latency") Long latency, @JsonProperty("latency_ns") Long latencyNs, @JsonProperty("latency_ms") Long latencyMs, - @JsonProperty("nodes") List nodes) { + @JsonProperty("nodes") List nodes, + @JsonProperty("is_backup_path") Boolean isBackupPath) { this.bandwidth = bandwidth; this.latency = latency; this.latencyNs = latencyNs; this.latencyMs = latencyMs; this.nodes = nodes; + this.isBackupPath = isBackupPath; } } diff --git a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/PathMapper.java b/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/PathMapper.java index 04510c26d78..a9373a3e5db 100644 --- a/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/PathMapper.java +++ b/src-java/base-topology/base-storm-topology/src/main/java/org/openkilda/wfm/share/mappers/PathMapper.java @@ -39,7 +39,7 @@ public abstract class PathMapper { */ public org.openkilda.messaging.info.network.Path map(org.openkilda.pce.Path path) { if (path == null || path.getSegments().isEmpty()) { - return new org.openkilda.messaging.info.network.Path(0L, Duration.ZERO, new ArrayList<>()); + return new org.openkilda.messaging.info.network.Path(0L, Duration.ZERO, new ArrayList<>(), false); } List nodes = new ArrayList<>(); @@ -62,6 +62,6 @@ public org.openkilda.messaging.info.network.Path map(org.openkilda.pce.Path path } return new org.openkilda.messaging.info.network.Path(path.getMinAvailableBandwidth(), - Duration.ofNanos(path.getLatency()), nodes); + Duration.ofNanos(path.getLatency()), nodes, path.isBackupPath()); } } diff --git a/src-java/kilda-pce/src/main/java/org/openkilda/pce/Path.java b/src-java/kilda-pce/src/main/java/org/openkilda/pce/Path.java index 6d889bec448..271d52be9c9 100644 --- a/src-java/kilda-pce/src/main/java/org/openkilda/pce/Path.java +++ b/src-java/kilda-pce/src/main/java/org/openkilda/pce/Path.java @@ -46,6 +46,8 @@ public class Path implements Serializable { @NonNull private List segments; + private boolean isBackupPath; + @Value @Builder(toBuilder = true) @EqualsAndHashCode diff --git a/src-java/kilda-pce/src/main/java/org/openkilda/pce/finder/BestWeightAndShortestPathFinder.java b/src-java/kilda-pce/src/main/java/org/openkilda/pce/finder/BestWeightAndShortestPathFinder.java index a9d315ea056..1be97ae29fc 100644 --- a/src-java/kilda-pce/src/main/java/org/openkilda/pce/finder/BestWeightAndShortestPathFinder.java +++ b/src-java/kilda-pce/src/main/java/org/openkilda/pce/finder/BestWeightAndShortestPathFinder.java @@ -141,21 +141,22 @@ private FindPathResult findPath(AvailableNetwork network, SwitchId startSwitchId } @Override - public List> findNPathsBetweenSwitches( + public List findNPathsBetweenSwitches( AvailableNetwork network, SwitchId startSwitchId, SwitchId endSwitchId, int count, WeightFunction weightFunction) throws UnroutableFlowException { Node end = network.getSwitch(endSwitchId); return findNPathsBetweenSwitches(network, startSwitchId, endSwitchId, count, weightFunction, - (Node start) -> findOneDirectionPath(start, end, weightFunction)); + (Node start) -> findOneDirectionPath(start, end, weightFunction), Long.MAX_VALUE, Long.MAX_VALUE); } @Override - public List> findNPathsBetweenSwitches( + public List findNPathsBetweenSwitches( AvailableNetwork network, SwitchId startSwitchId, SwitchId endSwitchId, int count, WeightFunction weightFunction, long maxWeight, long backUpMaxWeight) throws UnroutableFlowException { Node end = network.getSwitch(endSwitchId); return findNPathsBetweenSwitches(network, startSwitchId, endSwitchId, count, weightFunction, - (Node start) -> findOneDirectionPath(start, end, weightFunction, maxWeight, backUpMaxWeight)); + (Node start) -> findOneDirectionPath(start, end, weightFunction, maxWeight, backUpMaxWeight), + maxWeight, backUpMaxWeight); } /** @@ -163,10 +164,10 @@ public List> findNPathsBetweenSwitches( * * @return an list of N (or less) best paths. */ - private List> findNPathsBetweenSwitches( + private List findNPathsBetweenSwitches( AvailableNetwork network, SwitchId startSwitchId, SwitchId endSwitchId, int count, - WeightFunction weightFunction, Function getPath) - throws UnroutableFlowException { + WeightFunction weightFunction, Function getPath, + long maxWeight, long backUpMaxWeight) throws UnroutableFlowException { Node start = network.getSwitch(startSwitchId); Node end = network.getSwitch(endSwitchId); @@ -176,28 +177,31 @@ private List> findNPathsBetweenSwitches( } // Determine the shortest path from the start to the end. - List> bestPaths = new ArrayList<>(); - List firstPath = getPath.apply(start).getFoundPath(); - if (firstPath.isEmpty()) { - return new ArrayList<>(); + List bestPaths = new ArrayList<>(); + FindOneDirectionPathResult firstPath = getPath.apply(start); + + if (firstPath.getFoundPath().isEmpty()) { + return bestPaths; } - bestPaths.add(getPath.apply(start).getFoundPath()); + bestPaths.add(firstPath); // Initialize the set to store the potential kth shortest path. // Use LinkedHashSet to have deterministic results. - Set> potentialKthShortestPaths = new LinkedHashSet<>(); + Set potentialKthShortestPaths = new LinkedHashSet<>(); for (int k = 1; k < count; k++) { - List bestPath = bestPaths.get(k - 1); - for (int i = 0; i < bestPath.size(); i++) { + FindOneDirectionPathResult bestPath = bestPaths.get(k - 1); + List edgesList = bestPath.getFoundPath(); + for (int i = 0; i < edgesList.size(); i++) { // Spur node is retrieved from the previous k-shortest path. - Node spurNode = bestPath.get(i).getSrcSwitch(); + Node spurNode = edgesList.get(i).getSrcSwitch(); // The sequence of edges from the start to the spur node (without spur node). - List rootPath = new ArrayList<>(bestPath.subList(0, i)); + List rootPath = new ArrayList<>(edgesList.subList(0, i)); Set removedEdges = new HashSet<>(); // Remove the links that are part of the previous shortest paths which share the same root path. - for (List path : bestPaths) { + for (FindOneDirectionPathResult pathResult : bestPaths) { + List path = pathResult.getFoundPath(); if (path.size() > i && rootPath.equals(path.subList(0, i)) && spurNode.equals(path.get(i).getSrcSwitch())) { removedEdges.add(path.get(i)); @@ -210,13 +214,21 @@ private List> findNPathsBetweenSwitches( } // Calculate the spur path from the spur node to the end. - List pathFromSpurNode = getPath.apply(spurNode).getFoundPath(); - if (!pathFromSpurNode.isEmpty()) { + FindOneDirectionPathResult pathFromSpurNode = getPath.apply(spurNode); + if (!pathFromSpurNode.getFoundPath().isEmpty()) { List totalPath = new ArrayList<>(rootPath); // Entire path is made up of the root path and spur path. - totalPath.addAll(pathFromSpurNode); + totalPath.addAll(pathFromSpurNode.getFoundPath()); // Add the potential k-shortest path to the heap. - potentialKthShortestPaths.add(totalPath); + long totalPathWeight = totalPath.stream().map(weightFunction) + .map(PathWeight::toLong) + .reduce(0L, Long::sum); + // Filtering by maxWeight. + if (totalPathWeight < maxWeight) { + potentialKthShortestPaths.add(new FindOneDirectionPathResult(totalPath, false)); + } else if (totalPathWeight < backUpMaxWeight) { + potentialKthShortestPaths.add(new FindOneDirectionPathResult(totalPath, true)); + } } // Add back the edges and nodes that were removed from the graph. @@ -233,34 +245,38 @@ private List> findNPathsBetweenSwitches( } // Add the lowest weight path becomes the k-shortest path. - List newBestPath = + FindOneDirectionPathResult newBestPath = getBestPotentialKthShortestPath(potentialKthShortestPaths, bestPaths, weightFunction); - bestPaths.add(newBestPath); + if (newBestPath != null) { + bestPaths.add(newBestPath); + } } return bestPaths; } - private List getBestPotentialKthShortestPath(Set> potentialKthShortestPaths, - List> bestPaths, - WeightFunction weightFunction) { - List bestKthShortestPath = new ArrayList<>(); - + private FindOneDirectionPathResult getBestPotentialKthShortestPath(Set potentialPaths, + List bestPaths, + WeightFunction weightFunction) { + FindOneDirectionPathResult bestKthShortestPath = null; long bestAvailableBandwidth = Long.MIN_VALUE; long bestWeight = Long.MAX_VALUE; - for (List path : potentialKthShortestPaths) { + for (FindOneDirectionPathResult pathResult : potentialPaths) { + List path = pathResult.getFoundPath(); long currentAvailableBandwidth = getMinAvailableBandwidth(path); long currentWeight = getTotalWeight(path, weightFunction); - if (!bestPaths.contains(path) && (currentAvailableBandwidth > bestAvailableBandwidth + if (!bestPaths.contains(pathResult) && (currentAvailableBandwidth > bestAvailableBandwidth || (currentAvailableBandwidth == bestAvailableBandwidth && currentWeight < bestWeight))) { bestAvailableBandwidth = currentAvailableBandwidth; bestWeight = currentWeight; - bestKthShortestPath = path; + bestKthShortestPath = pathResult; } } - potentialKthShortestPaths.remove(bestKthShortestPath); + if (bestKthShortestPath != null) { + potentialPaths.remove(bestKthShortestPath); + } return bestKthShortestPath; } diff --git a/src-java/kilda-pce/src/main/java/org/openkilda/pce/finder/PathFinder.java b/src-java/kilda-pce/src/main/java/org/openkilda/pce/finder/PathFinder.java index 6d8a441e08a..8e818adb64f 100644 --- a/src-java/kilda-pce/src/main/java/org/openkilda/pce/finder/PathFinder.java +++ b/src-java/kilda-pce/src/main/java/org/openkilda/pce/finder/PathFinder.java @@ -18,7 +18,7 @@ import org.openkilda.model.SwitchId; import org.openkilda.pce.exception.UnroutableFlowException; import org.openkilda.pce.impl.AvailableNetwork; -import org.openkilda.pce.model.Edge; +import org.openkilda.pce.model.FindOneDirectionPathResult; import org.openkilda.pce.model.FindPathResult; import org.openkilda.pce.model.WeightFunction; @@ -68,15 +68,16 @@ FindPathResult findPathWithWeightCloseToMaxWeight(AvailableNetwork network, * * @return an list of N (or less) best paths. */ - List> findNPathsBetweenSwitches(AvailableNetwork network, SwitchId startSwitchId, SwitchId endSwitchId, - int count, WeightFunction weightFunction) throws UnroutableFlowException; + List findNPathsBetweenSwitches( + AvailableNetwork network, SwitchId startSwitchId, SwitchId endSwitchId, + int count, WeightFunction weightFunction) throws UnroutableFlowException; /** * Find N (or less) best paths wih max weight restrictions. * * @return an list of N (or less) best paths. */ - List> findNPathsBetweenSwitches( + List findNPathsBetweenSwitches( AvailableNetwork network, SwitchId startSwitchId, SwitchId endSwitchId, int count, WeightFunction weightFunction, long maxWeight, long backUpMaxWeight) throws UnroutableFlowException; } diff --git a/src-java/kilda-pce/src/main/java/org/openkilda/pce/impl/InMemoryPathComputer.java b/src-java/kilda-pce/src/main/java/org/openkilda/pce/impl/InMemoryPathComputer.java index f55717b1aa9..3b85ca73a19 100644 --- a/src-java/kilda-pce/src/main/java/org/openkilda/pce/impl/InMemoryPathComputer.java +++ b/src-java/kilda-pce/src/main/java/org/openkilda/pce/impl/InMemoryPathComputer.java @@ -37,6 +37,7 @@ import org.openkilda.pce.exception.UnroutableFlowException; import org.openkilda.pce.finder.PathFinder; import org.openkilda.pce.model.Edge; +import org.openkilda.pce.model.FindOneDirectionPathResult; import org.openkilda.pce.model.FindPathResult; import org.openkilda.pce.model.PathWeight; import org.openkilda.pce.model.WeightFunction; @@ -88,9 +89,11 @@ private GetPathsResult getPath(AvailableNetwork network, Flow flow, PathComputat if (flow.isOneSwitchFlow()) { log.info("No path computation for one-switch flow"); SwitchId singleSwitchId = flow.getSrcSwitchId(); + FindOneDirectionPathResult pathResult = FindOneDirectionPathResult.builder() + .foundPath(emptyList()).backUpPathComputationWayUsed(false).build(); return GetPathsResult.builder() - .forward(convertToPath(singleSwitchId, singleSwitchId, emptyList())) - .reverse(convertToPath(singleSwitchId, singleSwitchId, emptyList())) + .forward(convertToPath(singleSwitchId, singleSwitchId, pathResult)) + .reverse(convertToPath(singleSwitchId, singleSwitchId, pathResult)) .backUpPathComputationWayUsed(false) .build(); } @@ -173,7 +176,7 @@ public List getNPaths(SwitchId srcSwitchId, SwitchId dstSwitchId, int coun pathComputationStrategy = LATENCY; } - List> paths; + List paths; switch (pathComputationStrategy) { case COST: case LATENCY: @@ -199,7 +202,7 @@ public List getNPaths(SwitchId srcSwitchId, SwitchId dstSwitchId, int coun } return paths.stream() - .map(edges -> convertToPath(srcSwitchId, dstSwitchId, edges)) + .map(foundPathResult -> convertToPath(srcSwitchId, dstSwitchId, foundPathResult)) .sorted(comparator) .limit(count) .collect(Collectors.toList()); @@ -268,14 +271,21 @@ private GetPathsResult convertToGetPathsResult( SwitchId srcSwitchId, SwitchId dstSwitchId, FindPathResult findPathResult, PathComputationStrategy strategy, PathComputationStrategy originalStrategy) { return GetPathsResult.builder() - .forward(convertToPath(srcSwitchId, dstSwitchId, findPathResult.getFoundPath().getLeft())) - .reverse(convertToPath(dstSwitchId, srcSwitchId, findPathResult.getFoundPath().getRight())) + .forward(convertToPath(srcSwitchId, dstSwitchId, findPathResult.getFoundPath().getLeft(), + findPathResult.isBackUpPathComputationWayUsed())) + .reverse(convertToPath(dstSwitchId, srcSwitchId, findPathResult.getFoundPath().getRight(), + findPathResult.isBackUpPathComputationWayUsed())) .backUpPathComputationWayUsed(findPathResult.isBackUpPathComputationWayUsed() || !Objects.equals(originalStrategy, strategy)) .build(); } - private Path convertToPath(SwitchId srcSwitchId, SwitchId dstSwitchId, List edges) { + private Path convertToPath(SwitchId srcSwitchId, SwitchId dstSwitchId, List edges, boolean isBackupPath) { + return convertToPath(srcSwitchId, dstSwitchId, new FindOneDirectionPathResult(edges, isBackupPath)); + } + + private Path convertToPath(SwitchId srcSwitchId, SwitchId dstSwitchId, FindOneDirectionPathResult pathResult) { + List edges = pathResult.getFoundPath(); List segments = new LinkedList<>(); long latency = 0L; @@ -292,6 +302,7 @@ private Path convertToPath(SwitchId srcSwitchId, SwitchId dstSwitchId, List foundPath; boolean backUpPathComputationWayUsed; diff --git a/src-java/kilda-pce/src/test/java/org/openkilda/pce/finder/BestWeightAndShortestPathFinderTest.java b/src-java/kilda-pce/src/test/java/org/openkilda/pce/finder/BestWeightAndShortestPathFinderTest.java index 595a820f74c..8ae4d05309a 100644 --- a/src-java/kilda-pce/src/test/java/org/openkilda/pce/finder/BestWeightAndShortestPathFinderTest.java +++ b/src-java/kilda-pce/src/test/java/org/openkilda/pce/finder/BestWeightAndShortestPathFinderTest.java @@ -25,6 +25,7 @@ import org.openkilda.pce.exception.UnroutableFlowException; import org.openkilda.pce.impl.AvailableNetwork; import org.openkilda.pce.model.Edge; +import org.openkilda.pce.model.FindOneDirectionPathResult; import org.openkilda.pce.model.FindPathResult; import org.openkilda.pce.model.PathWeight; import org.openkilda.pce.model.WeightFunction; @@ -841,7 +842,7 @@ public void shouldFindNPath() throws UnroutableFlowException { // Cost is 5 expectedPaths.add(Lists.newArrayList(SWITCH_ID_A, SWITCH_ID_D, SWITCH_ID_C, SWITCH_ID_F)); - List> paths = + List paths = pathFinder.findNPathsBetweenSwitches(network, SWITCH_ID_A, SWITCH_ID_F, 1, WEIGHT_FUNCTION); assertEquals(expectedPaths, convertPaths(paths)); @@ -937,9 +938,10 @@ private AvailableNetwork buildTestNetworkForTestYensAlgorithm() { return network; } - private List> convertPaths(List> paths) { + private List> convertPaths(List paths) { List> convertedPaths = new ArrayList<>(); - for (List path : paths) { + for (FindOneDirectionPathResult pathResult : paths) { + List path = pathResult.getFoundPath(); List convertedPath = getInvolvedSwitches(path); convertedPaths.add(convertedPath); } diff --git a/src-java/kilda-pce/src/test/java/org/openkilda/pce/finder/FilteringByMaxWeightTest.java b/src-java/kilda-pce/src/test/java/org/openkilda/pce/finder/FilteringByMaxWeightTest.java new file mode 100644 index 00000000000..857c9b7d102 --- /dev/null +++ b/src-java/kilda-pce/src/test/java/org/openkilda/pce/finder/FilteringByMaxWeightTest.java @@ -0,0 +1,171 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.pce.finder; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.not; + +import org.openkilda.model.SwitchId; +import org.openkilda.pce.exception.UnroutableFlowException; +import org.openkilda.pce.impl.AvailableNetwork; +import org.openkilda.pce.model.Edge; +import org.openkilda.pce.model.FindOneDirectionPathResult; +import org.openkilda.pce.model.PathWeight; +import org.openkilda.pce.model.WeightFunction; + +import org.hamcrest.Matchers; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +public class FilteringByMaxWeightTest { + private static final int ALLOWED_DEPTH = 35; + private static final WeightFunction WEIGHT_FUNCTION = edge -> new PathWeight(edge.getCost()); + private static final int SHORTEST_PATH_WEIGHT = 6; + private static final int LONGEST_PATH_WEIGHT = 10; + public static final String IS_BACKUP_PATH_PARAM = "backUpPathComputationWayUsed"; + + private final AvailableNetwork network = new AvailableNetwork(); + private final BestWeightAndShortestPathFinder pathFinder = new BestWeightAndShortestPathFinder(ALLOWED_DEPTH); + + private static final SwitchId SWITCH_ID_1 = new SwitchId("00:00:00:00:00:00:00:01"); + private static final SwitchId SWITCH_ID_2 = new SwitchId("00:00:00:00:00:00:00:02"); + private static final SwitchId SWITCH_ID_3 = new SwitchId("00:00:00:00:00:00:00:03"); + private static final SwitchId SWITCH_ID_4 = new SwitchId("00:00:00:00:00:00:00:04"); + private static final SwitchId SWITCH_ID_5 = new SwitchId("00:00:00:00:00:00:00:05"); + + + @Before + public void setUp() { + /* + * Topology: + * + * SW1--1--SW2--4--SW4 + * 2| |5 + * SW3--3--SW5 + * + * All ISLs have equal cost. + */ + addBidirectionalLink(network, SWITCH_ID_1, SWITCH_ID_2, 1, 2, 1); + addBidirectionalLink(network, SWITCH_ID_2, SWITCH_ID_3, 5, 6, 2); + addBidirectionalLink(network, SWITCH_ID_3, SWITCH_ID_5, 7, 8, 3); + addBidirectionalLink(network, SWITCH_ID_2, SWITCH_ID_4, 3, 4, 4); + addBidirectionalLink(network, SWITCH_ID_4, SWITCH_ID_5, 9, 10, 5); + + } + + @Test + public void shouldFindBothPaths() throws UnroutableFlowException { + List paths = pathFinder + .findNPathsBetweenSwitches(network, SWITCH_ID_1, SWITCH_ID_5, 5, WEIGHT_FUNCTION, + SHORTEST_PATH_WEIGHT + 1, LONGEST_PATH_WEIGHT + 1); + + assertThat(paths.size(), equalTo(2)); + assertThat(paths, hasItem( + Matchers.hasProperty(IS_BACKUP_PATH_PARAM, equalTo(false)))); + assertThat(paths, hasItem( + Matchers.hasProperty(IS_BACKUP_PATH_PARAM, equalTo(true)))); + + } + + @Test + public void shouldFindOnlyBackupPaths() throws UnroutableFlowException { + List paths = pathFinder + .findNPathsBetweenSwitches(network, SWITCH_ID_1, SWITCH_ID_5, 5, WEIGHT_FUNCTION, + SHORTEST_PATH_WEIGHT - 1, LONGEST_PATH_WEIGHT + 1); + + assertThat(paths.size(), equalTo(2)); + assertThat(paths, hasItem( + Matchers.hasProperty(IS_BACKUP_PATH_PARAM, not(equalTo(false))))); + assertThat(paths, hasItem( + Matchers.hasProperty(IS_BACKUP_PATH_PARAM, equalTo(true)))); + + } + + @Test + public void shouldFindOnlyOneBackupPath() throws UnroutableFlowException { + List paths = pathFinder + .findNPathsBetweenSwitches(network, SWITCH_ID_1, SWITCH_ID_5, 5, WEIGHT_FUNCTION, + SHORTEST_PATH_WEIGHT - 1, LONGEST_PATH_WEIGHT - 1); + + assertThat(paths.size(), equalTo(1)); + assertThat(paths, hasItem( + Matchers.hasProperty(IS_BACKUP_PATH_PARAM, equalTo(true)))); + + } + + @Test + public void shouldFindOnlyOneNotBackupPath() throws UnroutableFlowException { + List paths = pathFinder + .findNPathsBetweenSwitches(network, SWITCH_ID_1, SWITCH_ID_5, 5, WEIGHT_FUNCTION, + SHORTEST_PATH_WEIGHT + 1, LONGEST_PATH_WEIGHT - 1); + + assertThat(paths.size(), equalTo(1)); + assertThat(paths, hasItem( + Matchers.hasProperty(IS_BACKUP_PATH_PARAM, equalTo(false)))); + + } + + @Test + public void shouldFindTwoBackupPaths() throws UnroutableFlowException { + List paths = pathFinder + .findNPathsBetweenSwitches(network, SWITCH_ID_1, SWITCH_ID_5, 5, WEIGHT_FUNCTION, + SHORTEST_PATH_WEIGHT - 1, LONGEST_PATH_WEIGHT + 1); + + assertThat(paths.size(), equalTo(2)); + assertThat(paths, hasItem( + Matchers.hasProperty(IS_BACKUP_PATH_PARAM, not(equalTo(false))))); + + } + + @Test + public void shouldFindTwoNotBackupPaths() throws UnroutableFlowException { + List paths = pathFinder + .findNPathsBetweenSwitches(network, SWITCH_ID_1, SWITCH_ID_5, 5, WEIGHT_FUNCTION, + LONGEST_PATH_WEIGHT + 1, Long.MAX_VALUE); + + assertThat(paths.size(), equalTo(2)); + assertThat(paths, hasItem( + Matchers.hasProperty(IS_BACKUP_PATH_PARAM, not(equalTo(true))))); + + } + + + private void addBidirectionalLink(AvailableNetwork network, SwitchId firstSwitch, SwitchId secondSwitch, + int srcPort, int dstPort, int cost) { + addLink(network, firstSwitch, secondSwitch, srcPort, dstPort, cost); + addLink(network, secondSwitch, firstSwitch, dstPort, srcPort, cost); + } + + private void addLink(AvailableNetwork network, SwitchId srcDpid, SwitchId dstDpid, int srcPort, int dstPort, + int cost) { + Edge edge = Edge.builder() + .srcSwitch(network.getOrAddNode(srcDpid, null)) + .srcPort(srcPort) + .destSwitch(network.getOrAddNode(dstDpid, null)) + .destPort(dstPort) + .latency(1) + .cost(cost) + .availableBandwidth(500000) + .underMaintenance(false) + .unstable(false) + .build(); + network.addEdge(edge); + } +} diff --git a/src-java/kilda-pce/src/test/java/org/openkilda/pce/impl/InMemoryPathComputerBaseTest.java b/src-java/kilda-pce/src/test/java/org/openkilda/pce/impl/InMemoryPathComputerBaseTest.java index c016fe0b386..0a949d56cd2 100644 --- a/src-java/kilda-pce/src/test/java/org/openkilda/pce/impl/InMemoryPathComputerBaseTest.java +++ b/src-java/kilda-pce/src/test/java/org/openkilda/pce/impl/InMemoryPathComputerBaseTest.java @@ -60,6 +60,7 @@ import org.openkilda.pce.finder.BestWeightAndShortestPathFinder; import org.openkilda.pce.finder.PathFinder; import org.openkilda.pce.model.Edge; +import org.openkilda.pce.model.FindOneDirectionPathResult; import org.openkilda.pce.model.Node; import org.openkilda.pce.model.WeightFunction; import org.openkilda.persistence.inmemory.InMemoryGraphBasedTest; @@ -349,14 +350,19 @@ private PathComputer mockPathComputerWithMaxWeight() throws UnroutableFlowExcept return new InMemoryPathComputer(availableNetworkFactory, pathFinderMock, config); } - private List> getPaths() { + private List getPaths() { List path1 = createPath(10, 5); List path2 = createPath(10, 4); List path3 = createPath(15, 5); List path4 = createPath(100, 100); List path5 = createPath(1, 1); - return Lists.newArrayList(path1, path2, path3, path4, path5); + return Lists.newArrayList( + new FindOneDirectionPathResult(path1, false), + new FindOneDirectionPathResult(path2, false), + new FindOneDirectionPathResult(path3, false), + new FindOneDirectionPathResult(path4, false), + new FindOneDirectionPathResult(path5, false)); } private void assertSortedByBandwidthAndLatency(List paths) { diff --git a/src-java/kilda-pce/src/test/java/org/openkilda/pce/impl/MaxLatencyPathComputationStrategyBaseTest.java b/src-java/kilda-pce/src/test/java/org/openkilda/pce/impl/MaxLatencyPathComputationStrategyBaseTest.java index 5a5c4fe1f03..7885c1ea7b6 100644 --- a/src-java/kilda-pce/src/test/java/org/openkilda/pce/impl/MaxLatencyPathComputationStrategyBaseTest.java +++ b/src-java/kilda-pce/src/test/java/org/openkilda/pce/impl/MaxLatencyPathComputationStrategyBaseTest.java @@ -17,10 +17,13 @@ import static java.lang.String.format; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import org.openkilda.model.Flow; @@ -30,6 +33,7 @@ import org.openkilda.model.Switch; import org.openkilda.model.SwitchId; import org.openkilda.pce.GetPathsResult; +import org.openkilda.pce.Path; import org.openkilda.pce.PathComputer; import org.openkilda.pce.exception.RecoverableException; import org.openkilda.pce.exception.UnroutableFlowException; @@ -37,6 +41,9 @@ import org.junit.Test; +import java.time.Duration; +import java.util.List; + public class MaxLatencyPathComputationStrategyBaseTest extends InMemoryPathComputerBaseTest { /** @@ -273,6 +280,26 @@ public void maxLatencyShouldChooseCorrectWayTest() throws Exception { assertEquals(path.getForward().getSegments().get(1).getSrcSwitchId(), new SwitchId("00:03")); } + @Test + public void shouldNotFindPathsGreaterThenMaxLatency() throws Exception { + createThreeWaysTopo(); + PathComputer pathComputer = new InMemoryPathComputer(availableNetworkFactory, + new BestWeightAndShortestPathFinder(200), config); + long maxLatencyNs = 20; + List paths = pathComputer.getNPaths( + getSwitchById("00:01").getSwitchId(), + getSwitchById("00:05").getSwitchId(), + 10, + FlowEncapsulationType.TRANSIT_VLAN, + PathComputationStrategy.MAX_LATENCY, + Duration.ofNanos(maxLatencyNs), + null + ); + + assertThat(paths, not(empty())); + paths.forEach(path -> assertThat(path.getLatency(), not(greaterThan(maxLatencyNs)))); + } + private void createThreeWaysTopo() { // / - B - \ // A - C - E diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/PathMapper.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/PathMapper.java index 723296b055a..54051c47cb4 100644 --- a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/PathMapper.java +++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/converter/PathMapper.java @@ -27,7 +27,7 @@ public interface PathMapper { default PathDto mapToPath(Path data) { - return new PathDto(data.getBandwidth(), data.getLatency(), data.getNodes()); + return new PathDto(data.getBandwidth(), data.getLatency(), data.getNodes(), data.getIsBackupPath()); } GroupFlowPathPayload mapGroupFlowPathPayload(FlowPathDto data); From d605487271312742ae0e7e63070fa8c504af0b0b Mon Sep 17 00:00:00 2001 From: Dmitry Poltavets Date: Thu, 28 Jul 2022 11:47:39 +0400 Subject: [PATCH 02/15] Unignore test according to #3847 --- .../functionaltests/spec/switches/PortAntiflapSpec.groovy | 2 -- 1 file changed, 2 deletions(-) diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/PortAntiflapSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/PortAntiflapSpec.groovy index cd097b669b4..869a00c82f0 100644 --- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/PortAntiflapSpec.groovy +++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/PortAntiflapSpec.groovy @@ -23,7 +23,6 @@ import groovy.util.logging.Slf4j import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value -import spock.lang.Ignore import spock.lang.Isolated import spock.lang.Issue import spock.lang.Narrative @@ -181,7 +180,6 @@ timeout"() { * directly to kafka, because currently it is the only way to simulate an incredibly rapid port flapping that * may sometimes occur on hardware switches(overheat?) */ - @Ignore("https://github.com/telstra/open-kilda/issues/3847") @Tags(SMOKE) def "System properly registers events order when port flaps incredibly fast (end with Down)"() { From 23c4313a51020a0019245d7c0bd867ff6b21bf26 Mon Sep 17 00:00:00 2001 From: Nikita Rydanov Date: Thu, 28 Jul 2022 12:24:39 +0400 Subject: [PATCH 03/15] Fixed wrong duplication of dump data --- .../actions/CompleteFlowPathRemovalAction.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/delete/actions/CompleteFlowPathRemovalAction.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/delete/actions/CompleteFlowPathRemovalAction.java index 50fb212bb25..84957edabba 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/delete/actions/CompleteFlowPathRemovalAction.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/delete/actions/CompleteFlowPathRemovalAction.java @@ -71,13 +71,21 @@ protected void perform(State from, State to, Event event, FlowDeleteContext cont return foundFlow; }); + Set flowPathPairs = new HashSet<>(); + + flowPathPairs.add(new FlowPathPair(flow.getForwardPath(), flow.getReversePath())); + flowPathPairs.add(new FlowPathPair(flow.getProtectedForwardPath(), flow.getProtectedReversePath())); + // Iterate to remove each path in a dedicated transaction. flow.getPathIds().forEach(pathId -> { Optional deletedPath = flowPathRepository.remove(pathId); - deletedPath.ifPresent(path -> { - updateIslsForFlowPath(path); - saveRemovalActionWithDumpToHistory(stateMachine, flow, new FlowPathPair(path, path)); - }); + deletedPath.ifPresent(this::updateIslsForFlowPath); + }); + + flowPathPairs.forEach(flowPathPair -> { + if (flowPathPair.getForward() != null && flowPathPair.getReverse() != null) { + saveRemovalActionWithDumpToHistory(stateMachine, flow, flowPathPair); + } }); } } From ded2ff0da6e08590696b00703be613a1342b5798 Mon Sep 17 00:00:00 2001 From: "nikolay.mikutskiy" Date: Mon, 1 Aug 2022 17:32:01 +0300 Subject: [PATCH 04/15] Fixed flow for testing swap endpoints api; --- .../functionaltests/spec/flows/SwapEndpointSpec.groovy | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/SwapEndpointSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/SwapEndpointSpec.groovy index 289db6abfbf..1f18edfe7cb 100644 --- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/SwapEndpointSpec.groovy +++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/SwapEndpointSpec.groovy @@ -946,7 +946,6 @@ switches"() { } @Tidy - @Ignore("https://github.com/telstra/open-kilda/issues/3627") @Tags(LOW_PRIORITY) def "Able to swap endpoints for two flows when not enough bandwidth on ISL and ignore_bandwidth=true"() { setup: "Create two flows with different source and the same destination switches" @@ -961,8 +960,8 @@ switches"() { def flow1 = getFirstFlow(flow1SwitchPair, flow2SwitchPair) def flow2 = getSecondFlow(flow1SwitchPair, flow2SwitchPair, flow1) - flowHelper.addFlow(flow1) - flowHelper.addFlow(flow2.tap { it.ignoreBandwidth = true }) + flowHelper.addFlow(flow1.tap { it.ignoreBandwidth = true }) + flowHelper.addFlow(flow2) and: "Update the first flow so that it consumes all bandwidth on the link" def flow1Path = PathHelper.convert(northbound.getFlowPath(flow1.id)) From 2929157a4123b5fa35975a2f2680ca4edb5ede3d Mon Sep 17 00:00:00 2001 From: Nikita Rydanov Date: Tue, 2 Aug 2022 13:34:11 +0400 Subject: [PATCH 05/15] Changed error message for illegal state in predicate builder --- .../topology/switchmanager/service/impl/PredicateBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/impl/PredicateBuilder.java b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/impl/PredicateBuilder.java index cd3f242008b..ea250775da1 100644 --- a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/impl/PredicateBuilder.java +++ b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/impl/PredicateBuilder.java @@ -226,7 +226,7 @@ public static Predicate buildDeletePredicate(DeleteRulesAction acti case REMOVE_SERVER_42_ISL_RTT_OUTPUT: return buildPredicate(SERVER_42_ISL_RTT_OUTPUT_COOKIE); default: - throw new IllegalStateException(format("Unknown install rules action %s", action)); + throw new IllegalStateException(format("Received unexpected delete switch rule action: %s", action)); } } From 203abf4ad6fcfb6521854b2e677e7c0f2d9a2e0c Mon Sep 17 00:00:00 2001 From: Dmitry Poltavets Date: Wed, 3 Aug 2022 13:30:49 +0400 Subject: [PATCH 06/15] Add note to Northbound API v1 page. --- .../java/org/openkilda/northbound/config/SwaggerConfig.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/config/SwaggerConfig.java b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/config/SwaggerConfig.java index ccbbfad348e..bb367431e59 100644 --- a/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/config/SwaggerConfig.java +++ b/src-java/northbound-service/northbound/src/main/java/org/openkilda/northbound/config/SwaggerConfig.java @@ -55,7 +55,8 @@ public Docket apiV1() { .groupName("API v1") .apiInfo(new ApiInfoBuilder() .title("Northbound") - .description("Kilda SDN Controller API") + .description("Kilda SDN Controller API

NOTE: There are features " + + "that are present in API v2 but are not present in API v1.

") .version("1.0") .build()) .globalOperationParameters(Collections.singletonList(correlationIdParameter.build())) From feb34631a5bd60ce0ad98ca31b87d6750e4ad43d Mon Sep 17 00:00:00 2001 From: Artsiom Halavach Date: Thu, 4 Aug 2022 11:31:59 +0200 Subject: [PATCH 07/15] removing redundant checks, and fix source of ports array --- .../functionaltests/spec/switches/LagPortSpec.groovy | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/LagPortSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/LagPortSpec.groovy index d235ba99649..bee699a2976 100644 --- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/LagPortSpec.groovy +++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/LagPortSpec.groovy @@ -639,11 +639,10 @@ class LagPortSpec extends HealthCheckSpecification { with(deleteResponse) { logicalPortNumber == lagPort - portNumbers.sort() == portsArrayUpdate.sort() + portNumbers.sort() == portsArray.sort() } - cleanup: - lagPort && northboundV2.deleteLagLogicalPort(sw.dpId, lagPort) + } void deleteAllLagPorts(SwitchId switchId) { From a2cd2b324aac963191e7795fb2130103d8d96f4a Mon Sep 17 00:00:00 2001 From: "nikolay.mikutskiy" Date: Thu, 4 Aug 2022 15:06:02 +0300 Subject: [PATCH 08/15] Fixed predicate selecting for OVERWRITE_DEFAULTS action; --- .../service/impl/PredicateBuilder.java | 2 +- .../service/SwitchRuleServiceTest.java | 167 ++++++++++++++++++ 2 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 src-java/swmanager-topology/swmanager-storm-topology/src/test/java/org/openkilda/wfm/topology/switchmanager/service/SwitchRuleServiceTest.java diff --git a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/impl/PredicateBuilder.java b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/impl/PredicateBuilder.java index cd3f242008b..32e95e720ee 100644 --- a/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/impl/PredicateBuilder.java +++ b/src-java/swmanager-topology/swmanager-storm-topology/src/main/java/org/openkilda/wfm/topology/switchmanager/service/impl/PredicateBuilder.java @@ -158,9 +158,9 @@ public static Predicate buildDeletePredicate(DeleteRulesAction acti return ignore -> true; case REMOVE_DEFAULTS: case REMOVE_ADD_DEFAULTS: + case OVERWRITE_DEFAULTS: return PredicateBuilder::allServiceRulesPredicate; case IGNORE_DEFAULTS: - case OVERWRITE_DEFAULTS: return PredicateBuilder::allNotServiceRulesPredicate; case REMOVE_DROP: return buildPredicate(DROP_RULE_COOKIE); diff --git a/src-java/swmanager-topology/swmanager-storm-topology/src/test/java/org/openkilda/wfm/topology/switchmanager/service/SwitchRuleServiceTest.java b/src-java/swmanager-topology/swmanager-storm-topology/src/test/java/org/openkilda/wfm/topology/switchmanager/service/SwitchRuleServiceTest.java new file mode 100644 index 00000000000..8cb495194cb --- /dev/null +++ b/src-java/swmanager-topology/swmanager-storm-topology/src/test/java/org/openkilda/wfm/topology/switchmanager/service/SwitchRuleServiceTest.java @@ -0,0 +1,167 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.wfm.topology.switchmanager.service; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.beans.HasPropertyWithValue.hasProperty; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.openkilda.messaging.command.switches.DeleteRulesAction.OVERWRITE_DEFAULTS; + +import org.openkilda.floodlight.api.request.rulemanager.FlowCommand; +import org.openkilda.floodlight.api.request.rulemanager.OfCommand; +import org.openkilda.messaging.command.switches.SwitchRulesDeleteRequest; +import org.openkilda.model.SwitchId; +import org.openkilda.model.bitops.BitField; +import org.openkilda.model.cookie.Cookie; +import org.openkilda.model.cookie.CookieBase; +import org.openkilda.persistence.PersistenceManager; +import org.openkilda.persistence.repositories.FlowPathRepository; +import org.openkilda.persistence.repositories.RepositoryFactory; +import org.openkilda.persistence.repositories.SwitchRepository; +import org.openkilda.rulemanager.DataAdapter; +import org.openkilda.rulemanager.FlowSpeakerData; +import org.openkilda.rulemanager.FlowSpeakerData.FlowSpeakerDataBuilder; +import org.openkilda.rulemanager.RuleManager; +import org.openkilda.rulemanager.SpeakerData; +import org.openkilda.wfm.topology.switchmanager.bolt.SwitchManagerHub; +import org.openkilda.wfm.topology.switchmanager.bolt.SwitchManagerHub.OfCommandAction; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +@RunWith(Parameterized.class) +public class SwitchRuleServiceTest { + private static final String KEY = "some key"; + private static final BitField SERVICE_BIT_FIELD = new BitField(0x8000_0000_0000_0000L); + private static final BitField FIRST_FLOW_BIT_FIELD = new BitField(0x2000_0000_0000_0000L); + private static final BitField SECOND_FLOW_BIT_FIELD = new BitField(0x4000_0000_0000_0000L); + + @Rule + public MockitoRule rule = MockitoJUnit.rule(); + + @Mock + private SwitchRepository switchRepository; + @Mock + private PersistenceManager persistenceManager; + @Mock + private RepositoryFactory repositoryFactory; + @Mock + private SwitchManagerCarrier carrier; + @Mock + private RuleManager ruleManager; + @Mock + private FlowPathRepository flowPathRepository; + + @Captor + private ArgumentCaptor commandsCaptor; + @Captor + private ArgumentCaptor> listCaptor; + + @Parameter + public List speakerDataList; + @Parameter(value = 1) + public Integer expectedServiceRulesCount; + + private SwitchRuleService switchRuleService; + + @Before + public void setUp() { + when(repositoryFactory.createSwitchRepository()).thenReturn(switchRepository); + when(switchRepository.exists(any(SwitchId.class))).thenReturn(true); + when(repositoryFactory.createFlowPathRepository()).thenReturn(flowPathRepository); + when(persistenceManager.getRepositoryFactory()).thenReturn(repositoryFactory); + when(flowPathRepository.findByEndpointSwitch(any(SwitchId.class))).thenReturn(Collections.emptyList()); + when(flowPathRepository.findBySegmentSwitch(any(SwitchId.class))).thenReturn(Collections.emptyList()); + when(ruleManager.buildRulesForSwitch(any(SwitchId.class), any(DataAdapter.class))) + .thenReturn(speakerDataList); + + switchRuleService = new SwitchRuleService(carrier, persistenceManager, ruleManager); + } + + @Test + public void shouldDeleteOnlyServiceRulesWhenOverwriteDefaultsAction() { + SwitchRulesDeleteRequest request = new SwitchRulesDeleteRequest( + new SwitchId("1"), + OVERWRITE_DEFAULTS, + null); + + switchRuleService.deleteRules(KEY, request); + + verify(carrier).sendOfCommandsToSpeaker(anyString(), listCaptor.capture(), + commandsCaptor.capture(), any(SwitchId.class)); + + List cookies = listCaptor.getValue().stream() + .map(command -> (FlowCommand) command) + .map(FlowCommand::getData) + .map(FlowSpeakerData::getCookie) + .collect(Collectors.toList()); + + assertAll( + () -> assertThat(commandsCaptor.getValue(), equalTo(OfCommandAction.DELETE)), + () -> assertThat(cookies.size(), equalTo(expectedServiceRulesCount)), + () -> assertThat(cookies, everyItem(hasProperty("serviceFlag", equalTo(true)))) + ); + } + + /** + * Generating test data. + */ + @Parameters(name = "ServiceRules count = {1}") + public static Collection data() { + FlowSpeakerDataBuilder speakerDataBuilder = FlowSpeakerData.builder(); + + FlowSpeakerData defaultRule = speakerDataBuilder + .cookie(new Cookie(SERVICE_BIT_FIELD.getMask())) + .build(); + FlowSpeakerData flowRule1 = speakerDataBuilder + .cookie(new Cookie(FIRST_FLOW_BIT_FIELD.getMask())) + .build(); + FlowSpeakerData flowRule2 = speakerDataBuilder + .cookie(new Cookie(SECOND_FLOW_BIT_FIELD.getMask())) + .build(); + + return Arrays.asList(new Object[][]{ + {new ArrayList<>(Arrays.asList(defaultRule, defaultRule, flowRule1, flowRule2)), 2}, + {new ArrayList<>(Arrays.asList(defaultRule, defaultRule, flowRule1, defaultRule)), 3}, + {new ArrayList<>(Arrays.asList(defaultRule, flowRule1, flowRule2)), 1}, + {new ArrayList<>(Arrays.asList(defaultRule, defaultRule)), 2}, + {new ArrayList<>(Arrays.asList(flowRule1, flowRule2)), 0} + }); + } +} From 3df93e43db03da890be4a65c87677cc728433391 Mon Sep 17 00:00:00 2001 From: Sergey Nikitin Date: Tue, 9 Aug 2022 01:23:27 +0400 Subject: [PATCH 09/15] Fixed func test --- .../spec/switches/FlowRulesSpec.groovy | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/FlowRulesSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/FlowRulesSpec.groovy index d6d4478a0e2..a78da615415 100644 --- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/FlowRulesSpec.groovy +++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/switches/FlowRulesSpec.groovy @@ -160,8 +160,8 @@ class FlowRulesSpec extends HealthCheckSpecification { ], [// Drop all non-base rules (ie IGNORE), and add base rules back (eg overwrite) deleteRulesAction: DeleteRulesAction.OVERWRITE_DEFAULTS, - rulesDeleted : flowRulesCount + (s42IsEnabledOnSrcSw ? s42FlowRttIngressForwardCount : 0), - getExpectedRules : { sw, defaultRules -> defaultRules } + rulesDeleted : srcSwDefaultRules.size(), + getExpectedRules : { sw, defaultRules -> defaultRules + getFlowRules(sw) } ], [// Drop all default rules in single-table mode deleteRulesAction: DeleteRulesAction.REMOVE_DEFAULTS, @@ -239,13 +239,12 @@ class FlowRulesSpec extends HealthCheckSpecification { ], [// Drop all non-base rules (ie IGNORE), and add base rules back (eg overwrite) deleteRulesAction: DeleteRulesAction.OVERWRITE_DEFAULTS, - rulesDeleted : flowRulesCount + sharedRulesCount + - (s42IsEnabledOnSrcSw ? s42QinqOuterVlanCount + s42FlowRttIngressForwardCount : 0), - getExpectedRules : { sw, defaultRules -> - def noDefaultSwRules = northbound.getSwitchRules(srcSwitch.dpId).flowEntries - defaultRules - defaultRules + noDefaultSwRules.findAll { Cookie.isIngressRulePassThrough(it.cookie) } + - (s42IsEnabledOnSrcSw ? northbound.getSwitchRules(srcSwitch.dpId).flowEntries.findAll { - new Cookie(it.cookie).getType() == CookieType.SERVER_42_FLOW_RTT_INPUT } : []) + rulesDeleted : srcSwDefaultRules.size() + multiTableFlowRulesCount + + (s42IsEnabledOnSrcSw ? s42FlowRttInput : 0), + getExpectedRules : { sw, defaultRules -> defaultRules + getFlowRules(sw) + + northbound.getSwitchRules(srcSwitch.dpId).flowEntries.findAll { + Cookie.isIngressRulePassThrough(it.cookie) + } } ], [// Drop all default rules From 07252d7cd964cffd9c8cf719841b9d915b2f7452 Mon Sep 17 00:00:00 2001 From: Artsiom Halavach Date: Thu, 11 Aug 2022 10:52:07 +0200 Subject: [PATCH 10/15] adding new test for the vxlan flow ping changing flow creation flow to ensure vlan type (transit vlan) closes #4920 --- .../spec/flows/FlowPingSpec.groovy | 45 +++++++++++++++++++ .../java/org/openkilda/testing/Constants.java | 1 + 2 files changed, 46 insertions(+) diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/FlowPingSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/FlowPingSpec.groovy index 14b6c5879fd..cf13f9f20d7 100644 --- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/FlowPingSpec.groovy +++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/FlowPingSpec.groovy @@ -53,6 +53,7 @@ class FlowPingSpec extends HealthCheckSpecification { def "Able to ping a flow with vlan"(Switch srcSwitch, Switch dstSwitch) { given: "A flow with random vlan" def flow = flowHelperV2.randomFlow(srcSwitch, dstSwitch) + flow.encapsulationType= FlowEncapsulationType.TRANSIT_VLAN flowHelperV2.addFlow(flow) when: "Ping the flow" @@ -94,6 +95,50 @@ class FlowPingSpec extends HealthCheckSpecification { swPair = new SwitchPair(src: srcSwitch, dst: dstSwitch, paths: []) } + @Tidy + @Unroll("Able to ping a flow with vxlan between switches #swPair.toString()") + @Tags([TOPOLOGY_DEPENDENT]) + def "Able to ping a flow with vxlan"(Switch srcSwitch, Switch dstSwitch) { + given: "A flow with random vxlan" + def flow = flowHelperV2.randomFlow(srcSwitch, dstSwitch) + flow.encapsulationType= FlowEncapsulationType.VXLAN + flowHelperV2.addFlow(flow) + + when: "Ping the flow" + def beforePingTime = new Date() + def unicastCounterBefore = northbound.getSwitchRules(srcSwitch.dpId).flowEntries.find { + it.cookie == DefaultRule.VERIFICATION_UNICAST_VXLAN_RULE_COOKIE.cookie //rule for the vxlan differs from vlan + }.byteCount + def response = northbound.pingFlow(flow.flowId, new PingInput()) + + then: "Ping is successful" + response.forward.pingSuccess + response.reverse.pingSuccess + + and: "No errors" + !response.error + !response.forward.error + !response.reverse.error + + + and: "Unicast rule packet count is increased and logged to otsdb" + def statsData = null + Wrappers.wait(STATS_LOGGING_TIMEOUT, 2) { + statsData = otsdb.query(beforePingTime, metricPrefix + "switch.flow.system.bytes", + [switchid : srcSwitch.dpId.toOtsdFormat(), + cookieHex: DefaultRule.VERIFICATION_UNICAST_VXLAN_RULE_COOKIE.toHexString()]).dps + assert statsData && !statsData.empty + } + statsData.values().last().toLong() > unicastCounterBefore + + cleanup: "Remove the flow" + flowHelperV2.deleteFlow(flow.flowId) + + where: + [srcSwitch, dstSwitch] << ofSwitchCombinations + swPair = new SwitchPair(src: srcSwitch, dst: dstSwitch, paths: []) + } + @Tidy @Unroll("Able to ping a flow with no vlan between switches #swPair.toString()") @Tags([TOPOLOGY_DEPENDENT]) diff --git a/src-java/testing/test-library/src/main/java/org/openkilda/testing/Constants.java b/src-java/testing/test-library/src/main/java/org/openkilda/testing/Constants.java index 70e8c20fa00..7b15e24ac8d 100644 --- a/src-java/testing/test-library/src/main/java/org/openkilda/testing/Constants.java +++ b/src-java/testing/test-library/src/main/java/org/openkilda/testing/Constants.java @@ -50,6 +50,7 @@ public enum DefaultRule { DROP_RULE(Cookie.DROP_RULE_COOKIE), //drop all unknown packets VERIFICATION_BROADCAST_RULE(Cookie.VERIFICATION_BROADCAST_RULE_COOKIE), //ISL discovery packets VERIFICATION_UNICAST_RULE(Cookie.VERIFICATION_UNICAST_RULE_COOKIE), //catch rule for flow pings + VERIFICATION_UNICAST_VXLAN_RULE_COOKIE(Cookie.VERIFICATION_UNICAST_VXLAN_RULE_COOKIE), DROP_LOOP_RULE(Cookie.DROP_VERIFICATION_LOOP_RULE_COOKIE), //drop packets that'll lead to self-loop ISLs CATCH_BFD_RULE(Cookie.CATCH_BFD_RULE_COOKIE); //catch rule for BFD sessions (noviflow-specific) From 7b968bb90c5d7579e235536565c430e4bccf9bf6 Mon Sep 17 00:00:00 2001 From: Artsiom Halavach Date: Thu, 11 Aug 2022 10:52:07 +0200 Subject: [PATCH 11/15] adding new test for the vxlan flow ping changing flow creation flow to ensure vlan type (transit vlan) closes #4920 --- .../spec/flows/FlowPingSpec.groovy | 48 +++++++++++++++++++ .../java/org/openkilda/testing/Constants.java | 1 + 2 files changed, 49 insertions(+) diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/FlowPingSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/FlowPingSpec.groovy index 14b6c5879fd..5b6ada1e1d2 100644 --- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/FlowPingSpec.groovy +++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/FlowPingSpec.groovy @@ -53,6 +53,7 @@ class FlowPingSpec extends HealthCheckSpecification { def "Able to ping a flow with vlan"(Switch srcSwitch, Switch dstSwitch) { given: "A flow with random vlan" def flow = flowHelperV2.randomFlow(srcSwitch, dstSwitch) + flow.encapsulationType= FlowEncapsulationType.TRANSIT_VLAN flowHelperV2.addFlow(flow) when: "Ping the flow" @@ -94,6 +95,53 @@ class FlowPingSpec extends HealthCheckSpecification { swPair = new SwitchPair(src: srcSwitch, dst: dstSwitch, paths: []) } + @Tidy + @Unroll("Able to ping a flow with vxlan between switches #swPair.toString()") + @Tags([TOPOLOGY_DEPENDENT]) + def "Able to ping a flow with vxlan"() { + given: "A flow with random vxlan" + //defining switches pair with vxlan support + def switchPair = topologyHelper.getAllNeighboringSwitchPairs().find { swP -> + [swP.src, swP.dst].every { switchHelper.isVxlanEnabled(it.dpId) } + } + assumeTrue(switchPair as boolean, "Unable to find required switches in topology") + + def flow = flowHelperV2.randomFlow(switchPair) + flow.encapsulationType = FlowEncapsulationType.VXLAN + flowHelperV2.addFlow(flow) + + when: "Ping the flow" + def beforePingTime = new Date() + def unicastCounterBefore = northbound.getSwitchRules(switchPair.src.dpId).flowEntries.find { + it.cookie == DefaultRule.VERIFICATION_UNICAST_VXLAN_RULE_COOKIE.cookie //rule for the vxlan differs from vlan + }.byteCount + def response = northbound.pingFlow(flow.flowId, new PingInput()) + + then: "Ping is successful" + response.forward.pingSuccess + response.reverse.pingSuccess + + and: "No errors" + !response.error + !response.forward.error + !response.reverse.error + + + and: "Unicast rule packet count is increased and logged to otsdb" + def statsData = null + Wrappers.wait(STATS_LOGGING_TIMEOUT, 2) { + statsData = otsdb.query(beforePingTime, metricPrefix + "switch.flow.system.bytes", + [switchid : switchPair.src.dpId.toOtsdFormat(), + cookieHex: DefaultRule.VERIFICATION_UNICAST_VXLAN_RULE_COOKIE.toHexString()]).dps + assert statsData && !statsData.empty + } + statsData.values().last().toLong() > unicastCounterBefore + + cleanup: "Remove the flow" + flowHelperV2.deleteFlow(flow.flowId) + + } + @Tidy @Unroll("Able to ping a flow with no vlan between switches #swPair.toString()") @Tags([TOPOLOGY_DEPENDENT]) diff --git a/src-java/testing/test-library/src/main/java/org/openkilda/testing/Constants.java b/src-java/testing/test-library/src/main/java/org/openkilda/testing/Constants.java index 70e8c20fa00..7b15e24ac8d 100644 --- a/src-java/testing/test-library/src/main/java/org/openkilda/testing/Constants.java +++ b/src-java/testing/test-library/src/main/java/org/openkilda/testing/Constants.java @@ -50,6 +50,7 @@ public enum DefaultRule { DROP_RULE(Cookie.DROP_RULE_COOKIE), //drop all unknown packets VERIFICATION_BROADCAST_RULE(Cookie.VERIFICATION_BROADCAST_RULE_COOKIE), //ISL discovery packets VERIFICATION_UNICAST_RULE(Cookie.VERIFICATION_UNICAST_RULE_COOKIE), //catch rule for flow pings + VERIFICATION_UNICAST_VXLAN_RULE_COOKIE(Cookie.VERIFICATION_UNICAST_VXLAN_RULE_COOKIE), DROP_LOOP_RULE(Cookie.DROP_VERIFICATION_LOOP_RULE_COOKIE), //drop packets that'll lead to self-loop ISLs CATCH_BFD_RULE(Cookie.CATCH_BFD_RULE_COOKIE); //catch rule for BFD sessions (noviflow-specific) From 14cae6ab4091fb523e54d55d50ee0242e6727a6d Mon Sep 17 00:00:00 2001 From: Nikita Rydanov Date: Thu, 28 Jul 2022 16:24:59 +0400 Subject: [PATCH 12/15] Affinity and diverse group IDs are present now --- .../CompleteFlowPathRemovalAction.java | 3 +++ ...ipPathsAndResourcesDeallocationAction.java | 22 +++++++++++-------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/update/actions/CompleteFlowPathRemovalAction.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/update/actions/CompleteFlowPathRemovalAction.java index afa5b107e98..16469bc7fe1 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/update/actions/CompleteFlowPathRemovalAction.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/update/actions/CompleteFlowPathRemovalAction.java @@ -40,6 +40,9 @@ public CompleteFlowPathRemovalAction(PersistenceManager persistenceManager) { @Override protected void perform(State from, State to, Event event, FlowUpdateContext context, FlowUpdateFsm stateMachine) { Flow originalFlow = RequestedFlowMapper.INSTANCE.toFlow(stateMachine.getOriginalFlow()); + originalFlow.setAffinityGroupId(stateMachine.getOriginalAffinityFlowGroup()); + originalFlow.setDiverseGroupId(stateMachine.getOriginalDiverseFlowGroup()); + removeOldPrimaryFlowPaths(originalFlow, stateMachine); removeOldProtectedFlowPaths(originalFlow, stateMachine); diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/update/actions/SkipPathsAndResourcesDeallocationAction.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/update/actions/SkipPathsAndResourcesDeallocationAction.java index 6e44dd2ea4a..b4f60fe4d70 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/update/actions/SkipPathsAndResourcesDeallocationAction.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/update/actions/SkipPathsAndResourcesDeallocationAction.java @@ -17,14 +17,14 @@ import static java.lang.String.format; +import org.openkilda.model.Flow; import org.openkilda.model.FlowPath; import org.openkilda.model.PathId; import org.openkilda.persistence.PersistenceManager; -import org.openkilda.persistence.repositories.FlowPathRepository; import org.openkilda.wfm.share.history.model.FlowDumpData; import org.openkilda.wfm.share.history.model.FlowDumpData.DumpType; import org.openkilda.wfm.share.mappers.HistoryMapper; -import org.openkilda.wfm.topology.flowhs.fsm.common.actions.HistoryRecordingAction; +import org.openkilda.wfm.topology.flowhs.fsm.common.actions.FlowProcessingWithHistorySupportAction; import org.openkilda.wfm.topology.flowhs.fsm.update.FlowUpdateContext; import org.openkilda.wfm.topology.flowhs.fsm.update.FlowUpdateFsm; import org.openkilda.wfm.topology.flowhs.fsm.update.FlowUpdateFsm.Event; @@ -35,22 +35,25 @@ @Slf4j public class SkipPathsAndResourcesDeallocationAction - extends HistoryRecordingAction { - - private FlowPathRepository flowPathRepository; + extends FlowProcessingWithHistorySupportAction { public SkipPathsAndResourcesDeallocationAction(PersistenceManager persistenceManager) { - flowPathRepository = persistenceManager.getRepositoryFactory().createFlowPathRepository(); + super(persistenceManager); } @Override public void perform(State from, State to, Event event, FlowUpdateContext context, FlowUpdateFsm stateMachine) { if (stateMachine.getEndpointUpdate().isPartialUpdate()) { - FlowDumpData dumpData = HistoryMapper.INSTANCE.map( - RequestedFlowMapper.INSTANCE.toFlow(stateMachine.getOriginalFlow()), + + Flow originalFlow = RequestedFlowMapper.INSTANCE.toFlow(stateMachine.getOriginalFlow()); + originalFlow.setAffinityGroupId(stateMachine.getOriginalAffinityFlowGroup()); + originalFlow.setDiverseGroupId(stateMachine.getOriginalDiverseFlowGroup()); + + FlowDumpData dumpData = HistoryMapper.INSTANCE.map(originalFlow, getFlowPath(stateMachine.getNewPrimaryForwardPath()), getFlowPath(stateMachine.getNewPrimaryReversePath()), DumpType.STATE_BEFORE); + stateMachine.saveActionWithDumpToHistory("New endpoints were stored for flow", format("The flow endpoints were updated for: %s / %s", stateMachine.getTargetFlow().getSrcSwitch(), @@ -60,7 +63,8 @@ public void perform(State from, State to, Event event, FlowUpdateContext context } } - private FlowPath getFlowPath(PathId pathId) { + @Override + protected FlowPath getFlowPath(PathId pathId) { return flowPathRepository.findById(pathId).orElse(null); } } From b4048983f5a00405dab9a165a6e743a382028858 Mon Sep 17 00:00:00 2001 From: "nikolay.mikutskiy" Date: Tue, 16 Aug 2022 17:46:46 +0400 Subject: [PATCH 13/15] Added NOTIFY_FLOW_MONITOR step to checking changes; --- .../openkilda/wfm/topology/flowhs/fsm/update/FlowUpdateFsm.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/update/FlowUpdateFsm.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/update/FlowUpdateFsm.java index 769d42f6542..b7cd832d87f 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/update/FlowUpdateFsm.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/update/FlowUpdateFsm.java @@ -327,7 +327,7 @@ public Factory(@NonNull FlowUpdateHubCarrier carrier, @NonNull Config config, builder.transition().from(State.UPDATING_FLOW_STATUS).to(State.FLOW_STATUS_UPDATED).on(Event.NEXT) .perform(new UpdateFlowStatusAction(persistenceManager, dashboardLogger)); - builder.transition().from(State.FLOW_STATUS_UPDATED).to(State.FINISHED).on(Event.NEXT); + builder.transition().from(State.FLOW_STATUS_UPDATED).to(State.NOTIFY_FLOW_MONITOR).on(Event.NEXT); builder.transition().from(State.FLOW_STATUS_UPDATED) .to(State.NOTIFY_FLOW_MONITOR_WITH_ERROR).on(Event.ERROR); From 3353fbec60979e1b53ed8fceb51123414c0b40b1 Mon Sep 17 00:00:00 2001 From: Sergey Nikitin Date: Tue, 16 Aug 2022 21:09:44 +0400 Subject: [PATCH 14/15] Collect `flow.ingress.*` and `flow.*` metrics from mirror rule if it's exist Closes #4915 --- .../wfm/topology/flowhs/FlowHsTopology.java | 4 + .../bolts/FlowMirrorPointCreateHubBolt.java | 13 + .../bolts/FlowMirrorPointDeleteHubBolt.java | 13 + ...lowProcessingWithHistorySupportAction.java | 10 + .../NotifyFlowStatsOnNewPathsAction.java | 3 +- .../NotifyFlowStatsOnRemovedPathsAction.java | 3 +- .../create/actions/NotifyFlowStatsAction.java | 3 +- .../delete/actions/NotifyFlowStatsAction.java | 3 +- .../create/FlowMirrorPointCreateFsm.java | 7 + .../create/actions/NotifyFlowStatsAction.java | 47 ++++ .../delete/FlowMirrorPointDeleteFsm.java | 11 +- .../delete/actions/NotifyFlowStatsAction.java | 47 ++++ .../info/stats/BaseFlowPathInfo.java | 2 + .../info/stats/RemoveFlowPathInfo.java | 6 +- .../info/stats/UpdateFlowPathInfo.java | 6 +- .../stats/model/EndpointFlowDescriptor.java | 44 ++++ .../model/KildaEntryDescriptorHandler.java | 2 + .../service/AnyFlowStatsEntryHandler.java | 6 + .../stats/service/BaseCacheChangeHandler.java | 6 + .../FlowEndpointStatsEntryHandler.java | 17 +- .../stats/service/KildaEntryCacheService.java | 40 ++- .../stats/service/MeterStatsHandler.java | 6 + .../wfm/topology/stats/StatsTopologyTest.java | 98 +++++++- .../service/KildaEntryCacheServiceTest.java | 237 +++++++++++++----- 24 files changed, 539 insertions(+), 95 deletions(-) create mode 100644 src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/create/actions/NotifyFlowStatsAction.java create mode 100644 src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/delete/actions/NotifyFlowStatsAction.java create mode 100644 src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/model/EndpointFlowDescriptor.java diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/FlowHsTopology.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/FlowHsTopology.java index dd775a82a42..7518dd5f053 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/FlowHsTopology.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/FlowHsTopology.java @@ -883,6 +883,10 @@ private void statsTopologyOutput(TopologyBuilder topologyBuilder) { .shuffleGrouping(ComponentId.YFLOW_REROUTE_HUB.name(), Stream.HUB_TO_STATS_TOPOLOGY_SENDER.name()) .shuffleGrouping(ComponentId.YFLOW_DELETE_HUB.name(), + Stream.HUB_TO_STATS_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.FLOW_CREATE_MIRROR_POINT_HUB.name(), + Stream.HUB_TO_STATS_TOPOLOGY_SENDER.name()) + .shuffleGrouping(ComponentId.FLOW_DELETE_MIRROR_POINT_HUB.name(), Stream.HUB_TO_STATS_TOPOLOGY_SENDER.name()); } diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointCreateHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointCreateHubBolt.java index 7eaf6ebf2fa..692a65c1453 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointCreateHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointCreateHubBolt.java @@ -18,6 +18,7 @@ import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_SPEAKER_WORKER; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_STATS_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_PAYLOAD; import org.openkilda.bluegreen.LifecycleEvent; @@ -26,6 +27,7 @@ import org.openkilda.messaging.Message; import org.openkilda.messaging.command.flow.FlowMirrorPointCreateRequest; import org.openkilda.messaging.info.InfoMessage; +import org.openkilda.messaging.info.stats.UpdateFlowPathInfo; import org.openkilda.pce.AvailableNetworkFactory; import org.openkilda.pce.PathComputer; import org.openkilda.pce.PathComputerConfig; @@ -46,6 +48,7 @@ import lombok.Builder; import lombok.Getter; +import lombok.NonNull; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; @@ -146,6 +149,15 @@ public void sendHistoryUpdate(FlowHistoryHolder historyHolder) { new Values(historyHolder.getTaskId(), message)); } + @Override + public void sendNotifyFlowStats(@NonNull UpdateFlowPathInfo flowPathInfo) { + Message message = new InfoMessage(flowPathInfo, System.currentTimeMillis(), + getCommandContext().getCorrelationId()); + + emitWithContext(HUB_TO_STATS_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(flowPathInfo.getFlowId(), message)); + } + @Override public void cancelTimeoutCallback(String key) { cancelCallback(key); @@ -164,6 +176,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); + declarer.declareStream(HUB_TO_STATS_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), new Fields(ZooKeeperBolt.FIELD_ID_STATE, ZooKeeperBolt.FIELD_ID_CONTEXT)); } diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointDeleteHubBolt.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointDeleteHubBolt.java index f47883b96ff..7c613324154 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointDeleteHubBolt.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/bolts/FlowMirrorPointDeleteHubBolt.java @@ -18,6 +18,7 @@ import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER; import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_SPEAKER_WORKER; +import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_STATS_TOPOLOGY_SENDER; import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_PAYLOAD; import org.openkilda.bluegreen.LifecycleEvent; @@ -26,6 +27,7 @@ import org.openkilda.messaging.Message; import org.openkilda.messaging.command.flow.FlowMirrorPointDeleteRequest; import org.openkilda.messaging.info.InfoMessage; +import org.openkilda.messaging.info.stats.RemoveFlowPathInfo; import org.openkilda.persistence.PersistenceManager; import org.openkilda.wfm.error.PipelineException; import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig; @@ -42,6 +44,7 @@ import lombok.Builder; import lombok.Getter; +import lombok.NonNull; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; @@ -133,6 +136,15 @@ public void sendHistoryUpdate(FlowHistoryHolder historyHolder) { new Values(historyHolder.getTaskId(), message)); } + @Override + public void sendNotifyFlowStats(@NonNull RemoveFlowPathInfo flowPathInfo) { + Message message = new InfoMessage(flowPathInfo, System.currentTimeMillis(), + getCommandContext().getCorrelationId()); + + emitWithContext(HUB_TO_STATS_TOPOLOGY_SENDER.name(), getCurrentTuple(), + new Values(flowPathInfo.getFlowId(), message)); + } + @Override public void cancelTimeoutCallback(String key) { cancelCallback(key); @@ -151,6 +163,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); + declarer.declareStream(HUB_TO_STATS_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS); declarer.declareStream(ZkStreams.ZK.toString(), new Fields(ZooKeeperBolt.FIELD_ID_STATE, ZooKeeperBolt.FIELD_ID_CONTEXT)); } diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/common/actions/FlowProcessingWithHistorySupportAction.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/common/actions/FlowProcessingWithHistorySupportAction.java index c152ec09b9b..8d1274fce48 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/common/actions/FlowProcessingWithHistorySupportAction.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/common/actions/FlowProcessingWithHistorySupportAction.java @@ -250,4 +250,14 @@ protected void setMirrorPointsToNewPath(PathId oldFlowPathId, PathId newFlowPath oldPath.getFlowMirrorPointsSet().forEach(newPath::addFlowMirrorPoints)))); } } + + protected boolean hasIngressMirror(FlowPath flowPath) { + return flowPath.getFlowMirrorPointsSet().stream() + .anyMatch(point -> point.getMirrorSwitchId().equals(flowPath.getSrcSwitchId())); + } + + protected boolean hasEgressMirror(FlowPath flowPath) { + return flowPath.getFlowMirrorPointsSet().stream() + .anyMatch(point -> point.getMirrorSwitchId().equals(flowPath.getDestSwitchId())); + } } diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/common/actions/NotifyFlowStatsOnNewPathsAction.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/common/actions/NotifyFlowStatsOnNewPathsAction.java index 0288b9854e4..52d4203520b 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/common/actions/NotifyFlowStatsOnNewPathsAction.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/common/actions/NotifyFlowStatsOnNewPathsAction.java @@ -45,7 +45,8 @@ protected void perform(S from, S to, E event, C context, T stateMachine) { Flow flow = flowPath.getFlow(); UpdateFlowPathInfo pathInfo = new UpdateFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), flowPath.getCookie(), flowPath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics()); + FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics(), + hasIngressMirror(flowPath), hasEgressMirror(flowPath)); carrier.sendNotifyFlowStats(pathInfo); }); } diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/common/actions/NotifyFlowStatsOnRemovedPathsAction.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/common/actions/NotifyFlowStatsOnRemovedPathsAction.java index 16fb91a5511..2678965c917 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/common/actions/NotifyFlowStatsOnRemovedPathsAction.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/common/actions/NotifyFlowStatsOnRemovedPathsAction.java @@ -48,7 +48,8 @@ protected void perform(S from, S to, E event, C context, T stateMachine) { Flow flow = flowPath.getFlow(); RemoveFlowPathInfo pathInfo = new RemoveFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), flowPath.getCookie(), flowPath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(originalFlow, flowPath), flow.getVlanStatistics()); + FlowPathMapper.INSTANCE.mapToPathNodes(originalFlow, flowPath), flow.getVlanStatistics(), + hasIngressMirror(flowPath), hasEgressMirror(flowPath)); carrier.sendNotifyFlowStats(pathInfo); }); } diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/create/actions/NotifyFlowStatsAction.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/create/actions/NotifyFlowStatsAction.java index 4317e751302..dd06f0b4ac8 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/create/actions/NotifyFlowStatsAction.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/create/actions/NotifyFlowStatsAction.java @@ -42,7 +42,8 @@ protected void perform(State from, State to, Event event, FlowCreateContext cont Flow flow = flowPath.getFlow(); UpdateFlowPathInfo pathInfo = new UpdateFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), flowPath.getCookie(), flowPath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics()); + FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics(), + hasIngressMirror(flowPath), hasEgressMirror(flowPath)); carrier.sendNotifyFlowStats(pathInfo); }); } diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/delete/actions/NotifyFlowStatsAction.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/delete/actions/NotifyFlowStatsAction.java index 1fe8f31e699..474f55ff6a7 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/delete/actions/NotifyFlowStatsAction.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/delete/actions/NotifyFlowStatsAction.java @@ -42,7 +42,8 @@ protected void perform(State from, State to, Event event, FlowDeleteContext cont Flow flow = flowPath.getFlow(); RemoveFlowPathInfo pathInfo = new RemoveFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), flowPath.getCookie(), flowPath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics()); + FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics(), + hasIngressMirror(flowPath), hasEgressMirror(flowPath)); carrier.sendNotifyFlowStats(pathInfo); }); } diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/create/FlowMirrorPointCreateFsm.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/create/FlowMirrorPointCreateFsm.java index ba383b28637..e57f6851111 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/create/FlowMirrorPointCreateFsm.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/create/FlowMirrorPointCreateFsm.java @@ -31,6 +31,7 @@ import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.EmitVerifyRulesRequestsAction; import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.HandleNotCompletedCommandsAction; import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.HandleNotDeallocatedFlowMirrorPathResourceAction; +import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.NotifyFlowStatsAction; import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.OnFinishedAction; import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.OnFinishedWithErrorAction; import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.OnReceivedInstallResponseAction; @@ -155,6 +156,11 @@ public Factory(@NonNull FlowGenericCarrier carrier, @NonNull PersistenceManager .onEach(Event.TIMEOUT, Event.MISSING_RULE_FOUND, Event.ERROR); builder.transitions().from(State.RULES_VALIDATED) + .toAmong(State.NOTIFY_FLOW_STATS, State.NOTIFY_FLOW_STATS) + .onEach(Event.NEXT, Event.ERROR) + .perform(new NotifyFlowStatsAction(persistenceManager)); + + builder.transitions().from(State.NOTIFY_FLOW_STATS) .toAmong(State.MIRROR_PATH_INSTALLATION_COMPLETED, State.MIRROR_PATH_INSTALLATION_COMPLETED) .onEach(Event.NEXT, Event.ERROR) .perform(new PostFlowMirrorPathInstallationAction(persistenceManager)); @@ -220,6 +226,7 @@ public enum State { VALIDATING_RULES, RULES_VALIDATED, + NOTIFY_FLOW_STATS, MIRROR_PATH_INSTALLATION_COMPLETED, FINISHED, diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/create/actions/NotifyFlowStatsAction.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/create/actions/NotifyFlowStatsAction.java new file mode 100644 index 00000000000..c69148eda1d --- /dev/null +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/create/actions/NotifyFlowStatsAction.java @@ -0,0 +1,47 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions; + +import org.openkilda.messaging.info.stats.UpdateFlowPathInfo; +import org.openkilda.model.Flow; +import org.openkilda.persistence.PersistenceManager; +import org.openkilda.wfm.share.mappers.FlowPathMapper; +import org.openkilda.wfm.topology.flowhs.fsm.common.actions.FlowProcessingWithHistorySupportAction; +import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.FlowMirrorPointCreateContext; +import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.FlowMirrorPointCreateFsm; +import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.FlowMirrorPointCreateFsm.Event; +import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.FlowMirrorPointCreateFsm.State; + +public class NotifyFlowStatsAction extends FlowProcessingWithHistorySupportAction { + + public NotifyFlowStatsAction(PersistenceManager persistenceManager) { + super(persistenceManager); + } + + @Override + protected void perform(FlowMirrorPointCreateFsm.State from, State to, Event event, + FlowMirrorPointCreateContext context, FlowMirrorPointCreateFsm stateMachine) { + flowPathRepository.findById(stateMachine.getFlowPathId()).ifPresent(flowPath -> { + Flow flow = flowPath.getFlow(); + UpdateFlowPathInfo pathInfo = new UpdateFlowPathInfo( + flow.getFlowId(), flow.getYFlowId(), flowPath.getCookie(), flowPath.getMeterId(), + FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics(), + hasIngressMirror(flowPath), hasEgressMirror(flowPath)); + stateMachine.getCarrier().sendNotifyFlowStats(pathInfo); + }); + } +} diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/delete/FlowMirrorPointDeleteFsm.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/delete/FlowMirrorPointDeleteFsm.java index e9827bff519..e628a9d33cc 100644 --- a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/delete/FlowMirrorPointDeleteFsm.java +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/delete/FlowMirrorPointDeleteFsm.java @@ -32,6 +32,7 @@ import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.delete.actions.EmitCommandRequestsAction; import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.delete.actions.HandleNotCompletedCommandsAction; import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.delete.actions.HandleNotDeallocatedFlowMirrorPathResourceAction; +import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.delete.actions.NotifyFlowStatsAction; import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.delete.actions.OnFinishedAction; import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.delete.actions.OnFinishedWithErrorAction; import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.delete.actions.OnReceivedCommandResponseAction; @@ -138,10 +139,17 @@ public Factory(@NonNull FlowGenericCarrier carrier, @NonNull PersistenceManager builder.onEntry(State.FLOW_MIRROR_POINTS_RECORD_PROCESSED) .perform(new PostFlowMirrorPathDeallocationAction(persistenceManager, resourcesManager)); - builder.transition().from(State.FLOW_MIRROR_POINTS_RECORD_PROCESSED).to(State.FINISHED).on(Event.NEXT); + builder.transition().from(State.FLOW_MIRROR_POINTS_RECORD_PROCESSED).to(State.NOTIFY_FLOW_STATS) + .on(Event.NEXT); builder.transition().from(State.FLOW_MIRROR_POINTS_RECORD_PROCESSED) .to(State.FINISHED_WITH_ERROR).on(Event.ERROR); + builder.onEntry(State.NOTIFY_FLOW_STATS).perform(new NotifyFlowStatsAction(persistenceManager)); + + builder.transition().from(State.NOTIFY_FLOW_STATS).to(State.FINISHED).on(Event.NEXT); + builder.transition().from(State.NOTIFY_FLOW_STATS).to(State.FINISHED_WITH_ERROR).on(Event.ERROR); + + builder.defineFinalState(State.FINISHED) .addEntryAction(new OnFinishedAction(persistenceManager, dashboardLogger)); builder.defineFinalState(State.FINISHED_WITH_ERROR) @@ -163,6 +171,7 @@ public enum State { REMOVING_GROUP, GROUP_REMOVED, + NOTIFY_FLOW_STATS, FLOW_MIRROR_POINTS_RECORD_PROCESSED, FINISHED, diff --git a/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/delete/actions/NotifyFlowStatsAction.java b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/delete/actions/NotifyFlowStatsAction.java new file mode 100644 index 00000000000..657be917128 --- /dev/null +++ b/src-java/flowhs-topology/flowhs-storm-topology/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/mirrorpoint/delete/actions/NotifyFlowStatsAction.java @@ -0,0 +1,47 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.delete.actions; + +import org.openkilda.messaging.info.stats.UpdateFlowPathInfo; +import org.openkilda.model.Flow; +import org.openkilda.persistence.PersistenceManager; +import org.openkilda.wfm.share.mappers.FlowPathMapper; +import org.openkilda.wfm.topology.flowhs.fsm.common.actions.FlowProcessingWithHistorySupportAction; +import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.delete.FlowMirrorPointDeleteContext; +import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.delete.FlowMirrorPointDeleteFsm; +import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.delete.FlowMirrorPointDeleteFsm.Event; +import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.delete.FlowMirrorPointDeleteFsm.State; + +public class NotifyFlowStatsAction extends FlowProcessingWithHistorySupportAction< + FlowMirrorPointDeleteFsm, State, Event, FlowMirrorPointDeleteContext> { + + public NotifyFlowStatsAction(PersistenceManager persistenceManager) { + super(persistenceManager); + } + + @Override + protected void perform(State from, State to, Event event, FlowMirrorPointDeleteContext context, + FlowMirrorPointDeleteFsm stateMachine) { + flowPathRepository.findById(stateMachine.getFlowPathId()).ifPresent(flowPath -> { + Flow flow = flowPath.getFlow(); + UpdateFlowPathInfo pathInfo = new UpdateFlowPathInfo( + flow.getFlowId(), flow.getYFlowId(), flowPath.getCookie(), flowPath.getMeterId(), + FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics(), + hasIngressMirror(flowPath), hasEgressMirror(flowPath)); + stateMachine.getCarrier().sendNotifyFlowStats(pathInfo); + }); + } +} diff --git a/src-java/stats-topology/stats-messaging/src/main/java/org/openkilda/messaging/info/stats/BaseFlowPathInfo.java b/src-java/stats-topology/stats-messaging/src/main/java/org/openkilda/messaging/info/stats/BaseFlowPathInfo.java index 09645a5b018..36c4005ac3d 100644 --- a/src-java/stats-topology/stats-messaging/src/main/java/org/openkilda/messaging/info/stats/BaseFlowPathInfo.java +++ b/src-java/stats-topology/stats-messaging/src/main/java/org/openkilda/messaging/info/stats/BaseFlowPathInfo.java @@ -40,4 +40,6 @@ public abstract class BaseFlowPathInfo extends StatsNotification { MeterId meterId; @NonNull List pathNodes; Set statVlans; + boolean ingressMirror; + boolean egressMirror; } diff --git a/src-java/stats-topology/stats-messaging/src/main/java/org/openkilda/messaging/info/stats/RemoveFlowPathInfo.java b/src-java/stats-topology/stats-messaging/src/main/java/org/openkilda/messaging/info/stats/RemoveFlowPathInfo.java index dff29a17b47..905fbdf9aba 100644 --- a/src-java/stats-topology/stats-messaging/src/main/java/org/openkilda/messaging/info/stats/RemoveFlowPathInfo.java +++ b/src-java/stats-topology/stats-messaging/src/main/java/org/openkilda/messaging/info/stats/RemoveFlowPathInfo.java @@ -45,7 +45,9 @@ public RemoveFlowPathInfo(@NonNull @JsonProperty("flow_id") String flowId, @NonNull @JsonProperty("cookie") FlowSegmentCookie cookie, @JsonProperty("meter_id") MeterId meterId, @NonNull @JsonProperty("path_nodes") List pathNodes, - @JsonProperty("stat_vlans") Set statVlans) { - super(flowId, yFlowId, cookie, meterId, pathNodes, statVlans); + @JsonProperty("stat_vlans") Set statVlans, + @JsonProperty("ingress_mirror") boolean ingressMirror, + @JsonProperty("egress_mirror") boolean egressMirror) { + super(flowId, yFlowId, cookie, meterId, pathNodes, statVlans, ingressMirror, egressMirror); } } diff --git a/src-java/stats-topology/stats-messaging/src/main/java/org/openkilda/messaging/info/stats/UpdateFlowPathInfo.java b/src-java/stats-topology/stats-messaging/src/main/java/org/openkilda/messaging/info/stats/UpdateFlowPathInfo.java index ec60ca82ee8..bc896309c84 100644 --- a/src-java/stats-topology/stats-messaging/src/main/java/org/openkilda/messaging/info/stats/UpdateFlowPathInfo.java +++ b/src-java/stats-topology/stats-messaging/src/main/java/org/openkilda/messaging/info/stats/UpdateFlowPathInfo.java @@ -47,7 +47,9 @@ public UpdateFlowPathInfo(@NonNull @JsonProperty("flow_id") String flowId, @NonNull @JsonProperty("cookie") FlowSegmentCookie cookie, @JsonProperty("meter_id") MeterId meterId, @NonNull @JsonProperty("path_nodes") List pathNodes, - @JsonProperty("stat_vlans") Set statVlans) { - super(flowId, yFlowId, cookie, meterId, pathNodes, statVlans); + @JsonProperty("stat_vlans") Set statVlans, + @JsonProperty("ingress_mirror") boolean ingressMirror, + @JsonProperty("egress_mirror") boolean egressMirror) { + super(flowId, yFlowId, cookie, meterId, pathNodes, statVlans, ingressMirror, egressMirror); } } diff --git a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/model/EndpointFlowDescriptor.java b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/model/EndpointFlowDescriptor.java new file mode 100644 index 00000000000..3e0880d31f5 --- /dev/null +++ b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/model/EndpointFlowDescriptor.java @@ -0,0 +1,44 @@ +/* Copyright 2022 Telstra Open Source + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openkilda.wfm.topology.stats.model; + +import org.openkilda.model.MeterId; +import org.openkilda.model.SwitchId; +import org.openkilda.model.cookie.FlowSegmentCookie; + +import lombok.EqualsAndHashCode; +import lombok.NonNull; +import lombok.ToString; +import lombok.Value; + +@Value +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +public class EndpointFlowDescriptor extends CommonFlowDescriptor { + boolean hasMirror; + + public EndpointFlowDescriptor( + SwitchId switchId, MeasurePoint measurePoint, @NonNull String flowId, @NonNull FlowSegmentCookie cookie, + MeterId meterId, boolean hasMirror) { + super(switchId, measurePoint, flowId, cookie, meterId); + this.hasMirror = hasMirror; + } + + @Override + public void handle(KildaEntryDescriptorHandler handler) { + handler.handleStatsEntry(this); + } +} diff --git a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/model/KildaEntryDescriptorHandler.java b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/model/KildaEntryDescriptorHandler.java index 7dc00dce6a7..9de17f43291 100644 --- a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/model/KildaEntryDescriptorHandler.java +++ b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/model/KildaEntryDescriptorHandler.java @@ -22,6 +22,8 @@ public interface KildaEntryDescriptorHandler { void handleStatsEntry(StatVlanDescriptor descriptor); + void handleStatsEntry(EndpointFlowDescriptor descriptor); + void handleStatsEntry(YFlowDescriptor descriptor); void handleStatsEntry(YFlowSubDescriptor descriptor); diff --git a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/AnyFlowStatsEntryHandler.java b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/AnyFlowStatsEntryHandler.java index 470173663e9..1e4b26c4adb 100644 --- a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/AnyFlowStatsEntryHandler.java +++ b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/AnyFlowStatsEntryHandler.java @@ -25,6 +25,7 @@ import org.openkilda.wfm.topology.stats.model.CommonFlowDescriptor; import org.openkilda.wfm.topology.stats.model.DummyFlowDescriptor; import org.openkilda.wfm.topology.stats.model.DummyMeterDescriptor; +import org.openkilda.wfm.topology.stats.model.EndpointFlowDescriptor; import org.openkilda.wfm.topology.stats.model.KildaEntryDescriptor; import org.openkilda.wfm.topology.stats.model.StatVlanDescriptor; import org.openkilda.wfm.topology.stats.model.YFlowDescriptor; @@ -85,6 +86,11 @@ public void handleStatsEntry(StatVlanDescriptor descriptor) { // nothing to do here } + @Override + public void handleStatsEntry(EndpointFlowDescriptor descriptor) { + handleStatsEntry((CommonFlowDescriptor) descriptor); + } + private void emitMeterPoints(TagsFormatter tagsFormatter) { meterEmitter.emitPacketAndBytePoints( new MetricFormatter("flow.raw."), timestamp, statsEntry.getPacketCount(), statsEntry.getByteCount(), diff --git a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/BaseCacheChangeHandler.java b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/BaseCacheChangeHandler.java index fc1adbd98d5..16546894d75 100644 --- a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/BaseCacheChangeHandler.java +++ b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/BaseCacheChangeHandler.java @@ -23,6 +23,7 @@ import org.openkilda.wfm.topology.stats.model.CookieCacheKey; import org.openkilda.wfm.topology.stats.model.DummyFlowDescriptor; import org.openkilda.wfm.topology.stats.model.DummyMeterDescriptor; +import org.openkilda.wfm.topology.stats.model.EndpointFlowDescriptor; import org.openkilda.wfm.topology.stats.model.KildaEntryDescriptor; import org.openkilda.wfm.topology.stats.model.KildaEntryDescriptorHandler; import org.openkilda.wfm.topology.stats.model.MeterCacheKey; @@ -64,6 +65,11 @@ public void handleStatsEntry(StatVlanDescriptor descriptor) { } } + @Override + public void handleStatsEntry(EndpointFlowDescriptor descriptor) { + handleFlowStatsEntry(descriptor.getSwitchId(), descriptor.getCookie(), descriptor.getMeterId(), descriptor); + } + @Override public void handleStatsEntry(YFlowDescriptor descriptor) { cacheAction(new MeterCacheKey(descriptor.getSwitchId(), descriptor.getMeterId().getValue()), descriptor); diff --git a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/FlowEndpointStatsEntryHandler.java b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/FlowEndpointStatsEntryHandler.java index f4d4f423971..30ae0cb8ec8 100644 --- a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/FlowEndpointStatsEntryHandler.java +++ b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/FlowEndpointStatsEntryHandler.java @@ -25,6 +25,7 @@ import org.openkilda.wfm.topology.stats.model.CommonFlowDescriptor; import org.openkilda.wfm.topology.stats.model.DummyFlowDescriptor; import org.openkilda.wfm.topology.stats.model.DummyMeterDescriptor; +import org.openkilda.wfm.topology.stats.model.EndpointFlowDescriptor; import org.openkilda.wfm.topology.stats.model.KildaEntryDescriptor; import org.openkilda.wfm.topology.stats.model.MeasurePoint; import org.openkilda.wfm.topology.stats.model.StatVlanDescriptor; @@ -52,10 +53,15 @@ private FlowEndpointStatsEntryHandler( } @Override - public void handleStatsEntry(CommonFlowDescriptor descriptor) { + public void handleStatsEntry(EndpointFlowDescriptor descriptor) { TagsFormatter tags = initTags(false); tags.addFlowIdTag(descriptor.getFlowId()); - emitMeterPoints(tags, descriptor.getMeasurePoint()); + emitMeterPoints(tags, descriptor.getMeasurePoint(), descriptor.isHasMirror()); + } + + @Override + public void handleStatsEntry(CommonFlowDescriptor descriptor) { + // nothing to do here } @Override @@ -80,7 +86,7 @@ public void handleStatsEntry(YFlowSubDescriptor descriptor) { TagsFormatter tags = initTags(true); tags.addFlowIdTag(descriptor.getSubFlowId()); tags.addYFlowIdTag(descriptor.getYFlowId()); - emitMeterPoints(tags, descriptor.getMeasurePoint()); + emitMeterPoints(tags, descriptor.getMeasurePoint(), false); } @Override @@ -116,9 +122,10 @@ public void handleStatsEntry(StatVlanDescriptor descriptor) { emitStatVlan(tags); } - private void emitMeterPoints(TagsFormatter tags, MeasurePoint measurePoint) { + private void emitMeterPoints(TagsFormatter tags, MeasurePoint measurePoint, boolean hasMirror) { FlowSegmentCookie cookie = decodeFlowSegmentCookie(statsEntry.getCookie()); - if (cookie != null && !isFlowSatelliteEntry(cookie)) { + if (cookie != null && cookie.getType() == CookieType.SERVICE_OR_FLOW_SEGMENT + && cookie.isMirror() == hasMirror) { directionFromCookieIntoTags(cookie, tags); switch (measurePoint) { case INGRESS: diff --git a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/KildaEntryCacheService.java b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/KildaEntryCacheService.java index 0d25b851f6a..bedbbfe6ce9 100644 --- a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/KildaEntryCacheService.java +++ b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/KildaEntryCacheService.java @@ -44,6 +44,7 @@ import org.openkilda.wfm.share.mappers.FlowPathMapper; import org.openkilda.wfm.topology.stats.model.CommonFlowDescriptor; import org.openkilda.wfm.topology.stats.model.CookieCacheKey; +import org.openkilda.wfm.topology.stats.model.EndpointFlowDescriptor; import org.openkilda.wfm.topology.stats.model.KildaEntryDescriptor; import org.openkilda.wfm.topology.stats.model.KildaEntryDescriptorHandler; import org.openkilda.wfm.topology.stats.model.MeasurePoint; @@ -176,7 +177,8 @@ public boolean deactivate() { private void updateCache(KildaEntryDescriptorHandler cacheHandler, BaseFlowPathInfo pathInfo) { updateCache( cacheHandler, pathInfo.getFlowId(), pathInfo.getYFlowId(), pathInfo.getCookie(), pathInfo.getMeterId(), - pathInfo.getPathNodes(), pathInfo.getStatVlans()); + pathInfo.getPathNodes(), pathInfo.getStatVlans(), pathInfo.isIngressMirror(), + pathInfo.isEgressMirror()); } private void updateCache(KildaEntryDescriptorHandler cacheHandler, BaseYFlowStatsInfo yFlowStatsInfo) { @@ -195,7 +197,8 @@ private void updateCache(KildaEntryDescriptorHandler cacheHandler, BaseYFlowStat private void updateCache( KildaEntryDescriptorHandler cacheHandler, String flowId, String yFlowId, FlowSegmentCookie cookie, - MeterId meterId, List pathNodes, Set statsVlan) { + MeterId meterId, List pathNodes, Set statsVlan, boolean ingressMirror, + boolean egressMirror) { if (pathNodes.isEmpty()) { throw new IllegalArgumentException("The path can't be empty"); } @@ -207,10 +210,13 @@ private void updateCache( boolean isOneSwitchFlow = srcSwitchId.equals(dstSwitchId); if (isOneSwitchFlow) { - cacheHandler.handle(newPathEntry(srcSwitchId, ONE_SWITCH, flowId, yFlowId, cookie, meterId)); + cacheHandler.handle(newEndpointPathEntry(srcSwitchId, ONE_SWITCH, flowId, yFlowId, cookie, meterId, + ingressMirror || egressMirror)); } else { - cacheHandler.handle(newPathEntry(srcSwitchId, INGRESS, flowId, yFlowId, cookie, meterId)); - cacheHandler.handle(newPathEntry(dstSwitchId, EGRESS, flowId, yFlowId, cookie)); + cacheHandler.handle(newEndpointPathEntry( + srcSwitchId, INGRESS, flowId, yFlowId, cookie, meterId, ingressMirror)); + cacheHandler.handle(newEndpointPathEntry( + dstSwitchId, EGRESS, flowId, yFlowId, cookie, null, egressMirror)); } cacheHandler.handle(new StatVlanDescriptor(srcSwitchId, INGRESS, flowId, cookie, statsVlan)); cacheHandler.handle(new StatVlanDescriptor(dstSwitchId, EGRESS, flowId, cookie, statsVlan)); @@ -236,9 +242,14 @@ private void refreshCommonFlowsCache() { .filter(Objects::nonNull) .forEach(path -> { Flow flow = path.getFlow(); + boolean ingressMirror = path.getFlowMirrorPointsSet().stream() + .anyMatch(point -> point.getMirrorSwitchId().equals(path.getSrcSwitchId())); + boolean egressMirror = path.getFlowMirrorPointsSet().stream() + .anyMatch(point -> point.getMirrorSwitchId().equals(path.getDestSwitchId())); updateCache( cacheHandler, flow.getFlowId(), flow.getYFlowId(), path.getCookie(), path.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flow, path), flow.getVlanStatistics()); + FlowPathMapper.INSTANCE.mapToPathNodes(flow, path), flow.getVlanStatistics(), ingressMirror, + egressMirror); }); } @@ -267,21 +278,24 @@ private void processTransitCookies( // Skip the first and the last nodes as they're handled as INGRESS and EGRESS. for (int i = 1; i < path.size() - 1; i++) { SwitchId transitSrc = path.get(i).getSwitchId(); - cacheHandler.handle(newPathEntry(transitSrc, TRANSIT, flowId, yFlowId, cookie)); + cacheHandler.handle(newTransitPathEntry(transitSrc, flowId, yFlowId, cookie)); } } - private static KildaEntryDescriptor newPathEntry( - SwitchId switchId, MeasurePoint measurePoint, String flowId, String yFlowId, FlowSegmentCookie cookie) { - return newPathEntry(switchId, measurePoint, flowId, yFlowId, cookie, null); + private static KildaEntryDescriptor newTransitPathEntry( + SwitchId switchId, String flowId, String yFlowId, FlowSegmentCookie cookie) { + if (yFlowId != null) { + return new YFlowSubDescriptor(switchId, TRANSIT, yFlowId, flowId, cookie, null); + } + return new CommonFlowDescriptor(switchId, TRANSIT, flowId, cookie, null); } - private static KildaEntryDescriptor newPathEntry( + private static KildaEntryDescriptor newEndpointPathEntry( SwitchId switchId, MeasurePoint measurePoint, String flowId, String yFlowId, FlowSegmentCookie cookie, - MeterId meterId) { + MeterId meterId, boolean hasMirror) { if (yFlowId != null) { return new YFlowSubDescriptor(switchId, measurePoint, yFlowId, flowId, cookie, meterId); } - return new CommonFlowDescriptor(switchId, measurePoint, flowId, cookie, meterId); + return new EndpointFlowDescriptor(switchId, measurePoint, flowId, cookie, meterId, hasMirror); } } diff --git a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/MeterStatsHandler.java b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/MeterStatsHandler.java index cb11a365c8b..a3c69fbe787 100644 --- a/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/MeterStatsHandler.java +++ b/src-java/stats-topology/stats-storm-topology/src/main/java/org/openkilda/wfm/topology/stats/service/MeterStatsHandler.java @@ -27,6 +27,7 @@ import org.openkilda.wfm.topology.stats.model.CommonFlowDescriptor; import org.openkilda.wfm.topology.stats.model.DummyFlowDescriptor; import org.openkilda.wfm.topology.stats.model.DummyMeterDescriptor; +import org.openkilda.wfm.topology.stats.model.EndpointFlowDescriptor; import org.openkilda.wfm.topology.stats.model.KildaEntryDescriptor; import org.openkilda.wfm.topology.stats.model.StatVlanDescriptor; import org.openkilda.wfm.topology.stats.model.YFlowDescriptor; @@ -58,6 +59,11 @@ public void handleStatsEntry(CommonFlowDescriptor descriptor) { handleFlowStats(tags, descriptor.getCookie(), descriptor.getFlowId()); } + @Override + public void handleStatsEntry(EndpointFlowDescriptor descriptor) { + handleStatsEntry((CommonFlowDescriptor) descriptor); + } + @Override public void handleStatsEntry(YFlowDescriptor descriptor) { TagsFormatter tags = initTags(); diff --git a/src-java/stats-topology/stats-storm-topology/src/test/java/org/openkilda/wfm/topology/stats/StatsTopologyTest.java b/src-java/stats-topology/stats-storm-topology/src/test/java/org/openkilda/wfm/topology/stats/StatsTopologyTest.java index e1b72267c23..d781f3b578e 100644 --- a/src-java/stats-topology/stats-storm-topology/src/test/java/org/openkilda/wfm/topology/stats/StatsTopologyTest.java +++ b/src-java/stats-topology/stats-storm-topology/src/test/java/org/openkilda/wfm/topology/stats/StatsTopologyTest.java @@ -129,6 +129,8 @@ public class StatsTopologyTest extends AbstractStormTest { private static final FlowSegmentCookie MAIN_REVERSE_COOKIE = new FlowSegmentCookie(REVERSE, MAIN_COOKIE); private static final FlowSegmentCookie PROTECTED_FORWARD_COOKIE = new FlowSegmentCookie(FORWARD, PROTECTED_COOKIE); private static final FlowSegmentCookie PROTECTED_REVERSE_COOKIE = new FlowSegmentCookie(REVERSE, PROTECTED_COOKIE); + private static final FlowSegmentCookie FORWARD_MIRROR_COOKIE = MAIN_FORWARD_COOKIE.toBuilder().mirror(true).build(); + private static final FlowSegmentCookie REVERSE_MIRROR_COOKIE = MAIN_REVERSE_COOKIE.toBuilder().mirror(true).build(); private static final FlowSegmentCookie STAT_VLAN_FORWARD_COOKIE_1 = MAIN_FORWARD_COOKIE.toBuilder() .type(CookieType.VLAN_STATS_PRE_INGRESS).statsVlan(STAT_VLAN_1).build(); private static final FlowSegmentCookie STAT_VLAN_FORWARD_COOKIE_2 = MAIN_FORWARD_COOKIE.toBuilder() @@ -458,6 +460,62 @@ public void flowVlanStatsTest() { validateStatVlan(vlanReverseStats2, STAT_VLAN_REVERSE_COOKIE_2, SWITCH_ID_3); } + @Test + public void flowStatsMirrorTest() { + // FLow without mirrors + Flow flow = createFlow(); + sendUpdateFlowPathInfo(flow.getForwardPath(), flow.getVlanStatistics(), false, false); + sendUpdateFlowPathInfo(flow.getReversePath(), flow.getVlanStatistics(), false, false); + + // Flow has no mirrors. Ingress rule generates ingress metrics + FlowStatsEntry ingressForwardStats = new FlowStatsEntry(1, MAIN_FORWARD_COOKIE.getValue(), 1, 2, 3, 4); + sendStatsMessage(new FlowStatsData(SWITCH_ID_1, Collections.singletonList(ingressForwardStats))); + validateFlowStats(ingressForwardStats, MAIN_FORWARD_COOKIE, SWITCH_ID_1, true, false); + + // Flow has no mirrors. Egress rule generates egress metrics + FlowStatsEntry egressReverseStats = new FlowStatsEntry(1, MAIN_REVERSE_COOKIE.getValue(), 5, 6, 7, 8); + sendStatsMessage(new FlowStatsData(SWITCH_ID_1, Collections.singletonList(egressReverseStats))); + validateFlowStats(egressReverseStats, MAIN_REVERSE_COOKIE, SWITCH_ID_1, false, true); + + // Add mirrors to the flow + sendUpdateFlowPathInfo(flow.getForwardPath(), flow.getVlanStatistics(), true, true); + sendUpdateFlowPathInfo(flow.getReversePath(), flow.getVlanStatistics(), true, true); + + // Flow has mirrors. Ingress rule generates only raw metrics + FlowStatsEntry ingressForwardRawStats = new FlowStatsEntry(1, MAIN_FORWARD_COOKIE.getValue(), 9, 10, 11, 12); + sendStatsMessage(new FlowStatsData(SWITCH_ID_1, Collections.singletonList(ingressForwardRawStats))); + validateFlowStats(ingressForwardRawStats, MAIN_FORWARD_COOKIE, SWITCH_ID_1, false, false); + + // Flow has mirrors. Egress rule generates only raw metrics + FlowStatsEntry egressReverseRawStats = new FlowStatsEntry(1, MAIN_REVERSE_COOKIE.getValue(), 13, 14, 15, 16); + sendStatsMessage(new FlowStatsData(SWITCH_ID_1, Collections.singletonList(egressReverseRawStats))); + validateFlowStats(egressReverseRawStats, MAIN_REVERSE_COOKIE, SWITCH_ID_1, false, false); + + // Flow has mirrors. Mirror rule generates ingress metrics + FlowStatsEntry mirrorForwardStats = new FlowStatsEntry(1, FORWARD_MIRROR_COOKIE.getValue(), 17, 18, 19, 20); + sendStatsMessage(new FlowStatsData(SWITCH_ID_1, Collections.singletonList(mirrorForwardStats))); + validateFlowStats(mirrorForwardStats, FORWARD_MIRROR_COOKIE, SWITCH_ID_1, true, false); + + // Flow has mirrors. Mirror rule generates egress metrics + FlowStatsEntry mirrorReverseRawStats = new FlowStatsEntry(1, REVERSE_MIRROR_COOKIE.getValue(), 21, 22, 23, 24); + sendStatsMessage(new FlowStatsData(SWITCH_ID_1, Collections.singletonList(mirrorReverseRawStats))); + validateFlowStats(mirrorReverseRawStats, REVERSE_MIRROR_COOKIE, SWITCH_ID_1, false, true); + + // Remove mirrors from the flow + sendUpdateFlowPathInfo(flow.getForwardPath(), flow.getVlanStatistics(), false, false); + sendUpdateFlowPathInfo(flow.getReversePath(), flow.getVlanStatistics(), false, false); + + // Flow has no mirrors. Ingress rule generates ingress metrics + FlowStatsEntry newIngressForwardStats = new FlowStatsEntry(1, MAIN_FORWARD_COOKIE.getValue(), 25, 26, 27, 28); + sendStatsMessage(new FlowStatsData(SWITCH_ID_1, Collections.singletonList(newIngressForwardStats))); + validateFlowStats(newIngressForwardStats, MAIN_FORWARD_COOKIE, SWITCH_ID_1, true, false); + + // Flow has no mirrors. Egress rule generates egress metrics + FlowStatsEntry newEgressReverseStats = new FlowStatsEntry(1, MAIN_REVERSE_COOKIE.getValue(), 29, 30, 31, 32); + sendStatsMessage(new FlowStatsData(SWITCH_ID_1, Collections.singletonList(newEgressReverseStats))); + validateFlowStats(newEgressReverseStats, MAIN_REVERSE_COOKIE, SWITCH_ID_1, false, true); + } + @Test public void flowStatsSwapPathTest() { Flow flow = createFlowWithProtectedPath(); @@ -528,13 +586,14 @@ private void validateFlowStats( } String direction = cookie.getDirection().name().toLowerCase(); + int expectedRawTagsCount = cookie.isMirror() ? 12 : 9; datapoints.forEach(datapoint -> { switch (datapoint.getMetric()) { case METRIC_PREFIX + "flow.raw.packets": case METRIC_PREFIX + "flow.raw.bytes": case METRIC_PREFIX + "flow.raw.bits": - assertEquals(9, datapoint.getTags().size()); + assertEquals(expectedRawTagsCount, datapoint.getTags().size()); assertEquals(flowId, datapoint.getTags().get("flowid")); assertEquals(direction, datapoint.getTags().get("direction")); assertEquals(String.valueOf(flowStats.getTableId()), datapoint.getTags().get("tableid")); @@ -542,7 +601,15 @@ private void validateFlowStats( assertEquals(switchId.toOtsdFormat(), datapoint.getTags().get("switchid")); assertEquals(Integer.toString(flowStats.getOutPort()), datapoint.getTags().get("outPort")); assertEquals(Integer.toString(flowStats.getInPort()), datapoint.getTags().get("inPort")); - assertEquals("false", datapoint.getTags().get("is_flow_satellite")); + + if (cookie.isMirror()) { + assertEquals("true", datapoint.getTags().get("is_flow_satellite")); + assertEquals("true", datapoint.getTags().get("is_mirror")); + assertEquals("false", datapoint.getTags().get("is_loop")); + assertEquals("false", datapoint.getTags().get("is_flowrtt_inject")); + } else { + assertEquals("false", datapoint.getTags().get("is_flow_satellite")); + } break; case METRIC_PREFIX + "flow.ingress.packets": case METRIC_PREFIX + "flow.ingress.bytes": @@ -859,17 +926,30 @@ private void sendStatsMessage(InfoData infoData) { } private void sendRemoveFlowPathInfo(FlowPath flowPath, Set vlanStatistics) { + sendRemoveFlowPathInfo(flowPath, vlanStatistics, hasIngressMirror(flowPath), hasEgressMirror(flowPath)); + } + + private void sendRemoveFlowPathInfo( + FlowPath flowPath, Set vlanStatistics, boolean ingressMirror, boolean egressMirror) { RemoveFlowPathInfo pathInfo = new RemoveFlowPathInfo( flowPath.getFlowId(), null, flowPath.getCookie(), flowPath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flowPath.getFlow(), flowPath), vlanStatistics); + FlowPathMapper.INSTANCE.mapToPathNodes(flowPath.getFlow(), flowPath), vlanStatistics, ingressMirror, + egressMirror); InfoMessage infoMessage = new InfoMessage(pathInfo, timestamp, UUID.randomUUID().toString(), null, null); sendMessage(infoMessage, statsTopologyConfig.getFlowStatsNotifyTopic()); } + private void sendUpdateFlowPathInfo(FlowPath flowPath, Set vlanStatistics) { + sendUpdateFlowPathInfo(flowPath, vlanStatistics, hasIngressMirror(flowPath), hasEgressMirror(flowPath)); + } + + private void sendUpdateFlowPathInfo( + FlowPath flowPath, Set vlanStatistics, boolean ingressMirror, boolean egressMirror) { UpdateFlowPathInfo pathInfo = new UpdateFlowPathInfo( flowPath.getFlowId(), null, flowPath.getCookie(), flowPath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flowPath.getFlow(), flowPath), vlanStatistics); + FlowPathMapper.INSTANCE.mapToPathNodes(flowPath.getFlow(), flowPath), vlanStatistics, ingressMirror, + egressMirror); InfoMessage infoMessage = new InfoMessage(pathInfo, timestamp, UUID.randomUUID().toString(), null, null); sendMessage(infoMessage, statsTopologyConfig.getFlowStatsNotifyTopic()); } @@ -926,4 +1006,14 @@ private Map createDatapointMap(List datapoints) { .stream() .collect(Collectors.toMap(Datapoint::getMetric, Function.identity())); } + + protected boolean hasIngressMirror(FlowPath flowPath) { + return flowPath.getFlowMirrorPointsSet().stream() + .anyMatch(point -> point.getMirrorSwitchId().equals(flowPath.getSrcSwitchId())); + } + + protected boolean hasEgressMirror(FlowPath flowPath) { + return flowPath.getFlowMirrorPointsSet().stream() + .anyMatch(point -> point.getMirrorSwitchId().equals(flowPath.getDestSwitchId())); + } } diff --git a/src-java/stats-topology/stats-storm-topology/src/test/java/org/openkilda/wfm/topology/stats/service/KildaEntryCacheServiceTest.java b/src-java/stats-topology/stats-storm-topology/src/test/java/org/openkilda/wfm/topology/stats/service/KildaEntryCacheServiceTest.java index 952a3f5566b..82ffc7b6277 100644 --- a/src-java/stats-topology/stats-storm-topology/src/test/java/org/openkilda/wfm/topology/stats/service/KildaEntryCacheServiceTest.java +++ b/src-java/stats-topology/stats-storm-topology/src/test/java/org/openkilda/wfm/topology/stats/service/KildaEntryCacheServiceTest.java @@ -36,9 +36,14 @@ import org.openkilda.messaging.info.stats.UpdateFlowPathInfo; import org.openkilda.model.Flow; import org.openkilda.model.FlowEndpoint; +import org.openkilda.model.FlowMirrorPoints; import org.openkilda.model.FlowPath; import org.openkilda.model.FlowPathDirection; +import org.openkilda.model.GroupId; import org.openkilda.model.MeterId; +import org.openkilda.model.MirrorDirection; +import org.openkilda.model.MirrorGroup; +import org.openkilda.model.MirrorGroupType; import org.openkilda.model.Switch; import org.openkilda.model.SwitchId; import org.openkilda.model.YFlow; @@ -53,6 +58,7 @@ import org.openkilda.wfm.share.yflow.TestYFlowBuilder; import org.openkilda.wfm.share.yflow.TestYSubFlowBuilder; import org.openkilda.wfm.topology.stats.model.CommonFlowDescriptor; +import org.openkilda.wfm.topology.stats.model.EndpointFlowDescriptor; import org.openkilda.wfm.topology.stats.model.FlowStatsAndDescriptor; import org.openkilda.wfm.topology.stats.model.KildaEntryDescriptor; import org.openkilda.wfm.topology.stats.model.MeterStatsAndDescriptor; @@ -105,6 +111,7 @@ public class KildaEntryCacheServiceTest { private static final SwitchId DST_SWITCH_ID = new SwitchId(2L); private static final SwitchId TRANSIT_SWITCH_ID = new SwitchId(3L); private static final Set STAT_VLANS = Sets.newHashSet(STAT_VLAN_1, STAT_VLAN_2); + public static final GroupId MIRROR_GROUP_ID = new GroupId(15); @Mock PersistenceManager persistenceManager; @@ -149,17 +156,17 @@ public void shouldRefreshCommonFlowsCookieCache() { assertDescriptionPopulation(statsEntries, statsOriginSrc.getStats().size(), 3); assertCookieCache( statsEntries, forwardPath.getCookie(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), - flow.getForwardPath().getMeterId())); + flow.getForwardPath().getMeterId(), false)); assertCookieCache( statsEntries, REVERSE_PATH_COOKIE, - new CommonFlowDescriptor( - flow.getSrcSwitchId(), EGRESS, flow.getFlowId(), REVERSE_PATH_COOKIE, null)); + new EndpointFlowDescriptor( + flow.getSrcSwitchId(), EGRESS, flow.getFlowId(), REVERSE_PATH_COOKIE, null, false)); assertCookieCache( statsEntries, PROTECTED_REVERSE_PATH_COOKIE, - new CommonFlowDescriptor( - flow.getSrcSwitchId(), EGRESS, flow.getFlowId(), PROTECTED_REVERSE_PATH_COOKIE, null)); + new EndpointFlowDescriptor( + flow.getSrcSwitchId(), EGRESS, flow.getFlowId(), PROTECTED_REVERSE_PATH_COOKIE, null, false)); FlowStatsData statsOriginDst = getFlowStatsDataDstSwitch(); service.completeAndForwardFlowStats(statsOriginDst); @@ -169,17 +176,17 @@ public void shouldRefreshCommonFlowsCookieCache() { assertDescriptionPopulation(statsEntries, statsOriginDst.getStats().size(), 3); assertCookieCache( statsEntries, forwardPath.getCookie(), - new CommonFlowDescriptor( - flow.getDestSwitchId(), EGRESS, flow.getFlowId(), forwardPath.getCookie(), null)); + new EndpointFlowDescriptor( + flow.getDestSwitchId(), EGRESS, flow.getFlowId(), forwardPath.getCookie(), null, false)); assertCookieCache( statsEntries, REVERSE_PATH_COOKIE, - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getDestSwitchId(), INGRESS, flow.getFlowId(), REVERSE_PATH_COOKIE, - flow.getReversePath().getMeterId())); + flow.getReversePath().getMeterId(), false)); assertCookieCache( statsEntries, PROTECTED_FORWARD_PATH_COOKIE, - new CommonFlowDescriptor( - flow.getDestSwitchId(), EGRESS, flow.getFlowId(), PROTECTED_FORWARD_PATH_COOKIE, null)); + new EndpointFlowDescriptor( + flow.getDestSwitchId(), EGRESS, flow.getFlowId(), PROTECTED_FORWARD_PATH_COOKIE, null, false)); FlowStatsData statsOriginTransit = getFlowStatsDataTransitSwitch(); service.completeAndForwardFlowStats(statsOriginTransit); @@ -210,9 +217,6 @@ public void shouldRefreshCommonFlowsCookieCache() { forwardPath.getCookie().toBuilder() .type(CookieType.SERVER_42_FLOW_RTT_INGRESS).build().getValue(), 0, 0, 0, 0), - new FlowStatsEntry( - 0, forwardPath.getCookie().toBuilder().mirror(true).build().getValue(), - 0, 0, 0, 0), new FlowStatsEntry( 0, forwardPath.getCookie().toBuilder().looped(true).build().getValue(), 0, 0, 0, 0), @@ -220,23 +224,18 @@ public void shouldRefreshCommonFlowsCookieCache() { new FlowStatsEntry(0, FORWARD_STAT_VLAN_COOKIE_2.getValue(), 0, 0, 0, 0)))); verify(carrier, atLeastOnce()).emitFlowStats(cookieCacheCaptor.capture()); statsEntries = cookieCacheCaptor.getValue().getStatsEntries(); - assertDescriptionPopulation(statsEntries, 5, 5); + assertDescriptionPopulation(statsEntries, 4, 4); assertCookieCache( statsEntries, forwardPath.getCookie().toBuilder() .type(CookieType.SERVER_42_FLOW_RTT_INGRESS).build(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), - forwardPath.getMeterId())); - assertCookieCache( - statsEntries, forwardPath.getCookie().toBuilder().mirror(true).build(), - new CommonFlowDescriptor( - flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), - forwardPath.getMeterId())); + forwardPath.getMeterId(), false)); assertCookieCache( statsEntries, forwardPath.getCookie().toBuilder().looped(true).build(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), - forwardPath.getMeterId())); + forwardPath.getMeterId(), false)); assertCookieCache( statsEntries, FORWARD_STAT_VLAN_COOKIE_1, new StatVlanDescriptor( @@ -247,6 +246,115 @@ public void shouldRefreshCommonFlowsCookieCache() { flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), STAT_VLANS)); } + @Test + public void shouldRefreshCommonFlowsCookieWithIngressMirrorCache() { + Flow flow = buildFlow(); + flow.getForwardPath().addFlowMirrorPoints(FlowMirrorPoints.builder() + .mirrorSwitch(flow.getSrcSwitch()) + .mirrorGroup(MirrorGroup.builder() + .switchId(flow.getSrcSwitchId()) + .flowId(flow.getFlowId()) + .pathId(flow.getForwardPathId()) + .groupId(MIRROR_GROUP_ID) + .mirrorDirection(MirrorDirection.INGRESS) + .mirrorGroupType(MirrorGroupType.TRAFFIC_INTEGRITY) + .build()) + .build()); + + when(flowRepository.findAll()).thenReturn(Collections.singletonList(flow)); + when(yFlowRepository.findAll()).thenReturn(Collections.emptyList()); + + service.activate(); + + final FlowPath forwardPath = flow.getForwardPath(); + + FlowStatsData statsOriginSrc = getFlowStatsDataSrcSwitch(); + service.completeAndForwardFlowStats(statsOriginSrc); + + verify(carrier, atLeastOnce()).emitFlowStats(cookieCacheCaptor.capture()); + List statsEntries = cookieCacheCaptor.getValue().getStatsEntries(); + assertDescriptionPopulation(statsEntries, statsOriginSrc.getStats().size(), 3); + assertCookieCache( + statsEntries, forwardPath.getCookie(), + new EndpointFlowDescriptor( + flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), + flow.getForwardPath().getMeterId(), true)); + assertCookieCache( + statsEntries, REVERSE_PATH_COOKIE, + new EndpointFlowDescriptor( + flow.getSrcSwitchId(), EGRESS, flow.getFlowId(), REVERSE_PATH_COOKIE, null, false)); + + + // mirror + service.completeAndForwardFlowStats( + new FlowStatsData(flow.getSrcSwitchId(), asList( + new FlowStatsEntry( + 0, forwardPath.getCookie().toBuilder().mirror(true).build().getValue(), + 0, 0, 0, 0)))); + verify(carrier, atLeastOnce()).emitFlowStats(cookieCacheCaptor.capture()); + statsEntries = cookieCacheCaptor.getValue().getStatsEntries(); + assertDescriptionPopulation(statsEntries, 1, 1); + assertCookieCache( + statsEntries, forwardPath.getCookie().toBuilder().mirror(true).build(), + new EndpointFlowDescriptor( + flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), + forwardPath.getMeterId(), true)); + } + + @Test + public void shouldRefreshCommonFlowsCookieWithEgressMirrorCache() { + Flow flow = buildFlow(); + flow.getForwardPath().addFlowMirrorPoints(FlowMirrorPoints.builder() + .mirrorSwitch(flow.getDestSwitch()) + .mirrorGroup(MirrorGroup.builder() + .switchId(flow.getDestSwitchId()) + .flowId(flow.getFlowId()) + .pathId(flow.getForwardPathId()) + .groupId(MIRROR_GROUP_ID) + .mirrorDirection(MirrorDirection.EGRESS) + .mirrorGroupType(MirrorGroupType.TRAFFIC_INTEGRITY) + .build()) + .build()); + + when(flowRepository.findAll()).thenReturn(Collections.singletonList(flow)); + when(yFlowRepository.findAll()).thenReturn(Collections.emptyList()); + + service.activate(); + + final FlowPath forwardPath = flow.getForwardPath(); + + FlowStatsData statsOriginDst = getFlowStatsDataDstSwitch(); + service.completeAndForwardFlowStats(statsOriginDst); + + verify(carrier, atLeastOnce()).emitFlowStats(cookieCacheCaptor.capture()); + List statsEntries = cookieCacheCaptor.getValue().getStatsEntries(); + assertDescriptionPopulation(statsEntries, statsOriginDst.getStats().size(), 3); + assertCookieCache( + statsEntries, REVERSE_PATH_COOKIE, + new EndpointFlowDescriptor( + flow.getDestSwitchId(), INGRESS, flow.getFlowId(), REVERSE_PATH_COOKIE, + flow.getReversePath().getMeterId(), false)); + assertCookieCache( + statsEntries, forwardPath.getCookie(), + new EndpointFlowDescriptor( + flow.getDestSwitchId(), EGRESS, flow.getFlowId(), forwardPath.getCookie(), null, true)); + + + // mirror + service.completeAndForwardFlowStats( + new FlowStatsData(flow.getDestSwitchId(), asList( + new FlowStatsEntry( + 0, forwardPath.getCookie().toBuilder().mirror(true).build().getValue(), + 0, 0, 0, 0)))); + verify(carrier, atLeastOnce()).emitFlowStats(cookieCacheCaptor.capture()); + statsEntries = cookieCacheCaptor.getValue().getStatsEntries(); + assertDescriptionPopulation(statsEntries, 1, 1); + assertCookieCache( + statsEntries, forwardPath.getCookie().toBuilder().mirror(true).build(), + new EndpointFlowDescriptor( + flow.getDestSwitchId(), EGRESS, flow.getFlowId(), forwardPath.getCookie(), null, true)); + } + @Test public void shouldRefreshYFlowSubFlowCookieCache() { Switch srcSwitch = Switch.builder().switchId(SRC_SWITCH_ID).build(); @@ -350,14 +458,14 @@ public void shouldCacheServiceRefreshMeterCache() { assertDescriptionPopulation(statsEntries, statsOriginSrc.getStats().size(), 2); assertMeterCache( statsEntries, forwardMeterId.getValue(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), flow.getForwardPath().getCookie(), - forwardMeterId)); + forwardMeterId, false)); assertMeterCache( statsEntries, forwardProtectedMeterId.getValue(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), flow.getProtectedForwardPath().getCookie(), - forwardProtectedMeterId)); + forwardProtectedMeterId, false)); MeterStatsData statsOriginDst = getMeterStatsDataDstSwitch(); service.completeAndForwardMeterStats(statsOriginDst); @@ -369,15 +477,15 @@ public void shouldCacheServiceRefreshMeterCache() { final MeterId reverseMeterId = flow.getReversePath().getMeterId(); assertMeterCache( statsEntries, reverseMeterId.getValue(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getDestSwitchId(), INGRESS, flow.getFlowId(), flow.getReversePath().getCookie(), - reverseMeterId)); + reverseMeterId, false)); final MeterId reverseProtectedMeterId = flow.getProtectedReversePath().getMeterId(); assertMeterCache( statsEntries, reverseProtectedMeterId.getValue(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getDestSwitchId(), INGRESS, flow.getFlowId(), flow.getProtectedReversePath().getCookie(), - reverseProtectedMeterId)); + reverseProtectedMeterId, false)); } @Test @@ -469,7 +577,7 @@ public void shouldCompleteFlowStats() { FlowPath forwardPath = flow.getForwardPath(); UpdateFlowPathInfo pathInfo = new UpdateFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), forwardPath.getCookie(), forwardPath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flow, forwardPath), STAT_VLANS); + FlowPathMapper.INSTANCE.mapToPathNodes(flow, forwardPath), STAT_VLANS, false, false); service.addOrUpdateCache(pathInfo); service.completeAndForwardFlowStats(statsOrigin); @@ -479,14 +587,14 @@ public void shouldCompleteFlowStats() { assertDescriptionPopulation(statsEntries, statsOrigin.getStats().size(), 1); assertCookieCache( statsEntries, forwardPath.getCookie(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), - forwardPath.getMeterId())); + forwardPath.getMeterId(), false)); FlowPath reversePath = flow.getReversePath(); UpdateFlowPathInfo pathInfo2 = new UpdateFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), reversePath.getCookie(), reversePath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flow, reversePath), STAT_VLANS); + FlowPathMapper.INSTANCE.mapToPathNodes(flow, reversePath), STAT_VLANS, false, false); service.addOrUpdateCache(pathInfo2); service.completeAndForwardFlowStats(statsOrigin); @@ -496,19 +604,19 @@ public void shouldCompleteFlowStats() { assertDescriptionPopulation(statsEntries, statsOrigin.getStats().size(), 2); assertCookieCache( statsEntries, forwardPath.getCookie(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), - forwardPath.getMeterId())); + forwardPath.getMeterId(), false)); assertCookieCache( statsEntries, reversePath.getCookie(), - new CommonFlowDescriptor( - flow.getSrcSwitchId(), EGRESS, flow.getFlowId(), reversePath.getCookie(), null)); + new EndpointFlowDescriptor( + flow.getSrcSwitchId(), EGRESS, flow.getFlowId(), reversePath.getCookie(), null, false)); FlowPath protectedReversePath = flow.getProtectedReversePath(); UpdateFlowPathInfo pathInfo3 = new UpdateFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), protectedReversePath.getCookie(), protectedReversePath.getMeterId(), FlowPathMapper.INSTANCE.mapToPathNodes(flow, protectedReversePath), - STAT_VLANS); + STAT_VLANS, false, false); service.addOrUpdateCache(pathInfo3); service.completeAndForwardFlowStats(statsOrigin); @@ -518,17 +626,18 @@ public void shouldCompleteFlowStats() { assertDescriptionPopulation(statsEntries, statsOrigin.getStats().size(), 3); assertCookieCache( statsEntries, forwardPath.getCookie(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), - forwardPath.getMeterId())); + forwardPath.getMeterId(), false)); assertCookieCache( statsEntries, reversePath.getCookie(), - new CommonFlowDescriptor( - flow.getSrcSwitchId(), EGRESS, flow.getFlowId(), reversePath.getCookie(), null)); + new EndpointFlowDescriptor( + flow.getSrcSwitchId(), EGRESS, flow.getFlowId(), reversePath.getCookie(), null, false)); assertCookieCache( statsEntries, protectedReversePath.getCookie(), - new CommonFlowDescriptor( - flow.getSrcSwitchId(), EGRESS, flow.getFlowId(), protectedReversePath.getCookie(), null)); + new EndpointFlowDescriptor( + flow.getSrcSwitchId(), EGRESS, flow.getFlowId(), protectedReversePath.getCookie(), null, + false)); } @Test @@ -545,7 +654,7 @@ public void shouldHandleRemovingFlowFromCache() { FlowPath forwardPath = flow.getForwardPath(); UpdateFlowPathInfo pathInfo = new UpdateFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), forwardPath.getCookie(), forwardPath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flow, forwardPath), STAT_VLANS); + FlowPathMapper.INSTANCE.mapToPathNodes(flow, forwardPath), STAT_VLANS, false, false); service.addOrUpdateCache(pathInfo); service.completeAndForwardFlowStats(statsOrigin); @@ -555,13 +664,13 @@ public void shouldHandleRemovingFlowFromCache() { assertDescriptionPopulation(statsEntries, statsOrigin.getStats().size(), 1); assertCookieCache( statsEntries, forwardPath.getCookie(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), - forwardPath.getMeterId())); + forwardPath.getMeterId(), false)); RemoveFlowPathInfo pathInfo2 = new RemoveFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), forwardPath.getCookie(), forwardPath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flow, forwardPath), STAT_VLANS); + FlowPathMapper.INSTANCE.mapToPathNodes(flow, forwardPath), STAT_VLANS, false, false); service.removeCached(pathInfo2); service.completeAndForwardFlowStats(statsOrigin); @@ -585,7 +694,7 @@ public void shouldCompleteMeterStats() { FlowPath forwardPath = flow.getForwardPath(); UpdateFlowPathInfo pathInfo = new UpdateFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), forwardPath.getCookie(), forwardPath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flow, forwardPath), STAT_VLANS); + FlowPathMapper.INSTANCE.mapToPathNodes(flow, forwardPath), STAT_VLANS, false, false); service.addOrUpdateCache(pathInfo); service.completeAndForwardMeterStats(statsOrigin); @@ -595,15 +704,15 @@ public void shouldCompleteMeterStats() { assertDescriptionPopulation(statsEntries, statsOrigin.getStats().size(), 1); assertMeterCache( statsEntries, forwardPath.getMeterId().getValue(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), - forwardPath.getMeterId())); + forwardPath.getMeterId(), false)); FlowPath protectedForwardPath = flow.getProtectedForwardPath(); UpdateFlowPathInfo pathInfo2 = new UpdateFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), protectedForwardPath.getCookie(), protectedForwardPath.getMeterId(), FlowPathMapper.INSTANCE.mapToPathNodes(flow, protectedForwardPath), - STAT_VLANS); + STAT_VLANS, false, false); service.addOrUpdateCache(pathInfo2); service.completeAndForwardMeterStats(statsOrigin); @@ -613,14 +722,14 @@ public void shouldCompleteMeterStats() { assertDescriptionPopulation(statsEntries, statsOrigin.getStats().size(), 2); assertMeterCache( statsEntries, forwardPath.getMeterId().getValue(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), - forwardPath.getMeterId())); + forwardPath.getMeterId(), false)); assertMeterCache( statsEntries, protectedForwardPath.getMeterId().getValue(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), protectedForwardPath.getCookie(), - protectedForwardPath.getMeterId())); + protectedForwardPath.getMeterId(), false)); } @Test @@ -637,7 +746,7 @@ public void shouldHandleRemovingMeterFromCache() { FlowPath forwardPath = flow.getForwardPath(); UpdateFlowPathInfo pathInfo = new UpdateFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), forwardPath.getCookie(), forwardPath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flow, forwardPath), STAT_VLANS); + FlowPathMapper.INSTANCE.mapToPathNodes(flow, forwardPath), STAT_VLANS, false, false); service.addOrUpdateCache(pathInfo); service.completeAndForwardMeterStats(statsOrigin); @@ -647,13 +756,13 @@ public void shouldHandleRemovingMeterFromCache() { assertDescriptionPopulation(statsEntries, statsOrigin.getStats().size(), 1); assertMeterCache( statsEntries, forwardPath.getMeterId().getValue(), - new CommonFlowDescriptor( + new EndpointFlowDescriptor( flow.getSrcSwitchId(), INGRESS, flow.getFlowId(), forwardPath.getCookie(), - forwardPath.getMeterId())); + forwardPath.getMeterId(), false)); RemoveFlowPathInfo pathInfo2 = new RemoveFlowPathInfo( flow.getFlowId(), flow.getYFlowId(), forwardPath.getCookie(), forwardPath.getMeterId(), - FlowPathMapper.INSTANCE.mapToPathNodes(flow, forwardPath), STAT_VLANS); + FlowPathMapper.INSTANCE.mapToPathNodes(flow, forwardPath), STAT_VLANS, false, false); service.removeCached(pathInfo2); service.completeAndForwardMeterStats(statsOrigin); From 859d27d648408f3690ca96f53cd89069119e3611 Mon Sep 17 00:00:00 2001 From: Sergey Nikitin Date: Thu, 18 Aug 2022 23:28:37 +0400 Subject: [PATCH 15/15] Updated CHANGELOG.md (Release 1.123.1) --- CHANGELOG.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 541ec06b072..fb965c67562 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,30 @@ # Changelog +## v1.123.1 (01/09/2022) + +### Bug Fixes: +- [#4928](https://github.com/telstra/open-kilda/pull/4928) Collect `flow.ingress.*` and `flow.*` metrics from mirror rule if it's exist (Issue: [#4915](https://github.com/telstra/open-kilda/issues/4915)) [**storm-topologies**] +- [#4929](https://github.com/telstra/open-kilda/pull/4929) Fixed flow monitoring (Issue: [#4923](https://github.com/telstra/open-kilda/issues/4923)) +- [#4901](https://github.com/telstra/open-kilda/pull/4901) Removal action now saves dump once for both primary and protected paths (Issue: [#4391](https://github.com/telstra/open-kilda/issues/4391)) +- [#4902](https://github.com/telstra/open-kilda/pull/4902) Added diverse and affinity group IDs to flow history update dump (Issue: [#3807](https://github.com/telstra/open-kilda/issues/3807)) +- [#4904](https://github.com/telstra/open-kilda/pull/4904) Fixed and enabled test (Issue: [#3627](https://github.com/telstra/open-kilda/issues/3627)) [**tests**] +- [#4905](https://github.com/telstra/open-kilda/pull/4905) Changed error message for illegal state in predicate builder [**storm-topologies**] +- [#4909](https://github.com/telstra/open-kilda/pull/4909) Fixed OVERWRITE_DEFAULTS action on delete rules (Issue: [#2960](https://github.com/telstra/open-kilda/issues/2960)) [**storm-topologies**] +- [#4889](https://github.com/telstra/open-kilda/pull/4889) Fixed path filtering by maxLatency (Issue: [#4525](https://github.com/telstra/open-kilda/issues/4525)) [**storm-topologies**] + +### Improvements: +- [#4900](https://github.com/telstra/open-kilda/pull/4900) Unignore test according to #3847 (Issues: [#3847](https://github.com/telstra/open-kilda/issues/3847) [#3847](https://github.com/telstra/open-kilda/issues/3847) [#3847](https://github.com/telstra/open-kilda/issues/3847)) [**tests**] +- [#4907](https://github.com/telstra/open-kilda/pull/4907) Add note to Northbound API v1 page. [**northbound**] +- [#4908](https://github.com/telstra/open-kilda/pull/4908) LAG port removal test fix - removing redundant checks, and fix source of ports array (Issue: [#4729](https://github.com/telstra/open-kilda/issues/4729)) [**tests**] +- [#4921](https://github.com/telstra/open-kilda/pull/4921) adding new test for the vxlan flow ping (Issue: [#4920](https://github.com/telstra/open-kilda/issues/4920)) [**tests**] + +For the complete list of changes, check out [the commit log](https://github.com/telstra/open-kilda/compare/v1.123.0...v1.123.1). + +### Affected Components: +nb, stats, nbworker, flow-hs, swmanager + +--- + ## v1.123.0 (08/08/2022) ### Features: