import os import shutil from datetime import timedelta from dataclasses import dataclass from typing import Dict import matplotlib.pyplot as plt import pandas as pd from tqdm import tqdm from filter.cluster_filter import GPSCluster from filter.time_group_overlap_filter import TimeGroupOverlapFilter from filter.gps_filter import GPSFilter from utils.command_runner import CommandRunner from utils.gps_extractor import GPSExtractor from utils.grid_divider import GridDivider from utils.logger import setup_logger from utils.visualizer import FilterVisualizer from post_pro.merge_tif import MergeTif @dataclass class PreprocessConfig: """预处理配置类""" image_dir: str output_dir: str # 聚类过滤参数 cluster_eps: float = 0.01 cluster_min_samples: int = 5 # 时间组重叠过滤参数 time_group_overlap_threshold: float = 0.7 time_group_interval: timedelta = timedelta(minutes=5) # 孤立点过滤参数 filter_distance_threshold: float = 0.001 # 经纬度距离 filter_min_neighbors: int = 6 # 密集点过滤参数 filter_grid_size: float = 0.001 filter_dense_distance_threshold: float = 10 # 普通距离,单位:米 filter_time_threshold: timedelta = timedelta(minutes=5) # 网格划分参数 grid_overlap: float = 0.05 grid_size: float = 500 # 几个pipline过程是否开启 mode: str = "快拼模式" class ImagePreprocessor: def __init__(self, config: PreprocessConfig): self.config = config # 清理并重建输出目录 if os.path.exists(config.output_dir): self._clean_output_dir() self._setup_output_dirs() # 初始化其他组件 self.logger = setup_logger(config.output_dir) self.gps_points = None self.command_runner = CommandRunner( config.output_dir, mode=config.mode) self.visualizer = FilterVisualizer(config.output_dir) def _clean_output_dir(self): """清理输出目录""" try: shutil.rmtree(self.config.output_dir) print(f"已清理输出目录: {self.config.output_dir}") except Exception as e: print(f"清理输出目录时发生错误: {str(e)}") raise def _setup_output_dirs(self): """创建必要的输出目录结构""" try: # 创建主输出目录 os.makedirs(self.config.output_dir) # 创建过滤图像保存目录 os.makedirs(os.path.join(self.config.output_dir, 'filter_imgs')) # 创建日志目录 os.makedirs(os.path.join(self.config.output_dir, 'logs')) print(f"已创建输出目录结构: {self.config.output_dir}") except Exception as e: print(f"创建输出目录时发生错误: {str(e)}") raise def extract_gps(self) -> pd.DataFrame: """提取GPS数据""" self.logger.info("开始提取GPS数据") extractor = GPSExtractor(self.config.image_dir) self.gps_points = extractor.extract_all_gps() self.logger.info(f"成功提取 {len(self.gps_points)} 个GPS点") return self.gps_points def cluster(self) -> pd.DataFrame: """使用DBSCAN对GPS点进行聚类,只保留最大的类""" self.logger.info("开始聚类") previous_points = self.gps_points.copy() # 创建聚类器并执行聚类 clusterer = GPSCluster( self.gps_points, output_dir=self.config.output_dir, eps=self.config.cluster_eps, min_samples=self.config.cluster_min_samples) # 获取主要类别的点 self.clustered_points = clusterer.fit() self.gps_points = clusterer.get_main_cluster(self.clustered_points) # 获取统计信息并记录 stats = clusterer.get_cluster_stats(self.clustered_points) self.logger.info( f"聚类完成:主要类别包含 {stats['main_cluster_points']} 个点," f"噪声点 {stats['noise_points']} 个" ) # 可视化聚类结果 self.visualizer.visualize_filter_step( self.gps_points, previous_points, "1-Clustering") return self.gps_points def filter_time_group_overlap(self) -> pd.DataFrame: """过滤重叠的时间组""" self.logger.info("开始过滤重叠时间组") self.logger.info("开始过滤重叠时间组") previous_points = self.gps_points.copy() filter = TimeGroupOverlapFilter( self.config.image_dir, self.config.output_dir, overlap_threshold=self.config.time_group_overlap_threshold ) deleted_files = filter.filter_overlapping_groups( time_threshold=self.config.time_group_interval ) # 更新GPS点数据,移除被删除的图像 self.gps_points = self.gps_points[~self.gps_points['file'].isin( deleted_files)] self.logger.info(f"重叠时间组过滤后剩余 {len(self.gps_points)} 个GPS点") # 可视化过滤结果 self.visualizer.visualize_filter_step( self.gps_points, previous_points, "2-Time Group Overlap") return self.gps_points # TODO 过滤算法还需要更新 def filter_points(self) -> pd.DataFrame: """过滤GPS点""" self.logger.info("开始过滤GPS点") filter = GPSFilter(self.config.output_dir) # 过滤孤立点 previous_points = self.gps_points.copy() self.logger.info( f"开始过滤孤立点(距离阈值: {self.config.filter_distance_threshold}, " f"最小邻居数: {self.config.filter_min_neighbors})" ) self.gps_points = filter.filter_isolated_points( self.gps_points, self.config.filter_distance_threshold, self.config.filter_min_neighbors, ) self.logger.info(f"孤立点过滤后剩余 {len(self.gps_points)} 个GPS点") # 可视化孤立点过滤结果 self.visualizer.visualize_filter_step( self.gps_points, previous_points, "3-Isolated Points") # 过滤密集点 previous_points = self.gps_points.copy() self.logger.info( f"开始过滤密集点(网格大小: {self.config.filter_grid_size}, " f"距离阈值: {self.config.filter_dense_distance_threshold})" ) self.gps_points = filter.filter_dense_points( self.gps_points, grid_size=self.config.filter_grid_size, distance_threshold=self.config.filter_dense_distance_threshold, time_threshold=self.config.filter_time_threshold, ) self.logger.info(f"密集点过滤后剩余 {len(self.gps_points)} 个GPS点") # 可视化密集点过滤结果 self.visualizer.visualize_filter_step( self.gps_points, previous_points, "4-Dense Points") return self.gps_points def divide_grids(self) -> Dict[int, pd.DataFrame]: """划分网格""" self.logger.info(f"开始划分网格 (重叠率: {self.config.grid_overlap})") grid_divider = GridDivider( overlap=self.config.grid_overlap, output_dir=self.config.output_dir ) grids = grid_divider.divide_grids( self.gps_points, grid_size=self.config.grid_size ) grid_points = grid_divider.assign_to_grids(self.gps_points, grids) self.logger.info(f"成功划分为 {len(grid_points)} 个网格") return grid_points def copy_images(self, grid_points: Dict[int, pd.DataFrame]): """复制图像到目标文件夹""" self.logger.info("开始复制图像文件") self.logger.info("开始复制图像文件") for grid_idx, points in grid_points.items(): output_dir = os.path.join( self.config.output_dir, f"grid_{grid_idx + 1}", "project", "images" ) os.makedirs(output_dir, exist_ok=True) for point in tqdm(points, desc=f"复制网格 {grid_idx + 1} 的图像"): src = os.path.join(self.config.image_dir, point["file"]) dst = os.path.join(output_dir, point["file"]) shutil.copy(src, dst) self.logger.info(f"网格 {grid_idx + 1} 包含 {len(points)} 张图像") def merge_tif(self, grid_points: Dict[int, pd.DataFrame]): """合并所有网格的TIF影像""" self.logger.info("开始合并TIF影像") # 检查是否有多个网格需要合并 if len(grid_points) < 2: self.logger.info("只有一个网格,无需合并TIF影像") return input_tif1, input_tif2 = None, None merge_count = 0 try: for grid_idx, points in grid_points.items(): grid_tif = os.path.join( self.config.output_dir, f"grid_{grid_idx + 1}", "project", "odm_orthophoto", "odm_orthophoto.original.tif" ) # 检查TIF文件是否存在 if not os.path.exists(grid_tif): self.logger.error( f"网格 {grid_idx + 1} 的TIF文件不存在: {grid_tif}") continue if input_tif1 is None: input_tif1 = grid_tif self.logger.info(f"设置第一个输入TIF: {input_tif1}") else: input_tif2 = grid_tif output_tif = os.path.join( self.config.output_dir, "merged_orthophoto.tif") self.logger.info( f"开始合并第 {merge_count + 1} 次:\n" f"输入1: {input_tif1}\n" f"输入2: {input_tif2}\n" f"输出: {output_tif}" ) merge_tif = MergeTif(input_tif1, input_tif2, output_tif) merge_tif.merge() merge_count += 1 input_tif1 = output_tif input_tif2 = None self.logger.info( f"TIF影像合并完成,共执行 {merge_count} 次合并," f"最终输出文件: {input_tif1}" ) except Exception as e: self.logger.error(f"TIF影像合并过程中发生错误: {str(e)}", exc_info=True) raise def process(self): """执行完整的预处理流程""" try: self.extract_gps() self.cluster() # self.filter_time_group_overlap() # self.filter_points() grid_points = self.divide_grids() self.copy_images(grid_points) self.logger.info("预处理任务完成") self.command_runner.run_grid_commands( grid_points, ) # 添加TIF合并步骤 self.merge_tif(grid_points) except Exception as e: self.logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True) raise if __name__ == "__main__": # 创建配置 config = PreprocessConfig( image_dir=r"F:\error_data\20241024100834\code\images", output_dir=r"G:\ODM_output\20241024100834\output", cluster_eps=0.01, cluster_min_samples=5, # 添加时间组重叠过滤参数 time_group_overlap_threshold=0.7, time_group_interval=timedelta(minutes=5), filter_distance_threshold=0.001, filter_min_neighbors=6, filter_grid_size=0.001, filter_dense_distance_threshold=10, filter_time_threshold=timedelta(minutes=5), grid_size=1000, grid_overlap=0.03, mode="快拼模式", ) # 创建处理器并执行 processor = ImagePreprocessor(config) processor.process()