def poll_handshakes(): """ Poll wg show latest-handshakes periodically. Log a handshake event only when gap > WG_HANDSHAKE_CHECK_SEC (new session). """ global _hs_last_logged _hs_last_logged = load_hs_cache() pubkey_to_name = build_pubkey_to_name() log.info(f"Handshake poller started — {len(pubkey_to_name)} peers, " f"session threshold {WG_HANDSHAKE_CHECK_SEC}s") while True: try: result = subprocess.run( ["wg", "show", WG_WG_INTERFACE, "latest-handshakes"], capture_output=True, text=True ) for line in result.stdout.strip().splitlines(): parts = line.split() if len(parts) != 2: continue pubkey, ts_str = parts try: ts = int(ts_str) except ValueError: continue if ts == 0: continue client = pubkey_to_name.get(pubkey) if not client: continue last = _hs_last_logged.get(pubkey, 0) gap = ts - last # Always update last seen _hs_last_logged[pubkey] = ts if gap < WG_HANDSHAKE_CHECK_SEC: continue # keepalive, skip # Get endpoint endpoint = get_endpoint(pubkey) or '' if not endpoint: try: cache = json.loads(ENDPOINT_CACHE_FILE.read_text()) endpoint = cache.get(client, '') except Exception: pass # New session, log it entry = { "timestamp": datetime.fromtimestamp(ts, tz=timezone.utc).isoformat(), "ip": "", "client": client, "event": "handshake", "endpoint": endpoint, } try: with EVENTS_LOG.open("a") as f: f.write(json.dumps(entry) + "\n") log.info(f"New session: {client} from {endpoint}") except Exception as e: log.error(f"Failed to write handshake event: {e}") log.debug(f"Gap for {client}: {gap}s (threshold: {WG_HANDSHAKE_CHECK_SEC}s)") save_hs_cache(_hs_last_logged) except Exception as e: log.error(f"Handshake poll error: {e}") time.sleep(WG_HANDSHAKE_CHECK_SEC // 2) # poll at half the threshold # ============================================ # Packet Handler # ============================================ def handle_packet(pkt):