"""
Capacitated vehicle routing problem with pickup and delivery.
Based on https://developers.google.com/optimization/routing/pickup_delivery#complete_programs
"""
from __future__ import annotations

from typing import Any, Dict, List, Tuple

import numpy as np
from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2

import orToolsDataModel
# A node is referred to by its integer id.
Node = int
# A route is an ordered list of nodes.
Route = List[Node]
# Result type of get_solution: maps a vehicle id to a tuple (route, route cost).
ORToolsSolution = Dict[int, Tuple[Route, int]]
def get_solution(data: orToolsDataModel.ORToolsDataModel, manager: pywrapcp.RoutingIndexManager,
                 routing: pywrapcp.RoutingModel, solution: pywrapcp.Assignment, verbose: bool) -> ORToolsSolution:
    """Collect the route and the route cost of every vehicle from a solution.

    Each vehicle's route is walked from its start index, following the
    successor stored in ``solution``, until the routing model reports the
    end index.

    :param data: problem data; ``num_vehicles`` and ``demands`` are read
    :param manager: translates routing indices into node ids
    :param routing: routing model of the solution; it must contain a
        dimension named 'Time'
    :param solution: assignment the routes are read from
    :param verbose: print the objective value, every route and the total cost
    :return: dict with one entry per vehicle id (0 .. num_vehicles-1) holding
        a tuple ``(route, route_cost)``; the route lists the visited nodes
        including the start node and the end node
    """
    if verbose:
        print(f'Objective: {solution.ObjectiveValue()}')
    time_dimension = routing.GetDimensionOrDie('Time')
    routes_by_vehicle = {}
    cost_of_all_routes = 0
    for vehicle_id in range(data.num_vehicles):
        visited_nodes = []
        cost_so_far = 0
        load_so_far = 0
        index = routing.Start(vehicle_id)
        if verbose:
            plan_output = 'Route for vehicle %s:\n ' % vehicle_id
        while not routing.IsEnd(index):
            node = manager.IndexToNode(index)
            load_so_far += data.demands[node]
            cumul_time = time_dimension.CumulVar(index)
            if verbose:
                plan_output += (' %s (L: %s, C: %s, T: (%s,%s))\n -> ' %
                                (node, load_so_far, cost_so_far,
                                 solution.Min(cumul_time), solution.Max(cumul_time)))
            visited_nodes.append(node)
            successor = solution.Value(routing.NextVar(index))
            cost_so_far += routing.GetArcCostForVehicle(index, successor, vehicle_id)
            index = successor
        # The loop stops on the end index, so the final stop is added here.
        end_node = manager.IndexToNode(index)
        load_so_far += data.demands[end_node]
        cumul_time = time_dimension.CumulVar(index)
        visited_nodes.append(end_node)
        if verbose:
            plan_output += (' %s (L: %s, C: %s, T: (%s,%s))\n' %
                            (end_node, load_so_far, cost_so_far,
                             solution.Min(cumul_time), solution.Max(cumul_time)))
            plan_output += 'Costs of the route: %s\n' % cost_so_far
            print(plan_output)
        cost_of_all_routes += cost_so_far
        routes_by_vehicle[vehicle_id] = (visited_nodes, cost_so_far)
    if verbose:
        print(f'Total cost of the routes: {cost_of_all_routes}')
    return routes_by_vehicle
# Handle of a registered transit callback, as returned by
# routing.RegisterTransitCallback in set_travel_cost.
TransitCallbackIndex = int
def set_travel_cost(data: orToolsDataModel.ORToolsDataModel, routing: pywrapcp.RoutingModel,
                    manager: pywrapcp.RoutingIndexManager, verbose: bool) -> TransitCallbackIndex:
    """Register the travel cost callback and use it as arc cost of all vehicles.

    :param data: problem data; ``cost_matrix`` is read as ``[from_node][to_node]``
    :param routing: routing model the callback is registered with
    :param manager: translates routing indices into node ids
    :param verbose: print a progress message
    :return: index of the registered transit callback
    """
    def arc_cost(from_index, to_index) -> int:
        """Look up the cost of travelling between two routing indices."""
        origin = manager.IndexToNode(from_index)
        destination = manager.IndexToNode(to_index)
        return data.cost_matrix[origin][destination]

    if verbose:
        print(' Register distance callback.')
    callback_index = routing.RegisterTransitCallback(arc_cost)
    routing.SetArcCostEvaluatorOfAllVehicles(callback_index)
    return callback_index
