import os import time import logging from typing import Dict, Tuple import pandas as pd import subprocess class ODMProcessMonitor: """ODM处理监控器""" def __init__(self, output_dir: str, mode: str = "三维模式", config=None): self.output_dir = output_dir self.logger = logging.getLogger('UAV_Preprocess.ODMMonitor') self.mode = mode self.config = config def run_odm_with_monitor(self, grid_dir: str, grid_id: tuple) -> Tuple[bool, str]: """运行ODM命令""" self.logger.info(f"开始处理网格 ({grid_id[0]},{grid_id[1]})") success = False error_msg = "" max_retries = 3 current_try = 0 cpu_cores = os.cpu_count() while current_try < max_retries: current_try += 1 self.logger.info( f"第 {current_try} 次尝试处理网格 ({grid_id[0]},{grid_id[1]})") # 构建 Docker 容器运行参数 grid_dir_fixed = grid_dir[0].lower() + grid_dir[1:].replace('\\', '/') command = ( f"--max-concurrency {cpu_cores} " f"--force-gps " f"--use-exif " f"--use-hybrid-bundle-adjustment " f"--optimize-disk-space " f"--orthophoto-cutline " ) command += f"--feature-type {self.config.feature_type} " command += f"--orthophoto-resolution {self.config.orthophoto_resolution} " if self.mode == "快拼模式": command += ( f"--fast-orthophoto " f"--skip-3dmodel " ) else: # 三维模式参数 command += ( f"--dsm " f"--dtm " ) if current_try == 1: command += ( f"--feature-quality low " ) command += "--rerun-all" docker_cmd = f'python3 /code/run.py --project-path {grid_dir} project {command}' self.logger.info(f"Docker 命令: {docker_cmd}") try: # 执行命令 result = subprocess.run( docker_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8" ) if result.returncode != 0: self.logger.error( f"容器运行失败,退出状态码: {result.returncode}") error_lines = result.stderr.splitlines() self.logger.error("容器运行失败的最后 50 行错误日志:") for line in error_lines[-50:]: self.logger.error(line) error_msg = "\n".join(error_lines[-50:]) time.sleep(5) else: logs = result.stdout.splitlines() self.logger.info("容器运行完成,以下是最后 50 行日志:") for line in logs[-50:]: self.logger.info(line) success = True error_msg = "" break except Exception as e: error_msg = str(e) self.logger.error(f"执行docker命令时发生异常: {error_msg}") time.sleep(5) return success, error_msg def process_all_grids(self, grid_points: Dict[tuple, pd.DataFrame]) -> list: """处理所有网格 Returns: Dict[tuple, pd.DataFrame]: 成功处理的网格点数据字典 """ self.logger.info("开始执行网格处理") successful_grid_lt = [] failed_grids = [] for grid_id, points in grid_points.items(): grid_dir = os.path.join( self.output_dir, f'grid_{grid_id[0]}_{grid_id[1]}' ) try: success, error_msg = self.run_odm_with_monitor( grid_dir=grid_dir, grid_id=grid_id, ) if success: successful_grid_lt.append(grid_id) else: self.logger.error( f"网格 ({grid_id[0]},{grid_id[1]})") failed_grids.append(grid_id) except Exception as e: error_msg = str(e) self.logger.error( f"处理网格 ({grid_id[0]},{grid_id[1]}) 时发生异常: {error_msg}") failed_grids.append((grid_id, error_msg)) # 汇总处理结果 total_grids = len(grid_points) failed_count = len(failed_grids) success_count = len(successful_grid_lt) self.logger.info( f"网格处理完成。总计: {total_grids}, 成功: {success_count}, 失败: {failed_count}") if len(successful_grid_lt) == 0: raise Exception("所有网格处理都失败,无法继续处理") return successful_grid_lt