dz1-spatial-query/stac_mosaic_api.py
2025-07-05 19:44:40 +08:00

104 lines
3.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, Query, Body, HTTPException
from typing import List
from pydantic import BaseModel
from shapely.geometry import Polygon, mapping
from rio_tiler.io import Reader
from rio_tiler.mosaic import mosaic_reader
from rio_tiler.mosaic.methods.defaults import MeanMethod # or LowestMethod
import requests
from fastapi.responses import StreamingResponse
from io import BytesIO
# FastAPI application instance; the route decorators further down register
# their handlers on it.
app = FastAPI(title="STAC Mosaic API")
# STAC configuration: base URL of the STAC API, and the collection that the
# item queries in this module are restricted to.
STAC_API_URL = "http://localhost:8082"
COLLECTION_ID = "geosat1"
# ---------- 数据模型 ----------
class BBoxQuery(BaseModel):
    # Request body of the /bbox_mosaic endpoint.
    # (Documented with comments rather than a docstring so the schema
    # description generated from this model stays unchanged.)

    # Bounding box given as [minx, miny, maxx, maxy].
    bbox: List[float]
class GeoJSONPolygon(BaseModel):
    # Request body of the /polygon_mosaic endpoint: a GeoJSON-style geometry.
    # (Documented with comments rather than a docstring so the schema
    # description generated from this model stays unchanged.)

    # Geometry type string; the model itself does not restrict its value.
    type: str
    # Coordinate array; accepted as an arbitrary list without further checks.
    coordinates: List
# ---------- 实用函数 ----------
def fetch_items_by_bbox(
    bbox: List[float], timeout: float = 30.0
) -> List[str]:
    """Return the image asset URLs of collection items matching a bbox.

    Sends a GET request to the items endpoint of ``COLLECTION_ID`` on the
    STAC API and collects ``assets.image.href`` from every returned feature.

    Args:
        bbox: Bounding box as ``[minx, miny, maxx, maxy]``.
        timeout: Seconds to wait for the STAC API. Without a timeout
            ``requests`` may block indefinitely on an unresponsive server,
            which would tie up the request handler calling this function.

    Returns:
        One href per feature in the response; an empty list when the
        response contains no features.

    Raises:
        requests.HTTPError: If the STAC API answers with an error status.
        requests.Timeout: If the STAC API does not answer within ``timeout``.
        KeyError: If a returned feature has no ``image`` asset.
    """
    url = f"{STAC_API_URL}/collections/{COLLECTION_ID}/items"
    # NOTE: no "limit" is sent and paging links are not followed, so only the
    # features contained in this single response are used.
    params = {"bbox": ",".join(map(str, bbox))}
    response = requests.get(url, params=params, timeout=timeout)
    response.raise_for_status()
    features = response.json().get("features", [])
    return [feature["assets"]["image"]["href"] for feature in features]
def fetch_items_by_polygon(
    geojson: dict, timeout: float = 30.0
) -> List[str]:
    """Return the image asset URLs of collection items intersecting a geometry.

    Posts an ``intersects`` search restricted to ``COLLECTION_ID`` to the
    STAC API search endpoint and collects ``assets.image.href`` from every
    returned feature.

    Args:
        geojson: GeoJSON geometry sent as the ``intersects`` filter.
        timeout: Seconds to wait for the STAC API. Without a timeout
            ``requests`` may block indefinitely on an unresponsive server,
            which would tie up the request handler calling this function.

    Returns:
        One href per feature in the response; an empty list when the
        response contains no features.

    Raises:
        requests.HTTPError: If the STAC API answers with an error status.
        requests.Timeout: If the STAC API does not answer within ``timeout``.
        KeyError: If a returned feature has no ``image`` asset.
    """
    url = f"{STAC_API_URL}/search"
    # NOTE: no "limit" is sent and paging links are not followed, so only the
    # features contained in this single response are used.
    body = {
        "collections": [COLLECTION_ID],
        "intersects": geojson,
    }
    # ``json=`` serialises the body and sets the JSON Content-Type header, so
    # no explicit header dict is needed.
    response = requests.post(url, json=body, timeout=timeout)
    response.raise_for_status()
    features = response.json().get("features", [])
    return [feature["assets"]["image"]["href"] for feature in features]
