UAV2/main.py
2025-04-08 11:56:01 +08:00

150 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import shutil
from datetime import timedelta
from dataclasses import dataclass
from typing import Dict, Tuple
import psutil
import pandas as pd
from pathlib import Path
from utils.directory_manager import DirectoryManager
from utils.gps_extractor import GPSExtractor
from utils.grid_divider import GridDivider
from utils.logger import setup_logger
from utils.visualizer import FilterVisualizer
from utils.docker_runner import DockerRunner
from filter.cluster_filter import GPSCluster
from post_pro.conv_obj import ConvertOBJ
@dataclass
class ProcessConfig:
    """Configuration for one processing run.

    ``image_dir`` and ``output_dir`` are required; every other field is a
    tuning parameter with a default value.
    """

    image_dir: str
    output_dir: str

    # Cluster-filter parameters.
    cluster_eps: float = 0.01
    cluster_min_samples: int = 5

    # Time-group overlap filter parameters.
    time_group_overlap_threshold: float = 0.7
    time_group_interval: timedelta = timedelta(seconds=300)

    # Isolated-point filter parameters.
    filter_distance_threshold: float = 0.001  # distance in lon/lat terms
    filter_min_neighbors: int = 6

    # Dense-point filter parameters.
    filter_grid_size: float = 0.001
    filter_dense_distance_threshold: float = 10  # plain distance, in meters
    filter_time_threshold: timedelta = timedelta(seconds=300)

    # Grid-division parameters.
    grid_overlap: float = 0.05
    grid_size: float = 500

    # Switches controlling which pipeline stages / outputs are enabled.
    mode: str = "快拼模式"
    accuracy: str = "medium"
    produce_dem: bool = False
class ODM_Plugin:
    """Drive the preprocessing + ODM pipeline for one set of images.

    ``process`` runs the stages in order: GPS extraction, clustering,
    grid division, the ODM Docker run, and post-processing.
    """

    def __init__(self, config: "ProcessConfig"):
        """Prepare directories, the logger and helper objects.

        Args:
            config: Settings for this run.
        """
        self.config = config

        # Directory manager: wipe and rebuild the output directory, then
        # check the available disk space.
        self.dir_manager = DirectoryManager(config)
        self.dir_manager.clean_output_dir()
        self.dir_manager.setup_output_dirs()
        self.dir_manager.check_disk_space()

        # Rename the input directory to match what ODM requires; from here
        # on the image directory is referred to as ``project_path``.
        self.project_path = self.dir_manager.rename_input_dir()

        # Remaining components. The logger is created only after the output
        # directory has been rebuilt above.
        self.logger = setup_logger(config.output_dir)
        self.gps_points = None
        # Defined up front so reading it before ``cluster()`` has run yields
        # None instead of raising AttributeError.
        self.clustered_points = None
        self.grid_points = None
        self.visualizer = FilterVisualizer(config.output_dir)

    def extract_gps(self) -> pd.DataFrame:
        """Extract GPS data for the project and store it on ``gps_points``.

        Returns:
            The extracted GPS points (the same object kept in
            ``self.gps_points``).
        """
        self.logger.info("开始提取GPS数据")
        extractor = GPSExtractor(self.project_path)
        self.gps_points = extractor.extract_all_gps()
        self.logger.info(f"成功提取 {len(self.gps_points)} 个GPS点")
        # The signature has always promised a DataFrame; actually return it.
        return self.gps_points

    def cluster(self):
        """Cluster the GPS points and visualize the filtering step.

        Per the original author's note, this uses DBSCAN and keeps only the
        largest cluster. ``self.gps_points`` is replaced with the result of
        ``get_cluster_stats``.
        """
        # Snapshot taken before filtering so the visualizer can compare.
        previous_points = self.gps_points.copy()
        clusterer = GPSCluster(
            self.gps_points,
            eps=self.config.cluster_eps,
            min_samples=self.config.cluster_min_samples,
        )
        self.clustered_points = clusterer.fit()
        self.gps_points = clusterer.get_cluster_stats(self.clustered_points)
        self.visualizer.visualize_filter_step(
            self.gps_points, previous_points, "1-Clustering")

    def divide_grids(self):
        """Divide the GPS points into grids.

        The per-grid point data is stored on ``self.grid_points``; the grids
        are visualized and the image groups saved through the grid divider.
        Nothing is returned.
        """
        grid_divider = GridDivider(
            overlap=self.config.grid_overlap,
            grid_size=self.config.grid_size,
            project_path=self.project_path,
            output_dir=self.config.output_dir,
        )
        grids, self.grid_points = grid_divider.adjust_grid_size_and_overlap(
            self.gps_points
        )
        grid_divider.visualize_grids(self.gps_points, grids)
        grid_divider.save_image_groups(self.grid_points)
        # Many grids suggests the partitioning needs manual adjustment.
        if len(grids) >= 20:
            self.logger.warning("网格数量已超过20, 需要人工调整分区")

    def odm_docker_runner(self):
        """Run the ODM Docker container for the project."""
        self.logger.info("开始运行Docker容器")
        # TODO: add some fault tolerance around the container run.
        docker_runner = DockerRunner(self.project_path)
        docker_runner.run_odm_container()

    def convert_obj(self):
        """Convert the OBJ models for the grids in ``self.grid_points``."""
        self.logger.info("开始转换OBJ模型")
        converter = ConvertOBJ(self.config.output_dir)
        converter.convert_grid_obj(self.grid_points)

    def post_process(self):
        """Post-process: copy the orthophoto into the output directory.

        Raises:
            FileNotFoundError: If the orthophoto file is missing under
                ``project_path``.
        """
        self.logger.info("开始后处理")
        self.logger.info("拷贝正射影像至输出目录")
        orthophoto_tif_path = os.path.join(
            self.project_path, "project", "odm_orthophoto", "odm_orthophoto.tif")
        # shutil.copy treats a non-existent destination as a *file* name, so
        # make sure the output directory exists before copying into it.
        os.makedirs(self.config.output_dir, exist_ok=True)
        shutil.copy(orthophoto_tif_path, self.config.output_dir)
        # NOTE: OBJ conversion for the "三维模式" mode is currently disabled:
        # if self.config.mode == "三维模式":
        #     self.convert_obj()

    def process(self):
        """Run the complete pipeline, logging and re-raising any failure."""
        try:
            self.extract_gps()
            self.cluster()
            self.divide_grids()
            self.logger.info("==========预处理任务完成==========")
            self.odm_docker_runner()
            self.post_process()
        except Exception as e:
            # Top-level boundary: record the traceback, then propagate.
            self.logger.error(f"处理过程中发生错误: {e}", exc_info=True)
            raise