Skip to content

Commit

Permalink
Implement new SSE Events API
Browse files Browse the repository at this point in the history
  • Loading branch information
valeriansaliou committed May 17, 2024
1 parent a0e1d60 commit ab5ddf2
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 25 deletions.
30 changes: 29 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,17 @@ client.Task.TranscribeSpeech(
* **Response (stream):**

```
{"start": 5.0, "end": 9.0, "text": " I'm just speaking some seconds to see if the translation is correct"}
event: system
data: [START]
event: locale
data: "en"
event: part
data: {"start": 5.0, "end": 9.0, "text": " I'm just speaking some seconds to see if the translation is correct"}
event: system
data: [DONE]
```

#### ➡️ Answer Prompt
Expand Down Expand Up @@ -206,11 +216,29 @@ client.Task.AnswerQuestion(
* **Response (stream):**

```
event: system
data: [START]
event: model
data: "medium"
event: answer
{"index": 0, "chunk": "You can add the Crisp chatbox to"}
event: answer
{"index": 1, "chunk": " your website by following this guide:"}
event: answer
{"index": 2, "chunk": " https://help.crisp.chat/en/article/how-to-add-crisp-chatbox-to-your-website-dkrg1d/"}
event: answer
{"index": 3, "chunk": " :)"}
event: answer
{"index": 4, "chunk": ""}
event: system
data: [DONE]
```

#### ➡️ Summarize Paragraphs
Expand Down
16 changes: 14 additions & 2 deletions examples/task_answer_question.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,26 @@ client.Task.AnswerQuestion(
)
.then(function(stream) {
// Bind all event listeners on created stream
stream.on("data", function(data) {
console.log("Got partial data:", data);
stream.on("model", function(data) {
console.log("Got partial data (model):", data);
});

stream.on("source", function(data) {
console.log("Got partial data (source):", data);
});

stream.on("answer", function(data) {
console.log("Got partial data (answer):", data);
});

stream.on("error", function(error) {
console.error("Answering aborted:", error);
});

stream.on("start", function() {
console.info("Start receiving answer...");
});

stream.on("done", function() {
console.info("Done receiving answer!");
});
Expand Down
97 changes: 75 additions & 22 deletions lib/mirage.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@ var DEFAULT_REST_HOST = "https://api.mirage-ai.com";
var DEFAULT_REST_BASE = "/v1";
var DEFAULT_TIMEOUT = 40000;

var STREAM_EVENT_PREFIX = "event:";
var STREAM_DATA_PREFIX = "data:";
var STREAM_START_TAG = "[START]";
var STREAM_DONE_TAG = "[DONE]";
var STREAM_CHUNK_STALL_TIMEOUT = 10000;

var STREAM_RESERVED_EVENTS = [
"start",
"done",
"error",
"end"
];

var RESOURCES = [
"Task",
"Data"
Expand Down Expand Up @@ -178,7 +187,8 @@ Mirage.prototype.__doPostStream = function(
request
.on("response", function(response) {
var emitter = new events.EventEmitter(),
drainBuffer = "";
drainBuffer = "",
eventBlock = "";

// Response is not successful?
if (response.statusCode >= 400) {
Expand Down Expand Up @@ -238,43 +248,86 @@ Mirage.prototype.__doPostStream = function(
drainBuffer += chunk.toString();

// Emit fully-assembled data from chunk
var dataParts = drainBuffer.split("\n");
var lineParts = drainBuffer.split("\n");

// Terminated by new line? Process buffer now
if ((dataParts.length > 0) &&
(dataParts[dataParts.length - 1] === "")) {
if ((lineParts.length > 0) &&
(lineParts[lineParts.length - 1] === "")) {
// Clear drain buffer immediately
drainBuffer = "";

// Process each data part
for (var i = 0; i < dataParts.length; i++) {
var dataPart = dataParts[i];

if (dataPart.length > 0 &&
dataPart.startsWith(STREAM_DATA_PREFIX) === true) {
// Clear out tag from data part
dataPart = dataPart.substring(STREAM_DATA_PREFIX.length).trim();
for (var i = 0; i < lineParts.length; i++) {
var linePart = lineParts[i];

if (dataPart === STREAM_DONE_TAG) {
// Clear previous stall timeout (as needed)
fnCancelNextChunkStall();
// Line is empty? Skip it.
if (linePart.length === 0) {
// Abort parsing of line there.
continue;
}

// Process at next event loop tick, as the 'done' event might \
// come out-of-order.
setImmediate(function() {
// Raise 'done' event
emitter.emit("done");
});
// Acquire line type ('event:' or 'data:')
if (linePart.startsWith(STREAM_DATA_PREFIX) === true) {
// Clear out tag from text part
var dataPart = (
linePart.substring(STREAM_DATA_PREFIX.length).trim()
);

// System events are not broadcasted to the user, as those are \
// special non-user level events.
if (eventBlock === "system") {
if (dataPart === STREAM_START_TAG) {
// Process at next event loop tick, as the 'start' event \
// might come out-of-order.
setImmediate(function() {
// Raise 'start' event
emitter.emit("start");
});
} else if (dataPart === STREAM_DONE_TAG) {
// Clear previous stall timeout (as needed)
fnCancelNextChunkStall();

// Process at next event loop tick, as the 'done' event \
// might come out-of-order.
setImmediate(function() {
// Raise 'done' event
emitter.emit("done");
});
}
} else {
var dataPartObject = JSON.parse(dataPart);

// Process at next event loop tick, as the 'data' event might \
// come out-of-order.
setImmediate(function() {
// Raise 'data' event
emitter.emit("data", dataPartObject);
// Raise event (fallback to 'data' if no event block)
// Important: add prefix if event streamed from Mirage is \
// a reserved event (eg. 'error' becomes ':error').
var eventName = (eventBlock || "data");

if (STREAM_RESERVED_EVENTS.includes(eventName) === true) {
eventName = (":" + eventName);
}

emitter.emit(eventName, dataPartObject);
});
}

// Abort parsing of line there.
continue;
}

if (linePart.startsWith(STREAM_EVENT_PREFIX) === true) {
// Clear out tag from text part
var eventPart = (
linePart.substring(STREAM_EVENT_PREFIX.length).trim()
);

// Update current event block name (or use none)
eventBlock = (eventPart || "");

// Abort parsing of line there.
continue;
}
}
}
Expand Down

0 comments on commit ab5ddf2

Please sign in to comment.