Last active 1 month ago

Revision 7e2cc0d581c70f6bcea4c2f9da881a0583f6312d

gistfile1.txt Raw
1def activity_aggregate(fw_file, wg_file, wg_interface, net_file,
2 clients_dir, meta_dir, hours, filter_peer,
3 filter_service_ip):
4 """
5 Aggregate activity data for wgctl activity.
6 Output:
7 peer|name|rx_bytes|tx_bytes|drop_count
8 service|peer_name|dest_display|drop_count
9 """
10 hours = int(hours) if hours else 24
11 cutoff = None
12 if hours > 0:
13 cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
14
15 # Preload lookups once
16 ip_to_peer = build_ip_to_name(clients_dir)
17 pubkey_to_peer = build_pubkey_to_name(clients_dir)
18 net_data = load_net_data(net_file)
19
20 def _reverse(dest_ip, dest_port, proto):
21 return reverse_lookup(net_data, dest_ip, dest_port, proto)
22
23 # WireGuard transfer totals
24 peer_rx = defaultdict(int)
25 peer_tx = defaultdict(int)
26 try:
27 result = subprocess.run(
28 ['wg', 'show', wg_interface, 'transfer'],
29 capture_output=True, text=True
30 )
31 for line in result.stdout.strip().splitlines():
32 parts = line.split()
33 if len(parts) >= 3:
34 pubkey, rx, tx = parts[0], int(parts[1]), int(parts[2])
35 peer = pubkey_to_peer.get(pubkey)
36 if peer:
37 peer_rx[peer] += rx
38 peer_tx[peer] += tx
39 except Exception:
40 pass
41
42 # Parse fw_events for drops
43 peer_drops = defaultdict(int)
44 service_drops = defaultdict(lambda: defaultdict(int))
45
46 if os.path.exists(fw_file):
47 try:
48 with open(fw_file) as f:
49 for line in f:
50 line = line.strip()
51 if not line:
52 continue
53 try:
54 ev = json.loads(line)
55 if cutoff:
56 ts_str = ev.get('timestamp', '')
57 try:
58 ts = datetime.fromisoformat(ts_str)
59 if ts.tzinfo is None:
60 ts = ts.replace(tzinfo=timezone.utc)
61 if ts < cutoff:
62 continue
63 except Exception:
64 pass
65
66 src_ip = ev.get('src_ip', '')
67 if not src_ip:
68 continue
69
70 dest_ip = ev.get('dest_ip', '')
71 dest_port = str(ev.get('dest_port', ''))
72 proto_num = ev.get('ip.protocol', 0)
73 proto = PROTO_MAP.get(int(proto_num), str(proto_num))
74
75 peer = ip_to_peer.get(src_ip)
76 if not peer:
77 continue
78 if filter_peer and peer != filter_peer:
79 continue
80 if filter_service_ip and dest_ip != filter_service_ip:
81 continue
82
83 svc_name = _reverse(dest_ip, dest_port, proto)
84 dest_display = make_dest_display(dest_ip, dest_port, proto, svc_name)
85
86 peer_drops[peer] += 1
87 service_drops[peer][dest_display] += 1
88
89 except Exception:
90 continue
91 except Exception:
92 pass
93
94 # Collect peers with any activity
95 all_peers = set()
96 all_peers.update(k for k in peer_rx if peer_rx[k] > 0)
97 all_peers.update(k for k in peer_tx if peer_tx[k] > 0)
98 all_peers.update(peer_drops.keys())
99 if filter_peer:
100 all_peers = {p for p in all_peers if p == filter_peer}
101
102 for peer in sorted(all_peers):
103 rx = peer_rx.get(peer, 0)
104 tx = peer_tx.get(peer, 0)
105 drops = peer_drops.get(peer, 0)
106 print(f"peer|{peer}|{rx}|{tx}|{drops}")
107
108 svc_map = service_drops.get(peer, {})
109 for dest_display, count in sorted(svc_map.items(), key=lambda x: -x[1]):
110 print(f"service|{peer}|{dest_display}|{count}")