Skip to content

Commit fa1bd11

Browse files
committed
feat: add close, isclosed and read methods to Chan object
1 parent 35f05b5 commit fa1bd11

File tree

3 files changed

+111
-13
lines changed

3 files changed

+111
-13
lines changed

argon/vm/areval.cpp

+12-4
Original file line numberDiff line numberDiff line change
@@ -1402,8 +1402,12 @@ ArObject *argon::vm::Eval(Fiber *fiber) {
14021402
break;
14031403
}
14041404

1405-
if (!ChanRead((Chan *) ret, &ret))
1406-
return nullptr;
1405+
if (!ChanRead((Chan *) ret, &ret)) {
1406+
if (GetFiberStatus() != FiberStatus::RUNNING)
1407+
return nullptr;
1408+
1409+
break;
1410+
}
14071411

14081412
TOP_REPLACE(ret);
14091413
DISPATCH1();
@@ -1430,8 +1434,12 @@ ArObject *argon::vm::Eval(Fiber *fiber) {
14301434
break;
14311435
}
14321436

1433-
if (!ChanWrite((Chan *) ret, PEEK1()))
1434-
return nullptr;
1437+
if (!ChanWrite((Chan *) ret, PEEK1())) {
1438+
if (GetFiberStatus() != FiberStatus::RUNNING)
1439+
return nullptr;
1440+
1441+
break;
1442+
}
14351443

14361444
// POP only Chan, leave value on stack!
14371445
POP();

argon/vm/datatype/chan.cpp

+92-6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <argon/vm/datatype/error.h>
1111
#include <argon/vm/datatype/nil.h>
1212
#include <argon/vm/datatype/pcheck.h>
13+
#include <argon/vm/datatype/tuple.h>
1314

1415
#include <argon/vm/datatype/chan.h>
1516

