import os
import shutil
from dataclasses import dataclass
from typing import Dict, Tuple

import psutil
import pandas as pd

from filter.cluster_filter import GPSCluster
from utils.directory_manager import DirectoryManager
from utils.odm_monitor import ODMProcessMonitor
from utils.gps_extractor import GPSExtractor
from utils.grid_divider import GridDivider
from utils.logger import setup_logger
from utils.visualizer import FilterVisualizer
from post_pro.merge_tif import MergeTif
from post_pro.conv_obj import ConvertOBJ


@dataclass
class ProcessConfig:
    """Configuration for the preprocessing / reconstruction pipeline."""

    # Directory the source images are read from (GPS extraction and copying).
    image_dir: str
    # Root directory for all outputs (logs, per-grid projects, merged results).
    output_dir: str

    # Cluster-filter parameters, forwarded to GPSCluster as eps / min_samples.
    # NOTE(review): eps units are not visible here -- presumably degrees,
    # since it is applied to lat/lon points; confirm against GPSCluster.
    cluster_eps: float = 0.01
    cluster_min_samples: int = 5

    # Grid-division parameters, forwarded to GridDivider.
    # NOTE(review): grid_size units are not visible here -- confirm against
    # GridDivider.
    grid_overlap: float = 0.05
    grid_size: float = 500

    # Processing mode. The default literal means "3D mode"; OBJ conversion in
    # post-processing only runs when the mode equals this exact string.
    mode: str = "三维模式"


