Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor agent_video pipelines heygen/tavus #217

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion agent_stream/schedule_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time

from restack_ai import Restack
from restack_ai.event import AgentEvent
from src.agents.agent import AgentStream


Expand All @@ -11,7 +12,12 @@ async def main() -> None:

agent_id = f"{int(time.time() * 1000)}-{AgentStream.__name__}"
run_id = await client.schedule_agent(
agent_name=AgentStream.__name__, agent_id=agent_id
agent_name=AgentStream.__name__,
agent_id=agent_id,
event=AgentEvent(
name="messages",
input={"messages": [{"role": "user", "content": "Tell me a joke"}]},
),
)

await client.get_agent_result(agent_id=agent_id, run_id=run_id)
Expand Down
48 changes: 40 additions & 8 deletions agent_video/README.md → agent_video/pipecat/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ For a complete documentation on how the agent works and how to setup the service
- Python 3.10 or higher
- Deepgram account (For speech-to-text transcription)
- Cartesia account (for text-to-speech and voice cloning)
- Tavus account (for video replica)
- Tavus or Heygen account (for video replica)

## Start Restack

Expand All @@ -21,9 +21,17 @@ To start the Restack, use the following Docker command:
docker run -d --pull always --name restack -p 5233:5233 -p 6233:6233 -p 7233:7233 -p 9233:9233 ghcr.io/restackio/restack:main
```

## Start python shell
## Configure environment variables

If using uv:
In all subfolders, duplicate the `env.example` file and rename it to `.env`.

Obtain a Restack API Key to interact with the 'gpt-4o-mini' model at no cost from [Restack Cloud](https://console.restack.io/starter)

## Start Restack Agent

In the `/agent` folder:

### Start python shell

```bash
uv venv && source .venv/bin/activate
Expand All @@ -35,7 +43,7 @@ If using pip:
python -m venv .venv && source .venv/bin/activate
```

## Install dependencies
### Install dependencies

If using uv:

Expand All @@ -51,13 +59,37 @@ pip install -e .
python -c "from src.services import watch_services; watch_services()"
```

## Configure Your Environment Variables
## Start Pipecat pipeline

Duplicate the `env.example` file and rename it to `.env`.
In the `/pipeline` folder:

Obtain a Restack API Key to interact with the 'gpt-4o-mini' model at no cost from [Restack Cloud](https://console.restack.io/starter)
### Start python shell

```bash
uv venv && source .venv/bin/activate
```

If using pip:

## Create Room and run Agent in parallel
```bash
python -m venv .venv && source .venv/bin/activate
```

### Install dependencies

If using uv:

```bash
uv sync
uv run dev
```

If using pip:

```bash
pip install -e .
python -c "from src.services import watch_services; watch_services()"
```

### from UI

Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 2 additions & 0 deletions agent_video/pipecat/agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

See parent README.md at /agent_video/pipecat/README.md for instructions on how to run the agent.
27 changes: 27 additions & 0 deletions agent_video/pipecat/agent/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[project]
name = "agent_video_pipecat_agent"
version = "0.1.0"
description = "Restack agent for the Pipecat video agent example (Tavus/Heygen)"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"openai>=1.59.9",
"pipecat-ai[daily]>=0.0.58",
"python-dotenv>=1.0.1",
"pydantic>=2.10.6",
"watchfiles>=1.0.4",
"restack-ai>=0.0.87",
]

[project.scripts]
dev = "src.services:watch_services"
services = "src.services:run_services"

[tool.hatch.build.targets.sdist]
include = ["src"]

[tool.hatch.build.targets.wheel]
include = ["src"]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
File renamed without changes.
164 changes: 164 additions & 0 deletions agent_video/pipecat/agent/src/agents/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
from datetime import timedelta
from typing import Literal

from pydantic import BaseModel
from restack_ai.agent import (
NonRetryableError,
RetryPolicy,
agent,
import_functions,
log,
uuid,
)

from src.workflows.logic import LogicWorkflow, LogicWorkflowInput

with import_functions():
from src.functions.context_docs import context_docs
from src.functions.daily_send_data import (
DailySendDataInput,
daily_send_data,
)
from src.functions.llm_talk import LlmTalkInput, llm_talk, Message, ModelType


