Remove unnecessary code

This commit is contained in:
weixin_46229132 2025-04-11 15:38:15 +08:00
parent e57c3b6ec9
commit e86fe196f8
18 changed files with 39 additions and 2540 deletions

View File

@ -7,21 +7,17 @@ import psutil
import pandas as pd
from filter.cluster_filter import GPSCluster
from filter.time_group_overlap_filter import TimeGroupOverlapFilter
from filter.gps_filter import GPSFilter
from utils.odm_monitor import ODMProcessMonitor
from utils.gps_extractor import GPSExtractor
from utils.grid_divider import GridDivider
from utils.logger import setup_logger
from utils.visualizer import FilterVisualizer
from post_pro.merge_tif import MergeTif
from post_pro.merge_obj import MergeObj
from post_pro.merge_laz import MergePly
from post_pro.conv_obj2 import ConvertOBJ
from post_pro.conv_obj import ConvertOBJ
@dataclass
class PreprocessConfig:
class ProcessConfig:
"""预处理配置类"""
image_dir: str
@ -48,8 +44,8 @@ class PreprocessConfig:
produce_dem: bool = False
class ImagePreprocessor:
def __init__(self, config: PreprocessConfig):
class ODM_Plugin:
def __init__(self, config):
self.config = config
# Check available disk space
@ -154,60 +150,6 @@ class ImagePreprocessor:
self.visualizer.visualize_filter_step(
self.gps_points, previous_points, "1-Clustering")
def filter_isolated_points(self):
"""过滤孤立点"""
filter = GPSFilter(self.config.output_dir)
previous_points = self.gps_points.copy()
self.gps_points = filter.filter_isolated_points(
self.gps_points,
self.config.filter_distance_threshold,
self.config.filter_min_neighbors,
)
self.visualizer.visualize_filter_step(
self.gps_points, previous_points, "2-Isolated Points")
def filter_time_group_overlap(self):
"""过滤重叠的时间组"""
previous_points = self.gps_points.copy()
filter = TimeGroupOverlapFilter(
self.config.image_dir,
self.config.output_dir,
overlap_threshold=self.config.time_group_overlap_threshold
)
self.gps_points = filter.filter_overlapping_groups(
self.gps_points,
time_threshold=self.config.time_group_interval
)
self.visualizer.visualize_filter_step(
self.gps_points, previous_points, "3-Time Group Overlap")
def calculate_center_coordinates(self):
"""计算剩余点的中心经纬度坐标"""
mean_lat = self.gps_points['lat'].mean()
mean_lon = self.gps_points['lon'].mean()
self.logger.info(f"区域中心坐标:纬度 {mean_lat:.6f}, 经度 {mean_lon:.6f}")
return mean_lat, mean_lon
def filter_alternate_images(self):
"""按时间顺序隔一个删一个图像来降低密度"""
previous_points = self.gps_points.copy()
# Sort by timestamp
self.gps_points = self.gps_points.sort_values('date')
# Keep even-indexed rows (i.e. every other image)
self.gps_points = self.gps_points.iloc[::2].reset_index(drop=True)
self.visualizer.visualize_filter_step(
self.gps_points, previous_points, "4-Alternate Images")
self.logger.info(f"交替过滤后剩余 {len(self.gps_points)} 个点")
def divide_grids(self) -> Tuple[Dict[tuple, pd.DataFrame], Dict[tuple, tuple]]:
"""划分网格
Returns:
@ -256,18 +198,6 @@ class ImagePreprocessor:
merger = MergeTif(self.config.output_dir)
merger.merge_orthophoto(grid_points)
def merge_ply(self, grid_points: Dict[tuple, pd.DataFrame]):
"""合并所有网格的PLY点云"""
self.logger.info("开始合并PLY点云")
merger = MergePly(self.config.output_dir)
merger.merge_grid_laz(grid_points)
def merge_obj(self, grid_points: Dict[tuple, pd.DataFrame], translations: Dict[tuple, tuple]):
"""合并所有网格的OBJ模型"""
self.logger.info("开始合并OBJ模型")
merger = MergeObj(self.config.output_dir)
merger.merge_grid_obj(grid_points, translations)
def convert_obj(self, grid_points: Dict[tuple, pd.DataFrame]):
"""转换OBJ模型"""
self.logger.info("开始转换OBJ模型")
@ -282,26 +212,17 @@ class ImagePreprocessor:
f"将只合并成功处理的 {len(successful_grid_points)} 个网格"
)
# if self.config.mode == "快拼模式":
self.merge_tif(successful_grid_points, self.config.mode,
self.config.produce_dem)
if self.config.mode == "三维模式":
# self.merge_tif(successful_grid_points, self.config.produce_dem)
# self.merge_ply(successful_grid_points)
# self.merge_obj(successful_grid_points, translations)
self.convert_obj(successful_grid_points)
# else:
# self.merge_tif(successful_grid_points, self.config.produce_dem)
# # self.merge_ply(successful_grid_points)
# # self.merge_obj(successful_grid_points, translations)
# self.convert_obj(successful_grid_points)
def process(self):
"""执行完整的预处理流程"""
try:
self.extract_gps()
self.cluster()
# self.filter_isolated_points()
grid_points, translations = self.divide_grids()
self.copy_images(grid_points)
self.logger.info("预处理任务完成")
@ -311,40 +232,8 @@ class ImagePreprocessor:
self.post_process(successful_grid_points,
grid_points, translations)
self.logger.info("重建任务完成")
except Exception as e:
self.logger.error(f"处理过程中发生错误: {str(e)}", exc_info=True)
raise
if __name__ == "__main__":
# Build the configuration
config = PreprocessConfig(
image_dir=r"E:\datasets\UAV\134\project\images",
output_dir=r"G:\ODM_output\134",
cluster_eps=0.01,
cluster_min_samples=5,
# Time-group overlap filter parameters
time_group_overlap_threshold=0.7,
time_group_interval=timedelta(minutes=5),
filter_distance_threshold=0.001,
filter_min_neighbors=6,
filter_grid_size=0.001,
filter_dense_distance_threshold=10,
filter_time_threshold=timedelta(minutes=5),
grid_size=800,
grid_overlap=0.05,
mode="重建模式",
produce_dem=False,
)
# Create the processor and run it
processor = ImagePreprocessor(config)
processor.process()

View File

@ -1,248 +0,0 @@
import os
import math
from itertools import combinations
import numpy as np
from scipy.spatial import KDTree
import logging
import pandas as pd
from datetime import datetime, timedelta
class GPSFilter:
"""过滤密集点及孤立点"""
def __init__(self, output_dir):
self.logger = logging.getLogger('UAV_Preprocess.GPSFilter')
@staticmethod
def _haversine(lat1, lon1, lat2, lon2):
"""计算两点之间的地理距离(单位:米)"""
R = 6371000 # 地球平均半径,单位:米
phi1, phi2 = math.radians(lat1), math.radians(lat2)
delta_phi = math.radians(lat2 - lat1)
delta_lambda = math.radians(lon2 - lon1)
a = math.sin(delta_phi / 2) ** 2 + math.cos(phi1) * \
math.cos(phi2) * math.sin(delta_lambda / 2) ** 2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return R * c
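As a quick sanity check of the haversine formula above (a hypothetical standalone snippet, not part of this repository), 0.001 degrees of latitude should come out to roughly 111 m:

import math

def haversine(lat1, lon1, lat2, lon2):
    # Same formula as GPSFilter._haversine above
    R = 6371000
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)
    a = math.sin(delta_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2
    return 2 * R * math.atan2(math.sqrt(a), math.sqrt(1 - a))

print(haversine(30.000, 120.000, 30.001, 120.000))  # ~111.2 meters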
@staticmethod
def _assign_to_grid(lat, lon, grid_size, min_lat, min_lon):
"""根据经纬度和网格大小,将点分配到网格"""
grid_x = int((lat - min_lat) // grid_size)
grid_y = int((lon - min_lon) // grid_size)
return grid_x, grid_y
def _get_distances(self, points_df, grid_size):
"""读取图片 GPS 坐标,计算点对之间的距离并排序"""
# 确定经纬度范围
min_lat, max_lat = points_df['lat'].min(), points_df['lat'].max()
min_lon, max_lon = points_df['lon'].min(), points_df['lon'].max()
self.logger.info(
f"经纬度范围:纬度[{min_lat:.6f}, {max_lat:.6f}],纬度范围[{max_lat-min_lat:.6f}]"
f"经度[{min_lon:.6f}, {max_lon:.6f}],经度范围[{max_lon-min_lon:.6f}]")
# 分配到网格
grid_map = {}
for _, row in points_df.iterrows():
grid = self._assign_to_grid(
row['lat'], row['lon'], grid_size, min_lat, min_lon)
if grid not in grid_map:
grid_map[grid] = []
grid_map[grid].append((row['file'], row['lat'], row['lon']))
self.logger.info(f"图像点已分配到 {len(grid_map)} 个网格中")
# 在每个网格中计算两两距离并排序
sorted_distances = {}
for grid, images in grid_map.items():
distances = []
for (img1, lat1, lon1), (img2, lat2, lon2) in combinations(images, 2):
dist = self._haversine(lat1, lon1, lat2, lon2)
distances.append((img1, img2, dist))
distances.sort(key=lambda x: x[2])  # sort ascending by distance
sorted_distances[grid] = distances
self.logger.debug(f"网格 {grid} 中计算了 {len(distances)} 个距离对")
return sorted_distances
def _group_by_time(self, points_df: pd.DataFrame, time_threshold: timedelta) -> list:
"""根据拍摄时间分组图片
如果相邻两张图片的拍摄时间差超过5分钟则进行切分
Args:
points_df: 包含图片信息的DataFrame必须包含'file''date'
time_threshold: 时间间隔阈值默认5分钟
Returns:
list: 每个元素为时间组内的点数据
"""
if 'date' not in points_df.columns:
self.logger.error("数据中缺少date列")
return [points_df]
# 将date为空的行单独作为一组
null_date_group = points_df[points_df['date'].isna()]
valid_date_points = points_df[points_df['date'].notna()]
if not null_date_group.empty:
self.logger.info(f"发现 {len(null_date_group)} 个无时间戳的点,将作为单独分组")
if valid_date_points.empty:
self.logger.warning("没有有效的时间戳数据")
return [null_date_group] if not null_date_group.empty else []
# Sort by time
valid_date_points = valid_date_points.sort_values('date')
self.logger.info(
f"Valid time range: {valid_date_points['date'].min()} to {valid_date_points['date'].max()}")
# Time gaps between consecutive shots
time_diffs = valid_date_points['date'].diff()
# Split wherever the gap exceeds the threshold
time_groups = []
current_group_start = 0
for idx, time_diff in enumerate(time_diffs):
if time_diff and time_diff > time_threshold:
# Close the current group
current_group = valid_date_points.iloc[current_group_start:idx]
time_groups.append(current_group)
# Log the break point
break_time = valid_date_points.iloc[idx]['date']
group_start_time = current_group.iloc[0]['date']
group_end_time = current_group.iloc[-1]['date']
self.logger.info(
f"Time group {len(time_groups)}: {len(current_group)} points, "
f"range [{group_start_time} - {group_end_time}]"
)
self.logger.info(
f"Break found at {break_time}, gap {time_diff}")
current_group_start = idx
# Append the final group
last_group = valid_date_points.iloc[current_group_start:]
if not last_group.empty:
time_groups.append(last_group)
self.logger.info(
f"Time group {len(time_groups)}: {len(last_group)} points, "
f"range [{last_group.iloc[0]['date']} - {last_group.iloc[-1]['date']}]"
)
# Points without timestamps become the final group, if any
if not null_date_group.empty:
time_groups.append(null_date_group)
self.logger.info(f"Added the no-timestamp group: {len(null_date_group)} points")
self.logger.info(f"Split into {len(time_groups)} time groups in total")
return time_groups
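As a rough illustration of the grouping behavior (hypothetical data; assumes the import path used in the main script and calls the private method directly), three shots one minute apart followed by one an hour later split into two groups:

import pandas as pd
from datetime import timedelta
from filter.gps_filter import GPSFilter

df = pd.DataFrame({
    'file': ['a.jpg', 'b.jpg', 'c.jpg', 'd.jpg'],
    'date': pd.to_datetime(['2024-01-01 10:00', '2024-01-01 10:01',
                            '2024-01-01 10:02', '2024-01-01 11:02']),
})
groups = GPSFilter(output_dir='.')._group_by_time(df, timedelta(minutes=5))
print([len(g) for g in groups])  # [3, 1]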
def filter_dense_points(self, points_df, grid_size=0.001, distance_threshold=13, time_threshold=timedelta(minutes=5)):
"""
Filter dense points: group by time first, then filter within each time group.
Points without timestamps are not filtered.
Args:
points_df: point data
grid_size: grid cell size
distance_threshold: distance threshold in meters
time_threshold: time gap used for grouping
"""
self.logger.info(f"Start dense-point filtering by time group (cell size: {grid_size}, "
f"distance threshold: {distance_threshold} m, grouping interval: {time_threshold})")
# Group by time
time_groups = self._group_by_time(points_df, time_threshold)
# Collect every image to delete
all_to_del_imgs = []
# Filter dense points within each time group
for group_idx, group_points in enumerate(time_groups):
# Check for the no-timestamp group (the last one)
if group_idx == len(time_groups) - 1 and group_points['date'].isna().any():
self.logger.info(f"Skipping the no-timestamp group ({len(group_points)} points)")
continue
self.logger.info(
f"Processing time group {group_idx + 1} ({len(group_points)} points)")
# Pairwise distances within this group
sorted_distances = self._get_distances(group_points, grid_size)
group_to_del_imgs = []
# Filter dense points within each grid cell
for grid, distances in sorted_distances.items():
grid_del_count = 0
while distances:
candidate_img1, candidate_img2, dist = distances[0]
if dist < distance_threshold:
distances.pop(0)
# Get each candidate image's next-shortest distance
candidate_img1_dist = None
candidate_img2_dist = None
for distance in distances:
if candidate_img1 in distance:
candidate_img1_dist = distance[2]
break
for distance in distances:
if candidate_img2 in distance:
candidate_img2_dist = distance[2]
break
# Pick which of the two to delete (is not None: a 0.0 distance must not be skipped)
if candidate_img1_dist is not None and candidate_img2_dist is not None:
to_del_img = candidate_img1 if candidate_img1_dist < candidate_img2_dist else candidate_img2
group_to_del_imgs.append(to_del_img)
grid_del_count += 1
self.logger.debug(
f"Time group {group_idx + 1}, cell {grid}: deleted dense point {to_del_img} (distance: {dist:.2f} m)")
distances = [
d for d in distances if to_del_img not in d]
else:
break
if grid_del_count > 0:
self.logger.info(
f"Time group {group_idx + 1}, cell {grid}: deleted {grid_del_count} dense points")
all_to_del_imgs.extend(group_to_del_imgs)
self.logger.info(
f"Time group {group_idx + 1}: deleted {len(group_to_del_imgs)} dense points in total")
# Apply the deletions
filtered_df = points_df[~points_df['file'].isin(all_to_del_imgs)]
self.logger.info(
f"Dense-point filtering done: deleted {len(all_to_del_imgs)} points, {len(filtered_df)} remain")
return filtered_df
def filter_isolated_points(self, points_df, threshold_distance=0.001, min_neighbors=6):
"""过滤孤立点"""
self.logger.info(
f"开始过滤孤立点 (距离阈值: {threshold_distance}, 最小邻居数: {min_neighbors})")
coords = points_df[['lat', 'lon']].values
kdtree = KDTree(coords)
neighbors_count = [len(kdtree.query_ball_point(
coord, threshold_distance)) for coord in coords]
isolated_points = []
for i, (_, row) in enumerate(points_df.iterrows()):
if neighbors_count[i] < min_neighbors:
isolated_points.append(row['file'])
self.logger.debug(
f"Deleting isolated point: {row['file']} (neighbors: {neighbors_count[i]})")
filtered_df = points_df[~points_df['file'].isin(isolated_points)]
self.logger.info(
f"Isolated-point filtering done: deleted {len(isolated_points)} points, {len(filtered_df)} remain")
return filtered_df
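A minimal usage sketch of the isolated-point filter (hypothetical coordinates; assumes the import path used in the main script): seven tightly clustered points each have seven neighbors within the radius (themselves included) and survive, while the single far-away point is dropped.

import pandas as pd
from filter.gps_filter import GPSFilter

pts = pd.DataFrame({
    'file': [f'{i}.jpg' for i in range(8)],
    'lat': [30.0000, 30.0001, 30.0002, 30.0001, 30.0002, 30.0000, 30.0001, 30.5],
    'lon': [120.0000, 120.0001, 120.0000, 120.0002, 120.0001, 120.0001, 120.0000, 120.5],
})
kept = GPSFilter(output_dir='.').filter_isolated_points(
    pts, threshold_distance=0.001, min_neighbors=6)
print(len(kept))  # 7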

View File

@ -1,201 +0,0 @@
import shutil
import pandas as pd
from shapely.geometry import box
from utils.logger import setup_logger
from utils.gps_extractor import GPSExtractor
import numpy as np
import logging
from datetime import timedelta
import matplotlib.pyplot as plt
import os
import sys
class TimeGroupOverlapFilter:
"""基于时间组重叠度的图像过滤器"""
def __init__(self, image_dir: str, output_dir: str, overlap_threshold: float = 0.7):
"""
初始化过滤器
Args:
image_dir: 图像目录
output_dir: 输出目录
overlap_threshold: 重叠阈值默认0.7
"""
self.image_dir = image_dir
self.output_dir = output_dir
self.overlap_threshold = overlap_threshold
self.logger = logging.getLogger('UAV_Preprocess.TimeGroupFilter')
def _group_by_time(self, points_df, time_threshold=timedelta(minutes=5)):
"""按时间间隔对点进行分组"""
if 'date' not in points_df.columns:
self.logger.error("数据中缺少date列")
return []
# Rows with an empty date form their own group
null_date_group = points_df[points_df['date'].isna()]
valid_date_points = points_df[points_df['date'].notna()]
if not null_date_group.empty:
self.logger.info(f"发现 {len(null_date_group)} 个无时间戳的点,将作为单独分组")
if valid_date_points.empty:
self.logger.warning("没有有效的时间戳数据")
return [null_date_group] if not null_date_group.empty else []
# Sort by time
valid_date_points = valid_date_points.sort_values('date')
# Time gaps between consecutive shots
time_diffs = valid_date_points['date'].diff()
# Split wherever the gap exceeds the threshold
time_groups = []
current_group_start = 0
for idx, time_diff in enumerate(time_diffs):
if time_diff and time_diff > time_threshold:
# Close the current group
current_group = valid_date_points.iloc[current_group_start:idx]
time_groups.append(current_group)
current_group_start = idx
# Append the final group
last_group = valid_date_points.iloc[current_group_start:]
if not last_group.empty:
time_groups.append(last_group)
# Points without timestamps become the final group, if any
if not null_date_group.empty:
time_groups.append(null_date_group)
return time_groups
def _get_group_bbox(self, group_df):
"""获取组内点的边界框"""
min_lon = group_df['lon'].min()
max_lon = group_df['lon'].max()
min_lat = group_df['lat'].min()
max_lat = group_df['lat'].max()
return box(min_lon, min_lat, max_lon, max_lat)
def _calculate_overlap(self, box1, box2):
"""计算两个边界框的重叠率"""
if box1.intersects(box2):
intersection_area = box1.intersection(box2).area
smaller_area = min(box1.area, box2.area)
if smaller_area == 0:
overlap_ratio = 1
else:
overlap_ratio = intersection_area / smaller_area
else:
overlap_ratio = 0
return overlap_ratio
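The ratio is the intersection area divided by the smaller box's area, so a small box fully contained in a large one scores 1.0 regardless of the size difference. A quick check with shapely (hypothetical numbers):

from shapely.geometry import box

b1 = box(0, 0, 2, 2)  # area 4
b2 = box(1, 1, 2, 3)  # area 2; its intersection with b1 is the 1x1 square
ratio = b1.intersection(b2).area / min(b1.area, b2.area)
print(ratio)  # 0.5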
def filter_overlapping_groups(self, gps_points, time_threshold=timedelta(minutes=5)):
"""过滤重叠的时间组"""
# 按时间分组
self.logger.info("开始过滤重叠时间组")
time_groups = self._group_by_time(gps_points, time_threshold)
# Bounding box of each group
group_boxes = []
for idx, group in enumerate(time_groups):
if not group['date'].isna().any():  # only groups with timestamps
bbox = self._get_group_bbox(group)
group_boxes.append((idx, group, bbox))
# Find the groups to drop
groups_to_delete = set()
for i in range(len(group_boxes)):
if i in groups_to_delete:
continue
idx1, group1, box1 = group_boxes[i]
area1 = box1.area
for j in range(i + 1, len(group_boxes)):
if j in groups_to_delete:
continue
idx2, group2, box2 = group_boxes[j]
area2 = box2.area
overlap_ratio = self._calculate_overlap(box1, box2)
if overlap_ratio > self.overlap_threshold:
# Drop the group with the smaller area
if area1 < area2:
group_to_delete = idx1
smaller_area = area1
larger_area = area2
else:
group_to_delete = idx2
smaller_area = area2
larger_area = area1
groups_to_delete.add(group_to_delete)
self.logger.info(
f"Time group {group_to_delete + 1} overlaps time group "
f"{idx2 + 1 if group_to_delete == idx1 else idx1 + 1} "
f"with ratio {overlap_ratio:.2f}, "
f"area ratio {smaller_area/larger_area:.2f}; "
f"dropping the smaller group {group_to_delete + 1}"
)
# Delete the images of the dropped groups
deleted_files = []
for group_idx in groups_to_delete:
group_files = time_groups[group_idx]['file'].tolist()
deleted_files.extend(group_files)
self.logger.info(f"共删除 {len(groups_to_delete)} 个重复时间组,"
f"{len(deleted_files)} 张图像")
# Visualize the result
self._visualize_results(time_groups, groups_to_delete)
retained_points = gps_points[~gps_points['file'].isin(
deleted_files)]
self.logger.info(f"重叠时间组过滤后剩余 {len(retained_points)} 个GPS点")
return retained_points
def _visualize_results(self, time_groups, groups_to_delete):
"""可视化过滤结果"""
plt.figure(figsize=(15, 10))
# 生成不同的颜色
colors = plt.cm.rainbow(np.linspace(0, 1, len(time_groups)))
# 绘制所有组的边界框
for idx, (group, color) in enumerate(zip(time_groups, colors)):
if not group['date'].isna().any():  # only groups with timestamps
bbox = self._get_group_bbox(group)
x, y = bbox.exterior.xy
if idx in groups_to_delete:
# Dropped groups are drawn dashed
plt.plot(x, y, '--', color=color, alpha=0.6,
label=f'Deleted Group {idx + 1}')
else:
# Retained groups are drawn solid
plt.plot(x, y, '-', color=color, alpha=0.6,
label=f'Group {idx + 1}')
# Draw the group's GPS points
plt.scatter(group['lon'], group['lat'], color=color,
s=30, alpha=0.6)
plt.title("Time Groups and Their Bounding Boxes", fontsize=14)
plt.xlabel("Longitude", fontsize=12)
plt.ylabel("Latitude", fontsize=12)
plt.grid(True)
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', fontsize=10)
plt.tight_layout()
# Save the figure
plt.savefig(os.path.join(self.output_dir, 'filter_imgs', 'time_groups_overlap_bbox.png'),
dpi=300, bbox_inches='tight')
plt.close()

View File

@ -1,236 +0,0 @@
from utils.gps_extractor import GPSExtractor
import os
import sys
import shutil
from pathlib import Path
import matplotlib.pyplot as plt
from matplotlib.widgets import RectangleSelector
import pandas as pd
from matplotlib.font_manager import FontProperties
class GPSSelector:
def __init__(self, image_dir: str, output_dir: str = None):
# Chinese font setup removed
self.image_dir = image_dir
self.output_dir = output_dir
self.gps_points = None
self.selected_points = []
self.fig, self.ax = plt.subplots(figsize=(12, 8))
self.scatter = None
self.rs = None
self.setup_plot()
def extract_gps(self):
"""提取GPS数据"""
extractor = GPSExtractor(self.image_dir)
self.gps_points = extractor.extract_all_gps()
print(f"成功提取 {len(self.gps_points)} 个GPS点")
def setup_plot(self):
"""设置绘图"""
self.ax.set_title('GPS Points - Use mouse to drag and select points to delete')
self.ax.set_xlabel('Longitude')
self.ax.set_ylabel('Latitude')
self.ax.grid(True)
# Use the same scale on both axes
self.ax.set_aspect('equal')
# Rectangle selector
self.rs = RectangleSelector(
self.ax, self.on_select,
interactive=True,
useblit=True,
button=[1],  # left mouse button only
props=dict(facecolor='red', alpha=0.3)
)
# Keyboard callback
self.fig.canvas.mpl_connect('key_press_event', self.on_key_press)
# Zoom and pan support
self.fig.canvas.mpl_connect('scroll_event', self.on_scroll)
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
self.fig.canvas.mpl_connect('button_release_event', self.on_release)
self.fig.canvas.mpl_connect('motion_notify_event', self.on_motion)
# State for panning
self._pan_start = None
def plot_gps_points(self):
"""绘制GPS点"""
if self.scatter is not None:
self.scatter.remove()
# Extent of longitude and latitude
lon_range = self.gps_points['lon'].max() - self.gps_points['lon'].min()
lat_range = self.gps_points['lat'].max() - self.gps_points['lat'].min()
# Size the figure to keep the true lat/lon aspect ratio
aspect_ratio = lon_range / lat_range
fig_width = 12
fig_height = fig_width / aspect_ratio
self.fig.set_size_inches(fig_width, fig_height)
self.scatter = self.ax.scatter(
self.gps_points['lon'],
self.gps_points['lat'],
c='blue',
s=20,
alpha=0.6
)
# Display range with some margin
margin = 0.1
x_margin = lon_range * margin
y_margin = lat_range * margin
self.ax.set_xlim([
self.gps_points['lon'].min() - x_margin,
self.gps_points['lon'].max() + x_margin
])
self.ax.set_ylim([
self.gps_points['lat'].min() - y_margin,
self.gps_points['lat'].max() + y_margin
])
# Disable autoscaling
self.ax.autoscale(False)
# Use precise, plain tick labels
self.ax.ticklabel_format(useOffset=False, style='plain')
self.fig.canvas.draw_idle()
def on_select(self, eclick, erelease):
"""矩形选择回调"""
x1, y1 = eclick.xdata, eclick.ydata
x2, y2 = erelease.xdata, erelease.ydata
# Points inside the selected area
mask = (
(self.gps_points['lon'] >= min(x1, x2)) &
(self.gps_points['lon'] <= max(x1, x2)) &
(self.gps_points['lat'] >= min(y1, y2)) &
(self.gps_points['lat'] <= max(y1, y2))
)
selected = self.gps_points[mask]
self.selected_points.extend(selected['file'].tolist())
# Remove the selected points from the data
self.gps_points = self.gps_points[~mask]
# Redraw
self.plot_gps_points()
print(f"选中 {len(selected)} 个点,剩余 {len(self.gps_points)} 个点")
def on_key_press(self, event):
"""键盘事件回调"""
if event.key == 'enter':
self.save_results()
plt.close()
elif event.key == 'escape':
plt.close()
def save_results(self):
"""保存结果"""
if not self.output_dir:
return
# Create the output directory
os.makedirs(self.output_dir, exist_ok=True)
# File names of all retained images
remaining_files = self.gps_points['file'].tolist()
# Copy the retained images to the output directory
for img_name in remaining_files:
src = os.path.join(self.image_dir, img_name)
dst = os.path.join(self.output_dir, img_name)
shutil.copy2(src, dst)  # copy2 preserves file metadata
# Save the remaining points' info
self.gps_points.to_csv(
os.path.join(self.output_dir, "remaining_points.csv"),
index=False
)
print(f"已选择删除 {len(self.selected_points)} 张图片")
print(f"已复制 {len(remaining_files)} 张保留的图片到 {self.output_dir}")
def run(self):
"""运行选择器"""
self.extract_gps()
self.plot_gps_points()
plt.show()
def on_scroll(self, event):
"""鼠标滚轮缩放"""
if event.inaxes != self.ax:
return
# Current view range
cur_xlim = self.ax.get_xlim()
cur_ylim = self.ax.get_ylim()
# Zoom factor
base_scale = 1.1
xdata = event.xdata
ydata = event.ydata
if event.button == 'up':
# Zoom in
scale_factor = 1/base_scale
else:
# Zoom out
scale_factor = base_scale
# New view range
new_width = (cur_xlim[1] - cur_xlim[0]) * scale_factor
new_height = (cur_ylim[1] - cur_ylim[0]) * scale_factor
self.ax.set_xlim([xdata - new_width * (xdata - cur_xlim[0]) / (cur_xlim[1] - cur_xlim[0]),
xdata + new_width * (cur_xlim[1] - xdata) / (cur_xlim[1] - cur_xlim[0])])
self.ax.set_ylim([ydata - new_height * (ydata - cur_ylim[0]) / (cur_ylim[1] - cur_ylim[0]),
ydata + new_height * (cur_ylim[1] - ydata) / (cur_ylim[1] - cur_ylim[0])])
self.fig.canvas.draw_idle()
def on_press(self, event):
"""鼠标按下事件"""
if event.inaxes != self.ax or event.button != 3: # 只响应右键
return
self._pan_start = (event.xdata, event.ydata)
def on_release(self, event):
"""鼠标释放事件"""
self._pan_start = None
def on_motion(self, event):
"""鼠标移动事件"""
if self._pan_start is None or event.inaxes != self.ax:
return
# Distance moved
dx = event.xdata - self._pan_start[0]
dy = event.ydata - self._pan_start[1]
# Update the view range
cur_xlim = self.ax.get_xlim()
cur_ylim = self.ax.get_ylim()
self.ax.set_xlim(cur_xlim - dx)
self.ax.set_ylim(cur_ylim - dy)
self.fig.canvas.draw_idle()
if __name__ == "__main__":
# Usage example
selector = GPSSelector(
image_dir=r"G:\error_data\20240930091614\project\images",
output_dir=r"C:\datasets\ODM_output\error1_L"
)
selector.run()

grid.py
View File

@ -1,297 +0,0 @@
import csv
import numpy as np
import matplotlib.pyplot as plt
import math
from shapely.geometry import box, MultiPoint
from shapely.ops import unary_union
from scipy.spatial import cKDTree
from utils.gps_extractor import GPSExtractor
# ---------------------- clamp overlap to at most 10% ----------------------
def clamp_overlap(overlap):
if overlap < 0:
return 0.0
elif overlap > 0.1:
return 0.1
else:
return overlap
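So negative values clamp to 0 and anything above 10% clamps to 0.1, e.g. clamp_overlap(-0.2) == 0.0, clamp_overlap(0.05) == 0.05, clamp_overlap(0.3) == 0.1.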
# ====================== 1) Generate candidate rectangles and record the point set each covers ======================
def generate_rectangles_with_point_indices(points, w, h, overlap=0.1, min_points=800):
"""
bounding box (w, h) + overlap 布置网格生成所有矩形
过滤只保留"矩形内点数 >= min_points"的矩形
返回:
rect_info_list: list of (rect_polygon, covered_indices)
- rect_polygon: Shapely Polygon
- covered_indices: 一个 set表示该矩形覆盖的所有点索引
"""
overlap = clamp_overlap(overlap)
if len(points) == 0:
return []
minx, miny = np.min(points, axis=0)
maxx, maxy = np.max(points, axis=0)
# Special case: a single point or a tiny extent can hardly reach 800 points
if abs(maxx - minx) < 1e-15 and abs(maxy - miny) < 1e-15:
return []
# Step sizes
step_x = w * (1 - overlap)
step_y = h * (1 - overlap)
x_coords = np.arange(minx, maxx + step_x, step_x)
y_coords = np.arange(miny, maxy + step_y, step_y)
# Build a KD-tree to speed up lookups
tree = cKDTree(points)
rect_info_list = []
for x in x_coords:
for y in y_coords:
rect_poly = box(x, y, x + w, y + h)
rx_min, ry_min, rx_max, ry_max = rect_poly.bounds
cx = (rx_min + rx_max) / 2
cy = (ry_min + ry_max) / 2
r = math.sqrt((rx_max - rx_min) ** 2 + (ry_max - ry_min) ** 2) / 2
candidate_ids = tree.query_ball_point([cx, cy], r)
if not candidate_ids:
continue
covered_set = set()
for idx_pt in candidate_ids:
px, py = points[idx_pt]
if rx_min <= px <= rx_max and ry_min <= py <= ry_max:
covered_set.add(idx_pt)
# Drop rectangles covering fewer than min_points points
if len(covered_set) < min_points:
continue
rect_info_list.append((rect_poly, covered_set))
return rect_info_list
# ====================== 2) Greedily pick a subset that covers all points ======================
def cover_all_points_greedy(points, rect_info_list):
"""
Given all points and rect_info_list of (candidate rectangle, covered point set) pairs,
we require:
- a set of rectangles such that every point is covered (each point belongs to at least 1 chosen rectangle)
- the area of their union is minimal (approximated greedily)
Returns:
chosen_rects: the chosen rectangles (each a Shapely Polygon)
"""
n = len(points)
all_indices = set(range(n))  # indices of all points
uncovered = set(all_indices)  # indices not yet covered
chosen_rects = []
union_polygon = None  # union of the rectangles chosen so far
# If no rectangle is available at all, fail immediately
if not rect_info_list:
return []
# To quickly evaluate the extra union area a new rectangle would add,
# we update union_polygon after each pick and compare
# union_polygon.union(new_rect).area - union_polygon.area; it starts as None.
while uncovered:
best_gain = 0
best_new_area = float('inf')
best_rect = None
best_covered_new = set()
for rect_poly, covered_set in rect_info_list:
# How many still-uncovered points this rectangle covers
newly_covered = uncovered.intersection(covered_set)
if not newly_covered:
continue
# Extra union area this rectangle would add
if union_polygon is None:
# First pick: the union is empty => new_area = rect_poly.area
new_area = rect_poly.area
area_increase = new_area
else:
# Area of union_polygon united with rect_poly
test_union = union_polygon.union(rect_poly)
new_area = test_union.area
area_increase = new_area - union_polygon.area
# Greedy criterion: maximize (points covered) / (area increase);
# covering more points and adding less area are both good
ratio = len(newly_covered) / max(area_increase, 1e-12)
# Keep the rectangle with the largest ratio
if ratio > best_gain:
best_gain = ratio
best_new_area = area_increase
best_rect = rect_poly
best_covered_new = newly_covered
if best_rect is None:
# No rectangle covers any remaining point => fail (cannot cover all points)
return []
# Accept best_rect
chosen_rects.append(best_rect)
uncovered -= best_covered_new
# Update the union
if union_polygon is None:
union_polygon = best_rect
else:
union_polygon = union_polygon.union(best_rect)
return chosen_rects
# ====================== 3) Main flow: discrete search over (w, h) + greedy cover ======================
def find_optimal_rectangles_cover_all_points(
points,
base_w,
base_h,
overlap=0.1,
steps=5,
min_points=800
):
"""
[0.5*base_w,1.5*base_w] x [0.5*base_h,1.5*base_h] 的离散区间枚举 (w,h)
- 生成可用矩形(800 )的列表
- 用贪心算法选出子集来覆盖所有点
- 计算选中矩形的并集面积
选出面积最小的方案并返回
"""
overlap = clamp_overlap(overlap)
n = len(points)
if n == 0:
return [], (base_w, base_h), 0.0  # no points, nothing to cover
w_candidates = np.linspace(0.3 * base_w, 2 * base_w, steps)
h_candidates = np.linspace(0.3 * base_h, 2 * base_h, steps)
best_rects = []
best_area = float('inf')
best_w, best_h = base_w, base_h
for w in w_candidates:
for h in h_candidates:
rect_info_list = generate_rectangles_with_point_indices(points, w, h, overlap, min_points)
if not rect_info_list:
# No rectangle reaches min_points for this (w, h)
continue
# Greedily cover all points
chosen_rects = cover_all_points_greedy(points, rect_info_list)
if not chosen_rects:
# Could not cover all points
continue
# Union area
union_poly = unary_union(chosen_rects)
area_covered = union_poly.area
if area_covered < best_area:
best_area = area_covered
best_rects = chosen_rects
best_w, best_h = w, h
return best_rects, (best_w, best_h), best_area
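A small smoke test of this search on synthetic data (hypothetical parameters, chosen so that a single rectangle can reach min_points):

import numpy as np

rng = np.random.default_rng(0)
pts = rng.uniform(0.0, 0.01, size=(1000, 2))  # uniform cloud on a lon/lat-like scale
rects, (w, h), area = find_optimal_rectangles_cover_all_points(
    pts, base_w=0.01, base_h=0.01, overlap=0.05, steps=5, min_points=200)
print(len(rects), round(area, 6))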
# ====================== 4) Read GPS from images + visualize ======================
def plot_image_points_cover_all_min_area(
image_dir,  # image folder path
base_rect_width=0.001,
base_rect_height=0.001,
overlap=0.1,
steps=5,
min_points=800
):
"""
Read GPS coordinates from an image folder:
1) extract GPS coordinates from the images with GPSExtractor
2) search (w, h) over the discrete range [0.3*base_w, 2*base_w] x [0.3*base_h, 2*base_h]
3) for each (w, h): generate all rectangles containing >= min_points points => greedily cover all points => compute the union area
4) the scheme with the smallest union area is the approximate optimum
5) visualize that scheme
"""
overlap = clamp_overlap(overlap)
# Read the images' GPS coordinates with GPSExtractor
extractor = GPSExtractor(image_dir)
gps_df = extractor.extract_all_gps()
if gps_df.empty:
print("未能从图片中提取到GPS坐标。")
return
points = np.column_stack((gps_df['lon'], gps_df['lat'])) # (N, 2), [x=lon, y=lat]
n = len(points)
if n == 0:
print("No points extracted from images.")
return
# Greedy cover + discrete search
chosen_rects, (best_w, best_h), best_area = find_optimal_rectangles_cover_all_points(
points,
base_w=base_rect_width,
base_h=base_rect_height,
overlap=overlap,
steps=steps,
min_points=min_points
)
if not chosen_rects:
print(f"无法找到满足 '每个矩形≥{min_points}' 且覆盖所有点 的方案,试着调大尺寸/步数/overlap。")
return
# Visualize
plt.figure(figsize=(10, 8))
# Points
plt.scatter(points[:, 0], points[:, 1], c='red', s=10, label='Points')
# Rectangles
for i, rect in enumerate(chosen_rects):
if rect.is_empty:
continue
x, y = rect.exterior.xy
plt.fill(x, y, edgecolor='green', fill=False, alpha=0.3,
label='Chosen Rectangles' if i == 0 else "")
plt.title(
f"Cover All Points, Each Rect≥{min_points} pts, Minimal Union Area\n"
f"base=({base_rect_width:.6f} x {base_rect_height:.6f}), overlap≤{overlap}, steps={steps}\n"
f"best (w,h)=({best_w:.6f},{best_h:.6f}), union area={best_area:.6f}, #rect={len(chosen_rects)}"
)
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.legend()
plt.grid(True)
plt.show()
# ------------------ Test entry point ------------------
if __name__ == "__main__":
image_dir = r"C:\datasets\134\code\images" # 替换为你的图片文件夹路径
plot_image_points_cover_all_min_area(
image_dir,
base_rect_width=0.01,
base_rect_height=0.01,
overlap=0.05,  # values above 0.1 would be clamped to 0.1
steps=40,
min_points=100
)

View File

@ -1,6 +1,6 @@
import argparse
from datetime import timedelta
from odm_preprocess import PreprocessConfig, ImagePreprocessor
from app_plugin import ProcessConfig, ODM_Plugin
def parse_args():
@ -28,7 +28,7 @@ def main():
args = parse_args()
# 创建配置
config = PreprocessConfig(
config = ProcessConfig(
image_dir=args.image_dir,
output_dir=args.output_dir,
mode=args.mode,
@ -50,7 +50,7 @@ def main():
)
# 创建处理器并执行
processor = ImagePreprocessor(config)
processor = ODM_Plugin(config)
processor.process()

View File

@ -49,6 +49,7 @@ class ConvertOBJ:
project_dir = os.path.join(self.output_dir, grid_name, "project")
texturing_dir = os.path.join(project_dir, "odm_texturing")
texturing_dst_dir = os.path.join(project_dir, "odm_texturing_dst")
split_obj_dir = os.path.join(texturing_dst_dir, "split_obj")
opensfm_dir = os.path.join(project_dir, "opensfm")
log_file = os.path.join(
project_dir, "odm_orthophoto", "odm_orthophoto_log.txt")
@ -65,28 +66,41 @@ class ConvertOBJ:
texturing_dir, texturing_dst_dir, utm_offset)
self.downsample_texture(texturing_dir, texturing_dst_dir)
# Slice the OBJ file into tiles
self.logger.info(f"Start splitting the OBJ file of grid {grid_id}")
os.makedirs(split_obj_dir)
cmd = (
f"D:\\software\\Obj2Tiles\\Obj2Tiles.exe --stage Splitting --lods 1 --divisions 3 "
f"{modified_obj} {split_obj_dir}"
)
subprocess.run(cmd, check=True)
# Run the format conversion (osgconv is problematic on Linux; remember to comment this out there)
self.logger.info(f"Start converting the OBJ file of grid {grid_id}")
output_osgb = os.path.join(texturing_dst_dir, "Tile.osgb")
cmd = (
f"osgconv {modified_obj} {output_osgb} "
f"--compressed --smooth --fix-transparency "
)
self.logger.info(f"执行osgconv命令{cmd}")
try:
subprocess.run(cmd, shell=True, check=True, cwd=texturing_dir)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"OSGB转换失败: {str(e)}")
# 先获取split_obj_dir下的所有obj文件
obj_lod_dir = os.path.join(split_obj_dir, "LOD-0")
obj_files = [f for f in os.listdir(
obj_lod_dir) if f.endswith('.obj')]
for obj_file in obj_files:
obj_path = os.path.join(obj_lod_dir, obj_file)
osgb_file = os.path.splitext(obj_file)[0] + '.osgb'
osgb_path = os.path.join(split_obj_dir, osgb_file)
# Run osgconv
subprocess.run(['osgconv', obj_path, osgb_path, '--compressed',
'--smooth', '--fix-transparency'], check=True)
# Create the OSGB directory structure and copy the files
osgb_base_dir = os.path.join(self.output_dir, "osgb")
data_dir = os.path.join(osgb_base_dir, "Data")
tile_dir = os.path.join(data_dir, f"Tile_{grid_id[0]}_{grid_id[1]}")
os.makedirs(tile_dir, exist_ok=True)
target_osgb = os.path.join(
tile_dir, f"Tile_{grid_id[0]}_{grid_id[1]}.osgb")
shutil.copy2(output_osgb, target_osgb)
for obj_file in obj_files:
obj_file_name = os.path.splitext(obj_file)[0]
tile_dirs = os.path.join(
data_dir, f"grid_{grid_id[0]}_{grid_id[1]}_{obj_file_name}")
os.makedirs(tile_dirs, exist_ok=True)
shutil.copy2(os.path.join(
split_obj_dir, obj_file_name+".osgb"), tile_dirs)
os.rename(os.path.join(tile_dirs, obj_file_name+".osgb"),
os.path.join(tile_dirs, f"grid_{grid_id[0]}_{grid_id[1]}_{obj_file_name}.osgb"))
def _create_merged_metadata(self):
"""创建合并后的metadata.xml文件"""

View File

@ -1,267 +0,0 @@
import os
import subprocess
import json
import shutil
import logging
from pyproj import Transformer
import cv2
class ConvertOBJ:
def __init__(self, output_dir: str):
self.output_dir = output_dir
# UTM extent shared across all grids
self.ref_east = float('inf')
self.ref_north = float('inf')
# Transformer from UTM to WGS84
self.transformer = Transformer.from_crs(
"EPSG:32649", "EPSG:4326", always_xy=True)
self.logger = logging.getLogger('UAV_Preprocess.ConvertOBJ')
def convert_grid_obj(self, grid_points):
"""转换每个网格的OBJ文件为OSGB格式"""
os.makedirs(os.path.join(self.output_dir,
"osgb", "Data"), exist_ok=True)
# 以第一个grid的UTM坐标作为参照系
first_grid_id = list(grid_points.keys())[0]
first_grid_dir = os.path.join(
self.output_dir,
f"grid_{first_grid_id[0]}_{first_grid_id[1]}",
"project"
)
log_file = os.path.join(
first_grid_dir, "odm_orthophoto", "odm_orthophoto_log.txt")
self.ref_east, self.ref_north = self.read_utm_offset(log_file)
for grid_id in grid_points.keys():
try:
self._convert_single_grid(grid_id, grid_points)
except Exception as e:
self.logger.error(f"网格 {grid_id} 转换失败: {str(e)}")
self._create_merged_metadata()
def _convert_single_grid(self, grid_id, grid_points):
"""转换单个网格的OBJ文件"""
# 构建相关路径
grid_name = f"grid_{grid_id[0]}_{grid_id[1]}"
project_dir = os.path.join(self.output_dir, grid_name, "project")
texturing_dir = os.path.join(project_dir, "odm_texturing")
texturing_dst_dir = os.path.join(project_dir, "odm_texturing_dst")
split_obj_dir = os.path.join(texturing_dst_dir, "split_obj")
opensfm_dir = os.path.join(project_dir, "opensfm")
log_file = os.path.join(
project_dir, "odm_orthophoto", "odm_orthophoto_log.txt")
os.makedirs(texturing_dst_dir, exist_ok=True)
# Adjust the z values in the OBJ file
min_25d_z = self.get_min_z_from_obj(os.path.join(
project_dir, 'odm_texturing_25d', 'odm_textured_model_geo.obj'))
self.modify_z_in_obj(texturing_dir, min_25d_z)
# In a new folder, shift the OBJ vertices by the UTM offset and downsample the textures
utm_offset = self.read_utm_offset(log_file)
modified_obj = self.modify_obj_coordinates(
texturing_dir, texturing_dst_dir, utm_offset)
self.downsample_texture(texturing_dir, texturing_dst_dir)
# Slice the OBJ file into tiles
self.logger.info(f"Start splitting the OBJ file of grid {grid_id}")
os.makedirs(split_obj_dir)
cmd = (
f"D:\\software\\Obj2Tiles\\Obj2Tiles.exe --stage Splitting --lods 1 --divisions 3 "
f"{modified_obj} {split_obj_dir}"
)
subprocess.run(cmd, check=True)
# Run the format conversion (osgconv is problematic on Linux; remember to comment this out there)
self.logger.info(f"Start converting the OBJ file of grid {grid_id}")
# Collect all OBJ files under split_obj_dir
obj_lod_dir = os.path.join(split_obj_dir, "LOD-0")
obj_files = [f for f in os.listdir(
obj_lod_dir) if f.endswith('.obj')]
for obj_file in obj_files:
obj_path = os.path.join(obj_lod_dir, obj_file)
osgb_file = os.path.splitext(obj_file)[0] + '.osgb'
osgb_path = os.path.join(split_obj_dir, osgb_file)
# Run osgconv
subprocess.run(['osgconv', obj_path, osgb_path, '--compressed',
'--smooth', '--fix-transparency'], check=True)
# Create the OSGB directory structure and copy the files
osgb_base_dir = os.path.join(self.output_dir, "osgb")
data_dir = os.path.join(osgb_base_dir, "Data")
for obj_file in obj_files:
obj_file_name = os.path.splitext(obj_file)[0]
tile_dirs = os.path.join(
data_dir, f"grid_{grid_id[0]}_{grid_id[1]}_{obj_file_name}")
os.makedirs(tile_dirs, exist_ok=True)
shutil.copy2(os.path.join(
split_obj_dir, obj_file_name+".osgb"), tile_dirs)
os.rename(os.path.join(tile_dirs, obj_file_name+".osgb"),
os.path.join(tile_dirs, f"grid_{grid_id[0]}_{grid_id[1]}_{obj_file_name}.osgb"))
def _create_merged_metadata(self):
"""创建合并后的metadata.xml文件"""
# 转换为WGS84经纬度
center_lon, center_lat = self.transformer.transform(
self.ref_east, self.ref_north)
metadata_content = f"""<?xml version="1.0" encoding="utf-8"?>
<ModelMetadata version="1">
<SRS>EPSG:4326</SRS>
<SRSOrigin>{center_lon},{center_lat},0</SRSOrigin>
<Texture>
<ColorSource>Visible</ColorSource>
</Texture>
</ModelMetadata>"""
metadata_file = os.path.join(self.output_dir, "osgb", "metadata.xml")
with open(metadata_file, 'w', encoding='utf-8') as f:
f.write(metadata_content)
def read_utm_offset(self, log_file: str) -> tuple:
"""读取UTM偏移量"""
try:
east_offset = None
north_offset = None
with open(log_file, 'r') as f:
lines = f.readlines()
for i, line in enumerate(lines):
if 'utm_north_offset' in line and i + 1 < len(lines):
north_offset = float(lines[i + 1].strip())
elif 'utm_east_offset' in line and i + 1 < len(lines):
east_offset = float(lines[i + 1].strip())
if east_offset is None or north_offset is None:
raise ValueError("未找到UTM偏移量")
return east_offset, north_offset
except Exception as e:
self.logger.error(f"读取UTM偏移量时发生错误: {str(e)}")
raise
def modify_obj_coordinates(self, texturing_dir: str, texturing_dst_dir: str, utm_offset: tuple) -> str:
"""修改obj文件中的顶点坐标使用相对坐标系"""
obj_file = os.path.join(
texturing_dir, "odm_textured_model_modified.obj")
obj_dst_file = os.path.join(
texturing_dst_dir, "odm_textured_model_geo_utm.obj")
if not os.path.exists(obj_file):
raise FileNotFoundError(f"找不到OBJ文件: {obj_file}")
shutil.copy2(os.path.join(texturing_dir, "odm_textured_model_geo.mtl"),
os.path.join(texturing_dst_dir, "odm_textured_model_geo.mtl"))
east_offset, north_offset = utm_offset
self.logger.info(
f"UTM offset: {east_offset - self.ref_east}, {north_offset - self.ref_north}")
try:
with open(obj_file, 'r') as f_in, open(obj_dst_file, 'w') as f_out:
for line in f_in:
if line.startswith('v '):
# Vertex coordinate line
parts = line.strip().split()
# Offset relative to the overall minimum UTM coordinates
x = float(parts[1]) + (east_offset - self.ref_east)
y = float(parts[2]) + (north_offset - self.ref_north)
z = float(parts[3])
f_out.write(f'v {x:.6f} {z:.6f} {-y:.6f}\n')
elif line.startswith('vn '):  # normal vector
parts = line.split()
nx = float(parts[1])
ny = float(parts[2])
nz = float(parts[3])
# Flip the normal's Y axis to match
new_line = f"vn {nx} {nz} {-ny}\n"
f_out.write(new_line)
else:
# Write other lines unchanged
f_out.write(line)
return obj_dst_file
except Exception as e:
self.logger.error(f"修改obj坐标时发生错误: {str(e)}")
raise
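Concretely, a 'v x y z' line is shifted by this grid's UTM offset relative to the reference grid and then written Y-up as 'v x z -y' (hypothetical numbers):

east_offset, north_offset = 500100.0, 3300050.0  # this grid's UTM offset (hypothetical)
ref_east, ref_north = 500000.0, 3300000.0        # reference grid's offset (hypothetical)
x, y, z = 12.0, 34.0, 5.6                        # a vertex from a 'v x y z' line
x += east_offset - ref_east                      # 112.0
y += north_offset - ref_north                    # 84.0
print(f"v {x:.6f} {z:.6f} {-y:.6f}")             # v 112.000000 5.600000 -84.000000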
def downsample_texture(self, src_dir: str, dst_dir: str):
"""复制并重命名纹理文件对大于100MB的文件进行多次下采样直到文件小于100MB
Args:
src_dir: 源纹理目录
dst_dir: 目标纹理目录
"""
for file in os.listdir(src_dir):
if file.lower().endswith(('.png')):
src_path = os.path.join(src_dir, file)
dst_path = os.path.join(dst_dir, file)
# File size in bytes
file_size = os.path.getsize(src_path)
if file_size <= 100 * 1024 * 1024:  # at most 100 MB: copy directly
shutil.copy2(src_path, dst_path)
else:
# Above 100 MB: downsample
img = cv2.imread(src_path, cv2.IMREAD_UNCHANGED)
if_first_ds = True
while file_size > 100 * 1024 * 1024:  # still above 100 MB
self.logger.info(f"Texture {file} exceeds 100 MB, downsampling")
if if_first_ds:
# First pass: shrink width and height to 1/4
new_size = (img.shape[1] // 4,
img.shape[0] // 4)
# Bicubic downsampling
resized_img = cv2.resize(
img, new_size, interpolation=cv2.INTER_CUBIC)
if_first_ds = False
else:
# Later passes: shrink width and height to 1/2
new_size = (img.shape[1] // 2,
img.shape[0] // 2)
# Bicubic downsampling
resized_img = cv2.resize(
img, new_size, interpolation=cv2.INTER_CUBIC)
# Write the downsampled image to the destination path
cv2.imwrite(dst_path, resized_img, [
cv2.IMWRITE_PNG_COMPRESSION, 9])
# Refresh the file size and the image
file_size = os.path.getsize(dst_path)
img = cv2.imread(dst_path, cv2.IMREAD_UNCHANGED)
self.logger.info(
f"File size after downsampling: {file_size / (1024 * 1024):.2f} MB")
def get_min_z_from_obj(self, file_path):
min_z = float('inf')  # start at infinity
with open(file_path, 'r') as obj_file:
for line in obj_file:
# Vertex definitions start with 'v '
if line.startswith('v '):
# Read the vertex coordinates
parts = line.split()
# Parse the z value
z = float(parts[3])
# Track the minimum z
if z < min_z:
min_z = z
return min_z
def modify_z_in_obj(self, texturing_dir, min_25d_z):
obj_file = os.path.join(texturing_dir, 'odm_textured_model_geo.obj')
output_file = os.path.join(
texturing_dir, 'odm_textured_model_modified.obj')
with open(obj_file, 'r') as f_in, open(output_file, 'w') as f_out:
for line in f_in:
if line.startswith('v '):  # vertex line
parts = line.strip().split()
x = float(parts[1])
y = float(parts[2])
z = float(parts[3])
if z < min_25d_z:
z = min_25d_z
f_out.write(f"v {x} {y} {z}\n")
else:
f_out.write(line)

View File

@ -1,62 +0,0 @@
import os
import logging
import numpy as np
from typing import Dict, Tuple
import pandas as pd
import subprocess
import shutil
class MergePly:
def __init__(self, output_dir: str):
self.output_dir = output_dir
self.logger = logging.getLogger('UAV_Preprocess.MergePly')
def merge_grid_laz(self, grid_points: Dict[tuple, pd.DataFrame]):
"""合并所有网格的点云数据"""
try:
# 获取所有点云文件路径
laz_files = []
for grid_id, points in grid_points.items():
laz_path = os.path.join(
self.output_dir,
f"grid_{grid_id[0]}_{grid_id[1]}",
"project",
"odm_georeferencing",
"odm_georeferenced_model.laz"
)
if os.path.exists(laz_path):
laz_files.append(laz_path)
else:
self.logger.warning(
f"Point cloud of grid ({grid_id[0]},{grid_id[1]}) does not exist")
kwargs = {
'all_inputs': " ".join(laz_files),
'output': os.path.join(self.output_dir, 'pointcloud.laz')
}
subprocess.run(
'D:\\software\\LAStools\\bin\\lasmerge64.exe -i {all_inputs} -o "{output}"'.format(**kwargs))
except Exception as e:
self.logger.error(f"PLY点云合并过程中发生错误: {str(e)}", exc_info=True)
raise
if __name__ == "__main__":
from utils.logger import setup_logger
# Set the output directory and logging
output_dir = r"G:\ODM_output\1009"
setup_logger(output_dir)
# Build a test grid_points dict
grid_points = {
(0, 0): [],  # GPS point info is no longer needed
(0, 1): []
}
# Create a MergePly instance and run the merge
merge_ply = MergePly(output_dir)
merge_ply.merge_grid_laz(grid_points)

View File

@ -1,443 +0,0 @@
import os
import logging
import numpy as np
from typing import Dict
import pandas as pd
import shutil
import time
import cv2
import subprocess
class MergeObj:
def __init__(self, output_dir: str):
self.output_dir = output_dir
self.logger = logging.getLogger('UAV_Preprocess.MergeObj')
def read_obj(self, file_path):
"""读取.obj文件返回顶点、纹理坐标、法线、面的列表和MTL文件名"""
vertices = [] # v
tex_coords = [] # vt
normals = [] # vn
faces = [] # f
face_materials = [] # 每个面对应的材质名称
mtl_file = None # mtl文件名
current_material = None # 当前使用的材质
with open(file_path, 'r') as file:
for line in file:
if line.startswith('#') or not line.strip():
continue
parts = line.strip().split()
if not parts:
continue
if parts[0] == 'mtllib':  # MTL file reference
mtl_file = parts[1]
elif parts[0] == 'usemtl':  # material switch
current_material = parts[1]
elif parts[0] == 'v':  # vertex
vertices.append(
[float(parts[1]), float(parts[2]), float(parts[3])])
elif parts[0] == 'vt':  # texture coordinate
tex_coords.append([float(parts[1]), float(parts[2])])
elif parts[0] == 'vn':  # normal
normals.append(
[float(parts[1]), float(parts[2]), float(parts[3])])
elif parts[0] == 'f':  # face
# Parse the face indices (v/vt/vn)
face_v = []
face_vt = []
face_vn = []
for p in parts[1:]:
indices = p.split('/')
face_v.append(int(indices[0]))
if len(indices) > 1 and indices[1]:
face_vt.append(int(indices[1]))
if len(indices) > 2:
face_vn.append(int(indices[2]))
faces.append((face_v, face_vt, face_vn))
face_materials.append(current_material)  # record the face's material
return vertices, tex_coords, normals, faces, face_materials, mtl_file
def write_obj(self, file_path, vertices, tex_coords, normals, faces, face_materials, mtl_file=None):
"""将顶点、纹理坐标、法线和面写入到.obj文件"""
with open(file_path, 'w') as file:
# MTL file reference
if mtl_file:
file.write(f"mtllib {mtl_file}\n")
# Vertices
for v in vertices:
file.write(f"v {v[0]} {v[1]} {v[2]}\n")
# Texture coordinates
for vt in tex_coords:
file.write(f"vt {vt[0]} {vt[1]}\n")
# Normals
for vn in normals:
file.write(f"vn {vn[0]} {vn[1]} {vn[2]}\n")
# Faces, grouped by material
current_material = None
for face, material in zip(faces, face_materials):
# Write a new usemtl whenever the material changes
if material != current_material:
file.write(f"usemtl {material}\n")
current_material = material
face_str = "f"
for i in range(len(face[0])):
face_str += " "
face_str += str(face[0][i])
if face[1]:
face_str += f"/{face[1][i]}"
else:
face_str += "/"
if face[2]:
face_str += f"/{face[2][i]}"
else:
face_str += "/"
file.write(face_str + "\n")
def translate_vertices(self, vertices, translation):
"""平移顶点"""
return [[v[0] + translation[0], v[1] + translation[1], v[2] + translation[2]] for v in vertices]
def merge_two_objs(self, obj1_path: str, obj2_path: str, output_path: str, translation, grid_id1: tuple, grid_id2: tuple):
"""合并两个OBJ文件"""
try:
self.logger.info(f"开始合并OBJ模型:\n输入1: {obj1_path}\n输入2: {obj2_path}")
# 读取两个obj文件
vertices1, tex_coords1, normals1, faces1, face_materials1, mtl1 = self.read_obj(
obj1_path)
vertices2, tex_coords2, normals2, faces2, face_materials2, mtl2 = self.read_obj(
obj2_path)
# Read the MTL files to get the correct material names
src_dir1 = os.path.dirname(obj1_path)
src_dir2 = os.path.dirname(obj2_path)
mtl1_path = os.path.join(src_dir1, mtl1)
mtl2_path = os.path.join(src_dir2, mtl2)
# Read and update the material contents
materials1 = self.read_mtl(mtl1_path)
materials2 = self.read_mtl(mtl2_path)
# Build material-name mappings, using the same naming scheme as the MTL file
material_map1 = {}
material_map2 = {}
# Material mapping for the first model
for old_name in materials1.keys():
if "grid_0_0" in obj1_path:
material_map1[old_name] = f"material_{grid_id1[0]}_{grid_id1[1]}_{old_name}"
else:
# Already renamed in an earlier merge; no need to rename again
material_map1[old_name] = old_name
# Material mapping for the second model
for old_name in materials2.keys():
material_map2[old_name] = f"material_{grid_id2[0]}_{grid_id2[1]}_{old_name}"
# Translate the second model's vertices
vertices2_translated = self.translate_vertices(
vertices2, translation)
# Index offsets
v_offset = len(vertices1)
vt_offset = len(tex_coords1)
vn_offset = len(normals1)
# Concatenate vertices, texture coordinates, and normals
all_vertices = vertices1 + vertices2_translated
all_tex_coords = tex_coords1 + tex_coords2
all_normals = normals1 + normals2
# Re-index the second model's faces and material names
all_faces = faces1.copy()
all_face_materials = []
# Update the first model's material names
for material in face_materials1:
all_face_materials.append(material_map1.get(material))
# Update the second model's face indices and material names
for face, material in zip(faces2, face_materials2):
new_face_v = [f + v_offset for f in face[0]]
new_face_vt = [
f + vt_offset for f in face[1]] if face[1] else []
new_face_vn = [
f + vn_offset for f in face[2]] if face[2] else []
all_faces.append((new_face_v, new_face_vt, new_face_vn))
all_face_materials.append(material_map2.get(material))
# Write the merged OBJ, matching the MTL file's name
mtl_filename = "textured_model.mtl"  # fixed MTL file name
self.write_obj(output_path, all_vertices, all_tex_coords, all_normals,
all_faces, all_face_materials, mtl_filename)
self.logger.info(f"模型合并成功,已保存至: {output_path}")
except Exception as e:
self.logger.error(f"合并OBJ模型时发生错误: {str(e)}", exc_info=True)
raise
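The re-indexing works because OBJ indices are 1-based and global across the file, so the second model's faces are simply shifted by the first model's element counts (toy example with hypothetical counts):

v_offset, vt_offset, vn_offset = 100, 90, 80  # element counts of model 1 (hypothetical)
face = ([1, 2, 3], [1, 2, 3], [1, 2, 3])      # 'f 1/1/1 2/2/2 3/3/3' from model 2
shifted = ([i + v_offset for i in face[0]],
           [i + vt_offset for i in face[1]],
           [i + vn_offset for i in face[2]])
print(shifted)  # ([101, 102, 103], [91, 92, 93], [81, 82, 83])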
def read_mtl(self, mtl_path: str) -> dict:
"""读取MTL文件内容
Returns:
dict: 材质名称到材质信息的映射
"""
materials = {}
current_material = None
with open(mtl_path, 'r') as f:
content = f.read()
for line in content.strip().split('\n'):
if not line:
continue
parts = line.split()
if not parts:
continue
if parts[0] == 'newmtl':
current_material = parts[1]
materials[current_material] = []
elif current_material:
materials[current_material].append(line)
return materials
def copy_and_rename_texture(self, src_dir: str, dst_dir: str, grid_id: tuple) -> dict:
"""复制并重命名纹理文件对大于100MB的文件进行下采样
Args:
src_dir: 源纹理目录
dst_dir: 目标纹理目录
grid_id: 网格ID
Returns:
dict: 原始文件名到新文件名的映射
"""
texture_map = {}
os.makedirs(dst_dir, exist_ok=True)
for file in os.listdir(src_dir):
if file.lower().endswith(('.png', '.jpg', '.jpeg')):
# New file name
new_name = f"grid_{grid_id[0]}_{grid_id[1]}_{file}"
src_path = os.path.join(src_dir, file)
dst_path = os.path.join(dst_dir, new_name)
# File size in bytes
file_size = os.path.getsize(src_path)
if file_size > 100 * 1024 * 1024:  # above 100 MB
self.logger.info(f"Texture {file} exceeds 100 MB, downsampling 4x")
# Read the image
img = cv2.imread(src_path, cv2.IMREAD_UNCHANGED)
if img is not None:
# Shrink width and height to 1/4
new_size = (img.shape[1] // 4, img.shape[0] // 4)
# Bicubic downsampling
resized_img = cv2.resize(
img, new_size, interpolation=cv2.INTER_CUBIC)
# Write the compressed image
if file.lower().endswith('.png'):
cv2.imwrite(dst_path, resized_img, [
cv2.IMWRITE_PNG_COMPRESSION, 9])
else:
cv2.imwrite(dst_path, resized_img, [
cv2.IMWRITE_JPEG_QUALITY, 95])
else:
self.logger.warning(f"无法读取图像文件: {src_path}")
shutil.copy2(src_path, dst_path)
else:
# At most 100 MB: copy directly
shutil.copy2(src_path, dst_path)
texture_map[file] = new_name
self.logger.debug(f"处理纹理文件: {file} -> {new_name}")
return texture_map
def update_mtl_content(self, materials: dict, texture_map: dict, grid_id: tuple) -> dict:
"""更新材质内容,修改材质名称和纹理路径
Args:
materials: 原始材质信息
texture_map: 纹理文件映射
grid_id: 网格ID
Returns:
dict: 更新后的材质信息
"""
updated_materials = {}
for mat_name, content in materials.items():
# Prefix the material name with the grid ID, matching usemtl in the OBJ
new_mat_name = f"material_{grid_id[0]}_{grid_id[1]}_{mat_name}"
updated_content = []
for line in content:
if line.startswith('map_'):  # update the texture file path
parts = line.split()
old_texture = parts[-1]
if old_texture in texture_map:
parts[-1] = texture_map[old_texture]
line = ' '.join(parts)
updated_content.append(line)
updated_materials[new_mat_name] = updated_content
return updated_materials
def merge_grid_obj(self, grid_points: Dict[tuple, pd.DataFrame], translations: Dict[tuple, tuple]):
"""合并所有网格的OBJ模型"""
if len(grid_points) == 1:
grid_id = list(grid_points.keys())[0]
shutil.copytree(os.path.join(self.output_dir,
f"grid_{grid_id[0]}_{grid_id[1]}",
"project",
"odm_texturing"),
os.path.join(self.output_dir, "texturing"))
os.rename(os.path.join(self.output_dir, "texturing", "odm_textured_model_geo.obj"),
os.path.join(self.output_dir, "texturing", "textured_model.obj"))
self.logger.info(f"开始执行格式转换")
docker_command = (
f"docker run --rm -it "
f"-v {self.output_dir}/texturing:/data "
f"-e LD_LIBRARY_PATH=/opt/osg/build/lib:$LD_LIBRARY_PATH "
f"osg-ubuntu2004 osgconv /data/textured_model.obj /data/textured_model.osgb"
)
self.logger.info(docker_command)
subprocess.run(
docker_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.logger.info(f"格式转换完成")
return
try:
# Create the output directory
output_model_dir = os.path.join(self.output_dir, "texturing")
os.makedirs(output_model_dir, exist_ok=True)
# Collect all valid grid files
grid_files = {}
for grid_id, points in grid_points.items():
base_dir = os.path.join(
self.output_dir,
f"grid_{grid_id[0]}_{grid_id[1]}",
"project",
"odm_texturing"
)
obj_path = os.path.join(base_dir, "odm_textured_model_geo.obj")
mtl_path = os.path.join(base_dir, "odm_textured_model_geo.mtl")
if not os.path.exists(obj_path) or not os.path.exists(mtl_path):
self.logger.warning(
f"Files of grid ({grid_id[0]},{grid_id[1]}) do not exist")
continue
grid_files[grid_id] = {
'obj': obj_path,
'mtl': mtl_path,
'dir': base_dir
}
if not grid_files:
self.logger.error("没有找到有效的文件")
return
# Collect all material and texture info
all_materials = {}
for grid_id, files in grid_files.items():
# Copy and rename the texture files
texture_map = self.copy_and_rename_texture(
files['dir'],
output_model_dir,
grid_id
)
# Read and update the MTL contents
materials = self.read_mtl(files['mtl'])
updated_materials = self.update_mtl_content(
materials,
texture_map,
grid_id
)
all_materials.update(updated_materials)
# Write the merged MTL file
final_mtl = os.path.join(output_model_dir, "textured_model.mtl")
with open(final_mtl, 'w') as f:
for mat_name, content in all_materials.items():
f.write(f"newmtl {mat_name}\n")
for line in content:
f.write(f"{line}\n")
f.write("\n")
# Merge the OBJ files
reference_id = list(grid_files.keys())[0]
merged_obj = grid_files[reference_id]['obj']
temp_files = []  # track all intermediate files
for grid_id, files in list(grid_files.items())[1:]:
translation = translations[grid_id]
translation = (translation[0], translation[1], 0)
# Temporary output file name
temp_output = os.path.join(
output_model_dir,
f"temp_merged_{int(time.time())}.obj"
)
temp_files.append(temp_output)  # add to the temp-file list
self.merge_two_objs(
merged_obj, files['obj'], temp_output, translation, reference_id, grid_id)
merged_obj = temp_output
# Final result
final_obj = os.path.join(output_model_dir, "textured_model.obj")
try:
if os.path.exists(final_obj):
os.remove(final_obj)
os.rename(merged_obj, final_obj)
except Exception as e:
self.logger.warning(f"重命名最终文件失败: {str(e)}")
shutil.copy2(merged_obj, final_obj)
try:
os.remove(merged_obj)
except:
pass
# Clean up all temporary files
for temp_file in temp_files:
if os.path.exists(temp_file):
try:
os.remove(temp_file)
except Exception as e:
self.logger.warning(
f"Failed to delete temp file: {temp_file}, error: {str(e)}")
self.logger.info(
f"Merging finished, output directory: {output_model_dir}\n"
f"- OBJ file: textured_model.obj\n"
f"- MTL file: textured_model.mtl\n"
f"- texture files: {len(os.listdir(output_model_dir)) - 2}"
)
self.logger.info("Start format conversion")
docker_command = (
f"docker run --rm -it "
f"-v {output_model_dir}:/data "
f"-e LD_LIBRARY_PATH=/opt/osg/build/lib:$LD_LIBRARY_PATH "
f"osg-ubuntu2004 osgconv /data/textured_model.obj /data/textured_model.osgb"
)
self.logger.info(docker_command)
subprocess.run(
docker_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.logger.info(f"格式转换完成")
except Exception as e:
self.logger.error(f"合并过程中发生错误: {str(e)}", exc_info=True)
raise

View File

@ -1,41 +0,0 @@
from PIL import Image
import os
import shutil
from multiprocessing import Pool
from functools import partial
def convert_image(file_name, img_dir, output_dir, convert_format):
input_path = os.path.join(img_dir, file_name)
output_path = os.path.join(output_dir, file_name.replace(".jpg", f".{convert_format}"))
# Open and convert the image
img = Image.open(input_path)
img.save(output_path)
def main():
convert_format = "png"
img_dir = r"E:\datasets\UAV\134\project\images"
output_dir = r"E:\datasets\UAV\134_png\project\images"
# Remove the output directory if it already exists
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
# Create the output directory
os.makedirs(output_dir)
# Collect all file names
file_names = os.listdir(img_dir)
# Partial function fixing every argument except the file name
convert_partial = partial(convert_image,
img_dir=img_dir,
output_dir=output_dir,
convert_format=convert_format)
# Convert in parallel with a process pool
with Pool() as pool:
pool.map(convert_partial, file_names)
if __name__ == '__main__':
main()

View File

@ -1,45 +0,0 @@
import cv2
import numpy as np
def resize_image(image, max_size=1200):
# Original size
height, width = image.shape[:2]
# Scale factor
scale = min(max_size/width, max_size/height)
if scale < 1:
# Only shrink oversized images
new_width = int(width * scale)
new_height = int(height * scale)
resized = cv2.resize(image, (new_width, new_height))
return resized, scale
return image, 1.0
def mouse_callback(event, x, y, flags, param):
if event == cv2.EVENT_LBUTTONDOWN:
# Map back to coordinates on the original image
original_x = int(x / scale)
original_y = int(y / scale)
print(f'Original-image coordinates (x, y): ({original_x}, {original_y})')
# Mark the clicked position on the resized image
cv2.circle(displayed_img, (x, y), 3, (0, 255, 0), -1)
cv2.imshow('image', displayed_img)
# Read the image
img = cv2.imread(r"E:\datasets\UAV\134\project\images\20240312_093841_W_W.jpg")
if img is None:
print('Error: could not read the image')
exit()
# Resize the image
displayed_img, scale = resize_image(img)
# Create the window and set the mouse callback
cv2.imshow('image', displayed_img)
cv2.setMouseCallback('image', mouse_callback)
# Wait for a key; press 'q' to quit
while True:
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cv2.destroyAllWindows()

View File

@ -1,245 +0,0 @@
import os
import shutil
def read_obj(file_path):
"""读取OBJ文件返回顶点、纹理坐标、法线、面的列表和MTL文件名"""
vertices = [] # v
tex_coords = [] # vt
normals = [] # vn
faces = [] # f
face_materials = [] # 每个面对应的材质名称
mtl_file = None # mtl文件名
current_material = None # 当前使用的材质
with open(file_path, 'r') as file:
for line in file:
if line.startswith('#') or not line.strip():
continue
parts = line.strip().split()
if not parts:
continue
if parts[0] == 'mtllib':  # MTL file reference
mtl_file = parts[1]
elif parts[0] == 'usemtl':  # material switch
current_material = parts[1]
elif parts[0] == 'v':  # vertex
vertices.append([float(parts[1]), float(parts[2]), float(parts[3])])
elif parts[0] == 'vt':  # texture coordinate
tex_coords.append([float(parts[1]), float(parts[2])])
elif parts[0] == 'vn':  # normal
normals.append([float(parts[1]), float(parts[2]), float(parts[3])])
elif parts[0] == 'f':  # face
# Parse the face indices (v/vt/vn)
face_v = []
face_vt = []
face_vn = []
for p in parts[1:]:
indices = p.split('/')
face_v.append(int(indices[0]))
if len(indices) > 1 and indices[1]:
face_vt.append(int(indices[1]))
if len(indices) > 2:
face_vn.append(int(indices[2]))
faces.append((face_v, face_vt, face_vn))
face_materials.append(current_material)  # record the face's material
return vertices, tex_coords, normals, faces, face_materials, mtl_file
def read_mtl(mtl_path: str) -> tuple:
"""读取MTL文件内容
Returns:
tuple: (文件内容列表, 材质名称列表)
"""
content = []
material_names = []
with open(mtl_path, 'r') as f:
content = f.readlines()
for line in content:
if line.startswith('newmtl'):
material_names.append(line.strip().split()[1])
return content, material_names
def update_mtl_content(content: list, model_id: int) -> tuple:
"""更新MTL文件内容修改材质名称和纹理文件路径
Returns:
tuple: (更新后的内容, 更新后的材质名称列表)
"""
updated_lines = []
updated_material_names = []
current_material = None
for line in content:
if line.startswith('newmtl'):
# Prefix the material name
parts = line.strip().split()
material_name = parts[1]
current_material = f"grid_{model_id}_{material_name}"
updated_material_names.append(current_material)
updated_lines.append(f"newmtl {current_material}\n")
elif line.startswith('map_'):
# Update the texture file path
parts = line.strip().split()
texture_file = os.path.basename(parts[-1])
parts[-1] = f"grid_{model_id}_{texture_file}"
updated_lines.append(' '.join(parts) + '\n')
else:
updated_lines.append(line)
return updated_lines, updated_material_names
def merge_mtl_files(mtl1_path: str, mtl2_path: str, output_path: str) -> tuple:
"""合并两个MTL文件
Returns:
tuple: (第一个模型的材质名称列表, 第二个模型的材质名称列表)
"""
# 读取两个MTL文件
content1, materials1 = read_mtl(mtl1_path)
content2, materials2 = read_mtl(mtl2_path)
# 更新两个MTL的内容
updated_content1, updated_materials1 = update_mtl_content(content1, 0)
updated_content2, updated_materials2 = update_mtl_content(content2, 1)
# 合并并写入新的MTL文件
with open(output_path, 'w') as f:
f.writelines(updated_content1)
f.write('\n') # 添加分隔行
f.writelines(updated_content2)
return updated_materials1, updated_materials2
def write_obj(file_path, vertices, tex_coords, normals, faces, face_materials, mtl_file=None):
"""将顶点、纹理坐标、法线和面写入到OBJ文件"""
with open(file_path, 'w') as file:
# 写入MTL文件引用
if mtl_file:
file.write(f"mtllib {mtl_file}\n")
# 写入顶点
for v in vertices:
file.write(f"v {v[0]} {v[1]} {v[2]}\n")
# 写入纹理坐标
for vt in tex_coords:
file.write(f"vt {vt[0]} {vt[1]}\n")
# 写入法线
for vn in normals:
file.write(f"vn {vn[0]} {vn[1]} {vn[2]}\n")
# 写入面(按材质分组)
current_material = None
for face, material in zip(faces, face_materials):
# 如果材质发生变化写入新的usemtl
if material != current_material:
file.write(f"usemtl {material}\n")
current_material = material
face_str = "f"
for j in range(len(face[0])):
face_str += " "
face_str += str(face[0][j])
if face[1]:
face_str += f"/{face[1][j]}"
else:
face_str += "/"
if face[2]:
face_str += f"/{face[2][j]}"
else:
face_str += "/"
file.write(face_str + "\n")
def translate_vertices(vertices, translation):
"""平移顶点"""
return [[v[0] + translation[0], v[1] + translation[1], v[2] + translation[2]] for v in vertices]
def copy_mtl_and_textures(src_dir: str, dst_dir: str, model_id: int):
"""复制MTL文件和相关的纹理文件并重命名避免冲突
Args:
src_dir: 源目录包含MTL和纹理文件
dst_dir: 目标目录
model_id: 模型ID用于重命名
"""
# 复制并重命名纹理文件
for file in os.listdir(src_dir):
if file.lower().endswith('.png'):
src_file = os.path.join(src_dir, file)
new_name = f"grid_{model_id}_{file}"
dst_file = os.path.join(dst_dir, new_name)
shutil.copy2(src_file, dst_file)
print(f"复制纹理文件: {file} -> {new_name}")
def merge_objs(obj1_path, obj2_path, output_path):
"""合并两个OBJ文件"""
print(f"开始合并OBJ模型:\n输入1: {obj1_path}\n输入2: {obj2_path}")
# 读取两个obj文件
vertices1, tex_coords1, normals1, faces1, face_materials1, mtl1 = read_obj(obj1_path)
vertices2, tex_coords2, normals2, faces2, face_materials2, mtl2 = read_obj(obj2_path)
# 固定平移量(0, 1000, 0)
translation = (0, 1000, 0)
# 平移第二个模型的顶点
vertices2_translated = translate_vertices(vertices2, translation)
# 计算偏移量
v_offset = len(vertices1)
vt_offset = len(tex_coords1)
vn_offset = len(normals1)
# 合并顶点、纹理坐标和法线
all_vertices = vertices1 + vertices2_translated
all_tex_coords = tex_coords1 + tex_coords2
all_normals = normals1 + normals2
# 调整第二个模型的面索引和材质名称
all_faces = faces1.copy()
all_face_materials = face_materials1.copy()
for face, material in zip(faces2, face_materials2):
new_face_v = [f + v_offset for f in face[0]]
new_face_vt = [f + vt_offset for f in face[1]] if face[1] else []
new_face_vn = [f + vn_offset for f in face[2]] if face[2] else []
all_faces.append((new_face_v, new_face_vt, new_face_vn))
# 为第二个模型的材质名称添加前缀
all_face_materials.append(f"grid_1_{material}")
# 为第一个模型的材质添加前缀
all_face_materials[:len(faces1)] = [f"grid_0_{mat}" for mat in face_materials1]
# 创建输出子目录
output_dir = os.path.dirname(output_path)
os.makedirs(output_dir, exist_ok=True)
# 复制并重命名两个模型的纹理文件
src_dir1 = os.path.dirname(obj1_path)
src_dir2 = os.path.dirname(obj2_path)
copy_mtl_and_textures(src_dir1, output_dir, 0)
copy_mtl_and_textures(src_dir2, output_dir, 1)
# 合并MTL文件并获取材质名称
src_mtl1 = os.path.join(src_dir1, mtl1)
src_mtl2 = os.path.join(src_dir2, mtl2)
dst_mtl = os.path.join(output_dir, "merged_model.mtl")
merge_mtl_files(src_mtl1, src_mtl2, dst_mtl)
# 写入合并后的obj文件
write_obj(output_path, all_vertices, all_tex_coords, all_normals,
all_faces, all_face_materials, "merged_model.mtl")
print(f"模型合并成功,已保存至: {output_path}")
if __name__ == "__main__":
# 测试参数
obj1_path = r"G:\ODM_output\1009\grid_0_0\project\odm_texturing\odm_textured_model_geo.obj"
obj2_path = r"G:\ODM_output\1009\grid_0_1\project\odm_texturing\odm_textured_model_geo.obj"
output_dir = r"G:\ODM_output\1009\merge_test"
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "merged_test.obj")
# 执行合并
merge_objs(obj1_path, obj2_path, output_path)

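For reference, the index bookkeeping in merge_objs can be checked in isolation. A toy sketch with invented data: OBJ face indices are 1-based and refer to the global vertex list, so every index of the second model must shift by the first model's element counts.

faces1 = [([1, 2, 3], [], [])]   # one triangle using vertices 1-3
faces2 = [([1, 2, 3], [], [])]   # the same triangle in the second file
v_offset = 3                     # len(vertices1)

# Only vertex indices are shifted here; vt/vn lists are empty in this toy case
merged = faces1 + [([i + v_offset for i in f[0]], f[1], f[2]) for f in faces2]
assert merged[1][0] == [4, 5, 6]  # the second triangle now uses vertices 4-6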
View File

@@ -1,55 +0,0 @@
from datetime import datetime
import json


def parse_args():
    import argparse
    parser = argparse.ArgumentParser(description="ODM log time")
    parser.add_argument(
        "--path", default=r"E:\datasets\UAV\134\project\log.json")
    args = parser.parse_args()
    return args


def main(args):
    # Read the JSON log file
    with open(args.path, 'r') as file:
        data = json.load(file)

    # Extract each stage's start time and duration from "stages"
    stage_timings = []
    for i, stage in enumerate(data.get("stages", [])):
        stage_name = stage.get("name", "Unnamed Stage")
        start_time = stage.get("startTime")

        # Start time of the current stage
        if start_time:
            start_dt = datetime.fromisoformat(start_time)

            # End time: the next stage's start time, or this stage's
            # `endTime` (falling back to the overall endTime) for the last stage
            if i + 1 < len(data["stages"]):
                end_time = data["stages"][i + 1].get("startTime")
            else:
                end_time = stage.get("endTime") or data.get("endTime")

            if end_time:
                end_dt = datetime.fromisoformat(end_time)
                duration = (end_dt - start_dt).total_seconds()
                stage_timings.append((stage_name, duration))

    # Print each stage's duration as an aligned table
    total_time = 0
    print(f"{'Stage Name':<25} {'Duration (seconds)':>15}")
    print("=" * 45)
    for stage_name, duration in stage_timings:
        print(f"{stage_name:<25} {duration:>15.2f}")
        total_time += duration
    print('Total Time:', total_time)


if __name__ == '__main__':
    args = parse_args()
    main(args)

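The script assumes log.json carries a "stages" array with ISO-formatted timestamps. A hypothetical minimal input it would accept (stage names and times invented for illustration):

example_log = {
    "stages": [
        {"name": "dataset", "startTime": "2024-03-12T09:38:41"},
        {"name": "opensfm", "startTime": "2024-03-12T09:39:10"},
    ],
    "endTime": "2024-03-12T09:45:00",  # used as the last stage's end time
}
# Expected output: dataset ~29 s, opensfm ~350 s, Total Time: ~379 s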
View File

@@ -1,63 +0,0 @@
import os
import piexif
from PIL import Image


def dms_to_decimal(dms):
    """Convert DMS (degrees/minutes/seconds) to decimal degrees."""
    if not dms:
        return None
    degrees = dms[0][0] / dms[0][1]
    minutes = dms[1][0] / dms[1][1] / 60
    seconds = dms[2][0] / dms[2][1] / 3600
    return degrees + minutes + seconds


def get_gps_info(image_path):
    """Read GPS information from an image's EXIF data."""
    try:
        image = Image.open(image_path)
        exif_data = piexif.load(image.info['exif'])
        gps_info = exif_data.get("GPS", {})
        if not gps_info:
            return None, None, None

        # Latitude
        lat = dms_to_decimal(gps_info.get(2))
        if lat and gps_info.get(1) and gps_info[1] == b'S':
            lat = -lat

        # Longitude
        lon = dms_to_decimal(gps_info.get(4))
        if lon and gps_info.get(3) and gps_info[3] == b'W':
            lon = -lon

        # Altitude
        alt = None
        if 6 in gps_info:
            alt = gps_info[6][0] / gps_info[6][1]

        return lat, lon, alt
    except Exception as e:
        print(f"Error reading file {image_path}: {str(e)}")
        return None, None, None


def main():
    # Input and output paths
    image_dir = r"E:\datasets\UAV\134\project\images"
    output_path = r"E:\datasets\UAV\134\project\gps.txt"

    with open(output_path, 'w', encoding='utf-8') as f:
        for filename in os.listdir(image_dir):
            if filename.lower().endswith(('.jpg', '.jpeg')):
                image_path = os.path.join(image_dir, filename)
                lat, lon, alt = get_gps_info(image_path)
                if lat is not None and lon is not None:
                    # Default to altitude 0 when none is recorded
                    alt = alt if alt is not None else 0
                    # filename = filename.replace(".jpg", ".tif")
                    f.write(f"{filename} {lat} {lon} {alt}\n")


if __name__ == '__main__':
    main()

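As a quick sanity check of dms_to_decimal: piexif stores each DMS component as a (numerator, denominator) rational. The sample values below are invented:

dms = ((39, 1), (54, 1), (30, 1))  # 39° 54' 30"
decimal = dms[0][0] / dms[0][1] + dms[1][0] / dms[1][1] / 60 + dms[2][0] / dms[2][1] / 3600
print(round(decimal, 6))  # 39.908333, i.e. 39 + 54/60 + 30/3600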
View File

@@ -1,51 +0,0 @@
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import matplotlib.pyplot as plt
from utils.gps_extractor import GPSExtractor

DATASET = r'E:\datasets\UAV\1009\project\images'

if __name__ == '__main__':
    extractor = GPSExtractor(DATASET)
    gps_points = extractor.extract_all_gps()

    # Two side-by-side subplots
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 8))

    # Left: raw scatter plot
    ax1.scatter(gps_points['lon'], gps_points['lat'],
                color='blue', marker='o', label='GPS Points')
    ax1.set_title("GPS Coordinates of Images", fontsize=14)
    ax1.set_xlabel("Longitude", fontsize=12)
    ax1.set_ylabel("Latitude", fontsize=12)
    ax1.grid(True)
    ax1.legend()

    # # Right: trajectory ordered by timestamp
    # gps_points_sorted = gps_points.sort_values('date')
    # # Draw the flight path
    # ax2.plot(gps_points_sorted['lon'][300:600], gps_points_sorted['lat'][300:600],
    #          color='blue', linestyle='-', linewidth=1, alpha=0.6)
    # # Draw the GPS points
    # ax2.scatter(gps_points_sorted['lon'][300:600], gps_points_sorted['lat'][300:600],
    #             color='red', marker='o', s=30, label='GPS Points')
    # # Mark the start and end points
    # ax2.scatter(gps_points_sorted['lon'].iloc[0], gps_points_sorted['lat'].iloc[0],
    #             color='green', marker='^', s=100, label='Start')
    # ax2.scatter(gps_points_sorted['lon'].iloc[-1], gps_points_sorted['lat'].iloc[-1],
    #             color='purple', marker='s', s=100, label='End')
    ax2.set_title("UAV Flight Trajectory", fontsize=14)
    ax2.set_xlabel("Longitude", fontsize=12)
    ax2.set_ylabel("Latitude", fontsize=12)
    ax2.grid(True)
    ax2.legend()

    # Adjust spacing between subplots
    plt.tight_layout()
    plt.show()

View File

@@ -1,138 +0,0 @@
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import matplotlib.pyplot as plt
from datetime import timedelta
import logging
import numpy as np
from utils.gps_extractor import GPSExtractor
from utils.logger import setup_logger


class GPSTimeVisualizer:
    """Visualize GPS points grouped by time."""

    def __init__(self, image_dir: str, output_dir: str):
        self.image_dir = image_dir
        self.output_dir = output_dir
        self.logger = logging.getLogger('UAV_Preprocess.GPSVisualizer')

    def _group_by_time(self, points_df, time_threshold=timedelta(minutes=5)):
        """Split points into groups wherever the time gap exceeds the threshold."""
        if 'date' not in points_df.columns:
            self.logger.error("Missing 'date' column in the data")
            return [points_df]

        # Rows without a date form their own group
        null_date_group = points_df[points_df['date'].isna()]
        valid_date_points = points_df[points_df['date'].notna()]

        if not null_date_group.empty:
            self.logger.info(f"Found {len(null_date_group)} points without timestamps; they form a separate group")

        if valid_date_points.empty:
            self.logger.warning("No valid timestamp data")
            return [null_date_group] if not null_date_group.empty else []

        # Sort by timestamp
        valid_date_points = valid_date_points.sort_values('date')

        # Compute consecutive time differences
        time_diffs = valid_date_points['date'].diff()

        # Split wherever the gap exceeds the threshold
        time_groups = []
        current_group_start = 0
        for idx, time_diff in enumerate(time_diffs):
            if time_diff and time_diff > time_threshold:
                # Close the current group
                current_group = valid_date_points.iloc[current_group_start:idx]
                time_groups.append(current_group)
                current_group_start = idx

        # Append the final group
        last_group = valid_date_points.iloc[current_group_start:]
        if not last_group.empty:
            time_groups.append(last_group)

        # Points without timestamps go last
        if not null_date_group.empty:
            time_groups.append(null_date_group)

        return time_groups

    def visualize_time_groups(self, time_threshold=timedelta(minutes=5)):
        """Plot all time groups on one figure, one color per group."""
        # Extract GPS data
        extractor = GPSExtractor(self.image_dir)
        gps_points = extractor.extract_all_gps()

        # Group by time
        time_groups = self._group_by_time(gps_points, time_threshold)

        # Create the figure
        plt.figure(figsize=(15, 10))

        # Generate distinct colors
        colors = plt.cm.rainbow(np.linspace(0, 1, len(time_groups)))

        # Draw points and trajectory for each group
        for idx, (group, color) in enumerate(zip(time_groups, colors)):
            if not group['date'].isna().any():
                # Group with timestamps
                sorted_group = group.sort_values('date')

                # Trajectory line
                plt.plot(sorted_group['lon'], sorted_group['lat'],
                         color=color, linestyle='-', linewidth=1.5, alpha=0.6,
                         label=f'Flight Path {idx + 1}')

                # GPS points
                plt.scatter(sorted_group['lon'], sorted_group['lat'],
                            color=color, marker='o', s=30, alpha=0.6)

                # Mark the start and end points
                plt.scatter(sorted_group['lon'].iloc[0], sorted_group['lat'].iloc[0],
                            color=color, marker='^', s=100,
                            label=f'Start {idx + 1} ({sorted_group["date"].min().strftime("%H:%M:%S")})')
                plt.scatter(sorted_group['lon'].iloc[-1], sorted_group['lat'].iloc[-1],
                            color=color, marker='s', s=100,
                            label=f'End {idx + 1} ({sorted_group["date"].max().strftime("%H:%M:%S")})')
            else:
                # Group without timestamps
                plt.scatter(group['lon'], group['lat'],
                            color=color, marker='x', s=50, alpha=0.6,
                            label='No Timestamp Points')

        plt.title("GPS Points by Time Groups", fontsize=14)
        plt.xlabel("Longitude", fontsize=12)
        plt.ylabel("Latitude", fontsize=12)
        plt.grid(True)

        # Place the legend outside the axes
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', fontsize=10)

        # Leave room for the legend
        plt.tight_layout()

        # Save the figure
        plt.savefig(os.path.join(self.output_dir, 'gps_time_groups_combined.png'),
                    dpi=300, bbox_inches='tight')
        plt.close()

        self.logger.info(f"Generated a combined visualization with {len(time_groups)} time groups")


if __name__ == '__main__':
    # Dataset paths
    DATASET = r'F:\error_data\20241108134711\3D'
    output_dir = r'E:\studio2\ODM_pro\test'
    os.makedirs(output_dir, exist_ok=True)

    # Set up logging
    setup_logger(os.path.dirname(output_dir))

    # Create the visualizer and generate the figure
    visualizer = GPSTimeVisualizer(DATASET, output_dir)
    visualizer.visualize_time_groups(time_threshold=timedelta(minutes=5))

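The split-on-gap loop in _group_by_time is equivalent to a shorter pandas idiom. A sketch with invented timestamps (assumes the 'date' column is already sorted):

import pandas as pd
from datetime import timedelta

df = pd.DataFrame({'date': pd.to_datetime(
    ['2024-01-01 10:00', '2024-01-01 10:01', '2024-01-01 10:30'])})
group_id = (df['date'].diff() > timedelta(minutes=5)).cumsum()
groups = [g for _, g in df.groupby(group_id)]
print(len(groups))  # 2: the 29-minute gap starts a new group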
View File

@@ -1,12 +0,0 @@
import subprocess


def run_docker_command(command):
    result = subprocess.run(command, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return result.stdout.decode('utf-8'), result.stderr.decode('utf-8')


if __name__ == "__main__":
    command = "docker run -ti --rm -v g:/ODM_output/20241024100834/grid_1:/datasets opendronemap/odm --project-path /datasets project --max-concurrency 10 --force-gps --feature-quality lowest --orthophoto-resolution 10 --fast-orthophoto --skip-3dmodel --rerun-all"
    stdout, stderr = run_docker_command(command)
    print(stdout)
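
One caveat with the helper above: `-ti` requests a TTY, which Docker cannot allocate when stdout is piped, and `shell=True` with a raw string invites quoting issues. A hedged alternative sketch that passes an argument list instead (assumes the caller strips `-ti` from the command when not attached to a terminal):

import shlex
import subprocess

def run_docker_command(command: str):
    # Split the raw string into an argument list; no shell is involved,
    # so there are no quoting surprises. capture_output needs Python 3.7+.
    args = shlex.split(command)
    result = subprocess.run(args, capture_output=True, text=True)
    return result.stdout, result.stderr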