Skip to content
This repository has been archived by the owner on Oct 11, 2023. It is now read-only.

Commit

Permalink
Reliability improvements
Browse files Browse the repository at this point in the history
**Major:**
1. Back off in case of device-to-cloud (d2c) messaging throttling, stopping telemetry and twin operations
2. Improve performance by reducing the load on the garbage collector and by disposing IoT SDK resources
3. Improve device twin management, e.g. try to avoid twin writes when data is not changed, handle more error scenarios
4. Add "development" feature flag to enable/disable expensive runtime checks
5. Change the partition size from 1000 to 5000 devices to reduce the load on storage

**Minor:**
1. Back off in case of device count throttling (e.g. when reaching 8000 devices in the free SKU)
2. Remove unused daily counter for telemetry
3. Improve performance by reducing the number of no-op async tasks
4. Update IoT SDK and other dependencies
5. Clean up logging code, reimplement log filtering (removed in past PRs)
6. Print SDK version at startup
7. Add some scripts for development, see /scripts/development (create/delete simulation, start storage adapter)

**Bug fixes:**
1. Handle and recover from exceptions in the partitioning agent
2. Fix some swallowed errors/exceptions and unnecessary try/catch
3. Share script interpreter between methods and state to ensure device state consistency
4. Fix logged throughput in case of no traffic, i.e. show 0.0 msg/sec, and round value to 3 decimals
5. Fix logging from ConfigData
6. Change the dev endpoint used to delete simulations: don't delete devices (the endpoint was not working) - no user impact, dev only
  • Loading branch information
dluc authored Nov 30, 2018
1 parent 4cc7936 commit 8fa180f
Show file tree
Hide file tree
Showing 99 changed files with 1,982 additions and 948 deletions.
16 changes: 8 additions & 8 deletions PartitioningAgent.Test/PartitioningAgent.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.8.0"/>
<PackageReference Include="Moq" Version="4.10.0"/>
<PackageReference Include="xunit" Version="2.4.0"/>
<PackageReference Include="xunit.assert" Version="2.4.0"/>
<PackageReference Include="xunit.runner.console" Version="2.4.0"/>
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="Moq" Version="4.10.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.assert" Version="2.4.1" />
<PackageReference Include="xunit.runner.console" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\PartitioningAgent\PartitioningAgent.csproj"/>
<ProjectReference Include="..\Services\Services.csproj"/>
<ProjectReference Include="..\PartitioningAgent\PartitioningAgent.csproj" />
<ProjectReference Include="..\Services\Services.csproj" />
</ItemGroup>
</Project>
238 changes: 152 additions & 86 deletions PartitioningAgent/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,31 +77,24 @@ public async Task StartAsync(CancellationToken appStopToken)
var isMaster = await this.clusterNodes.SelfElectToMasterNodeAsync();
if (isMaster)
{
// Reload all simulations to have fresh status and discover new simulations
IList<Simulation> simulations = (await this.simulations.GetListAsync());

IList<Simulation> activeSimulations = simulations
.Where(x => x.IsActiveNow).ToList();
this.log.Debug("Active simulations loaded", () => new { activeSimulations.Count });

IList<Simulation> deletionRequiredSimulations = simulations
.Where(x => x.DeviceDeletionRequired).ToList();
this.log.Debug("InActive simulations loaded", () => new { deletionRequiredSimulations.Count });

await this.clusterNodes.RemoveStaleNodesAsync();

// Scale nodes in Vmss
await this.ScaleVmssNodes(activeSimulations);
var (success, activeSimulations, deletionRequiredSimulations) = await this.GetSimulations();
if (success)
{
// Scale nodes in Vmss
await this.ScaleVmssNodes(activeSimulations);

// Create IoTHub devices for all the active simulations
await this.CreateDevicesAsync(activeSimulations);
// Create IoTHub devices for all the active simulations
await this.CreateDevicesAsync(activeSimulations);

// Delete IoTHub devices for inactive simulations
await this.DeleteDevicesAsync(deletionRequiredSimulations);
// Delete IoTHub devices for inactive simulations
await this.DeleteDevicesAsync(deletionRequiredSimulations);

// Create and delete partitions
await this.CreatePartitionsAsync(activeSimulations);
await this.DeletePartitionsAsync(activeSimulations);
// Create and delete partitions
await this.CreatePartitionsAsync(activeSimulations);
await this.DeletePartitionsAsync(activeSimulations);
}
}

// Sleep some seconds before checking for new simulations (by default 15 seconds)
Expand All @@ -113,69 +106,116 @@ public void Stop()
{
this.running = false;
}

