diff --git a/app/src/pages/experiments/DownloadExperimentActionMenu.tsx b/app/src/pages/experiments/DownloadExperimentActionMenu.tsx
index 646c0fe0b5..a8c22d4e79 100644
--- a/app/src/pages/experiments/DownloadExperimentActionMenu.tsx
+++ b/app/src/pages/experiments/DownloadExperimentActionMenu.tsx
@@ -19,6 +19,12 @@ export function DownloadExperimentActionMenu({
       align="end"
       icon={} />}
       onAction={(action) => {
+        if (action === "csv") {
+          window.open(
+            prependBasename(`/v1/experiments/${experimentId}/csv`),
+            "_blank"
+          );
+        }
         if (action === "json") {
           window.open(
             prependBasename(`/v1/experiments/${experimentId}/json`),
@@ -27,6 +33,7 @@
         }
       }}
     >
+      Download CSV
       Download JSON
diff --git a/js/packages/phoenix-client/src/__generated__/api/v1.ts b/js/packages/phoenix-client/src/__generated__/api/v1.ts
index 70a7db3a2a..a3b4bb1efb 100644
--- a/js/packages/phoenix-client/src/__generated__/api/v1.ts
+++ b/js/packages/phoenix-client/src/__generated__/api/v1.ts
@@ -193,6 +193,23 @@ export interface paths {
         patch?: never;
         trace?: never;
     };
+    "/v1/experiments/{experiment_id}/csv": {
+        parameters: {
+            query?: never;
+            header?: never;
+            path?: never;
+            cookie?: never;
+        };
+        /** Download experiment runs as a CSV file */
+        get: operations["getExperimentCSV"];
+        put?: never;
+        post?: never;
+        delete?: never;
+        options?: never;
+        head?: never;
+        patch?: never;
+        trace?: never;
+    };
     "/v1/span_annotations": {
         parameters: {
             query?: never;
@@ -1631,6 +1648,46 @@ export interface operations {
            };
        };
    };
+    getExperimentCSV: {
+        parameters: {
+            query?: never;
+            header?: never;
+            path: {
+                experiment_id: string;
+            };
+            cookie?: never;
+        };
+        requestBody?: never;
+        responses: {
+            /** @description Successful Response */
+            200: {
+                headers: {
+                    [name: string]: unknown;
+                };
+                content: {
+                    "text/csv": string;
+                };
+            };
+            /** @description Forbidden */
+            403: {
+                headers: {
+                    [name: string]: unknown;
+                };
+                content: {
+                    "text/plain": string;
+                };
+            };
+            /** @description Unprocessable Entity */
+            422: {
+                headers: {
+                    [name: string]: unknown;
+                };
+                content: {
+                    "text/plain": string;
+                };
+            };
+        };
+    };
     annotateSpans: {
         parameters: {
             query?: {
diff --git a/schemas/openapi.json b/schemas/openapi.json
index 56a65eb9b6..1cf6fababd 100644
--- a/schemas/openapi.json
+++ b/schemas/openapi.json
@@ -1017,6 +1017,62 @@
         }
       }
     },
+    "/v1/experiments/{experiment_id}/csv": {
+      "get": {
+        "tags": [
+          "experiments"
+        ],
+        "summary": "Download experiment runs as a CSV file",
+        "operationId": "getExperimentCSV",
+        "parameters": [
+          {
+            "name": "experiment_id",
+            "in": "path",
+            "required": true,
+            "schema": {
+              "type": "string",
+              "title": "Experiment ID"
+            }
+          }
+        ],
+        "responses": {
+          "200": {
+            "description": "Successful Response",
+            "content": {
+              "application/json": {
+                "schema": {}
+              },
+              "text/csv": {
+                "schema": {
+                  "type": "string",
+                  "contentMediaType": "text/csv"
+                }
+              }
+            }
+          },
+          "403": {
+            "content": {
+              "text/plain": {
+                "schema": {
+                  "type": "string"
+                }
+              }
+            },
+            "description": "Forbidden"
+          },
+          "422": {
+            "description": "Validation Error",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "$ref": "#/components/schemas/HTTPValidationError"
+                }
+              }
+            }
+          }
+        }
+      }
+    },
     "/v1/span_annotations": {
       "post": {
         "tags": [
diff --git a/src/phoenix/server/api/routers/v1/experiments.py b/src/phoenix/server/api/routers/v1/experiments.py
index 27d34c68a0..17a9b895d8 100644
--- a/src/phoenix/server/api/routers/v1/experiments.py
+++ b/src/phoenix/server/api/routers/v1/experiments.py
@@ -3,13 +3,15 @@
 from random import getrandbits
 from typing import Any, Optional
 
+import pandas as pd
 from fastapi import APIRouter, HTTPException, Path, Response
 from pydantic import Field
 from sqlalchemy import and_, func, select
+from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.orm import joinedload
 from starlette.requests import Request
 from starlette.responses import PlainTextResponse
-from starlette.status import HTTP_404_NOT_FOUND
+from starlette.status import HTTP_200_OK, HTTP_404_NOT_FOUND, HTTP_422_UNPROCESSABLE_ENTITY
 from strawberry.relay import GlobalID
 
 from phoenix.db import models
@@ -19,7 +21,7 @@
 from phoenix.server.dml_event import ExperimentInsertEvent
 
 from .models import V1RoutesBaseModel
-from .utils import ResponseBody, add_errors_to_responses
+from .utils import ResponseBody, add_errors_to_responses, add_text_csv_content_to_responses
 
 router = APIRouter(tags=["experiments"], include_in_schema=True)
 
@@ -311,6 +313,66 @@ async def list_experiments(
     return ListExperimentsResponseBody(data=data)
 
 
+async def _get_experiment_runs_and_revisions(
+    session: AsyncSession, experiment_rowid: int
+) -> tuple[models.Experiment, tuple[models.ExperimentRun], tuple[models.DatasetExampleRevision]]:
+    experiment = await session.get(models.Experiment, experiment_rowid)
+    if not experiment:
+        raise HTTPException(detail="Experiment not found", status_code=HTTP_404_NOT_FOUND)
+    revision_ids = (
+        select(func.max(models.DatasetExampleRevision.id))
+        .join(
+            models.DatasetExample,
+            models.DatasetExample.id == models.DatasetExampleRevision.dataset_example_id,
+        )
+        .where(
+            and_(
+                models.DatasetExampleRevision.dataset_version_id <= experiment.dataset_version_id,
+                models.DatasetExample.dataset_id == experiment.dataset_id,
+            )
+        )
+        .group_by(models.DatasetExampleRevision.dataset_example_id)
+        .scalar_subquery()
+    )
+    runs_and_revisions = (
+        (
+            await session.execute(
+                select(models.ExperimentRun, models.DatasetExampleRevision)
+                .join(
+                    models.DatasetExample,
+                    models.DatasetExample.id == models.ExperimentRun.dataset_example_id,
+                )
+                .join(
+                    models.DatasetExampleRevision,
+                    and_(
+                        models.DatasetExample.id
+                        == models.DatasetExampleRevision.dataset_example_id,
+                        models.DatasetExampleRevision.id.in_(revision_ids),
+                        models.DatasetExampleRevision.revision_kind != "DELETE",
+                    ),
+                )
+                .options(
+                    joinedload(models.ExperimentRun.annotations),
+                )
+                .where(models.ExperimentRun.experiment_id == experiment_rowid)
+                .order_by(
+                    models.ExperimentRun.dataset_example_id,
+                    models.ExperimentRun.repetition_number,
+                )
+            )
+        )
+        .unique()
+        .all()
+    )
+    if not runs_and_revisions:
+        raise HTTPException(
+            detail="Experiment has no runs",
+            status_code=HTTP_404_NOT_FOUND,
+        )
+    runs, revisions = zip(*runs_and_revisions)
+    return experiment, runs, revisions
+
+
 @router.get(
     "/experiments/{experiment_id}/json",
     operation_id="getExperimentJSON",
@@ -324,112 +386,133 @@
 )
 async def get_experiment_json(
     request: Request,
-    response: Response,
     experiment_id: str = Path(..., title="Experiment ID"),
-) -> str:
+) -> Response:
     experiment_globalid = GlobalID.from_id(experiment_id)
     try:
         experiment_rowid = from_global_id_with_expected_type(experiment_globalid, "Experiment")
     except ValueError:
         raise HTTPException(
-            detail=f"Experiment with ID {experiment_globalid} does not exist",
-            status_code=HTTP_404_NOT_FOUND,
+            detail=f"Invalid experiment ID: {experiment_globalid}",
+            status_code=HTTP_422_UNPROCESSABLE_ENTITY,
         )
     async with request.app.state.db() as session:
-        experiment = await session.get(models.Experiment, experiment_rowid)
-        if not experiment:
-            raise HTTPException(
-                detail=f"Experiment with ID {experiment_globalid} does not exist",
-                status_code=HTTP_404_NOT_FOUND,
-            )
-        revision_ids = (
-            select(func.max(models.DatasetExampleRevision.id))
-            .join(
-                models.DatasetExample,
-                models.DatasetExample.id == models.DatasetExampleRevision.dataset_example_id,
-            )
-            .where(
-                and_(
-                    models.DatasetExampleRevision.dataset_version_id
-                    <= experiment.dataset_version_id,
-                    models.DatasetExample.dataset_id == experiment.dataset_id,
-                )
-            )
-            .group_by(models.DatasetExampleRevision.dataset_example_id)
-            .scalar_subquery()
+        experiment, runs, revisions = await _get_experiment_runs_and_revisions(
+            session, experiment_rowid
         )
-        runs_and_revisions = (
-            (
-                await session.execute(
-                    select(models.ExperimentRun, models.DatasetExampleRevision)
-                    .join(
-                        models.DatasetExample,
-                        models.DatasetExample.id == models.ExperimentRun.dataset_example_id,
-                    )
-                    .join(
-                        models.DatasetExampleRevision,
-                        and_(
-                            models.DatasetExample.id
-                            == models.DatasetExampleRevision.dataset_example_id,
-                            models.DatasetExampleRevision.id.in_(revision_ids),
-                            models.DatasetExampleRevision.revision_kind != "DELETE",
-                        ),
-                    )
-                    .options(
-                        joinedload(models.ExperimentRun.annotations),
-                    )
-                    .where(models.ExperimentRun.experiment_id == experiment_rowid)
-                    .order_by(
-                        models.ExperimentRun.dataset_example_id,
-                        models.ExperimentRun.repetition_number,
-                    )
+        records = []
+        for run, revision in zip(runs, revisions):
+            annotations = []
+            for annotation in run.annotations:
+                annotations.append(
+                    {
+                        "name": annotation.name,
+                        "annotator_kind": annotation.annotator_kind,
+                        "label": annotation.label,
+                        "score": annotation.score,
+                        "explanation": annotation.explanation,
+                        "trace_id": annotation.trace_id,
+                        "error": annotation.error,
+                        "metadata": annotation.metadata_,
+                        "start_time": annotation.start_time.isoformat(),
+                        "end_time": annotation.end_time.isoformat(),
+                    }
                 )
-            )
-            .unique()
-            .all()
+            record = {
+                "example_id": str(
+                    GlobalID(models.DatasetExample.__name__, str(run.dataset_example_id))
+                ),
+                "repetition_number": run.repetition_number,
+                "input": revision.input,
+                "reference_output": revision.output,
+                "output": run.output["task_output"],
+                "error": run.error,
+                "latency_ms": run.latency_ms,
+                "start_time": run.start_time.isoformat(),
+                "end_time": run.end_time.isoformat(),
+                "trace_id": run.trace_id,
+                "prompt_token_count": run.prompt_token_count,
+                "completion_token_count": run.completion_token_count,
+                "annotations": annotations,
+            }
+            records.append(record)
+
+        return Response(
+            content=json.dumps(records, ensure_ascii=False, indent=2),
+            headers={"content-disposition": f'attachment; filename="{experiment.name}.json"'},
+            media_type="application/json",
        )
-        if not runs_and_revisions:
-            raise HTTPException(
-                detail=f"Experiment with ID {experiment_globalid} has no runs",
-                status_code=HTTP_404_NOT_FOUND,
-            )
-        records = []
-        for run, revision in runs_and_revisions:
-            annotations = []
-            for annotation in run.annotations:
-                annotations.append(
-                    {
-                        "name": annotation.name,
-                        "annotator_kind": annotation.annotator_kind,
-                        "label": annotation.label,
-                        "score": annotation.score,
-                        "explanation": annotation.explanation,
-                        "trace_id": annotation.trace_id,
-                        "error": annotation.error,
-                        "metadata": annotation.metadata_,
-                        "start_time": annotation.start_time.isoformat(),
-                        "end_time": annotation.end_time.isoformat(),
-                    }
+
+
+@router.get(
+    "/experiments/{experiment_id}/csv",
+    operation_id="getExperimentCSV",
+    summary="Download experiment runs as a CSV file",
+    responses={**add_text_csv_content_to_responses(HTTP_200_OK)},
+)
+async def get_experiment_csv(
+    request: Request,
+    experiment_id: str = Path(..., title="Experiment ID"),
+) -> Response:
+    experiment_globalid = GlobalID.from_id(experiment_id)
+    try:
+        experiment_rowid = from_global_id_with_expected_type(experiment_globalid, "Experiment")
+    except ValueError:
+        raise HTTPException(
+            detail=f"Invalid experiment ID: {experiment_globalid}",
+            status_code=HTTP_422_UNPROCESSABLE_ENTITY,
+        )
+
+    async with request.app.state.db() as session:
+        experiment, runs, revisions = await _get_experiment_runs_and_revisions(
+            session, experiment_rowid
+        )
+        records = []
+        for run, revision in zip(runs, revisions):
+            serialized_run_output = (
+                json.dumps(run.output["task_output"])
+                if isinstance(run.output["task_output"], dict)
+                else run.output["task_output"]
+            )
-            record = {
-                "example_id": str(
-                    GlobalID(models.DatasetExample.__name__, str(run.dataset_example_id))
-                ),
-                "repetition_number": run.repetition_number,
-                "input": revision.input,
-                "reference_output": revision.output,
-                "output": run.output["task_output"],
-                "error": run.error,
-                "latency_ms": run.latency_ms,
-                "start_time": run.start_time.isoformat(),
-                "end_time": run.end_time.isoformat(),
-                "trace_id": run.trace_id,
-                "prompt_token_count": run.prompt_token_count,
-                "completion_token_count": run.completion_token_count,
-                "annotations": annotations,
-            }
-            records.append(record)
-
-    response.headers["content-disposition"] = f'attachment; filename="{experiment.name}.json"'
-    return json.dumps(records, ensure_ascii=False, indent=2)
+            record = {
+                "example_id": str(GlobalID("DatasetExample", str(run.dataset_example_id))),
+                "repetition_number": run.repetition_number,
+                "input": json.dumps(revision.input),
+                "reference_output": json.dumps(revision.output),
+                "output": serialized_run_output,
+                "error": run.error,
+                "latency_ms": run.latency_ms,
+                "start_time": run.start_time.isoformat(),
+                "end_time": run.end_time.isoformat(),
+                "trace_id": run.trace_id,
+                "prompt_token_count": run.prompt_token_count,
+                "completion_token_count": run.completion_token_count,
+            }
+            for annotation in run.annotations:
+                prefix = f"annotation_{annotation.name}"
+                record.update(
+                    {
+                        f"{prefix}_label": annotation.label,
+                        f"{prefix}_score": annotation.score,
+                        f"{prefix}_explanation": annotation.explanation,
+                        f"{prefix}_metadata": json.dumps(annotation.metadata_),
+                        f"{prefix}_annotator_kind": annotation.annotator_kind,
+                        f"{prefix}_trace_id": annotation.trace_id,
+                        f"{prefix}_error": annotation.error,
+                        f"{prefix}_start_time": annotation.start_time.isoformat(),
+                        f"{prefix}_end_time": annotation.end_time.isoformat(),
+                    }
+                )
+            records.append(record)
+
+        df = pd.DataFrame.from_records(records)
+        csv_content = df.to_csv(index=False).encode()
+
+        return Response(
+            content=csv_content,
+            headers={
+                "content-disposition": f'attachment; filename="{experiment.name}.csv"',
+                "content-type": "text/csv",
+            },
+        )
diff --git a/tests/unit/server/api/routers/v1/test_experiments.py b/tests/unit/server/api/routers/v1/test_experiments.py
index cb2180bc49..dcc27ae311 100644
--- a/tests/unit/server/api/routers/v1/test_experiments.py
+++ b/tests/unit/server/api/routers/v1/test_experiments.py
@@ -1,8 +1,10 @@
 import datetime
 import json
+from io import StringIO
 from typing import Any
 
 import httpx
+import pandas as pd
 import pytest
 from httpx import HTTPStatusError
 from strawberry.relay import GlobalID
@@ -85,6 +87,33 @@ async def test_experiments_api(
         assert run.pop("annotations") == []
         assert not run
 
+    # get experiment CSV after runs but before evaluations
+    response = await httpx_client.get(f"/v1/experiments/{experiment_gid}/csv")
+    assert response.status_code == 200
+    assert response.headers["content-type"] == "text/csv"
+    assert response.headers["content-disposition"].startswith('attachment; filename="')
+
+    # Parse CSV content and verify the data
+    csv_content = response.text
+    df = pd.read_csv(StringIO(csv_content))
+    assert len(df) == 1
+
+    # Convert first row to dictionary and verify all fields
+    row = df.iloc[0].to_dict()
+    assert isinstance(row.pop("example_id"), str)
+    assert row.pop("repetition_number") == 1
+    assert json.loads(row.pop("input")) == {"in": "foo"}
+    assert json.loads(row.pop("reference_output")) == {"out": "bar"}
+    assert row.pop("output") == "some LLM application output"
+    assert row.pop("error") == "an error message, if applicable"
+    assert isinstance(row.pop("latency_ms"), float)
+    assert isinstance(row.pop("start_time"), str)
+    assert isinstance(row.pop("end_time"), str)
+    assert row.pop("trace_id") == "placeholder-id"
+    assert pd.isna(row.pop("prompt_token_count"))
+    assert pd.isna(row.pop("completion_token_count"))
+    assert not row
+
     # experiment runs can be listed for evaluations
     experiment_runs = (await httpx_client.get(f"/v1/experiments/{experiment_gid}/runs")).json()[
         "data"
@@ -96,7 +125,7 @@
     evaluation_payload = {
         "experiment_run_id": run_payload["id"],
         "trace_id": "placeholder-id",
-        "name": "some evaluation name",
+        "name": "some_evaluation_name",
         "annotator_kind": "LLM",
         "result": {
             "label": "some label",
@@ -120,7 +149,7 @@
     assert len(runs) == 1
     assert len(runs[0]["annotations"]) == 1
     annotation = runs[0]["annotations"][0]
-    assert annotation.pop("name") == "some evaluation name"
+    assert annotation.pop("name") == "some_evaluation_name"
     assert annotation.pop("label") == "some label"
     assert annotation.pop("score") == 0.5
     assert annotation.pop("explanation") == "some explanation"
@@ -132,6 +161,45 @@
     assert isinstance(annotation.pop("end_time"), str)
     assert not annotation
 
+    # get experiment CSV after evaluations
+    response = await httpx_client.get(f"/v1/experiments/{experiment_gid}/csv")
+    assert response.status_code == 200
+    assert response.headers["content-type"] == "text/csv"
+    assert response.headers["content-disposition"].startswith('attachment; filename="')
+
+    # Parse CSV content and verify the data with annotations
+    csv_content = response.text
+    df = pd.read_csv(StringIO(csv_content))
+    assert len(df) == 1
+
+    # Verify base fields
+    row = df.iloc[0].to_dict()
+    assert isinstance(row.pop("example_id"), str)
+    assert row.pop("repetition_number") == 1
+    assert json.loads(row.pop("input")) == {"in": "foo"}
+    assert json.loads(row.pop("reference_output")) == {"out": "bar"}
+    assert row.pop("output") == "some LLM application output"
+    assert row.pop("error") == "an error message, if applicable"
+    assert isinstance(row.pop("latency_ms"), float)
+    assert isinstance(row.pop("start_time"), str)
+    assert isinstance(row.pop("end_time"), str)
+    assert row.pop("trace_id") == "placeholder-id"
+    assert pd.isna(row.pop("prompt_token_count"))
+    assert pd.isna(row.pop("completion_token_count"))
+
+    # Verify annotation fields
+    annotation_prefix = "annotation_some_evaluation_name"
+    assert row.pop(f"{annotation_prefix}_label") == "some label"
+    assert row.pop(f"{annotation_prefix}_score") == 0.5
+    assert row.pop(f"{annotation_prefix}_explanation") == "some explanation"
+    assert json.loads(row.pop(f"{annotation_prefix}_metadata")) == {}
+    assert row.pop(f"{annotation_prefix}_annotator_kind") == "LLM"
+    assert row.pop(f"{annotation_prefix}_trace_id") == "placeholder-id"
+    assert row.pop(f"{annotation_prefix}_error") == "an error message, if applicable"
+    assert isinstance(row.pop(f"{annotation_prefix}_start_time"), str)
+    assert isinstance(row.pop(f"{annotation_prefix}_end_time"), str)
+    assert not row
+
 
 async def test_experiment_404s_with_missing_dataset(
     httpx_client: httpx.AsyncClient,