def add_cost_constraint(data: orToolsDataModel.ORToolsDataModel, routing: pywrapcp.RoutingModel,
                        transit_callback_index: TransitCallbackIndex, verbose: bool) -> pywrapcp.RoutingDimension:
    """Add the 'Costs' dimension that accumulates the travel costs along a route.

    :param data: problem data; the sum of all ``cost_matrix`` entries is used
        as the upper bound of the accumulated costs
    :param routing: routing model the dimension is added to
    :param transit_callback_index: callback providing the cost of each arc
    :param verbose: print a progress message
    :return: the newly created 'Costs' dimension
    """
    if verbose:
        print(' Add distance constraints...')
    # No route can cost more than all matrix entries together.
    cost_upper_bound = int(np.sum(data.cost_matrix))
    name = 'Costs'
    # Arguments after the callback: no slack, the capacity, and start cumul fixed to zero.
    routing.AddDimension(transit_callback_index, 0, cost_upper_bound, True, name)
    return routing.GetDimensionOrDie(name)
def add_transportation_requests_constraint(data: orToolsDataModel.ORToolsDataModel,
                                           routing: pywrapcp.RoutingModel, manager: pywrapcp.RoutingIndexManager,
                                           solver: pywrapcp.Solver,
                                           distance_dimension: pywrapcp.RoutingDimension, verbose: bool):
    """Couple the pickup and the delivery node of every open request.

    For each request in ``data.pickups_deliveries`` both nodes must be served
    by the same vehicle and the accumulated cost at the pickup must not exceed
    the accumulated cost at the delivery. Requests reporting ``is_new()`` are
    additionally put into a disjunction, so they may be rejected at a penalty
    of ``10*data.get_penalty(True)``.

    :param data: problem data providing the requests and the penalty
    :param routing: routing model the constraints are added to
    :param manager: translates node ids into routing indices
    :param solver: constraint solver of the routing model
    :param distance_dimension: dimension accumulating the travel costs
    :param verbose: print the processed requests
    """
    if verbose:
        print(' Add pickup and delivery constraints...')
    for request in data.pickups_deliveries:
        origin_index = manager.NodeToIndex(request.from_node)
        destination_index = manager.NodeToIndex(request.to_node)
        if verbose:
            print(f'pickup/dropoff nodes: {request.from_node}/{request.to_node}')
        routing.AddPickupAndDelivery(origin_index, destination_index)
        # Both nodes have to be on the route of the same vehicle ...
        same_vehicle = routing.VehicleVar(origin_index) == routing.VehicleVar(destination_index)
        solver.Add(same_vehicle)
        # ... and the pickup must not come after the delivery.
        pickup_first = (distance_dimension.CumulVar(origin_index) <=
                        distance_dimension.CumulVar(destination_index))
        solver.Add(pickup_first)
        if not request.is_new():
            continue
        if verbose:
            print(f'allow to reject new reservation {request.get_id()}')
        routing.AddDisjunction([origin_index, destination_index], 10*data.get_penalty(True), 2)
