-
Notifications
You must be signed in to change notification settings - Fork 192
/
Copy pathbase_server.rb
199 lines (173 loc) · 7.15 KB
/
base_server.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# typed: strict
# frozen_string_literal: true
module RubyLsp
class BaseServer
extend T::Sig
extend T::Helpers
abstract!
sig { params(options: T.untyped).void }
# Builds the server state: transport reader/writer, the incoming and outgoing queues, the worker thread, the
# outgoing dispatcher thread, the document store and the global state. Recognized option keys are `:test_mode`,
# `:setup_error`, `:install_error` and `:initialize_request`
def initialize(**options)
  # Optional values supplied by the caller; each one may be absent (nil)
  @test_mode = T.let(options[:test_mode], T.nilable(T::Boolean))
  @setup_error = T.let(options[:setup_error], T.nilable(StandardError))
  @install_error = T.let(options[:install_error], T.nilable(StandardError))

  # Transport endpoints and the queues that connect the reader, the worker and the dispatcher
  @writer = T.let(Transport::Stdio::Writer.new, Transport::Stdio::Writer)
  @reader = T.let(Transport::Stdio::Reader.new, Transport::Stdio::Reader)
  @incoming_queue = T.let(Thread::Queue.new, Thread::Queue)
  @outgoing_queue = T.let(Thread::Queue.new, Thread::Queue)
  @cancelled_requests = T.let([], T::Array[Integer])
  @mutex = T.let(Mutex.new, Mutex)

  # The worker is started only after the queue, mutex and cancellation list it reads have been assigned
  @worker = T.let(new_worker, Thread)
  @current_request_id = T.let(1, Integer)
  @store = T.let(Store.new, Store)
  @outgoing_dispatcher = T.let(
    Thread.new do
      # In test mode nothing is written out; queued messages stay in the outgoing queue
      next if @test_mode

      # `pop` yields nil once the queue is closed and drained, which ends the dispatcher
      while (outgoing = @outgoing_queue.pop)
        @mutex.synchronize { @writer.write(outgoing.to_hash) }
      end
    end,
    Thread,
  )
  @global_state = T.let(GlobalState.new, GlobalState)
  Thread.main.priority = 1

  # We read the initialize request in `exe/ruby-lsp` to be able to determine the workspace URI where Bundler should
  # be set up, so when it is handed over here it has to be processed immediately
  first_request = options[:initialize_request]
  return unless first_request

  process_message(first_request)
end
sig { void }
# Main read loop. For every message yielded by `@reader.read` it:
#   1. normalizes `params.textDocument.uri` (when present) and, for anything other than `textDocument/did*`
#      notifications, makes sure the referenced document is parsed, all while holding `@mutex`;
#   2. dispatches the message: a fixed set of methods is handled inline through `process_message`, `shutdown` and
#      `exit` are handled here, and everything else is pushed to `@incoming_queue` for the worker thread
def start
@reader.read do |message|
method = message[:method]
# We must parse the document under a mutex lock or else we might switch threads and accept text edits in the
# source. Altering the source reference during parsing will put the parser in an invalid internal state, since
# it started parsing with one source but then it changed in the middle. We don't want to do this for text
# synchronization notifications
@mutex.synchronize do
uri = message.dig(:params, :textDocument, :uri)
if uri
begin
parsed_uri = URI(uri)
parsed_uri = @global_state.to_internal_uri(parsed_uri)
# Overwrite the raw URI string in the message with the value returned by `to_internal_uri`, so that is what
# the handlers of this message receive
message[:params][:textDocument][:uri] = parsed_uri
# We don't want to try to parse documents on text synchronization notifications
unless method.start_with?("textDocument/did")
document = @store.get(parsed_uri)
# If the client supports request delegation and we're working with an ERB document and there was
# something to parse, then we have to keep the client updated about the virtual state of the host
# language source
if document.parse! && @global_state.client_capabilities.supports_request_delegation &&
document.is_a?(ERBDocument)
send_message(
Notification.new(
method: "delegate/textDocument/virtualState",
params: {
textDocument: {
# The original URI string received in the message is sent back, not the internal URI
uri: uri,
text: document.host_language_source,
},
},
),
)
end
end
rescue Store::NonExistingDocumentError
# If we receive a request for a file that no longer exists, we don't want to fail
end
end
end
# The following requests need to be executed in the main thread directly to avoid concurrency issues. Everything
# else is pushed into the incoming queue
case method
when "initialize", "initialized", "textDocument/didOpen", "textDocument/didClose", "textDocument/didChange",
"$/cancelRequest"
process_message(message)
when "shutdown"
@mutex.synchronize do
send_log_message("Shutting down Ruby LSP...")
shutdown
run_shutdown
# `run_shutdown` has closed the outgoing queue by now, so the response is written to the writer directly
@writer.write(Result.new(id: message[:id], response: nil).to_hash)
end
when "exit"
@mutex.synchronize do
# The incoming queue is closed by `run_shutdown`, so the status is 0 only when a shutdown already happened
status = @incoming_queue.closed? ? 0 : 1
# Note: `send_message` drops messages once the outgoing queue is closed, so this log is only queued when
# no shutdown preceded the exit
send_log_message("Shutdown complete with status #{status}")
exit(status)
end
else
@incoming_queue << message
end
end
end
sig { void }
# Tears the server down: empties and closes both queues, forgets pending cancellations, waits for the worker and
# the outgoing dispatcher threads to finish and finally clears the document store
def run_shutdown
  queues = [@incoming_queue, @outgoing_queue]

  # Drop whatever is still pending first, then close so that the blocked `pop` calls in the threads return nil
  queues.each(&:clear)
  queues.each(&:close)
  @cancelled_requests.clear

  # Both threads leave their loops once their queue is closed
  [@worker, @outgoing_dispatcher].each(&:join)
  @store.clear
