Solving the Shortest Path problem in Python
By John Lekberg on December 04, 2020.
NOTE: Oscar Martinez pointed out that I omitted the definition of the NegativeCycleError exception. I have fixed this. Thanks, Oscar!
This week's Python blog post is about the "Shortest Path" problem, which is a graph theory problem that has many applications, including finding arbitrage opportunities and planning travel between locations.
You will learn:
- How to solve the "Shortest Path" problem using a brute force solution.
- How to use the Bellman-Ford algorithm to create a more efficient solution.
- How to handle situations with no shortest path -- including negative cycles.
- How to apply your "shortest path solvers" (1) to plan a trip from Paris to Rome, and (2) to identify an arbitrage opportunity on a currency exchange.
Problem statement
As input, you are given:
- A weighted, directed graph.
- A "start" vertex and an "end" vertex.
Your goal is to find the shortest path (minimizing path weight) from "start" to "end". However, no shortest path may exist. Perhaps the graph has a cycle with negative weight, and thus you can repeatedly traverse the cycle to make the path shorter and shorter.
The goal is to find the path that "minimizes the sum of edge weights." Be aware of these two variations:
- "Maximizes the sum of edge weights." To deal with this variation, multiply all edge weights by -1 and then "minimize the sum of negative edge weights."
- "Minimizes the product of edge weights." As long as all edge weights are positive, deal with this variation by taking the logarithm of all edge weights and then "minimize the sum of logarithms of edge weights."
To demonstrate this problem, I'll use two examples: (1) finding the fastest trip between cities and (2) finding arbitrage opportunities in a currency exchange. See sections "scenario: trip scheduling" and "scenario: currency exchange" below for more details.
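The two variations listed in the problem statement (maximizing a sum, minimizing a product) can be sketched as small graph transformations. The helper names negate_weights and log_weights are hypothetical and exist only for illustration. Both assume a graph stored as a nested dictionary of the form {tail: {head: weight}}.

```python
import math

def negate_weights(graph):
    """Turn a 'maximize the sum' problem into a 'minimize the sum' problem."""
    return {
        tail: {head: -w for head, w in edges.items()}
        for tail, edges in graph.items()
    }

def log_weights(graph):
    """Turn a 'minimize the product' problem into a 'minimize the sum' problem.

    Assumes every weight is positive.
    """
    return {
        tail: {head: math.log(w) for head, w in edges.items()}
        for tail, edges in graph.items()
    }

# Hypothetical two-edge graph, used only to check the logarithm identity.
rates = {"A": {"B": 2.0}, "B": {"C": 4.0}}
logged = log_weights(rates)
total = logged["A"]["B"] + logged["B"]["C"]

# The sum of logarithms corresponds to the product 2.0 * 4.0.
print(math.isclose(math.exp(total), 8.0))
```

Because the logarithm is monotonic, the path with the smallest sum of logarithms is also the path with the smallest product.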
How I represent the data in the problem
For the input, consider this directed, weighted graph:
I represent this graph using this Python expression:
example_graph = {
    "A": {
        "B": 1,
    },
    "B": {
        "A": 3,
        "C": 0,
    },
    "C": {
        "A": -1,
    },
}
To enumerate all vertices in the graph, I use the helper function all_vertices:
def all_vertices(graph):
    """Return a set of all vertices in a graph.

    graph -- a weighted, directed graph.
    """
    vertices = set()
    for v in graph.keys():
        vertices.add(v)
        for u in graph[v].keys():
            vertices.add(u)
    return vertices

all_vertices(example_graph)
{'A', 'B', 'C'}
To check if an edge exists, I use the helper function is_edge:
def is_edge(graph, tail, head):
    """Return if the edge (tail)->(head) is present in a graph.

    graph -- a weighted, directed graph.
    tail -- a vertex.
    head -- a vertex.
    """
    return (tail in graph) and (head in graph[tail])

is_edge(example_graph, "A", "B")
True
is_edge(example_graph, "A", "C")
False
To find the weight of an edge, I simply index the dictionary:
example_graph["A"]["B"]
1
example_graph["C"]["A"]
-1
For the output, if a shortest path exists, then I represent the solution as a tuple of:
- The path weight.
- The path vertices.
For example, finding the shortest path from "B" to "A" in the above graph, I represent the solution as
-1, ["B", "C", "A"]
(-1, ['B', 'C', 'A'])
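To see where the weight of -1 comes from, here is a minimal sketch that adds up edge weights along the two simple paths from "B" to "A". The helper path_weight is an assumption made for this illustration (a similar helper appears later inside the brute force solver).

```python
example_graph = {
    "A": {"B": 1},
    "B": {"A": 3, "C": 0},
    "C": {"A": -1},
}

def path_weight(graph, path):
    """Sum the weights of consecutive edges along a path."""
    return sum(graph[tail][head] for tail, head in zip(path, path[1:]))

direct = path_weight(example_graph, ["B", "A"])      # 3
via_c = path_weight(example_graph, ["B", "C", "A"])  # 0 + (-1) = -1

# Tuples compare by weight first, so min picks the lighter path.
print(min((direct, ["B", "A"]), (via_c, ["B", "C", "A"])))
```

The direct edge costs 3, while the detour through "C" costs -1, so the detour is the shortest path.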
If no shortest path exists, then I will raise a custom NoShortestPathError exception:
class NoShortestPathError(Exception):
    pass

raise NoShortestPathError
NoShortestPathError:
I'm also interested in scenarios when no shortest path exists because of a negative cycle, so I use another custom exception for that:
class NegativeCycleError(NoShortestPathError):
    def __init__(self, weight, cycle):
        self.weight = weight
        self.cycle = cycle

    def __str__(self):
        return f"Weight {self.weight}: {self.cycle}"

raise NegativeCycleError(-3, ["A", "B", "A"])
NegativeCycleError: Weight -3: ['A', 'B', 'A']
I use an exception instead of returning an error code because I want to make sure I correctly handle scenarios with no shortest path. If I return an error code, I can ignore it. But a NoShortestPathError exception must be handled with a try-statement, otherwise the exception will propagate until the Python interpreter terminates.
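As a minimal sketch of how a caller might handle both exceptions, the code below catches the more specific NegativeCycleError before the general NoShortestPathError. The functions describe and failing_solver are hypothetical stand-ins, not part of the solvers in this post.

```python
class NoShortestPathError(Exception):
    pass

class NegativeCycleError(NoShortestPathError):
    def __init__(self, weight, cycle):
        self.weight = weight
        self.cycle = cycle

    def __str__(self):
        return f"Weight {self.weight}: {self.cycle}"

def describe(solver):
    """Run a zero-argument solver and report the outcome as a string."""
    try:
        weight, path = solver()
    except NegativeCycleError as error:
        # Must come first: it is a subclass of NoShortestPathError.
        return f"negative cycle: {error}"
    except NoShortestPathError:
        return "no path"
    else:
        return f"path {path} with weight {weight}"

def failing_solver():
    # Hypothetical solver that always reports a negative cycle.
    raise NegativeCycleError(-3, ["A", "B", "A"])

print(describe(failing_solver))
```

Because NegativeCycleError inherits from NoShortestPathError, a caller that only cares about "no shortest path" can catch the base class and still cover the negative cycle case.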
Scenario: trip scheduling
In this scenario, my goal is to plan a trip from Paris to Rome. I have a table of train times between various cities.
from datetime import timedelta
example_travel_times = {
    ("Paris", "Zurich"): timedelta(hours=6, minutes=12),
    ("Paris", "Genoa"): timedelta(hours=9, minutes=26),
    ("Paris", "Milan"): timedelta(hours=8, minutes=55),
    ("Zurich", "Milan"): timedelta(hours=3, minutes=12),
    ("Milan", "Genoa"): timedelta(hours=1, minutes=48),
    ("Milan", "Rome"): timedelta(hours=5, minutes=45),
    ("Genoa", "Rome"): timedelta(hours=5, minutes=8),
}
So I write a function, plan_trip, that will utilize a shortest path algorithm to plan the trip:
from collections import defaultdict
def plan_trip(*, travel_times, shortest_path_solver, start, end):
    """Plan the fastest trip between two locations.

    travel_times -- dict. { (cityA, cityB): travel_time, ... }.
    shortest_path_solver -- a function that solves the
        "Shortest Path" problem.
    start -- starting location.
    end -- ending location.
    """
    hour = timedelta(hours=1)

    # Build an undirected graph: each travel time applies both ways.
    graph = defaultdict(dict)
    for (a, b), travel_time in travel_times.items():
        weight = travel_time / hour
        graph[a][b] = weight
        graph[b][a] = weight

    try:
        best_weight, best_path = shortest_path_solver(
            graph=graph, start=start, end=end,
        )
    except NoShortestPathError:
        print(f"No possible trip from {start} to {end}.")
    else:
        best_time = best_weight * hour
        print(f"The fastest trip from {start} to {end} takes {best_time}:")
        for a, b in zip(best_path, best_path[1:]):
            edge_time = graph[a][b] * hour
            print(f"- {a} to {b}, {edge_time}")
(You will see this function in use in the sections "creating a brute force solution" and "creating a more efficient solution with the Bellman-Ford algorithm.")
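The function above turns each timedelta into a plain number of hours so the solver can add and compare weights, then converts back for printing. Here is a minimal sketch of that round trip, using one travel time from the table:

```python
from datetime import timedelta

hour = timedelta(hours=1)
travel_time = timedelta(hours=6, minutes=12)

# Dividing one timedelta by another yields a float (hours, here).
weight = travel_time / hour

# Multiplying the float by one hour turns it back into a timedelta.
restored = weight * hour

print(weight)
print(restored)
```

Working with floats means a tiny rounding error is possible in principle, but timedelta arithmetic rounds to the nearest microsecond, which is far below the precision of a train timetable.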
Scenario: currency exchange
In this scenario, my goal is to convert one troy ounce of gold (XAU) into as much USD as possible. I have a table of exchange rates between different currencies:
from decimal import Decimal
example_exchange_rates = {
    ("XAU", "CAD"): Decimal("2_323.45"),
    ("XAU", "USD"): Decimal("1_772.84"),
    ("XAU", "MXN"): Decimal("35_560.57"),
    ("CAD", "USD"): Decimal("0.77"),
    ("CAD", "MXN"): Decimal("15.44"),
    ("USD", "MXN"): Decimal("20.05"),
}
(This table uses ISO 4217 currency codes.)
So I write a function, maximize_profit, that will utilize a shortest path algorithm to maximize my profit:
from collections import defaultdict
def maximize_profit(
    *, exchange_rates, shortest_path_solver, start, end
):
    """Maximize the profit from exchanging currencies, starting
    and ending on a given currency.

    If a negative cycle is found, then the arbitrage opportunity
    is reported.

    exchange_rates -- dict. { (base, quote): rate, ... }.
    shortest_path_solver -- a function that solves the
        "Shortest Path" problem.
    start -- starting currency.
    end -- ending currency.
    """
    # Each rate becomes an edge weight of -ln(rate); the reverse
    # exchange uses the inverse rate, so its weight is negated.
    graph = defaultdict(dict)
    for (base, quote), rate in exchange_rates.items():
        weight = -rate.ln()
        graph[base][quote] = weight
        graph[quote][base] = -weight

    try:
        best_weight, best_path = shortest_path_solver(
            graph=graph, start=start, end=end
        )
    except NegativeCycleError as error:
        cycle = error.cycle
        cycle_weight = error.weight
        cycle_rate = (-cycle_weight).exp()
        print("Arbitrage opportunity:")
        print(f"> {cycle[0]}/{cycle[-1]} {cycle_rate}")
        print("Via:")
        for (base, quote) in zip(cycle, cycle[1:]):
            edge_weight = graph[base][quote]
            edge_rate = (-edge_weight).exp()
            print(f"- {base}/{quote} {edge_rate}")
    except NoShortestPathError:
        print(f"No way to exchange {start} into {end}")
    else:
        best_weight = Decimal(best_weight)
        best_rate = (-best_weight).exp()
        print("Optimal exchange:")
        print(f"> {start}/{end} {best_rate}")
        print("Via:")
        for (base, quote) in zip(best_path, best_path[1:]):
            edge_weight = graph[base][quote]
            edge_rate = (-edge_weight).exp()
            print(f"- {base}/{quote} {edge_rate}")
(You will see this function in use in the sections "creating a brute force solution" and "creating a more efficient solution with the Bellman-Ford algorithm.")
The formula I use to convert the exchange rate into an edge weight is:
weight = -log(rate)
I use this formula because the shortest path solver "minimizes the sum of edge weights", while the currency exchange wants to "maximize the product of exchange rates". (Recall the variations mentioned in the problem statement.)
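As a worked check of the weight formula, the sketch below takes three rates from the exchange table (with USD to XAU taken as the inverse of the listed XAU to USD rate) and confirms that a cycle whose rates multiply to more than 1 has a negative total weight. The variable names are chosen for this illustration only.

```python
from decimal import Decimal

rates = [
    Decimal("0.77"),          # CAD -> USD
    1 / Decimal("1772.84"),   # USD -> XAU (inverse of XAU -> USD)
    Decimal("2323.45"),       # XAU -> CAD
]

# weight = -log(rate), applied to every edge of the cycle.
weights = [-rate.ln() for rate in rates]
total_weight = sum(weights)

# The product of the rates is what one unit of CAD turns into.
product = rates[0] * rates[1] * rates[2]

print(total_weight < 0, product > 1)
```

A product above 1 means the round trip ends with more CAD than it started with, which is exactly the situation the solver reports as a negative cycle.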
Creating a brute force solution
My brute force solution has two steps:
- Check for negative cycles by checking all possible cycles.
  - If any negative cycles exist, raise a NegativeCycleError exception.
- Check for candidate paths by checking all possible paths.
  - If no candidate paths exist, raise a NoShortestPathError exception.
  - If any candidate paths exist, take the minimum weight path.
Here's a Python implementation of this:
from itertools import permutations

def shortest_path_bf(*, graph, start, end):
    """Find the shortest path from start to end in graph,
    using brute force.

    If a negative cycle exists, raise NegativeCycleError.
    If no shortest path exists, raise NoShortestPathError.
    """
    V = tuple(all_vertices(graph))
    n = len(V)

    def path_weight(path):
        return sum(
            graph[tail][head]
            for (tail, head) in zip(path, path[1:])
        )

    # Every ordering of 1..n distinct vertices whose
    # consecutive pairs are all edges.
    all_paths = [
        path
        for path_n in range(1, n + 1)
        for path in permutations(V, path_n)
        if all(
            is_edge(graph, tail, head)
            for (tail, head) in zip(path, path[1:])
        )
    ]

    # Check for negative cycles.
    all_cycles = (
        (*path, path[0])
        for path in all_paths
        if is_edge(graph, path[-1], path[0])
    )
    for cycle in all_cycles:
        weight = path_weight(cycle)
        if weight < 0:
            raise NegativeCycleError(weight, cycle)

    # Check candidate paths.
    candidate_paths = [
        path
        for path in all_paths
        if path[0] == start
        if path[-1] == end
    ]
    if len(candidate_paths) == 0:
        raise NoShortestPathError
    else:
        return min(
            (path_weight(path), path)
            for path in candidate_paths
        )

shortest_path_bf(graph=example_graph, start="B", end="A")
(-1, ('B', 'C', 'A'))
plan_trip(
    travel_times=example_travel_times,
    shortest_path_solver=shortest_path_bf,
    start="Paris",
    end="Rome",
)
The fastest trip from Paris to Rome takes 14:34:00:
- Paris to Genoa, 9:26:00
- Genoa to Rome, 5:08:00
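The 14:34:00 result can be checked by hand. The sketch below adds up the travel times for three of the candidate routes from Paris to Rome, using values from the travel time table. The route list and the route_time helper are assumptions made for this illustration.

```python
from datetime import timedelta

times = {
    ("Paris", "Zurich"): timedelta(hours=6, minutes=12),
    ("Paris", "Genoa"): timedelta(hours=9, minutes=26),
    ("Paris", "Milan"): timedelta(hours=8, minutes=55),
    ("Zurich", "Milan"): timedelta(hours=3, minutes=12),
    ("Milan", "Rome"): timedelta(hours=5, minutes=45),
    ("Genoa", "Rome"): timedelta(hours=5, minutes=8),
}

routes = [
    ["Paris", "Genoa", "Rome"],
    ["Paris", "Milan", "Rome"],
    ["Paris", "Zurich", "Milan", "Rome"],
]

def route_time(route):
    """Total travel time over consecutive legs of a route."""
    legs = zip(route, route[1:])
    return sum((times[leg] for leg in legs), timedelta())

for route in routes:
    print(route_time(route), " -> ".join(route))
```

Going through Genoa takes 14:34:00, going through Milan takes 14:40:00, and the Zurich route takes 15:09:00, so the Genoa route wins by six minutes.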
maximize_profit(
    exchange_rates=example_exchange_rates,
    shortest_path_solver=shortest_path_bf,
    start="XAU",
    end="USD",
)
Arbitrage opportunity:
> CAD/CAD 1.009147187563457503215180163
Via:
- CAD/USD 0.7700000000000000000000000000
- USD/XAU 0.0005640666952460458924663252184
- XAU/CAD 2323.450000000000000000000001
What's the time complexity of this solution? For a graph with n vertices:
- There are P(n,1) + P(n,2) + ... + P(n,n) possible paths (same for possible cycles). If I relax this slightly, I can simplify this to O(n × n!).
- Computing the weight of a path of k nodes takes O(k) time. Since k ≤ n, I will simplify this to O(n).
As a result, the overall time complexity of this solution is O(n² × n!).
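To get a feel for how fast the number of candidate orderings grows, this sketch evaluates P(n,1) + P(n,2) + ... + P(n,n) and compares it against the looser bound n × n!. The function name count_orderings is hypothetical, and math.perm requires Python 3.8 or later.

```python
import math

def count_orderings(n):
    """P(n,1) + P(n,2) + ... + P(n,n)."""
    return sum(math.perm(n, k) for k in range(1, n + 1))

for n in (3, 5, 8):
    # Columns: n, exact count, relaxed bound n * n!.
    print(n, count_orderings(n), n * math.factorial(n))
```

For n = 3 the exact count is 15 against a bound of 18, and for n = 5 it is 325 against 600. Either way the growth is factorial, which is why the brute force solver is only practical for very small graphs.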
Creating a more efficient solution with the Bellman-Ford algorithm
The key idea of the Bellman-Ford algorithm is to overestimate the path weights and repeatedly relax the estimates. Each time the relaxation happens, the number of correct path weights increases. And -- for a graph with n vertices -- doing the relaxation process n-1 times will ensure that all vertices have correctly computed path weights.
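The relaxation idea can be sketched on the small example graph from earlier. This is a simplified illustration and not the full solver: the helper relax_once is hypothetical, and a vertex missing from dist stands for an infinite estimate.

```python
example_graph = {
    "A": {"B": 1},
    "B": {"A": 3, "C": 0},
    "C": {"A": -1},
}

def relax_once(graph, dist):
    """Run one relaxation pass in place; return True if any estimate changed."""
    changed = False
    for tail, edges in graph.items():
        if tail not in dist:
            continue  # tail is not reached yet (infinite estimate)
        for head, weight in edges.items():
            alt = dist[tail] + weight
            if head not in dist or alt < dist[head]:
                dist[head] = alt
                changed = True
    return changed

dist = {"B": 0}  # start at "B"
n = 3            # number of vertices
for _ in range(n - 1):
    relax_once(example_graph, dist)

print(dist)
```

Starting from "B", the estimate for "A" is first set to 3 via the direct edge and then lowered to -1 via "C". One more pass changes nothing, which is the signal that there is no negative cycle reachable from the start.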
Here's a Python implementation of this:
def shortest_path_bellman_ford(*, graph, start, end):
    """Find the shortest path from start to end in graph,
    using the Bellman-Ford algorithm.

    If a negative cycle exists, raise NegativeCycleError.
    If no shortest path exists, raise NoShortestPathError.
    """
    n = len(all_vertices(graph))

    # A vertex missing from dist has an infinite estimate.
    dist = {}
    pred = {}

    def is_dist_infinite(v):
        return v not in dist

    def walk_pred(start, end):
        # Follow predecessors from start back to end, then
        # reverse to get the path in forward order.
        path = [start]
        v = start
        while v != end:
            v = pred[v]
            path.append(v)
        path.reverse()
        return path

    def find_cycle(start):
        # Follow predecessors until a vertex repeats.
        nodes = []
        node = start
        while True:
            if node in nodes:
                cycle = [
                    node,
                    *reversed(nodes[nodes.index(node) :]),
                ]
                return cycle
            nodes.append(node)
            if node not in pred:
                break
            node = pred[node]

    dist[start] = 0

    # Relax approximations (n-1) times.
    for _ in range(n - 1):
        for tail in graph.keys():
            if is_dist_infinite(tail):
                continue
            for head, weight in graph[tail].items():
                alt = dist[tail] + weight
                if is_dist_infinite(head) or (
                    alt < dist[head]
                ):
                    dist[head] = alt
                    pred[head] = tail

    # Check for negative cycles.
    for tail in graph.keys():
        # Skip vertices that are unreachable from start.
        if is_dist_infinite(tail):
            continue
        for head, weight in graph[tail].items():
            if (dist[tail] + weight) < dist[head]:
                cycle = find_cycle(tail)
                cycle_weight = sum(
                    graph[c_tail][c_head]
                    for (c_tail, c_head) in zip(
                        cycle, cycle[1:]
                    )
                )
                raise NegativeCycleError(
                    cycle_weight, cycle
                )

    # Build shortest path.
    if is_dist_infinite(end):
        raise NoShortestPathError
    best_weight = dist[end]
    best_path = walk_pred(end, start)
    return best_weight, best_path

shortest_path_bellman_ford(
    graph=example_graph, start="B", end="A"
)
(-1, ['B', 'C', 'A'])
plan_trip(
    travel_times=example_travel_times,
    shortest_path_solver=shortest_path_bellman_ford,
    start="Paris",
    end="Rome",
)
The fastest trip from Paris to Rome takes 14:34:00:
- Paris to Genoa, 9:26:00
- Genoa to Rome, 5:08:00
maximize_profit(
    exchange_rates=example_exchange_rates,
    shortest_path_solver=shortest_path_bellman_ford,
    start="XAU",
    end="USD",
)
Arbitrage opportunity:
> XAU/XAU 1.009245235999597360471702672
Via:
- XAU/CAD 2323.450000000000000000000001
- CAD/MXN 15.43999999999999999999999999
- MXN/USD 0.04987531172069825436408977557
- USD/XAU 0.0005640666952460458924663252184
What's the time complexity of the Bellman-Ford algorithm? For a graph with n vertices and m edges:
- Counting the number of vertices takes O(n + m) time, because all_vertices visits every edge.
- The relaxation process takes O(m × n) time.
- Checking for negative cycles takes O(m) time.
As a result, the overall time complexity of the Bellman-Ford algorithm is
O(m × n)
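As a rough illustration of the O(m × n) figure, this sketch counts the vertices and edges of the small example graph and computes an upper bound on how many edge inspections the algorithm performs: (n-1) relaxation passes over m edges, plus one more pass for the negative cycle check. The helper names are hypothetical.

```python
example_graph = {
    "A": {"B": 1},
    "B": {"A": 3, "C": 0},
    "C": {"A": -1},
}

def count_vertices(graph):
    """Count distinct vertices, including those that only appear as heads."""
    vertices = set(graph)
    for edges in graph.values():
        vertices.update(edges)
    return len(vertices)

def count_edges(graph):
    """Count directed edges."""
    return sum(len(edges) for edges in graph.values())

n = count_vertices(example_graph)
m = count_edges(example_graph)

# (n - 1) relaxation passes plus one negative cycle check.
edge_inspections = (n - 1) * m + m
print(n, m, edge_inspections)
```

For the example graph, n = 3 and m = 4, so at most 12 edge inspections are needed, compared with the factorial growth of the brute force approach.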
In conclusion...
In this week's post, you learned how to solve the "Shortest Path" problem using a brute force solution, as well as the Bellman-Ford algorithm. You also learned how to identify and properly respond to situations with no shortest path -- including handling negative cycles. You used your "shortest path solvers" to plan a trip from Paris to Rome and to identify an arbitrage opportunity on a currency exchange.
My challenge to you:
Gather your own data for the trip planning and currency exchange scenarios.
What results do plan_trip and maximize_profit give for your data?
If you enjoyed this week's post, share it with your friends and stay tuned for next week's post. See you then!
(If you spot any errors or typos on this post, contact me via my contact page.)