Skip to content

Commit

Permalink
Merge branch 'release-1.125.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
niksv committed Nov 10, 2022
2 parents 0f218e7 + 84c877a commit b5a6bc3
Show file tree
Hide file tree
Showing 61 changed files with 2,538 additions and 780 deletions.
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
# Changelog

## v1.125.2 (09/11/2022)

### Bug Fixes:
- [#4977](https://github.com/telstra/open-kilda/pull/4977) Set lacp_reply to true if value is not specified (Issue: [#4976](https://github.com/telstra/open-kilda/issues/4976)) [**northbound**]
- [#4981](https://github.com/telstra/open-kilda/pull/4981) Fixed updating of one switch Y flow (Issue: [#4972](https://github.com/telstra/open-kilda/issues/4972))
- [#4951](https://github.com/telstra/open-kilda/pull/4951) Fix stats for y flow ingress endpoint (Issue: [#4926](https://github.com/telstra/open-kilda/issues/4926)) [**floodlight**][**storm-topologies**]

### Improvements:
- [#4962](https://github.com/telstra/open-kilda/pull/4962) Implement true Y-flow sync
- [#4973](https://github.com/telstra/open-kilda/pull/4973) Added balanced grouping into connected devices topology
- [#4978](https://github.com/telstra/open-kilda/pull/4978) Added info about latency into create/update logs


For the complete list of changes, check out [the commit log](https://github.com/telstra/open-kilda/compare/v1.125.1...v1.125.2).

### Affected Components:
stats, flow-hs, fl, connected, nb

---

## v1.125.1 (27/10/2022)

### Bug Fixes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

# topology configuration
config:
topology.parallelism: {{ getv "/kilda_storm_parallelism_level_new" }}
topology.parallelism: {{ getv "/kilda_storm_connected_devices_parallelism" }}
topology.workers: {{ getv "/kilda_storm_parallelism_workers_count" }}
topology.spouts.parallelism: 1

Expand All @@ -16,3 +16,5 @@ spouts:
bolts:
- id: "zookeeper.bolt"
parallelism: 1
- id: "packet-bolt"
parallelism: {{ getv "/kilda_storm_connected_devices_packet_bolt_parallelism" }}
4 changes: 4 additions & 0 deletions confd/vars/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ kilda_storm_parallelism_workers_count: 1
kilda_storm_history_parallelism: 2
kilda_storm_stats_parallelism: 2

# Connected devices
kilda_storm_connected_devices_parallelism: 2
kilda_storm_connected_devices_packet_bolt_parallelism: 2

# Flow HS
kilda_storm_flow_hs_parallelism: 2
kilda_storm_flow_hs_reroute_hub_count_multiplier: 2
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
63 changes: 63 additions & 0 deletions docs/design/hub-and-spoke/crud/sync/y-flow-sync-fsm.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
@startuml
title Y-Flow sync FSM state diagram

[*] -> SETUP
SETUP: enter / fire "error" if y-flow is IN_PROGRESS status
SETUP: enter / for each subflow fire "error" if subflow is IN_PROGRESS status
SETUP: enter / set y-flow status to IN_PROGRESS
SETUP: enter / for each subflow set flow status to IN_PROGRESS
SETUP: enter / for each subflow apply postponed flow changes
SETUP --> SYNC: next
SETUP --> COMMIT_ERROR: error

SYNC: enter / for each subflow launch main path install operation
SYNC: enter / [have protected path] for each subflow launch protected path install operation
SYNC --> SYNC_FAIL: sync_fail
SYNC --> SYNC_Y_RULES: guard_passed
SYNC --> CANCEL: error
SYNC --> CANCEL: timeout / report global timeout
SYNC: path_operation_response / handle path operation response
SYNC: path_operation_response / fire "sync_fail" on failed path response
SYNC: path_operation_response / fire "guard_passed" if no more pending path operations

SYNC_Y_RULES: enter / emit y-flow rules speaker install commands
SYNC_Y_RULES -> COMMIT_SUCCESS: guard_passed
SYNC_Y_RULES -> COMMIT_ERROR: timeout / report global timeout
SYNC_Y_RULES -> SYNC_Y_RULES_FAIL: speaker_error_response
SYNC_Y_RULES: speaker_response / report speaker command success
SYNC_Y_RULES: speaker_response / fire "guard_passed" if no more pending speaker commands

CANCEL: enter / request cancel path operation for all pending path operations
CANCEL --> COMMIT_ERROR: guard_passed
CANCEL: path_operation_response / handle path operation response
CANCEL: path_operation_response / fire "guard_passed" if no more pending path operations
CANCEL: error / fire "guard_passed" if no pending path operations
CANCEL: timeout / report global timeout

SYNC_FAIL: enter / fire "guard_passed" if no pending path operations
SYNC_FAIL --> COMMIT_ERROR: guard_passed
SYNC_FAIL --> CANCEL: error
SYNC_FAIL --> CANCEL: timeout / report global timeout
SYNC_FAIL: path_operation_response / handle path operation response
SYNC_FAIL: path_operation_response / fire "guard_passed" if no more pending path operations

SYNC_Y_RULES_FAIL: enter / report speaker command error
SYNC_Y_RULES_FAIL: enter / fire "guard_passed" if no pending speaker commands
SYNC_Y_RULES_FAIL -> COMMIT_ERROR: guard_passed | timeout
SYNC_Y_RULES_FAIL: speaker_response / report speaker command success
SYNC_Y_RULES_FAIL: speaker_response / fire "guard_passed" if no more pending speaker commands
SYNC_Y_RULES_FAIL: speaker_error_response / report speaker command error
SYNC_Y_RULES_FAIL: speaker_error_response / fire "guard_passed" if no more pending speaker commands

COMMIT_SUCCESS: enter / update flow status
COMMIT_SUCCESS: enter / send success response
COMMIT_SUCCESS --> FINISHED: next
COMMIT_SUCCESS --> COMMIT_ERROR: error

COMMIT_ERROR: enter / update flow status
COMMIT_ERROR: enter / send failed response
COMMIT_ERROR --> FINISHED_WITH_ERROR: next

FINISHED --> [*]
FINISHED_WITH_ERROR --> [*]
@enduml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.openkilda.model.FlowEndpoint;
import org.openkilda.model.FlowStatus;
import org.openkilda.model.IslEndpoint;
import org.openkilda.model.PathComputationStrategy;
import org.openkilda.model.SwitchId;
import org.openkilda.reporting.AbstractDashboardLogger;

Expand Down Expand Up @@ -149,14 +150,16 @@ public void onFlowCreate(Flow flow) {
* Log a flow-create event.
*/
public void onFlowCreate(String flowId, SwitchId srcSwitch, int srcPort, int srcVlan,
SwitchId destSwitch, int destPort, int destVlan, String diverseFlowId, long bandwidth) {
SwitchId destSwitch, int destPort, int destVlan, String diverseFlowId, long bandwidth,
PathComputationStrategy strategy, Long maxLatency, Long maxLatencyTier2) {
Map<String, String> data = new HashMap<>();
data.put(TAG, "flow-create");
data.put(FLOW_ID, flowId);
data.put(EVENT_TYPE, FLOW_CREATE_EVENT);
invokeLogger(Level.INFO, String.format("Create the flow: %s, source %s_%d_%d, destination %s_%d_%d, "
+ "diverse flowId %s, bandwidth %d", flowId, srcSwitch, srcPort, srcVlan,
destSwitch, destPort, destVlan, diverseFlowId, bandwidth), data);
+ "diverse flowId %s, bandwidth %d, path computation strategy %s, max latency %s, "
+ "max latency tier2 %s", flowId, srcSwitch, srcPort, srcVlan, destSwitch, destPort, destVlan,
diverseFlowId, bandwidth, strategy, maxLatency, maxLatencyTier2), data);
}

/**
Expand Down Expand Up @@ -223,14 +226,16 @@ public void onFlowUpdate(Flow flow) {
* Log a flow-update event.
*/
public void onFlowUpdate(String flowId, SwitchId srcSwitch, int srcPort, int srcVlan,
SwitchId destSwitch, int destPort, int destVlan, String diverseFlowId, long bandwidth) {
SwitchId destSwitch, int destPort, int destVlan, String diverseFlowId, long bandwidth,
PathComputationStrategy strategy, Long maxLatency, Long maxLatencyTier2) {
Map<String, String> data = new HashMap<>();
data.put(TAG, "flow-update");
data.put(FLOW_ID, flowId);
data.put(EVENT_TYPE, FLOW_UPDATE_EVENT);
invokeLogger(Level.INFO, String.format("Update the flow %s with: source %s_%d_%d, destination %s_%d_%d, "
+ "diverse flowId %s, bandwidth %d", flowId, srcSwitch, srcPort, srcVlan,
destSwitch, destPort, destVlan, diverseFlowId, bandwidth), data);
+ "diverse flowId %s, bandwidth %d, path computation strategy %s, max latency %s, "
+ "max latency tier2 %s", flowId, srcSwitch, srcPort, srcVlan,
destSwitch, destPort, destVlan, diverseFlowId, bandwidth, strategy, maxLatency, maxLatencyTier2), data);
}

/**
Expand Down Expand Up @@ -499,14 +504,17 @@ public void onFailedFlowMirrorPointDelete(String flowId, String flowMirrorPointI
/**
* Log a y-flow-create event.
*/
public void onYFlowCreate(String yFlowId, FlowEndpoint sharedEndpoint,
List<FlowEndpoint> subFlowEndpoints, long maximumBandwidth) {
public void onYFlowCreate(
String yFlowId, FlowEndpoint sharedEndpoint, List<FlowEndpoint> subFlowEndpoints, long maximumBandwidth,
PathComputationStrategy strategy, Long maxLatency, Long maxLatencyTier2) {
Map<String, String> data = new HashMap<>();
data.put(TAG, "y-flow-create");
data.put(FLOW_ID, yFlowId);
data.put(EVENT_TYPE, YFLOW_CREATE_EVENT);
invokeLogger(Level.INFO, String.format("Create the y-flow: %s, shared endpoint %s, endpoints (%s), "
+ "bandwidth %d", yFlowId, sharedEndpoint, subFlowEndpoints, maximumBandwidth), data);
+ "bandwidth %d, path computation strategy %s, max latency %s, max latency tier2 %s",
yFlowId, sharedEndpoint, subFlowEndpoints, maximumBandwidth, strategy, maxLatency, maxLatencyTier2),
data);
}

/**
Expand Down Expand Up @@ -538,14 +546,17 @@ public void onFailedYFlowCreate(String yFlowId, String failureReason) {
/**
* Log a y-flow-update event.
*/
public void onYFlowUpdate(String yFlowId, FlowEndpoint sharedEndpoint,
List<FlowEndpoint> subFlowEndpoints, long maximumBandwidth) {
public void onYFlowUpdate(
String yFlowId, FlowEndpoint sharedEndpoint, List<FlowEndpoint> subFlowEndpoints, long maximumBandwidth,
PathComputationStrategy strategy, Long maxLatency, Long maxLatencyTier2) {
Map<String, String> data = new HashMap<>();
data.put(TAG, "y-flow-update");
data.put(FLOW_ID, yFlowId);
data.put(EVENT_TYPE, YFLOW_UPDATE_EVENT);
invokeLogger(Level.INFO, String.format("Update the y-flow: %s, shared endpoint %s, endpoints (%s), "
+ "bandwidth %d", yFlowId, sharedEndpoint, subFlowEndpoints, maximumBandwidth), data);
+ "bandwidth %d, path computation strategy %s, max latency %s, max latency tier2 %s",
yFlowId, sharedEndpoint, subFlowEndpoints, maximumBandwidth, strategy, maxLatency, maxLatencyTier2),
data);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.squirrelframework.foundation.fsm.StateMachine;
import org.squirrelframework.foundation.fsm.StateMachineStatus;
import org.squirrelframework.foundation.fsm.impl.AbstractStateMachine;


public abstract class AbstractBaseFsm<T extends StateMachine<T, S, E, C>, S, E, C>
public abstract class AbstractBaseFsm<T extends AbstractStateMachine<T, S, E, C>, S, E, C>
extends AbstractStateMachine<T, S, E, C> {

protected transient Logger log = makeLog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.openkilda.wfm.share.zk.ZooKeeperSpout;
import org.openkilda.wfm.topology.AbstractTopology;
import org.openkilda.wfm.topology.connecteddevices.bolts.PacketBolt;
import org.openkilda.wfm.topology.connecteddevices.bolts.RouterBolt;
import org.openkilda.wfm.topology.utils.KafkaRecordTranslator;

import org.apache.storm.generated.StormTopology;
Expand All @@ -30,6 +31,7 @@

public class ConnectedDevicesTopology extends AbstractTopology<ConnectedDevicesTopologyConfig> {
public static final String CONNECTED_DEVICES_SPOUT_ID = "connected-devices-spout";
public static final String ROUTER_BOLT_ID = "router-bolt";
public static final String PACKET_BOLT_ID = "packet-bolt";

public ConnectedDevicesTopology(LaunchEnvironment env) {
Expand All @@ -46,6 +48,7 @@ public StormTopology createTopology() {
createZkSpout(builder);

createSpout(builder);
createRouterBolt(builder, persistenceManager);
createPacketBolt(builder, persistenceManager);

createZkBolt(builder);
Expand All @@ -59,22 +62,29 @@ private void createZkSpout(TopologyBuilder builder) {
declareSpout(builder, zooKeeperSpout, ZooKeeperSpout.SPOUT_ID);
}

private void createPacketBolt(TopologyBuilder builder, PersistenceManager persistenceManager) {
PacketBolt routerBolt = new PacketBolt(persistenceManager, ZooKeeperSpout.SPOUT_ID);
declareBolt(builder, routerBolt, PACKET_BOLT_ID)
.fieldsGrouping(CONNECTED_DEVICES_SPOUT_ID, new Fields(KafkaRecordTranslator.FIELD_ID_KEY))
private void createRouterBolt(TopologyBuilder builder, PersistenceManager persistenceManager) {
RouterBolt routerBolt = new RouterBolt(persistenceManager, ZooKeeperSpout.SPOUT_ID);
declareBolt(builder, routerBolt, ROUTER_BOLT_ID)
.shuffleGrouping(CONNECTED_DEVICES_SPOUT_ID)
.allGrouping(ZooKeeperSpout.SPOUT_ID);
}

private void createPacketBolt(TopologyBuilder builder, PersistenceManager persistenceManager) {
PacketBolt packetBolt = new PacketBolt(persistenceManager);
declareBolt(builder, packetBolt, PACKET_BOLT_ID)
.fieldsGrouping(ROUTER_BOLT_ID, RouterBolt.PACKET_STREAM_ID,
new Fields(KafkaRecordTranslator.FIELD_ID_KEY));
}

private void createSpout(TopologyBuilder builder) {
declareKafkaSpout(builder, topologyConfig.getKafkaTopoConnectedDevicesTopic(), CONNECTED_DEVICES_SPOUT_ID);
}

private void createZkBolt(TopologyBuilder builder) {
ZooKeeperBolt zooKeeperBolt = new ZooKeeperBolt(getConfig().getBlueGreenMode(), getZkTopoName(),
getZookeeperConfig(), getBoltInstancesCount(PACKET_BOLT_ID));
getZookeeperConfig(), getBoltInstancesCount(ROUTER_BOLT_ID));
declareBolt(builder, zooKeeperBolt, ZooKeeperBolt.BOLT_ID)
.allGrouping(PACKET_BOLT_ID, ZkStreams.ZK.toString());
.allGrouping(ROUTER_BOLT_ID, ZkStreams.ZK.toString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,23 @@

import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_PAYLOAD;

import org.openkilda.messaging.Message;
import org.openkilda.messaging.info.InfoData;
import org.openkilda.messaging.info.InfoMessage;
import org.openkilda.messaging.info.event.ArpInfoData;
import org.openkilda.messaging.info.event.ConnectedDevicePacketBase;
import org.openkilda.messaging.info.event.LldpInfoData;
import org.openkilda.persistence.PersistenceManager;
import org.openkilda.wfm.AbstractBolt;
import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.share.zk.ZkStreams;
import org.openkilda.wfm.share.zk.ZooKeeperBolt;
import org.openkilda.wfm.topology.connecteddevices.service.PacketService;

import lombok.extern.slf4j.Slf4j;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

@Slf4j
public class PacketBolt extends AbstractBolt {
private transient PacketService packetService;

public PacketBolt(PersistenceManager persistenceManager, String lifeCycleEventSourceComponent) {
super(persistenceManager, lifeCycleEventSourceComponent);
public PacketBolt(PersistenceManager persistenceManager) {
super(persistenceManager);
}

@Override
Expand All @@ -49,28 +43,14 @@ protected void init() {

@Override
protected void handleInput(Tuple input) throws PipelineException {
if (active) {
Message message = pullValue(input, FIELD_ID_PAYLOAD, Message.class);

if (message instanceof InfoMessage) {
log.debug("Received info message {}", message);
InfoData data = ((InfoMessage) message).getData();
if (data instanceof LldpInfoData) {
packetService.handleLldpData((LldpInfoData) data);
} else if (data instanceof ArpInfoData) {
packetService.handleArpData((ArpInfoData) data);
} else {
unhandledInput(input);
}
} else {
unhandledInput(input);
}
ConnectedDevicePacketBase data = pullValue(input, FIELD_ID_PAYLOAD, ConnectedDevicePacketBase.class);

if (data instanceof LldpInfoData) {
packetService.handleLldpData((LldpInfoData) data);
} else if (data instanceof ArpInfoData) {
packetService.handleArpData((ArpInfoData) data);
} else {
unhandledInput(input);
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(ZkStreams.ZK.toString(), new Fields(ZooKeeperBolt.FIELD_ID_STATE,
ZooKeeperBolt.FIELD_ID_CONTEXT));
}
}
Loading

0 comments on commit b5a6bc3

Please sign in to comment.