def render_mosaic(image_paths: List[str], reader_fn, geo: object) -> BytesIO:
    """Mosaic the given images and return the result as an in-memory PNG.

    Args:
        image_paths: Image locations handed to ``mosaic_reader``.
        reader_fn: Reader callable handed to ``mosaic_reader`` unchanged.
        geo: Area of interest handed to ``mosaic_reader`` after ``reader_fn``
            (the routes in this module pass a bbox list or a GeoJSON dict).

    Returns:
        A ``BytesIO`` positioned at offset 0 that holds the PNG bytes.
    """
    # MeanMethod is used as the pixel-selection method; the second element
    # returned by mosaic_reader is not needed here.
    mosaic_image, _unused = mosaic_reader(
        image_paths,
        reader_fn,
        geo,
        pixel_selection=MeanMethod(),
    )
    png_bytes = mosaic_image.render(img_format="PNG")
    # A BytesIO created from initial bytes already starts at position 0.
    return BytesIO(png_bytes)
# ---------- API 路由 ----------
@app.post("/bbox_mosaic", summary="基于 bbox 的图像镶嵌")
def bbox_mosaic(query: BBoxQuery):
    # Return a PNG mosaic of the images found for the requested bbox.
    #
    # Responds with 404 when the STAC query yields no images and with 500
    # when querying, reading or rendering fails.
    # (Documented with comments rather than a docstring so the generated
    # OpenAPI description of this route stays unchanged.)

    # TODO: let the reader accept *args/**kwargs so max_size is not
    # hard-coded here.
    def part_reader(src_path, part):
        with Reader(src_path) as cog:
            img = cog.part(part, max_size=1024)
            # Mask pixels that are 0 along the whole first axis (presumably
            # the band axis) so they are treated as empty in the mosaic.
            # NOTE(review): this replaces whatever mask the reader produced;
            # confirm that is intended.
            img.array.mask = (img.data == 0).all(axis=0)
            return img

    try:
        image_paths = fetch_items_by_bbox(query.bbox)
        if not image_paths:
            raise HTTPException(status_code=404, detail="未查询到图像")
        # TODO: partial reading and mosaicking are currently done in one
        # step; decouple them for later experiments.
        img_buf = render_mosaic(image_paths, part_reader, query.bbox)
        return StreamingResponse(img_buf, media_type="image/png")
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the client
        # unchanged; previously the blanket handler turned them into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/polygon_mosaic", summary="基于多边形的图像镶嵌")
def polygon_mosaic(polygon: GeoJSONPolygon = Body(...)):
    # Return a PNG mosaic of the images intersecting the posted geometry.
    #
    # Responds with 404 when the STAC query yields no images and with 500
    # when querying, reading or rendering fails.
    # (Documented with comments rather than a docstring so the generated
    # OpenAPI description of this route stays unchanged.)

    def feature_reader(src_path, feat):
        with Reader(src_path) as cog:
            img = cog.feature(feat, max_size=1024)
            # Mask pixels that are 0 along the whole first axis (presumably
            # the band axis) so they are treated as empty in the mosaic.
            # NOTE(review): this replaces whatever mask the reader produced,
            # possibly including the geometry cut-out; confirm that is
            # intended.
            img.array.mask = (img.data == 0).all(axis=0)
            return img

    try:
        # Serialise the model once and reuse it for the query and the reader.
        geometry = polygon.model_dump()
        image_paths = fetch_items_by_polygon(geometry)
        if not image_paths:
            raise HTTPException(status_code=404, detail="未查询到图像")
        img_buf = render_mosaic(image_paths, feature_reader, geometry)
        return StreamingResponse(img_buf, media_type="image/png")
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the client
        # unchanged; previously the blanket handler turned them into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e