Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed NIOAsyncChannel.executeThenClose to accept actor-isolated par… #3121

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 129 additions & 2 deletions Sources/NIOCore/AsyncChannel/AsyncChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
/// - Important: After this method returned the underlying ``Channel`` will be closed.
///
/// - Parameter body: A closure that gets scoped access to the inbound and outbound.
@_disfavoredOverload
public func executeThenClose<Result>(
_ body: (_ inbound: NIOAsyncChannelInboundStream<Inbound>, _ outbound: NIOAsyncChannelOutboundWriter<Outbound>)
async throws -> Result
Expand All @@ -301,6 +302,53 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
}
}

self._outbound.finish()
// We ignore errors from close, since all we care about is that the channel has been closed
// at this point.
self.channel.close(promise: nil)
// `closeFuture` should never be failed, so we could ignore the error. However, do an
// assertionFailure to guide bad Channel implementations that are incorrectly failing this
// future to stop failing it.
do {
try await self.channel.closeFuture.get()
} catch {
assertionFailure(
"""
The channel's closeFuture should never be failed, but it was failed with error: \(error).
This is an error in the channel's implementation.
Refer to `Channel/closeFuture`'s documentation for more information.
"""
)
}
return result
}

#if compiler(>=6.0)
/// Provides scoped access to the inbound and outbound side of the underlying ``Channel``.
///
/// - Important: After this method returned the underlying ``Channel`` will be closed.
///
/// - Parameters:
/// - actor: actor where this function should be isolated to
/// - body: A closure that gets scoped access to the inbound and outbound.
public func executeThenClose<Result>(
isolation actor: isolated (any Actor)? = #isolation,
_ body: (_ inbound: NIOAsyncChannelInboundStream<Inbound>, _ outbound: NIOAsyncChannelOutboundWriter<Outbound>)
async throws -> sending Result
) async throws -> sending Result {
let result: Result
do {
result = try await body(self._inbound, self._outbound)
} catch let bodyError {
do {
self._outbound.finish()
try await self.channel.close().get()
throw bodyError
} catch {
throw bodyError
}
}

self._outbound.finish()
// We ignore errors from close, since all we care about is that the channel has been closed
// at this point.
Expand All @@ -322,6 +370,7 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {

return result
}
#endif
}