def add_direct_route_factor_constraint(data: orToolsDataModel.ORToolsDataModel,
                                       routing: pywrapcp.RoutingModel, manager: pywrapcp.RoutingIndexManager,
                                       solver: pywrapcp.Solver,
                                       distance_dimension: pywrapcp.RoutingDimension, verbose: bool):
    """Limit the route cost of every request to ``drf`` times its direct route cost.

    Nothing is added when ``data.drf`` is -1. Requests in
    ``data.pickups_deliveries`` get a hard limit if they are new and a soft
    limit otherwise. Requests in ``data.dropoffs`` get a soft upper bound on
    the accumulated cost at their drop-off node, reduced by the cost they have
    already used (``current_route_cost``).

    :param data: problem data providing ``drf``, the requests and the penalty
    :param routing: routing model the constraints are added to
    :param manager: translates node ids into routing indices
    :param solver: constraint solver of the routing model
    :param distance_dimension: dimension accumulating the travel costs
    :param verbose: print the limits of the requests
    """
    if data.drf == -1:
        return
    if verbose:
        print(' Add direct route factor constraints...')
    for request in data.pickups_deliveries:
        if not request.is_new():
            add_soft_direct_route_factor_constraint(routing, data, request, solver,
                                                    manager, distance_dimension, verbose)
            continue
        add_hard_direct_route_factor_constraint(request, data, manager, solver, distance_dimension, verbose)
    for request in data.dropoffs:
        direct_cost = request.direct_route_cost
        # Only read back in the verbose message below.
        max_cost_const = solver.IntConst(round(direct_cost * data.drf))
        dropoff_index = manager.NodeToIndex(request.to_node)
        remaining_budget = round(direct_cost * data.drf - request.current_route_cost)
        distance_dimension.SetCumulVarSoftUpperBound(dropoff_index, remaining_budget, 100*data.get_penalty())
        if verbose:
            print(f"reservation {request.get_id()}: direct route cost {direct_cost} and "
                  f"(soft) max cost {max_cost_const.Value()}, already used costs {request.current_route_cost}")
def add_hard_direct_route_factor_constraint(request: orToolsDataModel.Reservation,
                                            data: orToolsDataModel.ORToolsDataModel,
                                            manager: pywrapcp.RoutingIndexManager,
                                            solver: pywrapcp.Solver,
                                            distance_dimension: pywrapcp.RoutingDimension, verbose: bool):
    """Add a hard direct route factor limit for one request.

    The cost accumulated between the pickup and the delivery node must not
    exceed ``round(request.direct_route_cost * data.drf)``.

    :param request: request to constrain
    :param data: problem data providing ``drf``
    :param manager: translates node ids into routing indices
    :param solver: constraint solver the constraint is added to
    :param distance_dimension: dimension accumulating the travel costs
    :param verbose: print the limit of the request
    """
    origin_index = manager.NodeToIndex(request.from_node)
    destination_index = manager.NodeToIndex(request.to_node)
    direct_cost = request.direct_route_cost
    max_cost = round(direct_cost * data.drf)
    cost_on_board = (distance_dimension.CumulVar(destination_index) -
                     distance_dimension.CumulVar(origin_index))
    solver.Add(cost_on_board <= solver.IntConst(max_cost))
    # Only read back in the verbose message below.
    max_cost_const = solver.IntConst(max_cost)
    if verbose:
        print(f"reservation {request.get_id()}: direct route cost {direct_cost} and "
              f"(hard) max cost {max_cost_const.Value()}")
def add_soft_direct_route_factor_constraint(routing: pywrapcp.RoutingModel,
                                            data: orToolsDataModel.ORToolsDataModel,
                                            request: orToolsDataModel.Reservation,
                                            solver: pywrapcp.Solver, manager: pywrapcp.RoutingIndexManager,
                                            distance_dimension: pywrapcp.RoutingDimension, verbose: bool):
    """Add a soft direct route factor limit for one request.

    If the costs changed and the drf constraint cannot be held anymore, a soft
    constraint is used instead of a hard one: a dedicated dimension
    ``request_cost_<id>`` is created whose cumul value at the delivery node is
    tied to the cost accumulated between pickup and delivery. Exceeding
    ``round(request.direct_route_cost * data.drf)`` is penalized with
    ``100*data.get_penalty()``.

    :param routing: routing model the dimension is added to
    :param data: problem data providing ``cost_matrix``, ``drf`` and the penalty
    :param request: request to constrain
    :param solver: constraint solver of the routing model
    :param manager: translates node ids into routing indices
    :param distance_dimension: dimension accumulating the travel costs
    :param verbose: print the limit of the request
    """
    # The sum of all matrix entries serves as capacity and maximum slack.
    matrix_costs = int(np.sum(data.cost_matrix))
    # Computed once so that the applied bound and the logged bound always agree.
    max_request_cost = round(request.direct_route_cost * data.drf)
    request_cost_dimension_name = f'request_cost_{request.get_id()}'
    # Constant transit of 0: the cumul values of this dimension are driven by
    # the slack only and are tied to the request's cost below.
    routing.AddConstantDimensionWithSlack(
        0,
        matrix_costs,
        matrix_costs,
        True,
        request_cost_dimension_name)
    request_cost_dimension: pywrapcp.RoutingDimension = routing.GetDimensionOrDie(request_cost_dimension_name)
    pickup_index = manager.NodeToIndex(request.from_node)
    delivery_index = manager.NodeToIndex(request.to_node)
    route_start = distance_dimension.CumulVar(pickup_index)
    route_end = distance_dimension.CumulVar(delivery_index)
    route_cost = request_cost_dimension.CumulVar(delivery_index)
    solver.Add(route_cost == route_end - route_start)
    request_cost_dimension.SetCumulVarSoftUpperBound(
        delivery_index,
        max_request_cost,
        100*data.get_penalty()
    )
    if verbose:
        print(f"reservation {request.get_id()}: direct route cost {request.direct_route_cost} and "
              f"(soft) max cost {max_request_cost}")
