Skip to content

Commit 3f92c50

Browse files
ykeremywintonzheng
andauthored
Screen streaming under docker environment (Skyvern-AI#674)
Co-authored-by: Shuchang Zheng <[email protected]>
1 parent 9342dfb commit 3f92c50

12 files changed

+222
-10
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,12 @@ traces/
165165
*.pkl
166166
har/
167167
postgres-data
168+
files/
168169

169170
# Streamlit ignores
170171
**/secrets*.toml
171172

172173
## Frontend
173174
node_modules
174175
.env.backup
175-
.env.old
176+
.env.old

Dockerfile

+1-4
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ RUN pip install --no-cache-dir --upgrade -r requirements.txt
1313
RUN pip install --no-cache-dir streamlit
1414
RUN playwright install-deps
1515
RUN playwright install
16-
RUN apt-get install -y xauth && apt-get clean
16+
RUN apt-get install -y xauth x11-apps netpbm && apt-get clean
1717

1818
COPY . /app
1919

@@ -29,6 +29,3 @@ COPY ./entrypoint-streamlit.sh /app/entrypoint-streamlit.sh
2929
RUN chmod +x /app/entrypoint-streamlit.sh
3030

3131
CMD [ "/bin/bash", "/app/entrypoint-skyvern.sh" ]
32-
33-
34-

Dockerfile.ui

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ COPY ./entrypoint-skyvernui.sh /app/entrypoint-skyvernui.sh
66
RUN npm install
77

88
ENV VITE_API_BASE_URL=http://localhost:8000/api/v1
9+
ENV VITE_WSS_BASE_URL=ws://localhost:8000/api/v1
910
ENV VITE_ARTIFACT_API_BASE_URL=http://localhost:9090
1011

1112
CMD [ "/bin/bash", "/app/entrypoint-skyvernui.sh" ]

docker-compose.yml

+3-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ services:
3131
- ./videos:/data/videos
3232
- ./har:/data/har
3333
- ./.streamlit:/app/.streamlit
34+
- ./files:/tmp
3435
environment:
3536
- DATABASE_STRING=postgresql+psycopg://skyvern:skyvern@postgres:5432/skyvern
3637
- BROWSER_TYPE=chromium-headful
@@ -66,7 +67,8 @@ services:
6667
- ./videos:/data/videos
6768
- ./har:/data/har
6869
- ./.streamlit:/app/.streamlit
69-
# environment:
70+
environment:
71+
- VITE_WSS_BASE_URL=ws://localhost:8000/api/v1
7072
# - VITE_API_BASE_URL=
7173
# - VITE_SKYVERN_API_KEY=
7274
depends_on:

entrypoint-skyvern.sh

+20-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,24 @@ if [ ! -f ".streamlit/secrets.toml" ]; then
1616
echo ".streamlit/secrets.toml file updated with organization details."
1717
fi
1818

19+
_kill_xvfb_on_term() {
20+
kill -TERM $xvfb
21+
}
22+
23+
# Setup a trap to catch SIGTERM and relay it to child processes
24+
trap _kill_xvfb_on_term TERM
25+
26+
echo "Starting Xvfb..."
27+
# delete the lock file if any
28+
rm -f /tmp/.X99-lock
29+
# Set display environment variable
30+
export DISPLAY=:99
31+
# Start Xvfb
32+
Xvfb :99 -screen 0 1920x1080x16 &
33+
xvfb=$!
34+
35+
DISPLAY=:99 xterm 2>/dev/null &
36+
python run_streaming.py > /dev/null &
37+
1938
# Run the command and pass in all three arguments
20-
xvfb-run python -m skyvern.forge
39+
python -m skyvern.forge

run_streaming.py

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import asyncio
2+
import subprocess
3+
4+
import structlog
5+
import typer
6+
7+
from skyvern.forge import app
8+
from skyvern.forge.sdk.settings_manager import SettingsManager
9+
10+
INTERVAL = 1
11+
LOG = structlog.get_logger()
12+
13+
14+
async def run() -> None:
15+
file_name = "skyvern_screenshot.png"
16+
png_file_path = f"{SettingsManager.get_settings().STREAMING_FILE_BASE_PATH}/{file_name}"
17+
18+
while True:
19+
# run subprocess to take screenshot
20+
subprocess.run(
21+
f"xwd -root | xwdtopnm 2>/dev/null | pnmtopng > {png_file_path}", shell=True, env={"DISPLAY": ":99"}
22+
)
23+
24+
# upload screenshot to S3
25+
try:
26+
await app.STORAGE.save_streaming_file("placeholder_org", file_name)
27+
except Exception:
28+
LOG.info("Failed to save screenshot")
29+
30+
await asyncio.sleep(INTERVAL)
31+
32+
33+
def main() -> None:
34+
asyncio.run(run())
35+
36+
37+
if __name__ == "__main__":
38+
typer.run(main)

run_streaming.sh

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#!/bin/bash
2+
3+
echo "Starting streaming..."
4+
5+
while true; do
6+
xwd -root | xwdtopnm | pnmtopng > /tmp/skyvern_screenshot.png
7+
sleep 1
8+
done

skyvern/config.py

+3
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ class Settings(BaseSettings):
6363
# Workflow constant parameters
6464
WORKFLOW_DOWNLOAD_DIRECTORY_PARAMETER_KEY: str = "SKYVERN_DOWNLOAD_DIRECTORY"
6565

66+
# streaming settings
67+
STREAMING_FILE_BASE_PATH: str = "/tmp"
68+
6669
#####################
6770
# Bitwarden Configs #
6871
#####################

skyvern/forge/api_app.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import Awaitable, Callable
44

55
import structlog
6-
from fastapi import APIRouter, FastAPI, Response, status
6+
from fastapi import FastAPI, Response, status
77
from fastapi.middleware.cors import CORSMiddleware
88
from fastapi.responses import JSONResponse
99
from pydantic import ValidationError
@@ -17,6 +17,7 @@
1717
from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
1818
from skyvern.forge.sdk.db.exceptions import NotFoundError
1919
from skyvern.forge.sdk.routes.agent_protocol import base_router
20+
from skyvern.forge.sdk.routes.streaming import websocket_router
2021
from skyvern.forge.sdk.settings_manager import SettingsManager
2122
from skyvern.scheduler import SCHEDULER
2223

@@ -30,7 +31,7 @@ async def process_request(self, request: Request | HTTPConnection) -> datetime:
3031
return datetime.now()
3132

3233

33-
def get_agent_app(router: APIRouter = base_router) -> FastAPI:
34+
def get_agent_app() -> FastAPI:
3435
"""
3536
Start the agent server.
3637
"""
@@ -46,7 +47,8 @@ def get_agent_app(router: APIRouter = base_router) -> FastAPI:
4647
allow_headers=["*"],
4748
)
4849

