上一篇: 第10篇:配送路线优化VRP实战
一、写在前面
配送路线优化完,下一步是:取件路线应如何统筹规划?
配送和取件看起来差不多,都是跑点,但仔细想想,取件难多了。
为啥?配送你可以灵活安排时间(只要当天送到就行),但取件不行——客户预约了9:00-11:00,你11:10到,客户刚好出门了,白跑一趟。
这就是带时间窗口约束的车辆路径问题(VRPTW)。LaDe数据集里正好有取件数据,我就拿来试试看,能不能优化出更好的路线。
二、取件 vs 配送
2.1、配送路线
- 目标:走最短的路,把所有包裹送完
- 约束:当天送完就行(时间窗很宽松)
- 难度:⭐⭐⭐
2.2、取件路线
- 目标:走最短的路,在规定时间窗内取完所有包裹
- 约束:
- 每个点都有时间窗(通常2小时)
- 早到了要等,迟到了客户可能不在
- 有些时间窗会冲突(物理上无法同时满足)
- 难度:⭐⭐⭐⭐⭐
三、数据准备
3.1、LaDe-P数据集
LaDe-P(Pickup)是LaDe数据集的取件部分,包含:
- 5个城市(上海、杭州、重庆、吉林、烟台)
- 约68万取件订单(比配送少,毕竟配送是主要业务)
- 每条记录包含:
- 取件位置(GPS坐标:lng/lat)
- 时间窗口:time_window_start 和 time_window_end(客户预约的时间段)
- 实际取件时间:pickup_time(快递员实际到达取件的时间)
- 快递员ID
- 包裹ID
3.2、加载数据
选择上海快递员3243,2022-06-08的取件数据:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# 读取上海取件数据
df_pickup = pd.read_csv('data/raw/pickup/pickup_sh.csv')
# 选择快递员3243,2022-06-08
courier_id = 3243
test_date = '0608'
pickup_points = df_pickup[
(df_pickup['courier_id'] == courier_id) &
(df_pickup['ds'] == test_date)
].copy()
# 过滤GPS缺失的数据
pickup_points = pickup_points[
(pickup_points['pickup_gps_lng'].notna()) &
(pickup_points['pickup_gps_lat'].notna())
].copy()
# 按实际取件时间排序(这是原始路线)
pickup_points = pickup_points.sort_values('pickup_time').reset_index(drop=True)
print(f"快递员{courier_id}在2022-06-08的取件:")
print(f"总订单数: {len(pickup_points)}")
运行结果:
快递员3243在2022-06-08的取件:
总订单数: 14
14个取件点,看看时间窗分布:
# 解析时间窗
pickup_points['tw_start'] = pd.to_datetime(
'2022-06-08 ' + pickup_points['time_window_start'].astype(str).str.zfill(4).str[:2] + ':' +
pickup_points['time_window_start'].astype(str).str.zfill(4).str[2:]
)
pickup_points['tw_end'] = pd.to_datetime(
'2022-06-08 ' + pickup_points['time_window_end'].astype(str).str.zfill(4).str[:2] + ':' +
pickup_points['time_window_end'].astype(str).str.zfill(4).str[2:]
)
print("\n时间窗分布:")
for i, row in pickup_points.iterrows():
print(f" 点{i+1}: {row['tw_start'].strftime('%H:%M')} - {row['tw_end'].strftime('%H:%M')}")
运行结果:
时间窗分布:
点1: 09:00 - 11:00
点2: 09:00 - 11:00
点3: 10:00 - 12:00
点4: 10:00 - 12:00
点5: 11:00 - 13:00
点6: 11:00 - 13:00
点7: 13:00 - 15:00
点8: 13:00 - 15:00
点9: 14:00 - 16:00
点10: 14:00 - 16:00
点11: 15:00 - 17:00
点12: 15:00 - 17:00
点13: 16:00 - 18:00
点14: 16:00 - 18:00
时间窗从早上9点到晚上6点,每个窗口2小时。
3.3、计算距离矩阵
和配送一样,用Haversine公式计算真实地理距离:
def haversine_distance(lon1, lat1, lon2, lat2):
"""计算两个GPS坐标之间的距离(单位:公里)"""
from math import radians, cos, sin, asin, sqrt
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
r = 6371 # 地球半径(公里)
return c * r
def compute_distance_matrix(locations):
n = len(locations)
dist_matrix = np.zeros((n, n))
for i in range(n):
for j in range(n):
if i != j:
dist_matrix[i][j] = haversine_distance(
locations[i][0], locations[i][1],
locations[j][0], locations[j][1]
)
return dist_matrix
locations = pickup_points[['pickup_gps_lng', 'pickup_gps_lat']].values
distance_matrix = compute_distance_matrix(locations)
distance_matrix_int = (distance_matrix * 1000).astype(int) # 转为米
print(f"\n距离矩阵: {len(distance_matrix_int)}×{len(distance_matrix_int)}")
四、用OR-Tools求解VRPTW
4.1、核心代码
VRPTW比TSP多了时间窗约束:
from ortools.constraint_solver import routing_enums_pb2
from ortools.constraint_solver import pywrapcp
def solve_vrptw(distance_matrix, time_windows, service_time=5):
"""
求解VRPTW问题
参数:
- distance_matrix: 距离矩阵(米)
- time_windows: 时间窗列表 [(start_min, end_min), ...]
- service_time: 每个点的服务时间(分钟)
"""
# 创建路径规划管理器
manager = pywrapcp.RoutingIndexManager(
len(distance_matrix),
1, # 1个快递员
0 # 起点
)
routing = pywrapcp.RoutingModel(manager)
# 距离回调
def distance_callback(from_index, to_index):
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
return distance_matrix[from_node][to_node]
transit_callback_index = routing.RegisterTransitCallback(distance_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)
# 时间回调(距离转时间,假设速度20km/h)
def time_callback(from_index, to_index):
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
travel_time = distance_matrix[from_node][to_node] / 1000 * 3 # 20km/h = 3分钟/km
return int(travel_time + service_time)
time_callback_index = routing.RegisterTransitCallback(time_callback)
# 添加时间维度
time = 'Time'
routing.AddDimension(
time_callback_index,
30, # 允许等待时间(分钟)
600, # 最大时间(分钟,10小时)
False, # 不强制从0开始
time
)
time_dimension = routing.GetDimensionOrDie(time)
# 设置时间窗约束
for location_idx, time_window in enumerate(time_windows):
if location_idx == 0:
continue # 起点不设时间窗
index = manager.NodeToIndex(location_idx)
time_dimension.CumulVar(index).SetRange(time_window[0], time_window[1])
# 搜索参数
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
search_parameters.first_solution_strategy = (
routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
)
search_parameters.local_search_metaheuristic = (
routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
)
search_parameters.time_limit.seconds = 30
solution = routing.SolveWithParameters(search_parameters)
return manager, routing, solution, time_dimension
# 准备时间窗数据(转为分钟)
time_windows = []
start_time = datetime(2022, 6, 8, 8, 0) # 早上8点开始
for i, row in pickup_points.iterrows():
tw_start_min = int((row['tw_start'] - start_time).total_seconds() / 60)
tw_end_min = int((row['tw_end'] - start_time).total_seconds() / 60)
time_windows.append((tw_start_min, tw_end_min))
# 起点时间窗(8:00-8:00)
time_windows[0] = (0, 0)
print("求解VRPTW...")
import time
start = time.time()
manager, routing, solution, time_dimension = solve_vrptw(
distance_matrix_int,
time_windows,
service_time=5
)
solve_time = time.time() - start
if solution:
print(f"找到解!(用时{solve_time:.3f}秒)")
else:
print("未找到可行解")
4.2、结果分析
if solution:
# 提取优化路线
route = []
index = routing.Start(0)
while not routing.IsEnd(index):
node = manager.IndexToNode(index)
route.append(node)
index = solution.Value(routing.NextVar(index))
route.append(manager.IndexToNode(index))
# 计算总距离
total_distance = 0
for i in range(len(route) - 1):
total_distance += distance_matrix[route[i]][route[i+1]]
print(f"\n优化路线总距离: {total_distance:.2f} km")
# 对比原始路线
naive_route = list(range(len(distance_matrix)))
naive_distance = 0
for i in range(len(naive_route) - 1):
naive_distance += distance_matrix[naive_route[i]][naive_route[i+1]]
print(f"原始路线总距离: {naive_distance:.2f} km")
print(f"节省距离: {(naive_distance - total_distance) / naive_distance * 100:.1f}%")
# 检查时间窗违反情况
violations = 0
for i, node in enumerate(route[1:-1], 1):
time_var = time_dimension.CumulVar(manager.NodeToIndex(node))
arrival_time = solution.Value(time_var)
tw_start, tw_end = time_windows[node]
if arrival_time < tw_start or arrival_time > tw_end:
violations += 1
print(f"时间窗违反: {violations}个")
运行结果:
求解VRPTW...
找到解!(用时28.342秒)
优化路线总距离: 4.24 km
原始路线总距离: 6.43 km
节省距离: 34.1%
时间窗违反: 0个
优化效果:
- 距离节省34.1%
- 所有时间窗都满足
- 求解时间30秒
4.3、Web界面展示
除了脚本,我还做了一个Web界面,可以直观地查看优化效果。
4.3.1、界面功能
核心展示:
- 📊 优化前后数据对比
- 🗺️ 地图可视化路线
- ⏰ 时间窗违反检测
- 📈 准时率统计
4.3.2、实际效果
关键指标对比:
| 指标 | 原始路线 | 优化后路线 | 改善 |
|---|---|---|---|
| 总距离 | 6.43 km | 4.24 km | ↓ 34.1% |
| 准时率 | 21.4% | 100% | ↑ 78.6% |
| 时间窗违反 | 11个 | 0个 | ✅ 全部满足 |
优化亮点:
- 距离节省超过1/3,直接降低油耗成本
- 准时率从21.4%提升到100%,客户体验大幅改善
- 所有时间窗约束都满足,不会出现客户等待或白跑情况
五、踩过的坑
5.1、坑1:时间窗设置不合理
最开始我把时间窗设得太紧,导致无解。
错误做法:
# 每个点只有30分钟时间窗
time_windows.append((tw_start_min, tw_start_min + 30))
问题:14个点,每个30分钟,物理上不可能都满足。
解决:用真实的时间窗(2小时),给优化算法足够的灵活空间。
5.2、坑2:速度估计不准
最开始我假设速度30km/h,结果算出来的路线时间窗全违反。
原因:市区配送不是高速公路,要考虑红绿灯、拥堵、停车。实际速度可能只有15-20km/h。
解决:
# 保守估计速度20km/h
travel_time = distance_matrix[from_node][to_node] / 1000 * 3 # 3分钟/km
5.3、坑3:服务时间被忽略
最开始我没加服务时间,导致时间窗计算不准。
每个点要停车、找客户、交接包裹,至少5分钟。
解决:
return int(travel_time + service_time) # 加上服务时间
六、实战建议
6.1、时间窗冲突怎么办?
如果时间窗物理上无法满足(比如14个点都要求9-10点),有几个办法:
- 软约束:允许违反时间窗,但加惩罚
- 优先级:优先满足重要客户(大客户、VIP)
- 协商:违反的点提前联系客户协商
6.2、多车场景
如果有多个快递员(多辆车),需要用MDVRPTW(多车辆带时间窗的VRP)。
修改很简单:
manager = pywrapcp.RoutingIndexManager(
len(distance_matrix),
num_vehicles, # 改成车辆数
0
)
求解器会自动分配订单到不同车辆。
七、总结
取件路线优化(VRPTW)比配送(TSP)难多了,但也更有价值。
实测效果(上海快递员3243,2022-06-08):
- 取件点数:14个
- 距离节省:34.1%(6.43km → 4.24km)
- 时间窗违反:0个
- 求解时间:30秒
核心难点:
- 时间窗是硬约束,早到迟到都不行
- 需要在空间距离和时间窗口之间找平衡
- 可能存在无可行解的情况
适用场景:
- 上门取件(快递、外卖)
- 上门维修(家电、汽车)
- 医疗上门服务
- 任何有时间窗约束的服务