def add_dropoff_constraint(data: orToolsDataModel.ORToolsDataModel,
                           routing: pywrapcp.RoutingModel, manager: pywrapcp.RoutingIndexManager,
                           verbose: bool):
    """Restrict every drop-off node to the vehicle of its reservation.

    :param data: problem data; ``dropoffs`` holds the reservations, each with
        a ``vehicle`` and a ``to_node``
    :param routing: routing model the restriction is added to
    :param manager: translates node ids into routing indices
    :param verbose: print the processed reservations
    """
    if verbose:
        print(' Add dropoff constraints...')
    for reservation in data.dropoffs:
        vehicle = reservation.vehicle
        if verbose:
            print(f'reservation {reservation.get_id()} in veh {vehicle.id_vehicle}({vehicle.vehicle_index}), '
                  f'droppoff node: {reservation.to_node}')
        dropoff_index = manager.NodeToIndex(reservation.to_node)
        routing.SetAllowedVehiclesForIndex([reservation.vehicle.vehicle_index], dropoff_index)
def add_allocation_constraint(data: orToolsDataModel.ORToolsDataModel,
                              routing: pywrapcp.RoutingModel, manager: pywrapcp.RoutingIndexManager,
                              verbose: bool):
    """Keep already allocated requests on their current vehicle.

    For every request in ``data.pickups_deliveries`` that has a vehicle, the
    delivery node may only be served by that vehicle.

    :param data: problem data providing the requests
    :param routing: routing model the restriction is added to
    :param manager: translates node ids into routing indices
    :param verbose: print the processed reservations
    """
    if verbose:
        print(' Add "no re-allocation" constraints...')
    for reservation in data.pickups_deliveries:
        if not reservation.vehicle:
            # Not allocated yet: any vehicle may serve the request.
            continue
        if verbose:
            print(f'reservation {reservation.get_id()} in veh id={reservation.vehicle.vehicle_index}')
        delivery_index = manager.NodeToIndex(reservation.to_node)
        routing.SetAllowedVehiclesForIndex([reservation.vehicle.vehicle_index], delivery_index)
def add_capacity_constraint(data: orToolsDataModel.ORToolsDataModel,
                            routing: pywrapcp.RoutingModel, manager: pywrapcp.RoutingIndexManager,
                            verbose: bool):
    """Add the 'Capacity' dimension limiting the load of every vehicle.

    :param data: problem data; ``demands`` gives the demand per node and
        ``vehicle_capacities`` the capacity per vehicle
    :param routing: routing model the dimension is added to
    :param manager: translates routing indices into node ids
    :param verbose: print a progress message
    """
    if verbose:
        print(' Add capacity constraints...')

    def node_demand(from_index):
        """Look up the demand of the node behind a routing index."""
        return data.demands[manager.IndexToNode(from_index)]

    callback_index = routing.RegisterUnaryTransitCallback(node_demand)
    # Arguments after the callback: no slack, the capacities, and start cumul fixed to zero.
    routing.AddDimensionWithVehicleCapacity(callback_index, 0, data.vehicle_capacities, True, 'Capacity')
