Source code for FEH_DijkstraAlgorithm

from typing import List


class Node:
    """A graph vertex carrying an arbitrary payload.

    Attributes:
        data: The payload supplied by the caller (e.g. a label or an
            ``(x, y)`` grid coordinate).
        index: Position of this node in the owning graph's adjacency list,
            or ``None`` until one is assigned.
        holds: Free slot initialised to ``None``; nothing in this module
            reads or writes it afterwards.
    """

    def __init__(self, data, indexloc=None):
        self.holds = None
        self.index = indexloc
        self.data = data
# To build a graph by hand: create the nodes, instantiate a Graph object with
# them, and then connect the nodes.
# Example:
#   a = Node('a')          # likewise for b, c, d, e and f
#   g = Graph([a, b, c, d, e, f])
#   g.connect(a, b, 5)
#
# To build the graph as a rectangular grid instead, use the
# init_as_grid(width, height) class method:
#   g = Graph.init_as_grid(3, 4)
class Graph:
    """Weighted graph stored as an index-addressed adjacency list.

    ``adj_list[i]`` is a two-item list ``[node, edges]`` where ``edges`` is a
    list of ``(neighbour_index, weight)`` tuples.  Every lookup in this class
    assumes ``node.index == i`` for the node stored at ``adj_list[i]``.
    """

    def __init__(self, nodes: List["Node"], **kwargs):
        """Create a graph over ``nodes``.

        Args:
            nodes: The vertices; each one gets ``index`` set to its position
                in this list.
            **kwargs: ``adj_list`` may be supplied to reuse a pre-built
                adjacency list (as ``init_as_grid`` does); otherwise an
                edge-less one is created.
        """
        self.nodes = nodes
        if "adj_list" in kwargs:
            self.adj_list = kwargs["adj_list"]
        else:
            self.adj_list = [[node, []] for node in nodes]
        # NOTE(review): indices are (re)assigned for both branches; with a
        # caller-supplied adj_list this is a no-op as long as it is ordered
        # like ``nodes``.
        for position, node in enumerate(nodes):
            node.index = position

    def connect_dir(self, node1, node2, weight=1):
        """Add a one-way edge ``node1 -> node2`` with the given weight.

        Both ends may be ``Node`` objects or integer indices.  Nothing
        prevents the same connection from being added twice.
        """
        source = self.get_index_from_node(node1)
        target = self.get_index_from_node(node2)
        self.adj_list[source][1].append((target, weight))

    def connect(self, node1, node2, weight=1):
        """Add an edge in both directions between ``node1`` and ``node2``."""
        self.connect_dir(node1, node2, weight)
        self.connect_dir(node2, node1, weight)

    def connections(self, node):
        """Return the ``(neighbour_index, weight)`` list of ``node``."""
        return self.adj_list[self.get_index_from_node(node)][1]

    @staticmethod
    def get_index_from_node(node):
        """Return the adjacency-list index for a ``Node`` or an ``int``.

        Raises:
            ValueError: If ``node`` is neither an ``int`` nor a ``Node``.
        """
        if isinstance(node, int):
            return node
        if isinstance(node, Node):
            return node.index
        raise ValueError("Node must be an integer or a Node object")

    @staticmethod
    def create_grid(x, y, weight=1):
        """Build the nodes and adjacency list of an ``x`` by ``y`` grid.

        Nodes carry 1-based ``(column, row)`` data and are laid out row by
        row, so the node at ``(column, row)`` has index
        ``(row - 1) * x + column - 1``.  Each node is linked to its
        left, right, up (next row) and down (previous row) neighbours, in
        that order, all with the same ``weight``.

        Returns:
            A ``(nodes, adj_list)`` tuple.
        """
        # Distinct names: the loops must not overwrite the grid dimensions.
        width, height = x, y
        nodes = [
            Node((column, row))
            for row in range(1, height + 1)
            for column in range(1, width + 1)
        ]
        for position, node in enumerate(nodes):
            node.index = position
        adj_list = [[node, []] for node in nodes]

        last_index = len(nodes) - 1
        for node_index in range(len(nodes)):
            edges = adj_list[node_index][1]
            if node_index % width != 0:  # not in the first column
                edges.append((node_index - 1, weight))
            if (node_index + 1) % width != 0:  # not in the last column
                edges.append((node_index + 1, weight))
            if node_index + width <= last_index:  # a following row exists
                edges.append((node_index + width, weight))
            if node_index - width >= 0:  # a preceding row exists
                edges.append((node_index - width, weight))
        return nodes, adj_list

    # The receiver is named ``grid`` rather than ``self``; it is kept so that
    # keyword callers are unaffected.
    def get_grid_width_height(grid):
        """Return the data of the last node in the adjacency list.

        For a graph built by ``create_grid`` that is the bottom-right
        coordinate, i.e. ``(width, height)``.
        """
        return grid.adj_list[-1][0].data

    # xy and widthheight are both meant to be tuples
    def get_index_from_xy(self, xy):
        """Translate a 1-based ``(x, y)`` grid coordinate into a node index.

        Raises:
            IndexError: If the coordinate lies outside the grid on either
                axis (including zero or negative components, which would
                otherwise alias another node or wrap around).
        """
        widthheight = self.get_grid_width_height()
        width, height = widthheight[0], widthheight[1]
        column, row = xy[0], xy[1]
        index = (row - 1) * width + column - 1
        if index >= width * height:
            raise IndexError("Index of " + str(index) + " greater than " + str(
                width * height) + ", out of max range. Grid coordinate " + str(
                xy) + " was supplied, but the grid size was only " + str(widthheight))
        if not (1 <= column <= width and 1 <= row <= height):
            raise IndexError("Grid coordinate " + str(
                xy) + " is outside the grid; valid coordinates run from (1, 1) to " + str(
                widthheight))
        return index

    @classmethod
    def init_as_grid(cls, width, height, weight=1):
        """Alternate constructor: a ``width`` by ``height`` grid graph.

        Args:
            width: Number of columns.
            height: Number of rows.
            weight: Weight given to every edge (defaults to 1).
        """
        nodes, adj_list = cls.create_grid(width, height, weight)
        return cls(nodes, adj_list=adj_list)

    def dijkstra(self, src, endpoint=None, only_end=False, eval_to_length=-1):
        """Run Dijkstra's algorithm from ``src``.

        Args:
            src: Start node, as a ``Node``, an integer index, or an
                ``(x, y)`` grid coordinate tuple.
            endpoint: Optional destination, as a ``Node`` or an ``(x, y)``
                tuple.  The search stops once it has been settled.
            only_end: When an endpoint is given, report only the entry for
                that endpoint instead of every node settled on the way.
            eval_to_length: When non-negative, report only nodes whose
                distance does not exceed this value.

        Returns:
            A list of ``[distance, hops]`` entries in the order the nodes
            were settled, where ``hops`` is the list of nodes on the path
            starting with the source.  When no ``eval_to_length`` filter is
            active, nodes that were never reached are reported with an
            infinite distance and an empty hop list.  An empty list is
            returned (after printing a message) if ``endpoint`` is not one
            of the graph's nodes.
        """
        if isinstance(endpoint, tuple):
            endpoint = self.adj_list[self.get_index_from_xy(endpoint)][0]
        # check if endpoint is a possible destination node
        if endpoint and endpoint not in [entry[0] for entry in self.adj_list]:
            print("Endpoint not in node list")
            return []

        # algorithm entry point
        if isinstance(src, tuple):
            src_index = self.get_index_from_xy(src)
        else:
            src_index = self.get_index_from_node(src)

        # Wrap every node; all provisional distances start at infinity.
        dnodes = [DijkstraNodeDecorator(entry[0]) for entry in self.adj_list]
        # The source is at distance 0 and its path consists of itself.
        source = dnodes[src_index]
        source.prov_dist = 0
        source.hops.append(source.node)

        # The heap orders decorators by provisional distance and tracks each
        # one's heap slot by graph index so it can be found in O(1).
        heap = MinHeap(
            dnodes,
            lambda a, b: a.prov_dist < b.prov_dist,
            DijkstraNodeDecorator.index,
            DijkstraNodeDecorator.update_data,
        )

        min_dist_list = []
        while heap.size() > 0:
            # Closest node that has not been settled yet.
            current = heap.pop()
            min_dist = current.prov_dist
            hops = current.hops
            reached_endpoint = bool(endpoint) and current.node == endpoint

            wanted = not endpoint or not only_end or reached_endpoint
            in_range = eval_to_length < 0 or min_dist <= eval_to_length
            if wanted and in_range:
                min_dist_list.append([min_dist, hops])

            if reached_endpoint:
                # Nothing recorded after this point, so relaxing the
                # endpoint's edges would be wasted work.
                break

            # Relax every edge leaving the settled node.
            for inode, weight in self.connections(current.node):
                heap_location = heap.order_mapping[inode]
                if heap_location is None:
                    continue  # neighbour already settled
                tot_dist = weight + min_dist
                if tot_dist < heap.nodes[heap_location].prov_dist:
                    new_hops = list(hops)
                    new_hops.append(self.adj_list[inode][0])
                    heap.decrease_key(
                        heap_location,
                        {'prov_dist': tot_dist, 'hops': new_hops},
                    )
        return min_dist_list
