UAV/utils/odm_monitor.py
2025-01-02 20:11:47 +08:00

90 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import subprocess
from typing import Dict, Optional, Tuple

import pandas as pd
class ODMProcessMonitor:
    """Run ODM in Docker for each grid directory and verify its outputs.

    Each grid is processed by one ``docker run`` of the
    ``opendronemap/odm:gpu`` image with the grid directory mounted at
    ``/datasets``. A run counts as successful when the expected output paths
    exist under ``<grid_dir>/project``; the process exit code is not used.
    """

    # Value of ``mode`` that selects the fast-orthophoto pipeline.
    _FAST_MODE = "快拼模式"

    def __init__(self, output_dir: str, mode: str = "快拼模式"):
        """Store the output root directory and the processing mode.

        Args:
            output_dir: Directory that contains the ``grid_<x>_<y>`` folders.
            mode: Processing mode; ``"快拼模式"`` selects the fast pipeline.
        """
        self.output_dir = output_dir
        self.logger = logging.getLogger('UAV_Preprocess.ODMMonitor')
        self.mode = mode

    def _check_success(self, grid_dir: str, fast_mode: Optional[bool] = None) -> bool:
        """Return True when every expected ODM output path exists.

        Args:
            grid_dir: Grid working directory; outputs are looked up under
                ``<grid_dir>/project``.
            fast_mode: Whether the run used the fast pipeline. When omitted,
                it is derived from ``self.mode``.

        Returns:
            True if all marker paths exist, False otherwise.
        """
        if fast_mode is None:
            fast_mode = self.mode == self._FAST_MODE
        success_markers = ['odm_orthophoto', 'odm_georeferencing']
        if not fast_mode:
            # Fast runs pass --skip-3dmodel, so texturing output is only
            # required for non-fast runs.
            success_markers.append('odm_texturing')
        project_dir = os.path.join(grid_dir, 'project')
        return all(
            os.path.exists(os.path.join(project_dir, marker))
            for marker in success_markers
        )

    @staticmethod
    def _normalize_grid_dir(grid_dir: str) -> str:
        """Convert a host path to the forward-slash form used for the mount.

        Backslashes after the first character become ``/`` and a Windows
        drive letter (first character followed by ``:``) is lowercased.
        Other paths keep their case, and an empty string is returned as is.
        """
        head = grid_dir[:1]
        tail = grid_dir[1:].replace('\\', '/')
        if tail.startswith(':'):
            head = head.lower()
        return head + tail

    @staticmethod
    def _build_docker_command(grid_dir: str, fast_mode: bool, produce_dem: bool) -> list:
        """Build the ``docker run`` argument list for one grid.

        The command is a list so that it can be executed without a shell;
        a ``grid_dir`` containing spaces or shell metacharacters stays a
        single ``-v`` argument.
        """
        # NOTE(review): `-t` requests a pseudo-TTY although the output is
        # captured through pipes; confirm it is still wanted when this runs
        # without an attached terminal.
        command = [
            'docker', 'run', '--gpus', 'all', '-ti', '--rm',
            '-v', f'{grid_dir}:/datasets',
            'opendronemap/odm:gpu',
            '--project-path', '/datasets', 'project',
            '--max-concurrency', '15',
            '--force-gps',
            '--feature-quality', 'lowest',
            '--orthophoto-resolution', '10',
        ]
        if produce_dem:
            command += ['--dsm', '--dtm']
        if fast_mode:
            command += ['--fast-orthophoto', '--skip-3dmodel']
        command.append('--rerun-all')
        return command

    def run_odm_with_monitor(
        self,
        grid_dir: str,
        grid_id: tuple,
        fast_mode: bool = True,
        produce_dem: bool = False,
    ) -> Tuple[bool, str]:
        """Run ODM for one grid and report whether its outputs were produced.

        Args:
            grid_dir: Host directory of the grid, mounted as ``/datasets``.
            grid_id: Two-element grid index, used in log and error messages.
            fast_mode: Add ``--fast-orthophoto --skip-3dmodel`` to the command.
            produce_dem: Add ``--dsm --dtm``; rejected together with
                ``fast_mode``.

        Returns:
            ``(True, "")`` on success, otherwise ``(False, message)``.
        """
        if produce_dem and fast_mode:
            self.logger.error("快拼模式下无法生成DEM请调整生产参数")
            return False, "快拼模式下无法生成DEM请调整生产参数"

        grid_label = f"网格 ({grid_id[0]},{grid_id[1]})"
        failure_msg = f"{grid_label} 处理失败"
        self.logger.info(f"开始处理{grid_label}")

        # Build the Docker command
        grid_dir = self._normalize_grid_dir(grid_dir)
        command = self._build_docker_command(grid_dir, fast_mode, produce_dem)
        self.logger.info(' '.join(command))

        try:
            result = subprocess.run(
                command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as exc:
            # Without a shell, a missing or non-executable `docker` raises
            # here; report it through the normal failure return.
            self.logger.error(f"无法启动Docker: {exc}")
            self.logger.error(failure_msg)
            return False, failure_msg

        # errors='replace' keeps undecodable bytes from aborting the run
        # after the container has already finished.
        stdout = result.stdout.decode('utf-8', errors='replace')
        stderr = result.stderr.decode('utf-8', errors='replace')
        self.logger.info(f"==========stdout==========: {stdout}")
        self.logger.error(f"==========stderr==========: {stderr}")

        # Check the outputs against the pipeline that was actually run
        if self._check_success(grid_dir, fast_mode):
            self.logger.info(f"{grid_label} 处理成功")
            return True, ""

        self.logger.error(failure_msg)
        return False, failure_msg

    def process_all_grids(self, grid_points: Dict[tuple, pd.DataFrame], produce_dem: bool):
        """Run ODM for every grid, stopping at the first failure.

        Args:
            grid_points: Mapping keyed by two-element grid index; only the
                keys are used here.
            produce_dem: Forwarded to ``run_odm_with_monitor``.

        Raises:
            RuntimeError: If any grid fails to process.
        """
        self.logger.info("开始执行网格处理")
        fast_mode = self.mode == self._FAST_MODE
        for grid_id in grid_points:
            grid_dir = os.path.join(
                self.output_dir, f'grid_{grid_id[0]}_{grid_id[1]}'
            )
            success, error_msg = self.run_odm_with_monitor(
                grid_dir=grid_dir,
                grid_id=grid_id,
                fast_mode=fast_mode,
                produce_dem=produce_dem,
            )
            if not success:
                raise RuntimeError(
                    f"网格 ({grid_id[0]},{grid_id[1]}) 处理失败: {error_msg}")