Skip to content

Commit afcb175

Browse files
YingChen1996Ying Chen
and
Ying Chen
authored
Avoid pf and pfs version mismatch in Python scenario. (#2138)
# Description - Start a different pfs service for each Python environment. - The pfs service stops itself if no request comes in within a certain time, e.g. 1 hour. - Add pfs restart logic if the version mismatches. ![image](https://github.com/microsoft/promptflow/assets/26239730/7840e52b-08f3-4c85-a123-530afb13eb52) ![image](https://github.com/microsoft/promptflow/assets/26239730/3d200b81-c19b-4648-8f6a-497ea5a5ff59) Goals: restart pfs if the version mismatches; use a different port file for each Python environment. Non-goals in this PR: handle concurrent create in the same env; monitor stop by waitress pid. Please add an informative description that covers the changes made by the pull request and link all relevant issues. # All Promptflow Contribution checklist: - [ ] **The pull request does not introduce [breaking changes].** - [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.** - [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).** - [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).** ## General Guidelines and Best Practices - [ ] Title of the pull request is clear and informative. - [ ] There are a small number of commits, each of which has an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md). ### Testing Guidelines - [ ] Pull request includes test coverage for the included changes. --------- Co-authored-by: Ying Chen <[email protected]>
1 parent 08d9e70 commit afcb175

File tree

5 files changed

+129
-28
lines changed

5 files changed

+129
-28
lines changed

src/promptflow/promptflow/_sdk/_constants.py

+3
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,11 @@ def _prepare_home_dir() -> Path:
7171
PROMPT_FLOW_RUNS_DIR_NAME = ".runs"
7272
PROMPT_FLOW_EXP_DIR_NAME = ".exps"
7373
SERVICE_CONFIG_FILE = "pf.yaml"
74+
PF_SERVICE_PORT_DIT_NAME = "pfs"
7475
PF_SERVICE_PORT_FILE = "pfs.port"
7576
PF_SERVICE_LOG_FILE = "pfs.log"
77+
PF_SERVICE_HOUR_TIMEOUT = 1
78+
PF_SERVICE_MONITOR_SECOND = 60
7679
PF_TRACE_CONTEXT = "PF_TRACE_CONTEXT"
7780
PF_SERVICE_DEBUG = "PF_SERVICE_DEBUG"
7881

src/promptflow/promptflow/_sdk/_service/app.py

+48-8
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# ---------------------------------------------------------
44
import logging
5+
import sys
56
import time
7+
from datetime import datetime, timedelta
68
from logging.handlers import RotatingFileHandler
79

810
from flask import Blueprint, Flask, g, jsonify, request
911
from flask_cors import CORS
1012
from werkzeug.exceptions import HTTPException
1113

12-
from promptflow._sdk._constants import HOME_PROMPT_FLOW_DIR, PF_SERVICE_LOG_FILE
14+
from promptflow._sdk._constants import PF_SERVICE_HOUR_TIMEOUT, PF_SERVICE_LOG_FILE, PF_SERVICE_MONITOR_SECOND
1315
from promptflow._sdk._service import Api
1416
from promptflow._sdk._service.apis.collector import trace_collector
1517
from promptflow._sdk._service.apis.connection import api as connection_api
@@ -18,8 +20,14 @@
1820
from promptflow._sdk._service.apis.span import api as span_api
1921
from promptflow._sdk._service.apis.telemetry import api as telemetry_api
2022
from promptflow._sdk._service.apis.ui import api as ui_api
21-
from promptflow._sdk._service.utils.utils import FormattedException
22-
from promptflow._sdk._utils import get_promptflow_sdk_version, overwrite_null_std_logger, read_write_by_user
23+
from promptflow._sdk._service.utils.utils import (
24+
FormattedException,
25+
get_current_env_pfs_file,
26+
get_port_from_config,
27+
kill_exist_service,
28+
)
29+
from promptflow._sdk._utils import get_promptflow_sdk_version, overwrite_null_std_logger
30+
from promptflow._utils.thread_utils import ThreadWithContextVars
2331

2432
overwrite_null_std_logger()
2533

@@ -57,13 +65,14 @@ def create_app():
5765

5866
# Enable log
5967
app.logger.setLevel(logging.INFO)
60-
log_file = HOME_PROMPT_FLOW_DIR / PF_SERVICE_LOG_FILE
61-
log_file.touch(mode=read_write_by_user(), exist_ok=True)
68+
# each env will have its own log file
69+
log_file = get_current_env_pfs_file(PF_SERVICE_LOG_FILE)
6270
# Create a rotating file handler with a max size of 1 MB and keeping up to 1 backup files
6371
handler = RotatingFileHandler(filename=log_file, maxBytes=1_000_000, backupCount=1)
6472
formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] - %(message)s")
6573
handler.setFormatter(formatter)
66-
app.logger.addHandler(handler)
74+
# Set app logger to the only one RotatingFileHandler to avoid duplicate logs
75+
app.logger.handlers = [handler]
6776