class DijkstraNodeDecorator:
    """Wraps a graph node with the bookkeeping Dijkstra's algorithm needs.

    Attributes:
        node: The wrapped node.
        prov_dist: Provisional distance from the source; starts at infinity.
        hops: Nodes on the best path found so far; starts empty.
    """

    def __init__(self, node):
        self.hops = []
        self.prov_dist = float('inf')
        self.node = node

    def index(self):
        """Return the wrapped node's ``index`` attribute."""
        wrapped = self.node
        return wrapped.index

    def data(self):
        """Return the wrapped node's ``data`` attribute."""
        wrapped = self.node
        return wrapped.data

    def update_data(self, data):
        """Overwrite distance and path from a ``{'prov_dist', 'hops'}`` dict.

        Returns this decorator so the call can be used as a replacement
        value.
        """
        new_dist, new_hops = data['prov_dist'], data['hops']
        self.prov_dist = new_dist
        self.hops = new_hops
        return self
class BinaryTree:
    """Array-backed binary tree: the children of slot ``i`` live at
    ``2*i + 1`` and ``2*i + 2``.

    The list passed in is stored as-is (not copied).
    """

    def __init__(self, nodes=None):
        self.nodes = [] if nodes is None else nodes

    def root(self):
        """Return the element in slot 0."""
        return self.node_at_index(0)

    def iparent(self, i):
        """Return the slot of the parent of slot ``i``."""
        return (i - 1) // 2

    def ileft(self, i):
        """Return the slot of the left child of slot ``i``."""
        return i * 2 + 1

    def iright(self, i):
        """Return the slot of the right child of slot ``i``."""
        return i * 2 + 2

    def left(self, i):
        """Return the left child element of slot ``i``."""
        child_slot = self.ileft(i)
        return self.node_at_index(child_slot)

    def right(self, i):
        """Return the right child element of slot ``i``."""
        child_slot = self.iright(i)
        return self.node_at_index(child_slot)

    def parent(self, i):
        """Return the parent element of slot ``i``."""
        parent_slot = self.iparent(i)
        return self.node_at_index(parent_slot)

    def node_at_index(self, i):
        """Return the element stored in slot ``i``."""
        return self.nodes[i]

    def size(self):
        """Return the number of stored elements."""
        return len(self.nodes)
