UAV/utils/odm_monitor.py
2025-05-04 17:26:21 +08:00

155 lines
5.2 KiB
Python

import os
import time
import logging
from typing import Dict, Tuple
import pandas as pd
import docker
class ODMProcessMonitor:
"""ODM处理监控器"""
def __init__(self, output_dir: str, mode: str = "三维模式"):
self.output_dir = output_dir
self.logger = logging.getLogger('UAV_Preprocess.ODMMonitor')
self.mode = mode
def run_odm_with_monitor(self, grid_dir: str, grid_id: tuple) -> Tuple[bool, str]:
"""运行ODM命令"""
self.logger.info(f"开始处理网格 ({grid_id[0]},{grid_id[1]})")
success = False
error_msg = ""
max_retries = 3
current_try = 0
cpu_cores = os.cpu_count()
# 初始化 Docker 客户端
client = docker.from_env()
while current_try < max_retries:
current_try += 1
self.logger.info(
f"{current_try} 次尝试处理网格 ({grid_id[0]},{grid_id[1]})")
# 构建 Docker 容器运行参数
grid_dir = grid_dir[0].lower(
) + grid_dir[1:].replace('\\', '/')
volumes = {
grid_dir: {'bind': '/datasets', 'mode': 'rw'}
}
command = (
f"--project-path /datasets project "
f"--max-concurrency {cpu_cores} "
f"--force-gps "
f"--use-exif "
f"--use-hybrid-bundle-adjustment "
f"--optimize-disk-space "
f"--orthophoto-cutline "
f"--feature-type sift "
f"--orthophoto-resolution 8 "
# f"--mesh-size 5000000 "
# f"--mesh-octree-depth 13 "
)
if self.mode == "快拼模式":
command += (
f"--fast-orthophoto "
f"--skip-3dmodel "
)
else: # 三维模式参数
command += (
f"--dsm "
f"--dtm "
)
if current_try == 1:
command += (
f"--feature-quality low "
)
command += "--rerun-all"
self.logger.info(f"Docker 命令: {command}")
# 运行 Docker 容器
container = client.containers.run(
image="opendronemap/odm:gpu",
command=command,
volumes=volumes,
detach=True,
remove=False,
runtime="nvidia", # 使用 GPU
)
# 等待容器运行完成
exit_status = container.wait()
if exit_status["StatusCode"] != 0:
self.logger.error(
f"容器运行失败,退出状态码: {exit_status['StatusCode']}")
# 获取容器的错误日志
error_msg = container.logs(
stderr=True).decode("utf-8").splitlines()
self.logger.error("容器运行失败的最后 50 行错误日志:")
for line in error_msg[-50:]:
self.logger.error(line)
container.remove()
time.sleep(5)
else:
# 获取所有日志
logs = container.logs().decode("utf-8").splitlines()
# 输出最后 50 行日志
self.logger.info("容器运行完成,以下是最后 50 行日志:")
for line in logs[-50:]:
self.logger.info(line)
success = True
error_msg = ""
container.remove()
break
return success, error_msg
def process_all_grids(self, grid_points: Dict[tuple, pd.DataFrame]) -> list:
"""处理所有网格
Returns:
Dict[tuple, pd.DataFrame]: 成功处理的网格点数据字典
"""
self.logger.info("开始执行网格处理")
successful_grid_lt = []
failed_grids = []
for grid_id, points in grid_points.items():
grid_dir = os.path.join(
self.output_dir, f'grid_{grid_id[0]}_{grid_id[1]}'
)
try:
success, error_msg = self.run_odm_with_monitor(
grid_dir=grid_dir,
grid_id=grid_id,
)
if success:
successful_grid_lt.append(grid_id)
else:
self.logger.error(
f"网格 ({grid_id[0]},{grid_id[1]})")
failed_grids.append(grid_id)
except Exception as e:
error_msg = str(e)
self.logger.error(
f"处理网格 ({grid_id[0]},{grid_id[1]}) 时发生异常: {error_msg}")
failed_grids.append((grid_id, error_msg))
# 汇总处理结果
total_grids = len(grid_points)
failed_count = len(failed_grids)
success_count = len(successful_grid_lt)
self.logger.info(
f"网格处理完成。总计: {total_grids}, 成功: {success_count}, 失败: {failed_count}")
if len(successful_grid_lt) == 0:
raise Exception("所有网格处理都失败,无法继续处理")
return successful_grid_lt