class ODM_Plugin:
    """Drive the full pipeline: GPS extraction, clustering, grid division,
    image copying, ODM processing of every grid, and post-processing."""

    # Grid count at or above which divide_grids() logs a warning asking for
    # manual re-partitioning. The comparison is inclusive (>=), as in the
    # original code, even though the log message says "exceeded".
    _GRID_COUNT_WARNING_THRESHOLD = 20

    def __init__(self, config: ProcessConfig):
        """Prepare the output directory and build the helper components.

        Args:
            config: Pipeline settings; ``output_dir``, ``image_dir``,
                ``mode`` and the cluster/grid parameters are read from it.
        """
        self.config = config

        # Directory manager: wipe and recreate the output tree, then verify
        # disk space, before anything else is created.
        self.dir_manager = DirectoryManager(config)
        self.dir_manager.clean_output_dir()
        self.dir_manager.setup_output_dirs()
        self.dir_manager.check_disk_space()

        # Remaining components. The logger is created only after the output
        # directory has been rebuilt (presumably because it logs into
        # output_dir -- confirm in setup_logger).
        self.logger = setup_logger(config.output_dir)
        self.gps_points = pd.DataFrame(columns=["file", "lat", "lon"])
        self.odm_monitor = ODMProcessMonitor(
            config.output_dir, mode=config.mode)
        self.visualizer = FilterVisualizer(config.output_dir)

    def extract_gps(self) -> pd.DataFrame:
        """Extract GPS data for the images in ``config.image_dir``.

        The result is stored in ``self.gps_points`` and also returned.

        Returns:
            Whatever ``GPSExtractor.extract_all_gps()`` produced (annotated
            as a DataFrame).
        """
        self.logger.info("开始提取GPS数据")
        extractor = GPSExtractor(self.config.image_dir)
        self.gps_points = extractor.extract_all_gps()
        self.logger.info(f"成功提取 {len(self.gps_points)} 个GPS点")
        # The signature promises a DataFrame; previously nothing was returned.
        return self.gps_points

    def cluster(self):
        """Cluster the GPS points and keep the filtered result.

        Runs ``GPSCluster`` with the configured eps / min_samples, stores the
        raw clustering in ``self.clustered_points``, replaces
        ``self.gps_points`` with ``get_cluster_stats(...)`` of that result and
        visualizes the before/after point sets as step "1-Clustering".
        (The original docstring states DBSCAN is used and only the largest
        cluster is kept; that logic lives in GPSCluster.)
        """
        # Snapshot taken before filtering so the visualizer can show the diff.
        previous_points = self.gps_points.copy()

        clusterer = GPSCluster(
            self.gps_points,
            eps=self.config.cluster_eps,
            min_samples=self.config.cluster_min_samples,
        )
        self.clustered_points = clusterer.fit()
        self.gps_points = clusterer.get_cluster_stats(self.clustered_points)

        self.visualizer.visualize_filter_step(
            self.gps_points, previous_points, "1-Clustering")

    def divide_grids(self) -> Dict[tuple, pd.DataFrame]:
        """Divide the filtered GPS points into grids.

        Also renders a visualization of the grids and logs a warning when the
        number of grids reaches the warning threshold.

        Returns:
            The per-grid point mapping produced by
            ``GridDivider.adjust_grid_size_and_overlap`` (keyed by grid id).
            No translation data is returned.
        """
        grid_divider = GridDivider(
            overlap=self.config.grid_overlap,
            grid_size=self.config.grid_size,
            output_dir=self.config.output_dir,
        )
        grids, grid_points = grid_divider.adjust_grid_size_and_overlap(
            self.gps_points
        )
        grid_divider.visualize_grids(self.gps_points, grids)

        if len(grids) >= self._GRID_COUNT_WARNING_THRESHOLD:
            self.logger.warning("网格数量已超过20, 需要人工调整分区")

        return grid_points

    @staticmethod
    def _point_records(points):
        """Return an iterable of per-image records that support ``["file"]``.

        Iterating a DataFrame directly yields its column labels, not its
        rows, so a DataFrame is converted to a list of row dicts. Any other
        iterable (e.g. a list of dicts) is passed through unchanged.
        """
        if isinstance(points, pd.DataFrame):
            return points.to_dict("records")
        return points

    def copy_images(self, grid_points: Dict[tuple, pd.DataFrame]):
        """Copy each grid's images into ``<output_dir>/grid_<a>_<b>/project/images``.

        Args:
            grid_points: Mapping of grid id (indexable, first two items used
                in the folder name) to that grid's points. Each point must
                expose a ``"file"`` entry naming an image inside
                ``config.image_dir``. Both a DataFrame with a ``file`` column
                and an iterable of dict-like records are accepted.
        """
        self.logger.info("开始复制图像文件")

        for grid_id, points in grid_points.items():
            output_dir = os.path.join(
                self.config.output_dir,
                f"grid_{grid_id[0]}_{grid_id[1]}",
                "project",
                "images",
            )
            os.makedirs(output_dir, exist_ok=True)

            for point in self._point_records(points):
                src = os.path.join(self.config.image_dir, point["file"])
                dst = os.path.join(output_dir, point["file"])
                shutil.copy(src, dst)

            self.logger.info(
                f"网格 ({grid_id[0]},{grid_id[1]}) 包含 {len(points)} 张图像")

    def merge_tif(self, grid_lt):
        """Merge the orthophoto products of the given grids.

        Args:
            grid_lt: Grids to merge, passed straight to
                ``MergeTif.merge_orthophoto``.
        """
        self.logger.info("开始合并所有影像产品")
        merger = MergeTif(self.config.output_dir)
        merger.merge_orthophoto(grid_lt)

    def convert_obj(self, grid_lt):
        """Convert the OBJ models of the given grids.

        Args:
            grid_lt: Grids to convert, passed straight to
                ``ConvertOBJ.convert_grid_obj``.
        """
        self.logger.info("开始转换OBJ模型")
        converter = ConvertOBJ(self.config.output_dir)
        converter.convert_grid_obj(grid_lt)

    def post_process(self, successful_grid_lt: list,
                     grid_points: Dict[tuple, pd.DataFrame]):
        """Post-process the grids that were reconstructed successfully.

        Logs a warning when some grids failed, merges the orthophotos of the
        successful grids and, in 3D mode only, converts their OBJ models.

        Args:
            successful_grid_lt: Grids that finished processing.
            grid_points: All grids that were submitted; only its length is
                used, to compute how many grids failed.
        """
        failed_count = len(grid_points) - len(successful_grid_lt)
        if failed_count > 0:
            self.logger.warning(
                f"有 {failed_count} 个网格处理失败,"
                f"将只合并成功处理的 {len(successful_grid_lt)} 个网格"
            )

        self.merge_tif(successful_grid_lt)
        if self.config.mode == "三维模式":
            self.convert_obj(successful_grid_lt)

    def process(self):
        """Run the complete pipeline end to end.

        Raises:
            Exception: Any error raised by a pipeline step is logged with its
                traceback and then re-raised unchanged.
        """
        try:
            self.extract_gps()
            self.cluster()
            grid_points = self.divide_grids()
            self.copy_images(grid_points)
            self.logger.info("预处理任务完成")

            successful_grid_lt = self.odm_monitor.process_all_grids(
                grid_points)

            self.post_process(successful_grid_lt, grid_points)
            self.logger.info("重建任务完成")

        except Exception as e:
            # Top-level boundary: record the failure, then let it propagate.
            self.logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True)
            raise