@@ -22,20 +23,44 @@ ARGON_FUNCTION(chan_chan, Chan,
2223
"\n"
2324
"- KWParameters:\n"
2425
" - backlog: Set the size of the backlog.\n"
26+
" - defval: Sets the value to be returned when a read operation is performed on a closed channel."
2527
"- returns: New Chan object.\n",
2628
nullptr, false, true) {
27-
IntegerUnderlying backlog;
29+
ArObject *defval;
30+
31+
IntegerUnderlying backlog;
2832

2933
if (!KParamLookupInt((Dict *) kwargs, "backlog", &backlog, 1))
3034
return nullptr;
3135

32-
if(backlog<0){
33-
ErrorFormat(kValueError[0],"backlog value cannot be negative");
36+
if (!KParamLookup((Dict *) kwargs, "defval", nullptr, &defval, nullptr, false))
37+
return nullptr;
38+
39+
if (backlog < 0) {
40+
ErrorFormat(kValueError[0], "backlog value cannot be negative");
3441

3542
return nullptr;
3643
}
3744

38-
return (ArObject *) ChanNew(backlog);
45+
return (ArObject *) ChanNew(defval, backlog);
46+
}
47+
48+
ARGON_METHOD(chan_close, close,
49+
"Close this channel.\n"
50+
"\n"
51+
"The sender should be the only one to close the channel; multiple closures of a channel are considered a non-op.\n"
52+
"\n",
53+
nullptr, false, false) {
54+
auto self = (Chan *) _self;
55+
56+
std::unique_lock _(self->lock);
57+
58+
self->close = true;
59+
60+
self->r_queue.NotifyAll();
61+
self->w_queue.NotifyAll();
62+
63+
return (ArObject *) IncRef(Nil);
3964
}
4065

4166
ARGON_METHOD(chan_flush, flush,
@@ -58,10 +83,48 @@ ARGON_METHOD(chan_flush, flush,
5883
return (ArObject *) IncRef(Nil);
5984
}
6085

86+
ARGON_METHOD(chan_isclosed, isclosed,
87+
"Test if this channel is closed.\n"
88+
"\n"
89+
"- Returns: True if channel is closed, false otherwise.\n",
90+
nullptr, false, false) {
91+
auto self = (Chan *) _self;
92+
return (ArObject *) BoolToArBool(self->close);
93+
}
94+
95+
ARGON_METHOD(chan_read, read,
96+
"Read data from channel.\n"
97+
"\n"
98+
"- Returns: Tuple containing the value and a state indicating whether the value is reliable or not. If the state is false, "
99+
"the channel is closed, and the read value is invalid.",
100+
nullptr, false, false) {
101+
auto self = (Chan *) _self;
102+
103+
ArObject *value;
104+
Tuple *ret;
105+
106+
std::unique_lock _(self->lock);
107+
108+
if (self->close && self->count == 0)
109+
return (ArObject *) TupleNew("ob", self->defval, false);
110+
111+
if (!ChanRead(self, &value))
112+
return nullptr;
113+
114+
ret = TupleNew("ob", value, true);
115+
116+
Release(value);
117+
118+
return (ArObject *) ret;
119+
}
120+
61121
const FunctionDef chan_methods[] = {
62122
chan_chan,
63123

124+
chan_close,
64125
chan_flush,
126+
chan_isclosed,
127+
chan_read,
65128
ARGON_METHOD_SENTINEL
66129
};
67130

@@ -82,7 +145,8 @@ ArObject *chan_compare(const ArObject *self, const ArObject *other, CompareMode
82145
}
83146

84147
ArObject *chan_repr(const Chan *self) {
85-
return (ArObject *) StringFormat("<%s -- backlog: %d, count: %d>", type_chan_->name, self->length, self->count);
148+
return (ArObject *) StringFormat("<%s -- backlog: %d, count: %d, closed: %s>", type_chan_->name, self->length,
149+
self->count, self->close ? "true" : "false");
86150
}
87151

88152
bool chan_dtor(Chan *self) {
@@ -149,6 +213,19 @@ bool argon::vm::datatype::ChanRead(Chan *chan, ArObject **out_value) {
149213
std::unique_lock _(chan->lock);
150214

151215
if (chan->count == 0) {
216+
if (chan->close) {
217+
218+
if (chan->defval == nullptr) {
219+
ErrorFormat(kRuntimeError[0], "read from closed channel");
220+
221+
return false;
222+
}
223+
224+
*out_value = IncRef(chan->defval);
225+
226+
return true;
227+
}
228+
152229
*out_value = nullptr;
153230

154231
chan->r_queue.Wait(FiberStatus::BLOCKED_SUSPENDED);
@@ -170,6 +247,11 @@ bool argon::vm::datatype::ChanRead(Chan *chan, ArObject **out_value) {
170247
bool argon::vm::datatype::ChanWrite(Chan *chan, ArObject *value) {
171248
std::unique_lock _(chan->lock);
172249

250+
if (chan->close) {
251+
ErrorFormat(kRuntimeError[0], "write on closed channel");
252+
return false;
253+
}
254+
173255
if (chan->write == chan->read && chan->count > 0) {
174256
chan->w_queue.Wait(FiberStatus::BLOCKED_SUSPENDED);
175257

@@ -187,7 +269,7 @@ bool argon::vm::datatype::ChanWrite(Chan *chan, ArObject *value) {
187269
return true;
188270
}
189271

190-
Chan *argon::vm::datatype::ChanNew(unsigned int backlog) {
272+
Chan *argon::vm::datatype::ChanNew(ArObject *defval, unsigned int backlog) {
191273
auto *chan = MakeGCObject<Chan>(type_chan_, false);
192274

193275
if (chan != nullptr) {
@@ -202,11 +284,15 @@ Chan *argon::vm::datatype::ChanNew(unsigned int backlog) {
202284
new(&chan->r_queue)sync::NotifyQueue();
203285
new(&chan->w_queue)sync::NotifyQueue();
204286

287+
chan->defval = IncRef(defval);
288+
205289
chan->read = 0;
206290
chan->write = 0;
207291

208292
chan->count = 0;
209293
chan->length = backlog;
294+
295+
chan->close = false;
210296
}
211297

212298
return chan;

argon/vm/datatype/chan.h

+7-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ namespace argon::vm::datatype {
1919
sync::NotifyQueue r_queue;
2020
sync::NotifyQueue w_queue;
2121

22+
ArObject *defval;
23+
2224
ArObject **queue;
2325

2426
unsigned int read;
@@ -28,14 +30,16 @@ namespace argon::vm::datatype {
2830
unsigned int count;
2931

3032
unsigned int length;
33+
34+
bool close;
3135
};
3236
_ARGONAPI extern const TypeInfo *type_chan_;
3337

34-
bool ChanRead(Chan *chan, ArObject **out_value);
38+
bool ChanRead(Chan * chan, ArObject * *out_value);
3539

36-
bool ChanWrite(Chan *chan, ArObject *value);
40+
bool ChanWrite(Chan * chan, ArObject * value);
3741

38-
Chan *ChanNew(unsigned int backlog);
42+
Chan *ChanNew(ArObject *defval, unsigned int backlog);
3943

4044
} // namespace argon::vm::datatype
4145

0 commit comments

Comments
 (0)