// swift-format-ignore: AmbiguousTrailingClosureOverload
Expand All @@ -332,13 +381,91 @@ extension NIOAsyncChannel {
/// - Important: After this method returned the underlying ``Channel`` will be closed.
///
/// - Parameter body: A closure that gets scoped access to the inbound.
@_disfavoredOverload
public func executeThenClose<Result>(
_ body: (_ inbound: NIOAsyncChannelInboundStream<Inbound>) async throws -> Result
) async throws -> Result where Outbound == Never {
try await self.executeThenClose { inbound, _ in
try await body(inbound)
let result: Result
do {
result = try await body(self._inbound)
} catch let bodyError {
do {
self._outbound.finish()
try await self.channel.close().get()
throw bodyError
} catch {
throw bodyError
}
}

self._outbound.finish()
// We ignore errors from close, since all we care about is that the channel has been closed
// at this point.
self.channel.close(promise: nil)
// `closeFuture` should never be failed, so we could ignore the error. However, do an
// assertionFailure to guide bad Channel implementations that are incorrectly failing this
// future to stop failing it.
do {
try await self.channel.closeFuture.get()
} catch {
assertionFailure(
"""
The channel's closeFuture should never be failed, but it was failed with error: \(error).
This is an error in the channel's implementation.
Refer to `Channel/closeFuture`'s documentation for more information.
"""
)
}
return result
}

#if compiler(>=6.0)
/// Provides scoped access to the inbound side of the underlying ``Channel``.
///
/// - Important: After this method returned the underlying ``Channel`` will be closed.
///
/// - Parameters:
/// - actor: actor where this function should be isolated to
/// - body: A closure that gets scoped access to the inbound.
public func executeThenClose<Result>(
isolation actor: isolated (any Actor)? = #isolation,
_ body: (_ inbound: NIOAsyncChannelInboundStream<Inbound>) async throws -> sending Result
) async throws -> sending Result where Outbound == Never {
let result: Result
do {
result = try await body(self._inbound)
} catch let bodyError {
do {
self._outbound.finish()
try await self.channel.close().get()
throw bodyError
} catch {
throw bodyError
}
}

self._outbound.finish()
// We ignore errors from close, since all we care about is that the channel has been closed
// at this point.
self.channel.close(promise: nil)
// `closeFuture` should never be failed, so we could ignore the error. However, do an
// assertionFailure to guide bad Channel implementations that are incorrectly failing this
// future to stop failing it.
do {
try await self.channel.closeFuture.get()
} catch {
assertionFailure(
"""
The channel's closeFuture should never be failed, but it was failed with error: \(error).
This is an error in the channel's implementation.
Refer to `Channel/closeFuture`'s documentation for more information.
"""
)
}

return result
}
#endif
}

extension Channel {
Expand Down
143 changes: 143 additions & 0 deletions Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,149 @@ final class AsyncChannelTests: XCTestCase {
XCTAssertNil(secondRead)
}
}

#if compiler(>=6.0)
func testExecuteThenCloseFromActor() async throws {
final actor TestActor {
func test() async throws {
let channel = NIOAsyncTestingChannel()
let wrapped = try await channel.testingEventLoop.executeInContext {
try NIOAsyncChannel<String, String>(wrappingChannelSynchronously: channel)
}

try await wrapped.executeThenClose { inbound, outbound in
var iterator = inbound.makeAsyncIterator()
try await channel.writeInbound("hello")
let firstRead = try await iterator.next()
XCTAssertEqual(firstRead, "hello")

try await outbound.write("world")
let write = try await channel.waitForOutboundWrite(as: String.self)
XCTAssertEqual(write, "world")

try await channel.testingEventLoop.executeInContext {
channel.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
}

let secondRead = try await iterator.next()
XCTAssertNil(secondRead)
}
}
}

let actor = TestActor()
try await actor.test()
}

func testExecuteThenCloseFromActorReferencingSelf() async throws {
final actor TestActor {
let hello = "hello"
let world = "world"

func test() async throws {
let channel = NIOAsyncTestingChannel()
let wrapped = try await channel.testingEventLoop.executeInContext {
try NIOAsyncChannel<String, String>(wrappingChannelSynchronously: channel)
}

try await wrapped.executeThenClose { inbound, outbound in
var iterator = inbound.makeAsyncIterator()
try await channel.writeInbound(self.hello)
let firstRead = try await iterator.next()
XCTAssertEqual(firstRead, "hello")

try await outbound.write(self.world)
let write = try await channel.waitForOutboundWrite(as: String.self)
XCTAssertEqual(write, "world")

try await channel.testingEventLoop.executeInContext {
channel.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
}

let secondRead = try await iterator.next()
XCTAssertNil(secondRead)
}
}
}

let actor = TestActor()
try await actor.test()
}

func testExecuteThenCloseInboundChannelFromActor() async throws {
final actor TestActor {
func test() async throws {
let channel = NIOAsyncTestingChannel()
let wrapped = try await channel.testingEventLoop.executeInContext {
try NIOAsyncChannel<String, Never>(wrappingChannelSynchronously: channel)
}

try await wrapped.executeThenClose { inbound in
var iterator = inbound.makeAsyncIterator()
try await channel.writeInbound("hello")
let firstRead = try await iterator.next()
XCTAssertEqual(firstRead, "hello")

try await channel.testingEventLoop.executeInContext {
channel.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
}

let secondRead = try await iterator.next()
XCTAssertNil(secondRead)
}
}
}

let actor = TestActor()
try await actor.test()
}

func testExecuteThenCloseNonSendableResultCross() async throws {
final class NonSendable {
var someMutableState: Int = 5
}

final actor TestActor {
func test() async throws -> sending NonSendable {
let channel = NIOAsyncTestingChannel()
let wrapped = try await channel.testingEventLoop.executeInContext {
try NIOAsyncChannel<String, String>(wrappingChannelSynchronously: channel)
}

return try await wrapped.executeThenClose { _, _ in
NonSendable()
}
}
}

let actor = TestActor()
let r = try await actor.test()
XCTAssertEqual(r.someMutableState, 5)
}

func testExecuteThenCloseInboundNonSendableResultCross() async throws {
final class NonSendable {
var someMutableState: Int = 5
}

final actor TestActor {
func test() async throws -> sending NonSendable {
let channel = NIOAsyncTestingChannel()
let wrapped = try await channel.testingEventLoop.executeInContext {
try NIOAsyncChannel<String, Never>(wrappingChannelSynchronously: channel)
}

return try await wrapped.executeThenClose { _ in
NonSendable()
}
}
}

let actor = TestActor()
let r = try await actor.test()
XCTAssertEqual(r.someMutableState, 5)
}
#endif
}

// This is unchecked Sendable since we only call this in the testing eventloop
Expand Down
Loading