diff --git a/utils/odm_monitor.py b/utils/odm_monitor.py
index 5b98a12..b655883 100644
--- a/utils/odm_monitor.py
+++ b/utils/odm_monitor.py
@@ -6,6 +6,8 @@ from typing import Dict, Tuple
 import pandas as pd
 import numpy as np
 from osgeo import gdal
+import threading
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 
 class NotOverlapError(Exception):
@@ -16,10 +18,11 @@ class NotOverlapError(Exception):
 class ODMProcessMonitor:
     """ODM处理监控器"""
 
-    def __init__(self, output_dir: str, mode: str = "快拼模式"):
+    def __init__(self, output_dir: str, mode: str = "快拼模式", max_parallel: int = 2):
         self.output_dir = output_dir
         self.logger = logging.getLogger('UAV_Preprocess.ODMMonitor')
         self.mode = mode
+        self.max_parallel = max_parallel  # 最大并行数
 
     def _check_success(self, grid_dir: str) -> bool:
         """检查ODM是否执行成功
@@ -145,11 +148,19 @@ class ODMProcessMonitor:
             # 构建Docker命令
             grid_dir = grid_dir[0].lower()+grid_dir[1:].replace('\\', '/')
             docker_command = (
-                f"docker run --gpus all -ti --rm "
+                f"docker run "
+                # 资源限制
+                f"--gpus all "  # 指定使用的GPU
+                f"--memory=32g "  # 限制内存使用
+                f"--memory-swap=36g "  # 限制包含swap的总内存
+                f"--cpus=10.0 "  # 限制CPU使用数量
+                # 其他参数
+                f"-ti --rm "
+                f"--name odm_grid_{grid_id[0]}_{grid_id[1]} "
                 f"-v {grid_dir}:/datasets "
                 f"opendronemap/odm:gpu "
                 f"--project-path /datasets project "
-                f"--max-concurrency 15 "
+                f"--max-concurrency 10 "
                 f"--force-gps "
             )
 
@@ -238,40 +249,40 @@ class ODMProcessMonitor:
         return success, error_msg
 
     def process_all_grids(self, grid_points: Dict[tuple, pd.DataFrame], produce_dem: bool) -> Dict[tuple, pd.DataFrame]:
-        """处理所有网格
-
-        Returns:
-            Dict[tuple, pd.DataFrame]: 成功处理的网格点数据字典
-        """
+        """并行处理所有网格"""
         self.logger.info("开始执行网格处理")
 
         successful_grid_points = {}
         failed_grids = []
-
-        for grid_id, points in grid_points.items():
-            grid_dir = os.path.join(
-                self.output_dir, f'grid_{grid_id[0]}_{grid_id[1]}'
-            )
-
-            try:
-                success, error_msg = self.run_odm_with_monitor(
-                    grid_dir=grid_dir,
-                    grid_id=grid_id,
-                    produce_dem=produce_dem
-                )
-
-                if success:
-                    successful_grid_points[grid_id] = points
-                else:
+
+        # 使用线程池进行并行处理
+        with ThreadPoolExecutor(max_workers=self.max_parallel) as executor:
+            # 创建所有任务
+            future_to_grid = {
+                executor.submit(
+                    self.run_odm_with_monitor,
+                    os.path.join(self.output_dir, f'grid_{grid_id[0]}_{grid_id[1]}'),
+                    grid_id,
+                    produce_dem
+                ): (grid_id, points) for grid_id, points in grid_points.items()
+            }
+
+            # 等待所有任务完成并收集结果
+            for future in as_completed(future_to_grid):
+                grid_id, points = future_to_grid[future]
+                try:
+                    success, error_msg = future.result()
+                    if success:
+                        successful_grid_points[grid_id] = points
+                    else:
+                        self.logger.error(
+                            f"网格 ({grid_id[0]},{grid_id[1]}) 处理失败: {error_msg}")
+                        failed_grids.append((grid_id, error_msg))
+                except Exception as e:
+                    error_msg = str(e)
                     self.logger.error(
-                        f"网格 ({grid_id[0]},{grid_id[1]}) 处理失败: {error_msg}")
+                        f"处理网格 ({grid_id[0]},{grid_id[1]}) 时发生异常: {error_msg}")
                     failed_grids.append((grid_id, error_msg))
-            except Exception as e:
-                error_msg = str(e)
-                self.logger.error(
-                    f"处理网格 ({grid_id[0]},{grid_id[1]}) 时发生异常: {error_msg}")
-                failed_grids.append((grid_id, error_msg))
-
         # 汇总处理结果
         total_grids = len(grid_points)
         failed_count = len(failed_grids)