Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add MCPToolkitManager to manage multiple MCPToolkits. #1817

Merged
merged 10 commits into from
Mar 12, 2025
3 changes: 2 additions & 1 deletion camel/toolkits/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
from .excel_toolkit import ExcelToolkit
from .video_analysis_toolkit import VideoAnalysisToolkit
from .image_analysis_toolkit import ImageAnalysisToolkit
from .mcp_toolkit import MCPToolkit
from .mcp_toolkit import MCPToolkit, MCPToolkitManager
from .web_toolkit import WebToolkit
from .file_write_toolkit import FileWriteToolkit
from .terminal_toolkit import TerminalToolkit
Expand Down Expand Up @@ -98,6 +98,7 @@
'SymPyToolkit',
'MinerUToolkit',
'MCPToolkit',
'MCPToolkitManager',
'AudioAnalysisToolkit',
'ExcelToolkit',
'VideoAnalysisToolkit',
Expand Down
193 changes: 192 additions & 1 deletion camel/toolkits/mcp_toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import inspect
import json
import os
from contextlib import AsyncExitStack, asynccontextmanager
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Callable,
Dict,
List,
Expand Down Expand Up @@ -236,6 +239,27 @@ async def dynamic_function(**kwargs):

return dynamic_function

def _build_tool_schema(self, mcp_tool: "Tool") -> Dict[str, Any]:
input_schema = mcp_tool.inputSchema
properties = input_schema.get("properties", {})
required = input_schema.get("required", [])

parameters = {
"type": "object",
"properties": properties,
"required": required,
}

return {
"type": "function",
"function": {
"name": mcp_tool.name,
"description": mcp_tool.description
or "No description provided.",
"parameters": parameters,
},
}

def get_tools(self) -> List[FunctionTool]:
r"""Returns a list of FunctionTool objects representing the
functions in the toolkit. Each function is dynamically generated
Expand All @@ -246,6 +270,173 @@ def get_tools(self) -> List[FunctionTool]:
representing the functions in the toolkit.
"""
return [
FunctionTool(self.generate_function_from_mcp_tool(mcp_tool))
FunctionTool(
self.generate_function_from_mcp_tool(mcp_tool),
openai_tool_schema=self._build_tool_schema(mcp_tool),
)
for mcp_tool in self._mcp_tools
]


class MCPToolkitManager:
r"""MCPToolkitManager provides a unified interface for managing multiple
MCPToolkit instances and their connections.

This class handles the lifecycle of multiple MCP tool connections and
offers a centralized configuration mechanism for both local and remote
MCP services.

Attributes:
toolkits (List[MCPToolkit]): List of MCPToolkit instances to manage.
"""

def __init__(self, toolkits: List[MCPToolkit]):
self.toolkits = toolkits
self._exit_stack: Optional[AsyncExitStack] = None
self._connected = False

@staticmethod
def from_config(config_path: str) -> "MCPToolkitManager":
r"""Creates an MCPToolkitManager from a JSON config file.

The configuration file should define local MCP servers and/or remote
MCP web servers with their respective parameters.

Args:
config_path (str): Path to the JSON configuration file.

Returns:
MCPToolkitManager: An initialized toolkit manager with configured
MCPToolkit instances.

Raises:
FileNotFoundError: If the config file doesn't exist.
json.JSONDecodeError: If the config file contains invalid JSON.
ValueError: If the config is missing required fields or has invalid
structure.

Example:
Example JSON configuration format:

```json
{
"mcpServers": {
"filesystem": {
"command": "mcp-filesystem-server",
"args": ["/Users/user/Desktop", "/Users/user/Downloads"]
},
},
"mcpWebServers": {
"weather": {
"url": "https://example-api.ngrok-free.app/sse"
}
}
}
```

Each entry under "mcpServers" requires at least a "command" field.
Each entry under "mcpWebServers" requires a "url" field.
Optional fields include "args", "env", and "timeout".
"""
try:
with open(config_path, "r", encoding="utf-8") as f:
try:
data = json.load(f)
except json.JSONDecodeError as e:
raise json.JSONDecodeError(
f"Invalid JSON in config file '{config_path}': {e!s}",
e.doc,
e.pos,
) from e
except FileNotFoundError:
raise FileNotFoundError(f"Config file not found: '{config_path}'")

all_toolkits = []

