Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial prometheus metrics #543

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions livekit-agents/livekit/agents/ipc/proc_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import copy
import logging
import multiprocessing as mp
import time
from dataclasses import dataclass

from livekit import rtc

from .. import utils
from .. import telemetry, utils
from ..job import JobContext, JobProcess
from ..log import logger
from . import channel, proto
Expand Down Expand Up @@ -197,9 +198,16 @@ def main(args: proto.ProcStartArgs) -> None:
), "first message must be InitializeRequest"

job_proc = JobProcess(start_arguments=args.user_arguments)

logger.debug("initializing process", extra={"pid": job_proc.pid})
start_time = time.time()
args.initialize_process_fnc(job_proc)
logger.debug("process initialized", extra={"pid": job_proc.pid})
elapsed = time.time() - start_time
telemetry.ipc.proc_initialized(elapsed)
logger.debug(
"process initialized",
extra={"pid": job_proc.pid, "elapsed": elapsed},
)

# signal to the ProcPool that is worker is now ready to receive jobs
loop.run_until_complete(cch.asend(proto.InitializeResponse()))
Expand Down
4 changes: 2 additions & 2 deletions livekit-agents/livekit/agents/ipc/proc_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ def __init__(
loop: asyncio.AbstractEventLoop,
) -> None:
super().__init__()
self._mp_ctx = mp_ctx
self._mp_ctx, self._loop = mp_ctx, loop
self._initialize_process_fnc = initialize_process_fnc
self._job_entrypoint_fnc = job_entrypoint_fnc
self._close_timeout = close_timeout
self._initialize_timeout = initialize_timeout
self._loop = loop

self._proc_needed_sem = asyncio.Semaphore(num_idle_processes)
self._warmed_proc_queue = asyncio.Queue[SupervisedProc]()
Expand Down Expand Up @@ -79,6 +78,7 @@ async def _proc_watch_task(self) -> None:
self.emit("process_started", proc)
try:
await proc.initialize()

# process where initialization times out will never fire "process_ready"
# neither be used to launch jobs
self.emit("process_ready", proc)
Expand Down
97 changes: 55 additions & 42 deletions livekit-agents/livekit/agents/ipc/supervised_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,53 +9,12 @@
from multiprocessing.context import BaseContext
from typing import Any, Callable, Coroutine

from .. import utils
from .. import telemetry, utils
from ..job import JobContext, JobProcess, RunningJobInfo
from ..log import logger
from . import channel, proc_main, proto


class LogQueueListener:
_sentinel = None

def __init__(
self, queue: mp.Queue, prepare_fnc: Callable[[logging.LogRecord], None]
):
self._thread: threading.Thread | None = None
self._q = queue
self._prepare_fnc = prepare_fnc

def start(self) -> None:
self._thread = t = threading.Thread(
target=self._monitor, daemon=True, name="log_listener"
)
t.start()

def stop(self) -> None:
if self._thread is None:
return
self._q.put_nowait(self._sentinel)
self._thread.join()
self._thread = None

def handle(self, record: logging.LogRecord) -> None:
self._prepare_fnc(record)

lger = logging.getLogger(record.name)
if not lger.isEnabledFor(record.levelno):
return

lger.callHandlers(record)

def _monitor(self):
while True:
record = self._q.get()
if record is self._sentinel:
break

self.handle(record)


class SupervisedProc:
def __init__(
self,
Expand Down Expand Up @@ -224,10 +183,20 @@ async def kill(self) -> None:

async def launch_job(self, info: RunningJobInfo) -> None:
"""start/assign a job to the process"""
if not self.started:
raise RuntimeError("process not started")

if self._running_job is not None:
raise RuntimeError("process already has a running job")

if self._closing:
raise RuntimeError("process is closing")

if not self._initialize_fut.done():
raise RuntimeError("process isn't initialized")

self._running_job = info
telemetry.ipc.job_started()
start_req = proto.StartJobRequest()
start_req.running_job = info
await self._pch.asend(start_req)
Expand Down Expand Up @@ -262,6 +231,7 @@ async def _main_task(self) -> None:
monitor_task = asyncio.create_task(self._monitor_task(pong_timeout))

await self._join_fut
# proc ended
self._exitcode = self._proc.exitcode
self._proc.close()
await utils.aio.gracefully_cancel(ping_task, monitor_task)
Expand All @@ -274,6 +244,9 @@ async def _main_task(self) -> None:
extra=self.logging_extra(),
)

if self._running_job is not None:
telemetry.ipc.job_ended()

@utils.log_exceptions(logger=logger)
async def _monitor_task(self, pong_timeout: utils.aio.Sleep) -> None:
while True:
Expand Down Expand Up @@ -322,3 +295,43 @@ def logging_extra(self) -> dict:
extra["job_id"] = self._running_job.job.id

return extra


class LogQueueListener:
_sentinel = None

def __init__(
self, queue: mp.Queue, prepare_fnc: Callable[[logging.LogRecord], None]
):
self._thread: threading.Thread | None = None
self._q, self._prepare_fnc = queue, prepare_fnc

