Skip to content

+metrics #30 basic metrics "skeleton" #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 27, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ let targets: [PackageDescription.Target] = [
name: "DistributedActors",
dependencies: [
"NIO",
"NIOSSL",
"NIOExtras",
"NIOFoundationCompat",
"NIOSSL",
"Logging",

"SwiftProtobuf",

"Logging", "Metrics",

"DistributedActorsConcurrencyHelpers",
"CDistributedActorsMailbox",
"CDistributedActorsRunQueue",
"SwiftProtobuf",
]
),

Expand Down Expand Up @@ -108,6 +111,14 @@ let targets: [PackageDescription.Target] = [
dependencies: ["DistributedActors"],
path: "Samples/SampleCluster"
),
.target(
name: "SampleMetrics",
dependencies: [
"DistributedActors",
"SwiftPrometheus",
],
path: "Samples/SampleMetrics"
),

// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Internals; NOT SUPPORTED IN ANY WAY
Expand Down Expand Up @@ -135,8 +146,14 @@ let dependencies: [Package.Dependency] = [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.7.0"),
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.2.0"),

.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.4.0"),

.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-metrics.git", from: "1.0.0"),

// ~~~ only for samples ~~~
.package(url: "https://github.com/MrLotU/SwiftPrometheus", .branch("master"))
]

let package = Package(
Expand All @@ -160,17 +177,21 @@ let package = Package(
/* --- samples --- */

.executable(
name: "DistributedActorsSampleDiningPhilosophers",
name: "SampleDiningPhilosophers",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 like the shorter names

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

targets: ["SampleDiningPhilosophers"]
),
.executable(
name: "DistributedActorsSampleLetItCrash",
name: "SampleLetItCrash",
targets: ["SampleLetItCrash"]
),
.executable(
name: "DistributedActorsSampleCluster",
name: "SampleCluster",
targets: ["SampleCluster"]
),
.executable(
name: "SampleMetrics",
targets: ["SampleMetrics"]
),
],

dependencies: dependencies,
Expand Down
108 changes: 108 additions & 0 deletions Samples/SampleMetrics/main.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedActors

import Metrics
import Prometheus
//import StatsdClient

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Prometheus backend

let prom = PrometheusClient()
MetricsSystem.bootstrap(prom)

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: StatsD backend
//
// pip install pystatsd
// python -c 'import pystatsd; pystatsd.Server(debug=True).serve()'

// let statsdClient = try StatsdClient(host: "localhost", port: 8125)
// MetricsSystem.bootstrap(statsdClient)


// start actor system
let system = ActorSystem("Metrics") { settings in
settings.cluster.enabled = true
}

struct Talker {
enum Message {
case hello(Int, replyTo: ActorRef<Talker.Message>?)
}
static func talkTo(another talker: ActorRef<Message>?) -> Behavior<Message> {
return .setup { context in
context.log.info("Started \(context.myself.path)")
context.timers.startPeriodic(key: "next-chat", message: .hello(1, replyTo: talker), interval: .milliseconds(200))

return .receiveMessage { message in
// context.log.info("\(message)")

switch message {
case .hello(_, let talkTo):
talkTo?.tell(.hello(1, replyTo: talkTo))
}
return .same
}
}
}
}

struct DieAfterSomeTime {
static let behavior = Behavior<String>.setup { context in
context.log.info("Started \(context.myself.path)")
context.timers.startSingle(key: "die", message: "time-up", delay: .seconds(2))
return .receiveMessage { _ in
context.log.info("Stopping \(context.myself.path)...")
return .stop
}
}
}

struct MetricPrinter {
static var behavior: Behavior<String> {
return .setup { context in
context.log.info("Started \(context.myself.path)")
context.timers.startPeriodic(key: "print-metrics", message: "print", interval: .seconds(2))

return .receiveMessage { _ in
print("------------------------------------------------------------------------------------------")
print(prom.collect())

return .same
}
}
}
}

let props = Props().withMetrics(group: "talkers")

let t1 = try system.spawn("talker-1", props: props, Talker.talkTo(another: nil))
let t2 = try system.spawn("talker-2", props: props, Talker.talkTo(another: t1))
let t3 = try system.spawn("talker-3", props: props, Talker.talkTo(another: t2))
let t4 = try system.spawn("talker-4", props: props, Talker.talkTo(another: t3))

let m = try system.spawn("metricsPrinter", MetricPrinter.behavior)

for i in 1...10 {
_ = try system.spawn("life-\(i)", DieAfterSomeTime.behavior)
Thread.sleep(.seconds(1))
}

Thread.sleep(.seconds(100))

system.shutdown()
print("~~~~~~~~~~~~~~~ SHUTTING DOWN ~~~~~~~~~~~~~~~")
4 changes: 3 additions & 1 deletion Sources/DistributedActors/ActorShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
//===----------------------------------------------------------------------===//

import CDistributedActorsMailbox
import Dispatch
import Logging
import Metrics
import NIO

// ==== ----------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -155,6 +155,7 @@ internal final class ActorShell<Message>: ActorContext<Message>, AbstractActor {
_ = system.userCellInitCounter.add(1)
}
#endif
system.metrics.actors_count.add(1)
}

