Return to Blog

Solving the Shortest Path problem in Python

By John Lekberg on December 04, 2020.


NOTE: Oscar Martinez pointed out that I omitted the definition of the NegativeCycleError exception. I have fixed this. Thanks, Oscar!


This week's Python blog post is about the "Shortest Path" problem, which is a graph theory problem that has many applications, including finding arbitrage opportunities and planning travel between locations.

You will learn:

Problem statement

As input, you are given:

Your goal is to find the shortest path (minimizing path weight) from "start" to "end". However, no shortest path may exist. Perhaps the graph has a cycle with negative weight, and thus you can repeatedly traverse the cycle to make the path shorter and shorter.

The goal is to find the path that "minimizes the sum of edge weights." Be aware of these two variations:

To demonstrate this problem, I'll use two examples: (1) finding the fastest trip between cities and (2) finding arbitrage opportunities in a currency exchange. See sections "scenario: trip scheduling" and "scenario: currency exchange" below for more details.

How I represent the data in the problem

For the input, consider this directed, weighted graph:

I represent this graph using this Python expression:

example_graph = {
    "A": {
        "B": 1,
    },
    "B": {
        "A": 3,
        "C": 0,
    },
    "C": {
        "A": -1,
    },
}

To enumerate all vertices in the graph, I use the helper function all_vertices:

def all_vertices(graph):
    """Return a set of all vertices in a graph.
    
    graph -- a weighted, directed graph.
    """
    vertices = set()
    for v in graph.keys():
        vertices.add(v)
        for u in graph[v].keys():
            vertices.add(u)
    return vertices


all_vertices(example_graph)
{'A', 'B', 'C'}

To check if an edge exists, I use this helper function is_edge:

def is_edge(graph, tail, head):
    """Return if the edge (tail)->(head) is present
    in a graph.
    
    graph -- a weighted, directed graph.
    tail -- a vertex.
    head -- a vertex.
    """
    return (tail in graph) and (head in graph[tail])


is_edge(example_graph, "A", "B")
True
is_edge(example_graph, "A", "C")
False

To find the weight of an edge, I simply index the dictionary:

example_graph["A"]["B"]
1
example_graph["C"]["A"]
-1

For the output, if a shortest path exists, then I represent the solution as a tuple of:

For example, finding the shortest path from "B" to "A" in the above graph, I represent the solution as

-1, ["B", "C", "A"]
(-1, ['B', 'C', 'A'])

If no shortest path exists, then I will raise a custom NoShortestPathError exception:

class NoShortestPathError(Exception):
    pass


raise NoShortestPathError
NoShortestPathError:

I'm also interested in scenarios when no shortest path exists because of a negative cycle, so I use another custom exception for that:

class NegativeCycleError(NoShortestPathError):
    def __init__(self, weight, cycle):
        self.weight = weight
        self.cycle = cycle

    def __str__(self):
        return f"Weight {self.weight}: {self.cycle}"


raise NegativeCycleError(-3, ["A", "B", "A"])
NegativeCycleError: Weight -3: ['A', 'B', 'A']

I use an exception instead of returning an error code because I want to make sure I correctly handle scenarios with no shortest path. If I return an error code, I can ignore it. But a NoShortestPathError exception must be handled with a try-statement, otherwise the exception will propagate until the Python interpreter terminates.

Scenario: trip scheduling

In this scenario, my goal is to plan a trip from Paris to Rome. I have a table of train times between various cities.

from datetime import timedelta

example_travel_times = {
    ("Paris", "Zurich"): timedelta(hours=6, minutes=12),
    ("Paris", "Genoa"): timedelta(hours=9, minutes=26),
    ("Paris", "Milan"): timedelta(hours=8, minutes=55),
    ("Zurich", "Milan"): timedelta(hours=3, minutes=12),
    ("Milan", "Genoa"): timedelta(hours=1, minutes=48),
    ("Milan", "Rome"): timedelta(hours=5, minutes=45),
    ("Genoa", "Rome"): timedelta(hours=5, minutes=8),
}

So I write a function, plan_trip, that will utilize a shortest path algorithm to plan the trip:

from collections import defaultdict

def plan_trip(*, travel_times, shortest_path_solver, start, end):
    """Plan the fastest trip between two locations.
    
    travel_times -- dict. { (cityA, cityB): travel_time, ... }.
    shortest_path_solver -- a function that solves the
        "Shortest Path" problem.
    start -- starting location.
    end -- ending location.
    """
    
    hour = timedelta(hours=1)
    
    graph = defaultdict(dict)
    for (a, b), travel_time in travel_times.items():
        weight = travel_time / hour
        graph[a][b] = weight
        graph[b][a] = weight
    
    try:
        best_weight, best_path = shortest_path_solver(
            graph=graph, start=start, end=end,
        )
    except NoShortestPathError:
        print(f"No possible trip from {start} to {end}.")
    else:
        best_time = best_weight * hour
        
        print(f"The fastest trip from {start} to {end} takes {best_time}:")
        for a, b in zip(best_path, best_path[1:]):
            edge_time = graph[a][b] * hour
            print(f"- {a} to {b}, {edge_time}")

