#######################################
# Render the activity report: per-peer drop statistics merged with per-peer
# accepted-traffic statistics.
#
# Flags:
#   --peer NAME     restrict both aggregations to one peer
#   --type TYPE     with --peer, resolve NAME via peers::resolve_and_require
#   --service NAME  resolve NAME via net::resolve and filter drops by its IP
#   --ip ADDR       filter drops by ADDR (takes precedence over --service)
#   --hours N       look-back window, non-negative integer; 0 = "all time"
#   --dropped       hide peers with zero drops and no accept summary
#   --accept        skip the drop aggregation entirely
#   --drop          skip the accept aggregation entirely
#   --external      pass ext_flag=1 to json::accept_aggregate
#   --help          print help and return
#
# Globals written: _ACCEPT_PEER, _ACCEPT_DEST_KEYS, _ACCEPT_DEST (reset on
#   every call); also (re)defines the helper _render_peer_accept_dests.
# Returns: 0 on success (including "no data"), 1 on bad arguments or when a
#   peer/service cannot be resolved.
#######################################
function cmd::activity::run() {
  local filter_peer="" filter_service="" filter_ip="" filter_type=""
  local hours=24 dropped_only=false
  local accept_only=false drop_only=false external_only=false
  # Loop/scratch variables are declared up front so that none of them leak
  # into the caller's scope.
  local type rest record_type rp a_name

  while [[ $# -gt 0 ]]; do
    case "$1" in
      --peer|--service|--ip|--type|--hours)
        # A value flag given as the last argument would make `shift 2` fail
        # without consuming anything, so this loop would never terminate.
        if [[ $# -lt 2 ]]; then
          log::error "Missing value for flag: $1"
          cmd::activity::help
          return 1
        fi
        case "$1" in
          --peer)    filter_peer="$2"    ;;
          --service) filter_service="$2" ;;
          --ip)      filter_ip="$2"      ;;
          --type)    filter_type="$2"    ;;
          --hours)   hours="$2"          ;;
        esac
        shift 2
        ;;
      --dropped)  dropped_only=true;   shift   ;;
      --accept)   accept_only=true;    shift   ;;
      --drop)     drop_only=true;      shift   ;;
      --external) external_only=true;  shift   ;;
      --help)     cmd::activity::help; return  ;;
      *)
        log::error "Unknown flag: $1"
        cmd::activity::help
        return 1
        ;;
    esac
  done

  # hours is later used in an arithmetic context; an unchecked value there is
  # evaluated as an expression (and can even run command substitutions), so
  # accept plain digits only and normalise away leading zeros.
  if [[ ! "$hours" =~ ^[0-9]+$ ]]; then
    log::error "Invalid --hours value: ${hours}"
    cmd::activity::help
    return 1
  fi
  hours=$(( 10#$hours ))

  # JSON mode: only the time window is forwarded to the JSON renderer.
  if command::json; then
    cmd::activity::_output_json "$hours"
    return 0
  fi

  # Resolve peer name if type provided
  if [[ -n "$filter_peer" && -n "$filter_type" ]]; then
    filter_peer=$(peers::resolve_and_require "$filter_peer" "$filter_type") || return 1
  fi

  # Resolve --service to IP (first line of net::resolve, text before first ':')
  local service_ip=""
  if [[ -n "$filter_service" ]]; then
    service_ip=$(net::resolve "$filter_service" 2>/dev/null | head -1 | cut -d: -f1) || true
    if [[ -z "$service_ip" ]]; then
      log::error "Service not found: ${filter_service}"
      return 1
    fi
  fi
  # An explicit --ip wins over whatever --service resolved to.
  if [[ -n "$filter_ip" ]]; then
    service_ip="$filter_ip"
  fi

  # Fetch drop/handshake data
  local data=""
  if ! $accept_only; then
    data=$(json::activity_aggregate \
      "$(ctx::fw_events_log)" \
      "$(ctx::events_log)" \
      "$(config::interface)" \
      "$(ctx::net)" \
      "$(ctx::clients)" \
      "$(ctx::meta)" \
      "$hours" \
      "$filter_peer" \
      "$service_ip" 2>/dev/null)
  fi

  # Fetch accept data (only when the accept log file exists)
  local accept_data=""
  if ! $drop_only; then
    local since_arg="" ext_flag="0"
    if (( hours > 0 )); then
      since_arg="${hours}h"
    fi
    if $external_only; then
      ext_flag="1"
    fi
    if [[ -f "$(ctx::accept_events_log)" ]]; then
      accept_data=$(json::accept_aggregate \
        "$(ctx::accept_events_log)" \
        "$(ctx::net)" \
        "$(ctx::clients)" \
        "$since_arg" \
        "$filter_peer" \
        "$ext_flag" \
        2>/dev/null)
    fi
  fi

  if [[ -z "$data" && -z "$accept_data" ]]; then
    log::wg_warning "No activity data found"
    return 0
  fi

  # Build accept lookup maps:
  #   _ACCEPT_PEER[name]      = "bytes_in|bytes_out|pkts_in|pkts_out|conns"
  #   _ACCEPT_DEST[key]       = "bytes|count", key = "peer:ip:port:proto"
  #   _ACCEPT_DEST_KEYS[peer] = space-separated list of that peer's keys
  declare -gA _ACCEPT_PEER=()
  declare -gA _ACCEPT_DEST_KEYS=()
  declare -gA _ACCEPT_DEST=()

  local a_bi a_bo a_pi a_po a_conns
  local d_peer d_ip d_port d_proto d_bytes d_count d_key
  while IFS='|' read -r type rest; do
    [[ -z "$type" ]] && continue
    case "$type" in
      peer)
        IFS='|' read -r a_name a_bi a_bo a_pi a_po a_conns <<< "$rest"
        _ACCEPT_PEER["$a_name"]="${a_bi}|${a_bo}|${a_pi}|${a_po}|${a_conns}"
        ;;
      dest)
        IFS='|' read -r d_peer d_ip d_port d_proto d_bytes d_count <<< "$rest"
        d_key="${d_peer}:${d_ip}:${d_port}:${d_proto}"
        _ACCEPT_DEST["$d_key"]="${d_bytes}|${d_count}"
        _ACCEPT_DEST_KEYS["$d_peer"]+="${d_key} "
        ;;
    esac
  done <<< "$accept_data"

  # Measure column widths (peer name column, drop-count column)
  local w_peer=16 w_drops=1
  local m_name m_drops m_count
  while IFS='|' read -r type rest; do
    case "$type" in
      peer)
        # rest = name|rx|tx|drops
        IFS='|' read -r m_name _ _ m_drops _ <<< "$rest"
        (( ${#m_name}  > w_peer  )) && w_peer=${#m_name}
        (( ${#m_drops} > w_drops )) && w_drops=${#m_drops}
        ;;
      service)
        # rest = peer|dest_display|drop_count
        IFS='|' read -r _ _ m_count _ <<< "$rest"
        (( ${#m_count} > w_drops )) && w_drops=${#m_count}
        ;;
    esac
  done <<< "$data"

  for a_name in "${!_ACCEPT_PEER[@]}"; do
    (( ${#a_name} > w_peer )) && w_peer=${#a_name}
  done

  (( w_peer += 2 ))
  local drops_col=$(( w_peer + 30 ))

  local hours_display="${hours}h"
  if [[ "$hours" == "0" ]]; then
    hours_display="all time"
  fi

  log::section "Activity Monitor (last ${hours_display})"
  echo ""

  if display::is_table "activity"; then
    cmd::activity::_render_table "$data"
    return 0
  fi

  # Helper — render accept dests for a peer inline.
  # Reads drops_col and w_drops from this function through dynamic scoping.
  _render_peer_accept_dests() {
    local peer_name="$1"
    local keys="${_ACCEPT_DEST_KEYS[$peer_name]:-}"
    [[ -z "$keys" ]] && return 0
    local d_key dest_stats d_bytes d_count rest_key host_port
    local d_ip d_port d_proto dest_display bytes_fmt
    # Intentionally unquoted: keys is a space-separated list.
    for d_key in $keys; do
      dest_stats="${_ACCEPT_DEST[$d_key]:-}"
      [[ -z "$dest_stats" ]] && continue
      IFS='|' read -r d_bytes d_count <<< "$dest_stats"
      rest_key="${d_key#"${peer_name}":}"
      # Split ip:port:proto from the right so that an address which itself
      # contains ':' (IPv6) stays intact.
      d_proto="${rest_key##*:}"
      host_port="${rest_key%:*}"
      d_port="${host_port##*:}"
      d_ip="${host_port%:*}"
      dest_display=$(resolve::dest "$d_ip" "$d_port" "$d_proto" 2>/dev/null \
        || echo "${d_ip}:${d_port}/${d_proto}")
      bytes_fmt=$(fmt::bytes "$d_bytes")
      ui::activity::accept_dest_row \
        "$dest_display" "$d_bytes" "$bytes_fmt" \
        "$d_count" "$drops_col" "$w_drops"
    done
  }

  local first_peer=true skip_peer=false current_name=""
  local -a rendered_peers=()
  local name rx tx drops has_accept
  local rx_fmt tx_fmt name_pad rx_pad tx_pad drop_word
  local a_in_fmt a_out_fmt
  local peer dest_display drop_count svc_drop_word

  while IFS='|' read -r record_type rest; do
    case "$record_type" in
      peer)
        IFS='|' read -r name rx tx drops <<< "$rest"

        # Flush previous peer's accept dests before starting new peer
        if [[ -n "$current_name" ]] && ! $drop_only; then
          _render_peer_accept_dests "$current_name"
        fi

        skip_peer=false
        has_accept="${_ACCEPT_PEER[$name]:-}"

        if $dropped_only && [[ "$drops" -eq 0 ]] && [[ -z "$has_accept" ]]; then
          skip_peer=true
          # A hidden peer must not have dest rows flushed under someone else.
          current_name=""
          continue
        fi
        current_name="$name"

        $first_peer || echo ""
        first_peer=false
        rendered_peers+=("$name")

        rx_fmt=$(fmt::bytes "$rx")
        tx_fmt=$(fmt::bytes "$tx")

        name_pad=$(printf "%-${w_peer}s" "$name")
        rx_pad=$(printf   "%-10s"        "$rx_fmt")
        tx_pad=$(printf   "%-10s"        "$tx_fmt")

        drop_word="drops"
        if [[ "$drops" -eq 1 ]]; then
          drop_word="drop"
        fi

        if ! $accept_only; then
          ui::activity::peer_row \
            "$name_pad" "$rx_pad" "$tx_pad" "$drops" "$drop_word" "$w_drops"
        fi

        # Accept summary row
        if [[ -n "$has_accept" ]] && ! $drop_only; then
          IFS='|' read -r a_bi a_bo a_pi a_po a_conns <<< "$has_accept"
          a_in_fmt=$(fmt::bytes "$a_bi")
          a_out_fmt=$(fmt::bytes "$a_bo")
          ui::activity::accept_row \
            "$name_pad" "$a_in_fmt" "$a_out_fmt" "$a_conns" "$w_drops"
        fi
        ;;

      service)
        $skip_peer && continue
        IFS='|' read -r peer dest_display drop_count <<< "$rest"
        svc_drop_word="drops"
        if [[ "$drop_count" -eq 1 ]]; then
          svc_drop_word="drop"
        fi
        if ! $accept_only; then
          ui::activity::service_row \
            "$dest_display" "$drop_count" "$svc_drop_word" "$drops_col" "$w_drops"
        fi
        ;;
    esac
  done <<< "$data"

  # Flush last peer's accept dests
  if [[ -n "$current_name" ]] && ! $drop_only; then
    _render_peer_accept_dests "$current_name"
  fi

  # Accept-only peers — not in drop data, render separately (sorted by name)
  if ! $drop_only && (( ${#_ACCEPT_PEER[@]} > 0 )); then
    local -a accept_names=()
    local already a_stats a_in_pad a_out_pad
    # One name per line, so names are never re-split on spaces.
    mapfile -t accept_names < <(printf '%s\n' "${!_ACCEPT_PEER[@]}" | sort)

    for a_name in "${accept_names[@]}"; do
      already=false
      for rp in "${rendered_peers[@]:-}"; do
        if [[ "$rp" == "$a_name" ]]; then
          already=true
          break
        fi
      done
      $already && continue

      $first_peer || echo ""
      first_peer=false

      a_stats="${_ACCEPT_PEER[$a_name]}"
      IFS='|' read -r a_bi a_bo a_pi a_po a_conns <<< "$a_stats"
      a_in_fmt=$(fmt::bytes "$a_bi")
      a_out_fmt=$(fmt::bytes "$a_bo")
      name_pad=$(printf "%-${w_peer}s" "$a_name")
      # NOTE(review): byte counts are padded to 10 columns here, whereas the
      # accept summary row for peers present in drop data passes them
      # unpadded — confirm which form ui::activity::accept_row expects.
      a_in_pad=$(printf "%-10s" "$a_in_fmt")
      a_out_pad=$(printf "%-10s" "$a_out_fmt")
      ui::activity::accept_row \
        "$name_pad" "$a_in_pad" "$a_out_pad" "$a_conns" "$w_drops"

      # These peers never pass through the drop loop, so their destination
      # rows have to be emitted here or they would never be shown.
      _render_peer_accept_dests "$a_name"
    done
  fi

  echo ""
}