Python使用fastmcp快速实现MCP Client和MCP Server

0

只有最简单的功能,没有RAG实现。

MCP Server

from fastmcp import FastMCP

mcp = FastMCP("Python MCP Server", port = 19091)

@mcp.tool(description = "加法计算")
def add(a: int, b: int) -> int:
    return a + b

@mcp.tool(description = "根据城市名称获取天气")
def getWeather(city: str) -> str:
    mock = {
        "北京": "晴天",
        "广州": "阴天"
    }
    return mock[city] if city in mock else "未知天气"
    
@mcp.prompt
def welcome(text: str) -> str:
    return f"你好呀!{text}"
    
@mcp.resource("config://version")
def get_version(): 
    return "1.0.0"
    
if __name__ == "__main__":
    mcp.run(transport = "sse")

MCP Client

"""
https://api-docs.deepseek.com/zh-cn/api/create-chat-completion
https://api-docs.deepseek.com/zh-cn/guides/function_calling
"""

import json
import asyncio

from mcp import ClientSession
from openai import AsyncOpenAI
from uvicorn import Server, Config
from fastapi import FastAPI, Request
from mcp.client.sse import sse_client
from sse_starlette.sse import EventSourceResponse
from fastapi.middleware.cors import CORSMiddleware

openai = AsyncOpenAI(api_key = "", base_url = "https://api.deepseek.com")

class MCPServer:
    async def init_mcp_server(self, url):
        print(f"加载MCP Server:{self.url}")
        self.url     = url
        self.client  = sse_client(url)
        self.streams = await self.client.__aenter__()
        self.session = ClientSession(*self.streams)
        self.context = await self.session.__aenter__()
        await self.context.initialize()
        tools = await self.context.list_tools()
        print(f"tool: {[tool.name for tool in tools.tools]}")
        prompts = await self.context.list_prompts()
        print(f"prompt: {[prompt.name for prompt in prompts.prompts]}")
        resources = await self.context.list_resources()
        print(f"resource: {[resource.name for resource in resources.resources]}")

    async def close_mcp_server(self):
        print(f"关闭MCP Server:{self.url}")
        if self.session:
            await self.session.__aexit__(None, None, None)
        if self.client:
            await self.client.__aexit__(None, None, None)

class MCPClient:
    async def init_mcp_server(self, urls = []):
        self.tools   = []
        self.servers = []
        self.mapping = {}
        for url in urls:
            mcpServer = MCPServer()
            await mcpServer.init_mcp_server(url)
            for tool in (await mcpServer.context.list_tools()).tools:
                self.tools.append({
                    "type"    : "function",
                    "function": {
                        "name"       : tool.name,
                        "parameters" : tool.inputSchema,
                        "description": tool.description,
                    }})
                self.mapping[tool.name] = mcpServer
            self.servers.append(mcpServer)
        print(f"加载MCP Server完成:{len(self.servers)}")

    async def close_mcp_server(self):
        for server in self.servers:
            try:
                await server.close_mcp_server()
            except Exception as e:
                print(f"关闭MCP Server异常", e)

    async def call_tool(self, tools_params, messages):
        for _, tool in tools_params.items():
            if not tool.function:
                print(f"调用工具参数错误:{tool}")
                continue
            if not tool.function.name:
                print(f"调用工具参数错误:{tool}")
                continue
            if tool.function.name not in self.mapping:
                print(f"调用工具没有映射:{tool}")
                continue
            try:
                params   = json.loads(tool.function.arguments) if tool.function.arguments else "{}"
                response = await self.mapping[tool.function.name].context.call_tool(tool.function.name, params)
                content  = "".join(list(map(lambda v: v.text, response.content)))
                print(f"调用工具:{tool} - {content}")
                messages.extend([
                    {
                        "role"      : "assistant",
                        "content"   : None,
                        "tool_calls": [tool]
                    },
                    {
                        "role"        : "tool",
                        "content"     : content,
                        "tool_call_id": tool.id,
                    }
                ])
            except Exception as e:
                print(f"调用工具异常:{tool} - {e}")
        return messages

    async def chat(self, messages):
        # TODO: 资料查询
        # TODO: 历史查询
        params = {
            "model"   : "deepseek-chat",
            "tools"   : self.tools,
            "stream"  : True,
            "messages": messages,
        }
        print(f"会话请求:{params}")
        text         = ""
        response     = None
        tools_params = {}
        try:
            response = await openai.chat.completions.create(**params)
            async for chunk in response:
                if chunk and chunk.choices:
                    print(f"会话响应:{chunk}")
                    for choice in chunk.choices:
                        delta  = choice.delta
                        reason = choice.finish_reason
                        if reason:
                            print(f"会话完成:{reason} = {text}")
                            if reason == "stop":
                                messages.extend([
                                    {
                                        "role"   : "assistant",
                                        "content": text,
                                    },
                                ])
                                # TODO: 记录历史
                            elif reason == "tool_calls":
                                messages = await self.call_tool(tools_params, messages)
                                async for recursion_chunk in self.chat(messages):
                                    yield recursion_chunk
                                break
                            else:
                                print(f"未知结束原因:{reason}")
                        # 文本返回
                        if delta.content:
                            text += delta.content
                            yield delta.content
                        # 工具调用
                        if delta.tool_calls:
                            for tool in delta.tool_calls:
                                if tool.index not in tools_params:
                                    tools_params[tool.index] = tool
                                if tool.id:
                                    tools_params[tool.index].id = tool.id
                                if tool.function:
                                    if tool.function.name:
                                        tools_params[tool.index].function.name = tool.function.name
                                    if tool.function.arguments:
                                        tools_params[tool.index].function.arguments += tool.function.arguments
        except Exception as e:
            print(f"会话异常{e}")
        finally:
            if response:
                await response.close()

mcpClient = MCPClient()

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins = ["*"],
    allow_methods = ["*"],
    allow_headers = ["*"],
    allow_credentials = True,
)

@app.get("/chat")
async def chat(message: str, request: Request):
    messages = [
        {
            "role"   : "system",
            "content": "你是一个中文AI助手!"
        },
        {
            "role"   : "user",
            "content": message
        }
    ]
    return EventSourceResponse(mcpClient.chat(messages))

async def main():
    await mcpClient.init_mcp_server(["http://localhost:19091/sse"])
    config = Config(app, host = "0.0.0.0", port = 18081)
    server = Server(config)
    await server.serve()
    await mcpClient.close_mcp_server()

if __name__ == "__main__":
    asyncio.run(main())