#!/usr/bin/env python3
# masterthesis/codes/node-ranking/node_ranking.py
# TODO:
# - in execution, iterate botnet -> timeslot -> algorithm, so the graph loaded
#   from the db for the timeslot can be reused between algorithms (is lru_cache enough?)
from collections import defaultdict
from datetime import datetime, timedelta
from functools import reduce, lru_cache
from math import ceil
from typing import Callable, Iterable, Set, List, Union, Any
import logging
import networkx as nx
import numpy as np
import os
import psycopg2 as psy  # required by the database code paths below
import psycopg2.extras as psye
# import schedule  # only needed for the scheduled execution loop (commented out at the bottom)
import time

# TODO: load only from environment
db_host = os.environ.get("BMS_DB_HOST", "localhost")
db_name = os.environ.get("BMS_DB_NAME", "postgres")
db_user = os.environ.get("BMS_DB_USER", "server")
db_pass = os.environ.get("BMS_DB_PASS", "peu3Aiv1Oobie5eicho1oD8bdee1ooR1")
# TODO: how far back
HOURS_BACK: int = int(
    os.environ.get("BMS_RANKING_HOURS_BACK", str(5 * 24))
)  # default to 5 days
TOP_PERCENT: float = float(
    os.environ.get("BMS_RANKING_TOP_PERCENT", "0.1")
)  # default to 10%
THRESHOLD: int = int(
    os.environ.get("BMS_RANKING_THRESHOLD", "100")
)  # default to top 100 elements
"""
dict: botnet -> algo -> last_check
"""
LAST_CHECKED = defaultdict(lambda: {})
# range_from=datetime(2021, 4, 21, hour=0)
# range_to=datetime(2021, 4, 21, hour=0, minute=5)


def create_connection() -> Any:
    """
    Connect to the database, retrying with exponential backoff until it succeeds.
    """
    sleep_sec = 1
    conn = None
    while conn is None:
        try:
            conn = psy.connect(
                host=db_host, database=db_name, user=db_user, password=db_pass
            )
        except psy.Error as e:
            logging.warning("waiting for connection: %s", e)
            time.sleep(sleep_sec)
            sleep_sec = min(2 * sleep_sec, 300)  # double the sleep time, up to 5 minutes
    return conn


class Edge:
    """
    Representation of a graph edge as loaded from the database or a file
    """

    def __init__(self, source, destination):
        self.source = source
        self.destination = destination

    def __eq__(self, other):
        return (
            isinstance(other, Edge)
            and self.source == other.source
            and self.destination == other.destination
        )

    def __hash__(self):
        return hash((self.source, self.destination))

    def __repr__(self):
        return "Edge(%s, %s)" % (self.source, self.destination)


class Node:
    """
    Representation of a graph node, consisting of an IP address and a port
    """

    def __init__(self, ip: str, port: int):
        self.ip = ip
        self.port = port

    def __eq__(self, other):
        return (
            isinstance(other, Node) and self.ip == other.ip and self.port == other.port
        )

    def __hash__(self):
        return hash((self.ip, self.port))

    def __repr__(self):
        return "Node(%s, %s)" % (self.ip, self.port)


class RankedNode:
    """
    Composes a `Node` object with a rank value
    """

    def __init__(self, node: Node, rank: float):
        self.node = node
        self.rank = rank

    def with_rank(self, new_rank: float):
        """
        Create a copy of this node with a new rank value
        """
        return RankedNode(self.node, new_rank)

    def __eq__(self, other):
        """
        Equality only compares the `Node`, not the rank value
        """
        return isinstance(other, RankedNode) and self.node == other.node

    def __hash__(self):
        """
        Hashing, like equality, only uses the `Node`, not the rank value
        """
        return hash(self.node)

    def __repr__(self):
        return "RankedNode(%s, %s)" % (self.node, self.rank)
query = """
SELECT DISTINCT be.source_ip, be.source_port, be.destination_ip, be.destination_port
FROM bms.bot_edges be
JOIN bms.sessions se
ON be.session_id = se.id
WHERE be.time_seen BETWEEN %s AND %s
AND se.botnet_id = %s
-- only count bots for which there are recorded replies
AND EXISTS (
SELECT 1
FROM bms.bot_replies br
WHERE br.session_id = be.session_id
AND br.ip = be.destination_ip
AND br.port = be.destination_port
AND br.time_seen BETWEEN %s AND %s
)
;
"""


# Cache the last 512 results of this function so that in the execution loop,
# each timeslot of each graph is loaded from the database only once.
@lru_cache(maxsize=512)
def database_loader(
    conn: Any, range_from: datetime, range_until: datetime, botnet_id: int
) -> Iterable[Edge]:
    """
    Load the edges between `range_from` and `range_until` for `botnet_id`
    """
    with conn.cursor() as cursor:
        cursor.execute(
            query, (range_from, range_until, botnet_id, range_from, range_until)
        )
        # Materialize the rows into a tuple: with `lru_cache`, returning a lazy
        # `map` iterator would hand out an already-exhausted iterator on every
        # cache hit after the first consumption.
        return tuple(
            Edge(Node(row[0], row[1]), Node(row[2], row[3]))
            for row in cursor.fetchall()
        )
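

# Example (sketch, illustrative values): load one hour of edges for botnet 1.
#   conn = create_connection()
#   edges = database_loader(conn, datetime(2021, 4, 21, 0), datetime(2021, 4, 21, 1), 1)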


def parse_edge(line: str) -> Edge:
    """
    Parse a line into an edge. The line is expected to be of the format
    source_ip:source_port->destination_ip:destination_port,
    e.g. `1.1.1.1:1337->2.2.2.2:1337`

    :param line: (str) the line
    :return: the parsed edge
    """
    split = line.split("->")
    source = split[0].split(":")
    destination = split[1].split(":")
    return Edge(
        Node(source[0].strip(), int(source[1].strip())),
        Node(destination[0].strip(), int(destination[1].strip())),
    )
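

# Example:
#   parse_edge("1.1.1.1:1337->2.2.2.2:1337")
#   == Edge(Node("1.1.1.1", 1337), Node("2.2.2.2", 1337))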


def parse_csv(
    line: str, source_ip_index: int = 0, source_port_index: int = 1,
    dest_ip_index: int = 2, dest_port_index: int = 3, delim: str = ","
) -> Edge:
    """
    Parse a CSV line into an edge. By default the columns are expected in the
    order source_ip, source_port, destination_ip, destination_port.
    """
    split = line.split(delim)
    return Edge(
        Node(split[source_ip_index].strip(), int(split[source_port_index].strip())),
        Node(split[dest_ip_index].strip(), int(split[dest_port_index].strip())),
    )


def csv_loader(path: str) -> Iterable[Edge]:
    """
    Load a list of edges from a CSV file.
    """
    with open(path, "r") as file:
        return map(parse_csv, file.readlines())


def file_loader(path: str) -> Iterable[Edge]:
    """
    Load a list of edges from a file.

    :param path: (str) filepath
    :return: iterator over the edges
    """
    with open(path, "r") as file:
        return map(parse_edge, file.readlines())


def build_graph(edges: Iterable[Edge], initial_rank: float = 0.25) -> nx.DiGraph:
    """
    Create a graph from an iterator over `Edge`s
    """
    graph = nx.DiGraph()
    for edge in edges:
        source = RankedNode(edge.source, initial_rank)
        destination = RankedNode(edge.destination, initial_rank)
        graph.add_node(source)
        graph.add_node(destination)
        graph.add_edge(source, destination)
    return graph
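

# Example (sketch): build a graph from a hypothetical edge file and inspect its
# size; "./edges.txt" is an illustrative path.
#   g = build_graph(file_loader("./edges.txt"), initial_rank=0.25)
#   print(g.number_of_nodes(), g.number_of_edges())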


def page_ranker(
    graph: nx.DiGraph, node: RankedNode, damping_factor: float = 1.0
) -> float:
    """
    Compute one PageRank update for `node` from the ranks of its predecessors.
    """
    rank_sum = sum(
        pred.rank / len(list(graph.successors(pred)))
        for pred in graph.predecessors(node)
    )
    return rank_sum * damping_factor + (1 - damping_factor) / graph.number_of_nodes()


def page_rank(graph: nx.DiGraph, damping_factor: float = 1.0) -> nx.DiGraph:
    """
    Apply one PageRank iteration to every node of the graph.
    """
    return rank(graph, lambda g, node: page_ranker(g, node, damping_factor))


def sensor_rank(graph: nx.DiGraph) -> nx.DiGraph:
    """
    Weight each node's current rank by its in-degree relative to the graph size.
    """
    number_of_nodes = graph.number_of_nodes()
    return rank(
        graph,
        lambda g, node: node.rank * (len(list(g.predecessors(node))) / number_of_nodes),
    )
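

# As implemented above, one iteration computes, for every node v in a graph
# with N nodes and damping factor d:
#   PR(v) = d * sum_{u in pred(v)} rank(u) / |succ(u)| + (1 - d) / N
#   SR(v) = rank(v) * |pred(v)| / N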


def rank(
    graph: nx.DiGraph, rank_fn: Callable[[nx.DiGraph, RankedNode], float]
) -> nx.DiGraph:
    """
    Build a new graph in which every node's rank is recomputed via `rank_fn`.
    """
    ranked_graph = nx.DiGraph()
    for node in graph.nodes:
        new_rank = rank_fn(graph, node)
        ranked_graph.add_node(node.with_rank(new_rank))
    for u, v in graph.edges:
        # RankedNode equality ignores the rank, so these edges attach to the
        # freshly ranked nodes added above.
        ranked_graph.add_edge(u, v)
    return ranked_graph


def weakly_connected(graph: nx.DiGraph) -> Set[RankedNode]:
    """
    Return all nodes whose strongly connected component contains only
    themselves (sensor candidates).
    """
    return {
        node
        for component in nx.strongly_connected_components(graph)
        if len(component) == 1
        for node in component
    }


def weakly_connected_exclusion(graph: nx.DiGraph) -> Set[RankedNode]:
    """
    Return all nodes outside the largest strongly connected component.
    """
    largest = max(nx.strongly_connected_components(graph), key=len)
    return {node for node in graph.nodes if node not in largest}
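

# Example: in a graph with edges a->b, b->a, a->s and b->s, the strongly
# connected components are {a, b} and {s}; both functions above return {s}.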


def list_botnets(conn: Any) -> List[int]:
    with conn.cursor() as cursor:
        cursor.execute("SELECT DISTINCT id FROM bms.botnets")
        return [identifier[0] for identifier in cursor.fetchall()]


def last_ranking_for_botnet(
    conn: Any, identifier: int, algo: str
) -> Union[datetime, None]:
    with conn.cursor() as cursor:
        cursor.execute(
            "SELECT MAX(range_until) FROM bms.detected_sensors WHERE botnet_id = %s AND ranking_algorithm = %s",
            (str(identifier), algo),
        )
        return cursor.fetchone()[0]
"""
dict algo_name => (ranking_fn, sort_reverse)
"""
ALGOS = {
"page_rank": (page_rank, False),
"sensor_rank": (sensor_rank, True),
"weakly_connected": (weakly_connected, False),
}
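

# Example (sketch): run one configured algorithm by hand on a file-loaded
# graph; "./edges.txt" is an illustrative path.
#   g = build_graph(file_loader("./edges.txt"))
#   ranker, reverse = ALGOS["sensor_rank"]
#   top = sorted(ranker(g), key=lambda n: n.rank, reverse=reverse)[:10]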


def execute():
    with create_connection() as conn:
        logging.info("starting execution")
        botnets = load_execution_data(conn)
        # for each botnet
        for botnet_id, algos in botnets.items():
            # for each algorithm
            for algo, (range_from, range_until, reverse_sort) in algos.items():
                # for each timeslot from the last one until now
                while datetime.now().replace(tzinfo=range_until.tzinfo) > range_until:
                    logging.info(
                        "calculating %s for botnet %s between %s and %s"
                        % (algo, botnet_id, range_from, range_until)
                    )
                    rank_and_insert(
                        conn,
                        botnet_id,
                        range_from,
                        range_until,
                        ALGOS[algo][0],
                        algo,
                        reverse_sort,
                    )
                    LAST_CHECKED[botnet_id][algo] = range_until
                    # increment the time range
                    range_from = range_until
                    range_until = range_until + timedelta(hours=1)


def rank_and_insert(
    conn: Any,
    botnet_id: int,
    range_from: datetime,
    range_until: datetime,
    ranker: Callable[[nx.DiGraph], nx.DiGraph],
    ranker_name: str,
    reverse_sort: bool,
):
    g = build_graph(database_loader(conn, range_from, range_until, botnet_id))
    logging.info("loaded graph of size %s" % len(g))
    ranked = ranker(g)
    # only insert the top nodes into the database: TOP_PERCENT of all nodes or
    # THRESHOLD many, whichever is larger
    limits = [0]
    ranked = sorted(ranked, key=lambda v: v.rank, reverse=reverse_sort)
    if TOP_PERCENT is not None:
        limits.append(ceil(len(ranked) * TOP_PERCENT))
    if THRESHOLD is not None:
        limits.append(THRESHOLD)
    num_elements = max(limits)
    ranked = ranked[:num_elements]
    logging.info("ranked and will insert %s nodes" % len(ranked))
    params = [
        (
            botnet_id,
            range_from,
            range_until,
            node.node.ip,
            node.node.port,
            ranker_name,
            node.rank,
        )
        for node in ranked
    ]
    logging.debug("built %s params" % len(params))
    with conn.cursor() as cursor:
        psye.execute_batch(
            cursor,
            """
            INSERT INTO bms.detected_sensors (botnet_id, range_from, range_until, bot_ip, bot_port, ranking_algorithm, ranking)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            """,
            params,
        )
    conn.commit()
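

# Worked example of the limit logic above: with 2000 ranked nodes,
# TOP_PERCENT = 0.1 and THRESHOLD = 100, `limits` becomes [0, 200, 100], so the
# top max(limits) = 200 nodes are inserted.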


def load_execution_data(conn):
    """
    Load a dict `botnet_id -> ranking algorithm -> end of last evaluated timeframe`
    """
    first_timeslot = datetime.now().replace(
        minute=0, second=0, microsecond=0
    ) - timedelta(hours=HOURS_BACK)
    botnets = list_botnets(conn)
    last = {}
    for identifier in botnets:
        last[identifier] = {}
        for algo in ALGOS.keys():
            last_timeslot = last_ranking_for_botnet(conn, identifier, algo)
            last_from_local_state = LAST_CHECKED[identifier].get(algo)
            # the candidates are the end of the last timeframe from the database,
            # the end of the last timeframe from the local state and
            # the earliest allowed timeslot: now - BMS_RANKING_HOURS_BACK
            limits = []
            if last_timeslot is None:
                limits.append(first_timeslot)
            else:
                # normalize timezone
                limits.append(first_timeslot.replace(tzinfo=last_timeslot.tzinfo))
                limits.append(last_timeslot)
            if last_from_local_state is not None:
                limits.append(last_from_local_state)
            # pick the maximum from the possible limits
            limit = max(limits)
            LAST_CHECKED[identifier][algo] = limit
            range_until = limit + timedelta(hours=1)
            if range_until < datetime.now().replace(tzinfo=range_until.tzinfo):
                last[identifier][algo] = (limit, range_until, ALGOS[algo][1])
    return last


# known = [Node('195.37.209.28', 4488), Node('34.204.196.211', 9796)]
known = [Node("34.204.196.211", 9796)]


def find_rank(graph):
    """
    Return the highest rank among the known sensor nodes found in the graph.
    """
    found = [node.rank for node in graph if node.node in known]
    return max(found) if found else float("nan")


def main():
    initial_ranks = [0.25, 0.5, 0.75]
    iterations = 5
    # e = file_loader("./e.txt")
    e = list(csv_loader("./dist-edges.csv"))
    for initial_rank in initial_ranks:
        g = build_graph(e, initial_rank=initial_rank)
        nodes = len(g)
        print(f"Initial Rank: {initial_rank}")
        # g = sensor_rank(page_rank(page_rank(page_rank(page_rank(g)))))
        # s = list(map(lambda n: n.node, sorted(g, key=lambda n: n.rank, reverse=True)))
        # for kno in known:
        #     print(f'{kno}: {s.index(kno)}/{len(s)}')
        # # for n in s[:20]:
        # #     print(n)
        # exit(1)
        for iteration in range(iterations):
            g = page_rank(g)
            avg_pr = reduce(lambda acc, n: acc + n.rank, g, 0.0) / nodes
            # max_pr = max(g, key=lambda n: n.rank).rank
            max_pr = find_rank(g)
            perc = np.percentile(np.array(sorted(map(lambda n: n.rank, g))), [80, 90, 95])
            print(f"InitialRank: {initial_rank}, Iteration: {iteration}, PR percentiles [80, 90, 95]: {perc}")
            # print(f'InitialRank: {initial_rank}, Iteration: {iteration}, Avg. PR: {avg_pr}')
            # print(f'InitialRank: {initial_rank}, Iteration: {iteration}, max PR: {max_pr}')
            sr = sensor_rank(g)
            avg_sr = reduce(lambda acc, n: acc + n.rank, sr, 0.0) / nodes
            # max_sr = max(sr, key=lambda n: n.rank).rank
            max_sr = find_rank(sr)
            perc = np.percentile(np.array(sorted(map(lambda n: n.rank, sr))), [80, 90, 95])
            print(f"InitialRank: {initial_rank}, Iteration: {iteration}, SR percentiles [80, 90, 95]: {perc}")
            # print(f'InitialRank: {initial_rank}, Iteration: {iteration}, Avg. SR: {avg_sr}')
            # print(f'InitialRank: {initial_rank}, Iteration: {iteration}, max SR: {max_sr}')
            # LaTeX table row: iteration & avg PR & max PR & avg SR & max SR
            print(f"{iteration + 1} & {avg_pr:.8f} & {max_pr:.8f} & {avg_sr:.8f} & {max_sr:.8f} \\\\")
            # with open(f'./{initial_rank}_{iteration+1}_pr.txt', 'w') as f:
            #     for n in g:
            #         f.write(f'{n.rank}' + '\n')
            # with open(f'./{initial_rank}_{iteration+1}_sr.txt', 'w') as f:
            #     for n in sr:
            #         f.write(f'{n.rank}' + '\n')
    # ranked = page_rank(g)
    # for node in ranked.nodes:
    #     print(node)


# logging.basicConfig(
#     format="%(asctime)s: %(levelname)s - %(message)s", level=logging.INFO
# )
# # schedule: do execute() every hour at `:01` minutes
# schedule.every().hour.at(":01").do(execute)
# # Run once on startup
# schedule.run_all()
# while True:
#     n = schedule.idle_seconds()
#     if n is None:
#         break
#     elif n > 0:
#         logging.debug("sleeping for %s seconds" % n)
#         time.sleep(n)
#     schedule.run_pending()


if __name__ == "__main__":
    main()