#!/usr/bin/env python3
"""Simulate crawler nodes being added around the known node (rank_with_churn.KNOWN).

For each edge list under ./edges_reduced/, synthetic crawler nodes are created
as successors of the known node, each pointing at a random sample of the
existing nodes.  PageRank and SensorRank are then recomputed and the per-run
statistics are written as JSON below ./data_reduced/.
"""
import glob
import json
import multiprocessing
import statistics
from datetime import datetime
from random import sample

import rank_with_churn
import reduce_edges
from node_ranking import (
    rank as rank_nr,
    page_rank,
    # sensor_rank,  # replaced by the local sensor_rank() below
    find_rank,
    parse_csv,
    csv_loader,
    build_graph,
    Node,
    RankedNode,
)


def sr(graph, node, number_of_nodes):
    """SensorRank weight: the node's rank scaled by in-degree / |V|."""
    try:
        return node.rank * (len(list(graph.predecessors(node))) / number_of_nodes)
    except ZeroDivisionError as e:
        print(f'{node=}, {number_of_nodes=}')
        raise e


def sensor_rank(graph):
    """One SensorRank round: run PageRank, then reweight each node with sr()."""
    number_of_nodes = graph.number_of_nodes()
    return rank_nr(
        page_rank(graph),
        lambda g, node: sr(g, node, number_of_nodes)
    )


def find_known(g, known):
    """Return the graph node matching the known node; it must occur exactly once."""
    nodes = list(filter(lambda n: n.node == known.node, g.nodes()))
    n = len(nodes)
    assert n == 1
    return nodes[0]


def known_out(g, known):
    return len(list(g.successors(known)))


def known_in(g, known):
    return len(list(g.predecessors(known)))


def high_succ(g, n, known):
    """Pick the n nodes with the most successors, skipping the known node."""
    result = []
    counter = 0
    # choose nodes with many successors to reduce pr
    for node in sorted(g.nodes(), key=lambda node: len(list(g.successors(node))), reverse=True):
        if node.node == known.node:
            print('skipping known')
            continue
        if counter >= n:
            break
        counter += 1
        result.append(node)
    return result


def analyze(g):
    """Collect rank statistics for the known node and for the whole graph."""
    known = find_known(g, rank_with_churn.KNOWN)
    # avg_r = rank_with_churn.avg_without_known(g)
    avg_in = rank_with_churn.avg_in(g)
    kn_in = known_in(g, known)
    kn_out = known_out(g, known)
    d = list(map(lambda node: node.rank, g.nodes()))
    mean = statistics.mean(d)
    stddev = statistics.stdev(d)
    return {
        'known_rank': known.rank,
        'known_in': kn_in,
        'known_out': kn_out,
        # 'avg_rank': avg_r,
        'avg_in': avg_in,
        'mean': mean,
        'stdev': stddev,
    }


def create_crawlers(graph, n_crawlers, n_edges):
    """Add n_crawlers synthetic crawler nodes, each with edges to n_edges random existing nodes."""
    nodes = list(filter(lambda n: n.node != rank_with_churn.KNOWN.node, graph.nodes()))
    crawlers = []
    for i in range(n_crawlers):
        ip = f'0.0.0.{i + 1}'
        crawler = RankedNode(Node(ip, 1337), rank_with_churn.INITIAL_RANK)
        crawlers.append(crawler)
        candidates = sample(nodes, n_edges)
        for candidate in candidates:
            graph.add_edge(crawler, candidate)
    return crawlers


def rank(path, added_percentage):
    """Build the graph from one edge file, add crawlers proportional to the
    known node's in-degree, then recompute and analyze PageRank and SensorRank."""
    edges = reduce_edges.load_data(path)
    g = build_graph(edges, initial_rank=rank_with_churn.INITIAL_RANK)
    # edges = list(filter(lambda e: e[1] == rank_with_churn.KNOWN, g.edges()))
    # for_removal = sample(edges, int(len(edges) * remove_edges_percentage))
    # print(f'removing {len(for_removal)} incoming edges')
    # for edge in for_removal:
    #     g.remove_edge(edge[0], edge[1])
    n_known_in = len(list(filter(lambda e: e[1] == rank_with_churn.KNOWN, g.edges())))
    # avg_out = rank_with_churn.avg_out(g)
    churned_peers = int(n_known_in * added_percentage)
    known_pred = len(list(g.predecessors(rank_with_churn.KNOWN)))
    c_out = int(known_pred * added_percentage)
    crawlers = create_crawlers(g, churned_peers, c_out)
    print(f'{added_percentage=}, {churned_peers=}')
    # assert added_percentage == 0 or churned_peers != 0
    # if churned_peers > 0:
    #     # nodes = list(g.nodes())
    #     # destinations = sample(nodes, churned_peers)
    #     destinations = high_succ(g, churned_peers, rank_with_churn.KNOWN)
    #     print(f'!!!!!! adding destinations: {destinations}')
    print(f'adding {len(crawlers)=} crawlers with {c_out=} successors')
    for node in crawlers:
        g.add_edge(rank_with_churn.KNOWN, node)

    print('pr start')
    g_pr = page_rank(page_rank(g))        # two PageRank rounds
    print('sr start')
    g_sr = sensor_rank(sensor_rank(g))    # two SensorRank rounds
    print('analyze pr start')
    res_pr = analyze(g_pr)
    print('analyze sr start')
    res_sr = analyze(g_sr)
    print('done!')
    res = {'sr': res_sr, 'pr': res_pr}
    return res


def main():
    # pool = multiprocessing.Pool(processes=4)
    params = []
    for reduced_percentage in [0.0]:  # in reduce_edges.percentages:
        for file in glob.glob(f'./edges_reduced/{reduced_percentage:.02f}/*.txt'):
            params.append([reduced_percentage, file])
            # p = Proc(reduced_percentage, file)
            # p.start()
            # for added_percentage in reduce_edges.percentages:
            #     when = datetime.fromtimestamp(float(file.split('/')[-1][:-4]))
            #     print(f'{reduced_percentage=:.02f}, {added_percentage=:.02f}, {when=}')
            #     result = rank(file, added_percentage)
            #     with open(f'./data_reduced/{reduced_percentage:.02f}/{added_percentage:.02f}/{when.timestamp()}.json', 'w') as f:
            #         json.dump(result, f)

    with multiprocessing.Pool(processes=8) as pool:
        l_path_data = pool.map(wohoo, params)

    # write one JSON file per (reduced, added) percentage pair;
    # the target directories are expected to exist already
    for path_data in l_path_data:
        for path, data in path_data.items():
            with open(path, 'w') as f:
                json.dump(data, f)


def wohoo(p):
    """Worker: run rank() for one edge file over several added percentages."""
    reduced_percentage, file = p
    path_data = {}
    # ps = reduce_edges.percentages.copy()
    ps = [0.3, 0.5, 0.75]
    ps.append(0.1)
    ps.append(1.0)
    ps.append(1.2)
    # ps.append(2.0)
    for added_percentage in ps:
        when = datetime.fromtimestamp(float(file.split('/')[-1][:-4]))
        print(f'{reduced_percentage=:.02f}, {added_percentage=:.02f}, {when=}')
        result = rank(file, added_percentage)
        path = f'./data_reduced/{reduced_percentage:.02f}/{added_percentage:.02f}/{when.timestamp()}.json'
        path_data[path] = result
        # with open() as f:
        #     json.dump(result, f)
    return path_data


if __name__ == '__main__':
    main()