#!/usr/bin/env python3
# TODO:
# - in execution iterate botnet -> timeslot -> algorithm, so the graph loaded from
#   the db for the timeslot can be reused between algorithms (is lru_cache enough?)

from collections import defaultdict
from datetime import datetime, timedelta
from functools import reduce, lru_cache
from math import ceil
from typing import Callable, Iterable, Set, List, Union, Any
import logging
import networkx as nx
import numpy as np
import os

# only needed for the scheduled execute() path, which is currently disabled in main()
# import psycopg2 as psy
# import psycopg2.extras as psye
# import schedule
import time

# TODO: load only from environment
db_host = os.environ.get("BMS_DB_HOST", "localhost")
db_name = os.environ.get("BMS_DB_NAME", "postgres")
db_user = os.environ.get("BMS_DB_USER", "server")
db_pass = os.environ.get("BMS_DB_PASS", "peu3Aiv1Oobie5eicho1oD8bdee1ooR1")

# TODO: how far back the ranking should look by default
HOURS_BACK: int = int(
    os.environ.get("BMS_RANKING_HOURS_BACK", str(5 * 24))
)  # default to 5 days
TOP_PERCENT: float = float(
    os.environ.get("BMS_RANKING_TOP_PERCENT", "0.1")
)  # default to 10%
THRESHOLD: int = int(
    os.environ.get("BMS_RANKING_THRESHOLD", "100")
)  # default to top 100 elements

""" dict: botnet -> algo -> last_check """
LAST_CHECKED = defaultdict(lambda: {})

# range_from=datetime(2021, 4, 21, hour=0)
# range_to=datetime(2021, 4, 21, hour=0, minute=5)


def create_connection() -> Any:
    """
    Connect to the database, retrying with exponential backoff
    (capped at 5 minutes) until a connection can be established.
    """
    sleep_sec = 1
    conn = None
    while conn is None:
        try:
            conn = psy.connect(
                host=db_host, database=db_name, user=db_user, password=db_pass
            )
        except psy.Error as e:
            logging.warning("waiting for connection: %s", e)
            time.sleep(sleep_sec)
            sleep_sec = min(2 * sleep_sec, 300)  # double sleep time up to 5 minutes
    return conn


class Edge:
    """
    Representation of a graph edge as loaded from the database or a file
    """

    def __init__(self, source, destination):
        self.source = source
        self.destination = destination

    def __eq__(self, other):
        return (
            isinstance(other, Edge)
            and self.source == other.source
            and self.destination == other.destination
        )

    def __hash__(self):
        return hash((self.source, self.destination))

    def __repr__(self):
        return "Edge(%s, %s)" % (self.source, self.destination)


class Node:
    """
    Representation of a graph node, consisting of an IP address and a port
    """

    def __init__(self, ip: str, port: int):
        self.ip = ip
        self.port = port

    def __eq__(self, other):
        return (
            isinstance(other, Node)
            and self.ip == other.ip
            and self.port == other.port
        )

    def __hash__(self):
        return hash((self.ip, self.port))

    def __repr__(self):
        return "Node(%s, %s)" % (self.ip, self.port)


class RankedNode:
    """
    Composes a `Node` object with a rank value
    """

    def __init__(self, node: Node, rank: float):
        self.node = node
        self.rank = rank

    def with_rank(self, new_rank: float):
        """
        Create a copy of this node with a new rank value
        """
        return RankedNode(self.node, new_rank)

    def __eq__(self, other):
        """
        Equality only compares the `Node`, not the rank value
        """
        return isinstance(other, RankedNode) and self.node == other.node

    def __hash__(self):
        """
        Equality only hashes the `Node`, not the rank value
        """
        return hash(self.node)

    def __repr__(self):
        return "RankedNode(%s, %s)" % (self.node, self.rank)


query = """
SELECT DISTINCT be.source_ip, be.source_port, be.destination_ip, be.destination_port
FROM bms.bot_edges be
JOIN bms.sessions se ON be.session_id = se.id
WHERE be.time_seen BETWEEN %s AND %s
AND se.botnet_id = %s
-- only count bots for which there are recorded replies
AND EXISTS (
    SELECT 1
    FROM bms.bot_replies br
    WHERE br.session_id = be.session_id
    AND br.ip = be.destination_ip
    AND br.port = be.destination_port
    AND br.time_seen BETWEEN %s AND %s
)
;
""" # cache the last 512 results of this function so in the execution loop, each slot for each graph gets only loaded once @lru_cache(maxsize=512) def database_loader( conn: Any, range_from: datetime, range_until: datetime, botnet_id: int ) -> Iterable[Edge]: """ Load the edges between `range_from` and `range_until` for `botnet_id` """ with conn.cursor() as cursor: cursor.execute( query, (range_from, range_until, botnet_id, range_from, range_until) ) return map( lambda row: Edge(Node(row[0], row[1]), Node(row[2], row[3])), cursor.fetchall(), ) def parse_edge(line: str) -> Edge: """ Parse a line into an edge. The line is expected to be of the format source_ip:source_port->destination_ip:destination_port, e.g. `1.1.1.1:1337->2.2.2.2:1337` :param line: (str) the line :return: the parsed edge """ split = line.split("->") source = split[0].split(":") destination = split[1].split(":") return Edge( Node(source[0].strip(), int(source[1].strip())), Node(destination[0].strip(), int(destination[1].strip())), ) def parse_csv(line: str, source_ip_index: int = 0, source_port_index: int = 1, dest_ip_index: int = 2, dest_port_index: int = 3, delim: str = ',') -> Edge: split = line.split(delim) return Edge( Node(split[source_ip_index].strip(), int(split[source_port_index].strip())), Node(split[dest_ip_index].strip(), int(split[dest_port_index].strip())), ) def csv_loader(path: str) -> Iterable[Edge]: with open(path, 'r') as file: return map(parse_csv, file.readlines()) def file_loader(path: str) -> Iterable[Edge]: """ Load a list of edges from a file. :param path: (str) filepath :return: iterator over the edges """ with open(path, "r") as file: return map(parse_edge, file.readlines()) def build_graph(edges: Iterable[Edge], initial_rank: float = 0.25) -> nx.DiGraph: """ Create a graph from an iterator over `Edge`s """ graph = nx.DiGraph() for edge in edges: source = RankedNode(edge.source, initial_rank) destination = RankedNode(edge.destination, initial_rank) graph.add_node(source) graph.add_node(destination) graph.add_edge(source, destination) return graph def page_ranker( graph: nx.DiGraph, node: RankedNode, dampingFactor: float = 1.0 ) -> float: return ( reduce( lambda x, y: x + y, map( lambda pred: pred.rank / len(list(graph.successors(pred))), graph.predecessors(node), ), 0.0, ) * dampingFactor + (1 - dampingFactor) / graph.number_of_nodes() ) def page_rank(graph: nx.DiGraph, dampingFactor: float = 1.0) -> nx.DiGraph: return rank(graph, lambda g, node: page_ranker(g, node, dampingFactor)) def sensor_rank(graph: nx.DiGraph) -> nx.DiGraph: number_of_nodes = graph.number_of_nodes() return rank( graph, lambda g, node: node.rank * (len(list(g.predecessors(node))) / number_of_nodes), ) def rank( graph: nx.DiGraph, rank_fn: Callable[[nx.DiGraph, RankedNode], float] ) -> nx.DiGraph: ranked_graph = nx.DiGraph() # number_of_nodes = graph.number_of_nodes() for node in graph.nodes: rank = rank_fn(graph, node) ranked_graph.add_node(node.with_rank(rank)) for u, v in graph.edges: ranked_graph.add_edge(u, v) return ranked_graph def weakly_connected(graph: nx.DiGraph) -> Set[RankedNode]: return set( [ node for component in nx.strongly_connected_components(graph) if len(component) == 1 for node in component ] ) def weakly_connected_exclusion(graph: nx.DiGraph) -> Set[RankedNode]: largest = max(nx.strongly_connected_components(graph), key=len) sensors = set() for node in graph.nodes: if node not in largest: sensors.add(node) return sensors def list_botnets(conn: Any) -> List[int]: with conn.cursor() as cursor: 
        cursor.execute("SELECT DISTINCT id FROM bms.botnets")
        return [identifier[0] for identifier in cursor.fetchall()]


def last_ranking_for_botnet(
    conn: Any, identifier: int, algo: str
) -> Union[datetime, None]:
    """
    Return the end of the last evaluated timeframe for `identifier` and `algo`,
    or None if no ranking has been stored yet.
    """
    with conn.cursor() as cursor:
        cursor.execute(
            "SELECT MAX(range_until) FROM bms.detected_sensors WHERE botnet_id = %s AND ranking_algorithm = %s",
            (str(identifier), algo),
        )
        return cursor.fetchone()[0]


""" dict algo_name => (ranking_fn, sort_reverse) """
ALGOS = {
    "page_rank": (page_rank, False),
    "sensor_rank": (sensor_rank, True),
    "weakly_connected": (weakly_connected, False),
}


def execute():
    """
    Rank every botnet with every algorithm for all timeslots since the
    last evaluated one and store the results.
    """
    with create_connection() as conn:
        logging.info("starting execution")
        botnets = load_execution_data(conn)
        # for each botnet
        for botnet_id, algos in botnets.items():
            # for each algorithm
            for algo, (range_from, range_until, reverse_sort) in algos.items():
                # for each timeslot from the last one until now
                while datetime.now().replace(tzinfo=range_until.tzinfo) > range_until:
                    logging.info(
                        "calculating %s for botnet %s between %s and %s"
                        % (algo, botnet_id, range_from, range_until)
                    )
                    rank_and_insert(
                        conn,
                        botnet_id,
                        range_from,
                        range_until,
                        ALGOS[algo][0],
                        algo,
                        reverse_sort,
                    )
                    LAST_CHECKED[botnet_id][algo] = range_until
                    # increment the time range
                    range_from = range_until
                    range_until = range_until + timedelta(hours=1)


def rank_and_insert(
    conn: Any,
    botnet_id: int,
    range_from: datetime,
    range_until: datetime,
    ranker: Callable[[nx.DiGraph], nx.DiGraph],
    ranker_name: str,
    reverse_sort: bool,
):
    """
    Load the graph for the given botnet and time range, rank it with `ranker`
    and insert the top nodes into `bms.detected_sensors`.
    """
    g = build_graph(database_loader(conn, range_from, range_until, botnet_id))
    logging.info("loaded graph of size %s" % len(g))
    ranked = ranker(g)
    # only insert the top nodes into the database:
    # the larger of the top `TOP_PERCENT` share and `THRESHOLD` elements
    limits = [0]
    ranked = sorted(ranked, key=lambda v: v.rank, reverse=reverse_sort)
    if TOP_PERCENT is not None:
        limits.append(ceil(len(ranked) * TOP_PERCENT))
    if THRESHOLD is not None:
        limits.append(THRESHOLD)
    num_elements = max(limits)
    ranked = ranked[:num_elements]
    logging.info("ranked and will insert %s nodes" % len(ranked))
    params = [
        (
            botnet_id,
            range_from,
            range_until,
            node.node.ip,
            node.node.port,
            ranker_name,
            node.rank,
        )
        for node in ranked
    ]
    logging.debug("built %s params" % len(params))
    with conn.cursor() as cursor:
        psye.execute_batch(
            cursor,
            """
            INSERT INTO bms.detected_sensors
            (botnet_id, range_from, range_until, bot_ip, bot_port, ranking_algorithm, ranking)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            """,
            params,
        )
    conn.commit()


def load_execution_data(conn):
    """
    Load a dict `botnet_id -> ranking algorithm -> end of last evaluated timeframe`
    """
    first_timeslot = datetime.now().replace(
        minute=0, second=0, microsecond=0
    ) - timedelta(hours=HOURS_BACK)
    botnets = list_botnets(conn)
    last = {}
    for identifier in botnets:
        last[identifier] = {}
        for algo in ALGOS.keys():
            last_timeslot = last_ranking_for_botnet(conn, identifier, algo)
            last_from_local_state = LAST_CHECKED[identifier].get(algo)
            # this will include the end of the last timeframe from the database,
            # the end of the last timeframe from the local state and
            # the earliest allowed timeslot: now - BMS_RANKING_HOURS_BACK
            limits = []
            if last_timeslot is None:
                limits.append(first_timeslot)
            else:
                # normalize timezone
                limits.append(first_timeslot.replace(tzinfo=last_timeslot.tzinfo))
                limits.append(last_timeslot)
            if last_from_local_state is not None:
                limits.append(last_from_local_state)
            # pick the maximum from the possible limits
            limit = max(limits)
            LAST_CHECKED[identifier][algo] = limit
            range_until = limit + timedelta(hours=1)
            if range_until < datetime.now().replace(tzinfo=range_until.tzinfo):
                last[identifier][algo] = (limit, range_until, ALGOS[algo][1])
    return last


# known = [Node('195.37.209.28', 4488), Node('34.204.196.211', 9796)]
known = [Node("34.204.196.211", 9796)]


def find_rank(graph):
    """
    Return the highest rank among the `known` nodes present in `graph`.
    """
    found = []
    for node in graph:
        if node.node in known:
            found.append(node.rank)
    return max(found)


def main():
    initial_ranks = [0.25, 0.5, 0.75]
    iterations = 5
    # e = file_loader("./e.txt")
    e = list(csv_loader("./dist-edges.csv"))
    for initial_rank in initial_ranks:
        g = build_graph(e, initial_rank=initial_rank)
        nodes = len(g)
        print(f"Initial Rank: {initial_rank}")
        # g = sensor_rank(page_rank(page_rank(page_rank(page_rank(g)))))
        # s = list(map(lambda n: n.node, sorted(g, key=lambda n: n.rank, reverse=True)))
        # for kno in known:
        #     print(f'{kno}: {s.index(kno)}/{len(s)}')
        #     # for n in s[:20]:
        #     #     print(n)
        # exit(1)
        for iteration in range(iterations):
            g = page_rank(g)
            avg_pr = reduce(lambda acc, n: acc + n.rank, g, 0.0) / nodes
            # max_pr = max(g, key=lambda n: n.rank).rank
            max_pr = find_rank(g)
            perc = np.percentile(
                np.array(sorted(map(lambda n: n.rank, g))), [80, 90, 95]
            )
            print(
                f"InitialRank: {initial_rank}, Iteration: {iteration}, "
                f"PR percentiles [80, 90, 95]: {perc}"
            )
            # print(f'InitialRank: {initial_rank}, Iteration: {iteration}, Avg. PR: {avg_pr}')
            # print(f'InitialRank: {initial_rank}, Iteration: {iteration}, max PR: {max_pr}')

            sr = sensor_rank(g)
            avg_sr = reduce(lambda acc, n: acc + n.rank, sr, 0.0) / nodes
            # max_sr = max(sr, key=lambda n: n.rank).rank
            max_sr = find_rank(sr)
            perc = np.percentile(
                np.array(sorted(map(lambda n: n.rank, sr))), [80, 90, 95]
            )
            print(
                f"InitialRank: {initial_rank}, Iteration: {iteration}, "
                f"SR percentiles [80, 90, 95]: {perc}"
            )
            # print(f'InitialRank: {initial_rank}, Iteration: {iteration}, Avg. SR: {avg_sr}')
            # print(f'InitialRank: {initial_rank}, Iteration: {iteration}, max SR: {max_sr}')

            # LaTeX table row: iteration, avg/max PageRank, avg/max SensorRank
            print(
                f"{iteration+1} & {avg_pr:.8f} & {max_pr:.8f} & {avg_sr:.8f} & {max_sr:.8f} \\\\"
            )
            # with open(f'./{initial_rank}_{iteration+1}_pr.txt', 'w') as f:
            #     for n in g:
            #         f.write(f'{n.rank}' + '\n')
            # with open(f'./{initial_rank}_{iteration+1}_sr.txt', 'w') as f:
            #     for n in sr:
            #         f.write(f'{n.rank}' + '\n')

    # ranked = page_rank(g)
    # for node in ranked.nodes:
    #     print(node)

    # logging.basicConfig(
    #     format="%(asctime)s: %(levelname)s - %(message)s", level=logging.INFO
    # )
    # # schedule: do execute() every hour at `:01` minutes
    # schedule.every().hour.at(":01").do(execute)
    # # Run once on startup
    # schedule.run_all()
    # while True:
    #     n = schedule.idle_seconds()
    #     if n is None:
    #         break
    #     elif n > 0:
    #         logging.debug("sleeping for %s seconds" % n)
    #         time.sleep(n)
    #     schedule.run_pending()


if __name__ == "__main__":
    main()
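
# A minimal usage sketch of the offline loaders and rankers above. The file names
# are the ones referenced in main() and are assumptions about the local setup:
#
#   edges = list(file_loader("./e.txt"))          # lines like 1.1.1.1:1337->2.2.2.2:1337
#   edges = list(csv_loader("./dist-edges.csv"))  # lines like 1.1.1.1,1337,2.2.2.2,1337
#   graph = build_graph(edges, initial_rank=0.25)
#   ranked = sensor_rank(page_rank(graph))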