gistfile1.txt
· 1.5 KiB · Text
Исходник
def wg_events(file, filter_client, filter_type, limit, collapse='1',
since='', filter_event='', endpoint_cache_file=''):
"""
Format WireGuard events with dedup, counts, gap and endpoint resolution.
Output per line: ts|client|endpoint|event|count|gap_seconds
"""
from datetime import datetime
from collections import defaultdict
do_collapse = str(collapse) != '0'
limit = int(limit) if limit else 50
since_dt = parse_since(since) if since else None
# Load endpoint cache once
endpoint_cache = {}
if endpoint_cache_file and os.path.exists(endpoint_cache_file):
try:
with open(endpoint_cache_file) as f:
endpoint_cache = json.load(f)
except Exception:
pass
events = []
try:
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
client = e.get('client', '')
if not client:
continue
if filter_client and client != filter_client:
continue
if filter_type and not client.startswith(filter_type + '-'):
continue
if filter_event and e.get('event', '') != filter_event:
continue
if since_dt:
ts_str = e.get('timestamp', '')
try:
from datetime import timezone
| 1 | def wg_events(file, filter_client, filter_type, limit, collapse='1', |
| 2 | since='', filter_event='', endpoint_cache_file=''): |
| 3 | """ |
| 4 | Format WireGuard events with dedup, counts, gap and endpoint resolution. |
| 5 | Output per line: ts|client|endpoint|event|count|gap_seconds |
| 6 | """ |
| 7 | from datetime import datetime |
| 8 | from collections import defaultdict |
| 9 | do_collapse = str(collapse) != '0' |
| 10 | limit = int(limit) if limit else 50 |
| 11 | since_dt = parse_since(since) if since else None |
| 12 | |
| 13 | # Load endpoint cache once |
| 14 | endpoint_cache = {} |
| 15 | if endpoint_cache_file and os.path.exists(endpoint_cache_file): |
| 16 | try: |
| 17 | with open(endpoint_cache_file) as f: |
| 18 | endpoint_cache = json.load(f) |
| 19 | except Exception: |
| 20 | pass |
| 21 | |
| 22 | events = [] |
| 23 | try: |
| 24 | with open(file) as f: |
| 25 | for line in f: |
| 26 | try: |
| 27 | e = json.loads(line.strip()) |
| 28 | client = e.get('client', '') |
| 29 | if not client: |
| 30 | continue |
| 31 | if filter_client and client != filter_client: |
| 32 | continue |
| 33 | if filter_type and not client.startswith(filter_type + '-'): |
| 34 | continue |
| 35 | if filter_event and e.get('event', '') != filter_event: |
| 36 | continue |
| 37 | if since_dt: |
| 38 | ts_str = e.get('timestamp', '') |
| 39 | try: |
| 40 | from datetime import timezone |
| 41 |