Skip to content

Commit 899d588

Browse files
authoredAug 10, 2024
Merge pull request #43 from ferdinand-beyer/master
Complete pending server-initiated request promises on default executor
2 parents 3e108c7 + cb24771 commit 899d588

File tree

6 files changed

+95
-9
lines changed

6 files changed

+95
-9
lines changed
 

‎.cljfmt.edn

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
:remove-consecutive-blank-lines? true
44
:insert-missing-whitespace? true
55
:align-associative? false
6-
:indents {#re "^(?!catch-kondo-errors).*" [[:block 0]]
7-
catch-kondo-errors [[:inner 0]]}
6+
:extra-indents {#re "^(?!catch-kondo-errors).*" [[:block 0]]
7+
catch-kondo-errors [[:inner 0]]}
88
:test-code
99
(comment
1010
(:require

‎CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## Unreleased
44

5+
- Add a `:response-executor` option to control on which thread responses to
6+
server-initiated requests are run, defaulting to Promesa's `:default`
7+
executor, i.e. `ForkJoinPool/commonPool`.
8+
59
## v1.10.0
610

711
- Add `textDocument/foldingRange` schemas.

‎README.md

+4
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ Alternatively, you can convert the request to a promesa promise, and handle it u
120120

121121
In this case `(promesa/cancel! request)` will send `$/cancelRequest`.
122122

123+
Response promises are completed on Promesa's `:default` executor. You
124+
can specify your own executor by passing the `:response-executor` option
125+
when creating your server instance.
126+
123127
### Start and stop a server
124128

125129
The last step is to start the server you created earlier. Use `lsp4clj.server/start`. This method accepts two arguments, the server and a "context".

‎deps.edn

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
cheshire/cheshire {:mvn/version "5.11.0"}
55
funcool/promesa {:mvn/version "10.0.594"}}
66
:paths ["src" "resources"]
7-
:aliases {:test {:extra-deps {lambdaisland/kaocha {:mvn/version "1.64.1010"}}
7+
:aliases {:dev {:extra-paths ["test"]}
8+
:test {:extra-deps {lambdaisland/kaocha {:mvn/version "1.64.1010"}}
89
:extra-paths ["test"]
910
:main-opts ["-m" "kaocha.runner"]}
1011
:build {:extra-paths ["resources"]

‎src/lsp4clj/server.clj

+20-6
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
[lsp4clj.protocols.endpoint :as protocols.endpoint]
1010
[lsp4clj.trace :as trace]
1111
[promesa.core :as p]
12+
[promesa.exec :as p.exec]
1213
[promesa.protocols :as p.protocols])
1314
(:import
1415
(java.util.concurrent CancellationException)))
@@ -95,7 +96,7 @@
9596
;; client. This cannot be `(-> (p/deferred) (p/catch))` because that returns
9697
;; a promise which, when cancelled, does nothing because there's no
9798
;; exception handler chained onto it. Instead, we must cancel the
98-
;; `(p/deffered)` promise itself.
99+
;; `(p/deferred)` promise itself.
99100
(p/catch p CancellationException
100101
(fn [_]
101102
(protocols.endpoint/send-notification server "$/cancelRequest" {:id id})))
@@ -196,6 +197,7 @@
196197
trace-ch
197198
tracer*
198199
^java.time.Clock clock
200+
response-executor
199201
on-close
200202
request-id*
201203
pending-sent-requests*
@@ -351,9 +353,19 @@
351353
(if-let [{:keys [p started] :as req} (get pending-requests id)]
352354
(do
353355
(trace this trace/received-response req resp started now)
354-
(if error
355-
(p/reject! p (ex-info "Received error response" resp))
356-
(p/resolve! p result)))
356+
;; Note that we are called from the server's pipeline, a core.async
357+
;; go-loop, and therefore must not block. Callbacks of the pending
358+
;; request's promise (`p`) will be executed in the completing
359+
;; thread, whatever that thread is. Since the callbacks are not
360+
;; under our control, they are under our users' control, they could
361+
;; block. Therefore, we do not want the completing thread to be our
362+
;; thread. This is very easy for users to miss, therefore we
363+
;; complete the promise using an explicit executor.
364+
(p.exec/submit! response-executor
365+
(fn []
366+
(if error
367+
(p/reject! p (ex-info "Received error response" resp))
368+
(p/resolve! p result)))))
357369
(trace this trace/received-unmatched-response resp now)))
358370
(catch Throwable e
359371
(log-error-receiving this e resp))))
@@ -410,9 +422,10 @@
410422
(update server :tracer* reset! (trace/tracer-for-level trace-level)))
411423

