diff --git a/src/praisonai-agents/mcp-basic.py b/src/praisonai-agents/mcp-basic.py index f8b36514..a5120cce 100644 --- a/src/praisonai-agents/mcp-basic.py +++ b/src/praisonai-agents/mcp-basic.py @@ -14,6 +14,17 @@ # Function to get stock price using MCP async def get_stock_price(symbol): # Start server and connect client + """ + Retrieves the stock price asynchronously. + + This function connects to a server via an asynchronous client session, retrieves the + list of available tools, and searches for a tool capable of fetching stock prices. If a + suitable tool is found, it invokes the tool with the provided stock symbol and returns + its result; otherwise, it returns a message indicating that no appropriate tool was found. + + Args: + symbol: The stock ticker symbol to query. + """ async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # Initialize the connection @@ -45,7 +56,19 @@ async def get_stock_price(symbol): # Create a custom tool for the agent def stock_price_tool(symbol: str) -> str: - """Get the current stock price for a given symbol""" + """ + Retrieve and format the current stock price for a specified symbol. + + This synchronous wrapper runs the asynchronous `get_stock_price` function using + `asyncio.run` to obtain the latest stock price, and returns a formatted string + displaying the price for the given stock symbol. + + Args: + symbol: The stock ticker symbol for which the price is retrieved. + + Returns: + A string representing the stock price in a human-readable format. + """ # Run the async function to get the stock price result = asyncio.run(get_stock_price(symbol)) return f"Stock price for {symbol}: {result}" diff --git a/src/praisonai-agents/mcp-test.py b/src/praisonai-agents/mcp-test.py index fecc1f02..40cddae0 100644 --- a/src/praisonai-agents/mcp-test.py +++ b/src/praisonai-agents/mcp-test.py @@ -14,9 +14,12 @@ async def execute_tool(session: ClientSession, tool_name: str, params: Dict[str, Any]) -> Any: """ - Execute a tool with proper error handling and return the result. + Execute a tool asynchronously and return its result. - This follows the pattern shown in the article for reliable tool execution. + This function invokes the tool identified by the given name using the provided session + and parameters. If the tool execution completes successfully, its output is returned. + If an error occurs, the function prints an error message and returns a dictionary + containing the error detail. """ try: result = await session.call_tool(tool_name, arguments=params) @@ -27,6 +30,14 @@ async def execute_tool(session: ClientSession, tool_name: str, params: Dict[str, async def main(): # Start server and connect client + """ + Orchestrates an asynchronous interaction with the tool server. + + Establishes a connection using a standard I/O client and creates a client session, + initializes the session, and retrieves a list of available tools. For debugging, it + prints the available tools and, if present, selects the first tool to demonstrate an + example invocation by constructing parameters from its input schema and printing the response. + """ async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # Initialize the connection diff --git a/src/praisonai-agents/praisonaiagents/mcp/mcp.py b/src/praisonai-agents/praisonaiagents/mcp/mcp.py index ce11d7dd..222332e5 100644 --- a/src/praisonai-agents/praisonaiagents/mcp/mcp.py +++ b/src/praisonai-agents/praisonaiagents/mcp/mcp.py @@ -14,6 +14,17 @@ class MCPToolRunner(threading.Thread): """A dedicated thread for running MCP operations.""" def __init__(self, server_params): + """ + Initializes the MCPToolRunner and starts its asynchronous processing thread. + + Sets up queues for handling tool requests and results, configures an event to signal + when initialization is complete, and stores the server parameters for establishing + an MCP connection. The daemon thread is started immediately to manage asynchronous + tool operations. + + Args: + server_params: Configuration parameters for connecting to the MCP server. + """ super().__init__(daemon=True) self.server_params = server_params self.queue = queue.Queue() @@ -23,11 +34,25 @@ def __init__(self, server_params): self.start() def run(self): - """Main thread function that processes MCP requests.""" + """ + Starts the asynchronous loop to process MCP requests. + + This method serves as the entry point for the dedicated thread, + initializing an asyncio event loop to execute the internal + asynchronous request processing routine. + """ asyncio.run(self._run_async()) async def _run_async(self): - """Async entry point for MCP operations.""" + """ + Initializes the MCP session and processes tool requests asynchronously. + + Establishes a connection with the MCP server using the provided parameters, initializes + the session, and retrieves available tools before signaling readiness. Then, it enters + a loop to poll an internal request queue for tool invocation requests, calling the + appropriate tool for each request and enqueuing the result or error message. The loop + exits gracefully when a shutdown signal or cancellation is encountered. + """ try: # Set up MCP session async with stdio_client(self.server_params) as (read, write): @@ -69,7 +94,22 @@ async def _run_async(self): self.result_queue.put((False, f"MCP initialization error: {str(e)}")) def call_tool(self, tool_name, arguments): - """Call an MCP tool and wait for the result.""" + """ + Calls an MCP tool synchronously and returns its result. + + This method waits for MCP initialization (up to 30 seconds) before proceeding. If initialization + times out, it returns an error message. Otherwise, it enqueues the tool request with the specified + arguments and waits for the result. If the tool execution fails, an error message is returned; if + successful, the method extracts and returns the text content of the result when available, or a string + representation of the result. + + Parameters: + tool_name: The name of the MCP tool to execute. + arguments: The arguments to pass to the MCP tool. + + Returns: + A string containing the tool's output or an error message. + """ if not self.initialized.is_set(): self.initialized.wait(timeout=30) if not self.initialized.is_set(): @@ -91,7 +131,12 @@ def call_tool(self, tool_name, arguments): return str(result) def shutdown(self): - """Signal the thread to shut down.""" + """ + Signals the worker thread to terminate. + + Inserts a sentinel value (None) into the request queue, indicating that the thread should + cease processing further requests. + """ self.queue.put(None) @@ -130,15 +175,25 @@ class MCP: def __init__(self, command_or_string=None, args=None, *, command=None, **kwargs): """ - Initialize the MCP connection and get tools. + Initialize the MCP instance with command parsing and dynamic tool generation. + + This method configures the MCP connection by interpreting command inputs. It accepts + either a complete command string—which is split into a command and its arguments—or + separate command and argument values. An alternative parameter name 'command' is also + supported for backward compatibility. The method sets up the server parameters, starts + the MCP tool runner, and waits up to 30 seconds for initialization, printing a warning + if the process times out. Args: - command_or_string: Either: - - The command to run the MCP server (e.g., Python path) - - A complete command string (e.g., "/path/to/python /path/to/app.py") - args: Arguments to pass to the command (when command_or_string is the command) - command: Alternative parameter name for backward compatibility - **kwargs: Additional parameters for StdioServerParameters + command_or_string: A command executable or a complete command line to launch the + MCP server. When provided as a string without separate arguments, it is split + into the executable and its arguments. + args: A list of arguments to pass to the command when it is provided separately. + command: An alternative name for 'command_or_string' for backward compatibility. + **kwargs: Additional keyword arguments passed to StdioServerParameters. + + Raises: + ValueError: If the provided command string is empty after parsing. """ # Handle backward compatibility with named parameter 'command' if command_or_string is None and command is not None: @@ -174,10 +229,14 @@ def __init__(self, command_or_string=None, args=None, *, command=None, **kwargs) def _generate_tool_functions(self) -> List[Callable]: """ - Generate functions for each MCP tool. + Generates callable wrappers for each available MCP tool. + + This method iterates over the tools provided by the MCP runner and creates a wrapper + function for each using the _create_tool_wrapper method. The returned functions can be + invoked directly to execute the corresponding MCP tool with the appropriate input schema. Returns: - List[Callable]: Functions that can be used as tools + List[Callable]: A list of callable wrappers for MCP tools. """ tool_functions = [] @@ -188,7 +247,25 @@ def _generate_tool_functions(self) -> List[Callable]: return tool_functions def _create_tool_wrapper(self, tool): - """Create a wrapper function for an MCP tool.""" + """ + Creates a dynamic wrapper for an MCP tool. + + This function builds a callable that conforms to the tool's interface as defined + by its input schema. It extracts parameter names, types, and required status from + the tool's schema and constructs a wrapper with a matching signature and + documentation. When invoked, the wrapper maps positional and keyword arguments + to the expected parameters and calls the tool via the runner. + + Parameters: + tool: An MCP tool object with attributes 'name', 'description', and + 'inputSchema'. The inputSchema should include a "properties" dictionary + that defines parameter types and an optional "required" list for mandatory + parameters. + + Returns: + A callable that wraps the tool, allowing it to be invoked with a signature + that reflects its defined input parameters. + """ # Determine parameter names from the schema param_names = [] param_annotations = {} @@ -235,6 +312,11 @@ def _create_tool_wrapper(self, tool): # Create function template to be properly decorated def template_function(*args, **kwargs): + """ + Template function that accepts arbitrary arguments. + + This placeholder function is intended for future extension and currently does not perform any operation; it always returns None. + """ return None # Create a proper function with the correct signature @@ -248,6 +330,20 @@ def template_function(*args, **kwargs): @wraps(template_function) def wrapper(*args, **kwargs): # Map positional args to parameter names + """ + Invokes a tool using combined positional and keyword arguments. + + This wrapper maps positional arguments to their corresponding expected + parameter names, merges them with any keyword arguments, and delegates + the call to the tool via the runner using the tool's name. + + Args: + *args: Positional values for the tool's parameters. + **kwargs: Keyword arguments for the tool. + + Returns: + The result of executing the tool. + """ all_args = {} for i, arg in enumerate(args): if i < len(param_names): @@ -273,6 +369,11 @@ def __iter__(self) -> Iterable[Callable]: return iter(self._tools) def __del__(self): - """Clean up resources when the object is garbage collected.""" + """ + Clean up resources when the MCP instance is garbage collected. + + If the instance has an associated runner, its shutdown method is called to halt + ongoing operations and release allocated resources. + """ if hasattr(self, 'runner'): self.runner.shutdown() \ No newline at end of file