6877
# Basic error handler
6978
@api.errorhandler(Exception)
@@ -82,9 +91,19 @@ def handle_exception(e):
8291

8392
@app.before_request
8493
def log_before_request_info():
94+
app.config["last_request_time"] = datetime.now()
8595
g.start = time.perf_counter()
86-
app.logger.debug("Headers: %s", request.headers)
87-
app.logger.debug("Body: %s", request.get_data())
96+
if "/v1.0/Connections" in request.url:
97+
request_body = "Request body not recorded for Connections API"
98+
else:
99+
request_body = request.get_data()
100+
101+
app.logger.debug(
102+
"Last request time: %s, Headers: %s, Body: %s",
103+
app.config["last_request_time"],
104+
request.headers,
105+
request_body,
106+
)
88107

89108
@app.after_request
90109
def log_after_request_info(response):
@@ -94,4 +113,25 @@ def log_after_request_info(response):
94113
)
95114
return response
96115

116+
# Start a monitor process using detach mode. It will stop pfs service if no request to pfs service in 1h in
117+
# python scenario. For C# scenario, pfs will live until the process is killed manually.
118+
def monitor_request():
119+
while True:
120+
time.sleep(PF_SERVICE_MONITOR_SECOND)
121+
# For python scenario, since we start waitress in cli, there will be two app. The one used to log in
122+
# the parent process will have no "last_request_time" in app.config since the app doesn't run.
123+
app.logger.info(f"Promptflow service is running, version: {get_promptflow_sdk_version()}")
124+
if "last_request_time" in app.config and datetime.now() - app.config["last_request_time"] > timedelta(
125+
hours=PF_SERVICE_HOUR_TIMEOUT
126+
):
127+
# Todo: check if we have any not complete work? like persist all traces.
128+
port = get_port_from_config()
129+
if port:
130+
app.logger.info(f"Try auto stop pfs service in port {port} since no request to app within 1h")
131+
kill_exist_service(port)
132+
break
133+
134+
if not sys.executable.endswith("pfcli.exe"):
135+
monitor_thread = ThreadWithContextVars(target=monitor_request, daemon=True)
136+
monitor_thread.start()
97137
return app, api

src/promptflow/promptflow/_sdk/_service/entry.py

+12-4
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@
2222
get_started_service_info,
2323
is_port_in_use,
2424
kill_exist_service,
25+
kill_service_get_from_original_port_file,
2526
)
2627
from promptflow._sdk._utils import get_promptflow_sdk_version, print_pf_version
2728
from promptflow.exceptions import UserErrorException
2829

30+
app, _ = create_app()
31+
2932

3033
def get_app(environ, start_response):
3134
app, _ = create_app()
@@ -80,13 +83,16 @@ def add_show_status_action(subparsers):
8083

8184
def start_service(args):
8285
# User Agent will be set based on header in request, so not set globally here.
86+
global app
8387
os.environ[PF_NO_INTERACTIVE_LOGIN] = "true"
8488
port = args.port
85-
app, _ = create_app()
8689
if args.debug:
8790
os.environ[PF_SERVICE_DEBUG] = "true"
8891
app.logger.setLevel(logging.DEBUG)
8992

