Skip to content

Commit 4934208

Browse files
ktoso and yim-lee authored Nov 1, 2022
=cluster handle aggressively rejoining/replacing nodes from same host/port pair (apple#1083)
Co-authored-by: Yim Lee <[email protected]>
1 parent d816a10 commit 4934208

28 files changed

+271
-63
lines changed
 

‎Samples/Sources/SampleDiningPhilosophers/SamplePrettyLogHandler.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ struct SamplePrettyLogHandler: LogHandler {
106106

107107
let file = file.split(separator: "/").last ?? ""
108108
let line = line
109-
print("\(self.timestamp()) [\(file):\(line)] [\(nodeInfo)\(Self.CONSOLE_BOLD)\(label)\(Self.CONSOLE_RESET)] [\(level)] \(message)\(metadataString)")
109+
print("\(self.timestamp()) \(level) [\(nodeInfo)\(Self.CONSOLE_BOLD)\(label)\(Self.CONSOLE_RESET)][\(file):\(line)] \(message)\(metadataString)")
110110
}
111111

112112
internal func prettyPrint(metadata: Logger.MetadataValue) -> String {

‎Sources/DistributedActorsTestKit/LogCapture.swift

+8-5
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,13 @@ extension LogCapture {
115115
public func printLogs() {
116116
for log in self.logs {
117117
var metadataString: String = ""
118-
var actorPath: String = ""
118+
var actorIdentifier: String = ""
119119
if var metadata = log.metadata {
120-
if let path = metadata.removeValue(forKey: "actor/path") {
121-
actorPath = "[\(path)]"
120+
if let id = metadata.removeValue(forKey: "actor/id") {
121+
actorIdentifier = "[\(id)]"
122+
_ = metadata.removeValue(forKey: "actor/path") // discard it
123+
} else if let path = metadata.removeValue(forKey: "actor/path") {
124+
actorIdentifier = "[\(path)]"
122125
}
123126

124127
metadata.removeValue(forKey: "label")
@@ -148,10 +151,10 @@ extension LogCapture {
148151
metadataString = String(metadataString.dropLast(1))
149152
}
150153
}
151-
let date = ActorOriginLogHandler._createFormatter().string(from: log.date)
154+
let date = ActorOriginLogHandler._createSimpleFormatter().string(from: log.date)
152155
let file = log.file.split(separator: "/").last ?? ""
153156
let line = log.line
154-
print("[captured] [\(self.captureLabel)] [\(date)] [\(file):\(line)]\(actorPath) [\(log.level)] \(log.message)\(metadataString)")
157+
print("[captured] [\(self.captureLabel)] \(date) \(log.level) \(actorIdentifier) [\(file):\(line)] \(log.message)\(metadataString)")
155158
}
156159
}
157160

‎Sources/DistributedCluster/ActorLogging.swift

+8
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,14 @@ struct ActorOriginLogHandler: LogHandler {
103103
return formatter
104104
}
105105

106+
public static func _createSimpleFormatter() -> DateFormatter {
107+
let formatter = DateFormatter()
108+
formatter.dateFormat = "H:m:ss.SSSS"
109+
formatter.locale = Locale(identifier: "en_US")
110+
formatter.calendar = Calendar(identifier: .gregorian)
111+
return formatter
112+
}
113+
106114
private let context: LoggingContext
107115

108116
private var targetLogger: Logger

‎Sources/DistributedCluster/Cluster/Cluster+Event.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ extension Cluster.MembershipChange: CustomStringConvertible {
160160
if let replaced = self.replaced {
161161
base = "[replaced:\(reflecting: replaced)] by \(reflecting: self.node)"
162162
} else {
163-
base = "\(self.node)"
163+
base = "\(reflecting: self.node)"
164164
}
165165
return base +
166166
" :: " +

‎Sources/DistributedCluster/Cluster/Cluster+Membership.swift

+40-10
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,14 @@ extension Cluster {
6666
///
6767
/// This operation is guaranteed to return a member if it was added to the membership UNLESS the member has been `.removed`
6868
/// and dropped which happens only after an extended period of time. // FIXME: That period of time is not implemented
69-
public func uniqueMember(_ node: Cluster.Node) -> Cluster.Member? {
69+
public func member(_ node: Cluster.Node) -> Cluster.Member? {
7070
self._members[node]
7171
}
7272

7373
/// Picks "first", in terms of least progressed among its lifecycle member in presence of potentially multiple members
7474
/// for a non-unique `Node`. In practice, this happens when an existing node is superseded by a "replacement", and the
7575
/// previous node becomes immediately down.
76-
public func member(_ endpoint: Cluster.Endpoint) -> Cluster.Member? {
76+
public func anyMember(forEndpoint endpoint: Cluster.Endpoint) -> Cluster.Member? {
7777
self._members.values.sorted(by: Cluster.MemberStatus.lifecycleOrdering).first(where: { $0.node.endpoint == endpoint })
7878
}
7979

@@ -90,14 +90,22 @@ extension Cluster {
9090
self._members.count
9191
}
9292

93-
/// More efficient than using `members(atLeast:)` followed by a `.count`
93+
/// More efficient than using ``members(atLeast:reachability:)`` followed by a `.count`
9494
public func count(atLeast status: Cluster.MemberStatus) -> Int {
9595
self._members.values
9696
.lazy
9797
.filter { member in status <= member.status }
9898
.count
9999
}
100100

101+
/// More efficient than using ``members(atMost:reachability:)`` followed by a `.count`
102+
public func count(atMost status: Cluster.MemberStatus) -> Int {
103+
self._members.values
104+
.lazy
105+
.filter { member in member.status <= status }
106+
.count
107+
}
108+
101109
/// More efficient than using `members(withStatus:)` followed by a `.count`
102110
public func count(withStatus status: Cluster.MemberStatus) -> Int {
103111
self._members.values
@@ -153,6 +161,13 @@ extension Cluster {
153161
}
154162
}
155163

164+
/// Returns all members that are part of this membership, and have ``Cluster/MemberStatus`` that is *at most*
165+
/// the passed in `status` and `reachability`. See ``Cluster/MemberStatus`` to learn more about the meaning of "at most".
166+
///
167+
/// - Parameters:
168+
/// - status: "at most" status for which to check the members for
169+
/// - reachability: optional reachability that the members will be filtered by
170+
/// - Returns: array of members matching those checks. Can be empty.
156171
public func members(atMost status: Cluster.MemberStatus, reachability: Cluster.MemberReachability? = nil) -> [Cluster.Member] {
157172
if status == .removed, reachability == nil {
158173
return Array(self._members.values)
@@ -189,7 +204,7 @@ extension Cluster {
189204
/// Certain actions can only be performed by the "leader" of a group.
190205
public internal(set) var leader: Cluster.Member? {
191206
get {
192-
self._leaderNode.flatMap { self.uniqueMember($0) }
207+
self._leaderNode.flatMap { self.member($0) }
193208
}
194209
set {
195210
self._leaderNode = newValue?.node
@@ -223,6 +238,21 @@ extension Cluster {
223238
}
224239
}
225240

241+
extension Cluster.Membership: Sequence {
242+
public struct Iterator: IteratorProtocol {
243+
public typealias Element = Cluster.Member
244+
internal var it: Dictionary<Cluster.Node, Cluster.Member>.Values.Iterator
245+
246+
public mutating func next() -> Cluster.Member? {
247+
self.it.next()
248+
}
249+
}
250+
251+
public func makeIterator() -> Iterator {
252+
.init(it: self._members.values.makeIterator())
253+
}
254+
}
255+
226256
// Implementation notes: Membership/Member equality
227257
//
228258
// Membership equality is special, as it manually DOES take into account the Member's states (status, reachability),
@@ -300,7 +330,7 @@ extension Cluster.Membership {
300330
return self.removeCompletely(change.node)
301331
}
302332

303-
if let knownUnique = self.uniqueMember(change.node) {
333+
if let knownUnique = self.member(change.node) {
304334
// it is known uniquely, so we just update its status
305335
return self.mark(knownUnique.node, as: change.status)
306336
}
@@ -311,7 +341,7 @@ extension Cluster.Membership {
311341
return nil
312342
}
313343

314-
if let previousMember = self.member(change.node.endpoint) {
344+
if let previousMember = self.anyMember(forEndpoint: change.node.endpoint) {
315345
// we are joining "over" an existing incarnation of a node; causing the existing node to become .down immediately
316346
if previousMember.status < .down {
317347
_ = self.mark(previousMember.node, as: .down)
@@ -398,7 +428,7 @@ extension Cluster.Membership {
398428
///
399429
/// If the membership not aware of this address the update is treated as a no-op.
400430
public mutating func mark(_ node: Cluster.Node, as status: Cluster.MemberStatus) -> Cluster.MembershipChange? {
401-
if let existingExactMember = self.uniqueMember(node) {
431+
if let existingExactMember = self.member(node) {
402432
guard existingExactMember.status < status else {
403433
// this would be a "move backwards" which we do not do; membership only moves forward
404434
return nil
@@ -412,7 +442,7 @@ extension Cluster.Membership {
412442
self._members[existingExactMember.node] = updatedMember
413443

414444
return Cluster.MembershipChange(member: existingExactMember, toStatus: status)
415-
} else if let beingReplacedMember = self.member(node.endpoint) {
445+
} else if let beingReplacedMember = self.anyMember(forEndpoint: node.endpoint) {
416446
// We did not get a member by exact Cluster.Node match, but we got one by Node match...
417447
// this means this new node that we are trying to mark is a "replacement" and the `beingReplacedNode` must be .downed!
418448

@@ -619,14 +649,14 @@ extension Cluster.Membership {
619649
// TODO: diffing is not super well tested, may lose up numbers
620650
static func _diff(from: Cluster.Membership, to: Cluster.Membership) -> MembershipDiff {
621651
var entries: [Cluster.MembershipChange] = []
622-
entries.reserveCapacity(max(from._members.count, to._members.count))
652+
entries.reserveCapacity(Swift.max(from._members.count, to._members.count))
623653

624654
// TODO: can likely be optimized more
625655
var to = to
626656

627657
// iterate over the original member set, and remove from the `to` set any seen members
628658
for member in from._members.values {
629-
if let toMember = to.uniqueMember(member.node) {
659+
if let toMember = to.member(member.node) {
630660
to._members.removeValue(forKey: member.node)
631661
if member.status != toMember.status {
632662
entries.append(.init(member: member, toStatus: toMember.status))

‎Sources/DistributedCluster/Cluster/ClusterControl.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ public struct ClusterControl {
245245
}
246246
}
247247

248-
guard let foundMember = membership.uniqueMember(node) else {
248+
guard let foundMember = membership.member(node) else {
249249
if status == .down || status == .removed {
250250
// so we're seeing an already removed member, this can indeed happen and is okey
251251
return Cluster.Member(node: node, status: .removed).asUnreachable
@@ -273,7 +273,7 @@ public struct ClusterControl {
273273
@discardableResult
274274
public func waitFor(_ endpoint: Cluster.Endpoint, _ status: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member? {
275275
try await self.waitForMembershipEventually(Cluster.Member?.self, within: within) { membership in
276-
guard let foundMember = membership.member(endpoint) else {
276+
guard let foundMember = membership.anyMember(forEndpoint: endpoint) else {
277277
if status == .down || status == .removed {
278278
return nil
279279
}
@@ -306,7 +306,7 @@ public struct ClusterControl {
306306
}
307307
}
308308

309-
guard let foundMember = membership.uniqueMember(node) else {
309+
guard let foundMember = membership.member(node) else {
310310
if atLeastStatus == .down || atLeastStatus == .removed {
311311
// so we're seeing an already removed member, this can indeed happen and is okey
312312
return Cluster.Member(node: node, status: .removed).asUnreachable

‎Sources/DistributedCluster/Cluster/ClusterShell.swift

+4-4
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ extension ClusterShell {
489489
return self.retryHandshake(context, state, initiated: initiated)
490490

491491
case .failureDetectorReachabilityChanged(let node, let reachability):
492-
guard let member = state.membership.uniqueMember(node) else {
492+
guard let member = state.membership.member(node) else {
493493
return .same // reachability change of unknown node
494494
}
495495
switch reachability {
@@ -505,8 +505,8 @@ extension ClusterShell {
505505
case .shutdown(let receptacle):
506506
return self.onShutdownCommand(context, state: state, signalOnceUnbound: receptacle)
507507

508-
case .downCommand(let node):
509-
if let member = state.membership.member(node) {
508+
case .downCommand(let endpoint):
509+
if let member = state.membership.anyMember(forEndpoint: endpoint) {
510510
return self.ready(state: self.onDownCommand(context, state: state, member: member))
511511
} else {
512512
return self.ready(state: state)
@@ -1173,7 +1173,7 @@ extension ClusterShell {
11731173
]
11741174
)
11751175

1176-
guard let myselfMember = state.membership.uniqueMember(myselfNode) else {
1176+
guard let myselfMember = state.membership.member(myselfNode) else {
11771177
state.log.error("Unable to find Cluster.Member for \(myselfNode) self node! This should not happen, please file an issue.")
11781178
return .same
11791179
}

‎Sources/DistributedCluster/Cluster/ClusterShellState.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ internal struct ClusterShellState: ReadOnlyClusterState {
4848

4949
let selfNode: Cluster.Node
5050
var selfMember: Cluster.Member {
51-
if let member = self.membership.uniqueMember(self.selfNode) {
51+
if let member = self.membership.member(self.selfNode) {
5252
return member
5353
} else {
5454
fatalError("""
@@ -391,7 +391,7 @@ extension ClusterShellState {
391391
}
392392

393393
let change: Cluster.MembershipChange?
394-
if let replacedMember = self.membership.member(handshake.remoteNode.endpoint) {
394+
if let replacedMember = self.membership.anyMember(forEndpoint: handshake.remoteNode.endpoint) {
395395
change = self.membership.applyMembershipChange(Cluster.MembershipChange(replaced: replacedMember, by: Cluster.Member(node: handshake.remoteNode, status: .joining)))
396396
} else {
397397
change = self.membership.applyMembershipChange(Cluster.MembershipChange(member: Cluster.Member(node: handshake.remoteNode, status: .joining)))

‎Sources/DistributedCluster/Cluster/Downing/DowningStrategy.swift

+2-1
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,9 @@ internal distributed actor DowningStrategyShell {
145145
func markAsDown(members: Set<Cluster.Member>) {
146146
for member in members {
147147
self.log.info(
148-
"Decision to [.down] member [\(member)]!", metadata: self.metadata([
148+
"Decide to [.down] member [\(member)]!", metadata: self.metadata([
149149
"downing/node": "\(reflecting: member.node)",
150+
"member/status/previous": "\(member.status)",
150151
])
151152
)
152153
self.actorSystem.cluster.down(member: member)

‎Sources/DistributedCluster/Cluster/MembershipGossip/Cluster+MembershipGossip.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ extension Cluster {
6565
let causalRelation: VersionVector.CausalRelation = self.version.compareTo(incoming.version)
6666

6767
// 1.1) Protect the node from any gossip from a .down node (!), it cannot and MUST NOT be trusted.
68-
let incomingGossipOwnerKnownLocally = self.membership.uniqueMember(incoming.owner)
69-
guard let incomingOwnerMember = incoming.membership.uniqueMember(incoming.owner) else {
68+
let incomingGossipOwnerKnownLocally = self.membership.member(incoming.owner)
69+
guard let incomingOwnerMember = incoming.membership.member(incoming.owner) else {
7070
return .init(causalRelation: causalRelation, effectiveChanges: [])
7171
}
7272
switch incomingGossipOwnerKnownLocally {
@@ -83,7 +83,7 @@ extension Cluster {
8383
// 1.2) Protect from zombies: Any nodes that we know are dead or down, we should not accept any information from
8484
let incomingConcurrentDownMembers = incoming.membership.members(atLeast: .down)
8585
for pruneFromIncomingBeforeMerge in incomingConcurrentDownMembers
86-
where self.membership.uniqueMember(pruneFromIncomingBeforeMerge.node) == nil
86+
where self.membership.member(pruneFromIncomingBeforeMerge.node) == nil
8787
{
8888
_ = incoming.pruneMember(pruneFromIncomingBeforeMerge)
8989
}

‎Sources/DistributedCluster/Cluster/NodeDeathWatcher.swift

+6-2
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,14 @@ enum NodeDeathWatcherShell {
203203
}
204204

205205
case .membershipChange(let change) where change.isAtLeast(.down):
206-
context.log.trace("Node down: \(change)!")
206+
context.log.trace("Node down: \(change)!", metadata: [
207+
"node": "\(reflecting: change.node)",
208+
])
207209
instance.handleAddressDown(change)
208210
case .membershipChange(let change):
209-
context.log.trace("Node change: \(change)!")
211+
context.log.trace("Node change: \(change)!", metadata: [
212+
"node": "\(reflecting: change.node)",
213+
])
210214
instance.onMembershipChanged(change)
211215

212216
default:

‎Sources/DistributedCluster/Cluster/SWIM/SWIMActor.swift

+7
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,13 @@ internal distributed actor SWIMActor: SWIMPeer, SWIMAddressablePeer, CustomStrin
355355
)
356356
}
357357

358+
/// If SWIM claims we are dead, ignore this; we should be informed about this in high-level gossip soon enough.
359+
if change.status == .dead,
360+
change.member.node.asClusterNode == self.id.node
361+
{
362+
return
363+
}
364+
358365
let reachability: Cluster.MemberReachability
359366
switch change.status {
360367
case .alive, .suspect:

‎Sources/DistributedCluster/ClusterSystem.swift

+1
Original file line numberDiff line numberDiff line change
@@ -1037,6 +1037,7 @@ extension ClusterSystem {
10371037

10381038
log.trace("Actor ready, well-known as: \(wellKnownName)", metadata: [
10391039
"actor/id": "\(actor.id)",
1040+
"actor/type": "\(type(of: actor))",
10401041
])
10411042

10421043
self._managedWellKnownDistributedActors[wellKnownName] = actor

‎Sources/DistributedCluster/Docs.docc/Clustering.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ You can observe ``Cluster/Event``s emitted by `system.cluster.events` (``Cluster
8181

8282
There is also convenience APIs available on ``ClusterControl`` (`system.cluster`):
8383
- ``ClusterControl/joined(endpoint:within:)`` which allows you to suspend until a specific node becomes ``Cluster/MemberStatus/joining`` in the cluster membership, or
84-
- ``ClusterControl/waitFor(_:_:within:)-1xiqo`` which allows you to suspend until a node reaches a specific ``Cluster/MemberStatus``.
84+
- ``ClusterControl/waitFor(_:_:within:)-2aq7r`` which allows you to suspend until a node reaches a specific ``Cluster/MemberStatus``.
8585

8686
### Automatic Node Discovery
8787

‎Sources/MultiNodeTestKit/MultiNodeTestConductor.swift

-3
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,8 @@ extension MultiNodeTestConductor {
8686
checkPoint: MultiNode.Checkpoint,
8787
waitTime: Duration) async throws
8888
{
89-
self.actorSystem.log.warning("CHECKPOINT FROM [\(node)]")
9089
try await RemoteCall.with(timeout: waitTime) {
91-
self.actorSystem.log.warning("CHECKPOINT FROM [\(node)] INNER (\(__isRemoteActor(self)))")
9290
try await self._enterCheckPoint(node: node, checkPoint: checkPoint)
93-
self.actorSystem.log.warning("CHECKPOINT FROM [\(node)] DONE")
9491
}
9592
}
9693

0 commit comments

Comments (0)
Please sign in to comment.