def create_time_dimension(data: orToolsDataModel.ORToolsDataModel,
                          routing: pywrapcp.RoutingModel, manager: pywrapcp.RoutingIndexManager,
                          verbose: bool) -> pywrapcp.RoutingDimension:
    """Add the 'Time' dimension that accumulates the travel times along a route.

    :param data: problem data; ``time_matrix`` is read as
        ``[from_node][to_node]`` and ``max_time`` bounds both the slack and
        the accumulated time
    :param routing: routing model the dimension is added to
    :param manager: translates routing indices into node ids
    :param verbose: print a progress message
    :return: the newly created 'Time' dimension
    """
    if verbose:
        print(' Create time dimension.')

    def travel_time(from_index, to_index):
        """Look up the travel time between two routing indices."""
        origin = manager.IndexToNode(from_index)
        destination = manager.IndexToNode(to_index)
        return data.time_matrix[origin][destination]

    callback_index = routing.RegisterTransitCallback(travel_time)
    horizon = int(data.max_time)
    name = 'Time'
    # Arguments after the callback: maximum slack, capacity, and start cumul NOT fixed to zero.
    routing.AddDimension(callback_index, horizon, horizon, False, name)
    return routing.GetDimensionOrDie(name)
def add_time_windows_constraint(data: orToolsDataModel.ORToolsDataModel,
                                time_dimension: pywrapcp.RoutingDimension, manager: pywrapcp.RoutingIndexManager,
                                verbose: bool):
    """Apply the time windows of ``data.time_windows`` to the time dimension.

    ``data.time_windows`` is indexed by node id. The depot node is skipped.
    Vehicle start nodes and the nodes of new requests get a hard window
    (range of the cumul variable). Drop-off nodes of ``data.dropoffs`` and the
    nodes of requests that are not new get a soft window: leaving it is
    penalized with ``100*data.get_penalty(True)``.

    :param data: problem data providing depot, starts, requests, time windows
        and the penalty
    :param time_dimension: dimension accumulating the travel times
    :param manager: translates node ids into routing indices
    :param verbose: print the applied time windows
    """
    if verbose:
        print(' Add time windows constraints...')
    depot = data.depot
    # Build the node sets once: membership is tested for every node below.
    hard_window_nodes = set(data.starts)
    soft_window_nodes = {req.to_node for req in data.dropoffs}
    for req in data.pickups_deliveries:
        target = hard_window_nodes if req.is_new() else soft_window_nodes
        target.update((req.from_node, req.to_node))
    for location_idx, time_window in enumerate(data.time_windows):
        if location_idx == depot:
            continue
        index = manager.NodeToIndex(location_idx)
        if location_idx in hard_window_nodes:
            if verbose:
                print(f'hard time window for node {location_idx}: [{time_window[0]}, {time_window[1]}]')
            time_dimension.CumulVar(index).SetRange(
                time_window[0], time_window[1])
        # Deliberately not an elif: a node listed in both groups gets both windows.
        if location_idx in soft_window_nodes:
            if verbose:
                print(f'soft time window for node {location_idx}: [{time_window[0]}, {time_window[1]}]')
            time_dimension.SetCumulVarSoftLowerBound(index, time_window[0], 100*data.get_penalty(True))
            time_dimension.SetCumulVarSoftUpperBound(index, time_window[1], 100*data.get_penalty(True))
