Skip to content

Commit

Permalink
Merge branch 'release-1.123.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
niksv committed Sep 1, 2022
2 parents 00aa459 + 859d27d commit 660cc29
Show file tree
Hide file tree
Showing 51 changed files with 1,130 additions and 186 deletions.
25 changes: 25 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,30 @@
# Changelog

## v1.123.1 (01/09/2022)

### Bug Fixes:
- [#4928](https://github.com/telstra/open-kilda/pull/4928) Collect `flow.ingress.*` and `flow.*` metrics from mirror rule if it's exist (Issue: [#4915](https://github.com/telstra/open-kilda/issues/4915)) [**storm-topologies**]
- [#4929](https://github.com/telstra/open-kilda/pull/4929) Fixed flow monitoring (Issue: [#4923](https://github.com/telstra/open-kilda/issues/4923))
- [#4901](https://github.com/telstra/open-kilda/pull/4901) Removal action now saves dump once for both primary and protected paths (Issue: [#4391](https://github.com/telstra/open-kilda/issues/4391))
- [#4902](https://github.com/telstra/open-kilda/pull/4902) Added diverse and affinity group IDs to flow history update dump (Issue: [#3807](https://github.com/telstra/open-kilda/issues/3807))
- [#4904](https://github.com/telstra/open-kilda/pull/4904) Fixed and enabled test (Issue: [#3627](https://github.com/telstra/open-kilda/issues/3627)) [**tests**]
- [#4905](https://github.com/telstra/open-kilda/pull/4905) Changed error message for illegal state in predicate builder [**storm-topologies**]
- [#4909](https://github.com/telstra/open-kilda/pull/4909) Fixed OVERWRITE_DEFAULTS action on delete rules (Issue: [#2960](https://github.com/telstra/open-kilda/issues/2960)) [**storm-topologies**]
- [#4889](https://github.com/telstra/open-kilda/pull/4889) Fixed path filtering by maxLatency (Issue: [#4525](https://github.com/telstra/open-kilda/issues/4525)) [**storm-topologies**]

### Improvements:
- [#4900](https://github.com/telstra/open-kilda/pull/4900) Unignore test according to #3847 (Issues: [#3847](https://github.com/telstra/open-kilda/issues/3847) [#3847](https://github.com/telstra/open-kilda/issues/3847) [#3847](https://github.com/telstra/open-kilda/issues/3847)) [**tests**]
- [#4907](https://github.com/telstra/open-kilda/pull/4907) Add note to Northbound API v1 page. [**northbound**]
- [#4908](https://github.com/telstra/open-kilda/pull/4908) LAG port removal test fix - removing redundant checks, and fix source of ports array (Issue: [#4729](https://github.com/telstra/open-kilda/issues/4729)) [**tests**]
- [#4921](https://github.com/telstra/open-kilda/pull/4921) adding new test for the vxlan flow ping (Issue: [#4920](https://github.com/telstra/open-kilda/issues/4920)) [**tests**]

For the complete list of changes, check out [the commit log](https://github.com/telstra/open-kilda/compare/v1.123.0...v1.123.1).

### Affected Components:
nb, stats, nbworker, flow-hs, swmanager

---

## v1.123.0 (08/08/2022)

### Features:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ public class Path implements Serializable {
@JsonProperty("nodes")
private List<PathNodePayload> nodes;

@JsonProperty("is_backup_path")
private Boolean isBackupPath;

public Path(@JsonProperty("bandwidth") Long bandwidth,
@JsonProperty("latency") Duration latency,
@JsonProperty("nodes") List<PathNodePayload> nodes) {
@JsonProperty("nodes") List<PathNodePayload> nodes,
@JsonProperty("is_backup_path") Boolean isBackupPath) {
this.bandwidth = bandwidth;
this.latency = latency;
this.nodes = nodes;
this.isBackupPath = isBackupPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,25 @@ public class PathDto {
@JsonProperty("nodes")
private List<PathNodePayload> nodes;

public PathDto(Long bandwidth, Duration latency, List<PathNodePayload> nodes) {
this(bandwidth, latency.toNanos(), latency.toNanos(), latency.toMillis(), nodes);
@JsonProperty("is_backup_path")
private Boolean isBackupPath;

public PathDto(Long bandwidth, Duration latency, List<PathNodePayload> nodes, Boolean isBackupPath) {
this(bandwidth, latency.toNanos(), latency.toNanos(), latency.toMillis(), nodes, isBackupPath);
}

@JsonCreator
public PathDto(@JsonProperty("bandwidth") Long bandwidth,
@JsonProperty("latency") Long latency,
@JsonProperty("latency_ns") Long latencyNs,
@JsonProperty("latency_ms") Long latencyMs,
@JsonProperty("nodes") List<PathNodePayload> nodes) {
@JsonProperty("nodes") List<PathNodePayload> nodes,
@JsonProperty("is_backup_path") Boolean isBackupPath) {
this.bandwidth = bandwidth;
this.latency = latency;
this.latencyNs = latencyNs;
this.latencyMs = latencyMs;
this.nodes = nodes;
this.isBackupPath = isBackupPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public abstract class PathMapper {
*/
public org.openkilda.messaging.info.network.Path map(org.openkilda.pce.Path path) {
if (path == null || path.getSegments().isEmpty()) {
return new org.openkilda.messaging.info.network.Path(0L, Duration.ZERO, new ArrayList<>());
return new org.openkilda.messaging.info.network.Path(0L, Duration.ZERO, new ArrayList<>(), false);
}

List<PathNodePayload> nodes = new ArrayList<>();
Expand All @@ -62,6 +62,6 @@ public org.openkilda.messaging.info.network.Path map(org.openkilda.pce.Path path
}

return new org.openkilda.messaging.info.network.Path(path.getMinAvailableBandwidth(),
Duration.ofNanos(path.getLatency()), nodes);
Duration.ofNanos(path.getLatency()), nodes, path.isBackupPath());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,10 @@ private void statsTopologyOutput(TopologyBuilder topologyBuilder) {
.shuffleGrouping(ComponentId.YFLOW_REROUTE_HUB.name(),
Stream.HUB_TO_STATS_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.YFLOW_DELETE_HUB.name(),
Stream.HUB_TO_STATS_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.FLOW_CREATE_MIRROR_POINT_HUB.name(),
Stream.HUB_TO_STATS_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.FLOW_DELETE_MIRROR_POINT_HUB.name(),
Stream.HUB_TO_STATS_TOPOLOGY_SENDER.name());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_SPEAKER_WORKER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_STATS_TOPOLOGY_SENDER;
import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_PAYLOAD;

import org.openkilda.bluegreen.LifecycleEvent;
Expand All @@ -26,6 +27,7 @@
import org.openkilda.messaging.Message;
import org.openkilda.messaging.command.flow.FlowMirrorPointCreateRequest;
import org.openkilda.messaging.info.InfoMessage;
import org.openkilda.messaging.info.stats.UpdateFlowPathInfo;
import org.openkilda.pce.AvailableNetworkFactory;
import org.openkilda.pce.PathComputer;
import org.openkilda.pce.PathComputerConfig;
Expand All @@ -46,6 +48,7 @@

import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
Expand Down Expand Up @@ -146,6 +149,15 @@ public void sendHistoryUpdate(FlowHistoryHolder historyHolder) {
new Values(historyHolder.getTaskId(), message));
}

@Override
public void sendNotifyFlowStats(@NonNull UpdateFlowPathInfo flowPathInfo) {
Message message = new InfoMessage(flowPathInfo, System.currentTimeMillis(),
getCommandContext().getCorrelationId());

emitWithContext(HUB_TO_STATS_TOPOLOGY_SENDER.name(), getCurrentTuple(),
new Values(flowPathInfo.getFlowId(), message));
}

@Override
public void cancelTimeoutCallback(String key) {
cancelCallback(key);
Expand All @@ -164,6 +176,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_STATS_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(ZkStreams.ZK.toString(),
new Fields(ZooKeeperBolt.FIELD_ID_STATE, ZooKeeperBolt.FIELD_ID_CONTEXT));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_SPEAKER_WORKER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_STATS_TOPOLOGY_SENDER;
import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_PAYLOAD;

import org.openkilda.bluegreen.LifecycleEvent;
Expand All @@ -26,6 +27,7 @@
import org.openkilda.messaging.Message;
import org.openkilda.messaging.command.flow.FlowMirrorPointDeleteRequest;
import org.openkilda.messaging.info.InfoMessage;
import org.openkilda.messaging.info.stats.RemoveFlowPathInfo;
import org.openkilda.persistence.PersistenceManager;
import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig;
Expand All @@ -42,6 +44,7 @@

import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
Expand Down Expand Up @@ -133,6 +136,15 @@ public void sendHistoryUpdate(FlowHistoryHolder historyHolder) {
new Values(historyHolder.getTaskId(), message));
}

@Override
public void sendNotifyFlowStats(@NonNull RemoveFlowPathInfo flowPathInfo) {
Message message = new InfoMessage(flowPathInfo, System.currentTimeMillis(),
getCommandContext().getCorrelationId());

emitWithContext(HUB_TO_STATS_TOPOLOGY_SENDER.name(), getCurrentTuple(),
new Values(flowPathInfo.getFlowId(), message));
}

@Override
public void cancelTimeoutCallback(String key) {
cancelCallback(key);
Expand All @@ -151,6 +163,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_STATS_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(ZkStreams.ZK.toString(),
new Fields(ZooKeeperBolt.FIELD_ID_STATE, ZooKeeperBolt.FIELD_ID_CONTEXT));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,14 @@ protected void setMirrorPointsToNewPath(PathId oldFlowPathId, PathId newFlowPath
oldPath.getFlowMirrorPointsSet().forEach(newPath::addFlowMirrorPoints))));
}
}

protected boolean hasIngressMirror(FlowPath flowPath) {
return flowPath.getFlowMirrorPointsSet().stream()
.anyMatch(point -> point.getMirrorSwitchId().equals(flowPath.getSrcSwitchId()));
}

protected boolean hasEgressMirror(FlowPath flowPath) {
return flowPath.getFlowMirrorPointsSet().stream()
.anyMatch(point -> point.getMirrorSwitchId().equals(flowPath.getDestSwitchId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ protected void perform(S from, S to, E event, C context, T stateMachine) {
Flow flow = flowPath.getFlow();
UpdateFlowPathInfo pathInfo = new UpdateFlowPathInfo(
flow.getFlowId(), flow.getYFlowId(), flowPath.getCookie(), flowPath.getMeterId(),
FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics());
FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics(),
hasIngressMirror(flowPath), hasEgressMirror(flowPath));
carrier.sendNotifyFlowStats(pathInfo);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ protected void perform(S from, S to, E event, C context, T stateMachine) {
Flow flow = flowPath.getFlow();
RemoveFlowPathInfo pathInfo = new RemoveFlowPathInfo(
flow.getFlowId(), flow.getYFlowId(), flowPath.getCookie(), flowPath.getMeterId(),
FlowPathMapper.INSTANCE.mapToPathNodes(originalFlow, flowPath), flow.getVlanStatistics());
FlowPathMapper.INSTANCE.mapToPathNodes(originalFlow, flowPath), flow.getVlanStatistics(),
hasIngressMirror(flowPath), hasEgressMirror(flowPath));
carrier.sendNotifyFlowStats(pathInfo);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ protected void perform(State from, State to, Event event, FlowCreateContext cont
Flow flow = flowPath.getFlow();
UpdateFlowPathInfo pathInfo = new UpdateFlowPathInfo(
flow.getFlowId(), flow.getYFlowId(), flowPath.getCookie(), flowPath.getMeterId(),
FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics());
FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics(),
hasIngressMirror(flowPath), hasEgressMirror(flowPath));
carrier.sendNotifyFlowStats(pathInfo);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,21 @@ protected void perform(State from, State to, Event event, FlowDeleteContext cont
return foundFlow;
});

Set<FlowPathPair> flowPathPairs = new HashSet<>();

flowPathPairs.add(new FlowPathPair(flow.getForwardPath(), flow.getReversePath()));
flowPathPairs.add(new FlowPathPair(flow.getProtectedForwardPath(), flow.getProtectedReversePath()));

// Iterate to remove each path in a dedicated transaction.
flow.getPathIds().forEach(pathId -> {
Optional<FlowPath> deletedPath = flowPathRepository.remove(pathId);
deletedPath.ifPresent(path -> {
updateIslsForFlowPath(path);
saveRemovalActionWithDumpToHistory(stateMachine, flow, new FlowPathPair(path, path));
});
deletedPath.ifPresent(this::updateIslsForFlowPath);
});

flowPathPairs.forEach(flowPathPair -> {
if (flowPathPair.getForward() != null && flowPathPair.getReverse() != null) {
saveRemovalActionWithDumpToHistory(stateMachine, flow, flowPathPair);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ protected void perform(State from, State to, Event event, FlowDeleteContext cont
Flow flow = flowPath.getFlow();
RemoveFlowPathInfo pathInfo = new RemoveFlowPathInfo(
flow.getFlowId(), flow.getYFlowId(), flowPath.getCookie(), flowPath.getMeterId(),
FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics());
FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics(),
hasIngressMirror(flowPath), hasEgressMirror(flowPath));
carrier.sendNotifyFlowStats(pathInfo);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.EmitVerifyRulesRequestsAction;
import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.HandleNotCompletedCommandsAction;
import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.HandleNotDeallocatedFlowMirrorPathResourceAction;
import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.NotifyFlowStatsAction;
import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.OnFinishedAction;
import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.OnFinishedWithErrorAction;
import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions.OnReceivedInstallResponseAction;
Expand Down Expand Up @@ -155,6 +156,11 @@ public Factory(@NonNull FlowGenericCarrier carrier, @NonNull PersistenceManager
.onEach(Event.TIMEOUT, Event.MISSING_RULE_FOUND, Event.ERROR);

builder.transitions().from(State.RULES_VALIDATED)
.toAmong(State.NOTIFY_FLOW_STATS, State.NOTIFY_FLOW_STATS)
.onEach(Event.NEXT, Event.ERROR)
.perform(new NotifyFlowStatsAction(persistenceManager));

builder.transitions().from(State.NOTIFY_FLOW_STATS)
.toAmong(State.MIRROR_PATH_INSTALLATION_COMPLETED, State.MIRROR_PATH_INSTALLATION_COMPLETED)
.onEach(Event.NEXT, Event.ERROR)
.perform(new PostFlowMirrorPathInstallationAction(persistenceManager));
Expand Down Expand Up @@ -220,6 +226,7 @@ public enum State {
VALIDATING_RULES,
RULES_VALIDATED,

NOTIFY_FLOW_STATS,
MIRROR_PATH_INSTALLATION_COMPLETED,

FINISHED,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/* Copyright 2022 Telstra Open Source
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.actions;

import org.openkilda.messaging.info.stats.UpdateFlowPathInfo;
import org.openkilda.model.Flow;
import org.openkilda.persistence.PersistenceManager;
import org.openkilda.wfm.share.mappers.FlowPathMapper;
import org.openkilda.wfm.topology.flowhs.fsm.common.actions.FlowProcessingWithHistorySupportAction;
import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.FlowMirrorPointCreateContext;
import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.FlowMirrorPointCreateFsm;
import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.FlowMirrorPointCreateFsm.Event;
import org.openkilda.wfm.topology.flowhs.fsm.mirrorpoint.create.FlowMirrorPointCreateFsm.State;

public class NotifyFlowStatsAction extends FlowProcessingWithHistorySupportAction<FlowMirrorPointCreateFsm,
FlowMirrorPointCreateFsm.State, FlowMirrorPointCreateFsm.Event, FlowMirrorPointCreateContext> {

public NotifyFlowStatsAction(PersistenceManager persistenceManager) {
super(persistenceManager);
}

@Override
protected void perform(FlowMirrorPointCreateFsm.State from, State to, Event event,
FlowMirrorPointCreateContext context, FlowMirrorPointCreateFsm stateMachine) {
flowPathRepository.findById(stateMachine.getFlowPathId()).ifPresent(flowPath -> {
Flow flow = flowPath.getFlow();
UpdateFlowPathInfo pathInfo = new UpdateFlowPathInfo(
flow.getFlowId(), flow.getYFlowId(), flowPath.getCookie(), flowPath.getMeterId(),
FlowPathMapper.INSTANCE.mapToPathNodes(flow, flowPath), flow.getVlanStatistics(),
hasIngressMirror(flowPath), hasEgressMirror(flowPath));
stateMachine.getCarrier().sendNotifyFlowStats(pathInfo);
});
}
}
Loading

0 comments on commit 660cc29

Please sign in to comment.