masterthesis/scripts/node-ranking/node_ranking.py
2022-03-08 20:15:15 +01:00

160 lines
4.1 KiB
Python
Executable File

#!/usr/bin/env python3
from functools import reduce
from typing import Callable, Iterator
import sys
import networkx as nx
class Edge:
    """
    A directed connection from `source` to `destination`.

    Two edges are equal (and hash identically) when both endpoints match.
    """

    def __init__(self, source, destination):
        self.source = source
        self.destination = destination

    def _endpoints(self):
        # Single place that defines which fields identify an edge for hashing.
        return (self.source, self.destination)

    def __repr__(self):
        return f'Edge({self.source}, {self.destination})'

    def __hash__(self):
        return hash(self._endpoints())

    def __eq__(self, other):
        # Anything that is not an `Edge` is never equal to one.
        if not isinstance(other, Edge):
            return False
        same_source = self.source == other.source
        return same_source and self.destination == other.destination
class Node:
    """
    A graph node: an identifier (`node`) together with its current `rank`.

    Identity is defined by the identifier alone; the rank takes no part in
    equality or hashing.
    """

    def __init__(self, node, rank: float):
        self.node = node
        self.rank = rank

    def __repr__(self):
        return f'Node({self.node}, {self.rank})'

    def __hash__(self):
        """
        Hashing only uses the wrapped `node`, not the rank value
        """
        return hash(self.node)

    def __eq__(self, other):
        """
        Equality only compares the wrapped `node`, not the rank value
        """
        if not isinstance(other, Node):
            return False
        return self.node == other.node

    def with_rank(self, new_rank: float):
        """
        Return a new `Node` with the same identifier and the rank `new_rank`.
        The node this is called on is left unchanged.
        """
        return Node(self.node, new_rank)
def build_graph(edges: Iterator[Edge], initial_rank: float = 0.25) -> nx.DiGraph:
    """
    Assemble a directed graph out of `Edge`s.

    Both endpoints of every edge are wrapped in a `Node` carrying
    `initial_rank` and registered in the graph before the edge is added.

    :param edges: iterator over the edges to insert
    :param initial_rank: rank assigned to every created `Node`
    :return: the populated graph
    """
    result = nx.DiGraph()
    for current in edges:
        endpoints = (
            Node(current.source, initial_rank),
            Node(current.destination, initial_rank),
        )
        for endpoint in endpoints:
            result.add_node(endpoint)
        result.add_edge(*endpoints)
    return result
def parse_edge(line: str) -> Edge:
    """
    Turn one line of text into an `Edge`.

    The expected format is source_ip:source_port->destination_ip:destination_port,
    e.g. `1.1.1.1:1337->2.2.2.2:1337`. Only the first two `->`-separated
    parts are used.

    :param line: (str) the line
    :return: the parsed edge
    """
    # Drop surrounding whitespace first, then semicolons at either end,
    # and only then split at the arrow.
    cleaned = line.strip().strip(';')
    parts = cleaned.split("->")
    source = parts[0].strip()
    destination = parts[1].strip()
    return Edge(source, destination)
def file_loader(path: str) -> Iterator[Edge]:
    """
    Read the edges contained in a file.

    The whole file is read while it is open; lines are parsed lazily as the
    returned iterator is consumed. Lines without `->` are skipped.

    :param path: (str) filepath
    :return: iterator over the edges
    """
    with open(path, "r") as handle:
        lines = handle.readlines()
    # Only lines containing an arrow describe an edge.
    return (parse_edge(line) for line in lines if '->' in line)
