UAV_odm_merge/post_pro/merge_tif.py
2024-12-30 17:34:21 +08:00

199 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from osgeo import gdal
import logging
import os
from typing import Dict
import pandas as pd
class MergeTif:
    """Mosaic per-grid raster products (GeoTIFF) into one file per product.

    Grid rasters are looked up at
    ``<output_dir>/grid_<n>/project/<product path>/<filename>`` and merged
    results are written to ``<output_dir>/merged_<output name>``.
    """

    # Products merged by ``merge_all_tifs``: orthophoto, DSM and DTM.
    # 'path' and 'filename' locate the raster inside each grid's project
    # folder; 'output' is the suffix of the merged file name.
    _PRODUCTS = (
        {
            'name': '正射影像',
            'path': 'odm_orthophoto',
            'filename': 'odm_orthophoto.original.tif',
            'output': 'orthophoto.tif'
        },
        {
            'name': 'DSM',
            'path': 'odm_dem',
            'filename': 'dsm.original.tif',
            'output': 'dsm.tif'
        },
        {
            'name': 'DTM',
            'path': 'odm_dem',
            'filename': 'dtm.original.tif',
            'output': 'dtm.tif'
        },
    )

    def __init__(self, output_dir: str):
        """Remember the root output directory and obtain the logger.

        Args:
            output_dir: Directory holding the ``grid_<n>`` folders; merged
                rasters are written directly into it.
        """
        self.output_dir = output_dir
        self.logger = logging.getLogger('UAV_Preprocess.MergeTif')

    @staticmethod
    def _temp_output_path(output_tif: str) -> str:
        """Return the sibling scratch path used while a mosaic is written."""
        base, ext = os.path.splitext(output_tif)
        return f"{base}.tmp{ext}"

    def merge_two_tifs(self, input_tif1: str, input_tif2: str, output_tif: str):
        """Mosaic two GeoTIFFs that share a projection into ``output_tif``.

        The mosaic is written to a scratch file next to ``output_tif`` and
        moved into place only after GDAL has closed it. This keeps the call
        safe when ``output_tif`` is also one of the inputs (which is how
        ``merge_grid_tif`` accumulates its result) and never leaves a
        half-written ``output_tif`` behind on failure.

        Args:
            input_tif1: Path of the first input raster.
            input_tif2: Path of the second input raster.
            output_tif: Path of the merged raster; may equal an input path.

        Raises:
            FileNotFoundError: If either input path does not exist.
            ValueError: If GDAL cannot open an input, or the two inputs
                report different projections.
            RuntimeError: If ``gdal.Warp`` returns ``None``.
        """
        temp_tif = self._temp_output_path(output_tif)
        try:
            self.logger.info("开始合并TIF影像")
            self.logger.info(f"输入影像1: {input_tif1}")
            self.logger.info(f"输入影像2: {input_tif2}")
            self.logger.info(f"输出影像: {output_tif}")

            # Both inputs must exist before GDAL is involved.
            if not (os.path.exists(input_tif1) and os.path.exists(input_tif2)):
                error_msg = "输入影像文件不存在"
                self.logger.error(error_msg)
                raise FileNotFoundError(error_msg)

            # Open the rasters only long enough to read their projections.
            datasets = [gdal.Open(tif) for tif in (input_tif1, input_tif2)]
            try:
                if any(ds is None for ds in datasets):
                    error_msg = "无法打开输入影像文件"
                    self.logger.error(error_msg)
                    raise ValueError(error_msg)
                projections = [ds.GetProjection() for ds in datasets]
            finally:
                # Dropping the last reference is how the GDAL Python
                # bindings close a dataset.
                datasets = None

            self.logger.debug(f"影像1投影: {projections[0]}")
            self.logger.debug(f"影像2投影: {projections[1]}")

            # Warping below does not reproject, so the inputs must agree.
            if len(set(projections)) != 1:
                error_msg = "影像的投影不一致,请先进行重投影!"
                self.logger.error(error_msg)
                raise ValueError(error_msg)

            # Value 0 is treated as nodata in both the sources and the
            # destination; overlapping pixels are resampled with "average".
            warp_options = gdal.WarpOptions(
                format="GTiff",
                resampleAlg="average",
                srcNodata=0,
                dstNodata=0,
                multithread=True
            )

            self.logger.info("开始执行影像拼接...")
            # A scratch file left over from an interrupted run must not be
            # reused as the warp destination.
            if os.path.exists(temp_tif):
                os.remove(temp_tif)

            result = gdal.Warp(
                temp_tif, [input_tif1, input_tif2], options=warp_options)
            if result is None:
                error_msg = "影像拼接失败"
                self.logger.error(error_msg)
                raise RuntimeError(error_msg)

            # Read the summary from the warp result itself, then release it
            # so GDAL flushes and closes the file before it is moved.
            width = result.RasterXSize
            height = result.RasterYSize
            bands = result.RasterCount
            result = None

            os.replace(temp_tif, output_tif)

            self.logger.info(f"拼接完成,输出影像大小: {width}x{height},波段数: {bands}")
            self.logger.info(f"影像拼接成功,输出文件保存至: {output_tif}")
        except Exception as e:
            self.logger.error(f"影像拼接过程中发生错误: {str(e)}", exc_info=True)
            raise
        finally:
            # Best-effort cleanup of the scratch file after a failure; on
            # success it has already been moved to ``output_tif``.
            if os.path.exists(temp_tif):
                try:
                    os.remove(temp_tif)
                except OSError:
                    self.logger.warning(f"无法删除临时文件: {temp_tif}")

    def merge_grid_tif(self, grid_points: Dict[int, pd.DataFrame], product_info: dict):
        """Merge one product across all grids into a single raster.

        Grids are visited in the iteration order of ``grid_points``. The
        first existing grid raster seeds the mosaic; every further existing
        raster is merged into ``<output_dir>/merged_<output>``, which then
        becomes the first input of the next merge. Grids whose raster is
        missing are skipped with a warning.

        Args:
            grid_points: Mapping keyed by zero-based grid index. Only the
                keys are used; grid folders are named ``grid_<key + 1>``.
            product_info: Dict with the keys ``'name'`` (used in log
                messages), ``'path'`` (sub-folder of the grid's project
                folder), ``'filename'`` (raster file name) and ``'output'``
                (suffix of the merged file name).

        Raises:
            KeyError: If ``product_info`` lacks one of the required keys.
            Exception: Any error from ``merge_two_tifs`` is logged and
                re-raised.
        """
        product_name = product_info['name']
        product_path = product_info['path']
        filename = product_info['filename']
        # The destination is the same for every merge step of this product.
        output_tif = os.path.join(
            self.output_dir, f"merged_{product_info['output']}")

        self.logger.info(f"开始合并{product_name}")

        if len(grid_points) < 2:
            self.logger.info("只有一个网格,无需合并")
            return

        input_tif1, input_tif2 = None, None
        merge_count = 0

        try:
            for grid_idx in grid_points:
                grid_tif = os.path.join(
                    self.output_dir,
                    f"grid_{grid_idx + 1}",
                    "project",
                    product_path,
                    filename
                )

                if not os.path.exists(grid_tif):
                    self.logger.warning(
                        f"网格 {grid_idx + 1}{product_name}不存在: {grid_tif}")
                    continue

                if input_tif1 is None:
                    # First raster found: nothing to merge with yet.
                    input_tif1 = grid_tif
                    self.logger.info(f"设置第一个输入{product_name}: {input_tif1}")
                    continue

                input_tif2 = grid_tif
                self.logger.info(
                    f"开始合并{product_name}{merge_count + 1} 次:\n"
                    f"输入1: {input_tif1}\n"
                    f"输入2: {input_tif2}\n"
                    f"输出: {output_tif}"
                )
                self.merge_two_tifs(input_tif1, input_tif2, output_tif)
                merge_count += 1

                # The accumulated mosaic feeds the next merge step.
                input_tif1 = output_tif
                input_tif2 = None

            self.logger.info(
                f"{product_name}合并完成,共执行 {merge_count} 次合并,"
                f"最终输出文件: {input_tif1}"
            )
        except Exception as e:
            self.logger.error(
                f"{product_name}合并过程中发生错误: {str(e)}", exc_info=True)
            raise

    def merge_all_tifs(self, grid_points: Dict[int, pd.DataFrame]):
        """Merge every product (orthophoto, DSM and DTM) across all grids.

        Args:
            grid_points: Mapping keyed by zero-based grid index, passed
                unchanged to ``merge_grid_tif`` for each product.

        Raises:
            Exception: Any error from ``merge_grid_tif`` is logged and
                re-raised; products after the failing one are not merged.
        """
        try:
            for product in self._PRODUCTS:
                self.merge_grid_tif(grid_points, product)

            self.logger.info("所有产品合并完成")
        except Exception as e:
            self.logger.error(f"产品合并过程中发生错误: {str(e)}", exc_info=True)
            raise
if __name__ == "__main__":
    import sys

    # Put the parent of this file's directory on sys.path so that the
    # ``utils`` package can be imported when the file is run as a script.
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    sys.path.append(project_root)
    from utils.logger import setup_logger
    import pandas as pd

    # Output directory; it is also handed to the project's logger setup.
    output_dir = r"G:\ODM_output\1009"
    setup_logger(output_dir)

    # Sample grid_points for a manual run: two grids, each with a small
    # DataFrame of GPS points given as (latitudes, longitudes, altitudes).
    sample_grids = (
        ([39.9, 39.91], [116.3, 116.31], [100, 101]),
        ([39.92, 39.93], [116.32, 116.33], [102, 103]),
    )
    grid_points = {
        grid_idx: pd.DataFrame({
            'latitude': latitudes,
            'longitude': longitudes,
            'altitude': altitudes
        })
        for grid_idx, (latitudes, longitudes, altitudes) in enumerate(sample_grids)
    }

    # Run the merge for every product.
    merge_tif = MergeTif(output_dir)
    merge_tif.merge_all_tifs(grid_points)