class MinHeap(BinaryTree):
    """Binary min-heap with optional O(1) lookup of an element's heap slot.

    Args:
        nodes: Elements to heapify.  The list is reordered in place.
        is_less_than: Strict ordering predicate ``(a, b) -> bool``.
        get_index: Optional callable returning an element's stable integer
            identity.  When given, ``order_mapping[identity]`` holds the
            element's current heap slot, or ``None`` once it was popped.
            The mapping starts as the identity permutation, so element ``k``
            of ``nodes`` is assumed to have identity ``k``.
        update_node: Callable ``(element, new_value) -> element`` used by
            ``decrease_key``; by default the new value replaces the element.
    """

    def __init__(self, nodes, is_less_than=lambda a, b: a < b, get_index=None,
                 update_node=lambda node, newval: newval):
        super().__init__(nodes)
        self.order_mapping = list(range(len(self.nodes)))
        self.is_less_than = is_less_than
        self.get_index = get_index
        self.update_node = update_node
        self.min_heapify()

    def _record_slots(self, *slots):
        """Refresh ``order_mapping`` for the elements now at ``slots``."""
        if self.get_index is None:
            return
        for slot in slots:
            self.order_mapping[self.get_index(self.nodes[slot])] = slot

    # Heapify at a node assuming all subtrees are heapified
    def min_heapify_subtree(self, i):
        """Sift the element at slot ``i`` down until the heap property holds."""
        size = self.size()
        while True:
            ileft, iright = self.ileft(i), self.iright(i)
            imin = i
            if ileft < size and self.is_less_than(self.nodes[ileft], self.nodes[imin]):
                imin = ileft
            if iright < size and self.is_less_than(self.nodes[iright], self.nodes[imin]):
                imin = iright
            if imin == i:
                return
            self.nodes[i], self.nodes[imin] = self.nodes[imin], self.nodes[i]
            self._record_slots(imin, i)
            i = imin

    # Heapify an un-heapified array
    def min_heapify(self):
        """Establish the heap property over the whole array."""
        # Leaves are already valid heaps; start at the last slot that has a
        # child and work back to the root.
        for i in range(self.size() // 2 - 1, -1, -1):
            self.min_heapify_subtree(i)

    def min(self):
        """Return the smallest element without removing it."""
        return self.nodes[0]

    def pop(self):
        """Remove and return the smallest element, or ``None`` if empty."""
        # Check emptiness before touching slot 0; otherwise an empty heap
        # raises IndexError instead of returning None.
        if not self.nodes:
            return None
        min_node = self.nodes[0]
        last = self.nodes.pop()
        if self.nodes:
            # Move the former last element to the root and sift it down.
            self.nodes[0] = last
            self._record_slots(0)
            self.min_heapify_subtree(0)
        if self.get_index is not None:
            # None marks the element as no longer being in the heap.
            self.order_mapping[self.get_index(min_node)] = None
        return min_node

    # Update node value, bubble it up as necessary to maintain heap property
    def decrease_key(self, i, val):
        """Apply ``update_node`` to slot ``i`` and sift the result upwards.

        The element is swapped with its parent for as long as the parent is
        not strictly less than it.
        """
        self.nodes[i] = self.update_node(self.nodes[i], val)
        while i != 0:
            iparent = self.iparent(i)
            if self.is_less_than(self.nodes[iparent], self.nodes[i]):
                break
            self.nodes[iparent], self.nodes[i] = self.nodes[i], self.nodes[iparent]
            self._record_slots(iparent, i)
            i = iparent

    def index_of_node_at(self, i):
        """Return ``get_index`` of the element stored at heap slot ``i``."""
        return self.get_index(self.nodes[i])
# Example usage (kept commented out):
#
# a = Node('a')
# b = Node('b')
# c = Node('c')
# d = Node('d')
# e = Node('e')
# f = Node('f')
# g = Graph([a, b, c, d, e, f])
# g.connect(a, b, 5)
# g.connect(a, c, 10)
# g.connect(a, e, 2)
# g.connect(b, c, 2)
# g.connect(b, d, 4)
# g.connect(c, d, 7)
# g.connect(c, f, 10)
# g.connect(d, e, 3)
# g = Graph.init_as_grid(3, 4)
# print([i for i in g.get_grid_width_height()])
# for item in g.adj_list:
#     print(item[0].data, [g.adj_list[i[0]][0].data if i[0] is not None else None for i in item[1]])
# print([(weight, [n.data for n in node]) for (weight, node) in g.dijkstra(a)])
# from pprint import pprint
# pprint([(weight, [n.data for n in node]) for (weight, node) in g.dijkstra((1,1), (3,4), only_end=True)])