UAV/utils/grid_divider.py
2025-01-02 20:11:47 +08:00

207 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from geopy.distance import geodesic
import matplotlib.pyplot as plt
import os
class GridDivider:
"""划分九宫格,并将图片分配到对应网格"""
def __init__(self, overlap=0.1, output_dir=None):
self.overlap = overlap
self.output_dir = output_dir
self.logger = logging.getLogger('UAV_Preprocess.GridDivider')
self.logger.info(f"初始化网格划分器,重叠率: {overlap}")
self.num_grids_width = 0 # 添加网格数量属性
self.num_grids_height = 0
def divide_grids(self, points_df, grid_size=500):
"""计算边界框并划分网格
Returns:
tuple: (grids, translations)
- grids: 网格边界列表
- translations: 网格平移量字典
"""
self.logger.info("开始划分网格")
min_lat, max_lat = points_df['lat'].min(), points_df['lat'].max()
min_lon, max_lon = points_df['lon'].min(), points_df['lon'].max()
# 计算区域的实际距离(米)
width = geodesic((min_lat, min_lon), (min_lat, max_lon)).meters
height = geodesic((min_lat, min_lon), (max_lat, min_lon)).meters
self.logger.info(f"区域宽度: {width:.2f}米, 高度: {height:.2f}")
# 计算需要划分的网格数量
self.num_grids_width = max(int(width / grid_size), 1)
self.num_grids_height = max(int(height / grid_size), 1)
# 计算每个网格对应的经纬度步长
lat_step = (max_lat - min_lat) / self.num_grids_height
lon_step = (max_lon - min_lon) / self.num_grids_width
grids = []
grid_translations = {} # 存储每个网格相对于第一个网格的平移量
# 先创建所有网格
for i in range(self.num_grids_height):
for j in range(self.num_grids_width):
grid_min_lat = min_lat + i * lat_step - self.overlap * lat_step
grid_max_lat = min_lat + (i + 1) * lat_step + self.overlap * lat_step
grid_min_lon = min_lon + j * lon_step - self.overlap * lon_step
grid_max_lon = min_lon + (j + 1) * lon_step + self.overlap * lon_step
grid_id = (i, j) # 使用(i,j)作为网格标识i代表行j代表列
grid_bounds = (grid_min_lat, grid_max_lat, grid_min_lon, grid_max_lon)
grids.append(grid_bounds)
self.logger.debug(
f"网格[{i},{j}]: 纬度[{grid_min_lat:.6f}, {grid_max_lat:.6f}], "
f"经度[{grid_min_lon:.6f}, {grid_max_lon:.6f}]"
)
# 计算每个网格相对于第一个网格的平移量
reference_grid = grids[0]
for i in range(self.num_grids_height):
for j in range(self.num_grids_width):
grid_id = (i, j)
grid_idx = i * self.num_grids_width + j
if grid_idx == 0: # 参考网格
grid_translations[grid_id] = (0, 0)
else:
translation = self.calculate_grid_translation(reference_grid, grids[grid_idx])
grid_translations[grid_id] = translation
self.logger.debug(
f"网格[{i},{j}]相对于参考网格的平移量: x={translation[0]:.2f}m, y={translation[1]:.2f}m"
)
self.logger.info(
f"成功划分为 {len(grids)} 个网格 ({self.num_grids_width}x{self.num_grids_height})")
# 添加可视化调用
self.visualize_grids(points_df, grids)
return grids, grid_translations
def assign_to_grids(self, points_df, grids):
"""将点分配到对应网格"""
self.logger.info(f"开始将 {len(points_df)} 个点分配到网格中")
grid_points = {} # 使用字典存储每个网格的点
points_assigned = 0
multiple_grid_points = 0
for i in range(self.num_grids_height):
for j in range(self.num_grids_width):
grid_points[(i, j)] = [] # 使用(i,j)元组
for _, point in points_df.iterrows():
point_assigned = False
for i in range(self.num_grids_height):
for j in range(self.num_grids_width):
grid_idx = i * self.num_grids_width + j
min_lat, max_lat, min_lon, max_lon = grids[grid_idx]
if min_lat <= point['lat'] <= max_lat and min_lon <= point['lon'] <= max_lon:
grid_points[(i, j)].append(point.to_dict())
if point_assigned:
multiple_grid_points += 1
else:
points_assigned += 1
point_assigned = True
# 记录每个网格的点数
for grid_id, points in grid_points.items():
self.logger.info(f"网格 {grid_id} 包含 {len(points)} 个点")
self.logger.info(
f"点分配完成: 总点数 {len(points_df)}, "
f"成功分配 {points_assigned} 个点, "
f"{multiple_grid_points} 个点被分配到多个网格"
)
return grid_points
def visualize_grids(self, points_df, grids):
"""可视化网格划分和GPS点的分布"""
self.logger.info("开始可视化网格划分")
plt.figure(figsize=(12, 8))
# 绘制GPS点
plt.scatter(points_df['lon'], points_df['lat'],
c='blue', s=10, alpha=0.6, label='GPS点')
# 绘制网格
for i in range(self.num_grids_height):
for j in range(self.num_grids_width):
grid_idx = i * self.num_grids_width + j
min_lat, max_lat, min_lon, max_lon = grids[grid_idx]
plt.plot([min_lon, max_lon, max_lon, min_lon, min_lon],
[min_lat, min_lat, max_lat, max_lat, min_lat],
'r-', alpha=0.5)
# 在网格中心添加网格编号
center_lon = (min_lon + max_lon) / 2
center_lat = (min_lat + max_lat) / 2
plt.text(center_lon, center_lat, f"({i},{j})", # 显示(i,j)
horizontalalignment='center', verticalalignment='center')
plt.title('Grid Division and GPS Point Distribution')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.legend()
plt.grid(True)
# 如果提供了输出目录,保存图像
if self.output_dir:
save_path = os.path.join(
self.output_dir, 'filter_imgs', 'grid_division.png')
plt.savefig(save_path, dpi=300, bbox_inches='tight')
self.logger.info(f"网格划分可视化图已保存至: {save_path}")
plt.close()
def get_grid_center(self, grid_bounds) -> tuple:
"""计算网格中心点的经纬度
Args:
grid_bounds: (min_lat, max_lat, min_lon, max_lon)
Returns:
(center_lat, center_lon)
"""
min_lat, max_lat, min_lon, max_lon = grid_bounds
return ((min_lat + max_lat) / 2, (min_lon + max_lon) / 2)
def calculate_grid_translation(self, reference_grid: tuple, target_grid: tuple) -> tuple:
"""计算目标网格相对于参考网格的平移距离(米)
Args:
reference_grid: 参考网格的边界 (min_lat, max_lat, min_lon, max_lon)
target_grid: 目标网格的边界 (min_lat, max_lat, min_lon, max_lon)
Returns:
(x_translation, y_translation): 在米制单位下的平移量
"""
ref_center = self.get_grid_center(reference_grid)
target_center = self.get_grid_center(target_grid)
# 计算经度方向的距离x轴
x_distance = geodesic(
(ref_center[0], ref_center[1]),
(ref_center[0], target_center[1])
).meters
# 如果目标在参考点西边,距离为负
if target_center[1] < ref_center[1]:
x_distance = -x_distance
# 计算纬度方向的距离y轴
y_distance = geodesic(
(ref_center[0], ref_center[1]),
(target_center[0], ref_center[1])
).meters
# 如果目标在参考点南边,距离为负
if target_center[0] < ref_center[0]:
y_distance = -y_distance
return (x_distance, y_distance)