# Process local MCP servers
mcp_servers = data.get("mcpServers", {})
if not isinstance(mcp_servers, dict):
raise ValueError("'mcpServers' must be a dictionary")

for name, cfg in mcp_servers.items():
if not isinstance(cfg, dict):
raise ValueError(
f"Configuration for server '{name}' must be a dictionary"
)

if "command" not in cfg:
raise ValueError(
f"Missing required 'command' field for server '{name}'"
)

toolkit = MCPToolkit(
command_or_url=cfg["command"],
args=cfg.get("args", []),
env={**os.environ, **cfg.get("env", {})},
timeout=cfg.get("timeout", None),
)
all_toolkits.append(toolkit)

# Process remote MCP web servers
mcp_web_servers = data.get("mcpWebServers", {})
if not isinstance(mcp_web_servers, dict):
raise ValueError("'mcpWebServers' must be a dictionary")

for name, cfg in mcp_web_servers.items():
if not isinstance(cfg, dict):
raise ValueError(
f"Configuration for web server '{name}' must"
"be a dictionary"
)

if "url" not in cfg:
raise ValueError(
f"Missing required 'url' field for web server '{name}'"
)

toolkit = MCPToolkit(
command_or_url=cfg["url"],
timeout=cfg.get("timeout", None),
)
all_toolkits.append(toolkit)

return MCPToolkitManager(all_toolkits)

@asynccontextmanager
async def connection(self) -> AsyncGenerator["MCPToolkitManager", None]:
r"""Async context manager that simultaneously establishes connections
to all managed MCPToolkit instances.

Yields:
MCPToolkitManager: Self with all toolkits connected.
"""
self._exit_stack = AsyncExitStack()
try:
# Sequentially connect to each toolkit
for tk in self.toolkits:
await self._exit_stack.enter_async_context(tk.connection())
self._connected = True
yield self
finally:
self._connected = False
await self._exit_stack.aclose()
self._exit_stack = None

def is_connected(self) -> bool:
r"""Checks if all the managed toolkits are connected.

Returns:
bool: True if connected, False otherwise.
"""
return self._connected

def get_all_tools(self) -> List[FunctionTool]:
r"""Aggregates all tools from the managed MCPToolkit instances.

Returns:
List[FunctionTool]: Combined list of all available function tools.
"""
all_tools = []
for tk in self.toolkits:
all_tools.extend(tk.get_tools())
return all_tools
77 changes: 77 additions & 0 deletions examples/toolkits/mcp/mcp_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
"""MCP Server Example

This example demonstrates how to use the MCP (Managed Code Processing) server
with CAMEL agents for file operations.

Setup:
1. Install Node.js and npm

2. Install MCP filesystem server globally:
```bash
npm install -g @modelcontextprotocol/server-filesystem
```

Usage:
1. Run this script to start an MCP filesystem server
2. The server will only operate within the specified directory
3. All paths in responses will be relative to maintain privacy
"""

import asyncio
from pathlib import Path

from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.toolkits import MCPToolkitManager
from camel.types import ModelPlatformType, ModelType


async def main():
config_path = Path(__file__).parent / "mcp_servers_config.json"
manager = MCPToolkitManager.from_config(str(config_path))

async with manager.connection():
tools = manager.get_all_tools()
sys_msg = "You are a helpful assistant"

model = ModelFactory.create(
model_platform=ModelPlatformType.DEFAULT,
model_type=ModelType.DEFAULT,
)
camel_agent = ChatAgent(
system_message=sys_msg,
model=model,
tools=tools,
)
user_msg = "List all README files in the project, using relative paths"
response = await camel_agent.astep(user_msg)
print(response.msgs[0].content)
print(response.info['tool_calls'])


if __name__ == "__main__":
asyncio.run(main())
'''
===============================================================================
Here are all the README files in the project, listed with their relative paths:

- `./README.md`
- `./apps/agents/README.md`
- `./apps/data_explorer/README.md`
- `./docs/README.md`
- `./.pytest_cache/README.md`
===============================================================================
'''
13 changes: 13 additions & 0 deletions examples/toolkits/mcp/mcp_servers_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"mcpServers": {
"filesystem": {
"command": "npx",
"args": [
"-y",
"@modelcontextprotocol/[email protected]",
"."
]
}
},
"mcpWebServers": {}
}
Loading
Loading