(You will see this function in use in the sections "creating a brute force solution" and "creating a more efficient solution with the Bellman-Ford algorithm.")

Scenario: currency exchange

In this scenario, my goal is to convert one troy ounce of gold (XAU) into as much USD as possible. I have a table of exchange rates between different currencies:

from decimal import Decimal

example_exchange_rates = {
    ("XAU", "CAD"): Decimal("2_323.45"),
    ("XAU", "USD"): Decimal("1_772.84"),
    ("XAU", "MXN"): Decimal("35_560.57"),
    ("CAD", "USD"): Decimal("0.77"),
    ("CAD", "MXN"): Decimal("15.44"),
    ("USD", "MXN"): Decimal("20.05"),
}

(This table uses ISO 4217 currency codes.)

So I write a function, maximize_profit, that will utilize a shortest path algorithm to maximize my profit:

from collections import defaultdict


def maximize_profit(
    *, exchange_rates, shortest_path_solver, start, end
):
    """Maximize the profit from exchanging currencies, starting
    and ending on a given currency.
    
    If a negative cycle is found, then the arbitrage opportunity
    is reported.
    
    exchange_rates -- dict. { (base, quote): rate, ... }.
    shortest_path_solver -- a function that solves the
         "Shortest Path" problem.
    start -- starting currency.
    end -- ending currency.
    """

    graph = defaultdict(dict)
    for (base, quote), rate in exchange_rates.items():
        weight = -rate.ln()
        graph[base][quote] = weight
        graph[quote][base] = -weight

    try:
        best_weight, best_path = shortest_path_solver(
            graph=graph, start=start, end=end
        )
    except NegativeCycleError as error:
        cycle = error.cycle
        cycle_weight = error.weight
        cycle_rate = (-cycle_weight).exp()
        print("Arbitrage opportunity:")
        print(f"> {cycle[0]}/{cycle[-1]} {cycle_rate}")
        print("Via:")
        for (base, quote) in zip(cycle, cycle[1:]):
            edge_weight = graph[base][quote]
            edge_rate = (-edge_weight).exp()
            print(f"- {base}/{quote} {edge_rate}")
    except NoShortestPathError:
        print(f"No way to exchange {start} into {end}")
    else:
        best_weight = Decimal(best_weight)
        best_rate = (-best_weight).exp()
        print("Optimal exchange:")
        print(f"> {start}/{end} {best_rate}")
        print("Via:")
        for (base, quote) in zip(best_path, best_path[1:]):
            edge_weight = graph[base][quote]
            edge_rate = (-edge_weight).exp()
            print(f"- {base}/{quote} {edge_rate}")

(You will see this function in use in the sections "creating a brute force solution" and "creating a more efficient solution with the Bellman-Ford algorithm.")

The formula I use to convert the exchange rate into an edge weight is:

weight = -log(rate)

Because the shortest path solver "minimizes the sum of edge weights", and the currency exchange wants to "maximize the product of exchange rates". (Recall the variations mentioned in the problem statement.)

Creating a brute force solution

My brute force solution has two steps:

  1. Check for negative cycles by checking all possible cycles.
    • If any negative cycles exist, raise a NegativeCycleError exception.
  2. Check for candidate paths by checking all possible paths.
    • If no candidate paths exist, raise a NoShortestPathError exception.
    • If any candidate paths exist, take the minimum weight path.

Here's a Python implementation of this:

from itertools import permutations


def shortest_path_bf(*, graph, start, end):
    """Find the shortest path from start to end in graph,
    using brute force.
    
    If a negative cycle exists, raise NegativeCycleError.
    If no shortest path exists, raise NoShortestPathError.
    """
    V = tuple(all_vertices(graph))
    n = len(V)

    def path_weight(path):
        return sum(
            graph[tail][head]
            for (tail, head) in zip(path, path[1:])
        )

    all_paths = [
        path
        for path_n in range(1, n + 1)
        for path in permutations(V, path_n)
        if all(
            is_edge(graph, tail, head)
            for (tail, head) in zip(path, path[1:])
        )
    ]

    # Check for negative cycles.

    all_cycles = (
        (*path, path[0])
        for path in all_paths
        if is_edge(graph, path[-1], path[0])
    )
    for cycle in all_cycles:
        weight = path_weight(cycle)
        if weight < 0:
            raise NegativeCycleError(weight, cycle)

    # Check candidate paths

    candidate_paths = [
        path
        for path in all_paths
        if path[0] == start
        if path[-1] == end
    ]

    if len(candidate_paths) == 0:
        raise NoShortestPathError
    else:
        return min(
            (path_weight(path), path)
            for path in candidate_paths
        )