def add_waiting_time_constraints(data: orToolsDataModel.ORToolsDataModel,
                                 manager: pywrapcp.RoutingIndexManager,
                                 time_dimension: pywrapcp.RoutingDimension,
                                 verbose: bool):
    """Add the constraints for the maximum waiting times of the requests.

    Nothing is added when ``data.waiting_time`` is -1. For every request in
    ``data.pickups_deliveries`` a latest pickup time is derived; new requests
    get it as a hard maximum of the pickup's time variable, all other requests
    as a soft upper bound penalized with ``100*data.get_penalty(True)``.

    :param data: problem data providing the waiting time, requests and penalty
    :param manager: translates node ids into routing indices
    :param time_dimension: dimension accumulating the travel times
    :param verbose: print the latest pickup time of every request
    """
    waiting_time = data.waiting_time
    if waiting_time == -1:
        return
    if verbose:
        print(' Add waiting time constraints...')
    for request in data.pickups_deliveries:
        pickup_index = manager.NodeToIndex(request.from_node)
        reservation_time = request.reservation.reservationTime
        # NOTE(review): `or` also falls back when the getter returns 0 - confirm that is intended.
        earliest_pickup = request.get_earliest_pickup() or reservation_time
        # NOTE(review): the pickup deadline is taken from get_dropoff_latest() - confirm that is intended.
        latest_pickup = round(request.get_dropoff_latest() or (earliest_pickup + waiting_time))
        if not request.is_new():
            time_dimension.SetCumulVarSoftUpperBound(
                pickup_index,
                latest_pickup,
                100*data.get_penalty(True))
            if verbose:
                print(f"reservation {request.get_id()} has a maximum (soft) pickup time at {latest_pickup}")
            continue
        if verbose:
            print(f"reservation {request.get_id()} has a maximum (hard) pickup time at {latest_pickup}")
        # Never set the maximum below the current minimum of the variable.
        lower_limit = time_dimension.CumulVar(pickup_index).Min()
        latest_pickup = max(latest_pickup, lower_limit)
        time_dimension.CumulVar(pickup_index).SetMax(latest_pickup)
def _align_request_order(previous_order, pickup_ids, dropoff_ids):
    """Drop the stops of a previous request order that are no longer open.

    An id found in ``pickup_ids`` keeps all its entries. An id found only in
    ``dropoff_ids`` loses its first entry if it was listed exactly twice (the
    pickup has happened in the meantime). Any other id is removed completely.
    """
    occurrences = {}
    for request_id in previous_order:
        occurrences[request_id] = occurrences.get(request_id, 0) + 1
    pickup_skipped = set()
    aligned = []
    for request_id in previous_order:
        if request_id in pickup_ids:
            aligned.append(request_id)
        elif request_id in dropoff_ids:
            if occurrences[request_id] == 2 and request_id not in pickup_skipped:
                # First of two entries: the pickup that is already done.
                pickup_skipped.add(request_id)
                continue
            aligned.append(request_id)
        # else: the request is done, none of its stops is kept
    return aligned


def _request_order_to_indices(request_order, requests_by_id, manager):
    """Translate an order of request ids into routing indices.

    The last entry of an id stands for the request's ``to_node``, every
    earlier entry for its ``from_node``
    (e.g. [0,1,2,1,2] -> [0.to, 1.from, 2.from, 1.to, 2.to]).
    """
    last_position = {request_id: pos for pos, request_id in enumerate(request_order)}
    indices = []
    for pos, request_id in enumerate(request_order):
        request = requests_by_id[request_id]
        node = request.to_node if last_position[request_id] == pos else request.from_node
        indices.append(manager.NodeToIndex(node))
    return indices


def solve_from_initial_solution(routing: pywrapcp.RoutingModel, manager: pywrapcp.RoutingIndexManager,
                                search_parameters: Any, data: orToolsDataModel.ORToolsDataModel, verbose: bool):
    """Solve the routing problem starting from the routes in ``data.initial_routes``.

    ``data.initial_routes`` is read as a mapping whose values carry, as first
    element, the list of request ids in planned order. Each list is aligned
    with the current state of the requests (``data.pickups_deliveries`` and
    ``data.dropoffs``), translated into routing indices and handed to the
    routing model as initial assignment.

    :param routing: routing model to solve; it is closed by this function
    :param manager: translates between node ids and routing indices
    :param search_parameters: search parameters used for closing and solving
    :param data: problem data providing the initial routes and the requests
    :param verbose: print the initial routes as node ids
    :return: the result of ``routing.SolveFromAssignmentWithParameters``
    """
    previous_routes = data.initial_routes
    initial_routes = []
    if previous_routes is not None:
        # Build the lookups once instead of once per request and vehicle.
        pickup_ids = {req.get_id() for req in data.pickups_deliveries}
        dropoff_ids = {req.get_id() for req in data.dropoffs}
        requests_by_id = {}
        for request_group in (data.pickups_deliveries, data.dropoffs):
            for req in request_group:
                # setdefault: an id listed in both groups resolves to pickups_deliveries.
                requests_by_id.setdefault(req.get_id(), req)
        for index_vehicle in previous_routes:
            previous_order = previous_routes[index_vehicle][0]
            request_order = _align_request_order(previous_order, pickup_ids, dropoff_ids)
            initial_routes.append(_request_order_to_indices(request_order, requests_by_id, manager))
    routing.CloseModelWithParameters(search_parameters)
    if verbose:
        print('Initial solution:')
        for index_vehicle, index_list in enumerate(initial_routes):
            print(f'veh {index_vehicle}: {[manager.IndexToNode(index) for index in index_list]}')
    initial_solution = routing.ReadAssignmentFromRoutes(initial_routes, True)
    return routing.SolveFromAssignmentWithParameters(initial_solution, search_parameters)