412424
(defn chan-server
413-
[{:keys [output-ch input-ch log-ch trace? trace-level trace-ch clock on-close]
425+
[{:keys [output-ch input-ch log-ch trace? trace-level trace-ch clock on-close response-executor]
414426
:or {clock (java.time.Clock/systemDefaultZone)
415-
on-close (constantly nil)}}]
427+
on-close (constantly nil)
428+
response-executor :default}}]
416429
(let [;; before defaulting trace-ch, so that default is "off"
417430
tracer (trace/tracer-for-level (or trace-level
418431
(when (or trace? trace-ch) "verbose")
@@ -427,6 +440,7 @@
427440
:tracer* (atom tracer)
428441
:clock clock
429442
:on-close on-close
443+
:response-executor response-executor
430444
:request-id* (atom 0)
431445
:pending-sent-requests* (atom {})
432446
:pending-received-requests* (atom {})

‎test/lsp4clj/server_test.clj

+63
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,69 @@
481481
(h/assert-take output-ch)))
482482
(server/shutdown server))))
483483

484+
(defn- core-async-dispatch-thread? [^Thread thread]
485+
(re-matches #"async-dispatch-\d+" (.getName thread)))
486+
487+
(deftest can-determine-core-async-dispatch-thread
488+
(testing "current thread"
489+
(is (not (core-async-dispatch-thread? (Thread/currentThread)))))
490+
(testing "thread running go blocks"
491+
(let [thread (async/<!! (async/go (Thread/currentThread)))]
492+
(is (core-async-dispatch-thread? thread))))
493+
(testing "thread running core.async thread macro"
494+
(let [thread (async/<!! (async/thread (Thread/currentThread)))]
495+
(is (not (core-async-dispatch-thread? thread))))))
496+
497+
(deftest request-should-complete-on-a-suitable-executor
498+
(testing "successful completion"
499+
(let [input-ch (async/chan 3)
500+
output-ch (async/chan 3)
501+
server (server/chan-server {:output-ch output-ch
502+
:input-ch input-ch})
503+
_ (server/start server nil)
504+
thread-p (-> (server/send-request server "req" {:body "foo"})
505+
(p/then (fn [_] (Thread/currentThread))))
506+
client-rcvd-msg (h/assert-take output-ch)
507+
_ (async/put! input-ch (lsp.responses/response (:id client-rcvd-msg) {:result "good"}))
508+
thread (deref thread-p 100 nil)]
509+
(is (not (core-async-dispatch-thread? thread)))
510+
(is (instance? java.util.concurrent.ForkJoinWorkerThread thread)
511+
"completes on default ForkJoinPool executor")
512+
(server/shutdown server)))
513+
(testing "exceptional completion"
514+
(let [input-ch (async/chan 3)
515+
output-ch (async/chan 3)
516+
server (server/chan-server {:output-ch output-ch
517+
:input-ch input-ch})
518+
_ (server/start server nil)
519+
thread-p (-> (server/send-request server "req" {:body "foo"})
520+
(p/catch (fn [_] (Thread/currentThread))))
521+
client-rcvd-msg (h/assert-take output-ch)
522+
_ (async/put! input-ch
523+
(-> (lsp.responses/response (:id client-rcvd-msg))
524+
(lsp.responses/error {:code 1234
525+
:message "Something bad"
526+
:data {:body "foo"}})))
527+
thread (deref thread-p 100 nil)]
528+
(is (not (core-async-dispatch-thread? thread)))
529+
(is (instance? java.util.concurrent.ForkJoinWorkerThread thread)
530+
"completes on default ForkJoinPool executor")
531+
(server/shutdown server)))
532+
(testing "completion with :current-thread executor for legacy behavior"
533+
(let [input-ch (async/chan 3)
534+
output-ch (async/chan 3)
535+
server (server/chan-server {:output-ch output-ch
536+
:input-ch input-ch
537+
:response-executor :current-thread})
538+
_ (server/start server nil)
539+
thread-p (-> (server/send-request server "req" {:body "foo"})
540+
(p/then (fn [_] (Thread/currentThread))))
541+
client-rcvd-msg (h/assert-take output-ch)
542+
_ (async/put! input-ch (lsp.responses/response (:id client-rcvd-msg) {:result "good"}))
543+
thread (deref thread-p 100 nil)]
544+
(is (core-async-dispatch-thread? thread) "completes on core.async dispatch thread")
545+
(server/shutdown server))))
546+
484547
(def fixed-clock
485548
(-> (java.time.LocalDateTime/of 2022 03 05 13 35 23 0)
486549
(.toInstant java.time.ZoneOffset/UTC)

0 commit comments

Comments
 (0)