UAV/post_pro/merge_obj.py
2024-12-31 20:37:35 +08:00

189 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import logging
import numpy as np
from typing import Dict
import pandas as pd
class MergeObj:
def __init__(self, output_dir: str):
self.output_dir = output_dir
self.logger = logging.getLogger('UAV_Preprocess.MergeObj')
def read_obj(self, file_path):
"""读取.obj文件返回顶点列表和面列表"""
vertices = []
faces = []
with open(file_path, 'r') as file:
for line in file:
parts = line.split()
if len(parts) == 0:
continue
if parts[0] == 'v': # 顶点
vertices.append([float(parts[1]), float(parts[2]), float(parts[3])])
elif parts[0] == 'f': # 面
faces.append([int(parts[1].split('/')[0]), int(parts[2].split('/')[0]), int(parts[3].split('/')[0])])
return vertices, faces
def write_obj(self, file_path, vertices, faces):
"""将修改后的顶点和面列表写入到.obj文件"""
with open(file_path, 'w') as file:
for vertex in vertices:
file.write(f"v {vertex[0]} {vertex[1]} {vertex[2]}\n")
for face in faces:
file.write(f"f {face[0]} {face[1]} {face[2]}\n")
def translate_vertices(self, vertices, translation):
"""平移顶点"""
return [[v[0] + translation[0], v[1] + translation[1], v[2] + translation[2]] for v in vertices]
def merge_two_objs(self, obj1_path: str, obj2_path: str, output_path: str, translation):
"""合并两个OBJ文件"""
try:
self.logger.info(f"开始合并OBJ模型:\n输入1: {obj1_path}\n输入2: {obj2_path}")
# 检查输入文件是否存在
if not os.path.exists(obj1_path) or not os.path.exists(obj2_path):
raise FileNotFoundError("输入模型文件不存在")
# 读取两个obj文件
vertices1, faces1 = self.read_obj(obj1_path)
vertices2, faces2 = self.read_obj(obj2_path)
# 平移第二个模型的顶点
vertices2_translated = self.translate_vertices(vertices2, translation)
# 合并顶点和面
all_vertices = vertices1 + vertices2_translated
all_faces = faces1 + [[f[0] + len(vertices1), f[1] + len(vertices1), f[2] + len(vertices1)] for f in faces2]
# 写入合并后的obj文件
self.write_obj(output_path, all_vertices, all_faces)
self.logger.info(f"模型合并成功,已保存至: {output_path}")
except Exception as e:
self.logger.error(f"合并OBJ模型时发生错误: {str(e)}", exc_info=True)
raise
def calculate_translation(self, grid_idx: int, grid_points: Dict[int, pd.DataFrame], grid_size: float) -> tuple:
"""根据网格索引和大小计算平移量"""
# 从grid_points中获取网格划分器
grid_divider = grid_points.get('grid_divider', None)
if grid_divider is None:
# 如果没有grid_divider使用默认的计算方式
row = grid_idx // 2
col = grid_idx % 2
else:
# 使用grid_divider获取正确的网格坐标
row, col = grid_divider.get_grid_coordinates(grid_idx)
# 计算平移量,考虑到重叠
overlap_factor = 0.9 # 重叠因子与grid_divider中的overlap对应
x_translation = col * grid_size * overlap_factor
y_translation = row * grid_size * overlap_factor
self.logger.info(
f"网格 {grid_idx} 的位置: 行={row}, 列={col}"
)
return (x_translation, y_translation, 0) # z轴不需要平移
def merge_grid_obj(self, grid_points: Dict[int, pd.DataFrame], grid_size: float = 500):
"""合并所有网格的OBJ模型"""
self.logger.info("开始合并所有网格的OBJ模型")
if len(grid_points) < 2:
self.logger.info("只有一个网格,无需合并")
return
input_obj1, input_obj2 = None, None
merge_count = 0
try:
for grid_idx, points in grid_points.items():
if grid_idx == 'grid_divider': # 跳过grid_divider对象
continue
grid_obj = os.path.join(
self.output_dir,
f"grid_{grid_idx + 1}",
"project",
"odm_texturing",
"odm_textured_model_geo.obj"
)
if not os.path.exists(grid_obj):
self.logger.warning(f"网格 {grid_idx + 1} 的OBJ文件不存在: {grid_obj}")
continue
if input_obj1 is None:
input_obj1 = grid_obj
self.logger.info(f"设置第一个输入OBJ: {input_obj1}")
else:
input_obj2 = grid_obj
output_obj = os.path.join(self.output_dir, f"merged_model_{merge_count}.obj")
# 计算当前网格的平移量
translation = self.calculate_translation(grid_idx, grid_points, grid_size)
self.logger.info(
f"开始合并第 {merge_count + 1} 次:\n"
f"平移量: {translation}\n"
f"输出: {output_obj}"
)
self.merge_two_objs(input_obj1, input_obj2, output_obj, translation)
merge_count += 1
input_obj1 = output_obj
input_obj2 = None
# 最后的结果重命名为merged_model.obj
final_output = os.path.join(self.output_dir, "merged_model.obj")
if os.path.exists(input_obj1) and input_obj1 != final_output:
os.rename(input_obj1, final_output)
self.logger.info(
f"OBJ模型合并完成共执行 {merge_count} 次合并,"
f"最终输出文件: {final_output}"
)
except Exception as e:
self.logger.error(f"OBJ模型合并过程中发生错误: {str(e)}", exc_info=True)
raise
if __name__ == "__main__":
import sys
sys.path.append(os.path.dirname(
os.path.dirname(os.path.abspath(__file__))))
from utils.logger import setup_logger
import pandas as pd
# 设置输出目录和日志
output_dir = r"G:\ODM_output\1009"
setup_logger(output_dir)
# 构造测试用的grid_points字典
# 假设我们有两个网格每个网格包含一些GPS点的DataFrame
grid_points = {
0: pd.DataFrame({
'latitude': [39.9, 39.91],
'longitude': [116.3, 116.31],
'altitude': [100, 101]
}),
1: pd.DataFrame({
'latitude': [39.92, 39.93],
'longitude': [116.32, 116.33],
'altitude': [102, 103]
})
}
# 创建MergeObj实例并执行合并
merge_obj = MergeObj(output_dir)
merge_obj.merge_grid_obj(grid_points)