def start(self) -> None:
self._thread = t = threading.Thread(
target=self._monitor, daemon=True, name="log_listener"
)
t.start()

def stop(self) -> None:
if self._thread is None:
return
self._q.put_nowait(self._sentinel)
self._thread.join()
self._thread = None

def handle(self, record: logging.LogRecord) -> None:
self._prepare_fnc(record)

lger = logging.getLogger(record.name)
if not lger.isEnabledFor(record.levelno):
return

lger.callHandlers(record)

def _monitor(self):
while True:
record = self._q.get()
if record is self._sentinel:
break

self.handle(record)
3 changes: 3 additions & 0 deletions livekit-agents/livekit/agents/telemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from . import http_server, ipc

__all__ = ["http_server", "ipc"]
34 changes: 34 additions & 0 deletions livekit-agents/livekit/agents/telemetry/http_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

import asyncio

import aiohttp.web_request
from aiohttp import web
from prometheus_client import (
CONTENT_TYPE_LATEST,
CollectorRegistry,
generate_latest,
multiprocess,
)

from .. import utils


async def metrics(_request: aiohttp.web_request.Request):
def _get_metrics():
registry = CollectorRegistry(auto_describe=True)
multiprocess.MultiProcessCollector(registry)
return generate_latest(registry)

loop = asyncio.get_running_loop()
data = await loop.run_in_executor(None, _get_metrics)
return web.Response(
body=data,
headers={"Content-Type": CONTENT_TYPE_LATEST, "Content-Length": str(len(data))},
)


class HttpServer(utils.http_server.HttpServer):
def __init__(self, host: str, port: int, loop: asyncio.AbstractEventLoop) -> None:
super().__init__(host, port, loop)
self._app.add_routes([web.get("/", metrics)])
28 changes: 28 additions & 0 deletions livekit-agents/livekit/agents/telemetry/ipc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import annotations

import prometheus_client

from .. import utils

PROC_PREWARM_TIME = prometheus_client.Histogram(
"lk_agents_proc_warm_time_seconds",
"Time taken to warm a process",
["nodename"],
buckets=[0.1, 0.5, 1, 2, 5, 10],
)

PROC_STATUS_GAUGE = prometheus_client.Gauge(
"lk_agents_running_job", "Running jobs", ["nodename"]
)


def job_started():
PROC_STATUS_GAUGE.labels(nodename=utils.nodename()).inc()


def job_ended():
PROC_STATUS_GAUGE.labels(nodename=utils.nodename()).dec()


def proc_initialized(time_elapsed: float):
PROC_PREWARM_TIME.labels(nodename=utils.nodename()).observe(time_elapsed)
6 changes: 4 additions & 2 deletions livekit-agents/livekit/agents/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from . import aio, audio, codecs, http_context, images
from . import aio, audio, codecs, http_context, http_server, images
from .event_emitter import EventEmitter
from .exp_filter import ExpFilter
from .log import log_exceptions
from .misc import AudioBuffer, merge_frames, shortuuid, time_ms
from .misc import AudioBuffer, merge_frames, nodename, shortuuid, time_ms
from .moving_average import MovingAverage

__all__ = [
Expand All @@ -19,4 +19,6 @@
"images",
"audio",
"aio",
"http_server",
"nodename",
]
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import contextlib
from typing import Any

from aiohttp import web
Expand All @@ -11,27 +12,26 @@ async def health_check(_: Any):


class HttpServer:
def __init__(
self, host: str, port: int, loop: asyncio.AbstractEventLoop | None = None
) -> None:
self._loop = loop or asyncio.get_event_loop()
self._host = host
self._port = port
def __init__(self, host: str, port: int, loop: asyncio.AbstractEventLoop) -> None:
self._host, self._port, self._loop = host, port, loop
self._app = web.Application(loop=self._loop)
self._app.add_routes([web.get("/", health_check)])
self._close_future = asyncio.Future[None](loop=self._loop)

@property
def app(self) -> web.Application:
return self._app

async def run(self) -> None:
self._runner = web.AppRunner(self._app)
await self._runner.setup()
site = web.TCPSite(self._runner, self._host, self._port)
runner = web.AppRunner(self._app)
await runner.setup()
site = web.TCPSite(runner, self._host, self._port)
await site.start()

try:
await self._close_future
finally:
await self._runner.cleanup()
await runner.cleanup()

async def aclose(self) -> None:
if not self._close_future.done():
with contextlib.suppress(asyncio.InvalidStateError):
self._close_future.set_result(None)
9 changes: 7 additions & 2 deletions livekit-agents/livekit/agents/utils/misc.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import platform
import time
import uuid
from typing import List, Union
Expand Down Expand Up @@ -46,5 +47,9 @@ def time_ms() -> int:
return int(time.time() * 1000)


def shortuuid() -> str:
return str(uuid.uuid4().hex)[:12]
def shortuuid(prefix: str = "") -> str:
return f"{prefix}{str(uuid.uuid4().hex)[:12]}"


def nodename():
return platform.node()
Loading
Loading