Skip to content

Commit

Permalink
增加leader断开连接的处理。
Browse files Browse the repository at this point in the history
  • Loading branch information
bossma committed Jun 25, 2020
1 parent ed2b075 commit 1b189ab
Show file tree
Hide file tree
Showing 14 changed files with 479 additions and 324 deletions.
4 changes: 2 additions & 2 deletions examples/ElectionService1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ static void Main(string[] args)
Console.WriteLine("I am ElectionService1.");

// Consul
// LeaderElectionManager electionManager = new LeaderElectionManager("ElectionService", "ElectionService1", new ConsulElectionOptions());
LeaderElectionManager electionManager = new LeaderElectionManager("ElectionService", "ElectionService1", new ConsulElectionOptions());

// ZooKeeper
LeaderElectionManager electionManager = new LeaderElectionManager("ElectionService", "ElectionService1", new ZkElectionOptions());
// LeaderElectionManager electionManager = new LeaderElectionManager("ElectionService", "ElectionService1", new ZkElectionOptions());

electionManager.Watch(LeaderElectCompletedEventHandler);
Console.WriteLine("Start Election...");
Expand Down
4 changes: 2 additions & 2 deletions examples/ElectionService2/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ static void Main(string[] args)
Console.WriteLine("I am ElectionService2.");

// Consul
// LeaderElectionManager electionManager = new LeaderElectionManager("ElectionService", "ElectionService2", new ConsulElectionOptions());
LeaderElectionManager electionManager = new LeaderElectionManager("ElectionService", "ElectionService2", new ConsulElectionOptions());

// ZooKeeper
LeaderElectionManager electionManager = new LeaderElectionManager("ElectionService", "ElectionService2", new ZkElectionOptions());
// LeaderElectionManager electionManager = new LeaderElectionManager("ElectionService", "ElectionService2", new ZkElectionOptions());

electionManager.Watch(LeaderElectCompletedEventHandler);
Console.WriteLine("Start Election...");
Expand Down
152 changes: 127 additions & 25 deletions src/ConsulService.cs → src/ConsulElectionClient.cs
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
using Consul;
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Consul;

namespace FireflySoft.LeaderElection
{
/// <summary>
/// Consul服务管理相关
/// </summary>
internal class ConsulService
public class ConsulElectionClient
{
private readonly ConsulClient client;
private readonly ConsulClient _client;

public ConsulService(ConsulClient client)
/// <summary>
/// The ttl pass thread.
/// </summary>
private Thread ttlPassThread;

public ConsulElectionClient(ConsulClient _client)
{
this.client = client;
this._client = _client;
}

#region Agent

/// <summary>
/// 更新服务健康检查的TTL
/// </summary>
Expand All @@ -29,7 +30,7 @@ public void PassTTL(string serviceId)
Retry(() =>
{
var checkId = "CHECK:" + serviceId;
client.Agent.PassTTL(checkId, "Alive").Wait();
_client.Agent.PassTTL(checkId, "Alive").Wait();
}, 2);
}

Expand All @@ -41,7 +42,7 @@ public bool DeregisterService(string serviceId)
{
var deRegResult = Retry(() =>
{
return client.Agent.ServiceDeregister(serviceId).Result;
return _client.Agent.ServiceDeregister(serviceId).ConfigureAwait(false).GetAwaiter().GetResult();
}, 2);

if (deRegResult.StatusCode != System.Net.HttpStatusCode.OK)
Expand All @@ -61,7 +62,7 @@ public bool DeregisterServiceCheck(string checkId)
{
var deRegResult = Retry(() =>
{
return client.Agent.CheckDeregister(checkId).Result;
return _client.Agent.CheckDeregister(checkId).ConfigureAwait(false).GetAwaiter().GetResult();
}, 2);

if (deRegResult.StatusCode != System.Net.HttpStatusCode.OK)
Expand All @@ -72,11 +73,6 @@ public bool DeregisterServiceCheck(string checkId)
return true;
}

/// <summary>
/// The ttl pass thread.
/// </summary>
private Thread ttlPassThread;

/// <summary>
/// 注册服务,使用此方法注册的服务需要定时Pass TTL
/// </summary>
Expand All @@ -88,7 +84,7 @@ public string RegisterService(string serviceId, string serviceName, int ttl)
{
var deRegResult = Retry(() =>
{
return client.Agent.ServiceDeregister(serviceId).Result;
return _client.Agent.ServiceDeregister(serviceId).ConfigureAwait(false).GetAwaiter().GetResult();
}, 2);

if (deRegResult.StatusCode != System.Net.HttpStatusCode.OK)
Expand All @@ -98,11 +94,11 @@ public string RegisterService(string serviceId, string serviceName, int ttl)

var regResult = Retry(() =>
{
return client.Agent.ServiceRegister(new AgentServiceRegistration()
return _client.Agent.ServiceRegister(new AgentServiceRegistration()
{
ID = serviceId,
Name = serviceName
}).Result;
}).ConfigureAwait(false).GetAwaiter().GetResult();
}, 2);

Expand All @@ -114,7 +110,7 @@ public string RegisterService(string serviceId, string serviceName, int ttl)
string checkId = "CHECK:" + serviceId;
var regCheckResult = Retry(() =>
{
return client.Agent.CheckRegister(new AgentCheckRegistration()
return _client.Agent.CheckRegister(new AgentCheckRegistration()
{
ID = checkId,
Name = "CHECK " + serviceId,
Expand All @@ -123,7 +119,7 @@ public string RegisterService(string serviceId, string serviceName, int ttl)
ServiceID = serviceId,
Status = HealthStatus.Warning,
TTL = new TimeSpan(0, 0, ttl)
}).Result;
}).ConfigureAwait(false).GetAwaiter().GetResult();
}, 2);

if (regCheckResult.StatusCode != System.Net.HttpStatusCode.OK)
Expand Down Expand Up @@ -157,6 +153,112 @@ public string RegisterService(string serviceId, string serviceName, int ttl)

return checkId;
}
#endregion

