Shortest Path DAG Source to Sink

Given a list of directed edges, a list of costs associated with those edges (together representing a weighted directed acyclic graph), and two identifiers A, B in the graph, find the lowest-sum path from A to B. Every subsequent lookup from A to B1, A to B2, etc., should be computed in linear time.

Return the path as a list from A to B.

For example, given the input:

edges = [['A', 'B'], ['A', 'C'], ['A', 'D'], ['B', 'E'], ['B', 'F'],
         ['C', 'F'], ['D', 'F'], ['D', 'G'], ['E', 'H'], ['F', 'H'],
         ['F', 'I'], ['F', 'J'], ['G', 'J'], ['H', 'K'], ['H', 'L'],
         ['I', 'L'], ['J', 'L'], ['J', 'M'], ['K', 'N'], ['L', 'N'],
         ['M', 'N']]
costs = [2, 1, 3, 5, 2, 7, 6, 4, 2, 5, 3, 2, 3, 8, 6, 1, 9, 7, 1, 2, 4]

The Idea: This problem can be solved with BFS and dynamic programming. It is very similar to seam carving in the sense that each node's optimal path depends on the optimal paths of the nodes that come before it.

With the graph below, we begin at source A because it has no dependencies and no optimal path that comes before it. If we then traverse with a BFS, we can compute the optimal path for each succeeding node, because its parents already have their optima computed and those values can be used to solve for the next optimal path. Note that this only works if the BFS traversal also respects a topological ordering of the graph; a sketch of a variant that enforces the topological order explicitly follows below.
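If the BFS order is not guaranteed to respect a topological order, the same dynamic program can be driven by an explicit topological sort instead. Below is a minimal sketch of that variant using Kahn's algorithm (in-degree counting); the helper name topo_min_costs and its adj/inv_adj parameters (built the same way as g and g_inv in the class further down) are illustrative and not part of the implementation that follows.

from collections import deque

def topo_min_costs(adj, inv_adj, start):
    # Hypothetical helper, not part of the class below: relax edges in a
    # topological (Kahn) order so every predecessor is finished before its
    # successors, regardless of BFS level structure.
    # adj / inv_adj must map every node to a list of (neighbor, cost) pairs.
    indegree = {u: len(inv_adj[u]) for u in adj}
    order = deque(u for u, d in indegree.items() if d == 0)
    opt = {start: 0}
    while order:
        u = order.popleft()
        for v, cost in adj[u]:
            if u in opt:  # only relax edges reachable from start
                opt[v] = min(opt.get(v, float('inf')), opt[u] + cost)
            indegree[v] -= 1
            if indegree[v] == 0:
                order.append(v)
    return opt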

For example, consider node F. We can find the optimal path to F by taking the minimum over its incoming neighbors: OPT[F] = min(OPT[B] + edge(B, F), OPT[C] + edge(C, F), OPT[D] + edge(D, F)). To find the edges that lead into F, we access our inverse graph.
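Plugging in the example costs above: OPT[B] = 2, OPT[C] = 1, OPT[D] = 3, while edge(B, F) = 2, edge(C, F) = 7, and edge(D, F) = 6, so OPT[F] = min(2 + 2, 1 + 7, 3 + 6) = 4, which matches the value printed for F in the output below.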

Once the optimal path from the source to every other node is computed, we can backtrack from sink to source to recover the minimum path. Consider source node A and sink node N. We know that OPT[N] = 10, so the predecessor on the optimal path must be the node x satisfying OPT[x] = OPT[N] - edge(x, N), which here is the node with OPT[x] = 8.
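Concretely, with the example output below, the incoming edges of N are (K, 1), (L, 2), and (M, 4). Checking each candidate: OPT[K] = 17 ≠ 10 - 1 and OPT[M] = 13 ≠ 10 - 4, but OPT[L] = 8 = 10 - 2, so L precedes N on an optimal path. Repeating the same check from L picks I (OPT[I] = 7 = 8 - 1), then F, then B, then A, which yields the path ['A', 'B', 'F', 'I', 'L', 'N'] reported in the first test.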

Complexity: O(n^2) time for preprocessing, then O(1) to retrieve the distance from the source to any node, and O(n) time to obtain the path from the source to any node.

import queue


class Min_Path_Graph:
    # static variable: class-level cache of the optimal cumulative cost
    # from the start node to every other node
    __dp_cum_cost = {}

    def __init__(self, edges, costs, start):
        """
        :param edges: List[List[<type>]]
        :param costs: List[int]
        :param start: <type> - starting identifier
        """
        self.start = start
        self.g = {}
        self.g_inv = {}

        # make sure every node appears as a key in both adjacency maps
        for u, v in edges:
            self.g.setdefault(u, [])
            self.g.setdefault(v, [])
            self.g_inv.setdefault(u, [])
            self.g_inv.setdefault(v, [])

        # forward graph: node -> [(successor, cost)]
        # inverse graph: node -> [(predecessor, cost)]
        for (u, v), c in zip(edges, costs):
            self.g[u].append((v, c))
            self.g_inv[v].append((u, c))

    @staticmethod
    def print_all_optimal_costs():
        for node in Min_Path_Graph.__dp_cum_cost:
            print(node, ': ', Min_Path_Graph.__dp_cum_cost[node])

    def min_cost(self, B):
        """
        :param B: instance(g) - end point
        :return: List[instance(g)] - min path from self.start to B
        """
        assert B in self.g

        # the first time this function is run the static variable
        # __dp_cum_cost is going to be empty, so preprocess it in O(n^2)
        if not Min_Path_Graph.__dp_cum_cost:
            self.preprocess_dist()

        # otherwise we can backtrack using __dp_cum_cost and g_inv
        node = B
        min_path = [node]
        opt_cum_sum = Min_Path_Graph.__dp_cum_cost[node]
        while node != self.start:
            # match the target sum against each backwards edge
            for pred, cost in self.g_inv[node]:
                if opt_cum_sum - cost == Min_Path_Graph.__dp_cum_cost[pred]:
                    opt_cum_sum -= cost
                    node = pred
                    min_path.append(node)
                    break
        return min_path[::-1]

    def preprocess_dist(self):
        # the optimal cost from the start node to itself is 0
        Min_Path_Graph.__dp_cum_cost[self.start] = 0

        q = queue.Queue()
        q.put(self.start)

        while not q.empty():
            p_id = q.get()

            # edges leading into p_id live in the inverse graph
            incoming_edges = self.g_inv[p_id]
            if incoming_edges:
                options = [(pred, Min_Path_Graph.__dp_cum_cost[pred] + cost)
                           for pred, cost in incoming_edges]
                best_option = min(options, key=lambda tup: tup[1])
                Min_Path_Graph.__dp_cum_cost[p_id] = best_option[1]

            # nodes may be enqueued more than once; recomputation is harmless
            # as long as the BFS order respects the topological order
            p_adj_list = self.g[p_id]
            for neighbor, _ in p_adj_list:
                q.put(neighbor)


# TEST 1
edges = [['A', 'B'], ['A', 'C'], ['A', 'D'], ['B', 'E'], ['B', 'F'],
         ['C', 'F'], ['D', 'F'], ['D', 'G'], ['E', 'H'], ['F', 'H'],
         ['F', 'I'], ['F', 'J'], ['G', 'J'], ['H', 'K'], ['H', 'L'],
         ['I', 'L'], ['J', 'L'], ['J', 'M'], ['K', 'N'], ['L', 'N'],
         ['M', 'N']]
costs = [2, 1, 3, 5, 2, 7, 6, 4, 2, 5, 3, 2, 3, 8, 6, 1, 9, 7, 1, 2, 4]

g = Min_Path_Graph(edges, costs, 'A')
print(g.min_cost('N'))
print(g.min_cost('M'))
g.print_all_optimal_costs()

Output

['A', 'B', 'F', 'I', 'L', 'N']
['A', 'B', 'F', 'J', 'M']
A :  0
B :  2
C :  1
D :  3
E :  7
F :  4
G :  7
H :  9
I :  7
J :  6
K :  17
L :  8
M :  13
N :  10
