UAV/utils/grid_divider.py
2025-01-04 14:49:42 +08:00

277 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from geopy.distance import geodesic
import matplotlib.pyplot as plt
import os
class GridDivider:
"""划分网格,并将图片分配到对应网格"""
def __init__(self, overlap=0.1, grid_size=500, output_dir=None):
self.overlap = overlap
self.grid_size = grid_size
self.output_dir = output_dir
self.logger = logging.getLogger('UAV_Preprocess.GridDivider')
self.logger.info(f"初始化网格划分器,重叠率: {overlap}")
self.num_grids_width = 0 # 添加网格数量属性
self.num_grids_height = 0
def adjust_grid_size(self, points_df):
"""动态调整网格大小
Args:
points_df: 包含GPS点的DataFrame
Returns:
tuple: (grids, translations, grid_points, final_grid_size)
"""
self.logger.info(f"开始动态调整网格大小,初始大小: {self.grid_size}")
while True:
# 使用当前grid_size划分网格
grids, translations = self.divide_grids(points_df)
grid_points, multiple_grid_points = self.assign_to_grids(points_df, grids)
# 检查每个网格中的点数
max_points = 0
for grid_id, points in grid_points.items():
max_points = max(max_points, len(points))
self.logger.info(f"当前网格大小: {self.grid_size}米, 单个网格最大点数: {max_points}")
# 如果最大点数超过1500减小网格大小
if max_points > 1500:
self.grid_size -= 100
self.logger.info(f"点数超过1500减小网格大小至: {self.grid_size}")
if self.grid_size < 500: # 设置一个最小网格大小限制
self.logger.warning("网格大小已达到最小值500米停止调整")
break
else:
self.logger.info(f"找到合适的网格大小: {self.grid_size}")
break
return grids
def adjust_grid_size_and_overlap(self, points_df):
"""动态调整网格重叠率"""
grids = self.adjust_grid_size(points_df)
self.logger.info(f"开始动态调整网格重叠率,初始重叠率: {self.overlap}")
while True:
# 使用调整好的网格大小划分网格
grids, translations = self.divide_grids(points_df)
grid_points, multiple_grid_points = self.assign_to_grids(points_df, grids)
if len(grids) == 1:
self.logger.info(f"网格数量为1跳过重叠率调整")
break
elif multiple_grid_points < 0.1*len(points_df):
self.overlap += 0.02
self.logger.info(f"重叠率增加到: {self.overlap}")
else:
self.logger.info(f"找到合适的重叠率: {self.overlap}, 有{multiple_grid_points}个点被分配到多个网格")
break
return grids, translations, grid_points
def divide_grids(self, points_df):
"""计算边界框并划分网格
Returns:
tuple: (grids, translations)
- grids: 网格边界列表
- translations: 网格平移量字典
"""
self.logger.info("开始划分网格")
min_lat, max_lat = points_df['lat'].min(), points_df['lat'].max()
min_lon, max_lon = points_df['lon'].min(), points_df['lon'].max()
# 计算区域的实际距离(米)
width = geodesic((min_lat, min_lon), (min_lat, max_lon)).meters
height = geodesic((min_lat, min_lon), (max_lat, min_lon)).meters
self.logger.info(f"区域宽度: {width:.2f}米, 高度: {height:.2f}")
# 精细调整网格的长宽避免出现2*grid_size-1的情况的影响
grid_size_lt = [self.grid_size -200, self.grid_size -100, self.grid_size , self.grid_size +100, self.grid_size +200]
width_modulus_lt = [width % grid_size for grid_size in grid_size_lt]
grid_width = grid_size_lt[width_modulus_lt.index(min(width_modulus_lt))]
height_modulus_lt = [height % grid_size for grid_size in grid_size_lt]
grid_height = grid_size_lt[height_modulus_lt.index(min(height_modulus_lt))]
self.logger.info(f"网格宽度: {grid_width:.2f}米, 网格高度: {grid_height:.2f}")
# 计算需要划分的网格数量
self.num_grids_width = max(int(width / grid_width), 1)
self.num_grids_height = max(int(height / grid_height), 1)
# 计算每个网格对应的经纬度步长
lat_step = (max_lat - min_lat) / self.num_grids_height
lon_step = (max_lon - min_lon) / self.num_grids_width
grids = []
grid_translations = {} # 存储每个网格相对于第一个网格的平移量
# 先创建所有网格
for i in range(self.num_grids_height):
for j in range(self.num_grids_width):
grid_min_lat = min_lat + i * lat_step - self.overlap * lat_step
grid_max_lat = min_lat + (i + 1) * lat_step + self.overlap * lat_step
grid_min_lon = min_lon + j * lon_step - self.overlap * lon_step
grid_max_lon = min_lon + (j + 1) * lon_step + self.overlap * lon_step
grid_id = (i, j) # 使用(i,j)作为网格标识i代表行j代表列
grid_bounds = (grid_min_lat, grid_max_lat, grid_min_lon, grid_max_lon)
grids.append(grid_bounds)
self.logger.debug(
f"网格[{i},{j}]: 纬度[{grid_min_lat:.6f}, {grid_max_lat:.6f}], "
f"经度[{grid_min_lon:.6f}, {grid_max_lon:.6f}]"
)
# 计算每个网格相对于第一个网格的平移量
reference_grid = grids[0]
for i in range(self.num_grids_height):
for j in range(self.num_grids_width):
grid_id = (i, j)
grid_idx = i * self.num_grids_width + j
if grid_idx == 0: # 参考网格
grid_translations[grid_id] = (0, 0)
else:
translation = self.calculate_grid_translation(reference_grid, grids[grid_idx])
grid_translations[grid_id] = translation
self.logger.debug(
f"网格[{i},{j}]相对于参考网格的平移量: x={translation[0]:.2f}m, y={translation[1]:.2f}m"
)
self.logger.info(
f"成功划分为 {len(grids)} 个网格 ({self.num_grids_width}x{self.num_grids_height})")
return grids, grid_translations
def assign_to_grids(self, points_df, grids):
"""将点分配到对应网格"""
self.logger.info(f"开始将 {len(points_df)} 个点分配到网格中")
grid_points = {} # 使用字典存储每个网格的点
points_assigned = 0
multiple_grid_points = 0
for i in range(self.num_grids_height):
for j in range(self.num_grids_width):
grid_points[(i, j)] = [] # 使用(i,j)元组
for _, point in points_df.iterrows():
point_assigned = False
for i in range(self.num_grids_height):
for j in range(self.num_grids_width):
grid_idx = i * self.num_grids_width + j
min_lat, max_lat, min_lon, max_lon = grids[grid_idx]
if min_lat <= point['lat'] <= max_lat and min_lon <= point['lon'] <= max_lon:
grid_points[(i, j)].append(point.to_dict())
if point_assigned:
multiple_grid_points += 1
else:
points_assigned += 1
point_assigned = True
# 记录每个网格的点数
for grid_id, points in grid_points.items():
self.logger.info(f"网格 {grid_id} 包含 {len(points)} 个点")
self.logger.info(
f"点分配完成: 总点数 {len(points_df)}, "
f"成功分配 {points_assigned} 个点, "
f"{multiple_grid_points} 个点被分配到多个网格"
)
return grid_points, multiple_grid_points
def visualize_grids(self, points_df, grids):
"""可视化网格划分和GPS点的分布"""
self.logger.info("开始可视化网格划分")
plt.figure(figsize=(12, 8))
# 绘制GPS点
plt.scatter(points_df['lon'], points_df['lat'],
c='blue', s=10, alpha=0.6, label='GPS points')
# 绘制网格
for i in range(self.num_grids_height):
for j in range(self.num_grids_width):
grid_idx = i * self.num_grids_width + j
min_lat, max_lat, min_lon, max_lon = grids[grid_idx]
# 计算网格的实际长度和宽度(米)
width = geodesic((min_lat, min_lon), (min_lat, max_lon)).meters
height = geodesic((min_lat, min_lon), (max_lat, min_lon)).meters
plt.plot([min_lon, max_lon, max_lon, min_lon, min_lon],
[min_lat, min_lat, max_lat, max_lat, min_lat],
'r-', alpha=0.5)
# 在网格中心添加网格编号和尺寸信息
center_lon = (min_lon + max_lon) / 2
center_lat = (min_lat + max_lat) / 2
plt.text(center_lon, center_lat,
f"({i},{j})\n{width:.0f}m×{height:.0f}m", # 显示(i,j)和尺寸
horizontalalignment='center',
verticalalignment='center',
fontsize=8)
plt.title('Grid Division and GPS Point Distribution')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.legend()
plt.grid(True)
# 如果提供了输出目录,保存图像
if self.output_dir:
save_path = os.path.join(
self.output_dir, 'filter_imgs', 'grid_division.png')
plt.savefig(save_path, dpi=300, bbox_inches='tight')
self.logger.info(f"网格划分可视化图已保存至: {save_path}")
plt.close()
def get_grid_center(self, grid_bounds) -> tuple:
"""计算网格中心点的经纬度
Args:
grid_bounds: (min_lat, max_lat, min_lon, max_lon)
Returns:
(center_lat, center_lon)
"""
min_lat, max_lat, min_lon, max_lon = grid_bounds
return ((min_lat + max_lat) / 2, (min_lon + max_lon) / 2)
def calculate_grid_translation(self, reference_grid: tuple, target_grid: tuple) -> tuple:
"""计算目标网格相对于参考网格的平移距离(米)
Args:
reference_grid: 参考网格的边界 (min_lat, max_lat, min_lon, max_lon)
target_grid: 目标网格的边界 (min_lat, max_lat, min_lon, max_lon)
Returns:
(x_translation, y_translation): 在米制单位下的平移量
"""
ref_center = self.get_grid_center(reference_grid)
target_center = self.get_grid_center(target_grid)
# 计算经度方向的距离x轴
x_distance = geodesic(
(ref_center[0], ref_center[1]),
(ref_center[0], target_center[1])
).meters
# 如果目标在参考点西边,距离为负
if target_center[1] < ref_center[1]:
x_distance = -x_distance
# 计算纬度方向的距离y轴
y_distance = geodesic(
(ref_center[0], ref_center[1]),
(target_center[0], ref_center[1])
).meters
# 如果目标在参考点南边,距离为负
if target_center[0] < ref_center[0]:
y_distance = -y_distance
return (x_distance, y_distance)