Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

📝 Add docstrings to develop #430

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion src/praisonai-agents/mcp-basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@
# Function to get stock price using MCP
async def get_stock_price(symbol):
# Start server and connect client
"""
Retrieves the stock price asynchronously.

This function connects to a server via an asynchronous client session, retrieves the
list of available tools, and searches for a tool capable of fetching stock prices. If a
suitable tool is found, it invokes the tool with the provided stock symbol and returns
its result; otherwise, it returns a message indicating that no appropriate tool was found.

Args:
symbol: The stock ticker symbol to query.
"""
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# Initialize the connection
Expand Down Expand Up @@ -45,7 +56,19 @@ async def get_stock_price(symbol):

# Create a custom tool for the agent
def stock_price_tool(symbol: str) -> str:
"""Get the current stock price for a given symbol"""
"""
Retrieve and format the current stock price for a specified symbol.

This synchronous wrapper runs the asynchronous `get_stock_price` function using
`asyncio.run` to obtain the latest stock price, and returns a formatted string
displaying the price for the given stock symbol.

Args:
symbol: The stock ticker symbol for which the price is retrieved.

Returns:
A string representing the stock price in a human-readable format.
"""
# Run the async function to get the stock price
result = asyncio.run(get_stock_price(symbol))
return f"Stock price for {symbol}: {result}"
Expand Down
15 changes: 13 additions & 2 deletions src/praisonai-agents/mcp-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

