import os
import time
import logging
import subprocess
from typing import List, Tuple


class ODMProcessMonitor:
    """Run the ``opendronemap/odm`` Docker image for one grid, with retries.

    Each attempt launches ``docker run`` and then verifies that the expected
    output directories exist under ``<grid_dir>/project``.
    """

    def __init__(self, max_retries: int = 3, mode: str = "快拼模式",
                 retry_delay: float = 30):
        """Configure the monitor.

        Args:
            max_retries: Maximum number of ODM attempts per grid.
            mode: Processing mode. Any value other than "快拼模式" additionally
                requires the ``odm_texturing`` output to count as success.
            retry_delay: Seconds to wait between two attempts (default 30,
                the previously hard-coded value). Values <= 0 disable the wait.
        """
        self.max_retries = max_retries
        self.logger = logging.getLogger('UAV_Preprocess.ODMMonitor')
        self.mode = mode
        self.retry_delay = retry_delay

    def _missing_markers(self, grid_dir: str) -> List[str]:
        """Return the expected output entries that are absent for this grid."""
        markers = ['odm_orthophoto', 'odm_georeferencing']
        if self.mode != "快拼模式":
            markers.append('odm_texturing')
        project_dir = os.path.join(grid_dir, 'project')
        return [
            marker for marker in markers
            if not os.path.exists(os.path.join(project_dir, marker))
        ]

    def _check_success(self, grid_dir: str) -> bool:
        """Return True when every expected ODM output exists for this grid."""
        return not self._missing_markers(grid_dir)

    def _build_command(self, grid_dir: str, fast_mode: bool) -> List[str]:
        """Build the ``docker run`` argument list for one ODM invocation."""
        cmd = [
            "docker", "run", "--rm",
            "-v", f"{grid_dir}:/datasets",
            "opendronemap/odm",
            "--project-path", "/datasets", "project",
            "--max-concurrency", "10",
            "--force-gps",
            "--rerun-all",
        ]
        # Extra flags used only in fast mode.
        if fast_mode:
            cmd.extend([
                "--feature-quality", "lowest",
                "--orthophoto-resolution", "8",
                "--fast-orthophoto",
                "--skip-3dmodel",
            ])
        return cmd

    def run_odm_with_monitor(self, grid_dir: str, grid_idx: int,
                             fast_mode: bool = True) -> Tuple[bool, str]:
        """Run ODM for one grid, retrying up to ``max_retries`` times.

        Args:
            grid_dir: Host directory mounted into the container as
                ``/datasets``; outputs are checked under ``grid_dir/project``.
            grid_idx: Zero-based grid index (logged as ``grid_idx + 1``).
            fast_mode: When True, append the fast-orthophoto flags.
                NOTE(review): this only selects the ODM flags, while the
                output check is driven by ``self.mode``; callers presumably
                keep the two in sync -- confirm.

        Returns:
            ``(True, "")`` on success, otherwise ``(False, error_message)``.
            An exception raised while launching or checking the process is
            not retried: it is logged and reported immediately.
        """
        attempt = 0
        while attempt < self.max_retries:
            try:
                self.logger.info(f"网格 {grid_idx + 1} 第 {attempt + 1} 次尝试")

                # stdout is discarded; stderr is captured for error reporting.
                # Decode explicitly as UTF-8 with replacement so that output
                # which is invalid in the locale encoding cannot raise
                # UnicodeDecodeError and abort the retry loop.
                result = subprocess.run(
                    self._build_command(grid_dir, fast_mode),
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.PIPE,
                    text=True,
                    encoding="utf-8",
                    errors="replace",
                )

                if result.returncode != 0:
                    # Only the tail of stderr is logged to keep entries short.
                    self.logger.error(f"网格 {grid_idx + 1} 执行失败,错误信息:{result.stderr[-500:]}")
                else:
                    missing = self._missing_markers(grid_dir)
                    if not missing:
                        self.logger.info(f"网格 {grid_idx + 1} ODM处理成功")
                        return True, ""
                    # Exit code 0 without the expected outputs used to be
                    # retried silently; record what is missing.
                    self.logger.warning(
                        f"网格 {grid_idx + 1} ODM返回成功,但缺少输出: {', '.join(missing)}"
                    )

            except Exception as e:
                error_msg = f"执行异常: {str(e)}"
                self.logger.error(error_msg)
                return False, error_msg

            attempt += 1
            # No wait after the final attempt.
            if attempt < self.max_retries and self.retry_delay > 0:
                time.sleep(self.retry_delay)

        error_msg = f"网格 {grid_idx + 1} 在 {self.max_retries} 次尝试后仍然失败"
        self.logger.error(error_msg)
        return False, error_msg