gistfile1.txt
· 4.1 KiB · Text
Raw
def activity_aggregate(fw_file, wg_file, wg_interface, net_file,
clients_dir, meta_dir, hours, filter_peer,
filter_service_ip):
"""
Aggregate activity data for wgctl activity.
Output:
peer|name|rx_bytes|tx_bytes|drop_count
service|peer_name|dest_display|drop_count
"""
hours = int(hours) if hours else 24
cutoff = None
if hours > 0:
cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
# Preload lookups once
ip_to_peer = build_ip_to_name(clients_dir)
pubkey_to_peer = build_pubkey_to_name(clients_dir)
net_data = load_net_data(net_file)
def _reverse(dest_ip, dest_port, proto):
return reverse_lookup(net_data, dest_ip, dest_port, proto)
# WireGuard transfer totals
peer_rx = defaultdict(int)
peer_tx = defaultdict(int)
try:
result = subprocess.run(
['wg', 'show', wg_interface, 'transfer'],
capture_output=True, text=True
)
for line in result.stdout.strip().splitlines():
parts = line.split()
if len(parts) >= 3:
pubkey, rx, tx = parts[0], int(parts[1]), int(parts[2])
peer = pubkey_to_peer.get(pubkey)
if peer:
peer_rx[peer] += rx
peer_tx[peer] += tx
except Exception:
pass
# Parse fw_events for drops
peer_drops = defaultdict(int)
service_drops = defaultdict(lambda: defaultdict(int))
if os.path.exists(fw_file):
try:
with open(fw_file) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
ev = json.loads(line)
if cutoff:
ts_str = ev.get('timestamp', '')
try:
ts = datetime.fromisoformat(ts_str)
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
if ts < cutoff:
continue
except Exception:
pass
src_ip = ev.get('src_ip', '')
if not src_ip:
continue
dest_ip = ev.get('dest_ip', '')
dest_port = str(ev.get('dest_port', ''))
proto_num = ev.get('ip.protocol', 0)
proto = PROTO_MAP.get(int(proto_num), str(proto_num))
peer = ip_to_peer.get(src_ip)
if not peer:
continue
if filter_peer and peer != filter_peer:
continue
if filter_service_ip and dest_ip != filter_service_ip:
continue
svc_name = _reverse(dest_ip, dest_port, proto)
dest_display = make_dest_display(dest_ip, dest_port, proto, svc_name)
peer_drops[peer] += 1
service_drops[peer][dest_display] += 1
except Exception:
continue
except Exception:
pass
# Collect peers with any activity
all_peers = set()
all_peers.update(k for k in peer_rx if peer_rx[k] > 0)
all_peers.update(k for k in peer_tx if peer_tx[k] > 0)
all_peers.update(peer_drops.keys())
if filter_peer:
all_peers = {p for p in all_peers if p == filter_peer}
for peer in sorted(all_peers):
rx = peer_rx.get(peer, 0)
tx = peer_tx.get(peer, 0)
drops = peer_drops.get(peer, 0)
print(f"peer|{peer}|{rx}|{tx}|{drops}")
svc_map = service_drops.get(peer, {})
for dest_display, count in sorted(svc_map.items(), key=lambda x: -x[1]):
print(f"service|{peer}|{dest_display}|{count}")
| 1 | def activity_aggregate(fw_file, wg_file, wg_interface, net_file, |
| 2 | clients_dir, meta_dir, hours, filter_peer, |
| 3 | filter_service_ip): |
| 4 | """ |
| 5 | Aggregate activity data for wgctl activity. |
| 6 | Output: |
| 7 | peer|name|rx_bytes|tx_bytes|drop_count |
| 8 | service|peer_name|dest_display|drop_count |
| 9 | """ |
| 10 | hours = int(hours) if hours else 24 |
| 11 | cutoff = None |
| 12 | if hours > 0: |
| 13 | cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) |
| 14 | |
| 15 | # Preload lookups once |
| 16 | ip_to_peer = build_ip_to_name(clients_dir) |
| 17 | pubkey_to_peer = build_pubkey_to_name(clients_dir) |
| 18 | net_data = load_net_data(net_file) |
| 19 | |
| 20 | def _reverse(dest_ip, dest_port, proto): |
| 21 | return reverse_lookup(net_data, dest_ip, dest_port, proto) |
| 22 | |
| 23 | # WireGuard transfer totals |
| 24 | peer_rx = defaultdict(int) |
| 25 | peer_tx = defaultdict(int) |
| 26 | try: |
| 27 | result = subprocess.run( |
| 28 | ['wg', 'show', wg_interface, 'transfer'], |
| 29 | capture_output=True, text=True |
| 30 | ) |
| 31 | for line in result.stdout.strip().splitlines(): |
| 32 | parts = line.split() |
| 33 | if len(parts) >= 3: |
| 34 | pubkey, rx, tx = parts[0], int(parts[1]), int(parts[2]) |
| 35 | peer = pubkey_to_peer.get(pubkey) |
| 36 | if peer: |
| 37 | peer_rx[peer] += rx |
| 38 | peer_tx[peer] += tx |
| 39 | except Exception: |
| 40 | pass |
| 41 | |
| 42 | # Parse fw_events for drops |
| 43 | peer_drops = defaultdict(int) |
| 44 | service_drops = defaultdict(lambda: defaultdict(int)) |
| 45 | |
| 46 | if os.path.exists(fw_file): |
| 47 | try: |
| 48 | with open(fw_file) as f: |
| 49 | for line in f: |
| 50 | line = line.strip() |
| 51 | if not line: |
| 52 | continue |
| 53 | try: |
| 54 | ev = json.loads(line) |
| 55 | if cutoff: |
| 56 | ts_str = ev.get('timestamp', '') |
| 57 | try: |
| 58 | ts = datetime.fromisoformat(ts_str) |
| 59 | if ts.tzinfo is None: |
| 60 | ts = ts.replace(tzinfo=timezone.utc) |
| 61 | if ts < cutoff: |
| 62 | continue |
| 63 | except Exception: |
| 64 | pass |
| 65 | |
| 66 | src_ip = ev.get('src_ip', '') |
| 67 | if not src_ip: |
| 68 | continue |
| 69 | |
| 70 | dest_ip = ev.get('dest_ip', '') |
| 71 | dest_port = str(ev.get('dest_port', '')) |
| 72 | proto_num = ev.get('ip.protocol', 0) |
| 73 | proto = PROTO_MAP.get(int(proto_num), str(proto_num)) |
| 74 | |
| 75 | peer = ip_to_peer.get(src_ip) |
| 76 | if not peer: |
| 77 | continue |
| 78 | if filter_peer and peer != filter_peer: |
| 79 | continue |
| 80 | if filter_service_ip and dest_ip != filter_service_ip: |
| 81 | continue |
| 82 | |
| 83 | svc_name = _reverse(dest_ip, dest_port, proto) |
| 84 | dest_display = make_dest_display(dest_ip, dest_port, proto, svc_name) |
| 85 | |
| 86 | peer_drops[peer] += 1 |
| 87 | service_drops[peer][dest_display] += 1 |
| 88 | |
| 89 | except Exception: |
| 90 | continue |
| 91 | except Exception: |
| 92 | pass |
| 93 | |
| 94 | # Collect peers with any activity |
| 95 | all_peers = set() |
| 96 | all_peers.update(k for k in peer_rx if peer_rx[k] > 0) |
| 97 | all_peers.update(k for k in peer_tx if peer_tx[k] > 0) |
| 98 | all_peers.update(peer_drops.keys()) |
| 99 | if filter_peer: |
| 100 | all_peers = {p for p in all_peers if p == filter_peer} |
| 101 | |
| 102 | for peer in sorted(all_peers): |
| 103 | rx = peer_rx.get(peer, 0) |
| 104 | tx = peer_tx.get(peer, 0) |
| 105 | drops = peer_drops.get(peer, 0) |
| 106 | print(f"peer|{peer}|{rx}|{tx}|{drops}") |
| 107 | |
| 108 | svc_map = service_drops.get(peer, {}) |
| 109 | for dest_display, count in sorted(svc_map.items(), key=lambda x: -x[1]): |
| 110 | print(f"service|{peer}|{dest_display}|{count}") |