import os
import logging
import numpy as np
from typing import Dict, Tuple
import pandas as pd
import subprocess
import shutil

# Default location of the LAStools ``lasmerge64`` executable. It can be
# overridden per instance through ``MergePly(..., lasmerge_path=...)``.
DEFAULT_LASMERGE_PATH = r"D:\software\LAStools\bin\lasmerge64.exe"


class MergePly:
    """Merge the per-grid ``.laz`` point clouds found under an output directory.

    Each grid ``(i, j)`` is looked up at
    ``<output_dir>/grid_<i>_<j>/project/odm_georeferencing/odm_georeferenced_model.laz``.
    Every file that exists is handed to the external ``lasmerge`` executable,
    which writes ``<output_dir>/pointcloud.laz``.
    """

    def __init__(self, output_dir: str,
                 lasmerge_path: str = DEFAULT_LASMERGE_PATH):
        """Store the working directory and the merge tool location.

        Args:
            output_dir: Directory that contains the ``grid_<i>_<j>`` folders
                and receives the merged ``pointcloud.laz``.
            lasmerge_path: Path to the ``lasmerge`` executable. Defaults to
                ``DEFAULT_LASMERGE_PATH``.
        """
        self.output_dir = output_dir
        self.lasmerge_path = lasmerge_path
        self.logger = logging.getLogger('UAV_Preprocess.MergePly')

    def merge_grid_laz(self, grid_points: Dict[tuple, pd.DataFrame]):
        """Merge the point cloud files of all grids into one ``.laz`` file.

        Args:
            grid_points: Mapping keyed by ``(row, col)`` grid ids. Only the
                keys are used; the values are ignored.

        Returns:
            None. If none of the expected per-grid files exists, a warning is
            logged and the merge tool is not launched.

        Raises:
            subprocess.CalledProcessError: If the merge tool exits with a
                non-zero status.
            Exception: Any other error (e.g. the executable cannot be found)
                is logged and re-raised unchanged.
        """
        try:
            # Collect the point cloud file of every grid that produced one.
            laz_files = []
            for grid_id in grid_points:
                laz_path = os.path.join(
                    self.output_dir,
                    f"grid_{grid_id[0]}_{grid_id[1]}",
                    "project",
                    "odm_georeferencing",
                    "odm_georeferenced_model.laz"
                )
                if os.path.exists(laz_path):
                    laz_files.append(laz_path)
                else:
                    self.logger.warning(
                        f"网格 ({grid_id[0]},{grid_id[1]}) 的点云文件不存在")

            # Nothing to merge: do not launch the tool with an empty -i list.
            if not laz_files:
                self.logger.warning("没有找到任何可合并的点云文件，跳过合并")
                return

            output_path = os.path.join(self.output_dir, 'pointcloud.laz')

            # Pass the arguments as a list so paths containing spaces stay
            # intact; check=True turns a failed merge into an exception
            # instead of silently ignoring the exit status.
            command = [self.lasmerge_path, "-i", *laz_files, "-o", output_path]
            subprocess.run(command, check=True)

        except Exception as e:
            self.logger.error(f"PLY点云合并过程中发生错误: {str(e)}", exc_info=True)
            raise


if __name__ == "__main__":
    from utils.logger import setup_logger

    # Configure the output directory and logging.
    output_dir = r"G:\ODM_output\1009"
    setup_logger(output_dir)

    # Build a grid_points dict for a manual test run.
    grid_points = {
        (0, 0): [],  # GPS point information is no longer needed
        (0, 1): []
    }

    # Create a MergePly instance and run the merge.
    merge_ply = MergePly(output_dir)
    merge_ply.merge_grid_laz(grid_points)