Последняя активность 1 month ago

gistfile1.txt Исходник
1def wg_events(file, filter_client, filter_type, limit, collapse='1',
2 since='', filter_event='', endpoint_cache_file=''):
3 """
4 Format WireGuard events with dedup, counts, gap and endpoint resolution.
5 Output per line: ts|client|endpoint|event|count|gap_seconds
6 """
7 from datetime import datetime
8 from collections import defaultdict
9 do_collapse = str(collapse) != '0'
10 limit = int(limit) if limit else 50
11 since_dt = parse_since(since) if since else None
12
13 # Load endpoint cache once
14 endpoint_cache = {}
15 if endpoint_cache_file and os.path.exists(endpoint_cache_file):
16 try:
17 with open(endpoint_cache_file) as f:
18 endpoint_cache = json.load(f)
19 except Exception:
20 pass
21
22 events = []
23 try:
24 with open(file) as f:
25 for line in f:
26 try:
27 e = json.loads(line.strip())
28 client = e.get('client', '')
29 if not client:
30 continue
31 if filter_client and client != filter_client:
32 continue
33 if filter_type and not client.startswith(filter_type + '-'):
34 continue
35 if filter_event and e.get('event', '') != filter_event:
36 continue
37 if since_dt:
38 ts_str = e.get('timestamp', '')
39 try:
40 from datetime import timezone
41