# mars-nanobot/MCP/mars-mcp/server.py
#!/usr/bin/env python3
"""
火星数据 MCP 服务器
为 nanobot 提供时空查询和时空-语义查询工具
"""
import json
import logging
import os
from pathlib import Path
import numpy as np
import requests
import chromadb
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
# Load environment variables — prefer a local .env, falling back to the
# mars-data-serv-llm project's .env.
_here = Path(__file__).parent
for _env in [_here / ".env", Path("~/studio/mars-data-serv-llm/.env").expanduser()]:
    if _env.exists():
        load_dotenv(_env)
        break

# DataCube spatial-search endpoint queried by `_spatial_query`.
DATACUBE_API_URL = os.getenv(
    "DATACUBE_API_URL",
    "http://digitalmars.com.cn/datacube/datasets/search"
)
# Embeddings HTTP endpoint and model name used by `_get_embedding`.
EMBEDDING_API_URL = os.getenv(
    "EMBEDDING_API_URL",
    "http://192.168.190.42:11434/api/embeddings"
)
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "qwen3-embedding:8b-fp16")
# On-disk ChromaDB store opened by `_get_chroma_collection`.
CHROMADB_PATH = os.getenv(
    "CHROMADB_PATH",
    str(Path("~/studio/mars-data-serv-llm/data/chromadb").expanduser())
)

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

mcp = FastMCP("mars-datacube")
# ─── 内部工具函数 ─────────────────────────────────────────────────────────────
def _validate_bbox(lat_min, lat_max, lon_min, lon_max):
    """Validate a lat/lon bounding box.

    Returns a Chinese error message string for the first failed check,
    or None when the box is valid (latitudes in [-90, 90], longitudes in
    [-180, 180], and each minimum strictly below its maximum).
    """
    lat_in_range = (-90 <= lat_min <= 90) and (-90 <= lat_max <= 90)
    if not lat_in_range:
        return "纬度必须在-90到90之间"
    lon_in_range = (-180 <= lon_min <= 180) and (-180 <= lon_max <= 180)
    if not lon_in_range:
        return "经度必须在-180到180之间"
    # Degenerate or inverted boxes are rejected (equality counts as invalid).
    if not (lat_min < lat_max and lon_min < lon_max):
        return "最小值必须小于最大值"
    return None
def _parse_item(item: dict) -> dict:
    """Normalize one DataCube API entry into a flat result dict.

    Extracts the red-band image path, derives the filename (later used as
    the ChromaDB document id), collapses the first polygon ring into a
    lon/lat bounding box, and stringifies the acquisition time.

    Args:
        item: raw entry from the DataCube API response.

    Returns:
        dict with keys id, filename, product, image_path,
        coordinates (bbox dict or None) and time ("未知时间" when absent).
    """
    # `image_urls` may be missing or explicitly null in the API response;
    # `item.get("image_urls", {})` alone would crash on an explicit null.
    image_path = (item.get("image_urls") or {}).get("red", "")
    filename = image_path.split("/")[-1] if image_path else ""

    # coordinates is expected to be a list of rings: [[(lon, lat), ...]];
    # `or [[]]` also covers an explicit null value.
    coords = item.get("coordinates") or [[]]
    if coords and len(coords[0]) > 0:
        lons = [p[0] for p in coords[0]]
        lats = [p[1] for p in coords[0]]
        coord_range = {
            "lon_min": min(lons), "lon_max": max(lons),
            "lat_min": min(lats), "lat_max": max(lats),
        }
    else:
        coord_range = None

    # str(None) would be the truthy string "None", defeating the fallback —
    # map a null time to "" so it falls through to "未知时间" like a missing one.
    time_val = item.get("time", "")
    time_str = str(time_val) if time_val is not None else ""

    return {
        "id": item.get("id", ""),
        "filename": filename,
        "product": item.get("product", ""),
        "image_path": image_path,
        "coordinates": coord_range,
        "time": time_str or "未知时间",
    }
def _spatial_query(lat_min, lat_max, lon_min, lon_max, product) -> list:
    """Ask the DataCube API for images inside the bbox; return the raw item list.

    Raises requests.exceptions.RequestException (via raise_for_status or
    transport errors); callers are expected to handle it.
    """
    query_params = {
        "min_lon": lon_min,
        "max_lon": lon_max,
        "min_lat": lat_min,
        "max_lat": lat_max,
        "product": product,
    }
    resp = requests.get(DATACUBE_API_URL, params=query_params, timeout=30)
    resp.raise_for_status()
    payload = resp.json()
    # A null/empty JSON body is normalized to an empty list.
    return payload if payload else []