#region KeyValue
/// <summary>
/// 创建一个KVPair实例
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public KVPair Create(string key)
{
return new KVPair(key);
}

/// <summary>
/// 阻塞获取对应Key的值
/// </summary>
/// <param name="key"></param>
/// <param name="waitTime"></param>
/// <param name="waitIndex"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public KVPair BlockGet(string key, TimeSpan waitTime, ulong waitIndex, CancellationToken cancellationToken = default)
{
return Retry(() =>
{
return _client.KV.Get(key, new QueryOptions()
{
WaitTime = waitTime,
WaitIndex = waitIndex
}, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult().Response;
}, 1);
}

/// <summary>
/// 获取对应Key的字符串值
/// </summary>
/// <param name="kv"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public bool Acquire(KVPair kv, CancellationToken cancellationToken = default)
{
return Retry(() =>
{
return _client.KV.Acquire(kv, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult().Response;
}, 1);
}

/// <summary>
/// 获取对应Key的字符串值
/// </summary>
/// <returns>The get.</returns>
/// <param name="key">Key.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public KVPair Get(string key, CancellationToken cancellationToken = default)
{
return Retry(() =>
{
return _client.KV.Get(key, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult().Response;
}, 2);
}

/// <summary>
/// 删除对应Key
/// </summary>
/// <param name="key"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public bool Delete(string key, CancellationToken cancellationToken = default)
{
return Retry(() =>
{
return _client.KV.Delete(key, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult().Response;
}, 2);
}

/// <summary>
/// 创建Session
/// </summary>
/// <returns>The session.</returns>
/// <param name="checkIds">Check identifier.</param>
/// <param name="lockDelay">Lock delay.</param>
public string CreateSession(List<string> checkIds, int lockDelay = 15)
{
return Retry(() =>
{
return _client.Session.Create(new SessionEntry()
{
Checks = checkIds,
LockDelay = new TimeSpan(0, 0, lockDelay),
}).ConfigureAwait(false).GetAwaiter().GetResult().Response;
}, 2);
}

/// <summary>
/// 移除Session
/// </summary>
/// <returns></returns>
public bool RemoveSession(string sessionId)
{
return Retry(() =>
{
return _client.Session.Destroy(sessionId).ConfigureAwait(false).GetAwaiter().GetResult().Response;
}, 2);
}
#endregion

private void Retry(Action action, int retryTimes)
{
Expand Down
2 changes: 1 addition & 1 deletion src/ConsulElectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public ConsulElectionOptions()
public ConsulClient ConsulClient { get; private set; }

/// <summary>
/// 重新选举沉默期:Leader状态丢失后,集群可以重新选举成功的等待时间,单位秒,默认10s
/// 重新选举沉默期:Leader下线后,集群可以重新选举成功的等待时间,单位秒,默认10s
/// </summary>
public int ReElectionSilencePeriod { get; set; } = 10;
}
Expand Down
Loading

0 comments on commit 1b189ab

Please sign in to comment.