import os import logging import pandas as pd from typing import Dict, List, Tuple import numpy as np import shutil import time import cv2 import subprocess from pyproj import Transformer class MergeObj: def __init__(self, output_dir: str): self.output_dir = output_dir self.logger = logging.getLogger('UAV_Preprocess.MergeObj') # 用于存储所有grid的UTM范围 self.min_east = float('inf') self.min_north = float('inf') self.max_east = float('-inf') self.max_north = float('-inf') # 初始化UTM到WGS84的转换器 self.transformer = Transformer.from_crs( "EPSG:32649", "EPSG:4326", always_xy=True) def read_obj(self, file_path): """读取.obj文件,返回顶点、纹理坐标、法线、面的列表和MTL文件名""" vertices = [] # v tex_coords = [] # vt normals = [] # vn faces = [] # f face_materials = [] # 每个面对应的材质名称 mtl_file = None # mtl文件名 current_material = None # 当前使用的材质 with open(file_path, 'r') as file: for line in file: if line.startswith('#') or not line.strip(): continue parts = line.strip().split() if not parts: continue if parts[0] == 'mtllib': # MTL文件引用 mtl_file = parts[1] elif parts[0] == 'usemtl': # 材质使用 current_material = parts[1] elif parts[0] == 'v': # 顶点 vertices.append( [float(parts[1]), float(parts[2]), float(parts[3])]) elif parts[0] == 'vt': # 纹理坐标 tex_coords.append([float(parts[1]), float(parts[2])]) elif parts[0] == 'vn': # 法线 normals.append( [float(parts[1]), float(parts[2]), float(parts[3])]) elif parts[0] == 'f': # 面 # 处理面的索引 (v/vt/vn) face_v = [] face_vt = [] face_vn = [] for p in parts[1:]: indices = p.split('/') face_v.append(int(indices[0])) if len(indices) > 1 and indices[1]: face_vt.append(int(indices[1])) if len(indices) > 2: face_vn.append(int(indices[2])) faces.append((face_v, face_vt, face_vn)) face_materials.append(current_material) # 记录这个面使用的材质 return vertices, tex_coords, normals, faces, face_materials, mtl_file def write_obj(self, file_path, vertices, tex_coords, normals, faces, face_materials, mtl_file=None): """将顶点、纹理坐标、法线和面写入到.obj文件""" with open(file_path, 'w') as file: # 写入MTL文件引用 if mtl_file: file.write(f"mtllib {mtl_file}\n") # 写入顶点 for v in vertices: file.write(f"v {v[0]} {v[1]} {v[2]}\n") # 写入纹理坐标 for vt in tex_coords: file.write(f"vt {vt[0]} {vt[1]}\n") # 写入法线 for vn in normals: file.write(f"vn {vn[0]} {vn[1]} {vn[2]}\n") # 写入面(按材质分组) current_material = None for face, material in zip(faces, face_materials): # 如果材质发生变化,写入新的usemtl if material != current_material: file.write(f"usemtl {material}\n") current_material = material face_str = "f" for i in range(len(face[0])): face_str += " " face_str += str(face[0][i]) if face[1]: face_str += f"/{face[1][i]}" else: face_str += "/" if face[2]: face_str += f"/{face[2][i]}" else: face_str += "/" file.write(face_str + "\n") def merge_grid_obj(self, grid_points: Dict[tuple, pd.DataFrame]) -> Tuple[float, float]: """合并所有网格的OBJ模型 Args: grid_points: 网格点数据字典 Returns: Tuple[float, float]: (longitude, latitude)中心点经纬度坐标 """ try: # 创建输出目录 output_model_dir = os.path.join(self.output_dir, "texturing") os.makedirs(output_model_dir, exist_ok=True) # 初始化全局边界框坐标 global_min_lon = float('inf') global_min_lat = float('inf') global_max_lon = float('-inf') global_max_lat = float('-inf') # 第一次遍历:获取所有grid的UTM范围 for grid_id, points in grid_points.items(): base_dir = os.path.join( self.output_dir, f"grid_{grid_id[0]}_{grid_id[1]}", "project" ) log_file = os.path.join( base_dir, "odm_orthophoto", "odm_orthophoto_log.txt") east_offset, north_offset = self.read_utm_offset(log_file) # 更新UTM范围 self.min_east = min(self.min_east, east_offset) self.min_north = min(self.min_north, north_offset) self.max_east = max(self.max_east, east_offset) self.max_north = max(self.max_north, north_offset) # 收集所有grid的数据 all_vertices = [] # 所有顶点 all_tex_coords = [] # 所有纹理坐标 all_normals = [] # 所有法线 all_faces = [] # 所有面 all_face_materials = [] # 所有面的材质 all_materials = {} # 所有材质信息 grid_centers = [] # 所有grid的中心点 # 处理每个grid for grid_id, points in grid_points.items(): base_dir = os.path.join( self.output_dir, f"grid_{grid_id[0]}_{grid_id[1]}", "project", "odm_texturing" ) obj_path = os.path.join(base_dir, "odm_textured_model_geo.obj") mtl_path = os.path.join(base_dir, "odm_textured_model_geo.mtl") if not os.path.exists(obj_path) or not os.path.exists(mtl_path): self.logger.warning( f"网格 ({grid_id[0]},{grid_id[1]}) 的文件不存在") continue # 读取UTM偏移量并修改obj文件的顶点坐标 log_file = os.path.join( base_dir, "..", "odm_orthophoto", "odm_orthophoto_log.txt") utm_offset = self.read_utm_offset(log_file) modified_obj = self.modify_obj_coordinates( obj_path, utm_offset) # 读取obj文件内容 vertices, tex_coords, normals, faces, face_materials, _ = self.read_obj( modified_obj) # 计算当前grid的中心点 grid_center_lon, grid_center_lat, grid_bounding_box = self.get_center_coordinates( vertices) grid_centers.append((grid_center_lon, grid_center_lat)) self.logger.info( f"网格 ({grid_id[0]},{grid_id[1]}) 中心点经纬度: ({grid_center_lon}, {grid_center_lat})") # 更新全局边界框坐标 global_min_lon = min( global_min_lon, grid_bounding_box['LB_lon']) global_min_lat = min( global_min_lat, grid_bounding_box['LB_lat']) global_max_lon = max( global_max_lon, grid_bounding_box['RU_lon']) global_max_lat = max( global_max_lat, grid_bounding_box['RU_lat']) # 复制并重命名纹理文件 texture_map = self.copy_and_rename_texture( base_dir, output_model_dir, grid_id ) # 读取并更新材质内容 materials = self.read_mtl(mtl_path) updated_materials = self.update_mtl_content( materials, texture_map, grid_id ) all_materials.update(updated_materials) # 计算顶点偏移量 v_offset = len(all_vertices) vt_offset = len(all_tex_coords) vn_offset = len(all_normals) # 添加顶点、纹理坐标和法线 all_vertices.extend(vertices) all_tex_coords.extend(tex_coords) all_normals.extend(normals) # 添加面和材质 for face, material in zip(faces, face_materials): # 调整面的索引 new_face_v = [f + v_offset for f in face[0]] new_face_vt = [ f + vt_offset for f in face[1]] if face[1] else [] new_face_vn = [ f + vn_offset for f in face[2]] if face[2] else [] all_faces.append((new_face_v, new_face_vt, new_face_vn)) # 添加材质前缀 if material: all_face_materials.append( f"material_{grid_id[0]}_{grid_id[1]}_{material}") else: all_face_materials.append(material) if not all_vertices: self.logger.error("没有找到有效的文件") return # 写入合并后的MTL文件 final_mtl = os.path.join(output_model_dir, "textured_model.mtl") with open(final_mtl, 'w') as f: for mat_name, content in all_materials.items(): f.write(f"newmtl {mat_name}\n") for line in content: f.write(f"{line}\n") f.write("\n") # 写入合并后的OBJ文件 final_obj = os.path.join(output_model_dir, "textured_model.obj") self.write_obj(final_obj, all_vertices, all_tex_coords, all_normals, all_faces, all_face_materials, "textured_model.mtl") # 计算整体中心点 center_lon = sum(center[0] for center in grid_centers) / len(grid_centers) center_lat = sum(center[1] for center in grid_centers) / len(grid_centers) self.logger.info(f"模型整体中心点经纬度: ({center_lon}, {center_lat})") # 计算整个区域的边界框 bounding_box = [global_min_lon, global_min_lat, global_max_lon, global_max_lat] self.logger.info( f"模型整体边界框: ({bounding_box[0]}, {bounding_box[1]}) - ({bounding_box[2]}, {bounding_box[3]})") return center_lon, center_lat, bounding_box except Exception as e: self.logger.error(f"合并过程中发生错误: {str(e)}", exc_info=True) raise def get_center_coordinates(self, vertices: List[List[float]]) -> Tuple[float, float, Dict[str, float]]: """计算顶点的中心点UTM坐标,并转换为WGS84经纬度。 注意:顶点坐标是相对于整体最小UTM坐标的偏移值,需要加回最小UTM坐标。 Args: vertices: 顶点列表,每个顶点是[x, y, z]格式,x和y是相对于最小UTM坐标的偏移 Returns: Tuple[float, float, Dict[str, float]]: (longitude, latitude, bounding_box) """ # 计算相对坐标的边界框 x_coords = [v[0] for v in vertices] y_coords = [v[1] for v in vertices] # 计算中心点相对坐标 center_x_relative = (min(x_coords) + max(x_coords)) / 2 center_y_relative = (min(y_coords) + max(y_coords)) / 2 # 加回最小UTM坐标得到实际的UTM坐标 center_x_utm = center_x_relative + self.min_east center_y_utm = center_y_relative + self.min_north # 转换为WGS84经纬度 lon, lat = self.transformer.transform(center_x_utm, center_y_utm) # 计算边界框并转换为经纬度 bounding_box = { 'LB_lon': self.transformer.transform(min(x_coords) + self.min_east, min(y_coords) + self.min_north)[0], 'LB_lat': self.transformer.transform(min(x_coords) + self.min_east, min(y_coords) + self.min_north)[1], 'RU_lon': self.transformer.transform(max(x_coords) + self.min_east, max(y_coords) + self.min_north)[0], 'RU_lat': self.transformer.transform(max(x_coords) + self.min_east, max(y_coords) + self.min_north)[1] } self.logger.info(f"模型UTM中心点: ({center_x_utm}, {center_y_utm})") return lon, lat, bounding_box def read_mtl(self, mtl_path: str) -> dict: """读取MTL文件内容 Returns: dict: 材质名称到材质信息的映射 """ materials = {} current_material = None with open(mtl_path, 'r') as f: content = f.read() for line in content.strip().split('\n'): if not line: continue parts = line.split() if not parts: continue if parts[0] == 'newmtl': current_material = parts[1] materials[current_material] = [] elif current_material: materials[current_material].append(line) return materials def copy_and_rename_texture(self, src_dir: str, dst_dir: str, grid_id: tuple) -> dict: """复制并重命名纹理文件,对大于100MB的文件进行下采样 Args: src_dir: 源纹理目录 dst_dir: 目标纹理目录 grid_id: 网格ID Returns: dict: 原始文件名到新文件名的映射 """ texture_map = {} os.makedirs(dst_dir, exist_ok=True) for file in os.listdir(src_dir): if file.lower().endswith(('.png', '.jpg', '.jpeg')): # 生成新的文件名 new_name = f"grid_{grid_id[0]}_{grid_id[1]}_{file}" src_path = os.path.join(src_dir, file) dst_path = os.path.join(dst_dir, new_name) # 检查文件大小(以字节为单位) file_size = os.path.getsize(src_path) if file_size > 100 * 1024 * 1024: # 大于100MB self.logger.info(f"纹理文件 {file} 大于100MB,进行4倍下采样") # 读取图像 img = cv2.imread(src_path, cv2.IMREAD_UNCHANGED) if img is not None: # 计算新的尺寸(长宽各变为1/4) new_size = (img.shape[1] // 4, img.shape[0] // 4) # 使用双三次插值进行下采样 resized_img = cv2.resize( img, new_size, interpolation=cv2.INTER_CUBIC) # 保存压缩后的图像 if file.lower().endswith('.png'): cv2.imwrite(dst_path, resized_img, [ cv2.IMWRITE_PNG_COMPRESSION, 9]) else: cv2.imwrite(dst_path, resized_img, [ cv2.IMWRITE_JPEG_QUALITY, 95]) else: self.logger.warning(f"无法读取图像文件: {src_path}") shutil.copy2(src_path, dst_path) else: # 文件大小未超过100MB,直接复制 shutil.copy2(src_path, dst_path) texture_map[file] = new_name self.logger.debug(f"处理纹理文件: {file} -> {new_name}") return texture_map def update_mtl_content(self, materials: dict, texture_map: dict, grid_id: tuple) -> dict: """更新材质内容,修改材质名称和纹理路径 Args: materials: 原始材质信息 texture_map: 纹理文件映射 grid_id: 网格ID Returns: dict: 更新后的材质信息 """ updated_materials = {} for mat_name, content in materials.items(): # 为材质名称添加网格ID前缀,与OBJ文件中的usemtl保持一致 new_mat_name = f"material_{grid_id[0]}_{grid_id[1]}_{mat_name}" updated_content = [] for line in content: if line.startswith('map_'): # 更新纹理文件路径 parts = line.split() old_texture = parts[-1] if old_texture in texture_map: parts[-1] = texture_map[old_texture] line = ' '.join(parts) updated_content.append(line) updated_materials[new_mat_name] = updated_content return updated_materials def read_utm_offset(self, log_file: str) -> tuple: """读取UTM偏移量""" try: east_offset = None north_offset = None with open(log_file, 'r') as f: lines = f.readlines() for i, line in enumerate(lines): if 'utm_north_offset' in line and i + 1 < len(lines): north_offset = float(lines[i + 1].strip()) elif 'utm_east_offset' in line and i + 1 < len(lines): east_offset = float(lines[i + 1].strip()) if east_offset is None or north_offset is None: raise ValueError("未找到UTM偏移量") return east_offset, north_offset except Exception as e: self.logger.error(f"读取UTM偏移量时发生错误: {str(e)}") raise def modify_obj_coordinates(self, obj_file: str, utm_offset: tuple) -> str: """修改obj文件中的顶点坐标,使用相对坐标系""" east_offset, north_offset = utm_offset output_obj = obj_file.replace('.obj', '_utm.obj') try: with open(obj_file, 'r') as f_in, open(output_obj, 'w') as f_out: for line in f_in: if line.startswith('v '): # 处理顶点坐标行 parts = line.strip().split() # 使用相对于整体最小UTM坐标的偏移 x = float(parts[1]) + (east_offset - self.min_east) y = float(parts[2]) + (north_offset - self.min_north) z = float(parts[3]) f_out.write(f'v {x:.6f} {y:.6f} {z:.6f}\n') else: # 其他行直接写入 f_out.write(line) return output_obj except Exception as e: self.logger.error(f"修改obj坐标时发生错误: {str(e)}") raise