import os import logging import numpy as np from typing import Dict import pandas as pd import open3d as o3d class MergePly: def __init__(self, output_dir: str): self.output_dir = output_dir self.logger = logging.getLogger('UAV_Preprocess.MergePly') def merge_two_plys(self, ply1_path: str, ply2_path: str, output_path: str): """合并两个PLY文件""" try: self.logger.info("开始合并PLY点云") self.logger.info(f"输入点云1: {ply1_path}") self.logger.info(f"输入点云2: {ply2_path}") self.logger.info(f"输出点云: {output_path}") # 检查输入文件是否存在 if not os.path.exists(ply1_path) or not os.path.exists(ply2_path): raise FileNotFoundError("输入点云文件不存在") # 读取点云 pcd1 = o3d.io.read_point_cloud(ply1_path) pcd2 = o3d.io.read_point_cloud(ply2_path) if pcd1 is None or pcd2 is None: raise ValueError("无法读取点云文件") # 获取点云中心 center1 = pcd1.get_center() center2 = pcd2.get_center() # 计算平移向量 translation_vector = center2 - center1 # 对齐点云 pcd2.translate(translation_vector) # 合并点云 combined_pcd = pcd1 + pcd2 # 保存合并后的点云 if not o3d.io.write_point_cloud(output_path, combined_pcd): raise RuntimeError("保存合并后的点云失败") self.logger.info(f"点云合并成功,已保存至: {output_path}") except Exception as e: self.logger.error(f"合并PLY点云时发生错误: {str(e)}", exc_info=True) raise def merge_grid_ply(self, grid_points: Dict[int, pd.DataFrame]): """合并所有网格的PLY点云""" self.logger.info("开始合并所有网格的PLY点云") if len(grid_points) < 2: self.logger.info("只有一个网格,无需合并") return input_ply1, input_ply2 = None, None merge_count = 0 try: for grid_idx, points in grid_points.items(): grid_ply = os.path.join( self.output_dir, f"grid_{grid_idx + 1}", "project", "odm_georeferencing", "odm_georeferenced_model.ply" ) if not os.path.exists(grid_ply): self.logger.warning(f"网格 {grid_idx + 1} 的PLY文件不存在: {grid_ply}") continue if input_ply1 is None: input_ply1 = grid_ply self.logger.info(f"设置第一个输入PLY: {input_ply1}") else: input_ply2 = grid_ply output_ply = os.path.join(self.output_dir, "merged_pointcloud.ply") self.logger.info( f"开始合并第 {merge_count + 1} 次:\n" f"输入1: {input_ply1}\n" f"输入2: {input_ply2}\n" f"输出: {output_ply}" ) self.merge_two_plys(input_ply1, input_ply2, output_ply) merge_count += 1 input_ply1 = output_ply input_ply2 = None self.logger.info( f"PLY点云合并完成,共执行 {merge_count} 次合并," f"最终输出文件: {input_ply1}" ) except Exception as e: self.logger.error(f"PLY点云合并过程中发生错误: {str(e)}", exc_info=True) raise