UAV/utils/command_runner.py
2024-12-25 14:28:01 +08:00

47 lines
1.6 KiB
Python

import os
import logging
from typing import Dict
import pandas as pd
from utils.odm_monitor import ODMProcessMonitor
class CommandRunner:
"""执行网格处理命令的类"""
def __init__(self, output_dir: str, max_retries: int = 3, mode: str = "快拼模式"):
self.output_dir = output_dir
self.max_retries = max_retries
self.logger = logging.getLogger('UAV_Preprocess.CommandRunner')
self.monitor = ODMProcessMonitor(max_retries=max_retries, mode=mode)
self.mode = mode
def _run_command(self, grid_idx: int):
"""执行单个网格的处理"""
try:
grid_dir = os.path.join(self.output_dir, f'grid_{grid_idx + 1}')
grid_dir = os.path.abspath(grid_dir) # 确保使用绝对路径
success, error_msg = self.monitor.run_odm_with_monitor(
grid_dir=grid_dir,
grid_idx=grid_idx,
fast_mode=(self.mode == "快拼模式")
)
if not success:
raise Exception(error_msg)
except Exception as e:
self.logger.error(f"网格 {grid_idx + 1} 处理失败: {str(e)}")
raise
def run_grid_commands(self, grid_points: Dict[int, pd.DataFrame]):
"""处理所有网格"""
self.logger.info("开始执行网格处理")
for grid_idx in grid_points.keys():
try:
self._run_command(grid_idx)
except Exception as e:
self.logger.error(f"网格 {grid_idx + 1} 处理失败,停止后续执行: {str(e)}")
raise