private async Task ScaleVmssNodes(IList<Simulation> activeSimulations)
{
// Default node count is 1
var nodeCount = DEFAULT_NODE_COUNT;
var maxDevicesPerNode = this.clusteringConfig.MaxDevicesPerNode;

if (activeSimulations.Count > 0)
private async
Task<(bool success, IList<Simulation> activeSimulations, IList<Simulation> deletionRequiredSimulations)>
GetSimulations()
{
try
{
var models = new List<Simulation.DeviceModelRef>();
var customDevices = 0;
// Reload all simulations to have fresh status and discover new simulations
IList<Simulation> list = (await this.simulations.GetListAsync());

foreach (var simulation in activeSimulations)
{
// Loop through all the device models used in the simulation
models = (from model in simulation.DeviceModels where model.Count > 0 select model).ToList();
IList<Simulation> activeSimulations = list
.Where(x => x.IsActiveNow).ToList();
this.log.Debug("Active simulations loaded", () => new { activeSimulations.Count });

// Count total custom devices
customDevices += simulation.CustomDevices.Count;
}

// Calculate the total number of devices
var totalDevices = models.Sum(model => model.Count) + customDevices;
IList<Simulation> deletionRequiredSimulations = list
.Where(x => x.DeviceDeletionRequired).ToList();
this.log.Debug("Inactive simulations loaded", () => new { deletionRequiredSimulations.Count });

// Calculate number of nodes required
nodeCount = maxDevicesPerNode > 0 ? (int)Math.Ceiling((double)totalDevices / maxDevicesPerNode) : DEFAULT_NODE_COUNT;
return (true, activeSimulations, deletionRequiredSimulations);
}

if (this.currentNodeCount != nodeCount)
catch (Exception e)
{
// Send a request to update vmss auto scale settings to create vm instances
// TODO: when devices are added or removed, the number of VMs might need an update
await this.azureManagementAdapter.CreateOrUpdateVmssAutoscaleSettingsAsync(nodeCount);

this.currentNodeCount = nodeCount;
this.log.Error("An unexpected error occurred in the master node while loading the list of simulations", e);
return (false, null, null);
}
}

private async Task DeleteDevicesAsync(IList<Simulation> deletionRequiredSimulations)
private async Task ScaleVmssNodes(IList<Simulation> activeSimulations)
{
if (deletionRequiredSimulations.Count == 0) return;
try
{
// Default node count is 1
var nodeCount = DEFAULT_NODE_COUNT;
var maxDevicesPerNode = this.clusteringConfig.MaxDevicesPerNode;

if (activeSimulations.Count > 0)
{
var models = new List<Simulation.DeviceModelRef>();
var customDevices = 0;

foreach (var simulation in activeSimulations)
{
// Loop through all the device models used in the simulation
models = (from model in simulation.DeviceModels where model.Count > 0 select model).ToList();

// Count total custom devices
customDevices += simulation.CustomDevices.Count;
}

foreach (var simulation in deletionRequiredSimulations)
// Calculate the total number of devices
var totalDevices = models.Sum(model => model.Count) + customDevices;

// Calculate number of nodes required
nodeCount = maxDevicesPerNode > 0 ? (int) Math.Ceiling((double) totalDevices / maxDevicesPerNode) : DEFAULT_NODE_COUNT;
}

if (this.currentNodeCount != nodeCount)
{
// Send a request to update vmss auto scale settings to create vm instances
// TODO: when devices are added or removed, the number of VMs might need an update
await this.azureManagementAdapter.CreateOrUpdateVmssAutoscaleSettingsAsync(nodeCount);

this.currentNodeCount = nodeCount;
}
}
catch (Exception e)
{
await this.DeleteIoTHubDevicesAsync(simulation);
this.log.Error("Unexpected error while scaling the deployment", e);
}
}

private async Task CreateDevicesAsync(IList<Simulation> activeSimulations)
{
if (activeSimulations.Count == 0) return;
try
{
if (activeSimulations.Count == 0) return;

var simulationsWithDevicesToCreate = activeSimulations.Where(x => x.DeviceCreationRequired).ToList();
var simulationsWithDevicesToCreate = activeSimulations.Where(x => x.DeviceCreationRequired).ToList();

if (simulationsWithDevicesToCreate.Count == 0)
if (simulationsWithDevicesToCreate.Count == 0)
{
this.log.Debug("No simulations require device creation");
return;
}

foreach (var simulation in simulationsWithDevicesToCreate)
{
await this.CreateIoTHubDevicesAsync(simulation);
}
}
catch (Exception e)
{
this.log.Debug("No simulations require device creation");
return;
this.log.Error("Unexpected error while creating devices", e);
}
}

foreach (var simulation in simulationsWithDevicesToCreate)
private async Task DeleteDevicesAsync(IList<Simulation> deletionRequiredSimulations)
{
try
{
await this.CreateIoTHubDevicesAsync(simulation);
if (deletionRequiredSimulations.Count == 0) return;

foreach (var simulation in deletionRequiredSimulations)
{
await this.DeleteIoTHubDevicesAsync(simulation);
}
}
catch (Exception e)
{
this.log.Error("Unexpected error while deleting devices", e);
}
}

Expand Down Expand Up @@ -213,6 +253,8 @@ private async Task DeleteIoTHubDevicesAsync(Simulation simulation)
: "Device deletion is still in progress",
() => new { SimulationId = simulation.Id });
}