49-
app.include_router(router, prefix="/api/v1")
50+
app.include_router(base_router, prefix="/api/v1")
51+
app.include_router(websocket_router, prefix="/api/v1/stream")
5052

5153
app.add_middleware(
5254
RawContextMiddleware,

skyvern/forge/sdk/artifact/storage/base.py

+8
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,11 @@ async def get_share_links(self, artifacts: list[Artifact]) -> list[str] | None:
5151
@abstractmethod
5252
async def store_artifact_from_path(self, artifact: Artifact, path: str) -> None:
5353
pass
54+
55+
@abstractmethod
56+
async def save_streaming_file(self, organization_id: str, file_name: str) -> None:
57+
pass
58+
59+
@abstractmethod
60+
async def get_streaming_file(self, organization_id: str, file_name: str, use_default: bool = True) -> bytes | None:
61+
pass

skyvern/forge/sdk/artifact/storage/local.py

+18
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,24 @@ async def get_share_link(self, artifact: Artifact) -> str:
6767
async def get_share_links(self, artifacts: list[Artifact]) -> list[str]:
6868
return [artifact.uri for artifact in artifacts]
6969

70+
async def save_streaming_file(self, organization_id: str, file_name: str) -> None:
71+
return
72+
73+
async def get_streaming_file(self, organization_id: str, file_name: str, use_default: bool = True) -> bytes | None:
74+
file_path = Path(f"{SettingsManager.get_settings().STREAMING_FILE_BASE_PATH}/skyvern_screenshot.png")
75+
if not use_default:
76+
file_path = Path(f"{SettingsManager.get_settings().STREAMING_FILE_BASE_PATH}/{organization_id}/{file_name}")
77+
try:
78+
with open(file_path, "rb") as f:
79+
return f.read()
80+
except Exception:
81+
LOG.exception(
82+
"Failed to retrieve streaming file.",
83+
organization_id=organization_id,
84+
file_name=file_name,
85+
)
86+
return None
87+
7088
@staticmethod
7189
def _parse_uri_to_path(uri: str) -> str:
7290
parsed_uri = urlparse(uri)

skyvern/forge/sdk/routes/streaming.py

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import asyncio
2+
import base64
3+
from datetime import datetime
4+
5+
import structlog
6+
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
7+
from pydantic import ValidationError
8+
from websockets.exceptions import ConnectionClosedOK
9+
10+
from skyvern.forge import app
11+
from skyvern.forge.sdk.schemas.tasks import TaskStatus
12+
from skyvern.forge.sdk.services.org_auth_service import get_current_org
13+
14+
LOG = structlog.get_logger()
15+
websocket_router = APIRouter()
16+
STREAMING_TIMEOUT = 300
17+
18+
19+
@websocket_router.websocket("/tasks/{task_id}")
20+
async def task_stream(
21+
websocket: WebSocket,
22+
task_id: str,
23+
apikey: str | None = None,
24+
token: str | None = None,
25+
) -> None:
26+
try:
27+
await websocket.accept()
28+
if not token and not apikey:
29+
await websocket.send_text("No valid credential provided")
30+
return
31+
except ConnectionClosedOK:
32+
LOG.info("ConnectionClosedOK error. Streaming won't start")
33+
return
34+
35+
try:
36+
organization = await get_current_org(x_api_key=apikey, authorization=token)
37+
organization_id = organization.organization_id
38+
except Exception:
39+
try:
40+
await websocket.send_text("Invalid credential provided")
41+
except ConnectionClosedOK:
42+
LOG.info("ConnectionClosedOK error while sending invalid credential message")
43+
return
44+
45+
LOG.info("Started task streaming", task_id=task_id, organization_id=organization_id)
46+
# timestamp last time when streaming activity happens
47+
last_activity_timestamp = datetime.utcnow()
48+
49+
try:
50+
while True:
51+
# if no activity for 5 minutes, close the connection
52+
if (datetime.utcnow() - last_activity_timestamp).total_seconds() > STREAMING_TIMEOUT:
53+
LOG.info(
54+
"No activity for 5 minutes. Closing connection", task_id=task_id, organization_id=organization_id
55+
)
56+
await websocket.send_json(
57+
{
58+
"task_id": task_id,
59+
"status": "timeout",
60+
}
61+
)
62+
return
63+
64+
task = await app.DATABASE.get_task(task_id=task_id, organization_id=organization_id)
65+
if not task:
66+
LOG.info("Task not found. Closing connection", task_id=task_id, organization_id=organization_id)
67+
await websocket.send_json(
68+
{
69+
"task_id": task_id,
70+
"status": "not_found",
71+
}
72+
)
73+
return
74+
if task.status.is_final():
75+
LOG.info(
76+
"Task is in a final state. Closing connection",
77+
task_status=task.status,
78+
task_id=task_id,
79+
organization_id=organization_id,
80+
)
81+
await websocket.send_json(
82+
{
83+
"task_id": task_id,
84+
"status": task.status,
85+
}
86+
)
87+
return
88+
89+
if task.status == TaskStatus.running:
90+
file_name = f"{task_id}.png"
91+
screenshot = await app.STORAGE.get_streaming_file(organization_id, file_name)
92+
if screenshot:
93+
encoded_screenshot = base64.b64encode(screenshot).decode("utf-8")
94+
await websocket.send_json(
95+
{
96+
"task_id": task_id,
97+
"status": task.status,
98+
"screenshot": encoded_screenshot,
99+
}
100+
)
101+
last_activity_timestamp = datetime.utcnow()
102+
await asyncio.sleep(2)
103+
104+
except ValidationError as e:
105+
await websocket.send_text(f"Invalid data: {e}")
106+
except WebSocketDisconnect:
107+
LOG.info("WebSocket connection closed")
108+
except ConnectionClosedOK:
109+
LOG.info("ConnectionClosedOK error while streaming", exc_info=True)
110+
return
111+
except Exception:
112+
LOG.warning("Error while streaming", exc_info=True)
113+
return
114+
LOG.info("WebSocket connection closed successfully")
115+
return

0 commit comments

Comments
 (0)