Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mcp/ai/cli] support different transport types #87

Merged
merged 9 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 168 additions & 51 deletions external/mcp/src/plugin.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,82 @@
import { ChatPrompt } from '@microsoft/spark.ai';
import { Readable, Writable } from 'stream';

import { IChatPrompt } from '@microsoft/spark.ai';
import { App, IActivityContext, IPlugin, IPluginEvents } from '@microsoft/spark.apps';
import { ConsoleLogger, EventEmitter, EventHandler, ILogger } from '@microsoft/spark.common';

import { ServerOptions } from '@modelcontextprotocol/sdk/server/index.js';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { Implementation, CallToolResult } from '@modelcontextprotocol/sdk/types.js';
import { CallToolResult } from '@modelcontextprotocol/sdk/types.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';

import { z } from 'zod';
import { jsonSchemaToZod } from 'json-schema-to-zod';

import { IConnection } from './connection';

export class MCPPlugin implements IPlugin {
/**
* MCP transport options for sse
*/
export type McpSSETransportOptions = {
/**
* the transport type
*/
readonly type: 'sse';

/**
* the url path
* @default /mcp
*/
readonly path?: string;
};

/**
* MCP transport options for stdio
*/
export type McpStdioTransportOptions = {
/**
* the transport type
*/
readonly type: 'stdio';

/**
* stdin to use
*/
readonly stdin?: Readable;

/**
* stdout to use
*/
readonly stdout?: Writable;
};

export type McpPluginOptions = ServerOptions & {
/**
* the MCP server name
* @default mcp
*/
readonly name?: string;

/**
* the MCP server version
* @default 0.0.0
*/
readonly version?: string;

/**
* the transport or transport options
* @default sse
*/
readonly transport?: McpSSETransportOptions | McpStdioTransportOptions;
};

/**
* High-level MCP server that provides a simpler API for working with resources, tools, and prompts.
* For advanced usage (like sending notifications or setting custom request handlers),
* use the underlying Server instance available via the server property.
*/
export class McpPlugin implements IPlugin {
readonly name: string;
readonly version: string;

Expand All @@ -24,64 +88,81 @@ export class MCPPlugin implements IPlugin {
protected log: ILogger;
protected id: number = -1;
protected connections: Record<number, IConnection> = {};
protected readonly events = new EventEmitter<IPluginEvents>();
protected events = new EventEmitter<IPluginEvents>();
protected transport: McpSSETransportOptions | McpStdioTransportOptions = { type: 'sse' };

constructor(serverInfo: Implementation | McpServer, options: ServerOptions = {}) {
constructor(options: McpServer | McpPluginOptions = {}) {
this.log = new ConsoleLogger('@spark/mcp');
this.name = serverInfo instanceof McpServer ? 'mcp' : `mcp.${serverInfo.name}`;
this.version = serverInfo instanceof McpServer ? '0.0.0' : serverInfo.version;
this.server = serverInfo instanceof McpServer ? serverInfo : new McpServer(serverInfo, options);
this.name =
options instanceof McpServer ? 'mcp' : `mcp${options.name ? `.${options.name}` : ''}`;
this.version = options instanceof McpServer ? '0.0.0' : options.version || '0.0.0';
this.server =
options instanceof McpServer
? options
: new McpServer(
{
name: this.name,
version: this.version,
},
options
);

if (!(options instanceof McpServer) && options.transport) {
this.transport = options.transport;
}

this.prompt = this.server.prompt.bind(this.server);
this.tool = this.server.tool.bind(this.server);
this.resource = this.server.resource.bind(this.server);
}

use(prompt: ChatPrompt) {
use(prompt: IChatPrompt) {
for (const fn of prompt.functions) {
const schema: z.AnyZodObject = eval(jsonSchemaToZod(fn.parameters, { module: 'cjs' }));
this.server.tool(
fn.name,
fn.description,
schema.shape,
async (args: any): Promise<CallToolResult> => {
try {
const res = await prompt.call(fn.name, args);

return {
content: [
{
type: 'text',
text: typeof res === 'string' ? res : JSON.stringify(res),
},
],
};
} catch (err: any) {
this.log.error(err.toString());

return {
isError: true,
content: [
{
type: 'text',
text: err.toString(),
},
],
};
}
}
);
this.server.tool(fn.name, fn.description, schema.shape, this.onToolCall(fn.name, prompt));
}

return this;
}

onInit(app: App) {
async onInit(app: App) {
this.log = app.log.child(this.name);

app.http.get('/mcp', (_, res) => {
if (this.transport.type === 'sse') {
return this.onInitSSE(app, this.transport);
}

await this.onInitStdio(app, this.transport);
}

async onStart(port: number = 3000) {
this.events.emit('start', this.log);

if (this.transport.type === 'sse') {
this.log.info(`listening at http://localhost:${port}${this.transport.path || '/mcp'}`);
} else {
this.log.info('listening on stdin');
}
}

on<Name extends keyof IPluginEvents>(name: Name, callback: EventHandler<IPluginEvents[Name]>) {
this.events.on(name, callback);
}

onActivity(_: IActivityContext) {}

protected onInitStdio(_: App, options: McpStdioTransportOptions) {
const transport = new StdioServerTransport(options.stdin, options.stdout);
return this.server.connect(transport);
}

protected onInitSSE(app: App, options: McpSSETransportOptions) {
const path = options.path || '/mcp';

app.http.get(path, (_, res) => {
this.id++;
this.log.debug('connecting...');
const transport = new SSEServerTransport(`/mcp/${this.id}/messages`, res);
const transport = new SSEServerTransport(`${path}/${this.id}/messages`, res);
this.connections[this.id] = {
id: this.id,
transport,
Expand All @@ -91,7 +172,7 @@ export class MCPPlugin implements IPlugin {
this.server.connect(transport);
});

app.http.post('/mcp/:id/messages', (req, res) => {
app.http.post(`${path}/:id/messages`, (req, res) => {
const id = +req.params.id;
const { transport } = this.connections[id];

Expand All @@ -104,14 +185,50 @@ export class MCPPlugin implements IPlugin {
});
}

async onStart(port: number = 3000) {
this.events.emit('start', this.log);
this.log.info(`listening at http://localhost:${port}/mcp 🚀`);
}
protected onToolCall(name: string, prompt: IChatPrompt) {
return async (args: any): Promise<CallToolResult> => {
try {
const res = await prompt.call(name, args);

on<Name extends keyof IPluginEvents>(name: Name, callback: EventHandler<IPluginEvents[Name]>) {
this.events.on(name, callback);
if (this.isCallToolResult(res)) {
return res;
}

return {
content: [
{
type: 'text',
text: typeof res === 'string' ? res : JSON.stringify(res),
},
],
};
} catch (err: any) {
this.log.error(err.toString());

return {
isError: true,
content: [
{
type: 'text',
text: err.toString(),
},
],
};
}
};
}

onActivity(_: IActivityContext) {}
protected isCallToolResult(value: any): value is CallToolResult {
if (!!value || !('content' in value)) return false;
const { content } = value;

return (
Array.isArray(content) &&
content.every(
(item) =>
'type' in item &&
(item.type === 'text' || item.type === 'image' || item.type === 'resource')
)
);
}
}
32 changes: 31 additions & 1 deletion packages/ai/src/prompts/audio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,37 @@ export type AudioPromptOptions = {
readonly model: IAudioModel;
};

export class AudioPrompt {
/**
* a prompt that can interface with
* an audio model
*/
export interface IAudioPrompt {
/**
* the prompt name
*/
readonly name: string;

/**
* the prompt description
*/
readonly description: string;

/**
* convert text to audio
*/
textToAudio?(params: TextToAudioParams): Promise<Buffer>;

/**
* transcribe audio to text
*/
audioToText?(params: AudioToTextParams): Promise<string>;
}

/**
* a prompt that can interface with
* an audio model
*/
export class AudioPrompt implements IAudioPrompt {
get name() {
return this._name;
}
Expand Down
64 changes: 62 additions & 2 deletions packages/ai/src/prompts/chat.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Function, FunctionHandler } from '../function';
import { LocalMemory } from '../local-memory';
import { IMemory } from '../memory';
import { ContentPart, Message, SystemMessage, UserMessage } from '../message';
import { ContentPart, Message, ModelMessage, SystemMessage, UserMessage } from '../message';
import { IChatModel, TextChunkHandler } from '../models';
import { Schema } from '../schema';
import { ITemplate } from '../template';
Expand Down Expand Up @@ -58,7 +58,67 @@ export type ChatPromptSendOptions<TOptions extends Record<string, any> = Record<
readonly onChunk?: TextChunkHandler;
};

export class ChatPrompt<TOptions extends Record<string, any> = Record<string, any>> {
/**
* a prompt that can interface with a
* chat model that provides utility like
* streaming and function calling
*/
export interface IChatPrompt<TOptions extends Record<string, any> = Record<string, any>> {
/**
* the prompt name
*/
readonly name: string;

/**
* the prompt description
*/
readonly description: string;

/**
* the chat history
*/
readonly messages: IMemory;

/**
* the registered functions
*/
readonly functions: Array<Function>;

/**
* add another chat prompt as a
*/
use(prompt: IChatPrompt): this;
use(name: string, prompt: IChatPrompt): this;

/**
* add a function that can be called
* by the model
*/
function(name: string, description: string, handler: FunctionHandler): this;
function(name: string, description: string, parameters: Schema, handler: FunctionHandler): this;

/**
* call a function
*/
call<A extends Record<string, any>, R = any>(name: string, args?: A): Promise<R>;

/**
* send a message to the model and get a response
*/
send(
input: string | ContentPart[],
options?: ChatPromptSendOptions<TOptions>
): Promise<Pick<ModelMessage, 'content'> & Omit<ModelMessage, 'content'>>;
}

/**
* a prompt that can interface with a
* chat model that provides utility like
* streaming and function calling
*/
export class ChatPrompt<TOptions extends Record<string, any> = Record<string, any>>
implements IChatPrompt<TOptions>
{
get name() {
return this._name;
}
Expand Down
5 changes: 5 additions & 0 deletions packages/ai/src/prompts/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
import { IAudioPrompt } from './audio';
import { IChatPrompt } from './chat';

export type Prompt = IChatPrompt | IAudioPrompt;

export * from './chat';
export * from './audio';
Loading