Skip to content

Commit

Permalink
Merge branch 'hotfix-1.115.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
niksv committed Mar 4, 2022
2 parents 39652d3 + a0d861d commit 53d6b6d
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 36 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## v1.115.2 (02/03/2022)

### Improvements:
- [#4710](https://github.com/telstra/open-kilda/pull/4710) Added SLA check sharding for Flow Monitoring

For the complete list of changes, check out [the commit log](https://github.com/telstra/open-kilda/compare/v1.115.1...v1.115.2).

### Affected Components:
flow-monitor

---
## v1.115.1 (21/02/2022)

### Bug Fixes:
Expand Down
6 changes: 6 additions & 0 deletions confd/templates/base-storm-topology/topology.properties.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,13 @@ flow.delete.speaker.command.retries = 3
blue.green.mode = {{ getv "/kilda_blue_green_mode" "blue" }}

# flow-monitoring topology
#
# flow.sla.check.shard.count parameter is needed to distribute load of Flow SLA
# check operation. Instead of checking of all flows ones in flow.sla.check.interval.seconds
# flows will be checked by chunks. That is why parameter flow.sla.check.interval.seconds
# must be divisible by flow.sla.check.shard.count
flow.sla.check.interval.seconds = {{ getv "/kilda_flow_sla_check_interval_seconds" }}
flow.sla.check.shard.count = {{ getv "/kilda_flow_sla_check_shard_count" }}
flow.rtt.stats.expiration.seconds = {{ getv "/kilda_flow_rtt_stats_expiration_seconds" }}
isl.rtt.latency.expiration.seconds = {{ getv "/kilda_isl_rtt_latency_expiration_seconds" }}
flow.latency.sla.timeout.seconds = {{ getv "/kilda_flow_latency_sla_timeout_seconds" }}
Expand Down
1 change: 1 addition & 0 deletions confd/vars/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ kilda_server42_control_storm_stub_component_name: "server42-control-storm-stub"
kilda_server42_control_storm_stub_run_id: "server42-control-storm-stub-run-id"

kilda_flow_sla_check_interval_seconds: 60
kilda_flow_sla_check_shard_count: 10
kilda_flow_rtt_stats_expiration_seconds: 30
kilda_isl_rtt_latency_expiration_seconds: 10
kilda_flow_latency_sla_timeout_seconds: 30
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ private void flowSplitterBolt(TopologyBuilder topologyBuilder) {
}

private void tickBolt(TopologyBuilder topologyBuilder) {
declareBolt(topologyBuilder, new TickBolt(getConfig().getFlowSlaCheckIntervalSeconds()),
declareBolt(topologyBuilder, new TickBolt(getConfig().getFlowSlaCheckIntervalSeconds(),
getConfig().getFlowSlaCheckIntervalSeconds() / getConfig().getFlowSlaCheckShardCount()),
ComponentId.TICK_BOLT.name());
}

Expand Down Expand Up @@ -156,7 +157,8 @@ private void islCacheBolt(TopologyBuilder topologyBuilder, PersistenceManager pe
private void actionBolt(TopologyBuilder topologyBuilder, PersistenceManager persistenceManager) {
declareBolt(topologyBuilder, new ActionBolt(persistenceManager,
Duration.ofSeconds(getConfig().getFlowLatencySlaTimeoutSeconds()),
getConfig().getFlowLatencySlaThresholdPercent(), ZooKeeperSpout.SPOUT_ID),
getConfig().getFlowLatencySlaThresholdPercent(), ZooKeeperSpout.SPOUT_ID,
getConfig().getFlowSlaCheckShardCount()),
ComponentId.ACTION_BOLT.name())
.fieldsGrouping(ComponentId.FLOW_CACHE_BOLT.name(), ACTION_STREAM_ID.name(), FLOW_ID_FIELDS)
.fieldsGrouping(ComponentId.FLOW_CACHE_BOLT.name(), FLOW_UPDATE_STREAM_ID.name(), FLOW_ID_FIELDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ default String getKafkaTopoRerouteTopic() {
@Default("30")
int getFlowSlaCheckIntervalSeconds();

/*
* flow.sla.check.shard.count parameter is needed to distribute load of Flow SLA
* check operation. Instead of checking of all flows ones in flow.sla.check.interval.seconds
* flows will be checked by chunks. That is why parameter flow.sla.check.interval.seconds
* must be divisible by flow.sla.check.shard.count
*/
@Key("flow.sla.check.shard.count")
@Default("10")
int getFlowSlaCheckShardCount();

@Key("flow.rtt.stats.expiration.seconds")
@Default("3")
int getFlowRttStatsExpirationSeconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.openkilda.wfm.topology.flowmonitoring.bolt;

import static org.openkilda.wfm.share.bolt.KafkaEncoder.FIELD_ID_PAYLOAD;
import static org.openkilda.wfm.share.bolt.MonotonicClock.FIELD_ID_TICK_IDENTIFIER;
import static org.openkilda.wfm.topology.flowmonitoring.FlowMonitoringTopology.Stream.ACTION_STREAM_ID;
import static org.openkilda.wfm.topology.flowmonitoring.FlowMonitoringTopology.Stream.FLOW_REMOVE_STREAM_ID;
import static org.openkilda.wfm.topology.flowmonitoring.FlowMonitoringTopology.Stream.FLOW_UPDATE_STREAM_ID;
Expand Down Expand Up @@ -47,22 +48,26 @@

public class ActionBolt extends AbstractBolt implements FlowOperationsCarrier {

private Duration timeout;
private float threshold;
private final Duration timeout;
private final float threshold;
private final int shardCount;
private int currentShardNumber;
private transient ActionService actionService;

public ActionBolt(
PersistenceManager persistenceManager, Duration timeout, float threshold,
String lifeCycleEventSourceComponent) {
String lifeCycleEventSourceComponent, int shardCount) {
super(persistenceManager, lifeCycleEventSourceComponent);
this.timeout = timeout;
this.threshold = threshold;
this.currentShardNumber = 0;
this.shardCount = shardCount;
}

@Override
protected void init() {
super.init();
actionService = new ActionService(this, persistenceManager, Clock.systemUTC(), timeout, threshold);
actionService = new ActionService(this, persistenceManager, Clock.systemUTC(), timeout, threshold, shardCount);
}

@Override
Expand Down Expand Up @@ -91,7 +96,11 @@ protected void handleInput(Tuple input) throws PipelineException {
}

if (ComponentId.TICK_BOLT.name().equals(input.getSourceComponent())) {
actionService.processTick();
TickBolt.TickId tickId = pullValue(input, FIELD_ID_TICK_IDENTIFIER, TickBolt.TickId.class);
if (TickBolt.TickId.SLA_CHECK.equals(tickId)) {
actionService.processTick(currentShardNumber);
currentShardNumber = (currentShardNumber + 1) % shardCount;
}
} else {
unhandledInput(input);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.openkilda.wfm.topology.flowmonitoring.bolt;

import static org.openkilda.wfm.share.bolt.MonotonicClock.FIELD_ID_TICK_IDENTIFIER;
import static org.openkilda.wfm.topology.flowmonitoring.FlowMonitoringTopology.Stream.FLOW_REMOVE_STREAM_ID;
import static org.openkilda.wfm.topology.flowmonitoring.FlowMonitoringTopology.Stream.FLOW_UPDATE_STREAM_ID;
import static org.openkilda.wfm.topology.flowmonitoring.bolt.FlowCacheBolt.FLOW_ID_FIELD;
Expand All @@ -29,6 +30,7 @@
import org.openkilda.wfm.share.zk.ZkStreams;
import org.openkilda.wfm.share.zk.ZooKeeperBolt;
import org.openkilda.wfm.topology.flowmonitoring.FlowMonitoringTopology.ComponentId;
import org.openkilda.wfm.topology.flowmonitoring.bolt.TickBolt.TickId;
import org.openkilda.wfm.topology.flowmonitoring.service.FlowStateCacheService;

import org.apache.storm.topology.OutputFieldsDeclarer;
Expand All @@ -52,8 +54,11 @@ protected void init() {
protected void handleInput(Tuple input) throws PipelineException {
if (active) {
if (ComponentId.TICK_BOLT.name().equals(input.getSourceComponent())) {
flowStateCacheService.getFlows()
.forEach(flowId -> emit(input, new Values(flowId, getCommandContext())));
TickId tickId = pullValue(input, FIELD_ID_TICK_IDENTIFIER, TickId.class);
if (TickId.CACHE_UPDATE.equals(tickId)) {
flowStateCacheService.getFlows()
.forEach(flowId -> emit(input, new Values(flowId, getCommandContext())));
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,19 @@
import org.openkilda.wfm.topology.flowmonitoring.bolt.TickBolt.TickId;

public class TickBolt extends MonotonicClock<TickId> {
public TickBolt(Integer interval) {
super(new MonotonicClock.ClockConfig<>(), interval);
public TickBolt(int cacheCheckInterval, int slaCheckInterval) {
super(createConfig(cacheCheckInterval, slaCheckInterval));
}

enum TickId {}
private static ClockConfig<TickId> createConfig(int cacheCheckInterval, int slaCheckInterval) {
ClockConfig<TickId> config = new ClockConfig<>();
config.addTickInterval(TickId.CACHE_UPDATE, cacheCheckInterval);
config.addTickInterval(TickId.SLA_CHECK, slaCheckInterval);
return config;
}

enum TickId {
CACHE_UPDATE,
SLA_CHECK
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

import java.time.Clock;
Expand All @@ -55,29 +56,29 @@ public class ActionService implements FlowSlaMonitoringCarrier {
private static final Set<PathComputationStrategy> LATENCY_BASED_STRATEGIES =
Sets.newHashSet(PathComputationStrategy.LATENCY, PathComputationStrategy.MAX_LATENCY);

private FlowOperationsCarrier carrier;
private FlowRepository flowRepository;
private FlowStatsRepository flowStatsRepository;
private KildaFeatureTogglesRepository featureTogglesRepository;
private TransactionManager transactionManager;
private FlowLatencyMonitoringFsmFactory fsmFactory;
private FsmExecutor<FlowLatencyMonitoringFsm, State, Event, Context> fsmExecutor;
private final FlowOperationsCarrier carrier;
private final FlowRepository flowRepository;
private final FlowStatsRepository flowStatsRepository;
private final KildaFeatureTogglesRepository featureTogglesRepository;
private final TransactionManager transactionManager;
private final FlowLatencyMonitoringFsmFactory fsmFactory;
private final FsmExecutor<FlowLatencyMonitoringFsm, State, Event, Context> fsmExecutor;

private float threshold;
private final int shardCount;

@VisibleForTesting
protected Map<String, FlowLatencyMonitoringFsm> fsms = new HashMap<>();
protected Map<FsmKey, FlowLatencyMonitoringFsm> fsms = new HashMap<>();

public ActionService(FlowOperationsCarrier carrier, PersistenceManager persistenceManager,
Clock clock, Duration timeout, float threshold) {
Clock clock, Duration timeout, float threshold, int shardCount) {
this.carrier = carrier;
flowRepository = persistenceManager.getRepositoryFactory().createFlowRepository();
flowStatsRepository = persistenceManager.getRepositoryFactory().createFlowStatsRepository();
featureTogglesRepository = persistenceManager.getRepositoryFactory().createFeatureTogglesRepository();
transactionManager = persistenceManager.getTransactionManager();
fsmFactory = FlowLatencyMonitoringFsm.factory(clock, timeout, threshold);
fsmExecutor = fsmFactory.produceExecutor();
this.threshold = threshold;
this.shardCount = shardCount;
}

/**
Expand All @@ -103,7 +104,7 @@ public void removeFlowInfo(String flowId) {
* Check flow SLA is violated.
*/
public void processFlowLatencyMeasurement(String flowId, FlowDirection direction, Duration latency) {
String key = getFsmKey(flowId, direction);
FsmKey key = getFsmKey(flowId, direction);
FlowLatencyMonitoringFsm fsm = fsms.get(key);
if (fsm == null) {
Flow flow = flowRepository.findById(flowId)
Expand All @@ -126,15 +127,25 @@ public void processFlowLatencyMeasurement(String flowId, FlowDirection direction
/**
* Process tick.
*/
public void processTick() {
public void processTick(int shardNumber) {
Context context = Context.builder()
.carrier(this)
.build();
fsms.values().forEach(fsm -> fsmExecutor.fire(fsm, Event.TICK, context));
if (log.isDebugEnabled()) {
log.debug("Processing flow SLA checks for shard {}", shardNumber);
}
for (FsmKey key : fsms.keySet()) {
if (key.flowId.hashCode() % shardCount == shardNumber) {
if (log.isTraceEnabled()) {
log.trace("Processing SLA check for flow FSM {}: Shard number: {}", key, shardNumber);
}
fsmExecutor.fire(fsms.get(key), Event.TICK, context);
}
}
}

private String getFsmKey(String flowId, FlowDirection direction) {
return format("%s_%s", flowId, direction.name().toLowerCase());
private FsmKey getFsmKey(String flowId, FlowDirection direction) {
return new FsmKey(flowId, direction);
}

/**
Expand Down Expand Up @@ -200,4 +211,10 @@ public void sendFlowRerouteRequest(String flowId) {
private boolean isReactionsEnabled() {
return featureTogglesRepository.getOrDefault().getFlowLatencyMonitoringReactions();
}

@Value
private static class FsmKey {
String flowId;
FlowDirection direction;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class ActionServiceTest extends InMemoryGraphBasedTest {

private static final Duration TIMEOUT = Duration.ofSeconds(30);
private static final float THRESHOLD = 0.1f;
public static final int SHARD_COUNT = 1;

private PersistenceDummyEntityFactory dummyFactory;
private FlowRepository flowRepository;
Expand Down Expand Up @@ -95,7 +96,7 @@ public void setup() {
flow = dummyFactory.makeFlow(new FlowEndpoint(SRC_SWITCH, IN_PORT),
new FlowEndpoint(DST_SWITCH, OUT_PORT));

service = new ActionService(carrier, persistenceManager, clock, TIMEOUT, THRESHOLD);
service = new ActionService(carrier, persistenceManager, clock, TIMEOUT, THRESHOLD, SHARD_COUNT);
}

@Test
Expand All @@ -111,7 +112,7 @@ public void shouldStayInHealthyState() {
clock.adjust(Duration.ofSeconds(10));
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.FORWARD, latency);
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.REVERSE, latency.minus(NANOSECOND));
service.processTick();
service.processTick(0);
}

assertEquals(2, service.fsms.values().size());
Expand All @@ -135,7 +136,7 @@ public void shouldFailTier1AndSendRerouteRequest() {
clock.adjust(Duration.ofSeconds(10));
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.FORWARD, latency);
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.REVERSE, latency.minus(NANOSECOND));
service.processTick();
service.processTick(0);
if (i == 0) {
assertTrue(service.fsms.values().stream().allMatch(fsm -> UNSTABLE.equals(fsm.getCurrentState())));
}
Expand Down Expand Up @@ -169,7 +170,7 @@ public void shouldFailTier1AndDoNotSendRerouteRequestWhenToggleIsFalse() {
clock.adjust(Duration.ofSeconds(10));
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.FORWARD, latency);
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.REVERSE, latency.minus(NANOSECOND));
service.processTick();
service.processTick(0);
if (i == 0) {
assertTrue(service.fsms.values().stream().allMatch(fsm -> UNSTABLE.equals(fsm.getCurrentState())));
}
Expand Down Expand Up @@ -202,7 +203,7 @@ public void shouldFailTier1AndDoNotSendRerouteRequestForCostStrategy() {
clock.adjust(Duration.ofSeconds(10));
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.FORWARD, latency);
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.REVERSE, latency.minus(NANOSECOND));
service.processTick();
service.processTick(0);
if (i == 0) {
assertTrue(service.fsms.values().stream().allMatch(fsm -> UNSTABLE.equals(fsm.getCurrentState())));
}
Expand Down Expand Up @@ -235,7 +236,7 @@ public void shouldFailTier2AndSendRerouteRequest() {
clock.adjust(Duration.ofSeconds(10));
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.FORWARD, latency);
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.REVERSE, latency.minus(NANOSECOND));
service.processTick();
service.processTick(0);
if (i == 0) {
assertTrue(service.fsms.values().stream().allMatch(fsm -> UNSTABLE.equals(fsm.getCurrentState())));
}
Expand Down Expand Up @@ -269,7 +270,7 @@ public void shouldFailTier2AndDoNotSendRerouteRequestForCostStrategy() {
clock.adjust(Duration.ofSeconds(10));
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.FORWARD, latency);
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.REVERSE, latency.minus(NANOSECOND));
service.processTick();
service.processTick(0);
if (i == 0) {
assertTrue(service.fsms.values().stream().allMatch(fsm -> UNSTABLE.equals(fsm.getCurrentState())));
}
Expand Down Expand Up @@ -297,7 +298,7 @@ public void shouldBecomeHealthyAndSendSyncRequest() {
clock.adjust(Duration.ofSeconds(10));
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.FORWARD, healthy);
service.processFlowLatencyMeasurement(flow.getFlowId(), FlowDirection.REVERSE, healthy.minus(NANOSECOND));
service.processTick();
service.processTick(0);
if (i == 0) {
assertTrue(service.fsms.values().stream().allMatch(fsm -> UNSTABLE.equals(fsm.getCurrentState())));
}
Expand Down

0 comments on commit 53d6b6d

Please sign in to comment.