async def execute_tool(session: ClientSession, tool_name: str, params: Dict[str, Any]) -> Any:
"""
Execute a tool with proper error handling and return the result.
Execute a tool asynchronously and return its result.

This follows the pattern shown in the article for reliable tool execution.
This function invokes the tool identified by the given name using the provided session
and parameters. If the tool execution completes successfully, its output is returned.
If an error occurs, the function prints an error message and returns a dictionary
containing the error detail.
"""
try:
result = await session.call_tool(tool_name, arguments=params)
Expand All @@ -27,6 +30,14 @@ async def execute_tool(session: ClientSession, tool_name: str, params: Dict[str,

async def main():
# Start server and connect client
"""
Orchestrates an asynchronous interaction with the tool server.

Establishes a connection using a standard I/O client and creates a client session,
initializes the session, and retrieves a list of available tools. For debugging, it
prints the available tools and, if present, selects the first tool to demonstrate an
example invocation by constructing parameters from its input schema and printing the response.
"""
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# Initialize the connection
Expand Down
131 changes: 116 additions & 15 deletions src/praisonai-agents/praisonaiagents/mcp/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ class MCPToolRunner(threading.Thread):
"""A dedicated thread for running MCP operations."""

def __init__(self, server_params):
"""
Initializes the MCPToolRunner and starts its asynchronous processing thread.

Sets up queues for handling tool requests and results, configures an event to signal
when initialization is complete, and stores the server parameters for establishing
an MCP connection. The daemon thread is started immediately to manage asynchronous
tool operations.

Args:
server_params: Configuration parameters for connecting to the MCP server.
"""
super().__init__(daemon=True)
self.server_params = server_params
self.queue = queue.Queue()
Expand All @@ -23,11 +34,25 @@ def __init__(self, server_params):
self.start()

def run(self):
"""Main thread function that processes MCP requests."""
"""
Starts the asynchronous loop to process MCP requests.

This method serves as the entry point for the dedicated thread,
initializing an asyncio event loop to execute the internal
asynchronous request processing routine.
"""
asyncio.run(self._run_async())

async def _run_async(self):
"""Async entry point for MCP operations."""
"""
Initializes the MCP session and processes tool requests asynchronously.

Establishes a connection with the MCP server using the provided parameters, initializes
the session, and retrieves available tools before signaling readiness. Then, it enters
a loop to poll an internal request queue for tool invocation requests, calling the
appropriate tool for each request and enqueuing the result or error message. The loop
exits gracefully when a shutdown signal or cancellation is encountered.
"""
try:
# Set up MCP session
async with stdio_client(self.server_params) as (read, write):
Expand Down Expand Up @@ -69,7 +94,22 @@ async def _run_async(self):
self.result_queue.put((False, f"MCP initialization error: {str(e)}"))

def call_tool(self, tool_name, arguments):
"""Call an MCP tool and wait for the result."""
"""
Calls an MCP tool synchronously and returns its result.

This method waits for MCP initialization (up to 30 seconds) before proceeding. If initialization
times out, it returns an error message. Otherwise, it enqueues the tool request with the specified
arguments and waits for the result. If the tool execution fails, an error message is returned; if
successful, the method extracts and returns the text content of the result when available, or a string
representation of the result.

Parameters:
tool_name: The name of the MCP tool to execute.
arguments: The arguments to pass to the MCP tool.

Returns:
A string containing the tool's output or an error message.
"""
if not self.initialized.is_set():
self.initialized.wait(timeout=30)
if not self.initialized.is_set():
Expand All @@ -91,7 +131,12 @@ def call_tool(self, tool_name, arguments):
return str(result)

def shutdown(self):
"""Signal the thread to shut down."""
"""
Signals the worker thread to terminate.

Inserts a sentinel value (None) into the request queue, indicating that the thread should
cease processing further requests.
"""
self.queue.put(None)


Expand Down Expand Up @@ -130,15 +175,25 @@ class MCP:

def __init__(self, command_or_string=None, args=None, *, command=None, **kwargs):
"""
Initialize the MCP connection and get tools.
Initialize the MCP instance with command parsing and dynamic tool generation.

This method configures the MCP connection by interpreting command inputs. It accepts
either a complete command string—which is split into a command and its arguments—or
separate command and argument values. An alternative parameter name 'command' is also
supported for backward compatibility. The method sets up the server parameters, starts
the MCP tool runner, and waits up to 30 seconds for initialization, printing a warning
if the process times out.

Args:
command_or_string: Either:
- The command to run the MCP server (e.g., Python path)
- A complete command string (e.g., "/path/to/python /path/to/app.py")
args: Arguments to pass to the command (when command_or_string is the command)
command: Alternative parameter name for backward compatibility
**kwargs: Additional parameters for StdioServerParameters
command_or_string: A command executable or a complete command line to launch the
MCP server. When provided as a string without separate arguments, it is split
into the executable and its arguments.
args: A list of arguments to pass to the command when it is provided separately.
command: An alternative name for 'command_or_string' for backward compatibility.
**kwargs: Additional keyword arguments passed to StdioServerParameters.

Raises:
ValueError: If the provided command string is empty after parsing.
"""
# Handle backward compatibility with named parameter 'command'
if command_or_string is None and command is not None:
Expand Down Expand Up @@ -174,10 +229,14 @@ def __init__(self, command_or_string=None, args=None, *, command=None, **kwargs)

def _generate_tool_functions(self) -> List[Callable]:
"""
Generate functions for each MCP tool.
Generates callable wrappers for each available MCP tool.

This method iterates over the tools provided by the MCP runner and creates a wrapper
function for each using the _create_tool_wrapper method. The returned functions can be
invoked directly to execute the corresponding MCP tool with the appropriate input schema.

Returns:
List[Callable]: Functions that can be used as tools
List[Callable]: A list of callable wrappers for MCP tools.
"""
tool_functions = []

Expand All @@ -188,7 +247,25 @@ def _generate_tool_functions(self) -> List[Callable]:
return tool_functions

def _create_tool_wrapper(self, tool):
"""Create a wrapper function for an MCP tool."""
"""
Creates a dynamic wrapper for an MCP tool.

This function builds a callable that conforms to the tool's interface as defined
by its input schema. It extracts parameter names, types, and required status from
the tool's schema and constructs a wrapper with a matching signature and
documentation. When invoked, the wrapper maps positional and keyword arguments
to the expected parameters and calls the tool via the runner.

Parameters:
tool: An MCP tool object with attributes 'name', 'description', and
'inputSchema'. The inputSchema should include a "properties" dictionary
that defines parameter types and an optional "required" list for mandatory
parameters.

Returns:
A callable that wraps the tool, allowing it to be invoked with a signature
that reflects its defined input parameters.
"""
# Determine parameter names from the schema
param_names = []
param_annotations = {}
Expand Down Expand Up @@ -235,6 +312,11 @@ def _create_tool_wrapper(self, tool):

# Create function template to be properly decorated
def template_function(*args, **kwargs):
"""
Template function that accepts arbitrary arguments.

This placeholder function is intended for future extension and currently does not perform any operation; it always returns None.
"""
return None

# Create a proper function with the correct signature
Expand All @@ -248,6 +330,20 @@ def template_function(*args, **kwargs):
@wraps(template_function)
def wrapper(*args, **kwargs):
# Map positional args to parameter names
"""
Invokes a tool using combined positional and keyword arguments.

This wrapper maps positional arguments to their corresponding expected
parameter names, merges them with any keyword arguments, and delegates
the call to the tool via the runner using the tool's name.

Args:
*args: Positional values for the tool's parameters.
**kwargs: Keyword arguments for the tool.

Returns:
The result of executing the tool.
"""
all_args = {}
for i, arg in enumerate(args):
if i < len(param_names):
Expand All @@ -273,6 +369,11 @@ def __iter__(self) -> Iterable[Callable]:
return iter(self._tools)

def __del__(self):
"""Clean up resources when the object is garbage collected."""
"""
Clean up resources when the MCP instance is garbage collected.

If the instance has an associated runner, its shutdown method is called to halt
ongoing operations and release allocated resources.
"""
if hasattr(self, 'runner'):
self.runner.shutdown()
Loading