Skip to content
This repository has been archived by the owner on Oct 11, 2023. It is now read-only.

Commit

Permalink
Integrate and merge replay feature to master (#338)
Browse files Browse the repository at this point in the history
* Create replay thread (#325)

* Create empty replay thread

* Remove connection using

* Remove references to connections in the replay task

* Update name of replay task test

* Move const to config file

* Add device replay actors  (#329)

* Create empty replay thread

* Remove connection using

* Remove references to connections in the replay task

* Update name of replay task test

* Add device replay actors

* Read replay file and stream telemetry (#334)

* update

* read replay file from storage

* replay file settings

* update

* update

* update validation

* PR comments

* fix test
  • Loading branch information
ap-git-hub authored and saixiaohui committed Jan 18, 2019
1 parent 942b2d5 commit 8a31fcf
Show file tree
Hide file tree
Showing 18 changed files with 666 additions and 52 deletions.
1 change: 0 additions & 1 deletion Services.Test/ReplayFileServiceTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public ReplayFileServiceTest()
this.target = new ReplayFileService(
this.config.Object,
this.enginesFactory.Object,
this.replayFilesStorage.Object,
this.log.Object);
}

Expand Down
25 changes: 25 additions & 0 deletions Services/Concurrency/ConcurrencyConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public interface IAppConcurrencyConfig
int MaxPendingTelemetry { get; }
int MaxPendingTwinWrites { get; }
int MinDeviceStateLoopDuration { get; }
int MinDeviceReplayLoopDuration { get; }
int MinDeviceConnectionLoopDuration { get; }
int MinDeviceTelemetryLoopDuration { get; }
int MinDevicePropertiesLoopDuration { get; }
Expand All @@ -26,6 +27,7 @@ public class AppConcurrencyConfig : IAppConcurrencyConfig
private const int DEFAULT_MAX_PENDING_TELEMETRY = 1000;
private const int DEFAULT_MAX_PENDING_TWIN_WRITES = 50;
private const int DEFAULT_MIN_DEVICE_STATE_LOOP_DURATION = 1000;
private const int DEFAULT_MIN_DEVICE_REPLAY_LOOP_DURATION = 1000;
private const int DEFAULT_MIN_DEVICE_CONNECTION_LOOP_DURATION = 1000;
private const int DEFAULT_MIN_DEVICE_TELEMETRY_LOOP_DURATION = 500;
private const int DEFAULT_MIN_DEVICE_PROPERTIES_LOOP_DURATION = 2000;
Expand All @@ -42,6 +44,7 @@ public class AppConcurrencyConfig : IAppConcurrencyConfig
private int maxPendingTelemetry;
private int maxPendingTwinWrites;
private int minDeviceStateLoopDuration;
private int minDeviceReplayLoopDuration;
private int minDeviceConnectionLoopDuration;
private int minDeviceTelemetryLoopDuration;
private int minDevicePropertiesLoopDuration;
Expand All @@ -55,6 +58,7 @@ public AppConcurrencyConfig()
this.MaxPendingTelemetry = DEFAULT_MAX_PENDING_TELEMETRY;
this.MaxPendingTwinWrites = DEFAULT_MAX_PENDING_TWIN_WRITES;
this.MinDeviceStateLoopDuration = DEFAULT_MIN_DEVICE_STATE_LOOP_DURATION;
this.MinDeviceReplayLoopDuration = DEFAULT_MIN_DEVICE_REPLAY_LOOP_DURATION;
this.MinDeviceConnectionLoopDuration = DEFAULT_MIN_DEVICE_CONNECTION_LOOP_DURATION;
this.MinDeviceTelemetryLoopDuration = DEFAULT_MIN_DEVICE_TELEMETRY_LOOP_DURATION;
this.MinDevicePropertiesLoopDuration = DEFAULT_MIN_DEVICE_PROPERTIES_LOOP_DURATION;
Expand Down Expand Up @@ -207,6 +211,27 @@ public int MinDeviceTelemetryLoopDuration
}
}

/// <summary>
/// When sending telemetry for all the devices in a thread, slow down if the loop through
/// all the devices takes less than N msecs. This is also the minimum time between two
/// messages from the same device.
/// </summary>
public int MinDeviceReplayLoopDuration
{
get => this.minDeviceReplayLoopDuration;
set
{
if (value < 1 || value > MAX_LOOP_DURATION)
{
throw new InvalidConfigurationException(
"The min duration of the device telemetry loop is not valid. " +
"Use a value within the range of 1 and " + MAX_LOOP_DURATION);
}

this.minDeviceReplayLoopDuration = value;
}
}

/// <summary>
/// When writing device twins for all the devices in a thread, slow down if the loop through
/// all the devices takes less than N msecs.
Expand Down
1 change: 1 addition & 0 deletions Services/Models/DeviceModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public DeviceModel()
this.Protocol = IoTHubProtocol.AMQP;
this.Simulation = new StateSimulation();
this.Properties = new Dictionary<string, object>();

this.Telemetry = new List<DeviceModelMessage>();
this.CloudToDeviceMethods = new Dictionary<string, Script>();
}
Expand Down
4 changes: 3 additions & 1 deletion Services/Models/Simulation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,12 @@ public DateTimeOffset? EndTime
[JsonProperty(Order = 150)]
public DateTimeOffset? ActualStartTime { get; set; }