class MessagesEvent(BaseModel):
messages: list[Message]


class EndEvent(BaseModel):
end: bool

class AgentInput(BaseModel):
room_url: str
model: ModelType
interactive_prompt: str | None = None
reasoning_prompt: str | None = None


class ContextEvent(BaseModel):
context: str


class DailyMessageEvent(BaseModel):
message: str
recipient: str | None = None


@agent.defn()
class AgentVideo:
def __init__(self) -> None:
self.end = False
self.messages: list[Message] = []
self.room_url = ""
self.model: Literal[
"restack", "gpt-4o-mini", "gpt-4o", "openpipe:twenty-lions-fall", "ft:gpt-4o-mini-2024-07-18:restack::BJymdMm8"
] = "restack"
self.interactive_prompt = ""
self.reasoning_prompt = ""
self.context = ""

@agent.event
async def messages(
self,
messages_event: MessagesEvent,
) -> list[Message]:
log.info(f"Received message: {messages_event.messages}")
self.messages.extend(messages_event.messages)
try:
await agent.child_start(
workflow=LogicWorkflow,
workflow_id=f"{uuid()}-logic",
workflow_input=LogicWorkflowInput(
messages=self.messages,
room_url=self.room_url,
context=str(self.context),
interactive_prompt=self.interactive_prompt,
reasoning_prompt=self.reasoning_prompt,
model=self.model,
),
)

assistant_message = await agent.step(
function=llm_talk,
function_input=LlmTalkInput(
messages=self.messages[-3:],
context=str(self.context),
mode="default",
model=self.model,
interactive_prompt=self.interactive_prompt,
),
start_to_close_timeout=timedelta(seconds=3),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_attempts=1,
maximum_interval=timedelta(seconds=5),
),
)

except Exception as e:
error_message = f"llm_chat function failed: {e}"
raise NonRetryableError(error_message) from e
else:
self.messages.append(
Message(
role="assistant",
content=str(assistant_message),
),
)
return self.messages

@agent.event
async def end(self, end: EndEvent) -> EndEvent:
log.info("Received end")
self.end = True
return end

@agent.event
async def context(self, context: ContextEvent) -> str:
log.info("Received context")
self.context = context.context
return self.context

@agent.event
async def daily_message(
self, daily_message: DailyMessageEvent
) -> bool:
log.info("Received message", daily_message=daily_message)
await agent.step(
function=daily_send_data,
function_input=DailySendDataInput(
room_url=self.room_url,
data={
"text": daily_message.message,
"author": "agent",
},
recipient=daily_message.recipient,
),
)
return True

@agent.run
async def run(self, agent_input: AgentInput) -> None:
try:
self.room_url = agent_input.room_url
self.model = agent_input.model
self.interactive_prompt = (
agent_input.interactive_prompt
)
self.reasoning_prompt = agent_input.reasoning_prompt
docs = await agent.step(function=context_docs)
except Exception as e:
error_message = f"context_docs function failed: {e}"
raise NonRetryableError(error_message) from e
else:
system_prompt = f"""
You are an AI assistant for Restack. You can answer questions about the following documentation:
{docs}
{self.interactive_prompt}
"""
self.messages.append(
Message(role="system", content=system_prompt),
)

await agent.condition(lambda: self.end)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
api_address = os.getenv("RESTACK_ENGINE_API_ADDRESS")

connection_options = CloudConnectionOptions(
engine_id=engine_id, address=address, api_key=api_key, api_address=api_address
engine_id=engine_id,
address=address,
api_key=api_key,
api_address=api_address,
)
client = Restack(connection_options)
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,22 @@ async def fetch_content_from_url(url: str) -> str:
async with session.get(url) as response:
if response.status == 200:
return await response.text()
error_message = f"Failed to fetch content: {response.status}"
error_message = (
f"Failed to fetch content: {response.status}"
)
raise NonRetryableError(error_message)


@function.defn()
async def context_docs() -> str:
try:
docs_content = await fetch_content_from_url("https://docs.restack.io/llms-full.txt")
log.info("Fetched content from URL", content=len(docs_content))
docs_content = await fetch_content_from_url(
"https://docs.restack.io/llms-full.txt",
)
log.info(
"Fetched content from URL",
content=len(docs_content),
)

return docs_content

Expand Down
Loading