import os import logging import pandas as pd from typing import Dict, List, Tuple import numpy as np import shutil import time import cv2 import subprocess from pyproj import Transformer class MergeObj: def __init__(self, output_dir: str): self.output_dir = output_dir self.logger = logging.getLogger('UAV_Preprocess.MergeObj') # 用于存储所有grid的UTM范围 self.min_east = float('inf') self.min_north = float('inf') self.max_east = float('-inf') self.max_north = float('-inf') # 初始化UTM到WGS84的转换器 self.transformer = Transformer.from_crs("EPSG:32649", "EPSG:4326", always_xy=True) def read_obj(self, file_path): """读取.obj文件,返回顶点、纹理坐标、法线、面的列表和MTL文件名""" vertices = [] # v tex_coords = [] # vt normals = [] # vn faces = [] # f face_materials = [] # 每个面对应的材质名称 mtl_file = None # mtl文件名 current_material = None # 当前使用的材质 with open(file_path, 'r') as file: for line in file: if line.startswith('#') or not line.strip(): continue parts = line.strip().split() if not parts: continue if parts[0] == 'mtllib': # MTL文件引用 mtl_file = parts[1] elif parts[0] == 'usemtl': # 材质使用 current_material = parts[1] elif parts[0] == 'v': # 顶点 vertices.append( [float(parts[1]), float(parts[2]), float(parts[3])]) elif parts[0] == 'vt': # 纹理坐标 tex_coords.append([float(parts[1]), float(parts[2])]) elif parts[0] == 'vn': # 法线 normals.append( [float(parts[1]), float(parts[2]), float(parts[3])]) elif parts[0] == 'f': # 面 # 处理面的索引 (v/vt/vn) face_v = [] face_vt = [] face_vn = [] for p in parts[1:]: indices = p.split('/') face_v.append(int(indices[0])) if len(indices) > 1 and indices[1]: face_vt.append(int(indices[1])) if len(indices) > 2: face_vn.append(int(indices[2])) faces.append((face_v, face_vt, face_vn)) face_materials.append(current_material) # 记录这个面使用的材质 return vertices, tex_coords, normals, faces, face_materials, mtl_file def write_obj(self, file_path, vertices, tex_coords, normals, faces, face_materials, mtl_file=None): """将顶点、纹理坐标、法线和面写入到.obj文件""" with open(file_path, 'w') as file: # 写入MTL文件引用 if mtl_file: file.write(f"mtllib {mtl_file}\n") # 写入顶点 for v in vertices: file.write(f"v {v[0]} {v[1]} {v[2]}\n") # 写入纹理坐标 for vt in tex_coords: file.write(f"vt {vt[0]} {vt[1]}\n") # 写入法线 for vn in normals: file.write(f"vn {vn[0]} {vn[1]} {vn[2]}\n") # 写入面(按材质分组) current_material = None for face, material in zip(faces, face_materials): # 如果材质发生变化,写入新的usemtl if material != current_material: file.write(f"usemtl {material}\n") current_material = material face_str = "f" for i in range(len(face[0])): face_str += " " face_str += str(face[0][i]) if face[1]: face_str += f"/{face[1][i]}" else: face_str += "/" if face[2]: face_str += f"/{face[2][i]}" else: face_str += "/" file.write(face_str + "\n") def merge_two_objs(self, obj1_path: str, obj2_path: str, output_path: str, grid_id1: tuple, grid_id2: tuple): """合并两个OBJ文件""" try: self.logger.info(f"开始合并OBJ模型:\n输入1: {obj1_path}\n输入2: {obj2_path}") # 读取两个obj文件 vertices1, tex_coords1, normals1, faces1, face_materials1, mtl1 = self.read_obj( obj1_path) vertices2, tex_coords2, normals2, faces2, face_materials2, mtl2 = self.read_obj( obj2_path) # 读取MTL文件内容以获取正确的材质名称 src_dir1 = os.path.dirname(obj1_path) src_dir2 = os.path.dirname(obj2_path) mtl1_path = os.path.join(src_dir1, mtl1) mtl2_path = os.path.join(src_dir2, mtl2) # 读取并更新材质内容 materials1 = self.read_mtl(mtl1_path) materials2 = self.read_mtl(mtl2_path) # 创建材质名称映射(使用与MTL文件相同的命名格式) material_map1 = {} material_map2 = {} # 处理第一个模型的材质映射 for old_name in materials1.keys(): if "grid_0_0" in obj1_path: material_map1[old_name] = f"material_{grid_id1[0]}_{grid_id1[1]}_{old_name}" else: # 更新完一次后,之后就不用再更新了 material_map1[old_name] = old_name # 处理第二个模型的材质映射 for old_name in materials2.keys(): material_map2[old_name] = f"material_{grid_id2[0]}_{grid_id2[1]}_{old_name}" # 计算偏移量 v_offset = len(vertices1) vt_offset = len(tex_coords1) vn_offset = len(normals1) # 合并顶点、纹理坐标和法线 all_vertices = vertices1 + vertices2 all_tex_coords = tex_coords1 + tex_coords2 all_normals = normals1 + normals2 # 调整第二个模型的面索引和材质名称 all_faces = faces1.copy() all_face_materials = [] # 更新第一个模型的材质名称 for material in face_materials1: all_face_materials.append(material_map1.get(material)) # 更新第二个模型的面索引和材质名称 for face, material in zip(faces2, face_materials2): new_face_v = [f + v_offset for f in face[0]] new_face_vt = [ f + vt_offset for f in face[1]] if face[1] else [] new_face_vn = [ f + vn_offset for f in face[2]] if face[2] else [] all_faces.append((new_face_v, new_face_vt, new_face_vn)) all_face_materials.append(material_map2.get(material)) # 写入合并后的obj文件,使用与MTL文件相同的名称 mtl_filename = "textured_model.mtl" # 使用固定的MTL文件名 self.write_obj(output_path, all_vertices, all_tex_coords, all_normals, all_faces, all_face_materials, mtl_filename) self.logger.info(f"模型合并成功,已保存至: {output_path}") except Exception as e: self.logger.error(f"合并OBJ模型时发生错误: {str(e)}", exc_info=True) raise def merge_grid_obj(self, grid_points: Dict[tuple, pd.DataFrame]) -> Tuple[float, float]: """合并所有网格的OBJ模型 Args: grid_points: 网格点数据字典 Returns: Tuple[float, float]: (longitude, latitude)中心点经纬度坐标 """ try: # 创建输出目录 output_model_dir = os.path.join(self.output_dir, "texturing") os.makedirs(output_model_dir, exist_ok=True) # 第一次遍历:获取所有grid的UTM范围 for grid_id, points in grid_points.items(): base_dir = os.path.join( self.output_dir, f"grid_{grid_id[0]}_{grid_id[1]}", "project" ) log_file = os.path.join(base_dir, "odm_orthophoto", "odm_orthophoto_log.txt") east_offset, north_offset = self.read_utm_offset(log_file) # 更新UTM范围 self.min_east = min(self.min_east, east_offset) self.min_north = min(self.min_north, north_offset) self.max_east = max(self.max_east, east_offset) self.max_north = max(self.max_north, north_offset) # 获取所有有效的网格文件 grid_files = {} grid_centers = [] # 用于存储每个grid的中心点坐标 for grid_id, points in grid_points.items(): base_dir = os.path.join( self.output_dir, f"grid_{grid_id[0]}_{grid_id[1]}", "project", "odm_texturing" ) obj_path = os.path.join(base_dir, "odm_textured_model_geo.obj") mtl_path = os.path.join(base_dir, "odm_textured_model_geo.mtl") if not os.path.exists(obj_path) or not os.path.exists(mtl_path): self.logger.warning( f"网格 ({grid_id[0]},{grid_id[1]}) 的文件不存在") continue # 读取UTM偏移量 log_file = os.path.join(base_dir, "..", "odm_orthophoto", "odm_orthophoto_log.txt") utm_offset = self.read_utm_offset(log_file) # 修改obj文件的顶点坐标 modified_obj = self.modify_obj_coordinates(obj_path, utm_offset) grid_files[grid_id] = { 'obj': modified_obj, 'mtl': mtl_path.replace('.mtl', '_utm.mtl'), 'dir': base_dir } # 读取obj文件的顶点坐标并计算当前grid的中心点 vertices, _, _, _, _, _ = self.read_obj(modified_obj) grid_center_lon, grid_center_lat = self.get_center_coordinates(vertices) grid_centers.append((grid_center_lon, grid_center_lat)) self.logger.info(f"网格 ({grid_id[0]},{grid_id[1]}) 中心点经纬度: ({grid_center_lon}, {grid_center_lat})") if not grid_files: self.logger.error("没有找到有效的文件") return # 收集所有材质和纹理信息 all_materials = {} for grid_id, files in grid_files.items(): # 复制并重命名纹理文件 texture_map = self.copy_and_rename_texture( files['dir'], output_model_dir, grid_id ) # 读取并更新MTL内容 materials = self.read_mtl(files['mtl']) if len(grid_files) == 1: # 对于单个grid,直接使用update_mtl_content的结果 updated_materials = self.update_mtl_content( materials, texture_map, grid_id ) all_materials.update(updated_materials) else: # 对于多个grid,使用原有的处理逻辑 updated_materials = self.update_mtl_content( materials, texture_map, grid_id ) all_materials.update(updated_materials) # 写入合并后的MTL文件 final_mtl = os.path.join(output_model_dir, "textured_model.mtl") with open(final_mtl, 'w') as f: for mat_name, content in all_materials.items(): f.write(f"newmtl {mat_name}\n") for line in content: f.write(f"{line}\n") f.write("\n") # 最终结果 final_obj = os.path.join(output_model_dir, "textured_model.obj") if len(grid_files) == 1: # 对于单个grid,需要修改obj文件中的材质引用 grid_id = list(grid_files.keys())[0] src_obj = grid_files[grid_id]['obj'] # 读取源obj文件内容 vertices, tex_coords, normals, faces, face_materials, _ = self.read_obj(src_obj) # 更新材质名称,添加网格前缀 updated_face_materials = [] for mat in face_materials: if mat: # 如果材质名不为空 updated_face_materials.append(f"material_{grid_id[0]}_{grid_id[1]}_{mat}") else: updated_face_materials.append(mat) # 写入新的obj文件 self.write_obj(final_obj, vertices, tex_coords, normals, faces, updated_face_materials, "textured_model.mtl") else: # 合并多个OBJ文件 reference_id = list(grid_files.keys())[0] merged_obj = grid_files[reference_id]['obj'] temp_files = [] # 记录所有中间文件 for grid_id, files in list(grid_files.items())[1:]: # 生成临时输出文件名 temp_output = os.path.join( output_model_dir, f"temp_merged_{int(time.time())}.obj" ) temp_files.append(temp_output) # 添加到临时文件列表 self.merge_two_objs( merged_obj, files['obj'], temp_output, reference_id, grid_id) merged_obj = temp_output # 重命名最终文件 try: if os.path.exists(final_obj): os.remove(final_obj) os.rename(merged_obj, final_obj) except Exception as e: self.logger.warning(f"重命名最终文件失败: {str(e)}") shutil.copy2(merged_obj, final_obj) try: os.remove(merged_obj) except: pass # 清理所有临时文件 for temp_file in temp_files: if os.path.exists(temp_file): try: os.remove(temp_file) except Exception as e: self.logger.warning( f"删除临时文件失败: {temp_file}, 错误: {str(e)}") # 计算所有grid中心点的平均值作为整体中心点 center_lon = sum(center[0] for center in grid_centers) / len(grid_centers) center_lat = sum(center[1] for center in grid_centers) / len(grid_centers) self.logger.info(f"模型整体中心点经纬度: ({center_lon}, {center_lat})") return center_lon, center_lat except Exception as e: self.logger.error(f"合并过程中发生错误: {str(e)}", exc_info=True) raise def get_center_coordinates(self, vertices: List[List[float]]) -> Tuple[float, float]: """计算顶点的中心点UTM坐标,并转换为WGS84经纬度。 注意:顶点坐标是相对于整体最小UTM坐标的偏移值,需要加回最小UTM坐标。 Args: vertices: 顶点列表,每个顶点是[x, y, z]格式,x和y是相对于最小UTM坐标的偏移 Returns: Tuple[float, float]: (longitude, latitude)经纬度坐标 """ # 计算相对坐标的边界框 x_coords = [v[0] for v in vertices] y_coords = [v[1] for v in vertices] # 计算中心点相对坐标 center_x_relative = (min(x_coords) + max(x_coords)) / 2 center_y_relative = (min(y_coords) + max(y_coords)) / 2 # 加回最小UTM坐标得到实际的UTM坐标 center_x_utm = center_x_relative + self.min_east center_y_utm = center_y_relative + self.min_north # 转换为WGS84经纬度 lon, lat = self.transformer.transform(center_x_utm, center_y_utm) self.logger.info(f"模型UTM中心点: ({center_x_utm}, {center_y_utm})") return lon, lat def read_mtl(self, mtl_path: str) -> dict: """读取MTL文件内容 Returns: dict: 材质名称到材质信息的映射 """ materials = {} current_material = None with open(mtl_path, 'r') as f: content = f.read() for line in content.strip().split('\n'): if not line: continue parts = line.split() if not parts: continue if parts[0] == 'newmtl': current_material = parts[1] materials[current_material] = [] elif current_material: materials[current_material].append(line) return materials def copy_and_rename_texture(self, src_dir: str, dst_dir: str, grid_id: tuple) -> dict: """复制并重命名纹理文件,对大于100MB的文件进行下采样 Args: src_dir: 源纹理目录 dst_dir: 目标纹理目录 grid_id: 网格ID Returns: dict: 原始文件名到新文件名的映射 """ texture_map = {} os.makedirs(dst_dir, exist_ok=True) for file in os.listdir(src_dir): if file.lower().endswith(('.png', '.jpg', '.jpeg')): # 生成新的文件名 new_name = f"grid_{grid_id[0]}_{grid_id[1]}_{file}" src_path = os.path.join(src_dir, file) dst_path = os.path.join(dst_dir, new_name) # 检查文件大小(以字节为单位) file_size = os.path.getsize(src_path) if file_size > 100 * 1024 * 1024: # 大于100MB self.logger.info(f"纹理文件 {file} 大于100MB,进行4倍下采样") # 读取图像 img = cv2.imread(src_path, cv2.IMREAD_UNCHANGED) if img is not None: # 计算新的尺寸(长宽各变为1/4) new_size = (img.shape[1] // 4, img.shape[0] // 4) # 使用双三次插值进行下采样 resized_img = cv2.resize( img, new_size, interpolation=cv2.INTER_CUBIC) # 保存压缩后的图像 if file.lower().endswith('.png'): cv2.imwrite(dst_path, resized_img, [ cv2.IMWRITE_PNG_COMPRESSION, 9]) else: cv2.imwrite(dst_path, resized_img, [ cv2.IMWRITE_JPEG_QUALITY, 95]) else: self.logger.warning(f"无法读取图像文件: {src_path}") shutil.copy2(src_path, dst_path) else: # 文件大小未超过100MB,直接复制 shutil.copy2(src_path, dst_path) texture_map[file] = new_name self.logger.debug(f"处理纹理文件: {file} -> {new_name}") return texture_map def update_mtl_content(self, materials: dict, texture_map: dict, grid_id: tuple) -> dict: """更新材质内容,修改材质名称和纹理路径 Args: materials: 原始材质信息 texture_map: 纹理文件映射 grid_id: 网格ID Returns: dict: 更新后的材质信息 """ updated_materials = {} for mat_name, content in materials.items(): # 为材质名称添加网格ID前缀,与OBJ文件中的usemtl保持一致 new_mat_name = f"material_{grid_id[0]}_{grid_id[1]}_{mat_name}" updated_content = [] for line in content: if line.startswith('map_'): # 更新纹理文件路径 parts = line.split() old_texture = parts[-1] if old_texture in texture_map: parts[-1] = texture_map[old_texture] line = ' '.join(parts) updated_content.append(line) updated_materials[new_mat_name] = updated_content return updated_materials def read_utm_offset(self, log_file: str) -> tuple: """读取UTM偏移量""" try: east_offset = None north_offset = None with open(log_file, 'r') as f: lines = f.readlines() for i, line in enumerate(lines): if 'utm_north_offset' in line and i + 1 < len(lines): north_offset = float(lines[i + 1].strip()) elif 'utm_east_offset' in line and i + 1 < len(lines): east_offset = float(lines[i + 1].strip()) if east_offset is None or north_offset is None: raise ValueError("未找到UTM偏移量") return east_offset, north_offset except Exception as e: self.logger.error(f"读取UTM偏移量时发生错误: {str(e)}") raise def modify_obj_coordinates(self, obj_file: str, utm_offset: tuple) -> str: """修改obj文件中的顶点坐标,使用相对坐标系""" east_offset, north_offset = utm_offset output_obj = obj_file.replace('.obj', '_utm.obj') try: with open(obj_file, 'r') as f_in, open(output_obj, 'w') as f_out: for line in f_in: if line.startswith('v '): # 处理顶点坐标行 parts = line.strip().split() # 使用相对于整体最小UTM坐标的偏移 x = float(parts[1]) + (east_offset - self.min_east) y = float(parts[2]) + (north_offset - self.min_north) z = float(parts[3]) f_out.write(f'v {x:.6f} {y:.6f} {z:.6f}\n') else: # 其他行直接写入 f_out.write(line) # 复制材质文件 mtl_file = obj_file.replace('.obj', '.mtl') if os.path.exists(mtl_file): shutil.copy2(mtl_file, mtl_file.replace('.mtl', '_utm.mtl')) return output_obj except Exception as e: self.logger.error(f"修改obj坐标时发生错误: {str(e)}") raise