import os
import time
import logging
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

try:
    from osgeo import gdal
except ImportError:
    # GDAL is only used by ODMProcessMonitor._check_orthophoto. Keep the rest
    # of the module importable when it is not installed; that method reports
    # the missing dependency itself.
    gdal = None


class NotOverlapError(Exception):
    """Raised when an ODM run did not produce a usable result.

    Originally meant for "not enough image overlap", it is currently raised
    for every ODM-level failure so that the run can be retried once with
    different quality settings.
    """


class ODMProcessMonitor:
    """Run OpenDroneMap (ODM) in Docker for each grid and check its outputs."""

    # Files (relative to the ODM project directory) that must exist after a
    # run in the modes that produce a point cloud and a textured 3D model.
    _MODEL_PRODUCTS = (
        ('odm_georeferencing', 'odm_georeferenced_model.laz'),
        ('odm_texturing', 'odm_textured_model_geo.obj'),
    )

    # Modes whose products are checked, mapped to the error that is logged
    # when one of ``_MODEL_PRODUCTS`` is missing.
    _MISSING_PRODUCT_MESSAGES = {
        "三维模式": "点云或实景三维文件夹未生成",
        "重建模式": "部分必要的文件夹未生成",
    }

    def __init__(self, output_dir: str, mode: str = "快拼模式", max_parallel: int = 2):
        """Store the processing configuration.

        Args:
            output_dir: Directory that contains one ``grid_<x>_<y>`` folder
                per grid.
            mode: Processing mode; the code distinguishes "快拼模式",
                "三维模式" and "重建模式".
            max_parallel: Maximum number of grids processed concurrently.
        """
        self.output_dir = output_dir
        self.logger = logging.getLogger('UAV_Preprocess.ODMMonitor')
        self.mode = mode
        self.max_parallel = max_parallel

    def _check_success(self, grid_dir: str) -> bool:
        """Check whether ODM produced the files expected for ``self.mode``.

        Only the existence of the point cloud and the textured model is
        verified, and only in "三维模式" and "重建模式". Any other mode
        (including "快拼模式") currently passes without checks.

        Args:
            grid_dir: Grid directory that contains the ``project`` folder.

        Returns:
            True when all required products exist (or none are required).
        """
        # TODO: re-enable the orthophoto check (_check_orthophoto) for
        # "快拼模式" and "重建模式" once a good criterion is agreed on.
        # TODO: add quality checks for the point cloud and the 3D model.
        missing_message = self._MISSING_PRODUCT_MESSAGES.get(self.mode)
        if missing_message is None:
            return True

        project_dir = os.path.join(grid_dir, 'project')
        products_exist = all(
            os.path.exists(os.path.join(project_dir, *relative_parts))
            for relative_parts in self._MODEL_PRODUCTS
        )
        if not products_exist:
            self.logger.error(missing_message)
            return False
        return True

    # TODO: decide on the best way to validate the orthophoto.
    def _check_orthophoto(self, project_dir: str) -> bool:
        """Run basic sanity checks on the generated orthophoto.

        The file must exist, be at least 1 MB, be readable by GDAL, have at
        most 50% no-data pixels in band 1 and a band-1 value range of at
        least 1.

        Args:
            project_dir: ODM project directory.

        Returns:
            True when every check passes, False otherwise (the reason is
            logged).
        """
        ortho_path = os.path.join(
            project_dir, 'odm_orthophoto', 'odm_orthophoto.original.tif')
        if not os.path.exists(ortho_path):
            self.logger.error("正射影像文件未生成")
            return False

        file_size_mb = os.path.getsize(ortho_path) / (1024 * 1024)
        if file_size_mb < 1:
            self.logger.error(f"正射影像文件过小: {file_size_mb:.2f}MB")
            return False

        if gdal is None:
            self.logger.error("未安装GDAL(osgeo),无法检查正射影像")
            return False

        try:
            ds = gdal.Open(ortho_path)
            if ds is None:
                self.logger.error("无法打开正射影像文件")
                return False

            band = ds.GetRasterBand(1)
            stats = band.GetStatistics(False, True)
            if stats is None:
                self.logger.error("无法获取影像统计信息")
                return False
            min_val, max_val, _, _ = stats

            # Fraction of band-1 pixels equal to the declared no-data value.
            no_data_value = band.GetNoDataValue()
            array = band.ReadAsArray()
            if no_data_value is not None:
                no_data_ratio = np.sum(array == no_data_value) / array.size
            else:
                no_data_ratio = 0

            if no_data_ratio > 0.5:
                self.logger.error(f"正射影像空值比例过高: {no_data_ratio:.2%}")
                return False

            # A (nearly) constant image is all black or all white.
            if max_val - min_val < 1:
                self.logger.error("正射影像可能无效:像素值范围过小")
                return False

            ds = None  # drop the reference so GDAL closes the dataset
            return True
        except Exception as e:
            self.logger.error(f"检查正射影像时发生错误: {str(e)}")
            return False

    @staticmethod
    def _normalize_mount_path(path: str) -> str:
        """Return ``path`` in the form used for the Docker volume mount.

        Backslashes become forward slashes and a Windows drive letter is
        lowercased. Paths without a drive letter keep their case so that
        they stay valid on case-sensitive filesystems.
        """
        path = path.replace('\\', '/')
        if len(path) >= 2 and path[1] == ':':
            path = path[0].lower() + path[1:]
        return path

    def _build_docker_args(self, grid_dir: str, grid_id: tuple,
                           produce_dem: bool,
                           use_lowest_quality: bool) -> List[str]:
        """Build the ``docker run`` argument list for one ODM run.

        An argument list (instead of a shell string) keeps a ``grid_dir``
        that contains spaces intact as a single ``-v`` value.
        """
        docker_args = [
            "docker", "run",
            # Resource limits
            "--gpus", "all",
            "--memory=32g",
            "--memory-swap=36g",  # memory + swap
            "--cpus=10.0",
            # Container settings
            "-ti", "--rm",
            "--name", f"odm_grid_{grid_id[0]}_{grid_id[1]}",
            "-v", f"{grid_dir}:/datasets",
            "opendronemap/odm:gpu",
            # ODM arguments
            "--project-path", "/datasets", "project",
            "--max-concurrency", "10",
            "--force-gps",
        ]
        if use_lowest_quality:
            docker_args += ["--feature-quality", "lowest"]
        if produce_dem:
            docker_args += ["--dsm", "--dtm"]
        if self.mode == "快拼模式":
            docker_args.append("--skip-3dmodel")
        elif self.mode == "三维模式":
            docker_args.append("--skip-orthophoto")
        docker_args.append("--rerun-all")
        return docker_args

    def _run_docker_once(self, docker_args: List[str], grid_dir: str,
                         grid_id: tuple) -> Tuple[bool, str]:
        """Execute one ``docker run`` and classify its outcome.

        Returns:
            ``(True, "")`` when ODM finished and the products pass
            ``_check_success``; ``(False, message)`` when Docker itself
            failed (not retried).

        Raises:
            NotOverlapError: ODM ran but did not yield a usable result.
        """
        generic_failure = "Docker运行失败,需要人工排查错误"
        try:
            result = subprocess.run(
                docker_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as e:
            # The docker executable could not be started at all.
            self.logger.error("docker run指令执行失败")
            self.logger.error(f"==========stderr==========: {e}")
            return False, generic_failure

        # errors='replace': the output is not guaranteed to be valid UTF-8
        # (e.g. localized system messages); a decode error must not abort
        # the run.
        stdout = result.stdout.decode('utf-8', errors='replace')
        stderr = result.stderr.decode('utf-8', errors='replace')
        last_lines = '\n'.join(stdout.strip().split('\n')[-50:])
        self.logger.info(f"==========stdout==========: {last_lines}")

        # NOTE(review): any stderr output is treated as a Docker failure,
        # which presumably also covers harmless CLI messages (e.g. image
        # pull progress) - confirm whether the exit code should be used.
        if stderr:
            self.logger.error("docker run指令执行失败")
            self.logger.error(f"==========stderr==========: {stderr}")
            if ("error during connect" in stderr
                    or "The system cannot find the file specified" in stderr):
                return False, "Docker没有启动,请启动Docker"
            if "user declined directory sharing" in stderr:
                return False, "Docker无法访问目录,请检查目录权限和共享设置"
            return False, generic_failure

        self.logger.info("docker run指令执行成功")
        if "ODM app finished" in last_lines:
            self.logger.info("ODM处理完成")
            if self._check_success(grid_dir):
                self.logger.info(
                    f"网格 ({grid_id[0]},{grid_id[1]}) 处理成功")
                return True, ""
            self.logger.error(
                "虽然ODM处理完成,但是生产产品质量可能不合格,需要人工检查")

        # TODO: "enough overlap", "out of memory" and "strange values" in
        # the output are handled like any other failure for now; they may
        # need different treatment later.
        raise NotOverlapError

    def run_odm_with_monitor(self, grid_dir: str, grid_id: tuple, produce_dem: bool = False) -> Tuple[bool, str]:
        """Run ODM for one grid, retrying once with higher feature quality.

        The first attempt uses ``--feature-quality lowest``. If ODM fails,
        the run is repeated once without that option after a 10 second
        pause. Docker-level failures (any stderr output, or the docker
        executable failing to start) are not retried.

        Args:
            grid_dir: Host directory of the grid, mounted as ``/datasets``.
            grid_id: Grid index; the first two items are used for logging
                and for the container name.
            produce_dem: Whether to pass ``--dsm --dtm`` to ODM.

        Returns:
            ``(success, error_msg)``; ``error_msg`` is empty on success.
        """
        # NOTE: a guard that rejected produce_dem in "快拼模式" used to live
        # here and is currently disabled.
        self.logger.info(f"开始处理网格 ({grid_id[0]},{grid_id[1]})")

        grid_dir = self._normalize_mount_path(grid_dir)
        success = False
        error_msg = ""
        max_retries = 3
        use_lowest_quality = True  # first attempt uses the lowest quality

        for current_try in range(1, max_retries + 1):
            self.logger.info(
                f"第 {current_try} 次尝试处理网格 ({grid_id[0]},{grid_id[1]})")
            docker_args = self._build_docker_args(
                grid_dir, grid_id, produce_dem, use_lowest_quality)
            self.logger.info(" ".join(docker_args))

            try:
                success, error_msg = self._run_docker_once(
                    docker_args, grid_dir, grid_id)
                break
            except NotOverlapError:
                if use_lowest_quality:
                    self.logger.warning(
                        "检测到not overlap错误,移除lowest quality参数后重试")
                    use_lowest_quality = False
                    time.sleep(10)
                    continue
                self.logger.error(
                    "即使移除lowest quality参数后仍然出现错误")
                error_msg = "图像重叠度不足,需要人工检查数据集的采样间隔情况"
                break

        return success, error_msg

    def process_all_grids(self, grid_points: Dict[tuple, pd.DataFrame], produce_dem: bool) -> Dict[tuple, pd.DataFrame]:
        """Process every grid in a thread pool and return the successful ones.

        Args:
            grid_points: Mapping from grid id to the points of that grid.
            produce_dem: Forwarded to ``run_odm_with_monitor``.

        Returns:
            The subset of ``grid_points`` whose ODM run succeeded.

        Raises:
            RuntimeError: No grid was processed successfully.
        """
        self.logger.info("开始执行网格处理")
        successful_grid_points = {}
        failed_grids = []

        with ThreadPoolExecutor(max_workers=self.max_parallel) as executor:
            # Submit one task per grid and remember which grid it belongs to.
            future_to_grid = {
                executor.submit(
                    self.run_odm_with_monitor,
                    os.path.join(self.output_dir,
                                 f'grid_{grid_id[0]}_{grid_id[1]}'),
                    grid_id,
                    produce_dem,
                ): (grid_id, points)
                for grid_id, points in grid_points.items()
            }

            # Collect results as the tasks finish.
            for future in as_completed(future_to_grid):
                grid_id, points = future_to_grid[future]
                try:
                    success, error_msg = future.result()
                    if success:
                        successful_grid_points[grid_id] = points
                    else:
                        self.logger.error(
                            f"网格 ({grid_id[0]},{grid_id[1]}) 处理失败: {error_msg}")
                        failed_grids.append((grid_id, error_msg))
                except Exception as e:
                    error_msg = str(e)
                    self.logger.error(
                        f"处理网格 ({grid_id[0]},{grid_id[1]}) 时发生异常: {error_msg}")
                    failed_grids.append((grid_id, error_msg))

        # Summary
        total_grids = len(grid_points)
        failed_count = len(failed_grids)
        success_count = len(successful_grid_points)
        self.logger.info(
            f"网格处理完成。总计: {total_grids}, 成功: {success_count}, 失败: {failed_count}")

        if failed_grids:
            self.logger.error("失败的网格:")
            for grid_id, error_msg in failed_grids:
                self.logger.error(
                    f"网格 ({grid_id[0]},{grid_id[1]}): {error_msg}")

        if not successful_grid_points:
            raise RuntimeError("所有网格处理都失败,无法继续处理")

        return successful_grid_points