from fastapi import FastAPI, Query, Body, HTTPException from typing import List from pydantic import BaseModel from shapely.geometry import Polygon, mapping, shape from rio_tiler.io import Reader from rio_tiler.mosaic import mosaic_reader import requests from fastapi.responses import StreamingResponse from io import BytesIO import morecantile from starlette.responses import StreamingResponse # FastAPI 实例 app = FastAPI(title="STAC Mosaic API") # STAC 配置 STAC_API_URL = "http://localhost:8082" COLLECTION_ID = "geosat1" # ---------- 数据模型 ---------- class BBoxQuery(BaseModel): bbox: List[float] # [minx, miny, maxx, maxy] class GeoJSONPolygon(BaseModel): type: str coordinates: List # ---------- 实用函数 ---------- def fetch_tiles_by_polygon(polygon: GeoJSONPolygon) -> List[tuple]: """ 根据多边形获取相交的瓦片列表 """ zoom = 11 tms = morecantile.tms.get("WebMercatorQuad") # 将 GeoJSONPolygon 转为 shapely Polygon shapely_poly = shape(polygon.model_dump()) tiles = list(tms.tiles(*shapely_poly.bounds, zoom)) def tile_polygon(x, y, z): bounds = tms.bounds(x, y, z) return Polygon([ (bounds.left, bounds.bottom), (bounds.left, bounds.top), (bounds.right, bounds.top), (bounds.right, bounds.bottom), (bounds.left, bounds.bottom) ]) intersecting_tiles = [] for x, y, z in tiles: if shapely_poly.intersects(tile_polygon(x, y, z)): intersecting_tiles.append((x, y, z)) return intersecting_tiles def fetch_items_by_tile(x, y, z) -> List[str]: """ 根据瓦片坐标 (x, y, z) 查询与该瓦片 bbox 相交的影像 """ tms = morecantile.tms.get("WebMercatorQuad") bounds = tms.bounds(x, y, z) bbox = [bounds.left, bounds.bottom, bounds.right, bounds.top] url = f"{STAC_API_URL}/collections/{COLLECTION_ID}/items" params = {"bbox": ",".join(map(str, bbox))} r = requests.get(url, params=params) r.raise_for_status() data = r.json() return [feat["assets"]["image"]["href"] for feat in data.get("features", [])] def tile_mosaic_buf(x, y, z): image_paths = fetch_items_by_tile(x, y, z) if not image_paths: raise HTTPException(status_code=404, detail="未查询到图像") def tile_reader(src_path, x, y, z): with Reader(src_path) as cog: return cog.tile(x, y, z, tilesize=256) img, _ = mosaic_reader(image_paths, tile_reader, x, y, z) buf = BytesIO(img.render(img_format="PNG")) buf.seek(0) return buf # ---------- API 路由 ---------- @app.post("/polygon_tiles_mosaic_stream", summary="逐个返回瓦片镶嵌结果") def polygon_tiles_mosaic_stream(polygon: GeoJSONPolygon = Body(...)): def tile_generator(): intersecting_tiles = fetch_tiles_by_polygon(polygon) for x, y, z in intersecting_tiles: buf = tile_mosaic_buf(x, y, z) if buf: # 每个 tile 之间用自定义分隔符分割,前端需配合解析 yield b"--tile-boundary--\n" yield buf.read() # TODO 下面的代码是可选的,如果需要返回瓦片的元数据,可以取消注释 # if buf: # img_bytes = buf.getvalue() # meta = { # "x": x, # "y": y, # "z": z, # "content_type": "image/png", # "length": len(img_bytes) # } # yield b"--tile-boundary--\n" # yield (json.dumps(meta) + "\n").encode("utf-8") # yield img_bytes return StreamingResponse(tile_generator(), media_type="application/octet-stream")