deinit {
Expand All @@ -164,6 +165,7 @@ internal final class ActorShell<Message>: ActorContext<Message>, AbstractActor {
_ = system.userCellInitCounter.sub(1)
}
#endif
system.metrics.actors_count.add(-1)
}

/// INTERNAL API: MUST be called immediately after constructing the cell and ref,
Expand Down
19 changes: 19 additions & 0 deletions Sources/DistributedActors/ActorSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,34 @@ public final class ActorSystem {
// initialized during startup
public var serialization: Serialization!

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Receptionist

public var receptionist: ActorRef<Receptionist.Message> {
return self._receptionist
}

private var _receptionist: ActorRef<Receptionist.Message>!

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: CRDT Replicator

internal var replicator: ActorRef<CRDT.Replicator.Message> {
return self._replicator
}

private var _replicator: ActorRef<CRDT.Replicator.Message>!

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Metrics

internal var metrics: ActorSystemMetrics {
return self._metrics
}

private lazy var _metrics: ActorSystemMetrics = ActorSystemMetrics(self.settings.metrics)

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Cluster

// initialized during startup
Expand Down Expand Up @@ -99,6 +115,7 @@ public final class ActorSystem {
public convenience init(_ name: String, configuredWith configureSettings: (inout ActorSystemSettings) -> Void = { _ in () }) {
var settings = ActorSystemSettings()
settings.cluster.node.systemName = name
settings.metrics.rootName = name
configureSettings(&settings)
self.init(settings: settings)
}
Expand Down Expand Up @@ -191,6 +208,8 @@ public final class ActorSystem {
} catch {
fatalError("Failed while starting cluster subsystem! Error: \(error)")
}

_ = self.metrics // force init of metrics
}

public convenience init() {
Expand Down
3 changes: 3 additions & 0 deletions Sources/DistributedActors/ActorSystemSettings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public struct ActorSystemSettings {
return .init()
}

// TODO: LoggingSettings

/// Configure default log level for all `Logger` instances created by the library.
public var defaultLogLevel: Logger.Level = .info // TODO: maybe remove this? should be up to logging library to configure for us as well

Expand All @@ -34,6 +36,7 @@ public struct ActorSystemSettings {

public var actor: ActorSettings = .default
public var serialization: SerializationSettings = .default
public var metrics: MetricsSettings = .default(rootName: nil)
public var cluster: ClusterSettings = .default {
didSet {
self.serialization.localNode = self.cluster.uniqueBindNode
Expand Down
6 changes: 5 additions & 1 deletion Sources/DistributedActors/Cluster/ClusterShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ extension ClusterShell {
return context.awaitResultThrowing(of: chanElf, timeout: .milliseconds(300)) { (chan: Channel) in
context.log.info("Bound to \(chan.localAddress.map { $0.description } ?? "<no-local-address>")")

let state = ClusterShellState(settings: clusterSettings, channel: chan, log: context.log)
let state = ClusterShellState(settings: clusterSettings, metrics: context.system.metrics, channel: chan, log: context.log)
state.metrics.update(state.membership)

return self.ready(state: state)
}
Expand Down Expand Up @@ -601,6 +602,9 @@ extension ClusterShell {
case .reachable:
self._events.publish(.reachability(.memberReachable(changedMember)))
}

state.metrics.update(state.membership)

return self.ready(state: state)
} else {
return .same
Expand Down
18 changes: 10 additions & 8 deletions Sources/DistributedActors/Cluster/ClusterShellState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,31 @@ internal struct ClusterShellState: ReadOnlyClusterState {
typealias Messages = ClusterShell.Message

// TODO: maybe move log and settings outside of state into the shell?
public var log: Logger
public let settings: ClusterSettings
var log: Logger
let settings: ClusterSettings
let metrics: ActorSystemMetrics

public let selfNode: UniqueNode
public let channel: Channel
let selfNode: UniqueNode
let channel: Channel

public let eventLoopGroup: EventLoopGroup
let eventLoopGroup: EventLoopGroup

public var backoffStrategy: BackoffStrategy {
var backoffStrategy: BackoffStrategy {
return self.settings.handshakeBackoffStrategy
}

public let allocator: ByteBufferAllocator
let allocator: ByteBufferAllocator

internal var _handshakes: [Node: HandshakeStateMachine.State] = [:]
private var _associations: [Node: AssociationStateMachine.State] = [:]

// TODO: make private
internal var _membership: Membership

init(settings: ClusterSettings, channel: Channel, log: Logger) {
init(settings: ClusterSettings, metrics: ActorSystemMetrics, channel: Channel, log: Logger) {
self.log = log
self.settings = settings
self.metrics = metrics
self.allocator = settings.allocator
self.eventLoopGroup = settings.eventLoopGroup ?? settings.makeDefaultEventLoopGroup()

Expand Down
8 changes: 7 additions & 1 deletion Sources/DistributedActors/Cluster/Membership.swift
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,13 @@ public struct Membership: Hashable, ExpressibleByArrayLiteral {
}

func members(atLeast status: MemberStatus) -> [Member] {
return self._members.values.filter { $0.status <= status }
switch status {
case .joining:
// at least joining nodes == all nodes, since there is no status earlier than joining
return Array(self._members.values) // TODO: avoid copy
default:
return self._members.values.filter { $0.status <= status }
}
}
}

Expand Down
Loading