Add fault tolerance to the docker run process

Your Name 2025-01-04 17:54:03 +08:00
parent 7f64316056
commit 0c44cde378
2 changed files with 234 additions and 56 deletions

View File

@@ -249,6 +249,17 @@ class ImagePreprocessor:
         merger = MergeObj(self.config.output_dir)
         merger.merge_grid_obj(grid_points, translations)
 
+    def post_process(self, successful_grid_points: Dict[tuple, pd.DataFrame], grid_points: Dict[tuple, pd.DataFrame], translations: Dict[tuple, tuple]):
+        if len(successful_grid_points) < len(grid_points):
+            self.logger.warning(
+                f"{len(grid_points) - len(successful_grid_points)} grid(s) failed to process; "
+                f"only the {len(successful_grid_points)} successfully processed grid(s) will be merged"
+            )
+        self.merge_tif(successful_grid_points, self.config.produce_dem)
+        if self.config.mode != "快拼模式":
+            self.merge_ply(successful_grid_points)
+            self.merge_obj(successful_grid_points, translations)
+
     def process(self):
         """Run the full preprocessing pipeline"""
         try:
@@ -260,10 +271,10 @@ class ImagePreprocessor:
             self.copy_images(grid_points)
             self.logger.info("Preprocessing finished")
 
-            self.odm_monitor.process_all_grids(grid_points, self.config.produce_dem)
-            self.merge_tif(grid_points, self.config.produce_dem)
-            self.merge_ply(grid_points)
-            self.merge_obj(grid_points, translations)
+            successful_grid_points = self.odm_monitor.process_all_grids(grid_points, self.config.produce_dem)
+
+            self.post_process(successful_grid_points, grid_points, translations)
         except Exception as e:
             self.logger.error(f"Error during processing: {str(e)}", exc_info=True)
             raise
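
The contract change is easiest to see with toy data. A minimal sketch (hypothetical grid ids and placeholder frames, not part of the commit): process_all_grids now returns only the grids that succeeded, and post_process merges just those instead of aborting on the first failure.

    # Toy illustration of the partial-failure contract; all values are made up.
    grid_points = {(0, 0): "df00", (0, 1): "df01", (1, 0): "df10"}
    successful = {(0, 0): "df00", (1, 0): "df10"}  # assume grid (0, 1) failed in ODM

    failed = set(grid_points) - set(successful)
    if failed:
        print(f"{len(failed)} grid(s) failed; merging only {len(successful)}")
    for grid_id in successful:  # stands in for merge_tif / merge_ply / merge_obj
        print(f"merging grid {grid_id}")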

View File

@@ -3,8 +3,22 @@ import logging
 import subprocess
 from typing import Dict, Tuple
 import pandas as pd
+import numpy as np
+from osgeo import gdal
+
+class NotOverlapError(Exception):
+    """Raised when image overlap is insufficient"""
+    pass
+
+class DockerNotRunError(Exception):
+    """Raised when the Docker daemon is not running"""
+    pass
+
+class DockerShareError(Exception):
+    """Raised when Docker cannot access a shared directory"""
+    pass
 
 class ODMProcessMonitor:
     """ODM processing monitor"""
@ -14,11 +28,76 @@ class ODMProcessMonitor:
self.mode = mode self.mode = mode
def _check_success(self, grid_dir: str) -> bool: def _check_success(self, grid_dir: str) -> bool:
"""检查ODM是否执行成功""" """检查ODM是否执行成功
success_markers = ['odm_orthophoto', 'odm_georeferencing']
检查项目:
1. 必要的文件夹是否存在
2. 正射影像是否生成且有效
3. 正射影像文件大小是否正常
"""
# 检查必要文件夹
success_markers = ['odm_orthophoto']
if self.mode != "快拼模式": if self.mode != "快拼模式":
success_markers.append('odm_texturing') success_markers.extend(['odm_texturing', 'odm_georeferencing'])
return all(os.path.exists(os.path.join(grid_dir, 'project', marker)) for marker in success_markers)
if not all(os.path.exists(os.path.join(grid_dir, 'project', marker)) for marker in success_markers):
self.logger.error("必要的文件夹未生成")
return False
# 检查正射影像文件
ortho_path = os.path.join(grid_dir, 'project', 'odm_orthophoto', 'odm_orthophoto.original.tif')
if not os.path.exists(ortho_path):
self.logger.error("正射影像文件未生成")
return False
# 检查文件大小
file_size_mb = os.path.getsize(ortho_path) / (1024 * 1024) # 转换为MB
if file_size_mb < 1:
self.logger.error(f"正射影像文件过小: {file_size_mb:.2f}MB")
return False
try:
# 打开影像文件
ds = gdal.Open(ortho_path)
if ds is None:
self.logger.error("无法打开正射影像文件")
return False
# 读取第一个波段
band = ds.GetRasterBand(1)
# 获取统计信息
stats = band.GetStatistics(False, True)
if stats is None:
self.logger.error("无法获取影像统计信息")
return False
min_val, max_val, mean, std = stats
# 计算空值比例
no_data_value = band.GetNoDataValue()
array = band.ReadAsArray()
if no_data_value is not None:
no_data_ratio = np.sum(array == no_data_value) / array.size
else:
no_data_ratio = 0
# 检查空值比例是否过高超过50%
if no_data_ratio > 0.5:
self.logger.error(f"正射影像空值比例过高: {no_data_ratio:.2%}")
return False
# 检查影像是否全黑或全白
if max_val - min_val < 1:
self.logger.error("正射影像可能无效:像素值范围过小")
return False
ds = None # 关闭数据集
return True
except Exception as e:
self.logger.error(f"检查正射影像时发生错误: {str(e)}")
return False
def run_odm_with_monitor(self, grid_dir: str, grid_id: tuple, fast_mode: bool = True, produce_dem: bool = False) -> Tuple[bool, str]: def run_odm_with_monitor(self, grid_dir: str, grid_id: tuple, fast_mode: bool = True, produce_dem: bool = False) -> Tuple[bool, str]:
"""运行ODM命令""" """运行ODM命令"""
@@ -27,7 +106,15 @@ class ODMProcessMonitor:
             return False, "DEM cannot be generated in 快拼模式 (fast-stitch mode); please adjust the production parameters"
 
         self.logger.info(f"Start processing grid ({grid_id[0]},{grid_id[1]})")
 
+        max_retries = 3
+        current_try = 0
+        use_lowest_quality = True  # start with --feature-quality lowest
+
+        while current_try < max_retries:
+            current_try += 1
+            self.logger.info(f"Attempt {current_try} at processing grid ({grid_id[0]},{grid_id[1]})")
+
+            try:
                 # Build the Docker command
                 grid_dir = grid_dir[0].lower() + grid_dir[1:].replace('\\', '/')
                 docker_command = (
@@ -37,10 +124,14 @@ class ODMProcessMonitor:
                     f"--project-path /datasets project "
                     f"--max-concurrency 15 "
                     f"--force-gps "
-                    f"--feature-quality lowest "
-                    f"--orthophoto-resolution 10 "
                 )
+
+                # Append parameters depending on whether lowest feature quality is used
+                if use_lowest_quality:
+                    docker_command += f"--feature-quality lowest "
+                docker_command += f"--orthophoto-resolution 10 "
+
                 if produce_dem:
                     docker_command += (
                         f"--dsm "
@@ -55,29 +146,80 @@ class ODMProcessMonitor:
                     docker_command += "--rerun-all"
                 self.logger.info(docker_command)
 
                 result = subprocess.run(
                     docker_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                 stdout, stderr = result.stdout.decode(
                     'utf-8'), result.stderr.decode('utf-8')
+                self.logger.info(f"==========stdout==========: {stdout}")
                 self.logger.error(f"==========stderr==========: {stderr}")
 
+                # Scan the tail of stdout for known error signatures
+                stdout_lines = stdout.strip().split('\n')
+                last_lines = stdout_lines[-10:] if len(stdout_lines) > 10 else stdout_lines
+
+                # Is the Docker daemon not running?
+                if any("docker not run" in line.lower() for line in last_lines) or \
+                   any("docker daemon" in line.lower() for line in last_lines) or \
+                   any("cannot connect to the docker daemon" in line.lower() for line in last_lines):
+                    raise DockerNotRunError("Docker service is not running")
+
+                # Directory sharing problem?
+                if any("not share" in line.lower() for line in last_lines) or \
+                   any("permission denied" in line.lower() for line in last_lines) or \
+                   any("access is denied" in line.lower() for line in last_lines):
+                    raise DockerShareError("Docker cannot access the directory")
+
+                # Insufficient image overlap?
+                if any("not overlap" in line.lower() for line in last_lines):
+                    raise NotOverlapError("Insufficient image overlap detected")
+
                 # Check the execution result
                 if self._check_success(grid_dir):
                     self.logger.info(f"Grid ({grid_id[0]},{grid_id[1]}) processed successfully")
                     return True, ""
+
+                if current_try < max_retries:
+                    self.logger.warning(f"Grid processing failed; preparing retry {current_try + 1}")
                 else:
-                    self.logger.error(f"Grid ({grid_id[0]},{grid_id[1]}) processing failed")
+                    self.logger.error(f"Grid ({grid_id[0]},{grid_id[1]}) processing failed; maximum retries reached")
+                    return False, f"Grid ({grid_id[0]},{grid_id[1]}) processing failed after {max_retries} attempts"
+
+            except NotOverlapError:
+                if use_lowest_quality:
+                    self.logger.warning("'not overlap' error detected; retrying without --feature-quality lowest")
+                    use_lowest_quality = False
+                    continue
+                else:
+                    self.logger.error("'not overlap' error persists even without --feature-quality lowest")
+                    return False, "Insufficient image overlap"
+
+            except DockerNotRunError:
+                self.logger.error("Docker service is not running")
+                return False, "Docker is not running; please start Docker"
+
+            except DockerShareError:
+                self.logger.error("Docker cannot access the directory")
+                return False, "Docker cannot access the data or output directory; please check directory permissions and sharing settings"
 
         return False, f"Grid ({grid_id[0]},{grid_id[1]}) processing failed"
 
-    def process_all_grids(self, grid_points: Dict[tuple, pd.DataFrame], produce_dem: bool):
-        """Process all grids"""
+    def process_all_grids(self, grid_points: Dict[tuple, pd.DataFrame], produce_dem: bool) -> Dict[tuple, pd.DataFrame]:
+        """Process all grids.
+
+        Returns:
+            Dict[tuple, pd.DataFrame]: point data for the grids that were processed successfully
+        """
         self.logger.info("Starting grid processing")
-        for grid_id in grid_points.keys():
+        successful_grid_points = {}
+        failed_grids = []
+
+        for grid_id, points in grid_points.items():
             grid_dir = os.path.join(
                 self.output_dir, f'grid_{grid_id[0]}_{grid_id[1]}'
             )
+            try:
                 success, error_msg = self.run_odm_with_monitor(
                     grid_dir=grid_dir,
                     grid_id=grid_id,
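
Since the stdout tail-scan in the hunk above is pure string matching, it could be lifted into a standalone helper and unit-tested without invoking Docker at all. A minimal sketch of that hypothetical refactor (classify_docker_output is not part of this commit):

    def classify_docker_output(last_lines):
        """Raise the matching custom exception for known failure signatures."""
        lowered = [line.lower() for line in last_lines]
        if any("docker not run" in l or "docker daemon" in l
               or "cannot connect to the docker daemon" in l for l in lowered):
            raise DockerNotRunError("Docker service is not running")
        if any("not share" in l or "permission denied" in l
               or "access is denied" in l for l in lowered):
            raise DockerShareError("Docker cannot access the directory")
        if any("not overlap" in l for l in lowered):
            raise NotOverlapError("Insufficient image overlap detected")

    # e.g. classify_docker_output(["Cannot connect to the Docker daemon"]) raises DockerNotRunError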
@@ -85,5 +227,30 @@ class ODMProcessMonitor:
                     produce_dem=produce_dem
                 )
-            if not success:
-                raise Exception(f"Grid ({grid_id[0]},{grid_id[1]}) processing failed: {error_msg}")
+                if success:
+                    successful_grid_points[grid_id] = points
+                else:
+                    self.logger.error(f"Grid ({grid_id[0]},{grid_id[1]}) processing failed: {error_msg}")
+                    failed_grids.append((grid_id, error_msg))
+            except Exception as e:
+                error_msg = str(e)
+                self.logger.error(f"Exception while processing grid ({grid_id[0]},{grid_id[1]}): {error_msg}")
+                failed_grids.append((grid_id, error_msg))
+
+        # Summarize the results
+        total_grids = len(grid_points)
+        failed_count = len(failed_grids)
+        success_count = len(successful_grid_points)
+        self.logger.info(f"Grid processing finished. Total: {total_grids}, succeeded: {success_count}, failed: {failed_count}")
+
+        if failed_grids:
+            self.logger.error("Failed grids:")
+            for grid_id, error_msg in failed_grids:
+                self.logger.error(f"Grid ({grid_id[0]},{grid_id[1]}): {error_msg}")
+
+        if len(successful_grid_points) == 0:
+            raise Exception("All grids failed to process; cannot continue")
+
+        return successful_grid_points
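
One caller-side note: post_process still hands the full translations dict to merge_obj. Assuming merge_obj looks up translations by grid id (the merge code is not shown in this diff), the dict could be narrowed to the surviving grids first. A hypothetical sketch with made-up values:

    translations = {(0, 0): (0.0, 0.0), (0, 1): (10.0, 0.0), (1, 0): (0.0, 10.0)}
    successful_grid_points = {(0, 0): "df00", (1, 0): "df10"}  # grid (0, 1) failed

    surviving = {gid: t for gid, t in translations.items()
                 if gid in successful_grid_points}
    # surviving == {(0, 0): (0.0, 0.0), (1, 0): (0.0, 10.0)}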