234 lines
8.3 KiB
Python
234 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
||
"""Map-MCP: 地图操作 MCP 服务器
|
||
|
||
通过 stdio 与 nanobot 通信 (MCP 协议),
|
||
同时运行 WebSocket 服务器与浏览器中的 Cesium 前端通信。
|
||
|
||
架构:
|
||
nanobot ←stdio→ Map-MCP ←WebSocket→ 浏览器(Cesium)
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import uuid
|
||
from contextlib import asynccontextmanager
|
||
|
||
import websockets
|
||
|
||
from mcp.server.fastmcp import FastMCP
|
||
|
||
# ─── 配置 ─────────────────────────────────────────────────────────────────────
|
||
WS_HOST = os.getenv("MAP_MCP_WS_HOST", "0.0.0.0")
|
||
WS_PORT = int(os.getenv("MAP_MCP_WS_PORT", "8005"))
|
||
|
||
logger = logging.getLogger("map-mcp")
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
|
||
)
|
||
|
||
# ─── WebSocket 状态 ───────────────────────────────────────────────────────────
|
||
_clients: set[websockets.ServerConnection] = set()
|
||
_pending: dict[str, asyncio.Future] = {}
|
||
|
||
|
||
async def _ws_handler(websocket):
|
||
"""处理浏览器 WebSocket 连接"""
|
||
_clients.add(websocket)
|
||
logger.info("浏览器已连接 (共 %d 个客户端)", len(_clients))
|
||
try:
|
||
async for raw in websocket:
|
||
try:
|
||
data = json.loads(raw)
|
||
except json.JSONDecodeError:
|
||
logger.warning("收到无效 JSON: %s", raw[:200])
|
||
continue
|
||
req_id = data.get("id")
|
||
if req_id and req_id in _pending:
|
||
_pending[req_id].set_result(data)
|
||
except websockets.ConnectionClosed:
|
||
pass
|
||
finally:
|
||
_clients.discard(websocket)
|
||
logger.info("浏览器断开 (共 %d 个客户端)", len(_clients))
|
||
|
||
|
||
async def _send_cmd(cmd: str, params: dict | None = None, timeout: float = 15.0) -> dict:
|
||
"""向浏览器发送命令并等待响应"""
|
||
if not _clients:
|
||
return {"ok": False, "error": "没有浏览器连接,请先打开地图页面"}
|
||
|
||
req_id = str(uuid.uuid4())
|
||
msg = {"id": req_id, "cmd": cmd}
|
||
if params:
|
||
msg.update(params)
|
||
|
||
loop = asyncio.get_running_loop()
|
||
future = loop.create_future()
|
||
_pending[req_id] = future
|
||
|
||
# 发送给第一个连接的客户端
|
||
client = next(iter(_clients))
|
||
await client.send(json.dumps(msg, ensure_ascii=False))
|
||
|
||
try:
|
||
result = await asyncio.wait_for(future, timeout=timeout)
|
||
return result
|
||
except asyncio.TimeoutError:
|
||
return {"ok": False, "error": f"浏览器响应超时 ({timeout}s)"}
|
||
finally:
|
||
_pending.pop(req_id, None)
|
||
|
||
|
||
# ─── FastMCP 生命周期:启动 WebSocket 服务器 ──────────────────────────────────
|
||
@asynccontextmanager
|
||
async def lifespan(server):
|
||
ws_server = await websockets.serve(_ws_handler, WS_HOST, WS_PORT)
|
||
logger.info("WebSocket 服务器已启动: ws://%s:%d", WS_HOST, WS_PORT)
|
||
try:
|
||
yield
|
||
finally:
|
||
ws_server.close()
|
||
await ws_server.wait_closed()
|
||
logger.info("WebSocket 服务器已关闭")
|
||
|
||
|
||
mcp = FastMCP("map-mcp", lifespan=lifespan)
|
||
|
||
|
||
# ─── MCP 工具定义 ─────────────────────────────────────────────────────────────
|
||
|
||
@mcp.tool()
|
||
async def map_zoom_to(lon: float, lat: float, altitude: float = 20000_000) -> str:
|
||
"""飞行到指定火星坐标位置。
|
||
|
||
Args:
|
||
lon: 经度 (火星坐标, -180 ~ 180)
|
||
lat: 纬度 (火星坐标, -90 ~ 90)
|
||
altitude: 相机高度,单位米 (默认 20000km,适合查看区域全貌。近距离查看可用 10000km)
|
||
"""
|
||
result = await _send_cmd("zoom_to", {"lon": lon, "lat": lat, "altitude": altitude})
|
||
if result.get("ok"):
|
||
return f"已飞行到 经度{lon}°, 纬度{lat}°, 相机高度{altitude/1000:.0f}km"
|
||
return f"操作失败: {result.get('error', '未知错误')}"
|
||
|
||
|
||
@mcp.tool()
|
||
async def map_get_viewport() -> str:
|
||
"""获取当前地图视野信息,包括相机位置(经纬度、高度)和可视范围边界框。
|
||
|
||
返回 JSON:
|
||
{
|
||
"center": {"lon": ..., "lat": ..., "altitude": ...},
|
||
"bbox": {"west": ..., "south": ..., "east": ..., "north": ...}
|
||
}
|
||
"""
|
||
result = await _send_cmd("get_viewport")
|
||
if result.get("ok"):
|
||
return json.dumps(result.get("data", {}), ensure_ascii=False)
|
||
return f"获取失败: {result.get('error', '未知错误')}"
|
||
|
||
|
||
@mcp.tool()
|
||
async def map_draw_bbox(
|
||
west: float, south: float, east: float, north: float,
|
||
color: str = "#ff6600",
|
||
) -> str:
|
||
"""在地图上绘制矩形边界框并飞行到该区域。
|
||
|
||
Args:
|
||
west: 西边界经度
|
||
south: 南边界纬度
|
||
east: 东边界经度
|
||
north: 北边界纬度
|
||
color: 边框颜色 (CSS 颜色值, 默认橙色)
|
||
"""
|
||
result = await _send_cmd("draw_bbox", {
|
||
"bbox": {"west": west, "south": south, "east": east, "north": north},
|
||
"color": color,
|
||
})
|
||
if result.get("ok"):
|
||
return f"已绘制矩形: ({west},{south}) - ({east},{north})"
|
||
return f"绘制失败: {result.get('error', '未知错误')}"
|
||
|
||
|
||
@mcp.tool()
|
||
async def map_spatial_query(product: str = "tianwen_moric") -> str:
|
||
"""在当前绘制的矩形区域内查询火星遥感影像数据,结果会显示在前端界面。
|
||
|
||
必须先调用 map_draw_bbox 绘制查询区域,然后再调用此工具执行查询。
|
||
|
||
Args:
|
||
product: 数据产品类型,可选值:
|
||
tianwen_moric - 天问中分相机
|
||
tianwen_hiric - 天问高分相机
|
||
mars_ctx - Mars CTX
|
||
mars_hirise - Mars HiRISE
|
||
"""
|
||
result = await _send_cmd("spatial_query", {"product": product}, timeout=30.0)
|
||
if result.get("ok"):
|
||
count = result.get("count", 0)
|
||
items = result.get("items", [])
|
||
if count == 0:
|
||
return "查询完成,未找到匹配的影像数据。可尝试扩大查询范围或更换产品类型。"
|
||
summary = f"查询完成,找到 {count} 条影像数据,已在前端结果面板中显示。\n\n"
|
||
for i, item in enumerate(items[:5], 1):
|
||
name = item.get("name", "N/A")
|
||
summary += f"{i}. {name} ({item.get('product', '')})\n"
|
||
if count > 5:
|
||
summary += f"\n... 还有 {count - 5} 条结果"
|
||
return summary
|
||
return f"查询失败: {result.get('error', '未知错误')}"
|
||
|
||
|
||
@mcp.tool()
|
||
async def map_render_footprints(geojson: str) -> str:
|
||
"""将 GeoJSON FeatureCollection 渲染到地图上,用于显示影像覆盖范围。
|
||
|
||
每个 Feature 的 geometry 应为 Polygon 类型,properties 中可包含 id、filename 等信息。
|
||
|
||
Args:
|
||
geojson: GeoJSON 字符串 (FeatureCollection 格式)
|
||
"""
|
||
try:
|
||
data = json.loads(geojson) if isinstance(geojson, str) else geojson
|
||
except json.JSONDecodeError:
|
||
return "GeoJSON 解析失败,请检查格式"
|
||
|
||
result = await _send_cmd("render_footprints", {"geojson": data})
|
||
if result.get("ok"):
|
||
count = result.get("count", len(data.get("features", [])))
|
||
return f"已渲染 {count} 个覆盖范围到地图"
|
||
return f"渲染失败: {result.get('error', '未知错误')}"
|
||
|
||
|
||
@mcp.tool()
|
||
async def map_clear() -> str:
|
||
"""清除地图上所有由 AI 绘制的内容(边界框、覆盖范围等),不影响用户手绘内容。"""
|
||
result = await _send_cmd("clear")
|
||
if result.get("ok"):
|
||
return "已清除地图上的 AI 绘制内容"
|
||
return f"清除失败: {result.get('error', '未知错误')}"
|
||
|
||
|
||
@mcp.tool()
|
||
async def map_get_drawn_geometry() -> str:
|
||
"""获取用户在地图上手绘的几何图形,可用于确定查询范围。
|
||
|
||
返回 JSON:
|
||
{
|
||
"bbox": {"minLon": ..., "maxLon": ..., "minLat": ..., "maxLat": ...},
|
||
"hasGeometry": true/false
|
||
}
|
||
"""
|
||
result = await _send_cmd("get_drawn_geometry")
|
||
if result.get("ok"):
|
||
return json.dumps(result.get("data", {}), ensure_ascii=False)
|
||
return f"获取失败: {result.get('error', '未知错误')}"
|
||
|
||
|
||
if __name__ == "__main__":
|
||
mcp.run()
|