import os import logging import numpy as np from typing import Dict, Tuple import pandas as pd import open3d as o3d class MergePly: def __init__(self, output_dir: str): self.output_dir = output_dir self.logger = logging.getLogger('UAV_Preprocess.MergePly') def read_corners_file(self, grid_idx: int) -> Tuple[float, float]: """读取角点文件并计算中心点坐标 角点文件格式:xmin ymin xmax ymax """ corners_file = os.path.join( self.output_dir, f"grid_{grid_idx + 1}", "project", "odm_orthophoto", "odm_orthophoto_corners.txt" ) try: if not os.path.exists(corners_file): raise FileNotFoundError(f"角点文件不存在: {corners_file}") # 读取角点文件 with open(corners_file, 'r') as f: line = f.readline().strip() if not line: raise ValueError(f"角点文件为空: {corners_file}") # 解析四个角点值:xmin ymin xmax ymax xmin, ymin, xmax, ymax = map(float, line.split()) # 计算中心点坐标 center_x = (xmin + xmax) / 2 center_y = (ymin + ymax) / 2 self.logger.info( f"网格 {grid_idx + 1} 边界坐标: \n" f"xmin={xmin:.2f}, ymin={ymin:.2f}\n" f"xmax={xmax:.2f}, ymax={ymax:.2f}\n" f"中心点: x={center_x:.2f}, y={center_y:.2f}" ) return center_x, center_y except Exception as e: self.logger.error(f"读取角点文件时发生错误: {str(e)}", exc_info=True) raise def merge_two_plys(self, ply1_path: str, ply2_path: str, output_path: str, center1: Tuple[float, float], center2: Tuple[float, float]): """合并两个PLY文件,使用中心点坐标进行对齐""" try: self.logger.info("开始合并PLY点云") self.logger.info(f"输入点云1: {ply1_path}") self.logger.info(f"输入点云2: {ply2_path}") self.logger.info(f"输出点云: {output_path}") # 检查输入文件是否存在 if not os.path.exists(ply1_path) or not os.path.exists(ply2_path): raise FileNotFoundError("输入点云文件不存在") # 读取点云 pcd1 = o3d.io.read_point_cloud(ply1_path) pcd2 = o3d.io.read_point_cloud(ply2_path) if pcd1 is None or pcd2 is None: raise ValueError("无法读取点云文件") # 计算平移向量(直接使用中心点坐标差) translation = np.array([ center2[0] - center1[0], # x方向的平移 center2[1] - center1[1], # y方向的平移 0.0 # z方向不平移 ]) # 对第二个点云进行平移 pcd2.translate(translation*100) # 合并点云 combined_pcd = pcd1 + pcd2 # 保存合并后的点云 if not o3d.io.write_point_cloud(output_path, combined_pcd): raise RuntimeError("保存合并后的点云失败") self.logger.info(f"点云合并成功,已保存至: {output_path}") except Exception as e: self.logger.error(f"合并PLY点云时发生错误: {str(e)}", exc_info=True) raise def merge_grid_ply(self, grid_points: Dict[int, list]): """合并所有网格的PLY点云,以第一个网格为参考点""" self.logger.info("开始合并所有网格的PLY点云") if len(grid_points) < 2: self.logger.info("只有一个网格,无需合并") return try: # 获取网格索引列表并排序 grid_indices = sorted(grid_points.keys()) # 读取第一个网格作为参考网格 ref_idx = grid_indices[0] ref_ply = os.path.join( self.output_dir, f"grid_{ref_idx + 1}", "project", "odm_filterpoints", "point_cloud.ply" ) if not os.path.exists(ref_ply): raise FileNotFoundError(f"参考网格的PLY文件不存在: {ref_ply}") # 获取参考网格的中心点坐标 ref_center = self.read_corners_file(ref_idx) self.logger.info(f"参考网格(grid_{ref_idx + 1})中心点: x={ref_center[0]:.2f}, y={ref_center[1]:.2f}") # 将参考点云复制到输出位置作为初始合并结果 output_ply = os.path.join(self.output_dir, "merged_pointcloud.ply") import shutil shutil.copy2(ref_ply, output_ply) # 依次处理其他网格 for grid_idx in grid_indices[1:]: current_ply = os.path.join( self.output_dir, f"grid_{grid_idx + 1}", "project", "odm_filterpoints", "point_cloud.ply" ) if not os.path.exists(current_ply): self.logger.warning(f"网格 {grid_idx + 1} 的PLY文件不存在: {current_ply}") continue # 读取当前网格的中心点坐标 current_center = self.read_corners_file(grid_idx) self.logger.info( f"处理网格 {grid_idx + 1}:\n" f"合并点云: {current_ply}\n" f"当前网格中心点: x={current_center[0]:.2f}, y={current_center[1]:.2f}" ) # 合并点云,始终使用第一个网格的中心点作为参考点 self.merge_two_plys( output_ply, # 当前合并结果 current_ply, # 要合并的新点云 output_ply, # 覆盖原有的合并结果 ref_center, # 参考网格中心点(始终不变) current_center # 当前网格中心点 ) self.logger.info(f"PLY点云合并完成,最终输出文件: {output_ply}") except Exception as e: self.logger.error(f"PLY点云合并过程中发生错误: {str(e)}", exc_info=True) raise if __name__ == "__main__": from utils.logger import setup_logger import open3d as o3d # 设置输出目录和日志 output_dir = r"G:\ODM_output\1009" setup_logger(output_dir) # 构造测试用的grid_points字典 grid_points = { 0: [], # 不再需要GPS点信息 1: [] } # 创建MergePly实例并执行合并 merge_ply = MergePly(output_dir) merge_ply.merge_grid_ply(grid_points)