def _get_embedding(text: str):
    """Call the embedding API for *text*; return the vector list, or None on failure.

    Any error — transport failure, non-2xx status, or a response missing the
    "embedding" key — is logged and mapped to None so callers can degrade.
    """
    payload = {"model": EMBEDDING_MODEL, "prompt": text}
    try:
        resp = requests.post(EMBEDDING_API_URL, json=payload, timeout=30)
        resp.raise_for_status()
        body = resp.json()
        return body["embedding"]
    except Exception as e:
        logger.error(f"Embedding API 请求失败: {e}")
        return None
def _cosine_similarity(vec1, vec2) -> float:
    """Return the cosine similarity of two vectors; 0.0 if either has zero norm."""
    a = np.array(vec1)
    b = np.array(vec2)
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if not denom:
        return 0.0
    return float(np.dot(a, b) / denom)
def _get_chroma_collection():
    """Open the persistent ChromaDB store and return the mars_images collection."""
    store = chromadb.PersistentClient(path=CHROMADB_PATH)
    return store.get_or_create_collection("mars_images")
# ─── MCP 工具定义 ─────────────────────────────────────────────────────────────
# @mcp.tool()
# def spatiotemporal_search(
# lat_min: float,
# lat_max: float,
# lon_min: float,
# lon_max: float,
# product: str = "tianwen_moric",
# top_k: int = 10,
# ) -> str:
# """
# 按空间范围查询火星遥感影像。
# 返回指定经纬度矩形区域内的影像列表,包含影像 ID、文件路径、
# 坐标范围和拍摄时间。适用于"查找某区域有哪些影像"类问题。
# Args:
# lat_min: 最小纬度 (-90 到 90)
# lat_max: 最大纬度 (-90 到 90)
# lon_min: 最小经度 (-180 到 180)
# lon_max: 最大经度 (-180 到 180)
# product: 产品类型,默认 "tianwen_moric"天问一号MoRIC
# top_k: 返回数量上限,默认 10最大 100
# """
# err = _validate_bbox(lat_min, lat_max, lon_min, lon_max)
# if err:
# return json.dumps({"success": False, "error": err, "results": []}, ensure_ascii=False)
# top_k = min(max(1, top_k), 100)
# try:
# raw = _spatial_query(lat_min, lat_max, lon_min, lon_max, product)
# if not raw:
# return json.dumps({
# "success": True,
# "query": {"spatial": {"lat_min": lat_min, "lat_max": lat_max, "lon_min": lon_min, "lon_max": lon_max}, "product": product},
# "total_results": 0,
# "results": [],
# "message": f"在指定区域未找到 {product} 影像",
# }, ensure_ascii=False, indent=2)
# results = [_parse_item(item) for item in raw[:top_k]]
# return json.dumps({
# "success": True,
# "query": {"spatial": {"lat_min": lat_min, "lat_max": lat_max, "lon_min": lon_min, "lon_max": lon_max}, "product": product},
# "total_results": len(raw),
# "returned_results": len(results),
# "results": results,
# "message": f"找到 {len(raw)} 幅 {product} 影像,返回前 {len(results)} 幅",
# }, ensure_ascii=False, indent=2)
# except requests.exceptions.RequestException as e:
# return json.dumps({"success": False, "error": f"DataCube API 请求失败: {e}", "results": []}, ensure_ascii=False)
# except Exception as e:
# logger.exception("spatiotemporal_search 失败")
# return json.dumps({"success": False, "error": str(e), "results": []}, ensure_ascii=False)
@mcp.tool()
def spatiotemporal_semantic_search(
    query: str,
    lat_min: float,
    lat_max: float,
    lon_min: float,
    lon_max: float,
    product: str = "tianwen_moric",
    top_k: int = 10,
) -> str:
    """
    Spatio-temporal semantic search: filter images by spatial extent first,
    then rank them by semantic similarity.

    Fetches all images inside the region from the DataCube API, pulls those
    images' visual-description vectors from ChromaDB, computes cosine
    similarity against the query embedding, and returns the most relevant
    images. Suited to questions with semantic content, e.g. "find dust
    devils / impact craters / dunes in some region".

    Each result additionally contains:
    - description: the image's visual-description text
    - similarity: cosine similarity to the query (0-1)

    Args:
        query: semantic query text, e.g. "dust devil", "impact crater", "dune"
        lat_min: minimum latitude (-90 to 90)
        lat_max: maximum latitude (-90 to 90)
        lon_min: minimum longitude (-180 to 180)
        lon_max: maximum longitude (-180 to 180)
        product: product type, default "tianwen_moric"
        top_k: maximum number of results, default 10, max 100
    """
    if not query or not query.strip():
        return json.dumps({"success": False, "error": "语义查询不能为空", "results": []}, ensure_ascii=False)
    err = _validate_bbox(lat_min, lat_max, lon_min, lon_max)
    if err:
        return json.dumps({"success": False, "error": err, "results": []}, ensure_ascii=False)
    # Clamp top_k into [1, 100].
    top_k = min(max(1, top_k), 100)
    query_info = {
        "spatial": {"lat_min": lat_min, "lat_max": lat_max, "lon_min": lon_min, "lon_max": lon_max},
        "product": product,
    }
    try:
        # Step 1: spatial filtering via the DataCube API.
        raw = _spatial_query(lat_min, lat_max, lon_min, lon_max, product)
        if not raw:
            return json.dumps({
                "success": True, "query": query_info,
                "total_results": 0, "results": [],
                "message": f"在指定区域未找到 {product} 影像",
            }, ensure_ascii=False, indent=2)
        # Step 2: index parsed items by filename — the filename doubles as the
        # ChromaDB document id. Items without a filename are dropped here;
        # duplicate filenames keep only the last occurrence.
        spatial_map = {}
        for item in raw:
            parsed = _parse_item(item)
            if parsed["filename"]:
                spatial_map[parsed["filename"]] = parsed
        # Step 3: batch-fetch descriptions and embeddings from ChromaDB.
        try:
            collection = _get_chroma_collection()
            chroma_data = collection.get(
                ids=list(spatial_map.keys()),
                include=["documents", "embeddings"],
            )
        except Exception as e:
            # ChromaDB unavailable — degrade gracefully to pure spatial results.
            logger.error(f"ChromaDB 查询失败: {e}")
            results = list(spatial_map.values())[:top_k]
            return json.dumps({
                "success": True, "query": query_info,
                "total_results": len(spatial_map),
                "returned_results": len(results),
                "results": results,
                "message": f"ChromaDB 不可用,返回 {len(results)} 幅时空搜索结果",
                "warning": str(e),
            }, ensure_ascii=False, indent=2)
        # None of the region's images have semantic vectors: spatial-only fallback.
        if not chroma_data or not chroma_data["ids"]:
            results = list(spatial_map.values())[:top_k]
            return json.dumps({
                "success": True, "query": query_info,
                "total_results": len(spatial_map),
                "returned_results": len(results),
                "results": results,
                "message": f"区域内 {len(spatial_map)} 幅影像在 ChromaDB 中无语义数据",
                "warning": "ChromaDB 中没有这些影像的语义向量",
            }, ensure_ascii=False, indent=2)
        # Step 4: embed the query and rank by cosine similarity. If embedding
        # fails (_get_embedding returns None), every similarity is 0.0 and the
        # ordering degenerates to ChromaDB's return order.
        query_vec = _get_embedding(query)
        scored = []
        for i, fname in enumerate(chroma_data["ids"]):
            if fname not in spatial_map:
                continue
            # Copy so the cached spatial_map entry is not mutated.
            item = dict(spatial_map[fname])
            item["description"] = chroma_data["documents"][i]
            if query_vec is not None and chroma_data["embeddings"] is not None:
                item["similarity"] = round(_cosine_similarity(query_vec, chroma_data["embeddings"][i]), 4)
            else:
                item["similarity"] = 0.0
            scored.append(item)
        scored.sort(key=lambda x: x["similarity"], reverse=True)
        final = scored[:top_k]
        # NOTE(review): total_results here is len(final), unlike the branches
        # above which report len(spatial_map) — confirm this is intended.
        return json.dumps({
            "success": True, "query": query_info,
            "total_results": len(final),
            "returned_results": len(final),
            "results": final,
            "message": (
                f"区域内 {len(spatial_map)} 幅影像,其中 {len(chroma_data['ids'])} 幅有语义数据,"
                f"按 '{query}' 排序后返回前 {len(final)} 幅"
            ),
        }, ensure_ascii=False, indent=2)
    except requests.exceptions.RequestException as e:
        return json.dumps({"success": False, "error": f"DataCube API 请求失败: {e}", "results": []}, ensure_ascii=False)
    except Exception as e:
        logger.exception("spatiotemporal_semantic_search 失败")
        return json.dumps({"success": False, "error": str(e), "results": []}, ensure_ascii=False)
if __name__ == "__main__":
    # Start the MCP server.
    mcp.run()