Skip to content

Commit 2669a1c

Browse files
chore: add mutagen prompting gRPC (#118)
Relates to #63. The daemon requires this prompting communication channel be open during all requests.
1 parent 185a894 commit 2669a1c

7 files changed

+883
-19
lines changed

Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift

+11-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public protocol FileSyncDaemon: ObservableObject {
1919

2020
@MainActor
2121
public class MutagenDaemon: FileSyncDaemon {
22-
private let logger = Logger(subsystem: Bundle.main.bundleIdentifier!, category: "mutagen")
22+
let logger = Logger(subsystem: Bundle.main.bundleIdentifier!, category: "mutagen")
2323

2424
@Published public var state: DaemonState = .stopped {
2525
didSet {
@@ -42,9 +42,9 @@ public class MutagenDaemon: FileSyncDaemon {
4242
private let mutagenDaemonSocket: URL
4343

4444
// Non-nil when the daemon is running
45+
var client: DaemonClient?
4546
private var group: MultiThreadedEventLoopGroup?
4647
private var channel: GRPCChannel?
47-
private var client: DaemonClient?
4848

4949
// Protect start & stop transitions against re-entrancy
5050
private let transition = AsyncSemaphore(value: 1)
@@ -171,7 +171,8 @@ public class MutagenDaemon: FileSyncDaemon {
171171
)
172172
client = DaemonClient(
173173
mgmt: Daemon_DaemonAsyncClient(channel: channel!),
174-
sync: Synchronization_SynchronizationAsyncClient(channel: channel!)
174+
sync: Synchronization_SynchronizationAsyncClient(channel: channel!),
175+
prompt: Prompting_PromptingAsyncClient(channel: channel!)
175176
)
176177
logger.info(
177178
"Successfully connected to mutagen daemon, socket: \(self.mutagenDaemonSocket.path, privacy: .public)"
@@ -301,6 +302,7 @@ public class MutagenDaemon: FileSyncDaemon {
301302
struct DaemonClient {
302303
let mgmt: Daemon_DaemonAsyncClient
303304
let sync: Synchronization_SynchronizationAsyncClient
305+
let prompt: Prompting_PromptingAsyncClient
304306
}
305307

306308
public enum DaemonState {
@@ -342,6 +344,8 @@ public enum DaemonError: Error {
342344
case connectionFailure(Error)
343345
case terminatedUnexpectedly
344346
case grpcFailure(Error)
347+
case invalidGrpcResponse(String)
348+
case unexpectedStreamClosure
345349

346350
public var description: String {
347351
switch self {
@@ -355,6 +359,10 @@ public enum DaemonError: Error {
355359
"The daemon must be started first"
356360
case let .grpcFailure(error):
357361
"Failed to communicate with daemon: \(error)"
362+
case let .invalidGrpcResponse(response):
363+
"Invalid gRPC response: \(response)"
364+
case .unexpectedStreamClosure:
365+
"Unexpected stream closure"
358366
}
359367
}
360368

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import GRPC
2+
3+
extension MutagenDaemon {
4+
typealias PromptStream = GRPCAsyncBidirectionalStreamingCall<Prompting_HostRequest, Prompting_HostResponse>
5+
6+
func host(allowPrompts: Bool = true) async throws(DaemonError) -> (PromptStream, identifier: String) {
7+
let stream = client!.prompt.makeHostCall()
8+
9+
do {
10+
try await stream.requestStream.send(.with { req in req.allowPrompts = allowPrompts })
11+
} catch {
12+
throw .grpcFailure(error)
13+
}
14+
15+
// We can't make call `makeAsyncIterator` more than once
16+
// (as a for-loop would do implicitly)
17+
var iter = stream.responseStream.makeAsyncIterator()
18+
19+
let initResp: Prompting_HostResponse?
20+
do {
21+
initResp = try await iter.next()
22+
} catch {
23+
throw .grpcFailure(error)
24+
}
25+
guard let initResp else {
26+
throw .unexpectedStreamClosure
27+
}
28+
try initResp.ensureValid(first: true, allowPrompts: allowPrompts)
29+
30+
Task.detached(priority: .background) {
31+
do {
32+
while let msg = try await iter.next() {
33+
try msg.ensureValid(first: false, allowPrompts: allowPrompts)
34+
var reply: Prompting_HostRequest = .init()
35+
if msg.isPrompt {
36+
// Handle SSH key prompts
37+
if msg.message.contains("yes/no/[fingerprint]") {
38+
reply.response = "yes"
39+
}
40+
// Any other messages that require a non-empty response will
41+
// cause the create op to fail, showing an error. This is ok for now.
42+
}
43+
try await stream.requestStream.send(reply)
44+
}
45+
} catch let error as GRPCStatus where error.code == .cancelled {
46+
return
47+
} catch {
48+
self.logger.critical("Prompt stream failed: \(error)")
49+
}
50+
}
51+
return (stream, identifier: initResp.identifier)
52+
}
53+
}

Coder-Desktop/VPNLib/FileSync/MutagenConvert.swift

+23
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,26 @@ func accumulateErrors(from state: Synchronization_State) -> [FileSyncError] {
5757
func humanReadableBytes(_ bytes: UInt64) -> String {
5858
ByteCountFormatter().string(fromByteCount: Int64(bytes))
5959
}
60+
61+
extension Prompting_HostResponse {
62+
func ensureValid(first: Bool, allowPrompts: Bool) throws(DaemonError) {
63+
if first {
64+
if identifier.isEmpty {
65+
throw .invalidGrpcResponse("empty prompter identifier")
66+
}
67+
if isPrompt {
68+
throw .invalidGrpcResponse("unexpected message type specification")
69+
}
70+
if !message.isEmpty {
71+
throw .invalidGrpcResponse("unexpected message")
72+
}
73+
} else {
74+
if !identifier.isEmpty {
75+
throw .invalidGrpcResponse("unexpected prompter identifier")
76+
}
77+
if isPrompt, !allowPrompts {
78+
throw .invalidGrpcResponse("disallowed prompt message type")
79+
}
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)