diff --git a/src/promptflow/CHANGELOG.md b/src/promptflow/CHANGELOG.md index ebce9783d26..765eb500c52 100644 --- a/src/promptflow/CHANGELOG.md +++ b/src/promptflow/CHANGELOG.md @@ -1,5 +1,11 @@ # Release History +## 0.1.0b6 (Upcoming) + +### Features Added + +- Add token metrics in run properties + ## 0.1.0b5 (2023.09.08) ### Features Added diff --git a/src/promptflow/promptflow/_sdk/_constants.py b/src/promptflow/promptflow/_sdk/_constants.py index 1f080e2fcc8..161dacfcdad 100644 --- a/src/promptflow/promptflow/_sdk/_constants.py +++ b/src/promptflow/promptflow/_sdk/_constants.py @@ -140,6 +140,7 @@ class FlowRunProperties: OUTPUT_PATH = "output_path" NODE_VARIANT = "node_variant" RUN = "run" + SYSTEM_METRICS = "system_metrics" class CommonYamlFields: diff --git a/src/promptflow/promptflow/_sdk/_orm/run_info.py b/src/promptflow/promptflow/_sdk/_orm/run_info.py index 53efdfdddcb..67aef235776 100644 --- a/src/promptflow/promptflow/_sdk/_orm/run_info.py +++ b/src/promptflow/promptflow/_sdk/_orm/run_info.py @@ -11,7 +11,12 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import declarative_base -from promptflow._sdk._constants import RUN_INFO_CREATED_ON_INDEX_NAME, RUN_INFO_TABLENAME, ListViewType +from promptflow._sdk._constants import ( + RUN_INFO_CREATED_ON_INDEX_NAME, + RUN_INFO_TABLENAME, + FlowRunProperties, + ListViewType, +) from promptflow._sdk._errors import RunExistsError, RunNotFoundError from .retry import sqlite_retry @@ -89,6 +94,7 @@ def update( tags: Optional[Dict[str, str]] = None, start_time: Optional[Union[str, datetime.datetime]] = None, end_time: Optional[Union[str, datetime.datetime]] = None, + system_metrics: Optional[Dict[str, int]] = None, ) -> None: update_dict = {} if status is not None: @@ -110,7 +116,20 @@ def update( self.end_time = end_time if isinstance(end_time, str) else end_time.isoformat() update_dict["end_time"] = self.end_time with mgmt_db_session() as session: - session.query(RunInfo).filter(RunInfo.name == 
self.name).update(update_dict) + # if system metrics are not being updated, we can update the row directly; + # otherwise, we need to read the properties first, update the dict, and finally update the row + if system_metrics is None: + session.query(RunInfo).filter(RunInfo.name == self.name).update(update_dict) + else: + # under high concurrency on the same row, an earlier commit may be lost; + # we consider this acceptable, as updating properties on the same row + # with high concurrency should be an edge case; + # if it becomes a concern, we can move those properties to an extra column + run_info = session.query(RunInfo).filter(RunInfo.name == self.name).first() + props = json.loads(run_info.properties) + props[FlowRunProperties.SYSTEM_METRICS] = system_metrics.copy() + update_dict["properties"] = json.dumps(props) + session.query(RunInfo).filter(RunInfo.name == self.name).update(update_dict) session.commit() @staticmethod diff --git a/src/promptflow/promptflow/_sdk/entities/_run.py b/src/promptflow/promptflow/_sdk/entities/_run.py index 6094afd8945..602e1051cc0 100644 --- a/src/promptflow/promptflow/_sdk/entities/_run.py +++ b/src/promptflow/promptflow/_sdk/entities/_run.py @@ -194,6 +194,7 @@ def _from_orm_object(cls, obj: ORMRun) -> "Run": end_time=datetime.datetime.fromisoformat(str(obj.end_time)) if obj.end_time else None, status=str(obj.status), data=Path(obj.data).resolve().absolute().as_posix() if obj.data else None, + properties={FlowRunProperties.SYSTEM_METRICS: properties_json.get(FlowRunProperties.SYSTEM_METRICS, {})}, ) @classmethod diff --git a/src/promptflow/promptflow/_sdk/operations/_run_submitter.py b/src/promptflow/promptflow/_sdk/operations/_run_submitter.py index 35d9160277c..39eda5b976c 100644 --- a/src/promptflow/promptflow/_sdk/operations/_run_submitter.py +++ b/src/promptflow/promptflow/_sdk/operations/_run_submitter.py @@ -310,15 +310,17 @@ def _submit_bulk_run(self, flow: Flow, run: Run, local_storage: LocalStorageOper local_storage.dump_snapshot(flow)
local_storage.dump_inputs(mapped_inputs) # result: outputs and metrics - # TODO: retrieve root run system metrics from executor return, we might store it in db local_storage.persist_result(bulk_result) - + # exceptions local_storage.dump_exception(exception=exception, bulk_results=bulk_result) + # system metrics: token related + system_metrics = bulk_result.get_openai_metrics() self.run_operations.update( name=run.name, status=status, end_time=datetime.datetime.now(), + system_metrics=system_metrics, ) def _resolve_data(self, run: Run): diff --git a/src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py b/src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py index 083cb1b1915..bbae2db33ae 100644 --- a/src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py +++ b/src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py @@ -7,7 +7,7 @@ from promptflow import PFClient from promptflow._constants import PROMPTFLOW_CONNECTIONS -from promptflow._sdk._constants import LocalStorageFilenames, RunStatus +from promptflow._sdk._constants import FlowRunProperties, LocalStorageFilenames, RunStatus from promptflow._sdk._errors import InvalidFlowError, RunExistsError, RunNotFoundError from promptflow._sdk._run_functions import create_yaml_run from promptflow._sdk._utils import _get_additional_includes @@ -680,3 +680,9 @@ def test_error_message_dump(self, pf): run_dict = run._to_dict() assert "error" in run_dict assert run_dict["error"] == exception + + def test_system_metrics_in_properties(self, pf) -> None: + run = create_run_against_multi_line_data(pf) + assert FlowRunProperties.SYSTEM_METRICS in run.properties + assert isinstance(run.properties[FlowRunProperties.SYSTEM_METRICS], dict) + assert "total_tokens" in run.properties[FlowRunProperties.SYSTEM_METRICS]