import logging from geopy.distance import geodesic import matplotlib.pyplot as plt import os class GridDivider: """划分网格,并将图片分配到对应网格""" def __init__(self, overlap=0.1, grid_size=500, output_dir=None): self.overlap = overlap self.grid_size = grid_size self.output_dir = output_dir self.logger = logging.getLogger('UAV_Preprocess.GridDivider') self.logger.info(f"初始化网格划分器,重叠率: {overlap}") self.num_grids_width = 0 # 添加网格数量属性 self.num_grids_height = 0 def adjust_grid_size(self, points_df): """动态调整网格大小 Args: points_df: 包含GPS点的DataFrame Returns: tuple: (grids, translations, grid_points, final_grid_size) """ self.logger.info(f"开始动态调整网格大小,初始大小: {self.grid_size}米") while True: # 使用当前grid_size划分网格 grids, translations = self.divide_grids(points_df) grid_points, multiple_grid_points = self.assign_to_grids(points_df, grids) # 检查每个网格中的点数 max_points = 0 for grid_id, points in grid_points.items(): max_points = max(max_points, len(points)) self.logger.info(f"当前网格大小: {self.grid_size}米, 单个网格最大点数: {max_points}") # 如果最大点数超过1500,减小网格大小 if max_points > 1500: self.grid_size -= 100 self.logger.info(f"点数超过1500,减小网格大小至: {self.grid_size}米") if self.grid_size < 500: # 设置一个最小网格大小限制 self.logger.warning("网格大小已达到最小值500米,停止调整") break else: self.logger.info(f"找到合适的网格大小: {self.grid_size}米") break return grids def adjust_grid_size_and_overlap(self, points_df): """动态调整网格重叠率""" grids = self.adjust_grid_size(points_df) self.logger.info(f"开始动态调整网格重叠率,初始重叠率: {self.overlap}") while True: # 使用调整好的网格大小划分网格 grids, translations = self.divide_grids(points_df) grid_points, multiple_grid_points = self.assign_to_grids(points_df, grids) if len(grids) == 1: self.logger.info(f"网格数量为1,跳过重叠率调整") break elif multiple_grid_points < 0.1*len(points_df): self.overlap += 0.02 self.logger.info(f"重叠率增加到: {self.overlap}") else: self.logger.info(f"找到合适的重叠率: {self.overlap}, 有{multiple_grid_points}个点被分配到多个网格") break return grids, translations, grid_points def divide_grids(self, points_df): """计算边界框并划分网格 Returns: tuple: (grids, translations) - grids: 网格边界列表 - translations: 网格平移量字典 """ self.logger.info("开始划分网格") min_lat, max_lat = points_df['lat'].min(), points_df['lat'].max() min_lon, max_lon = points_df['lon'].min(), points_df['lon'].max() # 计算区域的实际距离(米) width = geodesic((min_lat, min_lon), (min_lat, max_lon)).meters height = geodesic((min_lat, min_lon), (max_lat, min_lon)).meters self.logger.info(f"区域宽度: {width:.2f}米, 高度: {height:.2f}米") # 精细调整网格的长宽,避免出现2*grid_size-1的情况的影响 grid_size_lt = [self.grid_size -200, self.grid_size -100, self.grid_size , self.grid_size +100, self.grid_size +200] width_modulus_lt = [width % grid_size for grid_size in grid_size_lt] grid_width = grid_size_lt[width_modulus_lt.index(min(width_modulus_lt))] height_modulus_lt = [height % grid_size for grid_size in grid_size_lt] grid_height = grid_size_lt[height_modulus_lt.index(min(height_modulus_lt))] self.logger.info(f"网格宽度: {grid_width:.2f}米, 网格高度: {grid_height:.2f}米") # 计算需要划分的网格数量 self.num_grids_width = max(int(width / grid_width), 1) self.num_grids_height = max(int(height / grid_height), 1) # 计算每个网格对应的经纬度步长 lat_step = (max_lat - min_lat) / self.num_grids_height lon_step = (max_lon - min_lon) / self.num_grids_width grids = [] grid_translations = {} # 存储每个网格相对于第一个网格的平移量 # 先创建所有网格 for i in range(self.num_grids_height): for j in range(self.num_grids_width): grid_min_lat = min_lat + i * lat_step - self.overlap * lat_step grid_max_lat = min_lat + (i + 1) * lat_step + self.overlap * lat_step grid_min_lon = min_lon + j * lon_step - self.overlap * lon_step grid_max_lon = min_lon + (j + 1) * lon_step + self.overlap * lon_step grid_id = (i, j) # 使用(i,j)作为网格标识,i代表行,j代表列 grid_bounds = (grid_min_lat, grid_max_lat, grid_min_lon, grid_max_lon) grids.append(grid_bounds) self.logger.debug( f"网格[{i},{j}]: 纬度[{grid_min_lat:.6f}, {grid_max_lat:.6f}], " f"经度[{grid_min_lon:.6f}, {grid_max_lon:.6f}]" ) # 计算每个网格相对于第一个网格的平移量 reference_grid = grids[0] for i in range(self.num_grids_height): for j in range(self.num_grids_width): grid_id = (i, j) grid_idx = i * self.num_grids_width + j if grid_idx == 0: # 参考网格 grid_translations[grid_id] = (0, 0) else: translation = self.calculate_grid_translation(reference_grid, grids[grid_idx]) grid_translations[grid_id] = translation self.logger.debug( f"网格[{i},{j}]相对于参考网格的平移量: x={translation[0]:.2f}m, y={translation[1]:.2f}m" ) self.logger.info( f"成功划分为 {len(grids)} 个网格 ({self.num_grids_width}x{self.num_grids_height})") return grids, grid_translations def assign_to_grids(self, points_df, grids): """将点分配到对应网格""" self.logger.info(f"开始将 {len(points_df)} 个点分配到网格中") grid_points = {} # 使用字典存储每个网格的点 points_assigned = 0 multiple_grid_points = 0 for i in range(self.num_grids_height): for j in range(self.num_grids_width): grid_points[(i, j)] = [] # 使用(i,j)元组 for _, point in points_df.iterrows(): point_assigned = False for i in range(self.num_grids_height): for j in range(self.num_grids_width): grid_idx = i * self.num_grids_width + j min_lat, max_lat, min_lon, max_lon = grids[grid_idx] if min_lat <= point['lat'] <= max_lat and min_lon <= point['lon'] <= max_lon: grid_points[(i, j)].append(point.to_dict()) if point_assigned: multiple_grid_points += 1 else: points_assigned += 1 point_assigned = True # 记录每个网格的点数 for grid_id, points in grid_points.items(): self.logger.info(f"网格 {grid_id} 包含 {len(points)} 个点") self.logger.info( f"点分配完成: 总点数 {len(points_df)}, " f"成功分配 {points_assigned} 个点, " f"{multiple_grid_points} 个点被分配到多个网格" ) return grid_points, multiple_grid_points def visualize_grids(self, points_df, grids): """可视化网格划分和GPS点的分布""" self.logger.info("开始可视化网格划分") plt.figure(figsize=(12, 8)) # 绘制GPS点 plt.scatter(points_df['lon'], points_df['lat'], c='blue', s=10, alpha=0.6, label='GPS points') # 绘制网格 for i in range(self.num_grids_height): for j in range(self.num_grids_width): grid_idx = i * self.num_grids_width + j min_lat, max_lat, min_lon, max_lon = grids[grid_idx] # 计算网格的实际长度和宽度(米) width = geodesic((min_lat, min_lon), (min_lat, max_lon)).meters height = geodesic((min_lat, min_lon), (max_lat, min_lon)).meters plt.plot([min_lon, max_lon, max_lon, min_lon, min_lon], [min_lat, min_lat, max_lat, max_lat, min_lat], 'r-', alpha=0.5) # 在网格中心添加网格编号和尺寸信息 center_lon = (min_lon + max_lon) / 2 center_lat = (min_lat + max_lat) / 2 plt.text(center_lon, center_lat, f"({i},{j})\n{width:.0f}m×{height:.0f}m", # 显示(i,j)和尺寸 horizontalalignment='center', verticalalignment='center', fontsize=8) plt.title('Grid Division and GPS Point Distribution') plt.xlabel('Longitude') plt.ylabel('Latitude') plt.legend() plt.grid(True) # 如果提供了输出目录,保存图像 if self.output_dir: save_path = os.path.join( self.output_dir, 'filter_imgs', 'grid_division.png') plt.savefig(save_path, dpi=300, bbox_inches='tight') self.logger.info(f"网格划分可视化图已保存至: {save_path}") plt.close() def get_grid_center(self, grid_bounds) -> tuple: """计算网格中心点的经纬度 Args: grid_bounds: (min_lat, max_lat, min_lon, max_lon) Returns: (center_lat, center_lon) """ min_lat, max_lat, min_lon, max_lon = grid_bounds return ((min_lat + max_lat) / 2, (min_lon + max_lon) / 2) def calculate_grid_translation(self, reference_grid: tuple, target_grid: tuple) -> tuple: """计算目标网格相对于参考网格的平移距离(米) Args: reference_grid: 参考网格的边界 (min_lat, max_lat, min_lon, max_lon) target_grid: 目标网格的边界 (min_lat, max_lat, min_lon, max_lon) Returns: (x_translation, y_translation): 在米制单位下的平移量 """ ref_center = self.get_grid_center(reference_grid) target_center = self.get_grid_center(target_grid) # 计算经度方向的距离(x轴) x_distance = geodesic( (ref_center[0], ref_center[1]), (ref_center[0], target_center[1]) ).meters # 如果目标在参考点西边,距离为负 if target_center[1] < ref_center[1]: x_distance = -x_distance # 计算纬度方向的距离(y轴) y_distance = geodesic( (ref_center[0], ref_center[1]), (target_center[0], ref_center[1]) ).meters # 如果目标在参考点南边,距离为负 if target_center[0] < ref_center[0]: y_distance = -y_distance return (x_distance, y_distance)