UAV/utils/odm_monitor.py

113 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import psutil
import logging
import subprocess
from typing import Optional, Tuple
class ODMProcessMonitor:
"""ODM进程监控器"""
def __init__(self, max_retries: int = 3, check_interval: int = 10, mode: str = "快拼模式"):
"""
初始化监控器
Args:
max_retries: 最大重试次数
check_interval: 检查间隔(秒)
mode: 模式
"""
self.max_retries = max_retries
self.check_interval = check_interval
self.logger = logging.getLogger('UAV_Preprocess.ODMMonitor')
self.mode = mode
def _check_docker_container(self, process_name: str = "opendronemap/odm") -> bool:
"""检查是否有指定的Docker容器在运行"""
try:
result = subprocess.run(
["docker", "ps", "--filter",
f"ancestor={process_name}", "--format", "{{.ID}}"],
capture_output=True,
text=True
)
return bool(result.stdout.strip())
except Exception as e:
self.logger.error(f"检查Docker容器状态时发生错误: {str(e)}")
return False
def _check_success(self, grid_dir: str) -> bool:
"""检查ODM是否执行成功"""
if self.mode == "快拼模式":
success_markers = ['odm_orthophoto', 'odm_georeferencing']
else:
success_markers = ['odm_orthophoto',
'odm_georeferencing', 'odm_texturing']
return all(os.path.exists(os.path.join(grid_dir, 'project', marker)) for marker in success_markers)
def run_odm_with_monitor(self, command: str, grid_dir: str, grid_idx: int) -> Tuple[bool, str]:
"""运行ODM命令并监控进程"""
attempt = 0
while attempt < self.max_retries:
try:
self.logger.info(f"网格 {grid_idx + 1}{attempt + 1} 次尝试执行ODM")
# 创建日志文件
log_file = os.path.join(grid_dir, f'odm_attempt_{attempt + 1}.log')
# 使用 subprocess.Popen 启动进程,并设置适当的参数
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1, # 使用行缓冲
text=True, # 使用文本模式
universal_newlines=True # 统一换行符处理
)
# 使用非阻塞方式读取输出
while True:
# 读取输出
output = process.stdout.readline()
if output:
with open(log_file, 'a', encoding='utf-8') as f:
f.write(output)
self.logger.info(f"网格 {grid_idx + 1}: {output.strip()}")
# 检查进程是否结束
if process.poll() is not None:
break
# 检查Docker容器状态
if not self._check_docker_container():
break
time.sleep(0.1) # 短暂休眠避免CPU过度使用
# 获取最终返回码
return_code = process.wait()
# 检查是否成功完成
if return_code == 0 and self._check_success(grid_dir):
self.logger.info(f"网格 {grid_idx + 1} ODM处理成功")
return True, ""
self.logger.warning(f"网格 {grid_idx + 1}{attempt + 1} 次尝试失败")
except Exception as e:
error_msg = f"监控进程发生异常: {str(e)}"
self.logger.error(error_msg)
return False, error_msg
attempt += 1
if attempt < self.max_retries:
wait_time = (attempt + 1) * 30
self.logger.info(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
error_msg = f"网格 {grid_idx + 1}{self.max_retries} 次尝试后仍然失败"
self.logger.error(error_msg)
return False, error_msg