先不做docker并行处理

This commit is contained in:
龙澳 2025-01-18 10:45:56 +08:00
parent b6c7c4d2c1
commit 99a89cd09a

View File

@ -6,8 +6,6 @@ from typing import Dict, Tuple
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from osgeo import gdal from osgeo import gdal
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
class NotOverlapError(Exception): class NotOverlapError(Exception):
@ -18,11 +16,10 @@ class NotOverlapError(Exception):
class ODMProcessMonitor: class ODMProcessMonitor:
"""ODM处理监控器""" """ODM处理监控器"""
def __init__(self, output_dir: str, mode: str = "快拼模式", max_parallel: int = 2): def __init__(self, output_dir: str, mode: str = "快拼模式"):
self.output_dir = output_dir self.output_dir = output_dir
self.logger = logging.getLogger('UAV_Preprocess.ODMMonitor') self.logger = logging.getLogger('UAV_Preprocess.ODMMonitor')
self.mode = mode self.mode = mode
self.max_parallel = max_parallel # 最大并行数
def _check_success(self, grid_dir: str) -> bool: def _check_success(self, grid_dir: str) -> bool:
"""检查ODM是否执行成功 """检查ODM是否执行成功
@ -148,19 +145,11 @@ class ODMProcessMonitor:
# 构建Docker命令 # 构建Docker命令
grid_dir = grid_dir[0].lower()+grid_dir[1:].replace('\\', '/') grid_dir = grid_dir[0].lower()+grid_dir[1:].replace('\\', '/')
docker_command = ( docker_command = (
f"docker run " f"docker run --gpus all -ti --rm "
# 资源限制
f"--gpus all " # 指定使用的GPU
f"--memory=32g " # 限制内存使用
f"--memory-swap=36g " # 限制包含swap的总内存
f"--cpus=10.0 " # 限制CPU使用数量
# 其他参数
f"-ti --rm "
f"--name odm_grid_{grid_id[0]}_{grid_id[1]} "
f"-v {grid_dir}:/datasets " f"-v {grid_dir}:/datasets "
f"opendronemap/odm:gpu " f"opendronemap/odm:gpu "
f"--project-path /datasets project " f"--project-path /datasets project "
f"--max-concurrency 10 " f"--max-concurrency 15 "
f"--force-gps " f"--force-gps "
) )
@ -249,34 +238,34 @@ class ODMProcessMonitor:
return success, error_msg return success, error_msg
def process_all_grids(self, grid_points: Dict[tuple, pd.DataFrame], produce_dem: bool) -> Dict[tuple, pd.DataFrame]: def process_all_grids(self, grid_points: Dict[tuple, pd.DataFrame], produce_dem: bool) -> Dict[tuple, pd.DataFrame]:
"""并行处理所有网格""" """处理所有网格
Returns:
Dict[tuple, pd.DataFrame]: 成功处理的网格点数据字典
"""
self.logger.info("开始执行网格处理") self.logger.info("开始执行网格处理")
successful_grid_points = {} successful_grid_points = {}
failed_grids = [] failed_grids = []
# 使用线程池进行并行处理 for grid_id, points in grid_points.items():
with ThreadPoolExecutor(max_workers=self.max_parallel) as executor: grid_dir = os.path.join(
# 创建所有任务 self.output_dir, f'grid_{grid_id[0]}_{grid_id[1]}'
future_to_grid = { )
executor.submit(
self.run_odm_with_monitor,
os.path.join(self.output_dir, f'grid_{grid_id[0]}_{grid_id[1]}'),
grid_id,
produce_dem
): (grid_id, points) for grid_id, points in grid_points.items()
}
# 等待所有任务完成并收集结果
for future in as_completed(future_to_grid):
grid_id, points = future_to_grid[future]
try: try:
success, error_msg = future.result() success, error_msg = self.run_odm_with_monitor(
grid_dir=grid_dir,
grid_id=grid_id,
produce_dem=produce_dem
)
if success: if success:
successful_grid_points[grid_id] = points successful_grid_points[grid_id] = points
else: else:
self.logger.error( self.logger.error(
f"网格 ({grid_id[0]},{grid_id[1]}) 处理失败: {error_msg}") f"网格 ({grid_id[0]},{grid_id[1]}) 处理失败: {error_msg}")
failed_grids.append((grid_id, error_msg)) failed_grids.append((grid_id, error_msg))
except Exception as e: except Exception as e:
error_msg = str(e) error_msg = str(e)
self.logger.error( self.logger.error(