Skip to content

Commit

Permalink
using stream: schema to fetch in App
Browse files Browse the repository at this point in the history
  • Loading branch information
lloydzhou committed Sep 28, 2024
1 parent d84d51b commit 2d920f7
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 122 deletions.
1 change: 1 addition & 0 deletions app/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ declare module "*.svg";

declare interface Window {
__TAURI__?: {
convertFileSrc(url: string, protocol?: string): string;
writeText(text: string): Promise<void>;
invoke(command: string, payload?: Record<string, unknown>): Promise<any>;
dialog: {
Expand Down
41 changes: 1 addition & 40 deletions app/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { showToast } from "./components/ui-lib";
import Locale from "./locales";
import { RequestMessage } from "./client/api";
import { ServiceProvider } from "./constant";
import { fetch } from "./utils/stream";

export function trimTopic(topic: string) {
// Fix an issue where double quotes still show in the Indonesian language
Expand Down Expand Up @@ -286,46 +287,6 @@ export function showPlugins(provider: ServiceProvider, model: string) {
return false;
}

export function fetch(
url: string,
options?: Record<string, unknown>,
): Promise<any> {
if (window.__TAURI__) {
const tauriUri = window.__TAURI__.convertFileSrc(url, "sse");
return window.fetch(tauriUri, options).then((r) => {
// 1. create response,
// TODO using event to get status and statusText and headers
const { status, statusText } = r;
const { readable, writable } = new TransformStream();
const res = new Response(readable, { status, statusText });
// 2. call fetch_read_body multi times, and write to Response.body
const writer = writable.getWriter();
let unlisten;
window.__TAURI__.event
.listen("sse-response", (e) => {
const { id, payload } = e;
console.log("event", id, payload);
writer.ready.then(() => {
if (payload !== 0) {
writer.write(new Uint8Array(payload));
} else {
writer.releaseLock();
writable.close();
unlisten && unlisten();
}
});
})
.then((u) => (unlisten = u));
return res;
});
}
return window.fetch(url, options);
}

if (undefined !== window) {
window.tauriFetch = fetch;
}

export function adapter(config: Record<string, unknown>) {
const { baseURL, url, params, ...rest } = config;
const path = baseURL ? `${baseURL}${url}` : url;
Expand Down
100 changes: 100 additions & 0 deletions app/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// using tauri register_uri_scheme_protocol, register `stream:` protocol
// see src-tauri/src/stream.rs, and src-tauri/src/main.rs
// 1. window.fetch(`stream://localhost/${fetchUrl}`), get request_id
// 2. listen event: `stream-response` multi times to get response headers and body

type ResponseEvent = {
id: number;
payload: {
request_id: number;
status?: number;
error?: string;
name?: string;
value?: string;
chunk?: number[];
};
};

export function fetch(url: string, options?: RequestInit): Promise<any> {
if (window.__TAURI__) {
const tauriUri = window.__TAURI__.convertFileSrc(url, "stream");
const { signal, ...rest } = options || {};
return window
.fetch(tauriUri, rest)
.then((r) => r.text())
.then((rid) => parseInt(rid))
.then((request_id: number) => {
// 1. using event to get status and statusText and headers, and resolve it
let resolve: Function | undefined;
let reject: Function | undefined;
let status: number;
let writable: WritableStream | undefined;
let writer: WritableStreamDefaultWriter | undefined;
const headers = new Headers();
let unlisten: Function | undefined;

if (signal) {
signal.addEventListener("abort", () => {
// Reject the promise with the abort reason.
unlisten && unlisten();
reject && reject(signal.reason);
});
}
// @ts-ignore 2. listen response multi times, and write to Response.body
window.__TAURI__.event
.listen("stream-response", (e: ResponseEvent) => {
const { id, payload } = e;
const {
request_id: rid,
status: _status,
name,
value,
error,
chunk,
} = payload;
if (request_id != rid) {
return;
}
/**
* 1. get status code
* 2. get headers
* 3. start get body, then resolve response
* 4. get body chunk
*/
if (error) {
unlisten && unlisten();
return reject && reject(error);
} else if (_status) {
status = _status;
} else if (name && value) {
headers.append(name, value);
} else if (chunk) {
if (resolve) {
const ts = new TransformStream();
writable = ts.writable;
writer = writable.getWriter();
resolve(new Response(ts.readable, { status, headers }));
resolve = undefined;
}
writer &&
writer.ready.then(() => {
writer && writer.write(new Uint8Array(chunk));
});
} else if (_status === 0) {
// end of body
unlisten && unlisten();
writer &&
writer.ready.then(() => {
writer && writer.releaseLock();
writable && writable.close();
});
}
})
.then((u: Function) => (unlisten = u));
return new Promise(
(_resolve, _reject) => ([resolve, reject] = [_resolve, _reject]),
);
});
}
return window.fetch(url, options);
}
36 changes: 1 addition & 35 deletions src-tauri/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ tauri-plugin-window-state = { git = "https://github.com/tauri-apps/plugins-works
percent-encoding = "2.3.1"
reqwest = "0.11.18"
futures-util = "0.3.30"
bytes = "1.7.2"

[features]
# this feature is used for production builds or when `devPath` points to the filesystem and the built-in dev server is disabled.
Expand Down
51 changes: 4 additions & 47 deletions src-tauri/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,14 @@
// Prevents additional console window on Windows in release, DO NOT REMOVE!!
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]

use futures_util::{StreamExt};
use reqwest::Client;
use tauri::{ Manager};
use tauri::http::{ResponseBuilder};
mod stream;

fn main() {
tauri::Builder::default()
.plugin(tauri_plugin_window_state::Builder::default().build())
.register_uri_scheme_protocol("sse", |app_handle, request| {
let path = request.uri().strip_prefix("sse://localhost/").unwrap();
let path = percent_encoding::percent_decode(path.as_bytes())
.decode_utf8_lossy()
.to_string();
// println!("path : {}", path);
let client = Client::new();
let window = app_handle.get_window("main").unwrap();
// send http request
let body = reqwest::Body::from(request.body().clone());
let response_future = client.request(request.method().clone(), path)
.headers(request.headers().clone())
.body(body).send();

// get response and emit to client
tauri::async_runtime::spawn(async move {
let res = response_future.await;

match res {
Ok(res) => {
let mut stream = res.bytes_stream();

while let Some(chunk) = stream.next().await {
match chunk {
Ok(bytes) => {
window.emit("sse-response", bytes).unwrap();
}
Err(err) => {
println!("Error: {:?}", err);
}
}
}
window.emit("sse-response", 0).unwrap();
}
Err(err) => {
println!("Error: {:?}", err);
}
}
});
ResponseBuilder::new()
.header("Access-Control-Allow-Origin", "*")
.status(200).body("OK".into())
})
.register_uri_scheme_protocol("stream", move |app_handle, request| {
stream::stream(app_handle, request)
})
.run(tauri::generate_context!())
.expect("error while running tauri application");
}
Loading

0 comments on commit 2d920f7

Please sign in to comment.