Skip to content

+metrics #30 basic metrics "skeleton" #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 27, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ let targets: [PackageDescription.Target] = [
name: "DistributedActors",
dependencies: [
"NIO",
"NIOSSL",
"NIOExtras",
"NIOFoundationCompat",
"NIOSSL",
"Logging",

"SwiftProtobuf",

"Logging", "Metrics",

"DistributedActorsConcurrencyHelpers",
"CDistributedActorsMailbox",
"CDistributedActorsRunQueue",
"SwiftProtobuf",
]
),

Expand Down Expand Up @@ -108,6 +111,14 @@ let targets: [PackageDescription.Target] = [
dependencies: ["DistributedActors"],
path: "Samples/SampleCluster"
),
.target(
name: "SampleMetrics",
dependencies: [
"DistributedActors",
"SwiftPrometheus",
],
path: "Samples/SampleMetrics"
),

// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Internals; NOT SUPPORTED IN ANY WAY
Expand Down Expand Up @@ -135,8 +146,14 @@ let dependencies: [Package.Dependency] = [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.7.0"),
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.2.0"),

.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.4.0"),

.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-metrics.git", from: "1.0.0"),

// ~~~ only for samples ~~~
.package(url: "https://github.com/MrLotU/SwiftPrometheus", .branch("master"))
]

let package = Package(
Expand All @@ -160,17 +177,21 @@ let package = Package(
/* --- samples --- */

.executable(
name: "DistributedActorsSampleDiningPhilosophers",
name: "SampleDiningPhilosophers",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 like the shorter names

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

targets: ["SampleDiningPhilosophers"]
),
.executable(
name: "DistributedActorsSampleLetItCrash",
name: "SampleLetItCrash",
targets: ["SampleLetItCrash"]
),
.executable(
name: "DistributedActorsSampleCluster",
name: "SampleCluster",
targets: ["SampleCluster"]
),
.executable(
name: "SampleMetrics",
targets: ["SampleMetrics"]
),
],

dependencies: dependencies,
Expand Down
108 changes: 108 additions & 0 deletions Samples/SampleMetrics/main.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedActors

import Metrics
import Prometheus
//import StatsdClient

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Prometheus backend

let prom = PrometheusClient()
MetricsSystem.bootstrap(prom)

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: StatsD backend
//
// pip install pystatsd
// python -c 'import pystatsd; pystatsd.Server(debug=True).serve()'

// let statsdClient = try StatsdClient(host: "localhost", port: 8125)
// MetricsSystem.bootstrap(statsdClient)


// start actor system
let system = ActorSystem("Metrics") { settings in
settings.cluster.enabled = true
}

struct Talker {
enum Message {
case hello(Int, replyTo: ActorRef<Talker.Message>?)
}
static func talkTo(another talker: ActorRef<Message>?) -> Behavior<Message> {
return .setup { context in
context.log.info("Started \(context.myself.path)")
context.timers.startPeriodic(key: "next-chat", message: .hello(1, replyTo: talker), interval: .milliseconds(200))

return .receiveMessage { message in
// context.log.info("\(message)")

switch message {
case .hello(_, let talkTo):
talkTo?.tell(.hello(1, replyTo: talkTo))
}
return .same
}
}
}
}

struct DieAfterSomeTime {
static let behavior = Behavior<String>.setup { context in
context.log.info("Started \(context.myself.path)")
context.timers.startSingle(key: "die", message: "time-up", delay: .seconds(2))
return .receiveMessage { _ in
context.log.info("Stopping \(context.myself.path)...")
return .stop
}
}
}

struct MetricPrinter {
static var behavior: Behavior<String> {
return .setup { context in
context.log.info("Started \(context.myself.path)")
context.timers.startPeriodic(key: "print-metrics", message: "print", interval: .seconds(2))

return .receiveMessage { _ in
print("------------------------------------------------------------------------------------------")
print(prom.collect())

return .same
}
}
}
}

let props = Props().metrics(group: "talkers")

let t1 = try system.spawn("talker-1", props: props, Talker.talkTo(another: nil))
let t2 = try system.spawn("talker-2", props: props, Talker.talkTo(another: t1))
let t3 = try system.spawn("talker-3", props: props, Talker.talkTo(another: t2))
let t4 = try system.spawn("talker-4", props: props, Talker.talkTo(another: t3))

let m = try system.spawn("metricsPrinter", MetricPrinter.behavior)

for i in 1...10 {
_ = try system.spawn("life-\(i)", DieAfterSomeTime.behavior)
Thread.sleep(.seconds(1))
}

Thread.sleep(.seconds(100))

system.shutdown()
print("~~~~~~~~~~~~~~~ SHUTTING DOWN ~~~~~~~~~~~~~~~")
5 changes: 5 additions & 0 deletions Sources/DistributedActors/ActorAddress.swift
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,11 @@ public struct ActorPathSegment: Hashable {
}
}

extension ActorPathSegment {
internal static let _user: ActorPathSegment = try! ActorPathSegment("user")
internal static let _system: ActorPathSegment = try! ActorPathSegment("system")
}

