14
14
15
15
#include " ray/object_manager/object_buffer_pool.h"
16
16
17
+ #include " absl/time/time.h"
17
18
#include " ray/common/status.h"
18
19
#include " ray/util/logging.h"
19
20
20
21
namespace ray {
21
22
22
23
/// Constructs the pool and connects its plasma store client.
///
/// \param store_socket_name Socket name passed to the store client's Connect().
/// \param chunk_size Value stored as default_chunk_size_, which the chunk
/// arithmetic helpers of this class divide object sizes by.
ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name,
                                   uint64_t chunk_size)
    : store_socket_name_(store_socket_name),
      default_chunk_size_(chunk_size) {
  // A failed connection is fatal: RAY_CHECK_OK is applied to the returned status.
  RAY_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), " ", 0, 300));
}
28
28
29
29
/// Tears down the pool.
///
/// Steps, in order:
///  1. Signal the condition variable of every create-buffer operation that is
///     registered in create_buffer_ops_ at the time of destruction.
///  2. Wait up to five seconds for create_buffer_ops_ to become empty and log an
///     error if it does not. pool_mutex_ is held afterwards either way.
///  3. Release and abort every buffer still tracked in create_buffer_state_.
///  4. Disconnect the store client.
ObjectBufferPool::~ObjectBufferPool() {
  // Copy the in-flight operations while holding the lock, then drop the lock
  // before signalling. The lock is scoped to this block so that each acquisition
  // of pool_mutex_ in this function is paired with exactly one release.
  decltype(create_buffer_ops_) inflight_ops;
  {
    absl::MutexLock lock(&pool_mutex_);
    inflight_ops = create_buffer_ops_;
  }

  for (const auto &entry : inflight_ops) {
    entry.second->SignalAll();
  }

  auto no_inflight = [this]() {
    pool_mutex_.AssertReaderHeld();
    return create_buffer_ops_.empty();
  };
  // Assume no request would arrive, acquire pool_mutex_ when there is no inflight
  // operation. Otherwise print an error. LockWhenWithTimeout() returns with the
  // mutex held whether or not the condition became true, so the accesses below
  // are made under pool_mutex_ in both cases.
  if (!pool_mutex_.LockWhenWithTimeout(absl::Condition(&no_inflight), absl::Seconds(5))) {
    RAY_LOG(ERROR)
        << create_buffer_ops_.size() << " remaining inflight create buffer operations "
        << " during ObjectBufferPool destruction. Either abort these operations before "
        << " destroying ObjectBufferPool, or refactor ObjectBufferPool to make it "
           " unnecessary to wait for the operations' completion.";
  }

  // Abort unfinished buffers in progress. The map is emptied only after the loop:
  // erasing the element an iterator points to and then incrementing that same
  // iterator is undefined behaviour.
  for (const auto &entry : create_buffer_state_) {
    RAY_CHECK_OK(store_client_.Release(entry.first));
    RAY_CHECK_OK(store_client_.Abort(entry.first));
  }
  create_buffer_state_.clear();

  RAY_CHECK(create_buffer_state_.empty());
  RAY_CHECK_OK(store_client_.Disconnect());

  // Matches the acquisition performed by LockWhenWithTimeout() above.
  pool_mutex_.Unlock();
}
38
61
39
- uint64_t ObjectBufferPool::GetNumChunks (uint64_t data_size) {
62
+ uint64_t ObjectBufferPool::GetNumChunks (uint64_t data_size) const {
40
63
return (data_size + default_chunk_size_ - 1 ) / default_chunk_size_;
41
64
}
42
65
43
- uint64_t ObjectBufferPool::GetBufferLength (uint64_t chunk_index, uint64_t data_size) {
66
+ uint64_t ObjectBufferPool::GetBufferLength (uint64_t chunk_index,
67
+ uint64_t data_size) const {
44
68
return (chunk_index + 1 ) * default_chunk_size_ > data_size
45
69
? data_size % default_chunk_size_
46
70
: default_chunk_size_;
@@ -49,7 +73,7 @@ uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_s
49
73
std::pair<std::shared_ptr<MemoryObjectReader>, ray::Status>
50
74
ObjectBufferPool::CreateObjectReader (const ObjectID &object_id,
51
75
rpc::Address owner_address) {
52
- std::lock_guard<std::mutex> lock (pool_mutex_);
76
+ absl::MutexLock lock (& pool_mutex_);
53
77
54
78
std::vector<ObjectID> object_ids{object_id};
55
79
std::vector<plasma::ObjectBuffer> object_buffers (1 );
@@ -76,53 +100,21 @@ ray::Status ObjectBufferPool::CreateChunk(const ObjectID &object_id,
76
100
const rpc::Address &owner_address,
77
101
uint64_t data_size, uint64_t metadata_size,
78
102
uint64_t chunk_index) {
79
- std::unique_lock<std::mutex> lock (pool_mutex_);
80
- if (create_buffer_state_.count (object_id) == 0 ) {
81
- int64_t object_size = data_size - metadata_size;
82
- // Try to create shared buffer.
83
- std::shared_ptr<Buffer> data;
84
-
85
- // Release the buffer pool lock during the blocking create call.
86
- lock.unlock ();
87
- Status s = store_client_.CreateAndSpillIfNeeded (
88
- object_id, owner_address, object_size, nullptr , metadata_size, &data,
89
- plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet);
90
- lock.lock ();
91
-
92
- // Another thread may have succeeded in creating the chunk while the lock
93
- // was released. In that case skip the remainder of the creation block.
94
- if (create_buffer_state_.count (object_id) == 0 ) {
95
- std::vector<boost::asio::mutable_buffer> buffer;
96
- if (!s.ok ()) {
97
- // Create failed. The object may already exist locally. If something else went
98
- // wrong, another chunk will succeed in creating the buffer, and this
99
- // chunk will eventually make it here via pull requests.
100
- return ray::Status::IOError (s.message ());
101
- }
102
- // Read object into store.
103
- uint8_t *mutable_data = data->Data ();
104
- uint64_t num_chunks = GetNumChunks (data_size);
105
- create_buffer_state_.emplace (
106
- std::piecewise_construct, std::forward_as_tuple (object_id),
107
- std::forward_as_tuple (BuildChunks (object_id, mutable_data, data_size, data)));
108
- RAY_LOG (DEBUG) << " Created object " << object_id
109
- << " in plasma store, number of chunks: " << num_chunks
110
- << " , chunk index: " << chunk_index;
111
- RAY_CHECK (create_buffer_state_[object_id].chunk_info .size () == num_chunks);
112
- }
113
- }
114
- if (create_buffer_state_[object_id].chunk_state [chunk_index] !=
115
- CreateChunkState::AVAILABLE) {
103
+ absl::MutexLock lock (&pool_mutex_);
104
+ RAY_RETURN_NOT_OK (EnsureBufferExists (object_id, owner_address, data_size, metadata_size,
105
+ chunk_index));
106
+ auto &state = create_buffer_state_.at (object_id);
107
+ if (state.chunk_state [chunk_index] != CreateChunkState::AVAILABLE) {
116
108
// There can be only one reference to this chunk at any given time.
117
109
return ray::Status::IOError (" Chunk already received by a different thread." );
118
110
}
119
- create_buffer_state_[object_id] .chunk_state [chunk_index] = CreateChunkState::REFERENCED;
111
+ state .chunk_state [chunk_index] = CreateChunkState::REFERENCED;
120
112
return ray::Status::OK ();
121
113
}
122
114
123
115
void ObjectBufferPool::WriteChunk (const ObjectID &object_id, const uint64_t chunk_index,
124
116
const std::string &data) {
125
- std::lock_guard<std::mutex> lock (pool_mutex_);
117
+ absl::MutexLock lock (& pool_mutex_);
126
118
auto it = create_buffer_state_.find (object_id);
127
119
if (it == create_buffer_state_.end () ||
128
120
it->second .chunk_state .at (chunk_index) != CreateChunkState::REFERENCED) {
@@ -148,7 +140,7 @@ void ObjectBufferPool::WriteChunk(const ObjectID &object_id, const uint64_t chun
148
140
}
149
141
150
142
void ObjectBufferPool::AbortCreate (const ObjectID &object_id) {
151
- std::lock_guard<std::mutex> lock (pool_mutex_);
143
+ absl::MutexLock lock (& pool_mutex_);
152
144
auto it = create_buffer_state_.find (object_id);
153
145
if (it != create_buffer_state_.end ()) {
154
146
RAY_LOG (INFO) << " Not enough memory to create requested object " << object_id
@@ -179,13 +171,84 @@ std::vector<ObjectBufferPool::ChunkInfo> ObjectBufferPool::BuildChunks(
179
171
return chunks;
180
172
}
181
173
174
+ ray::Status ObjectBufferPool::EnsureBufferExists (const ObjectID &object_id,
175
+ const rpc::Address &owner_address,
176
+ uint64_t data_size,
177
+ uint64_t metadata_size,
178
+ uint64_t chunk_index) {
179
+ while (true ) {
180
+ // Buffer for object_id already exists.
181
+ if (create_buffer_state_.contains (object_id)) {
182
+ return ray::Status::OK ();
183
+ }
184
+
185
+ auto it = create_buffer_ops_.find (object_id);
186
+ if (it == create_buffer_ops_.end ()) {
187
+ // No inflight create buffer operation, proceed to start one.
188
+ break ;
189
+ }
190
+
191
+ auto cond_var = it->second ;
192
+ // Release pool_mutex_ while waiting, until the current inflight create buffer
193
+ // operation finishes.
194
+ cond_var->Wait (&pool_mutex_);
195
+ }
196
+
197
+ // Indicate that there is an inflight create buffer operation, by inserting into
198
+ // create_buffer_ops_.
199
+ RAY_CHECK (
200
+ create_buffer_ops_.insert ({object_id, std::make_shared<absl::CondVar>()}).second );
201
+ const int64_t object_size =
202
+ static_cast <int64_t >(data_size) - static_cast <int64_t >(metadata_size);
203
+ std::shared_ptr<Buffer> data;
204
+
205
+ // Release pool_mutex_ during the blocking create call.
206
+ pool_mutex_.Unlock ();
207
+ Status s = store_client_.CreateAndSpillIfNeeded (
208
+ object_id, owner_address, static_cast <int64_t >(object_size), nullptr ,
209
+ static_cast <int64_t >(metadata_size), &data,
210
+ plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet);
211
+ pool_mutex_.Lock ();
212
+
213
+ // No other thread could have created the buffer.
214
+ RAY_CHECK (!create_buffer_state_.contains (object_id));
215
+
216
+ // Remove object_id from create_buffer_ops_ to indicate to the waiting ops that the
217
+ // inflight operation has finished. Wake up waiters so they can either start another
218
+ // create buffer op, or proceed after the buffer has been created.
219
+ {
220
+ auto it = create_buffer_ops_.find (object_id);
221
+ it->second ->SignalAll ();
222
+ create_buffer_ops_.erase (it);
223
+ }
224
+
225
+ if (!s.ok ()) {
226
+ // Create failed. Buffer creation will be tried by another chunk.
227
+ // And this chunk will eventually make it here via retried pull requests.
228
+ return ray::Status::IOError (s.message ());
229
+ }
230
+
231
+ // Read object into store.
232
+ uint8_t *mutable_data = data->Data ();
233
+ uint64_t num_chunks = GetNumChunks (data_size);
234
+ create_buffer_state_.emplace (
235
+ std::piecewise_construct, std::forward_as_tuple (object_id),
236
+ std::forward_as_tuple (BuildChunks (object_id, mutable_data, data_size, data)));
237
+ RAY_CHECK (create_buffer_state_[object_id].chunk_info .size () == num_chunks);
238
+ RAY_LOG (DEBUG) << " Created object " << object_id
239
+ << " in plasma store, number of chunks: " << num_chunks
240
+ << " , chunk index: " << chunk_index;
241
+
242
+ return ray::Status::OK ();
243
+ }
244
+
182
245
void ObjectBufferPool::FreeObjects (const std::vector<ObjectID> &object_ids) {
183
- std::lock_guard<std::mutex> lock (pool_mutex_);
246
+ absl::MutexLock lock (& pool_mutex_);
184
247
RAY_CHECK_OK (store_client_.Delete (object_ids));
185
248
}
186
249
187
250
std::string ObjectBufferPool::DebugString () const {
188
- std::lock_guard<std::mutex> lock (pool_mutex_);
251
+ absl::MutexLock lock (& pool_mutex_);
189
252
std::stringstream result;
190
253
result << " BufferPool:" ;
191
254
result << " \n - create buffer state map size: " << create_buffer_state_.size ();
0 commit comments