end
# This method is only intended to be used in tests! Pops the next queued response that would be sent to the client
sig { returns(T.untyped) }
def pop_response
  # Explicit blocking pop: waits until a message is queued and returns nil once the queue is closed and empty
  @outgoing_queue.pop(false)
end
# This method is only intended to be used in tests! Pushes a message to the incoming queue directly
sig { params(message: T::Hash[Symbol, T.untyped]).void }
def push_message(message)
  # Enqueue at the tail of the incoming queue, exactly where the read loop puts deferred messages
  @incoming_queue.push(message)
end
sig { abstract.params(message: T::Hash[Symbol, T.untyped]).void }
# Abstract hook: handles a single message hash. It is invoked from the constructor for the initialize request,
# from the read loop for the methods handled inline and from the worker thread for everything else
def process_message(message)
end
sig { abstract.void }
# Abstract hook: invoked by the read loop when a `shutdown` request arrives, right before `run_shutdown`
def shutdown
end
sig { params(id: Integer, message: String, type: Integer).void }
# Fails request `id` with a REQUEST_FAILED error response and additionally surfaces the same `message` to the user
# through a show-message notification of the given `type` (INFO by default)
def fail_request_and_notify(id, message, type: Constant::MessageType::INFO)
  # First answer the request itself so the client is not left waiting
  failure = Error.new(id: id, code: Constant::ErrorCodes::REQUEST_FAILED, message: message)
  send_message(failure)

  # Then notify the user
  user_notice = Notification.window_show_message(message, type: type)
  send_message(user_notice)
end
sig { returns(Thread) }
# Spawns the worker thread. The thread pops messages from the incoming queue and hands each one to
# `process_message`, except for requests whose id is in `@cancelled_requests`: those are answered with a
# REQUEST_CANCELLED error and are not processed. The thread finishes when `pop` returns nil
def new_worker
  Thread.new do
    # `pop` blocks while the queue is empty and returns nil once the queue is closed and drained
    while (message = T.let(@incoming_queue.pop, T.nilable(T::Hash[Symbol, T.untyped])))
      id = message[:id]

      # Check if the request was cancelled before trying to process it. The outcome is returned from the
      # synchronized block because a `next` placed inside that block would only leave the block itself and the
      # cancelled request would still reach `process_message`
      cancelled = @mutex.synchronize do
        if id && @cancelled_requests.include?(id)
          send_message(Error.new(
            id: id,
            code: Constant::ErrorCodes::REQUEST_CANCELLED,
            message: "Request #{id} was cancelled",
          ))
          @cancelled_requests.delete(id)
          true
        else
          false
        end
      end

      # Here `next` belongs to the `while` loop, so the cancelled request is really skipped
      next if cancelled

      process_message(message)
    end
  end
end
sig { params(message: T.any(Result, Error, Notification, Request)).void }
# Queues `message` for delivery to the client and bumps `@current_request_id` when the message is a `Request`.
# Messages are silently dropped once the outgoing queue has been closed
def send_message(message)
  # When we're shutting down the server, there's a small race condition between closing the thread queues and
  # finishing remaining requests. We may close the queue in the middle of processing a request, which will then fail
  # when trying to send a response back
  return if @outgoing_queue.closed?

  @outgoing_queue << message
  @current_request_id += 1 if message.is_a?(Request)
rescue ClosedQueueError
  # The queue can still be closed by another thread between the `closed?` check and the push. Dropping the message
  # here matches the early return above and keeps the sending thread alive during shutdown
end
sig { params(id: Integer).void }
# Answers request `id` with a result whose response is nil
def send_empty_response(id)
  empty_result = Result.new(id: id, response: nil)
  send_message(empty_result)
end
sig { params(message: String, type: Integer).void }
# Sends a log-message notification with the given text. `type` selects the message type and defaults to LOG
def send_log_message(message, type: Constant::MessageType::LOG)
  # Forward the caller's `type`; hard-coding LOG here would silently ignore the keyword argument
  send_message(Notification.window_log_message(message, type: type))
end
end
end