deviceService.Dispose();
}

// Start the job to delete the devices
Expand All @@ -233,6 +275,8 @@ private async Task DeleteIoTHubDevicesAsync(Simulation simulation)
{
this.log.Warn("Failed to start device deletion, will retry later");
}

deviceService.Dispose();
}
}

Expand All @@ -252,7 +296,11 @@ private async Task CreateIoTHubDevicesAsync(Simulation simulation)

if (await deviceService.IsJobCompleteAsync(simulation.DeviceCreationJobId, () => { creationFailed = true; }))
{
this.log.Info("All devices have been created, updating the simulation record", () => new { SimulationId = simulation.Id });
// Note: at this point we don't know if all devices have been created, quota can cause some errors,
// see job log in the storage account
this.log.Info("Device creation job complete, updating the simulation record. All devices should have been created. " +
"If any error occurred, the 'importErrors.log' file in the storage account contains the details.",
() => new { SimulationId = simulation.Id });

if (await this.simulations.TryToSetDeviceCreationCompleteAsync(simulation.Id))
{
Expand All @@ -270,6 +318,8 @@ private async Task CreateIoTHubDevicesAsync(Simulation simulation)
: "Device creation is still in progress",
() => new { SimulationId = simulation.Id });
}

deviceService.Dispose();
}

// Start the job to import the devices
Expand All @@ -290,53 +340,69 @@ private async Task CreateIoTHubDevicesAsync(Simulation simulation)
{
this.log.Warn("Failed to start device creation, will retry later");
}

deviceService.Dispose();
}
}

private async Task CreatePartitionsAsync(IList<Simulation> activeSimulations)
{
if (activeSimulations.Count == 0) return;
try
{
if (activeSimulations.Count == 0) return;

var simulationsToPartition = activeSimulations.Where(x => x.PartitioningRequired).ToList();
var simulationsToPartition = activeSimulations.Where(x => x.PartitioningRequired).ToList();

if (simulationsToPartition.Count == 0)
{
this.log.Debug("No simulations to be partitioned");
return;
}
if (simulationsToPartition.Count == 0)
{
this.log.Debug("No simulations to be partitioned");
return;
}

foreach (Simulation sim in simulationsToPartition)
foreach (Simulation sim in simulationsToPartition)
{
await this.partitions.CreateAsync(sim.Id);
}
}
catch (Exception e)
{
await this.partitions.CreateAsync(sim.Id);
this.log.Error("Unexpected error while creating partitions", e);
}
}

private async Task DeletePartitionsAsync(IList<Simulation> activeSimulations)
{
if (activeSimulations.Count == 0) return;
try
{
if (activeSimulations.Count == 0) return;

this.log.Debug("Searching partitions to delete...");
this.log.Debug("Searching partitions to delete...");

var allPartitions = await this.partitions.GetAllAsync();
var simulationIds = new HashSet<string>(activeSimulations.Select(x => x.Id));
var partitionIds = new List<string>();
foreach (var partition in allPartitions)
{
if (!simulationIds.Contains(partition.SimulationId))
var allPartitions = await this.partitions.GetAllAsync();
var simulationIds = new HashSet<string>(activeSimulations.Select(x => x.Id));
var partitionIds = new List<string>();
foreach (var partition in allPartitions)
{
partitionIds.Add(partition.Id);
if (!simulationIds.Contains(partition.SimulationId))
{
partitionIds.Add(partition.Id);
}
}

if (partitionIds.Count == 0)
{
this.log.Debug("No partitions to delete");
return;
}
}

if (partitionIds.Count == 0)
// TODO: partitions should be deleted only after its actors are down
this.log.Debug("Deleting partitions...", () => new { partitionIds.Count });
await this.partitions.DeleteListAsync(partitionIds);
}
catch (Exception e)
{
this.log.Debug("No partitions to delete");
return;
this.log.Error("Unexpected error while deleting partitions", e);
}

// TODO: partitions should be deleted only after its actors are down
this.log.Debug("Deleting partitions...", () => new { partitionIds.Count });
await this.partitions.DeleteListAsync(partitionIds);
}
}
}
Loading

0 comments on commit 8fa180f

Please sign in to comment.