// ReplayFileId is the id of the replay file in storage
// ReplayFileId is the replay file data in storage
[JsonProperty(Order = 160)]
public string ReplayFileId { get; set; }

public bool ReplayFileRunIndefinitely { get; set; }

public Simulation()
{
// When unspecified, a simulation is enabled
Expand Down
31 changes: 15 additions & 16 deletions Services/ReplayFileService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Runtime;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Storage;
using Newtonsoft.Json;
using Microsoft.VisualBasic.FileIO;
using FieldType = Microsoft.VisualBasic.FileIO.FieldType;

namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services
{
Expand Down Expand Up @@ -39,13 +37,13 @@ public interface IReplayFileService

public class ReplayFileService : IReplayFileService
{
private const int NUM_CSV_COLS = 3;
private readonly IEngine replayFilesStorage;
private readonly ILogger log;

public ReplayFileService(
IServicesConfig config,
IEngines engines,
IEngine storage,
ILogger logger)
{
this.replayFilesStorage = engines.Build(config.ReplayFilesStorage);
Expand Down Expand Up @@ -149,25 +147,26 @@ public string ValidateFile(Stream stream)
{
var reader = new StreamReader(stream);
var file = reader.ReadToEnd();

using (TextFieldParser parser = new TextFieldParser(file))
while (!reader.EndOfStream)
{
parser.TextFieldType = FieldType.Delimited;
parser.SetDelimiters(",");
while (!parser.EndOfData)
try
{
try
{
string[] lines = parser.ReadFields();
}
catch (MalformedLineException ex)
string line = reader.ReadLine();
string[] fields = line.Split(',');
if (fields.Length < NUM_CSV_COLS)
{
this.log.Error("Replay file has invalid csv format", () => new { ex });
throw new InvalidInputException("Replay file has invalid csv format", ex);
this.log.Error("Replay file has invalid csv format");
throw new InvalidInputException("Replay file has invalid csv format");
}
}
catch (Exception ex)
{
this.log.Error("Error parsing replay file", () => new { ex });
throw new InvalidInputException("Error parsing replay file", ex);
}
}

return file;
}
}
Expand Down
6 changes: 6 additions & 0 deletions Services/Services.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
<RootNamespace>Microsoft.Azure.IoTSolutions.DeviceSimulation.Services</RootNamespace>
</PropertyGroup>
<ItemGroup>
<Content Include="data\replayfile\replay.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
<Content Include="data\replayfile\simulationReplayTest.csv">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
<Content Include="data\templates\multiple-simulations-template.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
Expand Down
34 changes: 34 additions & 0 deletions Services/data/replayfile/replay.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"SchemaVersion": "1.0.0",
"Id": "replay",
"Version": "0.0.1",
"Name": "Replay",
"Description": "Fake device model for csv file replay",
"Protocol": "AMQP",
"ReplayFile": "",
"Simulation": {
"InitialState": {
"online": true
},
"Interval": "00:00:00",
"Scripts": [
]
},
"Properties": {
},
"Tags": {
},
"Telemetry": [
{
"MessageTemplate": "",
"MessageSchema": {
"Name": "replay-sensors;v1",
"Format": "JSON",
"Fields": {
}
}
}
],
"CloudToDeviceMethods": {
}
}
3 changes: 3 additions & 0 deletions Services/data/replayfile/simulationReplayTest.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
telemetry, 00:00:00,{"temperature": 51.32, "temperature_unit": "fahrenheit", "humidity": 69.59, "humidity_unit":"RH", "pressure": 440.20, "pressure_unit": "psi"}
telemetry, 00:00:30,{"temperature": 71.32, "temperature_unit": "fahrenheit", "humidity": 79.59, "humidity_unit":"RH", "pressure": 240.20, "pressure_unit": "psi"}
telemetry, 00:01:00,{"temperature": 30.00, "temperature_unit": "fahrenheit", "humidity": 59.59, "humidity_unit":"RH", "pressure": 340.20, "pressure_unit": "psi"}
6 changes: 5 additions & 1 deletion SimulationAgent.Test/SimulationManagerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceConnection;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceProperties;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceReplay;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceState;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceTelemetry;
using Moq;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class SimulationManagerTest
private readonly ConcurrentDictionary<string, IDeviceConnectionActor> mockDeviceContext;
private readonly ConcurrentDictionary<string, IDeviceTelemetryActor> deviceTelemetryActors;
private readonly ConcurrentDictionary<string, IDevicePropertiesActor> devicePropertiesActors;
private readonly ConcurrentDictionary<string, IDeviceReplayActor> deviceReplayActors;

private SimulationManager target;

Expand Down Expand Up @@ -85,13 +87,15 @@ public SimulationManagerTest()
this.mockDeviceContext = new ConcurrentDictionary<string, IDeviceConnectionActor>();
this.deviceTelemetryActors = new ConcurrentDictionary<string, IDeviceTelemetryActor>();
this.devicePropertiesActors = new ConcurrentDictionary<string, IDevicePropertiesActor>();
this.deviceReplayActors = new ConcurrentDictionary<string, IDeviceReplayActor>();

this.target.InitAsync(
simulation,
this.deviceStateActors,
this.mockDeviceContext,
this.deviceTelemetryActors,
this.devicePropertiesActors).Wait(Constants.TEST_TIMEOUT);
this.devicePropertiesActors,
this.deviceReplayActors).Wait(Constants.TEST_TIMEOUT);
}