93+
# add this logic to stop pfs service which is start in the original port file.
94+
kill_service_get_from_original_port_file()
95+
9096
def validate_port(port, force_start):
9197
if is_port_in_use(port):
9298
if force_start:
@@ -111,14 +117,16 @@ def validate_port(port, force_start):
111117
)
112118
waitress.serve(app, host="127.0.0.1", port=port)
113119
else:
120+
# Note: in this scenario, we will have two app, one is the parent process created to log, one is created in
121+
# the detached child process.
114122
# Set host to localhost, only allow request from localhost.
115123
cmd = ["waitress-serve", f"--listen=127.0.0.1:{port}", "promptflow._sdk._service.entry:get_app"]
116124
# Start a pfs process using detach mode. It will start a new process and create a new app. So we use environment
117125
# variable to pass the debug mode, since it will inherit parent process environment variable.
118126
if platform.system() == "Windows":
119-
subprocess.Popen(cmd, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
127+
subprocess.Popen(cmd, stdout=subprocess.DEVNULL, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
120128
else:
121-
subprocess.Popen(cmd, start_new_session=True)
129+
subprocess.Popen(cmd, stdout=subprocess.DEVNULL, start_new_session=True)
122130
is_healthy = check_pfs_service_status(port)
123131
if is_healthy:
124132
app.logger.info(
@@ -129,7 +137,7 @@ def validate_port(port, force_start):
129137

130138

131139
def stop_service():
132-
app, _ = create_app()
140+
global app
133141
port = get_port_from_config()
134142
if port is not None:
135143
kill_exist_service(port)

src/promptflow/promptflow/_sdk/_service/utils/utils.py

+59-11
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# ---------------------------------------------------------
44
import getpass
5+
import hashlib
6+
import os
7+
import re
58
import socket
9+
import sys
610
import time
711
from dataclasses import InitVar, dataclass, field
812
from datetime import datetime
@@ -12,9 +16,14 @@
1216
import requests
1317
from flask import abort, make_response, request
1418

15-
from promptflow._sdk._constants import DEFAULT_ENCODING, HOME_PROMPT_FLOW_DIR, PF_SERVICE_PORT_FILE
19+
from promptflow._sdk._constants import (
20+
DEFAULT_ENCODING,
21+
HOME_PROMPT_FLOW_DIR,
22+
PF_SERVICE_PORT_DIT_NAME,
23+
PF_SERVICE_PORT_FILE,
24+
)
1625
from promptflow._sdk._errors import ConnectionNotFoundError, RunNotFoundError
17-
from promptflow._sdk._utils import read_write_by_user
26+
from promptflow._sdk._utils import get_promptflow_sdk_version, read_write_by_user
1827
from promptflow._utils.logger_utils import get_cli_sdk_logger
1928
from promptflow._utils.yaml_utils import dump_yaml, load_yaml
2029
from promptflow._version import VERSION
@@ -35,13 +44,27 @@ def wrapper(*args, **kwargs):
3544
return wrapper
3645

3746

47+
def get_current_env_pfs_file(file_name):
48+
executable_path = sys.executable
49+
dir_name = os.path.basename(os.path.dirname(executable_path))
50+
# Hash the executable path
51+
hash_object = hashlib.sha1(executable_path.encode())
52+
hex_dig = hash_object.hexdigest()
53+
port_file_name = f"{dir_name}_{hex_dig}_{file_name}"
54+
port_file_dir = HOME_PROMPT_FLOW_DIR / PF_SERVICE_PORT_DIT_NAME
55+
port_file_dir.mkdir(parents=True, exist_ok=True)
56+
port_file_path = port_file_dir / port_file_name
57+
port_file_path.touch(mode=read_write_by_user(), exist_ok=True)
58+
return port_file_path
59+
60+
3861
def get_port_from_config(create_if_not_exists=False):
39-
(HOME_PROMPT_FLOW_DIR / PF_SERVICE_PORT_FILE).touch(mode=read_write_by_user(), exist_ok=True)
40-
with open(HOME_PROMPT_FLOW_DIR / PF_SERVICE_PORT_FILE, "r", encoding=DEFAULT_ENCODING) as f:
62+
port_file_path = get_current_env_pfs_file(PF_SERVICE_PORT_FILE)
63+
with open(port_file_path, "r", encoding=DEFAULT_ENCODING) as f:
4164
service_config = load_yaml(f) or {}
4265
port = service_config.get("service", {}).get("port", None)
4366
if not port and create_if_not_exists:
44-
with open(HOME_PROMPT_FLOW_DIR / PF_SERVICE_PORT_FILE, "w", encoding=DEFAULT_ENCODING) as f:
67+
with open(port_file_path, "w", encoding=DEFAULT_ENCODING) as f:
4568
# Set random port to ~/.promptflow/pf.yaml
4669
port = get_random_port()
4770
service_config["service"] = service_config.get("service", {})
@@ -50,12 +73,25 @@ def get_port_from_config(create_if_not_exists=False):
5073
return port
5174

5275

76+
def kill_service_get_from_original_port_file():
77+
if (HOME_PROMPT_FLOW_DIR / PF_SERVICE_PORT_FILE).exists():
78+
with open(HOME_PROMPT_FLOW_DIR / PF_SERVICE_PORT_FILE, "r", encoding=DEFAULT_ENCODING) as f:
79+
service_config = load_yaml(f) or {}
80+
port = service_config.get("service", {}).get("port", None)
81+
if port:
82+
if is_port_in_use(port):
83+
logger.debug(f"Kill the deprecated port {port} got from service key in thr pfs.port file.")
84+
kill_exist_service(port)
85+
# delete original .promptflow/pfs.port
86+
(HOME_PROMPT_FLOW_DIR / PF_SERVICE_PORT_FILE).unlink()
87+
88+
5389
def dump_port_to_config(port):
54-
# Set port to ~/.promptflow/pf.port, if already have a port in file , will overwrite it.
55-
(HOME_PROMPT_FLOW_DIR / PF_SERVICE_PORT_FILE).touch(mode=read_write_by_user(), exist_ok=True)
56-
with open(HOME_PROMPT_FLOW_DIR / PF_SERVICE_PORT_FILE, "r", encoding=DEFAULT_ENCODING) as f:
90+
# Set port to ~/.promptflow/pfs/**_pf.port, if already have a port in file , will overwrite it.
91+
port_file_path = get_current_env_pfs_file(PF_SERVICE_PORT_FILE)
92+
with open(port_file_path, "r", encoding=DEFAULT_ENCODING) as f:
5793
service_config = load_yaml(f) or {}
58-
with open(HOME_PROMPT_FLOW_DIR / PF_SERVICE_PORT_FILE, "w", encoding=DEFAULT_ENCODING) as f:
94+
with open(port_file_path, "w", encoding=DEFAULT_ENCODING) as f:
5995
service_config["service"] = service_config.get("service", {})
6096
service_config["service"]["port"] = port
6197
dump_yaml(service_config, f)
@@ -106,12 +142,24 @@ def make_response_no_content():
106142

107143

108144
def is_pfs_service_healthy(pfs_port) -> bool:
109-
"""Check if pfs service is running."""
145+
"""Check if pfs service is running and pfs version matches pf version."""
110146
try:
111147
response = requests.get("http://localhost:{}/heartbeat".format(pfs_port))
112148
if response.status_code == 200:
113149
logger.debug(f"Promptflow service is already running on port {pfs_port}, {response.text}")
114-
return True
150+
match = re.search(r'"promptflow":"(.*?)"', response.text)
151+
if match:
152+
version = match.group(1)
153+
is_healthy = version == get_promptflow_sdk_version()
154+
if not is_healthy:
155+
logger.warning(
156+
f"Promptflow service is running on port {pfs_port}, but the version is not the same as "
157+
f"promptflow sdk version {get_promptflow_sdk_version()}. The service version is {version}."
158+
)
159+
else:
160+
is_healthy = False
161+
logger.warning("/heartbeat response doesn't contain current pfs version.")
162+
return is_healthy
115163
except Exception: # pylint: disable=broad-except
116164
pass
117165
logger.warning(

src/promptflow/tests/sdk_pfs_test/e2etests/test_cli.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ def _test_start_service(self, port=None, force=False):
3030
command = f"{command} --force"
3131
start_pfs = subprocess.Popen(command, shell=True)
3232
# Wait for service to be started
33-
sleep(5)
33+
start_pfs.wait()
3434
assert self._is_service_healthy()
35-
start_pfs.terminate()
36-
start_pfs.wait(10)
35+
stop_command = "pfs stop"
36+
stop_pfs = subprocess.Popen(stop_command, shell=True)
37+
stop_pfs.wait()
3738

3839
def _is_service_healthy(self, port=None):
3940
port = port or get_port_from_config()
@@ -48,10 +49,11 @@ def test_start_service(self):
4849
random_port = get_random_port()
4950
self._test_start_service(port=random_port, force=True)
5051

51-
# Force start pfs
52+
# start pfs
5253
start_pfs = subprocess.Popen("pfs start", shell=True)
5354
# Wait for service to be started
54-
sleep(5)
55+
start_pfs.wait()
56+
assert self._is_service_healthy()
5557
self._test_start_service(force=True)
5658
# previous pfs is killed
5759
assert start_pfs.poll() is not None

0 commit comments

Comments
 (0)