From 99a89cd09a08fb3842bd015ae8fd0b8fb7401a64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BE=99=E6=BE=B3?= Date: Sat, 18 Jan 2025 10:45:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=88=E4=B8=8D=E5=81=9Adocker=E5=B9=B6?= =?UTF-8?q?=E8=A1=8C=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/odm_monitor.py | 73 +++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 42 deletions(-) diff --git a/utils/odm_monitor.py b/utils/odm_monitor.py index b655883..5b98a12 100644 --- a/utils/odm_monitor.py +++ b/utils/odm_monitor.py @@ -6,8 +6,6 @@ from typing import Dict, Tuple import pandas as pd import numpy as np from osgeo import gdal -import threading -from concurrent.futures import ThreadPoolExecutor, as_completed class NotOverlapError(Exception): @@ -18,11 +16,10 @@ class NotOverlapError(Exception): class ODMProcessMonitor: """ODM处理监控器""" - def __init__(self, output_dir: str, mode: str = "快拼模式", max_parallel: int = 2): + def __init__(self, output_dir: str, mode: str = "快拼模式"): self.output_dir = output_dir self.logger = logging.getLogger('UAV_Preprocess.ODMMonitor') self.mode = mode - self.max_parallel = max_parallel # 最大并行数 def _check_success(self, grid_dir: str) -> bool: """检查ODM是否执行成功 @@ -148,19 +145,11 @@ class ODMProcessMonitor: # 构建Docker命令 grid_dir = grid_dir[0].lower()+grid_dir[1:].replace('\\', '/') docker_command = ( - f"docker run " - # 资源限制 - f"--gpus all " # 指定使用的GPU - f"--memory=32g " # 限制内存使用 - f"--memory-swap=36g " # 限制包含swap的总内存 - f"--cpus=10.0 " # 限制CPU使用数量 - # 其他参数 - f"-ti --rm " - f"--name odm_grid_{grid_id[0]}_{grid_id[1]} " + f"docker run --gpus all -ti --rm " f"-v {grid_dir}:/datasets " f"opendronemap/odm:gpu " f"--project-path /datasets project " - f"--max-concurrency 10 " + f"--max-concurrency 15 " f"--force-gps " ) @@ -249,40 +238,40 @@ class ODMProcessMonitor: return success, error_msg def process_all_grids(self, grid_points: Dict[tuple, pd.DataFrame], produce_dem: bool) -> Dict[tuple, pd.DataFrame]: - """并行处理所有网格""" + """处理所有网格 + + Returns: + Dict[tuple, pd.DataFrame]: 成功处理的网格点数据字典 + """ self.logger.info("开始执行网格处理") successful_grid_points = {} failed_grids = [] - - # 使用线程池进行并行处理 - with ThreadPoolExecutor(max_workers=self.max_parallel) as executor: - # 创建所有任务 - future_to_grid = { - executor.submit( - self.run_odm_with_monitor, - os.path.join(self.output_dir, f'grid_{grid_id[0]}_{grid_id[1]}'), - grid_id, - produce_dem - ): (grid_id, points) for grid_id, points in grid_points.items() - } - - # 等待所有任务完成并收集结果 - for future in as_completed(future_to_grid): - grid_id, points = future_to_grid[future] - try: - success, error_msg = future.result() - if success: - successful_grid_points[grid_id] = points - else: - self.logger.error( - f"网格 ({grid_id[0]},{grid_id[1]}) 处理失败: {error_msg}") - failed_grids.append((grid_id, error_msg)) - except Exception as e: - error_msg = str(e) + + for grid_id, points in grid_points.items(): + grid_dir = os.path.join( + self.output_dir, f'grid_{grid_id[0]}_{grid_id[1]}' + ) + + try: + success, error_msg = self.run_odm_with_monitor( + grid_dir=grid_dir, + grid_id=grid_id, + produce_dem=produce_dem + ) + + if success: + successful_grid_points[grid_id] = points + else: self.logger.error( - f"处理网格 ({grid_id[0]},{grid_id[1]}) 时发生异常: {error_msg}") + f"网格 ({grid_id[0]},{grid_id[1]}) 处理失败: {error_msg}") failed_grids.append((grid_id, error_msg)) + except Exception as e: + error_msg = str(e) + self.logger.error( + f"处理网格 ({grid_id[0]},{grid_id[1]}) 时发生异常: {error_msg}") + failed_grids.append((grid_id, error_msg)) + # 汇总处理结果 total_grids = len(grid_points) failed_count = len(failed_grids)