Files
mars-nanobot/MCP/mars-mcp/server.py

313 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
火星数据 MCP 服务器
为 nanobot 提供时空查询和时空-语义查询工具
"""
import json
import logging
import os
from pathlib import Path
import numpy as np
import requests
import chromadb
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
# 加载环境变量 - 优先本地 .env回退到 mars-data-serv-llm 项目的 .env
_here = Path(__file__).parent
for _env in [_here / ".env", Path("~/studio/mars-data-serv-llm/.env").expanduser()]:
if _env.exists():
load_dotenv(_env)
break
DATACUBE_API_URL = os.getenv(
"DATACUBE_API_URL",
"http://digitalmars.com.cn/datacube/datasets/search"
)
EMBEDDING_API_URL = os.getenv(
"EMBEDDING_API_URL",
"http://192.168.190.62:11434/api/embeddings"
)
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "qwen3-embedding:8b-fp16")
CHROMADB_PATH = os.getenv(
"CHROMADB_PATH",
str(Path("/opt/mcp/mars-mcp/chromadb").expanduser())
)
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
mcp = FastMCP("mars-datacube")
# ─── 内部工具函数 ─────────────────────────────────────────────────────────────
def _validate_bbox(lat_min, lat_max, lon_min, lon_max):
"""验证边界框参数,返回错误字符串或 None"""
if not (-90 <= lat_min <= 90) or not (-90 <= lat_max <= 90):
return "纬度必须在-90到90之间"
if not (-180 <= lon_min <= 180) or not (-180 <= lon_max <= 180):
return "经度必须在-180到180之间"
if lat_min >= lat_max or lon_min >= lon_max:
return "最小值必须小于最大值"
return None
def _parse_item(item: dict) -> dict:
"""从 DataCube API 响应条目中提取标准化字段"""
image_path = item.get("image_urls", {}).get("red", "")
filename = image_path.split("/")[-1] if image_path else ""
coords = item.get("coordinates", [[]])
if coords and len(coords[0]) > 0:
lons = [p[0] for p in coords[0]]
lats = [p[1] for p in coords[0]]
coord_range = {
"lon_min": min(lons), "lon_max": max(lons),
"lat_min": min(lats), "lat_max": max(lats),
}
else:
coord_range = None
return {
"id": item.get("id", ""),
"filename": filename,
"product": item.get("product", ""),
"image_path": image_path,
"coordinates": coord_range,
"time": str(item.get("time", "")) or "未知时间",
}
def _spatial_query(lat_min, lat_max, lon_min, lon_max, product) -> list:
"""调用 DataCube API 进行空间查询,返回原始条目列表"""
params = {
"min_lon": lon_min, "max_lon": lon_max,
"min_lat": lat_min, "max_lat": lat_max,
"product": product,
}
response = requests.get(DATACUBE_API_URL, params=params, timeout=30)
response.raise_for_status()
return response.json() or []
def _get_embedding(text: str):
"""调用 Embedding API返回向量列表失败返回 None"""
try:
resp = requests.post(
EMBEDDING_API_URL,
json={"model": EMBEDDING_MODEL, "prompt": text},
timeout=30,
)
resp.raise_for_status()
return resp.json()["embedding"]
except Exception as e:
logger.error(f"Embedding API 请求失败: {e}")
return None
def _cosine_similarity(vec1, vec2) -> float:
v1, v2 = np.array(vec1), np.array(vec2)
norm = np.linalg.norm(v1) * np.linalg.norm(v2)
return float(np.dot(v1, v2) / norm) if norm else 0.0
def _get_chroma_collection():
client = chromadb.PersistentClient(path=CHROMADB_PATH)
return client.get_or_create_collection("mars_images")
# ─── MCP 工具定义 ─────────────────────────────────────────────────────────────
# @mcp.tool()
# def spatiotemporal_search(
# lat_min: float,
# lat_max: float,
# lon_min: float,
# lon_max: float,
# product: str = "tianwen_moric",
# top_k: int = 10,
# ) -> str:
# """
# 按空间范围查询火星遥感影像。
# 返回指定经纬度矩形区域内的影像列表,包含影像 ID、文件路径、
# 坐标范围和拍摄时间。适用于"查找某区域有哪些影像"类问题。
# Args:
# lat_min: 最小纬度 (-90 到 90)
# lat_max: 最大纬度 (-90 到 90)
# lon_min: 最小经度 (-180 到 180)
# lon_max: 最大经度 (-180 到 180)
# product: 产品类型,默认 "tianwen_moric"天问一号MoRIC
# top_k: 返回数量上限,默认 10最大 100
# """
# err = _validate_bbox(lat_min, lat_max, lon_min, lon_max)
# if err:
# return json.dumps({"success": False, "error": err, "results": []}, ensure_ascii=False)
# top_k = min(max(1, top_k), 100)
# try:
# raw = _spatial_query(lat_min, lat_max, lon_min, lon_max, product)
# if not raw:
# return json.dumps({
# "success": True,
# "query": {"spatial": {"lat_min": lat_min, "lat_max": lat_max, "lon_min": lon_min, "lon_max": lon_max}, "product": product},
# "total_results": 0,
# "results": [],
# "message": f"在指定区域未找到 {product} 影像",
# }, ensure_ascii=False, indent=2)
# results = [_parse_item(item) for item in raw[:top_k]]
# return json.dumps({
# "success": True,
# "query": {"spatial": {"lat_min": lat_min, "lat_max": lat_max, "lon_min": lon_min, "lon_max": lon_max}, "product": product},
# "total_results": len(raw),
# "returned_results": len(results),
# "results": results,
# "message": f"找到 {len(raw)} 幅 {product} 影像,返回前 {len(results)} 幅",
# }, ensure_ascii=False, indent=2)
# except requests.exceptions.RequestException as e:
# return json.dumps({"success": False, "error": f"DataCube API 请求失败: {e}", "results": []}, ensure_ascii=False)
# except Exception as e:
# logger.exception("spatiotemporal_search 失败")
# return json.dumps({"success": False, "error": str(e), "results": []}, ensure_ascii=False)
@mcp.tool()
def spatiotemporal_semantic_search(
query: str,
lat_min: float,
lat_max: float,
lon_min: float,
lon_max: float,
product: str = "tianwen_moric",
top_k: int = 10,
) -> str:
"""
时空-语义搜索:先按空间范围筛选影像,再按语义相似度排序。
先从 DataCube API 获取区域内所有影像,再从 ChromaDB 取出这些影像的
视觉描述向量,与 query 计算余弦相似度后排序,返回最相关的影像。
适用于"在某区域找尘卷风/撞击坑/沙丘"等含语义内容的问题。
结果额外包含:
- description: 影像视觉描述文本
- similarity: 与 query 的余弦相似度 (0-1)
Args:
query: 语义查询文本,如 "尘卷风""撞击坑""沙丘"
lat_min: 最小纬度 (-90 到 90)
lat_max: 最大纬度 (-90 到 90)
lon_min: 最小经度 (-180 到 180)
lon_max: 最大经度 (-180 到 180)
product: 产品类型,默认 "tianwen_moric"
top_k: 返回数量上限,默认 10最大 100
"""
if not query or not query.strip():
return json.dumps({"success": False, "error": "语义查询不能为空", "results": []}, ensure_ascii=False)
err = _validate_bbox(lat_min, lat_max, lon_min, lon_max)
if err:
return json.dumps({"success": False, "error": err, "results": []}, ensure_ascii=False)
top_k = min(max(1, top_k), 100)
query_info = {
"spatial": {"lat_min": lat_min, "lat_max": lat_max, "lon_min": lon_min, "lon_max": lon_max},
"product": product,
}
try:
# 步骤 1空间筛选
raw = _spatial_query(lat_min, lat_max, lon_min, lon_max, product)
if not raw:
return json.dumps({
"success": True, "query": query_info,
"total_results": 0, "results": [],
"message": f"在指定区域未找到 {product} 影像",
}, ensure_ascii=False, indent=2)
# 步骤 2构建文件名索引
spatial_map = {}
for item in raw:
parsed = _parse_item(item)
if parsed["filename"]:
spatial_map[parsed["filename"]] = parsed
# 步骤 3从 ChromaDB 批量取向量
try:
collection = _get_chroma_collection()
chroma_data = collection.get(
ids=list(spatial_map.keys()),
include=["documents", "embeddings"],
)
except Exception as e:
logger.error(f"ChromaDB 查询失败: {e}")
results = list(spatial_map.values())[:top_k]
return json.dumps({
"success": True, "query": query_info,
"total_results": len(spatial_map),
"returned_results": len(results),
"results": results,
"message": f"ChromaDB 不可用,返回 {len(results)} 幅时空搜索结果",
"warning": str(e),
}, ensure_ascii=False, indent=2)
if not chroma_data or not chroma_data["ids"]:
results = list(spatial_map.values())[:top_k]
return json.dumps({
"success": True, "query": query_info,
"total_results": len(spatial_map),
"returned_results": len(results),
"results": results,
"message": f"区域内 {len(spatial_map)} 幅影像在 ChromaDB 中无语义数据",
"warning": "ChromaDB 中没有这些影像的语义向量",
}, ensure_ascii=False, indent=2)
# 步骤 4生成查询向量并排序
query_vec = _get_embedding(query)
scored = []
for i, fname in enumerate(chroma_data["ids"]):
if fname not in spatial_map:
continue
item = dict(spatial_map[fname])
item["description"] = chroma_data["documents"][i]
if query_vec is not None and chroma_data["embeddings"] is not None:
item["similarity"] = round(_cosine_similarity(query_vec, chroma_data["embeddings"][i]), 4)
else:
item["similarity"] = 0.0
scored.append(item)
scored.sort(key=lambda x: x["similarity"], reverse=True)
final = scored[:top_k]
return json.dumps({
"success": True, "query": query_info,
"total_results": len(final),
"returned_results": len(final),
"results": final,
"message": (
f"区域内 {len(spatial_map)} 幅影像,其中 {len(chroma_data['ids'])} 幅有语义数据,"
f"'{query}' 排序后返回前 {len(final)}"
),
}, ensure_ascii=False, indent=2)
except requests.exceptions.RequestException as e:
return json.dumps({"success": False, "error": f"DataCube API 请求失败: {e}", "results": []}, ensure_ascii=False)
except Exception as e:
logger.exception("spatiotemporal_semantic_search 失败")
return json.dumps({"success": False, "error": str(e), "results": []}, ensure_ascii=False)
if __name__ == "__main__":
mcp.run()