Files
mars-nanobot/MCP/map-mcp/server.py
2026-03-27 17:28:48 +08:00

234 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Map-MCP: 地图操作 MCP 服务器
通过 stdio 与 nanobot 通信 (MCP 协议)
同时运行 WebSocket 服务器与浏览器中的 Cesium 前端通信。
架构:
nanobot ←stdio→ Map-MCP ←WebSocket→ 浏览器(Cesium)
"""
import asyncio
import json
import logging
import os
import uuid
from contextlib import asynccontextmanager
import websockets
from mcp.server.fastmcp import FastMCP
# ─── 配置 ─────────────────────────────────────────────────────────────────────
WS_HOST = os.getenv("MAP_MCP_WS_HOST", "0.0.0.0")
WS_PORT = int(os.getenv("MAP_MCP_WS_PORT", "8005"))
logger = logging.getLogger("map-mcp")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
# ─── WebSocket 状态 ───────────────────────────────────────────────────────────
_clients: set[websockets.ServerConnection] = set()
_pending: dict[str, asyncio.Future] = {}
async def _ws_handler(websocket):
"""处理浏览器 WebSocket 连接"""
_clients.add(websocket)
logger.info("浏览器已连接 (共 %d 个客户端)", len(_clients))
try:
async for raw in websocket:
try:
data = json.loads(raw)
except json.JSONDecodeError:
logger.warning("收到无效 JSON: %s", raw[:200])
continue
req_id = data.get("id")
if req_id and req_id in _pending:
_pending[req_id].set_result(data)
except websockets.ConnectionClosed:
pass
finally:
_clients.discard(websocket)
logger.info("浏览器断开 (共 %d 个客户端)", len(_clients))
async def _send_cmd(cmd: str, params: dict | None = None, timeout: float = 15.0) -> dict:
"""向浏览器发送命令并等待响应"""
if not _clients:
return {"ok": False, "error": "没有浏览器连接,请先打开地图页面"}
req_id = str(uuid.uuid4())
msg = {"id": req_id, "cmd": cmd}
if params:
msg.update(params)
loop = asyncio.get_running_loop()
future = loop.create_future()
_pending[req_id] = future
# 发送给第一个连接的客户端
client = next(iter(_clients))
await client.send(json.dumps(msg, ensure_ascii=False))
try:
result = await asyncio.wait_for(future, timeout=timeout)
return result
except asyncio.TimeoutError:
return {"ok": False, "error": f"浏览器响应超时 ({timeout}s)"}
finally:
_pending.pop(req_id, None)
# ─── FastMCP 生命周期:启动 WebSocket 服务器 ──────────────────────────────────
@asynccontextmanager
async def lifespan(server):
ws_server = await websockets.serve(_ws_handler, WS_HOST, WS_PORT)
logger.info("WebSocket 服务器已启动: ws://%s:%d", WS_HOST, WS_PORT)
try:
yield
finally:
ws_server.close()
await ws_server.wait_closed()
logger.info("WebSocket 服务器已关闭")
mcp = FastMCP("map-mcp", lifespan=lifespan)
# ─── MCP 工具定义 ─────────────────────────────────────────────────────────────
@mcp.tool()
async def map_zoom_to(lon: float, lat: float, altitude: float = 20000_000) -> str:
"""飞行到指定火星坐标位置。
Args:
lon: 经度 (火星坐标, -180 ~ 180)
lat: 纬度 (火星坐标, -90 ~ 90)
altitude: 相机高度,单位米 (默认 20000km适合查看区域全貌。近距离查看可用 10000km)
"""
result = await _send_cmd("zoom_to", {"lon": lon, "lat": lat, "altitude": altitude})
if result.get("ok"):
return f"已飞行到 经度{lon}°, 纬度{lat}°, 相机高度{altitude/1000:.0f}km"
return f"操作失败: {result.get('error', '未知错误')}"
@mcp.tool()
async def map_get_viewport() -> str:
"""获取当前地图视野信息,包括相机位置(经纬度、高度)和可视范围边界框。
返回 JSON:
{
"center": {"lon": ..., "lat": ..., "altitude": ...},
"bbox": {"west": ..., "south": ..., "east": ..., "north": ...}
}
"""
result = await _send_cmd("get_viewport")
if result.get("ok"):
return json.dumps(result.get("data", {}), ensure_ascii=False)
return f"获取失败: {result.get('error', '未知错误')}"
@mcp.tool()
async def map_draw_bbox(
west: float, south: float, east: float, north: float,
color: str = "#ff6600",
) -> str:
"""在地图上绘制矩形边界框并飞行到该区域。
Args:
west: 西边界经度
south: 南边界纬度
east: 东边界经度
north: 北边界纬度
color: 边框颜色 (CSS 颜色值, 默认橙色)
"""
result = await _send_cmd("draw_bbox", {
"bbox": {"west": west, "south": south, "east": east, "north": north},
"color": color,
})
if result.get("ok"):
return f"已绘制矩形: ({west},{south}) - ({east},{north})"
return f"绘制失败: {result.get('error', '未知错误')}"
@mcp.tool()
async def map_spatial_query(product: str = "tianwen_moric") -> str:
"""在当前绘制的矩形区域内查询火星遥感影像数据,结果会显示在前端界面。
必须先调用 map_draw_bbox 绘制查询区域,然后再调用此工具执行查询。
Args:
product: 数据产品类型,可选值:
tianwen_moric - 天问中分相机
tianwen_hiric - 天问高分相机
mars_ctx - Mars CTX
mars_hirise - Mars HiRISE
"""
result = await _send_cmd("spatial_query", {"product": product}, timeout=30.0)
if result.get("ok"):
count = result.get("count", 0)
items = result.get("items", [])
if count == 0:
return "查询完成,未找到匹配的影像数据。可尝试扩大查询范围或更换产品类型。"
summary = f"查询完成,找到 {count} 条影像数据,已在前端结果面板中显示。\n\n"
for i, item in enumerate(items[:5], 1):
name = item.get("name", "N/A")
summary += f"{i}. {name} ({item.get('product', '')})\n"
if count > 5:
summary += f"\n... 还有 {count - 5} 条结果"
return summary
return f"查询失败: {result.get('error', '未知错误')}"
@mcp.tool()
async def map_render_footprints(geojson: str) -> str:
"""将 GeoJSON FeatureCollection 渲染到地图上,用于显示影像覆盖范围。
每个 Feature 的 geometry 应为 Polygon 类型properties 中可包含 id、filename 等信息。
Args:
geojson: GeoJSON 字符串 (FeatureCollection 格式)
"""
try:
data = json.loads(geojson) if isinstance(geojson, str) else geojson
except json.JSONDecodeError:
return "GeoJSON 解析失败,请检查格式"
result = await _send_cmd("render_footprints", {"geojson": data})
if result.get("ok"):
count = result.get("count", len(data.get("features", [])))
return f"已渲染 {count} 个覆盖范围到地图"
return f"渲染失败: {result.get('error', '未知错误')}"
@mcp.tool()
async def map_clear() -> str:
"""清除地图上所有由 AI 绘制的内容(边界框、覆盖范围等),不影响用户手绘内容。"""
result = await _send_cmd("clear")
if result.get("ok"):
return "已清除地图上的 AI 绘制内容"
return f"清除失败: {result.get('error', '未知错误')}"
@mcp.tool()
async def map_get_drawn_geometry() -> str:
"""获取用户在地图上手绘的几何图形,可用于确定查询范围。
返回 JSON:
{
"bbox": {"minLon": ..., "maxLon": ..., "minLat": ..., "maxLat": ...},
"hasGeometry": true/false
}
"""
result = await _send_cmd("get_drawn_geometry")
if result.get("ok"):
return json.dumps(result.get("data", {}), ensure_ascii=False)
return f"获取失败: {result.get('error', '未知错误')}"
if __name__ == "__main__":
mcp.run()