shortest_path_bf(graph=example_graph, start="B", end="A")
(-1, ('B', 'C', 'A'))
plan_trip(
    travel_times=example_travel_times,
    shortest_path_solver=shortest_path_bf,
    start="Paris",
    end="Rome",
)
The fastest trip from Paris to Rome takes 14:34:00:
- Paris to Genoa, 9:26:00
- Genoa to Rome, 5:08:00
maximize_profit(
    exchange_rates=example_exchange_rates,
    shortest_path_solver=shortest_path_bf,
    start="XAU",
    end="USD",
)
Arbitrage opportunity:
> CAD/CAD 1.009147187563457503215180163
Via:
- CAD/USD 0.7700000000000000000000000000
- USD/XAU 0.0005640666952460458924663252184
- XAU/CAD 2323.450000000000000000000001

What's the time complexity of this solution? For a graph with n vertices:

As a result, the overall time complexity of this solution is

O(n2 × n!)

Creating a more efficient solution with the Bellman-Ford algorithm

The key idea of the Bellman-Ford algorithm is to overestimate the path weights and repeatedly relax the estimates. Each time the relaxation happens, the number of correct path weights increases. And -- for a graph with n vertices -- doing the relaxation process n-1 times will ensure that all vertices have correctly computed path weights.

Here's a Python implementation of this:

def shortest_path_bellman_ford(*, graph, start, end):
    """Find the shortest path from start to end in graph,
    using the Bellman-Ford algorithm.
    
    If a negative cycle exists, raise NegativeCycleError.
    If no shortest path exists, raise NoShortestPathError.
    """
    n = len(all_vertices(graph))

    dist = {}
    pred = {}

    def is_dist_infinite(v):
        return v not in dist

    def walk_pred(start, end):
        path = [start]
        v = start
        while v != end:
            v = pred[v]
            path.append(v)
        path.reverse()
        return path

    def find_cycle(start):
        nodes = []
        node = start
        while True:
            if node in nodes:
                cycle = [
                    node,
                    *reversed(nodes[nodes.index(node) :]),
                ]
                print(nodes)
                print(cycle)
                return cycle
            nodes.append(node)
            if node not in pred:
                break
            node = pred[node]

    dist[start] = 0

    # Relax approximations (n-1) times.

    for _ in range(n - 1):
        for tail in graph.keys():
            if is_dist_infinite(tail):
                continue
            for head, weight in graph[tail].items():
                alt = dist[tail] + weight
                if is_dist_infinite(head) or (
                    alt < dist[head]
                ):
                    dist[head] = alt
                    pred[head] = tail

    # Check for negative cycles.

    for tail in graph.keys():
        for head, weight in graph[tail].items():
            if (dist[tail] + weight) < dist[head]:
                cycle = find_cycle(tail)
                cycle_weight = sum(
                    graph[c_tail][c_head]
                    for (c_tail, c_head) in zip(
                        cycle, cycle[1:]
                    )
                )
                raise NegativeCycleError(
                    cycle_weight, cycle
                )

    # Build shortest path.

    if is_dist_infinite(end):
        raise NoShortestPathError

    best_weight = dist[end]
    best_path = walk_pred(end, start)

    return best_weight, best_path

shortest_path_bellman_ford(
    graph=example_graph, start="B", end="A"
)
(-1, ['B', 'C', 'A'])
plan_trip(
    travel_times=example_travel_times,
    shortest_path_solver=shortest_path_bellman_ford,
    start="Paris",
    end="Rome",
)
The fastest trip from Paris to Rome takes 14:34:00:
- Paris to Genoa, 9:26:00
- Genoa to Rome, 5:08:00
maximize_profit(
    exchange_rates=example_exchange_rates,
    shortest_path_solver=shortest_path_bellman_ford,
    start="XAU",
    end="USD",
)
Arbitrage opportunity:
> XAU/XAU 1.009245235999597360471702672
Via:
- XAU/CAD 2323.450000000000000000000001
- CAD/MXN 15.43999999999999999999999999
- MXN/USD 0.04987531172069825436408977557
- USD/XAU 0.0005640666952460458924663252184

What's the time complexity of the Bellman-Ford algorithm? For a graph with n vertices and m edges:

As a result, the overall time complexity of the Bellman-Ford algorithm is

O(m × n)

In conclusion...

In this week's post, you learned how to solve the "Shortest Path" problem using a brute force solution, as well as the Bellman-Ford algorithm. You also learned how to identify and properly respond to situations with no shortest path -- including handling negative cycles. You used your "shortest path solvers" to plan a trip from Paris to Rome and to identify an arbitrage opportunity on a currency exchange.

My challenge to you:

Gather your own data for the trip planning and currency exchange scenarios.

What results do plan_trip and maximize_profit give for your data?

If you enjoyed this week's post, share it with your friends and stay tuned for next week's post. See you then!


(If you spot any errors or typos on this post, contact me via my contact page.)