CXB Dialler is the campaign execution plane. It reads running campaigns from MongoDB, paces outbound SIP calls through LiveKit, screens answers with AMD, and attaches answered calls to a free CXB Core fleet worker. It runs as a separate long-running process so campaign dispatch can evolve independently of the CXB API server.

How it works

A single supervised loop ticks every 2 seconds. Each tick:
  1. Queries running campaigns.
  2. Calculates demand per campaign using predictive pacing.
  3. Fair-share allocates across campaigns when budget is tight.
  4. Leases calls from MongoDB via an atomic status transition.
  5. Dials each leased call through LiveKit SIP (create_sip_participant).
  6. On answer, screens AMD, then attaches the answered room to a free CXB Core worker.
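A minimal sketch of the tick cadence, assuming an asyncio process. `run_loop` and the `tick` callback are hypothetical names, not the actual loop.py API. In this sketch a tick would run the six steps above in order; a failed tick is logged and the loop carries on, and the `stop` event stands in for graceful shutdown.

```python
import asyncio
from typing import Awaitable, Callable

TICK_INTERVAL_S = 2.0  # the loop ticks every 2 seconds


async def run_loop(
    tick: Callable[[], Awaitable[None]],
    stop: asyncio.Event,
    interval: float = TICK_INTERVAL_S,
) -> int:
    """Call `tick` once per interval until `stop` is set; return the tick count.

    Hypothetical helper. A real tick would query campaigns, plan demand,
    fair-share, lease, dial, and hand answered calls to AMD.
    """
    loop = asyncio.get_running_loop()
    ticks = 0
    while not stop.is_set():
        started = loop.time()
        try:
            await tick()
        except Exception as exc:
            # One bad tick must not kill the process; log it and keep ticking.
            print(f"tick failed: {exc!r}")
        ticks += 1
        # Sleep for the rest of the interval, but wake at once on shutdown.
        remaining = max(0.0, interval - (loop.time() - started))
        try:
            await asyncio.wait_for(stop.wait(), timeout=remaining)
        except asyncio.TimeoutError:
            pass
    return ticks
```

Measuring from the start of each tick keeps the cadence close to 2 seconds even when a tick does real work; a tick that overruns the interval simply starts the next one immediately.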
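The text does not say which fair-share rule step 3 uses. One common choice is max-min fairness (water-filling), sketched here under that assumption: every campaign with unmet demand gets an equal slice of the tick's dial budget, and any slice a small campaign cannot use is redistributed to the others. `fair_share` is a hypothetical name.

```python
def fair_share(demand: dict[str, int], budget: int) -> dict[str, int]:
    """Max-min fair split of an integer dial budget across campaigns.

    Hypothetical helper. Assumption for illustration: `demand` is the
    per-campaign dial count from pacing, `budget` is what is allowed this tick.
    """
    alloc = {campaign: 0 for campaign in demand}
    remaining = budget
    # Campaigns that still want dials, smallest demand first.
    active = sorted((c for c, d in demand.items() if d > 0), key=lambda c: demand[c])
    while remaining > 0 and active:
        share = remaining // len(active)
        if share == 0:
            # Fewer dials left than campaigns: one each, smallest demand first.
            for campaign in active[:remaining]:
                alloc[campaign] += 1
            break
        still_hungry = []
        for campaign in active:
            give = min(share, demand[campaign] - alloc[campaign])
            alloc[campaign] += give
            remaining -= give
            if alloc[campaign] < demand[campaign]:
                still_hungry.append(campaign)
        active = still_hungry
    return alloc
```

With demand `{"a": 2, "b": 10, "c": 10}` and a budget of 12, campaign `a` is fully served with 2 and the remaining 10 are split 5/5 between `b` and `c`. When the budget covers total demand, every campaign gets exactly what it asked for, so the allocation only bites when the budget is tight.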

Lease priority

Every contact is reached at least once before any retries fire:

| Priority | Source | Meaning |
|---|---|---|
| 1 | Callbacks | Customer explicitly requested a callback. |
| 2 | Pending | Fresh records not yet dialled (a full first pass completes before retries). |
| 3 | Retries | Failed records whose next_retry_at has elapsed. |
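A minimal in-memory sketch of that ordering plus the atomic lease. `CallRecord`, `callback_at`, `leased_by`, `leased_at`, `lease_tier`, `pick_for_lease`, and `lease_filter` are hypothetical names; only `next_retry_at` and the status values come from this document. The lease is modelled as a compare-and-set: the filter only matches while the record's status is unchanged, so two ticks cannot both win it. With PyMongo, the filter/update pair would be passed to `Collection.find_one_and_update`, which returns `None` when nothing matches.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional


@dataclass
class CallRecord:
    # Hypothetical record shape; field names other than next_retry_at are assumed.
    id: str
    status: str                             # "callback_scheduled", "pending", "retry_scheduled", ...
    callback_at: Optional[datetime] = None  # hypothetical field name
    next_retry_at: Optional[datetime] = None


def lease_tier(rec: CallRecord, now: datetime) -> Optional[int]:
    """1 = due callback, 2 = fresh pending, 3 = due retry, None = not leasable now."""
    if rec.status == "callback_scheduled" and rec.callback_at is not None and rec.callback_at <= now:
        return 1
    if rec.status == "pending":
        return 2
    if rec.status == "retry_scheduled" and rec.next_retry_at is not None and rec.next_retry_at <= now:
        return 3
    return None


def pick_for_lease(records: list[CallRecord], now: datetime, limit: int) -> list[CallRecord]:
    """Highest priority first; retries are only reached after every leasable pending record."""
    tiered = [(lease_tier(r, now), r) for r in records]
    eligible = [(t, r) for t, r in tiered if t is not None]
    eligible.sort(key=lambda pair: pair[0])  # stable sort keeps input order within a tier
    return [r for _, r in eligible[:limit]]


def lease_filter(rec: CallRecord, owner: str, now: datetime) -> tuple[dict[str, Any], dict[str, Any]]:
    """Compare-and-set pair: matches only if nobody changed the status since we read it."""
    return (
        {"_id": rec.id, "status": rec.status},
        {"$set": {"status": "leased", "leased_by": owner, "leased_at": now}},
    )
```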

Concurrency vs carrier CPS

Two independent limits protect two different things:

| Limit | Protects | Scope |
|---|---|---|
| campaign.config.concurrency_limit | CXB Core fleet: max simultaneous connected conversations | Per campaign |
| carrier.outbound_cps | Carrier: max new SIP dials per second | Per carrier |

Ringing is not counted against concurrency; it is self-limited by CPS and the ring timeout. The carrier limit is enforced by a per-carrier token bucket in rate_limiter.py, keyed by carrier (falling back to trunk). A pending_wait() check prevents leasing more calls than the carrier can absorb in a tick.
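A sketch of a per-carrier token bucket with the carrier-then-trunk keying described above. `TokenBucket`, `CarrierLimiter`, and `available()` are hypothetical names, not the rate_limiter.py API (whose pending_wait() signature is not shown in this document). `rate` corresponds to carrier.outbound_cps and `burst` to the resolved burst setting; the clock is injectable so the bucket can be tested without sleeping.

```python
import time
from typing import Callable, Optional


class TokenBucket:
    """Hypothetical sketch: refills at `rate` tokens/s up to `burst`; one token = one new SIP dial."""

    def __init__(self, rate: float, burst: float, clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.burst = burst
        self._clock = clock
        self._tokens = burst  # start full so an idle carrier can burst
        self._updated = clock()

    def _refill(self) -> None:
        now = self._clock()
        self._tokens = min(self.burst, self._tokens + (now - self._updated) * self.rate)
        self._updated = now

    def available(self) -> int:
        """Dials that could start right now; a leaser can cap its batch at this."""
        self._refill()
        return int(self._tokens)

    def try_acquire(self) -> bool:
        self._refill()
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False


class CarrierLimiter:
    """Hypothetical sketch: one bucket per carrier, or per trunk when no carrier is known."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._buckets: dict[str, TokenBucket] = {}

    def bucket(self, carrier: Optional[str], trunk: str, cps: float, burst: float) -> TokenBucket:
        key = f"carrier:{carrier}" if carrier else f"trunk:{trunk}"
        if key not in self._buckets:
            self._buckets[key] = TokenBucket(cps, burst, self._clock)
        return self._buckets[key]
```

Keying by carrier means two trunks on the same carrier draw from one shared budget, which matches a limit that the carrier enforces across all of your traffic.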

Predictive pacing

target_dial_rate = min(cps_cap, seats / (AHT × answer_rate))
Pacing (pacing.py) plans a ringing pipeline deep enough to keep seats busy, with its depth bounded between a floor and a ceiling that stays well under the ring timeout. During cold start (fewer than ~20 completed calls, or no answer-rate signal yet) it does not over-dial; it only fills free slots.
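The formula follows from seat turnover: each seat frees up once per AHT seconds, so keeping `seats` busy needs seats / AHT connected calls per second, and because only a fraction answer_rate of dials connect, that must be divided by answer_rate to get dials per second. For example, 10 seats with a 120 s AHT and a 25% answer rate need 10 / (120 × 0.25) ≈ 0.33 dials per second (about 20 per minute), well under a 5 CPS cap; at a 1% answer rate the same campaign would want ≈ 8.3 dials per second and the CPS cap takes over. A sketch with the cold-start threshold from the text; `target_dial_rate` and `planned_dial_rate` are hypothetical names, and the pipeline floor/ceiling is not modelled.

```python
from typing import Optional

COLD_START_MIN_COMPLETED = 20  # "fewer than ~20 completed calls" means cold start


def target_dial_rate(seats: int, aht_s: float, answer_rate: float, cps_cap: float) -> float:
    """Hypothetical helper: min(cps_cap, seats / (AHT × answer_rate)) in dials/second."""
    if aht_s <= 0 or not 0 < answer_rate <= 1:
        raise ValueError("need aht_s > 0 and 0 < answer_rate <= 1")
    return min(cps_cap, seats / (aht_s * answer_rate))


def planned_dial_rate(
    seats: int,
    aht_s: float,
    answer_rate: Optional[float],
    cps_cap: float,
    completed_calls: int,
) -> Optional[float]:
    """Hypothetical helper; None signals cold start: fill free slots one-for-one, no over-dial."""
    if completed_calls < COLD_START_MIN_COMPLETED or not answer_rate:
        return None
    return target_dial_rate(seats, aht_s, answer_rate, cps_cap)
```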

Abandon-rate brake

Abandon rate is computed over a rolling 5-minute window (metrics.py) and feeds back into pacing:
| Condition | Action |
|---|---|
| Always (continuous adaptation) | Steer pacing toward the 3% regulatory abandon-rate target (FTC/Ofcom). |
| Abandon rate > 5% | Apply a hard brake that halves pacing, so a spike is not left running for minutes while the proportional loop catches up. |
| Fewer than ABANDON_MIN_SAMPLE answered calls | Do not apply the brake yet; the signal is not stable. |

metrics.py keeps per-campaign rolling-window answer_rate, AHT, and abandon_rate.
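A sketch of the rolling window and the three brake rules above. The 5-minute window, 3% target, and 5% hard-brake threshold come from this document; the ABANDON_MIN_SAMPLE value, the proportional gain, and modelling pacing as an over-dial multiplier (1.0 = no over-dial) are assumptions for illustration. `AbandonWindow` and `adjust_multiplier` are hypothetical names.

```python
import collections
from typing import Deque, Tuple

WINDOW_S = 300.0          # rolling 5-minute window
TARGET_ABANDON = 0.03     # 3% regulatory target
HARD_BRAKE_ABOVE = 0.05   # above 5%: halve immediately
ABANDON_MIN_SAMPLE = 50   # name from the text; this value is a placeholder
GAIN = 2.0                # hypothetical proportional gain


class AbandonWindow:
    """Hypothetical sketch: answered calls in the last WINDOW_S seconds, and how many were abandoned."""

    def __init__(self, window_s: float = WINDOW_S):
        self.window_s = window_s
        self._events: Deque[Tuple[float, bool]] = collections.deque()

    def record(self, answered_at: float, abandoned: bool) -> None:
        self._events.append((answered_at, abandoned))  # assumes time-ordered inserts

    def stats(self, now: float) -> Tuple[int, float]:
        """Return (answered calls in window, abandon rate)."""
        while self._events and self._events[0][0] < now - self.window_s:
            self._events.popleft()
        answered = len(self._events)
        if answered == 0:
            return 0, 0.0
        return answered, sum(1 for _, a in self._events if a) / answered


def adjust_multiplier(multiplier: float, answered: int, abandon_rate: float) -> float:
    """Hypothetical: next over-dial multiplier (1.0 = one dial per free seat)."""
    if answered < ABANDON_MIN_SAMPLE:
        return multiplier                        # signal not stable yet
    if abandon_rate > HARD_BRAKE_ABOVE:
        return max(1.0, multiplier / 2)          # hard halving brake
    error = TARGET_ABANDON - abandon_rate        # positive means room to dial harder
    return max(1.0, multiplier * (1.0 + GAIN * error))
```

The hard brake exists because a proportional step only moves a few percent per tick; halving reacts within one tick when the rate is already well past the target.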

AMD screening

amd.py runs answering-machine detection on the answered media before handing the call to the bot. It is configured per deployment (initial silence, greeting window, beep detection, word timing). The outcome drives an action:
| Decision | Default action |
|---|---|
| machine | Teardown: drop the call and mark it amd_machine. |
| unknown | Continue: attach anyway. |
| human | Attach to a CXB Core worker. |
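A sketch of the decision-to-action table as data, so a deployment could override a default (for example, also tearing down `unknown`). The enum names, `resolve_action`, and the override mechanism are assumptions; this document only states the defaults.

```python
from enum import Enum
from typing import Mapping, Optional


class AmdDecision(str, Enum):  # hypothetical name
    HUMAN = "human"
    MACHINE = "machine"
    UNKNOWN = "unknown"


class AmdAction(str, Enum):  # hypothetical name
    ATTACH = "attach"        # hand the room to a CXB Core worker
    TEARDOWN = "teardown"    # drop the call (for machine, the record ends as amd_machine)


DEFAULT_ACTIONS: Mapping[AmdDecision, AmdAction] = {
    AmdDecision.MACHINE: AmdAction.TEARDOWN,
    AmdDecision.UNKNOWN: AmdAction.ATTACH,   # "continue": attach anyway
    AmdDecision.HUMAN: AmdAction.ATTACH,
}


def resolve_action(
    decision: AmdDecision,
    overrides: Optional[Mapping[AmdDecision, AmdAction]] = None,
) -> AmdAction:
    """Hypothetical: per-deployment overrides win; otherwise use the defaults."""
    return {**DEFAULT_ACTIONS, **(overrides or {})}[decision]
```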

Attach-on-answer

When a call is answered and cleared by AMD, attacher.py POSTs /attach to a free CXB Core worker, which joins the already-answered LiveKit room as the bot. fleet.py polls CXB Core fleet capacity and reserves the best worker; trunks.py (OutboundTrunkResolver / TrunkResolution) resolves the outbound trunk and its CPS, burst, and channels settings from the CXB API.
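This document does not define "best worker". A sketch assuming "best" means most free capacity, and that reservations made since the last poll are subtracted from the snapshot so two answered calls are not both sent to a worker's last free slot. `Worker`, `FleetReservations`, and the `capacity`/`active` fields are hypothetical; the returned worker ID is what the /attach POST would target, and a reservation should be released once that attach succeeds or fails.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Worker:
    worker_id: str
    capacity: int   # hypothetical: max concurrent conversations on this worker
    active: int     # hypothetical: conversations running at poll time

    @property
    def free(self) -> int:
        return max(0, self.capacity - self.active)


class FleetReservations:
    """Hypothetical: reserve-best over the polled snapshot, net of our own pending reservations."""

    def __init__(self) -> None:
        self._reserved: dict[str, int] = {}

    def reserve_best(self, snapshot: list[Worker]) -> Optional[str]:
        best_id, best_free = None, 0
        for worker in snapshot:
            free = worker.free - self._reserved.get(worker.worker_id, 0)
            if free > best_free:  # ties keep the earlier worker
                best_id, best_free = worker.worker_id, free
        if best_id is not None:
            self._reserved[best_id] = self._reserved.get(best_id, 0) + 1
        return best_id

    def release(self, worker_id: str) -> None:
        """Call once the attach has succeeded or failed."""
        left = self._reserved.get(worker_id, 0) - 1
        if left > 0:
            self._reserved[worker_id] = left
        else:
            self._reserved.pop(worker_id, None)
```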

State machine

Waiting/cancellable states include pending, scheduled, retry_scheduled, callback_scheduled, and leased. Active/draining states include ringing, amd_screening, attaching, dialling, and in_progress. Terminal outcomes include completed, exhausted, abandoned, and amd_machine.
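A sketch encoding the three groups above as sets, for example so a cancel request can check whether a record is still cancellable. Because the text says each group "includes" the listed states, this enum may not be exhaustive; `CallStatus`, `phase`, and `can_cancel` are hypothetical names.

```python
from enum import Enum


class CallStatus(str, Enum):  # hypothetical enum; groupings from the text
    # Waiting / cancellable
    PENDING = "pending"
    SCHEDULED = "scheduled"
    RETRY_SCHEDULED = "retry_scheduled"
    CALLBACK_SCHEDULED = "callback_scheduled"
    LEASED = "leased"
    # Active / draining
    DIALLING = "dialling"
    RINGING = "ringing"
    AMD_SCREENING = "amd_screening"
    ATTACHING = "attaching"
    IN_PROGRESS = "in_progress"
    # Terminal
    COMPLETED = "completed"
    EXHAUSTED = "exhausted"
    ABANDONED = "abandoned"
    AMD_MACHINE = "amd_machine"


WAITING = frozenset({CallStatus.PENDING, CallStatus.SCHEDULED, CallStatus.RETRY_SCHEDULED,
                     CallStatus.CALLBACK_SCHEDULED, CallStatus.LEASED})
ACTIVE = frozenset({CallStatus.DIALLING, CallStatus.RINGING, CallStatus.AMD_SCREENING,
                    CallStatus.ATTACHING, CallStatus.IN_PROGRESS})
TERMINAL = frozenset({CallStatus.COMPLETED, CallStatus.EXHAUSTED, CallStatus.ABANDONED,
                      CallStatus.AMD_MACHINE})


def phase(status: CallStatus) -> str:
    if status in WAITING:
        return "waiting"
    if status in ACTIVE:
        return "active"
    if status in TERMINAL:
        return "terminal"
    raise ValueError(f"unclassified status: {status}")


def can_cancel(status: CallStatus) -> bool:
    """Only records that have not started dialling can be cancelled outright."""
    return status in WAITING
```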

Hardening

| Mechanism | File | Purpose |
|---|---|---|
| Circuit breakers | circuit_breaker.py | Per-dependency async breaker (closed/open/half-open) around fleet, SIP, and API calls. |
| Stale reaper | loop.py | Reaps in_progress calls that outlive attached_at + bot.max_call_duration_seconds × 1.10, falling back to a global timeout. |
| Liveness (/health) | health.py | The loop records a tick each cycle; /health on port 8090 returns 503 if the last tick is stale, so systemd or a load-balancer health check can detect a wedged loop and trigger a restart. |
| Metrics (/metrics) | health.py, ops_metrics.py | JSON snapshot of the in-process metrics registry. |
| Smoke check | scripts/smoke_check.py | Read-only post-deploy check: settings load, Mongo connection, indexes, /health, /metrics, fleet reachability, and no stale in-flight calls. |
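A minimal async circuit breaker showing the closed/open/half-open cycle from the table. Thresholds, the single-probe rule in half-open, and every name here are assumptions, not circuit_breaker.py's actual interface. Consecutive failures of calls started while closed trip the breaker; after the cool-down exactly one probe runs, and its result closes or reopens the circuit. Cancellation (a BaseException) is not counted as a failure.

```python
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised instead of calling a dependency whose breaker is open."""


class CircuitBreaker:  # hypothetical sketch
    def __init__(self, failure_threshold: int = 5, reset_timeout_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self._clock = clock
        self.state = "closed"
        self._failures = 0
        self._opened_at = 0.0
        self._probe_in_flight = False

    async def call(self, fn: Callable[[], Awaitable[T]]) -> T:
        is_probe = False
        if self.state == "open":
            if self._clock() - self._opened_at < self.reset_timeout_s:
                raise CircuitOpenError("circuit open")
            self.state = "half_open"  # cool-down over: allow one probe
        if self.state == "half_open":
            if self._probe_in_flight:
                raise CircuitOpenError("probe already in flight")
            self._probe_in_flight = is_probe = True
        try:
            result = await fn()
        except Exception:
            self._on_failure(is_probe)
            raise
        finally:
            if is_probe:
                self._probe_in_flight = False
        self._on_success(is_probe)
        return result

    def _trip(self) -> None:
        self.state = "open"
        self._opened_at = self._clock()

    def _on_failure(self, is_probe: bool) -> None:
        if is_probe:
            self._trip()  # probe failed: back to open for another cool-down
        elif self.state == "closed":
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._trip()

    def _on_success(self, is_probe: bool) -> None:
        # A late success from a call started before tripping does not close the circuit.
        if is_probe or self.state == "closed":
            self.state = "closed"
            self._failures = 0
```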
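The stale reaper's deadline arithmetic, sketched with hypothetical names. The attached_at + bot.max_call_duration_seconds × 1.10 rule comes from the table; which timestamp the global fallback counts from, and the fallback's value, are assumptions (here a hypothetical `fallback_from`, such as when the call entered in_progress). For example, a call attached at 12:00:00 on a bot with a 600 s limit becomes reapable after 12:11:00, since 600 × 1.10 = 660 s.

```python
from datetime import datetime, timedelta
from typing import Optional

GRACE_FACTOR = 1.10          # 10% headroom over the bot's own call limit
GLOBAL_TIMEOUT_S = 3600.0    # hypothetical fallback value


def stale_deadline(
    attached_at: Optional[datetime],
    max_call_duration_s: Optional[float],
    fallback_from: datetime,
    global_timeout_s: float = GLOBAL_TIMEOUT_S,
) -> datetime:
    """Hypothetical helper: when an in_progress call should be considered stuck."""
    if attached_at is not None and max_call_duration_s:
        return attached_at + timedelta(seconds=max_call_duration_s * GRACE_FACTOR)
    return fallback_from + timedelta(seconds=global_timeout_s)


def is_stale(now: datetime, deadline: datetime) -> bool:
    return now > deadline
```

The 10% headroom lets the bot's own max-duration hangup fire first, so the reaper only catches calls whose normal teardown never happened.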

Key files

| File | Why it matters |
|---|---|
| src/dialler/main.py | Process entrypoint, health server, graceful shutdown. |
| src/dialler/loop.py | Main supervised tick loop, leasing, fair-share, state machine, reaper. |
| src/dialler/pacing.py | Predictive pacing formula and abandon-rate brake. |
| src/dialler/metrics.py | Rolling-window answer_rate, AHT, abandon_rate. |
| src/dialler/rate_limiter.py | Per-carrier CPS token bucket. |
| src/dialler/trunks.py | Outbound trunk resolution and per-carrier CPS/burst/channels. |
| src/dialler/sip_dialler.py | LiveKit room creation and SIP participant dialling. |
| src/dialler/amd.py | Answering-machine detection screening. |
| src/dialler/attacher.py | Attaches answered rooms to CXB Core workers. |
| src/dialler/fleet.py | Polls CXB Core fleet capacity and reserves the best worker. |
| src/dialler/circuit_breaker.py | Per-dependency async circuit breaker. |
| src/dialler/health.py / ops_metrics.py | Liveness and /metrics. |

Deployments

Each deployment that uses CXB Dialler runs it as a separate process, installed under /opt/dialler/ on each host.

Verification

curl -fsS http://127.0.0.1:8090/health
uv run python scripts/smoke_check.py
uv run pytest -v