Skip to content

Commit

Permalink
Merge pull request #3 from MihaelIsaev/race-fix
Browse files Browse the repository at this point in the history
🏎 Implement `knownEventLoop` to fix race condition
  • Loading branch information
MihaelIsaev authored Jun 28, 2021
2 parents 4910ff4 + 4781d86 commit 0438b55
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 33 deletions.
22 changes: 22 additions & 0 deletions Sources/WS/Models/Configurator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,26 @@ public struct Configurator {
application.storage[DefaultDecoderKey.self] = newValue
}
}

// MARK: - Default Decoder

struct KnownEventLoopKey: StorageKey {
typealias Value = EventLoop
}

/// Default encoder for all the observers, if `nil` then `JSONEncoder` is used.
public var knownEventLoop: EventLoop {
get {
if let eventLoop = application.storage[KnownEventLoopKey.self] {
return eventLoop
} else {
let eventLoop = application.eventLoopGroup.next()
application.storage[KnownEventLoopKey.self] = eventLoop
return eventLoop
}
}
nonmutating set {
application.storage[KnownEventLoopKey.self] = newValue
}
}
}
2 changes: 1 addition & 1 deletion Sources/WS/Protocols/AnyClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal protocol _AnyClient: AnyClient, _Disconnectable, _Subscribable, _Sendab
}

extension AnyClient {
public var eventLoop: EventLoop { application.eventLoopGroup.next() }
public var eventLoop: EventLoop { application.ws.knownEventLoop }
public var logger: Logger { application.logger }

/// See `Broadcastable`
Expand Down
2 changes: 1 addition & 1 deletion Sources/WS/Protocols/AnyObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ internal protocol _AnyObserver: AnyObserver, _Disconnectable, _Sendable {
// MARK: - Default implementation

extension AnyObserver {
public var eventLoop: EventLoop { application.eventLoopGroup.next() }
public var eventLoop: EventLoop { application.ws.knownEventLoop }

var _encoder: Encoder {
if let encoder = self.encoder {
Expand Down
64 changes: 37 additions & 27 deletions Sources/WS/Protocols/Subscribable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ import NIOWebSocket

public protocol Subscribable {
@discardableResult
func subscribe(to channels: String...) -> EventLoopFuture<Void>
func subscribe(to channels: String..., on eventLoop: EventLoop) -> EventLoopFuture<Void>
@discardableResult
func subscribe(to channels: [String]) -> EventLoopFuture<Void>
func subscribe(to channels: [String], on eventLoop: EventLoop) -> EventLoopFuture<Void>
@discardableResult
func unsubscribe(from channels: String...) -> EventLoopFuture<Void>
func unsubscribe(from channels: String..., on eventLoop: EventLoop) -> EventLoopFuture<Void>
@discardableResult
func unsubscribe(from channels: [String]) -> EventLoopFuture<Void>
func unsubscribe(from channels: [String], on eventLoop: EventLoop) -> EventLoopFuture<Void>
}

extension Subscribable {
public func subscribe(to channels: String...) -> EventLoopFuture<Void> {
subscribe(to: channels)
public func subscribe(to channels: String..., on eventLoop: EventLoop) -> EventLoopFuture<Void> {
subscribe(to: channels, on: eventLoop)
}

public func unsubscribe(from channels: String...) -> EventLoopFuture<Void> {
unsubscribe(from: channels)
public func unsubscribe(from channels: String..., on eventLoop: EventLoop) -> EventLoopFuture<Void> {
unsubscribe(from: channels, on: eventLoop)
}
}

Expand All @@ -28,41 +28,51 @@ internal protocol _Subscribable: class, Subscribable {
}

extension _Subscribable {
public func subscribe(to channels: String...) -> EventLoopFuture<Void> {
subscribe(to: channels)
public func subscribe(to channels: String..., on eventLoop: EventLoop) -> EventLoopFuture<Void> {
subscribe(to: channels, on: eventLoop)
}

public func subscribe(to channels: [String]) -> EventLoopFuture<Void> {
channels.forEach { channel in
self.clients.forEach {
$0.channels.insert(channel)
public func subscribe(to channels: [String], on eventLoop: EventLoop) -> EventLoopFuture<Void> {
self.eventLoop.submit {
channels.forEach { channel in
self.clients.forEach {
$0.channels.insert(channel)
}
}
}
return eventLoop.future()
}.hop(to: eventLoop)
}

public func unsubscribe(from channels: String...) -> EventLoopFuture<Void> {
unsubscribe(from: channels)
public func unsubscribe(from channels: String..., on eventLoop: EventLoop) -> EventLoopFuture<Void> {
unsubscribe(from: channels, on: eventLoop)
}

public func unsubscribe(from channels: [String]) -> EventLoopFuture<Void> {
channels.forEach { channel in
self.clients.forEach {
$0.channels.remove(channel)
public func unsubscribe(from channels: [String], on eventLoop: EventLoop) -> EventLoopFuture<Void> {
self.eventLoop.submit {
channels.forEach { channel in
self.clients.forEach {
$0.channels.remove(channel)
}
}
}
return eventLoop.future()
}.hop(to: eventLoop)
}
}

// MARK: - EventLoopFuture

extension EventLoopFuture: Subscribable where Value: Subscribable {
extension EventLoopFuture where Value: Subscribable {
public func subscribe(to channels: String...) -> EventLoopFuture<Void> {
subscribe(to: channels)
}

public func subscribe(to channels: [String]) -> EventLoopFuture<Void> {
flatMap { $0.subscribe(to: channels) }
flatMap { $0.subscribe(to: channels, on: self.eventLoop) }
}

public func unsubscribe(from channels: String...) -> EventLoopFuture<Void> {
unsubscribe(from: channels)
}

public func unsubscribe(from channels: [String]) -> EventLoopFuture<Void> {
flatMap { $0.unsubscribe(from: channels) }
flatMap { $0.unsubscribe(from: channels, on: self.eventLoop) }
}
}
8 changes: 4 additions & 4 deletions Sources/WS/Utilities/Broadcaster.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,16 @@ public class Broadcaster: Disconnectable, _Disconnectable, Sendable, _Sendable,
/// See `Subscribable`

/// Subscribe filtered clients to channels
public func subscribe(to channels: [String]) -> EventLoopFuture<Void> {
public func subscribe(to channels: [String], on eventLoop: EventLoop) -> EventLoopFuture<Void> {
clients.map {
$0.subscribe(to: channels)
$0.subscribe(to: channels, on: eventLoop)
}.flatten(on: eventLoop)
}

/// Unsubscribe filtered clients from channels
public func unsubscribe(from channels: [String]) -> EventLoopFuture<Void> {
public func unsubscribe(from channels: [String], on eventLoop: EventLoop) -> EventLoopFuture<Void> {
clients.map {
$0.unsubscribe(from: channels)
$0.unsubscribe(from: channels, on: eventLoop)
}.flatten(on: eventLoop)
}

Expand Down

0 comments on commit 0438b55

Please sign in to comment.