import os import logging from typing import Dict import pandas as pd from utils.odm_monitor import ODMProcessMonitor class CommandRunner: """执行网格处理命令的类""" def __init__(self, output_dir: str, mode: str = "快拼模式"): self.output_dir = output_dir self.logger = logging.getLogger('UAV_Preprocess.CommandRunner') self.monitor = ODMProcessMonitor(mode=mode) self.mode = mode def run_grid_commands(self, grid_points: Dict[int, pd.DataFrame]): """处理所有网格""" self.logger.info("开始执行网格处理") for grid_idx in grid_points.keys(): grid_dir = os.path.abspath(os.path.join( self.output_dir, f'grid_{grid_idx + 1}' )) success, error_msg = self.monitor.run_odm_with_monitor( grid_dir=grid_dir, grid_idx=grid_idx, fast_mode=(self.mode == "快拼模式") ) if not success: raise Exception(f"网格 {grid_idx + 1} 处理失败: {error_msg}")