本项目旨在构建一个地理空间感知系统,帮助智能体(如自动驾驶车辆、机器人、无人机等)感知和理解其周围的地理环境。该系统支持从多种数据源(包括手动输入、GeoJSON 文件、Shapefile 文件)加载地理数据,并提供路径规划、可视化、感知等功能。
| 组件 | 技术选型 | 版本要求 |
|---|---|---|
| 核心框架 | Python 3.10+ | ≥3.10.6 |
| 地理处理 | GeoPandas 0.12+ | ≥0.12.2 |
| 空间索引 | Rtree 0.9+ | ≥0.9.7 |
| 路径规划 | NetworkX 3.0+ | ≥3.0 |
| 可视化 | Folium/Matplotlib | 可选 |
| 并发处理 | asyncio/Ray | 可选 |
class ResearchArea:
def __init__(self, name: str, crs: str = "EPSG:4326"):
self.name = name
self.crs = crs # 强制统一坐标系
self.road_network = RoadNetwork()
self.poi_collection = POICollection()
self.agent_manager = AgentManager()
self.sim_engine = SimulationEngine()
self.graph_editor = GraphEditor()
def load_data(self, config: dict):
"""支持多数据源加载
config示例:
{
"roads": {
"type": "shp",
"path": "data/roads.shp",
"topo_check": True
},
"pois": {
"type": "geojson",
"path": "data/pois.geojson"
}
}
"""
class RoadNode(TypedDict):
node_id: str # "lat_lon_uuid"
lon: float
lat: float
elevation: Optional[float]
connectivity: int # 连接边数
class RoadEdge(TypedDict):
edge_id: str
start_node: str
end_node: str
length: float # 米
geometry: LineString
attrs: dict # 道路等级/车道数等
class HybridIndex:
def __init__(self):
self.rtree = index.Index() # 精确索引
self.grid = SpatialGrid(100) # 100m网格粗索引
def query(self, bbox: tuple, zoom_level: int):
"""混合查询策略:
- zoom_level > 12: 使用RTree精确查询
- zoom_level ≤ 12: 使用网格快速过滤
"""
class MovementStrategy(ABC):
@abstractmethod
def calculate_move(self, agent: Agent, dt: float) -> MovementCommand:
pass
class NetworkMovement(MovementStrategy):
def calculate_move(self, agent, dt):
"""沿路网移动逻辑"""
next_node = agent.current_edge.end_node
return MoveToNodeCommand(next_node)
class FreeMovement(MovementStrategy):
def calculate_move(self, agent, dt):
"""自由空间移动"""
return MoveByVectorCommand(agent.velocity * dt)
def find_nearest_edge(point: Point) -> Tuple[Edge, float]:
"""混合查询算法流程:
1. 使用网格索引快速筛选候选边(10-50条)
2. 使用RTree进行精确距离计算
3. 应用Shapely的nearest_points优化
4. 返回最近边及投影点
时间复杂度:O(logN + M) M为候选边数
"""
class RoutePlanner:
def __init__(self, network: RoadNetwork):
self.ch = ContractionHierarchies(network) # 预计算层次
def shortest_path(self, start: Point, end: Point) -> Route:
"""使用CH加速查询:
1. 将起点/终点投影到路网
2. 在高层级图上快速路由
3. 逐步细化到底层路径
比传统A*快5-10倍
"""
| 端点 | 方法 | 描述 |
|---|---|---|
/v1/spatial/query | POST | 空间查询接口 |
/v1/routing/calculate | GET | 路径规划接口 |
/v1/agents/{id}/position | GET | 获取Agent实时位置 |
/v1/simulation/step | PUT | 推进模拟时间步长 |
路径规划请求:
{
"origin": {
"lon": 121.4737,
"lat": 31.2304,
"node_id": "optional"
},
"destination": {
"lon": 121.4788,
"lat": 31.2355
},
"profile": {
"type": "car",
"options": {
"avoid_tolls": true,
"max_speed": 100
}
}
}
成功响应:
{
"status": "success",
"data": {
"path": [
[121.4737, 31.2304],
[121.4740, 31.2306],
...
],
"cost": 125.6,
"edges": ["E4512", "E4513"],
"geometry": "LINESTRING(...)"
},
"metadata": {
"processing_time": 23.5
}
}
class ResearchAreaData:
road_network: nx.MultiDiGraph
poi_kd_tree: KDTree # 快速POI查询
agent_states: Dict[str, AgentState]
class AgentState(NamedTuple):
position: Tuple[float, float]
velocity: float
current_edge: Optional[str]
path: Optional[List[Tuple]]
def save_snapshot(path: str, format: str = "parquet"):
"""支持保存为:
- Parquet(推荐,压缩率高)
- GeoJSON(可读性好)
- Protocol Buffers(高性能)
"""
def load_snapshot(path: str):
"""自动检测格式并加载"""
| 场景 | 目标性能 | 实现手段 |
|---|---|---|
| 10k节点路网加载 | <1.5s | 并行化Shp解析 |
| 100 Agent并发移动 | <50ms/step | 向量化位置更新 |
| 1km²区域POI查询 | <10ms | 分层空间索引 |
| 10km路径规划 | <100ms | 预计算Contraction Hierarchy |
# 使用Numba加速距离计算
@njit
def calculate_distances(points: np.ndarray) -> np.ndarray:
"""计算点阵间距离矩阵"""
n = points.shape[0]
dists = np.zeros((n, n))
for i in range(n):
for j in range(n):
dists[i,j] = haversine(points[i], points[j])
return dists
# plugins/template.py
class GISensePlugin:
def __init__(self, research_area: ResearchArea):
self.area = research_area
def on_load(self):
"""插件加载时调用"""
def on_sim_step(self, dt: float):
"""模拟步进时调用"""
def register(research_area: ResearchArea) -> GISensePlugin:
return MyPlugin(research_area)
class TestRoadNetwork(unittest.TestCase):
def test_node_connectivity(self):
"""测试节点连接性"""
self.assertEqual(network.get_node_degree('n123'), 3)
def test_path_continuity(self):
"""测试路径连续性"""
path = network.shortest_path('A', 'B')
self.assertTrue(check_path_connected(path))
# 使用Locust模拟并发
$ locust -f tests/load_test.py --users 100 --spawn-rate 10
FROM python:3.10-slim RUN pip install gisense geopandas==0.12.2 # 启用JIT加速 ENV NUMBA_DISABLE_JIT=0 EXPOSE 8000 CMD ["gunicorn", "gisense.api:app", "-b :8000"]
| 版本 | 重点功能 | 预计周期 |
|---|---|---|
| v1.0 | 基础路网建模+Agent移动 | 6周 |
| v1.5 | 高性能路径规划+空间索引 | 4周 |
| v2.0 | 分布式模拟+三维扩展 | 8周 |
以下是 GISense 基础实现的详细伪代码,包含关键设计细节和可扩展的接口定义:
import uuid
from typing import NamedTuple
class GeoCoordinate(NamedTuple):
lon: float # 经度 [-180, 180]
lat: float # 纬度 [-90, 90]
alt: float = 0.0 # 高程(可选)
class Node:
def __init__(self, coord: GeoCoordinate, osm_id: str = None):
self.coord = coord
self.osm_id = osm_id # 保留原始ID
self.uid = self._generate_uid(coord)
@staticmethod
def _generate_uid(coord: GeoCoordinate) -> str:
"""生成唯一节点ID:base36(经纬度哈希)_随机后缀"""
lat_lon_hash = hash(f"{coord.lat:.6f}_{coord.lon:.6f}")
return (
base36_encode(abs(lat_lon_hash))[:6]
+ "_"
+ str(uuid.uuid4())[:4]
)
@property
def as_shapely(self) -> Point:
return Point(self.coord.lon, self.coord.lat)
class RoadEdge:
def __init__(self,
edge_id: str,
start_node: Node,
end_node: Node,
geometry: LineString,
attrs: dict = None):
self.edge_id = edge_id
self.nodes = (start_node, end_node)
self.geometry = geometry # Shapely LineString
self.length = geometry.length # 单位:米
self.attrs = attrs or {}
# 动态计算字段
self._bbox = geometry.bounds # 缓存边界框
def split(self, ratio: float) -> Tuple['RoadEdge', 'RoadEdge']:
"""在比例位置分割边(0.3表示30%位置)"""
split_point = self.geometry.interpolate(ratio, normalized=True)
new_node = Node(GeoCoordinate(*split_point.coords[0]))
# 创建两条新边
edge1 = RoadEdge(
edge_id=f"{self.edge_id}_a",
start_node=self.nodes[0],
end_node=new_node,
geometry=LineString(list(self.geometry.coords)[:int(ratio*100)])
edge2 = RoadEdge(
edge_id=f"{self.edge_id}_b",
start_node=new_node,
end_node=self.nodes[1],
geometry=LineString(list(self.geometry.coords)[int(ratio*100):]))
return edge1, edge2
class HybridIndex:
def __init__(self, grid_size: float = 100.0):
self.rtree = index.Index()
self.grid = {}
self.grid_size = grid_size
def _get_grid_key(self, point: Point) -> Tuple[int, int]:
"""计算点所属网格坐标"""
return (
int(point.x // self.grid_size),
int(point.y // self.grid_size)
)
def insert(self, obj: Union[Node, RoadEdge]):
"""插入对象到索引"""
# RTree插入
self.rtree.insert(id(obj), obj.bbox)
# 网格索引插入
if isinstance(obj, Node):
grid_key = self._get_grid_key(obj.as_shapely)
self.grid.setdefault(grid_key, []).append(obj)
else: # RoadEdge
for segment in self._split_edge_to_grid(obj):
grid_key = self._get_grid_key(segment.centroid)
self.grid.setdefault(grid_key, []).append(obj)
def query_radius(self, center: Point, radius: float) -> List:
"""圆形区域查询优化流程"""
# 第一步:网格快速过滤
affected_grids = self._get_grids_in_radius(center, radius)
candidates = []
for grid in affected_grids:
candidates.extend(self.grid.get(grid, []))
# 第二步:RTree精确筛选
precise_matches = [
obj for obj in candidates
if self.rtree.intersection(obj.bbox)
and center.distance(obj.geometry) <= radius
]
return precise_matches
class LinearReference:
@staticmethod
def encode_position(edge: RoadEdge, point: Point) -> dict:
"""将点编码为线性参考位置"""
projected = edge.geometry.project(point)
return {
"edge_id": edge.edge_id,
"offset": projected, # 距起点的米数
"ratio": projected / edge.length,
"node_a": edge.nodes[0].uid,
"node_b": edge.nodes[1].uid
}
@staticmethod
def decode_position(ref: dict) -> Point:
"""从线性参考解码为坐标"""
edge = get_edge_by_id(ref["edge_id"])
return edge.geometry.interpolate(ref["offset"])
class MovementStrategy(ABC):
@abstractmethod
def get_next_position(self,
agent: 'Agent',
current_pos: GeoCoordinate,
dt: float) -> GeoCoordinate:
pass
class NetworkStrategy(MovementStrategy):
def get_next_position(self, agent, current_pos, dt):
"""沿路网移动策略"""
if not agent.current_path:
agent.current_path = self._find_path(agent.destination)
next_node = agent.current_path.pop(0)
move_vector = self._calculate_move_vector(
current_pos,
next_node.coord,
agent.speed * dt
)
return current_pos + move_vector
class FreeMoveStrategy(MovementStrategy):
def get_next_position(self, agent, current_pos, dt):
"""自由空间移动策略"""
return GeoCoordinate(
current_pos.lon + agent.velocity.lon * dt,
current_pos.lat + agent.velocity.lat * dt
)
class GraphEncoder:
@staticmethod
def to_geojson(nodes: List[Node], edges: List[RoadEdge]) -> dict:
"""转换为GeoJSON格式"""
return {
"type": "FeatureCollection",
"features": [
*[{
"type": "Feature",
"geometry": node.as_shapely.__geo_interface__,
"properties": {"uid": node.uid}
} for node in nodes],
*[{
"type": "Feature",
"geometry": edge.geometry.__geo_interface__,
"properties": {
"edge_id": edge.edge_id,
"length": edge.length,
**edge.attrs
}
} for edge in edges]
]
}
@staticmethod
def from_geojson(data: dict) -> Tuple[List[Node], List[RoadEdge]]:
"""从GeoJSON解析"""
# 实现逆解析逻辑
class GISenseError(Exception):
"""基础异常类"""
class TopologyError(GISenseError):
"""拓扑关系异常"""
def __init__(self, edge1: str, edge2: str):
super().__init__(f"Edge {edge1} and {edge2} are not connected")
class SpatialQueryError(GISenseError):
"""空间查询异常"""
def __init__(self, point: GeoCoordinate, radius: float):
super().__init__(
f"No results found around {point} within {radius}m")
节点ID生成:
base36(经纬度哈希)_4位UUID的混合方案k5g9f3_1a2b边分割策略:
E123_a和E123_b)混合索引优化:
线性参考系统:
offset和ratio两种表示法策略模式应用:
Node和RoadEdge基础类HybridIndex并测试空间查询LinearReference位置编码MovementStrategy及其子类