def page_ranker(
    graph: nx.DiGraph, node: Node, dampingFactor: float = 1.0
) -> float:
    """
    Compute one PageRank update step for a single node.

    Every predecessor contributes its own rank divided by the number of its
    successors. The summed contributions are scaled by `dampingFactor` and
    `(1 - dampingFactor) / number_of_nodes` is added on top.

    :param graph: graph that `node` belongs to
    :param node: the node to compute the new rank for
    :param dampingFactor: weight of the contributions of the predecessors
    :return: the new rank of `node`
    """
    incoming = 0.0
    for pred in graph.predecessors(node):
        successor_count = len(list(graph.successors(pred)))
        incoming = incoming + pred.rank / successor_count
    damped = incoming * dampingFactor
    return damped + (1 - dampingFactor) / graph.number_of_nodes()
def page_rank(graph: nx.DiGraph, dampingFactor: float = 1.0) -> nx.DiGraph:
    """
    Run one PageRank step over all nodes of `graph`.

    :param graph: the graph to rank
    :param dampingFactor: forwarded to `page_ranker` for every node
    :return: the graph produced by `rank` with the updated ranks
    """
    def damped_ranker(g, node):
        # Bind the damping factor so the callable matches what `rank` expects.
        return page_ranker(g, node, dampingFactor)

    return rank(graph, damped_ranker)
def sensor_rank(graph: nx.DiGraph) -> nx.DiGraph:
    """
    Compute the SensorRank of every node of `graph`.

    For each node the current rank is divided by the number of its successors
    and multiplied by the fraction `predecessors / number_of_nodes`. The
    current rank is whatever the nodes of `graph` carry; `main` passes in the
    output of `page_rank`.

    :param graph: the graph whose nodes already carry a rank
    :return: the graph produced by `rank` with the SensorRank values
    """
    number_of_nodes = graph.number_of_nodes()

    def sensor_ranker(g, node):
        out_degree = len(list(g.successors(node)))
        in_degree = len(list(g.predecessors(node)))
        # A node without outgoing edges would divide by zero. Clamp the
        # divisor to 1 so such nodes keep a finite rank; nodes that do have
        # successors are unaffected.
        return (node.rank / max(out_degree, 1)) * (in_degree / number_of_nodes)

    return rank(graph, sensor_ranker)
def rank(
    graph: nx.DiGraph, rank_fn: Callable[[nx.DiGraph, Node], float]
) -> nx.DiGraph:
    """
    Build a new graph in which every node carries the rank computed by `rank_fn`.

    The input graph is not modified. `rank_fn` is always evaluated against the
    input graph, so every new rank is derived from the previous ranks only.

    :param graph: the graph whose nodes should be re-ranked
    :param rank_fn: called as `rank_fn(graph, node)`, returns the new rank of `node`
    :return: a new graph with the same edges and the re-ranked nodes
    """
    ranked_graph = nx.DiGraph()
    # `Node` compares and hashes by its identifier only, so an old node can be
    # used as the key to find its re-ranked replacement.
    reranked = {}
    for node in graph.nodes:
        new_node = node.with_rank(rank_fn(graph, node))
        reranked[node] = new_node
        ranked_graph.add_node(new_node)
    for u, v in graph.edges:
        # Insert the edge with the re-ranked objects. Passing the old `u`/`v`
        # would store the old objects as adjacency keys, and
        # successors()/predecessors() of the result would then yield nodes
        # that still carry the previous rank.
        ranked_graph.add_edge(reranked[u], reranked[v])
    return ranked_graph
def main():
    """
    Command line entry point.

    Expects exactly one argument, the path of the file holding the edges.
    Prints the PageRank of every node, then the SensorRank computed on top of
    the PageRank result. Exits with status 1 when called with a wrong number
    of arguments.
    """
    def show(title, ranked):
        # Print the heading followed by one node per line.
        print(title)
        for ranked_node in ranked:
            print(ranked_node)

    arg = sys.argv
    if len(arg) != 2:
        print(f'Usage: {arg[0]} graph.dot')
        sys.exit(1)

    page_ranked = page_rank(build_graph(file_loader(arg[1])))
    show("PageRank: ", page_ranked)
    print()

    sensor_ranked = sensor_rank(page_ranked)
    show("SensorRank: ", sensor_ranked)
# Only run the command line interface when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()