>_< arvis

06 — Queue & Scheduler

How jobs are queued, prioritized, retried, and scheduled.


The Job Queue

Every LLM call goes through the queue — including messages, heartbeats, delegations, and webhooks.

queue table schema:
  id           — auto-increment
  agent_id     — which agent handles this
  type         — 'message' | 'heartbeat' | 'cron' | 'webhook'
  status       — 'pending' | 'running' | 'completed' | 'failed'
  priority     — 1-10 (10 = highest, processed first)
  payload      — JSON: prompt, systemPrompt, channelId, platform, images, ...
  attempts     — how many times we've tried
  max_attempts — default 3
  result       — final response text (on success)
  error        — error message (on failure)
  retry_after  — timestamp: don't pick up until after this time
  created_at   — when queued
  started_at   — when processing began
  completed_at — when finished

Priority Levels

Priority What When
10 Heartbeat/cron jobs Scheduled tasks
5 Normal user messages Default
4 Delegation sub-jobs When conductor delegates to sub-agent
1 Low-priority background Future use

Highest priority jobs are processed first. If two jobs have the same priority, oldest wins (FIFO).


Processing Flow

QueueManager.enqueue(job)
  → INSERT INTO queue (status='pending')
  → setImmediate(() => processNext())   ← fires instantly, no 1s wait

QueueManager.processNext()
  → SELECT * FROM queue WHERE status='pending' AND (retry_after IS NULL OR retry_after < now())
    ORDER BY priority DESC, created_at ASC LIMIT 1
  → If none: return (idle)
  → UPDATE SET status='running', started_at=now, attempts+=1
  → activeJobs++
  → processJob(job)
  → processNext() again (keeps drain loop going)

On success:
  → UPDATE SET status='completed', result=response, completed_at=now
  → activeJobs--
  → processNext()

On error (attempts < max_attempts):
  → backoff = 2^attempts minutes (1 min, 2 min, 4 min)
  → UPDATE SET status='pending', retry_after=now+backoff, error=message
  → Job retries after backoff

On error (attempts >= max_attempts):
  → UPDATE SET status='failed', completed_at=now
  → Shows in /queue page as failed
  → Can retry manually via dashboard

Job Recovery On Restart

When Arvis starts, recoverStuckJobs() runs:

SELECT * FROM queue
WHERE status = 'running'
  AND started_at < datetime('now', '-5 minutes')

Any job that was "running" for >5 minutes is marked as failed (process likely crashed). These appear in the /queue page and can be retried manually.

This runs once on startup AND every 5 minutes while running.


Live Queue Monitor (Dashboard)

Dashboard → Queue page shows:

  • Running jobs (highlighted amber) — currently being processed
  • Pending jobs — waiting to run
  • Failed jobs — need manual retry

Actions:

  • Retry — PATCH to reset a failed job back to pending
  • Cancel — DELETE a pending job from queue
  • Auto-refreshes every 3 seconds

The Scheduler

The scheduler manages two types of recurring jobs:

Heartbeats

Simple interval-based tasks: "Run this prompt every X minutes"

heartbeat_configs table:
  agent_id    — which agent runs it
  name        — human label
  schedule    — e.g., 'every 5m', 'every 1h', 'every 1d'
  prompt      — the prompt to send
  channel_id  — where to post the response
  platform    — discord, telegram, etc.
  enabled     — on/off toggle
  last_run_at — when last executed

Example: "Check BTC price every 5 minutes and post to #prices"

Cron Jobs

Full cron expression format: run at specific times

cron_jobs table:
  agent_id    — which agent runs it
  name        — human label
  cron        — cron expression, e.g. "0 9 * * 1-5" (9am weekdays)
  prompt      — the prompt to send
  channel_id  — where to post
  platform    — discord, telegram, etc.
  enabled     — on/off toggle
  last_run_at — when last executed

Example: "Every Monday at 9am, write a weekly crypto market summary"


Scheduler Loop

Scheduler polls every 10 seconds:
  ↓
For each enabled heartbeat:
  Is it due? (now >= last_run_at + interval)
  → NO: skip
  → YES:
    Flood guard: is there already a pending/running job for this heartbeat?
    → YES: skip (prevents pile-up if processing is slow)
    → NO: enqueue job with priority 10
    UPDATE last_run_at = now

For each enabled cron job:
  Does cron expression match current minute?
  (Parsed with cronstrue/cron-parser)
  → NO: skip
  → YES:
    Flood guard check
    → already queued: skip
    → not queued: enqueue job
    UPDATE last_run_at = now

Managing Schedules (Dashboard)

Dashboard → Workflows page:

  • Create heartbeat: agent + interval + prompt + channel
  • Create cron: agent + cron expression + prompt + channel
  • Enable/disable toggle (instant, no restart needed)
  • Delete jobs
  • See last run time

Cron presets available:

  • Every 5 minutes
  • Every hour
  • Daily at midnight
  • Weekly on Monday

Queue API

GET  /api/queue        — list jobs (filter by status, agent)
PATCH /api/queue/:id   — retry failed job (reset to pending)
DELETE /api/queue/:id  — cancel/remove a job
Context & MemorySecurity