160 lines
4.1 KiB
Python
Executable File
160 lines
4.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from functools import reduce
|
|
from typing import Callable, Iterator
|
|
import sys
|
|
|
|
import networkx as nx
|
|
|
|
class Edge:
|
|
def __init__(self, source, destination):
|
|
self.source = source
|
|
self.destination = destination
|
|
|
|
def __eq__(self, other):
|
|
return (
|
|
isinstance(other, Edge)
|
|
and self.source == other.source
|
|
and self.destination == other.destination
|
|
)
|
|
|
|
def __hash__(self):
|
|
return hash((self.source, self.destination))
|
|
|
|
def __repr__(self):
|
|
return f'Edge({self.source}, {self.destination})'
|
|
|
|
class Node:
|
|
def __init__(self, node, rank: float):
|
|
self.node = node
|
|
self.rank = rank
|
|
|
|
def with_rank(self, new_rank: float):
|
|
return Node(self.node, new_rank)
|
|
|
|
def __eq__(self, other):
|
|
"""
|
|
Equality only compares the `Node`, not the rank value
|
|
"""
|
|
return (
|
|
isinstance(other, Node)
|
|
and self.node == other.node
|
|
)
|
|
|
|
def __hash__(self):
|
|
"""
|
|
Equality only hashes the `Node`, not the rank value
|
|
"""
|
|
return hash((self.node))
|
|
|
|
def __repr__(self):
|
|
return f'Node({self.node}, {self.rank})'
|
|
|
|
def build_graph(edges: Iterator[Edge], initial_rank: float = 0.25) -> nx.DiGraph:
|
|
"""
|
|
Create a graph from an iterator over `Edge`s
|
|
"""
|
|
graph = nx.DiGraph()
|
|
for edge in edges:
|
|
source = Node(edge.source, initial_rank)
|
|
destination = Node(edge.destination, initial_rank)
|
|
graph.add_node(source)
|
|
graph.add_node(destination)
|
|
graph.add_edge(source, destination)
|
|
|
|
return graph
|
|
|
|
def parse_edge(line: str) -> Edge:
|
|
"""
|
|
Parse a line into an edge. The line is expected to be of the format source_ip:source_port->destination_ip:destination_port,
|
|
e.g. `1.1.1.1:1337->2.2.2.2:1337`
|
|
|
|
:param line: (str) the line
|
|
:return: the parsed edge
|
|
"""
|
|
# remove spaces and potential leading semicolon. then split at ->
|
|
split = line.strip().strip(';').split("->")
|
|
return Edge(split[0].strip(), split[1].strip())
|
|
|
|
def file_loader(path: str) -> Iterator[Edge]:
|
|
"""
|
|
Load a list of edges from a file.
|
|
|
|
:param path: (str) filepath
|
|
:return: iterator over the edges
|
|
"""
|
|
with open(path, "r") as file:
|
|
return map(
|
|
parse_edge,
|
|
# filter only edges
|
|
filter(
|
|
lambda line: '->' in line,
|
|
file.readlines()
|
|
)
|
|
)
|
|
|
|
def page_ranker(
|
|
graph: nx.DiGraph, node: Node, dampingFactor: float = 1.0
|
|
) -> float:
|
|
return (
|
|
reduce(
|
|
lambda x, y: x + y,
|
|
map(
|
|
lambda pred: pred.rank / len(list(graph.successors(pred))),
|
|
graph.predecessors(node),
|
|
),
|
|
0.0,
|
|
)
|
|
* dampingFactor
|
|
+ (1 - dampingFactor) / graph.number_of_nodes()
|
|
)
|
|
|
|
def page_rank(graph: nx.DiGraph, dampingFactor: float = 1.0) -> nx.DiGraph:
|
|
return rank(graph, lambda g, node: page_ranker(g, node, dampingFactor))
|
|
|
|
def sensor_rank(graph: nx.DiGraph) -> nx.DiGraph:
|
|
number_of_nodes = graph.number_of_nodes()
|
|
return rank(
|
|
graph,
|
|
lambda g, node: (node.rank / len(list(g.successors(node)))) * (len(list(g.predecessors(node))) / number_of_nodes),
|
|
)
|
|
|
|
|
|
def rank(
|
|
graph: nx.DiGraph, rank_fn: Callable[[nx.DiGraph, Node], float]
|
|
) -> nx.DiGraph:
|
|
ranked_graph = nx.DiGraph()
|
|
# number_of_nodes = graph.number_of_nodes()
|
|
for node in graph.nodes:
|
|
rank = rank_fn(graph, node)
|
|
|
|
ranked_graph.add_node(node.with_rank(rank))
|
|
|
|
for u, v in graph.edges:
|
|
ranked_graph.add_edge(u, v)
|
|
|
|
return ranked_graph
|
|
|
|
def main():
|
|
arg = sys.argv
|
|
if len(arg) != 2:
|
|
print(f'Usage: {arg[0]} graph.dot')
|
|
sys.exit(1)
|
|
|
|
edges = file_loader(arg[1])
|
|
graph = build_graph(edges)
|
|
page_ranked = page_rank(graph)
|
|
print("PageRank: ")
|
|
for node in page_ranked:
|
|
print(node)
|
|
|
|
print()
|
|
sensor_ranked = sensor_rank(page_ranked)
|
|
print("SensorRank: ")
|
|
for node in sensor_ranked:
|
|
print(node)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|