UAV/post_pro/merge_tif.py

199 lines
7.0 KiB
Python
Raw Normal View History

2024-12-23 11:31:20 +08:00
from osgeo import gdal
import logging
import os
2024-12-29 12:03:53 +08:00
from typing import Dict
import pandas as pd
2024-12-23 11:31:20 +08:00
class MergeTif:
2024-12-29 12:03:53 +08:00
def __init__(self, output_dir: str):
self.output_dir = output_dir
2024-12-23 11:31:20 +08:00
self.logger = logging.getLogger('UAV_Preprocess.MergeTif')
2024-12-29 12:03:53 +08:00
def merge_two_tifs(self, input_tif1: str, input_tif2: str, output_tif: str):
2024-12-23 11:31:20 +08:00
"""合并两张TIF影像"""
try:
self.logger.info("开始合并TIF影像")
2024-12-29 12:03:53 +08:00
self.logger.info(f"输入影像1: {input_tif1}")
self.logger.info(f"输入影像2: {input_tif2}")
self.logger.info(f"输出影像: {output_tif}")
2024-12-23 11:31:20 +08:00
# 检查输入文件是否存在
2024-12-29 12:03:53 +08:00
if not os.path.exists(input_tif1) or not os.path.exists(input_tif2):
2024-12-23 11:31:20 +08:00
error_msg = "输入影像文件不存在"
self.logger.error(error_msg)
raise FileNotFoundError(error_msg)
# 打开影像,检查投影是否一致
2024-12-29 12:03:53 +08:00
datasets = [gdal.Open(tif) for tif in [input_tif1, input_tif2]]
2024-12-23 11:31:20 +08:00
if None in datasets:
error_msg = "无法打开输入影像文件"
self.logger.error(error_msg)
raise ValueError(error_msg)
projections = [dataset.GetProjection() for dataset in datasets]
self.logger.debug(f"影像1投影: {projections[0]}")
self.logger.debug(f"影像2投影: {projections[1]}")
# 检查投影是否一致
if len(set(projections)) != 1:
error_msg = "影像的投影不一致,请先进行重投影!"
self.logger.error(error_msg)
raise ValueError(error_msg)
# 创建 GDAL Warp 选项
warp_options = gdal.WarpOptions(
format="GTiff",
2024-12-29 12:03:53 +08:00
resampleAlg="average",
srcNodata=0,
dstNodata=0,
multithread=True
2024-12-23 11:31:20 +08:00
)
self.logger.info("开始执行影像拼接...")
2024-12-31 14:23:45 +08:00
result = gdal.Warp(
output_tif, [input_tif1, input_tif2], options=warp_options)
2024-12-23 11:31:20 +08:00
if result is None:
error_msg = "影像拼接失败"
self.logger.error(error_msg)
raise RuntimeError(error_msg)
# 获取输出影像的基本信息
2024-12-29 12:03:53 +08:00
output_dataset = gdal.Open(output_tif)
2024-12-23 11:31:20 +08:00
if output_dataset:
width = output_dataset.RasterXSize
height = output_dataset.RasterYSize
bands = output_dataset.RasterCount
self.logger.info(f"拼接完成,输出影像大小: {width}x{height},波段数: {bands}")
2024-12-29 12:03:53 +08:00
self.logger.info(f"影像拼接成功,输出文件保存至: {output_tif}")
2024-12-23 11:31:20 +08:00
except Exception as e:
self.logger.error(f"影像拼接过程中发生错误: {str(e)}", exc_info=True)
raise
2024-12-29 12:03:53 +08:00
def merge_grid_tif(self, grid_points: Dict[int, pd.DataFrame], product_info: dict):
"""合并指定产品的所有网格"""
product_name = product_info['name']
product_path = product_info['path']
filename = product_info['filename']
2024-12-31 14:23:45 +08:00
2024-12-29 12:03:53 +08:00
self.logger.info(f"开始合并{product_name}")
2024-12-31 14:23:45 +08:00
2024-12-29 12:03:53 +08:00
if len(grid_points) < 2:
self.logger.info("只有一个网格,无需合并")
return
2024-12-31 14:23:45 +08:00
2024-12-29 12:03:53 +08:00
input_tif1, input_tif2 = None, None
merge_count = 0
2024-12-31 14:23:45 +08:00
2024-12-29 12:03:53 +08:00
try:
for grid_idx, points in grid_points.items():
grid_tif = os.path.join(
self.output_dir,
f"grid_{grid_idx + 1}",
"project",
product_path,
filename
)
2024-12-31 14:23:45 +08:00
2024-12-29 12:03:53 +08:00
if not os.path.exists(grid_tif):
2024-12-31 14:23:45 +08:00
self.logger.warning(
f"网格 {grid_idx + 1}{product_name}不存在: {grid_tif}")
2024-12-29 12:03:53 +08:00
continue
2024-12-31 14:23:45 +08:00
2024-12-29 12:03:53 +08:00
if input_tif1 is None:
input_tif1 = grid_tif
self.logger.info(f"设置第一个输入{product_name}: {input_tif1}")
else:
input_tif2 = grid_tif
2024-12-31 14:23:45 +08:00
output_tif = os.path.join(
self.output_dir, f"merged_{product_info['output']}")
2024-12-29 12:03:53 +08:00
self.logger.info(
f"开始合并{product_name}{merge_count + 1} 次:\n"
f"输入1: {input_tif1}\n"
f"输入2: {input_tif2}\n"
f"输出: {output_tif}"
)
2024-12-31 14:23:45 +08:00
2024-12-29 12:03:53 +08:00
self.merge_two_tifs(input_tif1, input_tif2, output_tif)
merge_count += 1
2024-12-31 14:23:45 +08:00
2024-12-29 12:03:53 +08:00
input_tif1 = output_tif
input_tif2 = None
2024-12-31 14:23:45 +08:00
2024-12-29 12:03:53 +08:00
self.logger.info(
f"{product_name}合并完成,共执行 {merge_count} 次合并,"
f"最终输出文件: {input_tif1}"
)
2024-12-31 14:23:45 +08:00
2024-12-29 12:03:53 +08:00
except Exception as e:
2024-12-31 14:23:45 +08:00
self.logger.error(
f"{product_name}合并过程中发生错误: {str(e)}", exc_info=True)
2024-12-29 12:03:53 +08:00
raise
def merge_all_tifs(self, grid_points: Dict[int, pd.DataFrame]):
"""合并所有产品正射影像、DSM和DTM"""
try:
products = [
{
'name': '正射影像',
'path': 'odm_orthophoto',
'filename': 'odm_orthophoto.original.tif',
'output': 'orthophoto.tif'
},
{
'name': 'DSM',
'path': 'odm_dem',
'filename': 'dsm.original.tif',
'output': 'dsm.tif'
},
{
'name': 'DTM',
'path': 'odm_dem',
'filename': 'dtm.original.tif',
'output': 'dtm.tif'
}
]
2024-12-31 14:23:45 +08:00
2024-12-29 12:03:53 +08:00
for product in products:
2024-12-31 14:23:45 +08:00
self.merge_grid_tif(grid_points, product)
2024-12-29 12:03:53 +08:00
self.logger.info("所有产品合并完成")
except Exception as e:
self.logger.error(f"产品合并过程中发生错误: {str(e)}", exc_info=True)
raise
2024-12-23 11:31:20 +08:00
if __name__ == "__main__":
2024-12-31 14:23:45 +08:00
import sys
sys.path.append(os.path.dirname(
os.path.dirname(os.path.abspath(__file__))))
2024-12-23 11:31:20 +08:00
from utils.logger import setup_logger
2024-12-31 14:23:45 +08:00
import pandas as pd
# 设置输出目录和日志
output_dir = r"G:\ODM_output\1009"
2024-12-23 11:31:20 +08:00
setup_logger(output_dir)
2024-12-31 14:23:45 +08:00
# 构造测试用的grid_points字典
# 假设我们有两个网格每个网格包含一些GPS点的DataFrame
grid_points = {
0: pd.DataFrame({
'latitude': [39.9, 39.91],
'longitude': [116.3, 116.31],
'altitude': [100, 101]
}),
1: pd.DataFrame({
'latitude': [39.92, 39.93],
'longitude': [116.32, 116.33],
'altitude': [102, 103]
})
}
# 创建MergeTif实例并执行合并
2024-12-29 12:03:53 +08:00
merge_tif = MergeTif(output_dir)
2024-12-31 14:23:45 +08:00
merge_tif.merge_all_tifs(grid_points)