UAV/post_pro/merge_ply.py
2024-12-29 12:03:53 +08:00

108 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import logging
import numpy as np
from typing import Dict
import pandas as pd
import open3d as o3d
class MergePly:
def __init__(self, output_dir: str):
self.output_dir = output_dir
self.logger = logging.getLogger('UAV_Preprocess.MergePly')
def merge_two_plys(self, ply1_path: str, ply2_path: str, output_path: str):
"""合并两个PLY文件"""
try:
self.logger.info("开始合并PLY点云")
self.logger.info(f"输入点云1: {ply1_path}")
self.logger.info(f"输入点云2: {ply2_path}")
self.logger.info(f"输出点云: {output_path}")
# 检查输入文件是否存在
if not os.path.exists(ply1_path) or not os.path.exists(ply2_path):
raise FileNotFoundError("输入点云文件不存在")
# 读取点云
pcd1 = o3d.io.read_point_cloud(ply1_path)
pcd2 = o3d.io.read_point_cloud(ply2_path)
if pcd1 is None or pcd2 is None:
raise ValueError("无法读取点云文件")
# 获取点云中心
center1 = pcd1.get_center()
center2 = pcd2.get_center()
# 计算平移向量
translation_vector = center2 - center1
# 对齐点云
pcd2.translate(translation_vector)
# 合并点云
combined_pcd = pcd1 + pcd2
# 保存合并后的点云
if not o3d.io.write_point_cloud(output_path, combined_pcd):
raise RuntimeError("保存合并后的点云失败")
self.logger.info(f"点云合并成功,已保存至: {output_path}")
except Exception as e:
self.logger.error(f"合并PLY点云时发生错误: {str(e)}", exc_info=True)
raise
def merge_grid_ply(self, grid_points: Dict[int, pd.DataFrame]):
"""合并所有网格的PLY点云"""
self.logger.info("开始合并所有网格的PLY点云")
if len(grid_points) < 2:
self.logger.info("只有一个网格,无需合并")
return
input_ply1, input_ply2 = None, None
merge_count = 0
try:
for grid_idx, points in grid_points.items():
grid_ply = os.path.join(
self.output_dir,
f"grid_{grid_idx + 1}",
"project",
"odm_georeferencing",
"odm_georeferenced_model.ply"
)
if not os.path.exists(grid_ply):
self.logger.warning(f"网格 {grid_idx + 1} 的PLY文件不存在: {grid_ply}")
continue
if input_ply1 is None:
input_ply1 = grid_ply
self.logger.info(f"设置第一个输入PLY: {input_ply1}")
else:
input_ply2 = grid_ply
output_ply = os.path.join(self.output_dir, "merged_pointcloud.ply")
self.logger.info(
f"开始合并第 {merge_count + 1} 次:\n"
f"输入1: {input_ply1}\n"
f"输入2: {input_ply2}\n"
f"输出: {output_ply}"
)
self.merge_two_plys(input_ply1, input_ply2, output_ply)
merge_count += 1
input_ply1 = output_ply
input_ply2 = None
self.logger.info(
f"PLY点云合并完成共执行 {merge_count} 次合并,"
f"最终输出文件: {input_ply1}"
)
except Exception as e:
self.logger.error(f"PLY点云合并过程中发生错误: {str(e)}", exc_info=True)
raise