#!/usr/bin/env python3

# TODO:
# - in execution, iterate botnet -> timeslot -> algorithm so that the graph
#   loaded from the db for a timeslot can be reused between algorithms
#   (is lru_cache enough?)

from collections import defaultdict
from datetime import datetime, timedelta
from functools import reduce, lru_cache
from math import ceil
from typing import Callable, Iterable, Set, List, Union, Any

import logging
import networkx as nx
import numpy as np
import os
import psycopg2 as psy
import psycopg2.extras as psye

# import schedule
import time

# TODO: load only from environment
db_host = os.environ.get("BMS_DB_HOST", "localhost")
db_name = os.environ.get("BMS_DB_NAME", "postgres")
db_user = os.environ.get("BMS_DB_USER", "server")
db_pass = os.environ.get("BMS_DB_PASS", "peu3Aiv1Oobie5eicho1oD8bdee1ooR1")

# TODO: how far back
HOURS_BACK: int = int(
    os.environ.get("BMS_RANKING_HOURS_BACK", str(5 * 24))
)  # default to 5 days
TOP_PERCENT: float = float(
    os.environ.get("BMS_RANKING_TOP_PERCENT", "0.1")
)  # default to 10%
THRESHOLD: int = int(
    os.environ.get("BMS_RANKING_THRESHOLD", "100")
)  # default to top 100 elements

"""
dict: botnet -> algo -> last_check
"""
LAST_CHECKED = defaultdict(dict)

# range_from=datetime(2021, 4, 21, hour=0)
# range_to=datetime(2021, 4, 21, hour=0, minute=5)


def create_connection() -> Any:
    """
    Connect to the database, retrying with exponential backoff until it succeeds.
    """
    sleep_sec = 1
    conn = None
    while conn is None:
        try:
            conn = psy.connect(
                host=db_host, database=db_name, user=db_user, password=db_pass
            )
        except psy.Error as e:
            logging.warning("waiting for connection: %s", e)
            time.sleep(sleep_sec)
            sleep_sec = min(2 * sleep_sec, 300)  # double sleep time, up to 5 minutes

    return conn


class Edge:
    """
    Representation of a graph edge as loaded from the database or a file
    """

    def __init__(self, source, destination):
        self.source = source
        self.destination = destination

    def __eq__(self, other):
        return (
            isinstance(other, Edge)
            and self.source == other.source
            and self.destination == other.destination
        )

    def __hash__(self):
        return hash((self.source, self.destination))

    def __repr__(self):
        return "Edge(%s, %s)" % (self.source, self.destination)


class Node:
    """
    Representation of a graph node, consisting of an IP address and a port
    """

    def __init__(self, ip: str, port: int):
        self.ip = ip
        self.port = port

    def __eq__(self, other):
        return (
            isinstance(other, Node) and self.ip == other.ip and self.port == other.port
        )

    def __hash__(self):
        return hash((self.ip, self.port))

    def __repr__(self):
        return "Node(%s, %s)" % (self.ip, self.port)


class RankedNode:
    """
    Composes a `Node` object with a rank value
    """

    def __init__(self, node: Node, rank: float):
        self.node = node
        self.rank = rank

    def with_rank(self, new_rank: float):
        """
        Create a copy of this node with a new rank value
        """
        return RankedNode(self.node, new_rank)

    def __eq__(self, other):
        """
        Equality only compares the `Node`, not the rank value
        """
        return isinstance(other, RankedNode) and self.node == other.node

    def __hash__(self):
        """
        Hashing only uses the `Node`, not the rank value
        """
        return hash(self.node)

    def __repr__(self):
        return "RankedNode(%s, %s)" % (self.node, self.rank)


query = """
SELECT DISTINCT be.source_ip, be.source_port, be.destination_ip, be.destination_port
FROM bms.bot_edges be
JOIN bms.sessions se
    ON be.session_id = se.id
WHERE be.time_seen BETWEEN %s AND %s
    AND se.botnet_id = %s
    -- only count bots for which there are recorded replies
    AND EXISTS (
        SELECT 1
        FROM bms.bot_replies br
        WHERE br.session_id = be.session_id
            AND br.ip = be.destination_ip
            AND br.port = be.destination_port
            AND br.time_seen BETWEEN %s AND %s
    )
;
"""


# Cache the last 512 results of this function so that in the execution loop,
# the edge list for each timeslot of each botnet is only loaded once.
@lru_cache(maxsize=512)
def database_loader(
    conn: Any, range_from: datetime, range_until: datetime, botnet_id: int
) -> Iterable[Edge]:
    """
    Load the edges between `range_from` and `range_until` for `botnet_id`
    """
    with conn.cursor() as cursor:
        cursor.execute(
            query, (range_from, range_until, botnet_id, range_from, range_until)
        )
        # Materialize into a tuple: a lazy `map` would be exhausted after its
        # first use, which breaks reuse of the lru_cache'd result.
        return tuple(
            Edge(Node(row[0], row[1]), Node(row[2], row[3]))
            for row in cursor.fetchall()
        )


def parse_edge(line: str) -> Edge:
    """
    Parse a line into an edge. The line is expected to be of the format
    `source_ip:source_port->destination_ip:destination_port`,
    e.g. `1.1.1.1:1337->2.2.2.2:1337`.

    :param line: (str) the line
    :return: the parsed edge
    """
    split = line.split("->")
    source = split[0].split(":")
    destination = split[1].split(":")
    return Edge(
        Node(source[0].strip(), int(source[1].strip())),
        Node(destination[0].strip(), int(destination[1].strip())),
    )


def parse_csv(
    line: str,
    source_ip_index: int = 0,
    source_port_index: int = 1,
    dest_ip_index: int = 2,
    dest_port_index: int = 3,
    delim: str = ",",
) -> Edge:
    """
    Parse a delimited line (CSV by default) into an edge, using the given
    column indices for the source and destination IPs and ports.
    """
    split = line.split(delim)
    return Edge(
        Node(split[source_ip_index].strip(), int(split[source_port_index].strip())),
        Node(split[dest_ip_index].strip(), int(split[dest_port_index].strip())),
    )
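

# Example (editor's sketch): parsing a line whose columns are ordered
# destination-first, using the index parameters:
#
#   edge = parse_csv(
#       "2.2.2.2,1337,1.1.1.1,1337",
#       source_ip_index=2, source_port_index=3,
#       dest_ip_index=0, dest_port_index=1,
#   )  # -> Edge(Node(1.1.1.1, 1337), Node(2.2.2.2, 1337))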


def csv_loader(path: str) -> Iterable[Edge]:
    """
    Load a list of edges from a CSV file, one edge per line.

    :param path: (str) filepath
    :return: iterator over the edges
    """
    with open(path, "r") as file:
        return map(parse_csv, file.readlines())


def file_loader(path: str) -> Iterable[Edge]:
    """
    Load a list of edges from a file of `ip:port->ip:port` lines.

    :param path: (str) filepath
    :return: iterator over the edges
    """
    with open(path, "r") as file:
        return map(parse_edge, file.readlines())


def build_graph(edges: Iterable[Edge], initial_rank: float = 0.25) -> nx.DiGraph:
    """
    Create a graph from an iterator over `Edge`s, assigning every node the
    same `initial_rank`
    """
    graph = nx.DiGraph()
    for edge in edges:
        source = RankedNode(edge.source, initial_rank)
        destination = RankedNode(edge.destination, initial_rank)
        graph.add_node(source)
        graph.add_node(destination)
        graph.add_edge(source, destination)

    return graph
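

# Example (editor's sketch, not part of the original pipeline): build a tiny
# graph from two literal edge strings and inspect its size.
def _example_build_graph() -> None:
    edges = [
        parse_edge("1.1.1.1:1337->2.2.2.2:1337"),
        parse_edge("2.2.2.2:1337->3.3.3.3:1337"),
    ]
    graph = build_graph(edges, initial_rank=0.25)
    print(graph.number_of_nodes(), graph.number_of_edges())  # prints: 3 2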


def page_ranker(
    graph: nx.DiGraph, node: RankedNode, dampingFactor: float = 1.0
) -> float:
    """
    Compute one PageRank update for `node`: the damped sum of each
    predecessor's rank divided by its out-degree, plus the teleport
    term `(1 - d) / N`.
    """
    return (
        reduce(
            lambda x, y: x + y,
            map(
                lambda pred: pred.rank / len(list(graph.successors(pred))),
                graph.predecessors(node),
            ),
            0.0,
        )
        * dampingFactor
        + (1 - dampingFactor) / graph.number_of_nodes()
    )


def page_rank(graph: nx.DiGraph, dampingFactor: float = 1.0) -> nx.DiGraph:
    """
    Apply one PageRank iteration to every node of `graph`
    """
    return rank(graph, lambda g, node: page_ranker(g, node, dampingFactor))


def sensor_rank(graph: nx.DiGraph) -> nx.DiGraph:
    """
    Apply one SensorRank iteration: weight each node's current rank by its
    in-degree relative to the total number of nodes
    """
    number_of_nodes = graph.number_of_nodes()
    return rank(
        graph,
        lambda g, node: node.rank * (len(list(g.predecessors(node))) / number_of_nodes),
    )
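

# Example (editor's sketch, not from the original file): the composition used
# in `main()` below: run a few PageRank iterations, then one SensorRank pass,
# and list the highest-ranked nodes first. The damping factor 0.85 is the
# common PageRank default and an assumption here, not a value taken from
# this file.
def _example_rank_pipeline(edges: Iterable[Edge]) -> List[RankedNode]:
    graph = build_graph(edges)
    # three iterations is an arbitrary choice for the demo
    for _ in range(3):
        graph = page_rank(graph, dampingFactor=0.85)
    ranked = sensor_rank(graph)
    return sorted(ranked, key=lambda n: n.rank, reverse=True)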


def rank(
    graph: nx.DiGraph, rank_fn: Callable[[nx.DiGraph, RankedNode], float]
) -> nx.DiGraph:
    """
    Build a new graph with the same topology, where every node's rank is
    recomputed by `rank_fn` against the old graph
    """
    ranked_graph = nx.DiGraph()
    for node in graph.nodes:
        new_rank = rank_fn(graph, node)

        ranked_graph.add_node(node.with_rank(new_rank))

    # `RankedNode` equality ignores the rank, so these edges connect the
    # freshly ranked nodes added above.
    for u, v in graph.edges:
        ranked_graph.add_edge(u, v)

    return ranked_graph
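

# Example (editor's sketch): `rank` is generic over the per-node scoring
# function, so other heuristics drop in directly; e.g. ranking every node by
# its out-degree instead:
#
#   out_degree_ranked = rank(g, lambda g_, node: float(g_.out_degree(node)))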


def weakly_connected(graph: nx.DiGraph) -> Set[RankedNode]:
    """
    Return all nodes that form their own strongly connected component,
    i.e. nodes that are in no strongly connected component of size > 1
    """
    return {
        node
        for component in nx.strongly_connected_components(graph)
        if len(component) == 1
        for node in component
    }


def weakly_connected_exclusion(graph: nx.DiGraph) -> Set[RankedNode]:
    """
    Return all nodes outside the largest strongly connected component
    """
    largest = max(nx.strongly_connected_components(graph), key=len)
    sensors = set()
    for node in graph.nodes:
        if node not in largest:
            sensors.add(node)

    return sensors
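

# Example (editor's sketch): on a graph with a two-node cycle plus one extra
# node, `weakly_connected` returns the nodes in singleton strongly connected
# components, and `weakly_connected_exclusion` returns everything outside the
# largest one; on this graph both yield only the third node.
def _example_sensor_heuristics() -> None:
    g = build_graph(
        [
            parse_edge("1.1.1.1:1->2.2.2.2:2"),
            parse_edge("2.2.2.2:2->1.1.1.1:1"),
            parse_edge("1.1.1.1:1->3.3.3.3:3"),
        ]
    )
    print(weakly_connected(g))            # {RankedNode(Node(3.3.3.3, 3), 0.25)}
    print(weakly_connected_exclusion(g))  # the same set on this graph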


def list_botnets(conn: Any) -> List[int]:
    """
    List the ids of all botnets known to the database
    """
    with conn.cursor() as cursor:
        cursor.execute("SELECT DISTINCT id FROM bms.botnets")

        return [identifier[0] for identifier in cursor.fetchall()]


def last_ranking_for_botnet(
    conn: Any, identifier: int, algo: str
) -> Union[datetime, None]:
    """
    Return the end of the last timeframe ranked with `algo` for this botnet,
    or None if it has never been ranked
    """
    with conn.cursor() as cursor:
        cursor.execute(
            "SELECT MAX(range_until) FROM bms.detected_sensors WHERE botnet_id = %s AND ranking_algorithm = %s",
            (str(identifier), algo),
        )

        return cursor.fetchone()[0]


"""
dict: algo_name -> (ranking_fn, sort_reverse)
"""
ALGOS = {
    "page_rank": (page_rank, False),
    "sensor_rank": (sensor_rank, True),
    "weakly_connected": (weakly_connected, False),
}
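

# Example (editor's sketch): how the `ALGOS` entries are consumed, mirroring
# `rank_and_insert` below:
#
#   ranker, reverse_sort = ALGOS["sensor_rank"]
#   top = sorted(ranker(graph), key=lambda n: n.rank, reverse=reverse_sort)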


def execute():
    with create_connection() as conn:
        logging.info("starting execution")
        botnets = load_execution_data(conn)
        # for each botnet
        for botnet_id, algos in botnets.items():
            # for each algorithm
            for algo, (range_from, range_until, reverse_sort) in algos.items():
                # for each timeslot from the last one until now
                while datetime.now().replace(tzinfo=range_until.tzinfo) > range_until:
                    logging.info(
                        "calculating %s for botnet %s between %s and %s"
                        % (algo, botnet_id, range_from, range_until)
                    )
                    rank_and_insert(
                        conn,
                        botnet_id,
                        range_from,
                        range_until,
                        ALGOS[algo][0],
                        algo,
                        reverse_sort,
                    )
                    LAST_CHECKED[botnet_id][algo] = range_until

                    # increment the time range
                    range_from = range_until
                    range_until = range_until + timedelta(hours=1)


def rank_and_insert(
    conn: Any,
    botnet_id: int,
    range_from: datetime,
    range_until: datetime,
    ranker: Callable[[nx.DiGraph], nx.DiGraph],
    ranker_name: str,
    reverse_sort: bool,
):
    g = build_graph(database_loader(conn, range_from, range_until, botnet_id))
    logging.info("loaded graph of size %s" % len(g))
    ranked = ranker(g)

    # only insert the top nodes into the database: the larger of
    # `TOP_PERCENT` of the graph and the fixed `THRESHOLD`
    limits = [0]
    ranked = sorted(ranked, key=lambda v: v.rank, reverse=reverse_sort)
    if TOP_PERCENT is not None:
        limits.append(ceil(len(ranked) * TOP_PERCENT))
    if THRESHOLD is not None:
        limits.append(THRESHOLD)

    num_elements = max(limits)
    ranked = ranked[:num_elements]
    logging.info("ranked and will insert %s nodes" % len(ranked))
    params = [
        (
            botnet_id,
            range_from,
            range_until,
            node.node.ip,
            node.node.port,
            ranker_name,
            node.rank,
        )
        for node in ranked
    ]
    logging.debug("built %s params" % len(params))

    with conn.cursor() as cursor:
        psye.execute_batch(
            cursor,
            """
            INSERT INTO bms.detected_sensors (botnet_id, range_from, range_until, bot_ip, bot_port, ranking_algorithm, ranking)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            """,
            params,
        )

    conn.commit()


def load_execution_data(conn):
    """
    Load a dict `botnet_id -> ranking algorithm -> (range_from, range_until, sort_reverse)`
    for every botnet/algorithm pair that still has an unevaluated timeslot
    """
    first_timeslot = datetime.now().replace(
        minute=0, second=0, microsecond=0
    ) - timedelta(hours=HOURS_BACK)

    botnets = list_botnets(conn)
    last = {}
    for identifier in botnets:
        last[identifier] = {}
        for algo in ALGOS.keys():
            last_timeslot = last_ranking_for_botnet(conn, identifier, algo)
            last_from_local_state = LAST_CHECKED[identifier].get(algo)
            # the candidates are the end of the last timeframe from the database,
            # the end of the last timeframe from the local state and
            # the earliest allowed timeslot: now - BMS_RANKING_HOURS_BACK
            limits = []
            if last_timeslot is None:
                limits.append(first_timeslot)
            else:
                # normalize timezone
                limits.append(first_timeslot.replace(tzinfo=last_timeslot.tzinfo))
                limits.append(last_timeslot)

            if last_from_local_state is not None:
                limits.append(last_from_local_state)

            # pick the latest of the possible limits
            limit = max(limits)
            LAST_CHECKED[identifier][algo] = limit
            range_until = limit + timedelta(hours=1)
            if range_until < datetime.now().replace(tzinfo=range_until.tzinfo):
                last[identifier][algo] = (limit, range_until, ALGOS[algo][1])

    return last
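

# Worked example (editor's sketch) of the limit selection above, with
# hypothetical values: HOURS_BACK = 120 and now = 2021-04-26 12:30 give
# first_timeslot = 2021-04-21 12:00. If the database reports a last ranking
# ending 2021-04-23 08:00 and the local state has none, then
# limits = [first_timeslot, 2021-04-23 08:00], the maximum wins, and the
# next evaluated slot is 2021-04-23 08:00 - 09:00.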


# nodes with known ranks to track during the evaluation in `main()`, see `find_rank`
# known = [Node('195.37.209.28', 4488), Node('34.204.196.211', 9796)]
known = [Node('34.204.196.211', 9796)]


def find_rank(graph):
    """
    Return the highest rank among the `known` nodes found in `graph`
    """
    found = []
    for node in graph:
        if node.node in known:
            found.append(node.rank)
    return max(found)


def main():
    initial_ranks = [0.25, 0.5, 0.75]
    iterations = 5
    # e = file_loader("./e.txt")
    e = list(csv_loader('./dist-edges.csv'))
    for initial_rank in initial_ranks:
        g = build_graph(e, initial_rank=initial_rank)
        nodes = len(g)
        print(f'Initial Rank: {initial_rank}')
        # g = sensor_rank(page_rank(page_rank(page_rank(page_rank(g)))))
        # s = list(map(lambda n: n.node, sorted(g, key=lambda n: n.rank, reverse=True)))
        # for kno in known:
        #     print(f'{kno}: {s.index(kno)}/{len(s)}')
        # # for n in s[:20]:
        # #     print(n)
        # exit(1)
        for iteration in range(iterations):
            g = page_rank(g)
            avg_pr = reduce(lambda acc, n: acc + n.rank, g, 0.) / nodes
            # max_pr = max(g, key=lambda n: n.rank).rank
            max_pr = find_rank(g)
            perc = np.percentile(np.array(sorted(map(lambda n: n.rank, g))), [80, 90, 95])
            print(f'InitialRank: {initial_rank}, Iteration: {iteration}, PR Percentiles [80, 90, 95]: {perc}')
            # print(f'InitialRank: {initial_rank}, Iteration: {iteration}, Avg. PR: {avg_pr}')
            # print(f'InitialRank: {initial_rank}, Iteration: {iteration}, max PR: {max_pr}')
            sr = sensor_rank(g)
            avg_sr = reduce(lambda acc, n: acc + n.rank, sr, 0.) / nodes
            # max_sr = max(sr, key=lambda n: n.rank).rank
            max_sr = find_rank(sr)
            perc = np.percentile(np.array(sorted(map(lambda n: n.rank, sr))), [80, 90, 95])
            print(f'InitialRank: {initial_rank}, Iteration: {iteration}, SR Percentiles [80, 90, 95]: {perc}')
            # print(f'InitialRank: {initial_rank}, Iteration: {iteration}, Avg. SR: {avg_sr}')
            # print(f'InitialRank: {initial_rank}, Iteration: {iteration}, max SR: {max_sr}')
            # LaTeX table row: iteration, avg/max PageRank, avg/max SensorRank
            print(f'{iteration+1} & {avg_pr:.8f} & {max_pr:.8f} & {avg_sr:.8f} & {max_sr:.8f} \\\\')
            # with open(f'./{initial_rank}_{iteration+1}_pr.txt', 'w') as f:
            #     for n in g:
            #         f.write(f'{n.rank}' + '\n')
            # with open(f'./{initial_rank}_{iteration+1}_sr.txt', 'w') as f:
            #     for n in sr:
            #         f.write(f'{n.rank}' + '\n')


# ranked = page_rank(g)
# for node in ranked.nodes:
#     print(node)

# logging.basicConfig(
#     format="%(asctime)s: %(levelname)s - %(message)s", level=logging.INFO
# )

# # schedule: do execute() every hour at `:01` minutes
# schedule.every().hour.at(":01").do(execute)

# # Run once on startup
# schedule.run_all()

# while True:
#     n = schedule.idle_seconds()
#     if n is None:
#         break
#     elif n > 0:
#         logging.debug("sleeping for %s seconds" % n)
#         time.sleep(n)
#
#     schedule.run_pending()


if __name__ == "__main__":
    main()
|