extension ActorPathSegment: CustomStringConvertible, CustomDebugStringConvertible {
public var description: String {
return "\(self.value)"
Expand Down
11 changes: 8 additions & 3 deletions Sources/DistributedActors/ActorShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
//===----------------------------------------------------------------------===//

import CDistributedActorsMailbox
import Dispatch
import Logging
import Metrics
import NIO

// ==== ----------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -147,6 +147,7 @@ internal final class ActorShell<Message>: ActorContext<Message>, AbstractActor {

self.namingContext = ActorNamingContext()

// TODO: replace with TestMetrics which we could use to inspect the start/stop counts
#if SACT_TESTS_LEAKS
// We deliberately only count user actors here, because the number of
// system actors may change over time and they are also not relevant for
Expand All @@ -155,6 +156,9 @@ internal final class ActorShell<Message>: ActorContext<Message>, AbstractActor {
_ = system.userCellInitCounter.add(1)
}
#endif

super.init()
system.metrics.recordActorStart(self)
}

deinit {
Expand All @@ -164,6 +168,7 @@ internal final class ActorShell<Message>: ActorContext<Message>, AbstractActor {
_ = system.userCellInitCounter.sub(1)
}
#endif
system.metrics.recordActorStop(self)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ note: I'd want to have semantically meaningful "do a thing" methods for all metrics, we should not have to wiggle around with +1 or other "which counter exactly" here; all this should be encapsulated in the Metrics files so we can quickly skim it there what counter is updated when.

}

/// INTERNAL API: MUST be called immediately after constructing the cell and ref,
Expand Down Expand Up @@ -720,8 +725,8 @@ extension ActorShell {

extension ActorShell: CustomStringConvertible {
public var description: String {
let path = self._myCell.address.description
return "\(type(of: self))(\(path))"
let prettyTypeName = String(reflecting: Message.self).split(separator: ".").dropFirst().joined(separator: ".")
return "ActorShell<\(prettyTypeName)>(\(self.path))"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sidenote: without this the type was quite useless -- everything was <Message> 😉

}
}

Expand Down
19 changes: 19 additions & 0 deletions Sources/DistributedActors/ActorSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,34 @@ public final class ActorSystem {
// initialized during startup
public var serialization: Serialization!

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Receptionist

public var receptionist: ActorRef<Receptionist.Message> {
return self._receptionist
}

private var _receptionist: ActorRef<Receptionist.Message>!

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: CRDT Replicator

internal var replicator: ActorRef<CRDT.Replicator.Message> {
return self._replicator
}

private var _replicator: ActorRef<CRDT.Replicator.Message>!

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Metrics

internal var metrics: ActorSystemMetrics {
return self._metrics
}

private lazy var _metrics: ActorSystemMetrics = ActorSystemMetrics(self.settings.metrics)

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Cluster

// initialized during startup
Expand Down Expand Up @@ -99,6 +115,7 @@ public final class ActorSystem {
public convenience init(_ name: String, configuredWith configureSettings: (inout ActorSystemSettings) -> Void = { _ in () }) {
var settings = ActorSystemSettings()
settings.cluster.node.systemName = name
settings.metrics.rootName = name
configureSettings(&settings)
self.init(settings: settings)
}
Expand Down Expand Up @@ -191,6 +208,8 @@ public final class ActorSystem {
} catch {
fatalError("Failed while starting cluster subsystem! Error: \(error)")
}

_ = self.metrics // force init of metrics
}

public convenience init() {
Expand Down
3 changes: 3 additions & 0 deletions Sources/DistributedActors/ActorSystemSettings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public struct ActorSystemSettings {
return .init()
}

// TODO: LoggingSettings

/// Configure default log level for all `Logger` instances created by the library.
public var defaultLogLevel: Logger.Level = .info // TODO: maybe remove this? should be up to logging library to configure for us as well

Expand All @@ -34,6 +36,7 @@ public struct ActorSystemSettings {

public var actor: ActorSettings = .default
public var serialization: SerializationSettings = .default
public var metrics: MetricsSettings = .default(rootName: nil)
public var cluster: ClusterSettings = .default {
didSet {
self.serialization.localNode = self.cluster.uniqueBindNode
Expand Down
4 changes: 4 additions & 0 deletions Sources/DistributedActors/Cluster/ClusterShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ extension ClusterShell {
context.log.info("Bound to \(chan.localAddress.map { $0.description } ?? "<no-local-address>")")

let state = ClusterShellState(settings: clusterSettings, channel: chan, log: context.log)
context.system.metrics.recordMembership(state.membership)

return self.ready(state: state)
}
Expand Down Expand Up @@ -601,6 +602,9 @@ extension ClusterShell {
case .reachable:
self._events.publish(.reachability(.memberReachable(changedMember)))
}

context.system.metrics.recordMembership(state.membership)

return self.ready(state: state)
} else {
return .same
Expand Down
14 changes: 7 additions & 7 deletions Sources/DistributedActors/Cluster/ClusterShellState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ internal struct ClusterShellState: ReadOnlyClusterState {
typealias Messages = ClusterShell.Message

// TODO: maybe move log and settings outside of state into the shell?
public var log: Logger
public let settings: ClusterSettings
var log: Logger
let settings: ClusterSettings

public let selfNode: UniqueNode
public let channel: Channel
let selfNode: UniqueNode
let channel: Channel

public let eventLoopGroup: EventLoopGroup
let eventLoopGroup: EventLoopGroup

public var backoffStrategy: BackoffStrategy {
var backoffStrategy: BackoffStrategy {
return self.settings.handshakeBackoffStrategy
}

public let allocator: ByteBufferAllocator
let allocator: ByteBufferAllocator

internal var _handshakes: [Node: HandshakeStateMachine.State] = [:]
private var _associations: [Node: AssociationStateMachine.State] = [:]
Expand Down
Loading