[Fact]
Expand Down
132 changes: 132 additions & 0 deletions SimulationAgent.Test/SimulationThreads/DeviceReplayTaskTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Concurrent;
using System.Threading;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceReplay;
using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.SimulationThreads;
using Moq;
using Xunit;
using System.Threading.Tasks;

namespace SimulationAgent.Test.SimulationThreads
{
public class DeviceReplayTaskTest
{
private const int NUM_ACTORS = 9;
private const int MAX_PENDING_TASKS = 5;

private readonly Mock<IAppConcurrencyConfig> mockAppConcurrencyConfig;
private readonly Mock<ILogger> mockLogger;
private readonly DeviceReplayTask target;
private readonly ConcurrentDictionary<string, Mock<IDeviceReplayActor>> mockDeviceReplayActors;
private readonly ConcurrentDictionary<string, IDeviceReplayActor> mockDeviceReplayActorObjects;
private readonly ConcurrentDictionary<string, Mock<ISimulationManager>> mockSimulationManagers;
private readonly ConcurrentDictionary<string, ISimulationManager> mockSimulationManagerObjects;

public DeviceReplayTaskTest()
{
this.mockDeviceReplayActors = new ConcurrentDictionary<string, Mock<IDeviceReplayActor>>();
this.mockDeviceReplayActorObjects = new ConcurrentDictionary<string, IDeviceReplayActor>();
this.mockSimulationManagers = new ConcurrentDictionary<string, Mock<ISimulationManager>>();
this.mockSimulationManagerObjects = new ConcurrentDictionary<string, ISimulationManager>();

this.mockAppConcurrencyConfig = new Mock<IAppConcurrencyConfig>();
this.mockAppConcurrencyConfig.SetupGet(x => x.MaxPendingTasks).Returns(MAX_PENDING_TASKS);
this.mockLogger = new Mock<ILogger>();

this.target = new DeviceReplayTask(this.mockAppConcurrencyConfig.Object, this.mockLogger.Object);
}

[Fact]
public void ItCallsRunAsyncOnAllReplayActors()
{
// Arrange
var cancellationToken = new CancellationTokenSource();

this.BuildMockDeviceReplayActors(
this.mockDeviceReplayActors,
this.mockDeviceReplayActorObjects,
cancellationToken,
NUM_ACTORS);

// Build a list of SimulationManagers
this.BuildMockSimluationManagers(
this.mockSimulationManagers,
this.mockSimulationManagerObjects,
cancellationToken,
NUM_ACTORS);

// Act
// Act on the target. The cancellation token will be cancelled through
// a callback that will be triggered when each device-replay actor
// is called.
var targetTask = this.target.RunAsync(
this.mockSimulationManagerObjects,
this.mockDeviceReplayActorObjects,
cancellationToken.Token);

// Assert
// Verify that each SimulationManager was called at least once
foreach (var actor in this.mockDeviceReplayActors)
actor.Value.Verify(x => x.HasWorkToDo(), Times.Once);
}

private void BuildMockDeviceReplayActors(
ConcurrentDictionary<string, Mock<IDeviceReplayActor>> mockDictionary,
ConcurrentDictionary<string, IDeviceReplayActor> objectDictionary,
CancellationTokenSource cancellationToken,
int count)
{
mockDictionary.Clear();
objectDictionary.Clear();

for (int i = 0; i < count; i++)
{
var deviceName = $"device_{i}";
var mockDeviceReplayActor = new Mock<IDeviceReplayActor>();

// Have each DeviceReplayActor report that it has work to do
mockDeviceReplayActor.Setup(x => x.HasWorkToDo()).Returns(true);
mockDeviceReplayActor.Setup(x => x.RunAsync()).Returns(Task.CompletedTask)
.Callback(() => { cancellationToken.Cancel(); });

mockDictionary.TryAdd(deviceName, mockDeviceReplayActor);
objectDictionary.TryAdd(deviceName, mockDeviceReplayActor.Object);
}
}

/*
* Creating two collections: one for the mocks, and another to store the
* mock objects. If we only created one collection and populated it with
* the mock objects, we wouldn't have a reference to the backing mock for
* each.
*/
private void BuildMockSimluationManagers(
ConcurrentDictionary<string, Mock<ISimulationManager>> mockSimulationManagers,
ConcurrentDictionary<string, ISimulationManager> mockSimulationManagerObjects,
CancellationTokenSource cancellationToken,
int count)
{
mockSimulationManagers.Clear();
mockSimulationManagerObjects.Clear();

for (int i = 0; i < count; i++)
{
var deviceName = $"simulation_{i}";
var mockSimulationManager = new Mock<ISimulationManager>();

// We only want the main loop in the target to run once, so here we'll
// trigger a callback which will cancel the cancellation token that
// the main loop uses.
mockSimulationManager.Setup(x => x.NewConnectionLoop())
.Callback(() => cancellationToken.Cancel());

mockSimulationManagers.TryAdd(deviceName, mockSimulationManager);
mockSimulationManagerObjects.TryAdd(deviceName, mockSimulationManager.Object);
}
}
}
}
Loading

0 comments on commit 8a31fcf

Please sign in to comment.