#!/usr/bin/env python3 from functools import reduce from typing import Callable, Iterator import sys import networkx as nx class Edge: def __init__(self, source, destination): self.source = source self.destination = destination def __eq__(self, other): return ( isinstance(other, Edge) and self.source == other.source and self.destination == other.destination ) def __hash__(self): return hash((self.source, self.destination)) def __repr__(self): return f'Edge({self.source}, {self.destination})' class Node: def __init__(self, node, rank: float): self.node = node self.rank = rank def with_rank(self, new_rank: float): return Node(self.node, new_rank) def __eq__(self, other): """ Equality only compares the `Node`, not the rank value """ return ( isinstance(other, Node) and self.node == other.node ) def __hash__(self): """ Equality only hashes the `Node`, not the rank value """ return hash((self.node)) def __repr__(self): return f'Node({self.node}, {self.rank})' def build_graph(edges: Iterator[Edge], initial_rank: float = 0.25) -> nx.DiGraph: """ Create a graph from an iterator over `Edge`s """ graph = nx.DiGraph() for edge in edges: source = Node(edge.source, initial_rank) destination = Node(edge.destination, initial_rank) graph.add_node(source) graph.add_node(destination) graph.add_edge(source, destination) return graph def parse_edge(line: str) -> Edge: """ Parse a line into an edge. The line is expected to be of the format source_ip:source_port->destination_ip:destination_port, e.g. `1.1.1.1:1337->2.2.2.2:1337` :param line: (str) the line :return: the parsed edge """ # remove spaces and potential leading semicolon. then split at -> split = line.strip().strip(';').split("->") return Edge(split[0].strip(), split[1].strip()) def file_loader(path: str) -> Iterator[Edge]: """ Load a list of edges from a file. :param path: (str) filepath :return: iterator over the edges """ with open(path, "r") as file: return map( parse_edge, # filter only edges filter( lambda line: '->' in line, file.readlines() ) ) def page_ranker( graph: nx.DiGraph, node: Node, dampingFactor: float = 1.0 ) -> float: return ( reduce( lambda x, y: x + y, map( lambda pred: pred.rank / len(list(graph.successors(pred))), graph.predecessors(node), ), 0.0, ) * dampingFactor + (1 - dampingFactor) / graph.number_of_nodes() ) def page_rank(graph: nx.DiGraph, dampingFactor: float = 1.0) -> nx.DiGraph: return rank(graph, lambda g, node: page_ranker(g, node, dampingFactor)) def sensor_rank(graph: nx.DiGraph) -> nx.DiGraph: number_of_nodes = graph.number_of_nodes() return rank( graph, lambda g, node: (node.rank / len(list(g.successors(node)))) * (len(list(g.predecessors(node))) / number_of_nodes), ) def rank( graph: nx.DiGraph, rank_fn: Callable[[nx.DiGraph, Node], float] ) -> nx.DiGraph: ranked_graph = nx.DiGraph() # number_of_nodes = graph.number_of_nodes() for node in graph.nodes: rank = rank_fn(graph, node) ranked_graph.add_node(node.with_rank(rank)) for u, v in graph.edges: ranked_graph.add_edge(u, v) return ranked_graph def main(): arg = sys.argv if len(arg) != 2: print(f'Usage: {arg[0]} graph.dot') sys.exit(1) edges = file_loader(arg[1]) graph = build_graph(edges) page_ranked = page_rank(graph) print("PageRank: ") for node in page_ranked: print(node) print() sensor_ranked = sensor_rank(page_ranked) print("SensorRank: ") for node in sensor_ranked: print(node) if __name__ == '__main__': main()