def set_first_solution_heuristic(time_limit_seconds: int, verbose: bool,
                                 num_search_workers: int = 8) -> Any:
    """Create the search parameters used for solving the routing problem.

    Both the first solution strategy and the local search metaheuristic are
    set to AUTOMATIC.

    :param time_limit_seconds: time limit of the search in seconds
    :param verbose: print a progress message
    :param num_search_workers: value written to
        ``sat_parameters.num_search_workers``; defaults to the previously
        hard-coded 8
    :return: the configured routing search parameters
    """
    if verbose:
        print(' Set solution heuristic...')
    search_parameters = pywrapcp.DefaultRoutingSearchParameters()
    search_parameters.first_solution_strategy = routing_enums_pb2.FirstSolutionStrategy.AUTOMATIC
    search_parameters.local_search_metaheuristic = routing_enums_pb2.LocalSearchMetaheuristic.AUTOMATIC
    search_parameters.time_limit.FromSeconds(time_limit_seconds)
    search_parameters.sat_parameters.num_search_workers = num_search_workers
    return search_parameters
def main(data: orToolsDataModel.ORToolsDataModel,
         time_limit_seconds: int = 10, verbose: bool = False) -> ORToolsSolution | None:
    """Build the routing model for ``data``, solve it and return the routes.

    :param data: problem data (cost and time matrices, vehicles, requests, ...)
    :param time_limit_seconds: time limit of the search in seconds
    :param verbose: print progress messages and the solution
    :return: dict mapping each vehicle id to ``(route, route_cost)``, or
        ``None`` if no solution was found
    """
    manager = pywrapcp.RoutingIndexManager(
        len(data.cost_matrix),
        data.num_vehicles,
        data.starts, data.ends)
    routing_parameters = pywrapcp.DefaultRoutingModelParameters()
    routing = pywrapcp.RoutingModel(manager, routing_parameters)
    solver = routing.solver()

    # Costs and dimensions.
    transit_callback_index = set_travel_cost(data, routing, manager, verbose)
    time_dimension = create_time_dimension(data, routing, manager, verbose)
    distance_dimension = add_cost_constraint(data, routing, transit_callback_index, verbose)

    # Constraints.
    add_transportation_requests_constraint(data, routing, manager, solver, distance_dimension, verbose)
    add_direct_route_factor_constraint(data, routing, manager, solver, distance_dimension, verbose)
    add_dropoff_constraint(data, routing, manager, verbose)
    if data.fix_allocation:
        add_allocation_constraint(data, routing, manager, verbose)
    add_capacity_constraint(data, routing, manager, verbose)
    add_time_windows_constraint(data, time_dimension, manager, verbose)
    add_waiting_time_constraints(data, manager, time_dimension, verbose)
    # Gated on verbose like every other progress message of this module.
    if verbose:
        print('## Done')

    search_parameters = set_first_solution_heuristic(time_limit_seconds, verbose)
    if verbose:
        print('Start solving the problem.')
    if data.initial_routes:
        solution = solve_from_initial_solution(routing, manager, search_parameters, data, verbose)
    else:
        solution = routing.SolveWithParameters(search_parameters)

    if not solution:
        if verbose:
            print('There is no solution.')
        return None
    return get_solution(data, manager, routing, solution, verbose)