UAV/utils/odm_monitor.py
2024-12-23 11:31:20 +08:00

140 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import psutil
import logging
import subprocess
from typing import Optional, Tuple
class ODMProcessMonitor:
"""ODM进程监控器"""
def __init__(self, max_retries: int = 3, check_interval: int = 10, mode: str = "快拼模式"):
"""
初始化监控器
Args:
max_retries: 最大重试次数
check_interval: 检查间隔(秒)
mode: 模式
"""
self.max_retries = max_retries
self.check_interval = check_interval
self.logger = logging.getLogger('UAV_Preprocess.ODMMonitor')
self.mode = mode
def _check_docker_container(self, process_name: str = "opendronemap/odm") -> bool:
"""检查是否有指定的Docker容器在运行"""
try:
result = subprocess.run(
["docker", "ps", "--filter",
f"ancestor={process_name}", "--format", "{{.ID}}"],
capture_output=True,
text=True
)
return bool(result.stdout.strip())
except Exception as e:
self.logger.error(f"检查Docker容器状态时发生错误: {str(e)}")
return False
def _check_success(self, grid_dir: str) -> bool:
"""检查ODM是否执行成功"""
if self.mode == "快拼模式":
success_markers = ['odm_orthophoto', 'odm_georeferencing']
else:
success_markers = ['odm_orthophoto',
'odm_georeferencing', 'odm_texturing']
return all(os.path.exists(os.path.join(grid_dir, 'project', marker)) for marker in success_markers)
def run_odm_with_monitor(self, command: str, grid_dir: str, grid_idx: int) -> Tuple[bool, str]:
"""
运行ODM命令并监控进程
"""
attempt = 0
while attempt < self.max_retries:
try:
self.logger.info(f"网格 {grid_idx + 1}{attempt + 1} 次尝试执行ODM")
# 创建日志文件
log_file = os.path.join(grid_dir, f'odm_attempt_{attempt + 1}.log')
with open(log_file, 'w', encoding='utf-8') as f:
f.write(f"=== ODM处理日志 ===\n开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
# 启动ODM进程实时获取输出
process = subprocess.Popen(
command,
shell=True,
cwd=grid_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1, # 行缓冲
universal_newlines=True
)
self.logger.info("ODM进程已启动开始监控Docker容器")
# 等待进程启动
time.sleep(10)
# 实时读取输出并写入日志
def log_output(pipe, log_file, prefix=""):
with open(log_file, 'a', encoding='utf-8') as f:
for line in pipe:
f.write(f"{prefix}{line}")
f.flush() # 确保立即写入
# 创建线程读取输出
from threading import Thread
stdout_thread = Thread(target=log_output,
args=(process.stdout, log_file))
stderr_thread = Thread(target=log_output,
args=(process.stderr, log_file, "ERROR: "))
stdout_thread.daemon = True
stderr_thread.daemon = True
stdout_thread.start()
stderr_thread.start()
# 监控Docker容器
while True:
if not self._check_docker_container():
# Docker容器已结束
process.wait() # 等待进程完全结束
# 等待输出线程结束
stdout_thread.join(timeout=5)
stderr_thread.join(timeout=5)
# 记录结束时间
with open(log_file, 'a', encoding='utf-8') as f:
f.write(f"\n=== 处理结束 ===\n结束时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
# 检查是否成功完成
if self._check_success(grid_dir):
self.logger.info(f"网格 {grid_idx + 1} ODM处理成功")
return True, ""
else:
self.logger.warning(
f"网格 {grid_idx + 1}{attempt + 1} 次尝试失败")
break
time.sleep(self.check_interval)
# 如果不是最后一次尝试,等待后重试
if attempt < self.max_retries - 1:
wait_time = (attempt + 1) * 30
self.logger.info(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
attempt += 1
except Exception as e:
error_msg = f"监控进程发生异常: {str(e)}"
self.logger.error(error_msg)
return False, error_msg
error_msg = f"网格 {grid_idx + 1}{self.max_retries} 次尝试后仍然失败,需要人工查看"
self.logger.error(error_msg)
return False, error_msg