dz1-spatial-query/tools/stac_tiles_mosaic.py
2025-07-05 19:44:40 +08:00

130 lines
3.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, Query, Body, HTTPException
from typing import List
from pydantic import BaseModel
from shapely.geometry import Polygon, mapping, shape
from rio_tiler.io import Reader
from rio_tiler.mosaic import mosaic_reader
from rio_tiler.mosaic.methods.defaults import MeanMethod # or LowestMethod
import requests
from fastapi.responses import StreamingResponse
from io import BytesIO
import morecantile
from starlette.responses import StreamingResponse
from PIL import Image
import numpy as np
import os
STAC_API_URL = "http://localhost:8082"
COLLECTION_ID = "geosat1"
# ---------- 实用函数 ----------
def fetch_tiles_by_polygon(polygon) -> List[tuple]:
"""
根据多边形获取相交的瓦片列表
"""
zoom = 12
tms = morecantile.tms.get("WebMercatorQuad")
tiles = list(tms.tiles(*polygon.bounds, zoom))
def tile_polygon(x, y, z):
bounds = tms.bounds(x, y, z)
return Polygon([
(bounds.left, bounds.bottom),
(bounds.left, bounds.top),
(bounds.right, bounds.top),
(bounds.right, bounds.bottom),
(bounds.left, bounds.bottom)
])
intersecting_tiles = []
for x, y, z in tiles:
if polygon.intersects(tile_polygon(x, y, z)):
intersecting_tiles.append((x, y, z))
return intersecting_tiles
def fetch_items_by_tile(x, y, z) -> List[str]:
"""
根据瓦片坐标 (x, y, z) 查询与该瓦片 bbox 相交的影像
"""
tms = morecantile.tms.get("WebMercatorQuad")
bounds = tms.bounds(x, y, z)
bbox = [bounds.left, bounds.bottom, bounds.right, bounds.top]
url = f"{STAC_API_URL}/collections/{COLLECTION_ID}/items"
params = {"bbox": ",".join(map(str, bbox))}
r = requests.get(url, params=params)
r.raise_for_status()
data = r.json()
return [feat["assets"]["image"]["href"] for feat in data.get("features", [])]
def tile_mosaic_buf(i, x, y, z):
image_paths = fetch_items_by_tile(x, y, z)
if not image_paths:
# 返回一张全透明的空tilePNG格式
empty_img = Image.new("RGBA", (256, 256), (0, 0, 0, 0))
buf = BytesIO()
empty_img.save(buf, format="PNG")
return buf.getvalue()
def tile_reader(src_path, x, y, z):
with Reader(src_path) as cog:
img = cog.tile(x, y, z, tilesize=256)
img.array.mask = (img.data == 0).all(axis=0)
return img
img, _ = mosaic_reader(image_paths, tile_reader, x, y, z, pixel_selection=MeanMethod())
# with open(f"imgs/output{i}.tif", "wb") as f:
# f.write(img.render(img_format="GTiff"))
return img.render(img_format="GTiff")
def main():
polygon_coords = [
[111.76, 29.25],
[111.89, 29.25],
[111.89, 29.62],
[111.76, 29.62],
[111.76, 29.25]
]
polygon = Polygon(polygon_coords)
intersecting_tiles = fetch_tiles_by_polygon(polygon)
tile_size = 256 # 每个瓦片的像素尺寸
tiles = []
xs, ys = [], []
# 先收集所有瓦片及其坐标
for i, (x, y, z) in enumerate(intersecting_tiles):
tile_bytes = tile_mosaic_buf(i, x, y, z)
img = Image.open(BytesIO(tile_bytes))
tiles.append((x, y, img))
xs.append(x)
ys.append(y)
# 计算拼接大图的尺寸
min_x, max_x = min(xs), max(xs)
min_y, max_y = min(ys), max(ys)
width = (max_x - min_x + 1) * tile_size
height = (max_y - min_y + 1) * tile_size
# 创建空白大图
mosaic_img = Image.new("RGBA", (width, height))
# 按瓦片坐标拼接
for x, y, img in tiles:
px = (x - min_x) * tile_size
py = (y - min_y) * tile_size # 不需要翻转,直接递增
mosaic_img.paste(img, (px, py))
# 保存或展示大图
output_path = "mosaic_output.png"
mosaic_img.save(output_path)
print(f"拼接完成,已保存为 {output_path}")
if __name__ == "__main__":
main()