Concurrency & Observability

Lane-based locking, trace IDs, and production debugging — the infrastructure that keeps autonomous agents from colliding.

The Collision Problem

Picture two people editing the same Google Doc at the same time, except neither sees what the other is typing and every few minutes the document resets to an old version. That’s what happens when an autonomous agent runs on multiple surfaces without coordination.

An autonomous agent isn’t one process. It’s multiple surfaces sharing the same database, the same skills, and the same memory:

Surface 1: HEARTBEAT        (cron, every 30 min)
Surface 2: AGENT-OPERATE    (admin interaction)
Surface 3: CHAT-COMPLETION  (visitor chat)
Surface 4: WEBHOOKS          (external triggers)

What happens when a heartbeat fires while the admin is mid-conversation with the agent? Both try to:

  • Read and write to agent_memory
  • Execute skills that modify state
  • Update agent_objectives progress
  • Log to agent_activity

Without coordination, you get:

  • Race conditions — heartbeat overwrites memory that operate just wrote
  • Duplicate work — both surfaces execute the same pending automation
  • Corrupted state — partial writes from interrupted operations
  • Billing surprises — parallel API calls double your token spend
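The first failure mode, a lost update, can be reproduced in a few lines. This is a minimal sketch that uses a hypothetical in-memory object as a stand-in for one agent_memory row; the note texts are invented for illustration.

```typescript
// Hypothetical in-memory stand-in for a single agent_memory row.
type MemoryRow = { notes: string[] };
const row: MemoryRow = { notes: [] };

// Each surface performs an uncoordinated read-modify-write.
function read(): MemoryRow {
  return { notes: [...row.notes] };
}
function write(next: MemoryRow): void {
  row.notes = next.notes;
}

const operateCopy = read();   // admin session reads memory
const heartbeatCopy = read(); // heartbeat fires mid-conversation and reads the same state

operateCopy.notes.push('admin prefers weekly reports');
write(operateCopy);           // operate writes first

heartbeatCopy.notes.push('reflection: traffic dipped');
write(heartbeatCopy);         // heartbeat overwrites with its stale copy

console.log(row.notes);       // only the heartbeat note remains; operate's note is lost
```

Neither surface did anything wrong in isolation. The data loss comes purely from the interleaving, which is why the fix has to be coordination rather than more careful code inside each surface.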

Lane-Based Locking

FlowPilot uses a simple, effective concurrency model: lane-based advisory locks stored in the agent_locks table.

The Model

Each agent surface claims a “lane” before operating. Only one process can hold a lane at a time.

CREATE TABLE agent_locks (
  lane        TEXT PRIMARY KEY,
  locked_by   TEXT NOT NULL,
  locked_at   TIMESTAMPTZ DEFAULT now(),
  expires_at  TIMESTAMPTZ DEFAULT now() + interval '5 minutes'
);

Acquiring a Lock

import { tryAcquireLock, releaseLock } from '../_shared/concurrency.ts';

// In heartbeat
const acquired = await tryAcquireLock(supabase, 'heartbeat', 'heartbeat-cron', 300);
if (!acquired) {
  console.log('[heartbeat] Another instance is running, skipping');
  return; // Graceful exit, no error
}

try {
  // ... run heartbeat logic ...
} finally {
  await releaseLock(supabase, 'heartbeat');
}
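The acquire, try, finally, release shape above repeats on every surface, so it can be wrapped once. The following is a sketch, not FlowPilot's actual helper: `withLane`, `LockApi`, and the in-memory `fakeLocks` are hypothetical names, and the two function types only mirror how tryAcquireLock and releaseLock are called in the text (with the Supabase client assumed to be already bound).

```typescript
// Assumed shapes, modeled on the calls shown in the text.
type LockApi = {
  acquire: (lane: string, holder: string, ttlSeconds: number) => Promise<boolean>;
  release: (lane: string) => Promise<void>;
};

type LaneResult<T> = { ran: true; value: T } | { ran: false };

// Runs `work` only if the lane can be claimed, and always releases afterwards.
async function withLane<T>(
  locks: LockApi,
  lane: string,
  holder: string,
  ttlSeconds: number,
  work: () => Promise<T>,
): Promise<LaneResult<T>> {
  const acquired = await locks.acquire(lane, holder, ttlSeconds);
  if (!acquired) return { ran: false }; // lane is busy: skip gracefully, no error
  try {
    return { ran: true, value: await work() };
  } finally {
    await locks.release(lane); // runs whether `work` succeeded or threw
  }
}

// In-memory stand-in so the sketch runs without a database.
const held = new Set<string>();
const fakeLocks: LockApi = {
  acquire: async (lane) => {
    if (held.has(lane)) return false;
    held.add(lane);
    return true;
  },
  release: async (lane) => {
    held.delete(lane);
  },
};

withLane(fakeLocks, 'heartbeat', 'heartbeat-cron', 300, async () => 'cycle complete')
  .then((result) => console.log(result));
```

Returning a tagged result instead of throwing keeps "lane was busy" distinct from "work failed", which matches the graceful-exit behavior in the heartbeat example.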

Lock Lanes

Lane              Holder         Purpose
heartbeat         Cron trigger   Prevents overlapping heartbeat cycles
operate           Admin session  Prevents heartbeat from interfering with interactive use
automation:{id}   Heartbeat      Prevents duplicate automation execution
objective:{id}    Any surface    Prevents parallel progress on same objective

TTL-Based Expiry

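Locks auto-expire after 5 minutes (configurable), so a crashed process cannot hold a lane forever. The TTL should exceed the longest expected run: if a run outlives its TTL, another process can claim the lane while the first is still working.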

-- The RPC function checks expiry atomically
CREATE OR REPLACE FUNCTION try_acquire_agent_lock(
  p_lane TEXT, p_locked_by TEXT, p_ttl_seconds INT DEFAULT 300
) RETURNS BOOLEAN AS $$
BEGIN
  -- Delete this lane's lock if it has expired
  DELETE FROM agent_locks WHERE lane = p_lane AND expires_at < now();
  -- Try to insert; does nothing if the lane is still held
  INSERT INTO agent_locks (lane, locked_by, expires_at)
  VALUES (p_lane, p_locked_by, now() + (p_ttl_seconds || ' seconds')::interval)
  ON CONFLICT (lane) DO NOTHING;
  -- FOUND is true only if this call inserted the row. Checking locked_by
  -- instead would also report success when another instance using the same
  -- holder name (e.g. 'heartbeat-cron') already holds the lane.
  RETURN FOUND;
END;
$$ LANGUAGE plpgsql;
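To see the expiry behavior without a database, the same two steps (clear an expired lock, then claim the lane only if it is free) can be modeled in memory. This is an illustrative sketch: `InMemoryLaneLocks` is a hypothetical class, and the injectable clock exists only so expiry can be exercised without waiting five minutes.

```typescript
type Lock = { lockedBy: string; expiresAt: number };

class InMemoryLaneLocks {
  private locks = new Map<string, Lock>();
  private now: () => number;

  // `now` returns milliseconds; it is injectable for deterministic examples.
  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  tryAcquire(lane: string, lockedBy: string, ttlSeconds = 300): boolean {
    const current = this.locks.get(lane);
    // Step 1: drop the lane's lock if it has expired (the DELETE).
    if (current && current.expiresAt < this.now()) this.locks.delete(lane);
    // Step 2: claim the lane only if it is free (the INSERT ... ON CONFLICT DO NOTHING).
    if (this.locks.has(lane)) return false;
    this.locks.set(lane, { lockedBy, expiresAt: this.now() + ttlSeconds * 1000 });
    return true;
  }

  release(lane: string): void {
    this.locks.delete(lane);
  }
}

let clock = 0;
const locks = new InMemoryLaneLocks(() => clock);

const first = locks.tryAcquire('heartbeat', 'heartbeat-cron', 300);  // true: lane was free
const second = locks.tryAcquire('heartbeat', 'heartbeat-cron', 300); // false: lane is held
clock += 301_000; // the first holder crashed and never released
const third = locks.tryAcquire('heartbeat', 'heartbeat-cron', 300);  // true: lock expired
```

In this model a lane that is already held reports failure even to a caller with the same holder name, so two overlapping heartbeat instances cannot both proceed.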

Why Not Redis?

For a self-hosted system running on a single Supabase instance, lock rows in a PostgreSQL table (advisory by convention, and distinct from PostgreSQL's built-in pg_advisory_lock functions) are:

  • Simpler — no additional infrastructure
  • Sufficient — agent concurrency is low (max 4-5 concurrent surfaces)
  • Persistent — lock state survives edge function cold starts
  • Auditable — you can query agent_locks to see current state

Redis would be overkill. If you’re running hundreds of agent instances, you need Redis (or something equivalent). For a single-tenant self-hosted deployment, PostgreSQL is the right tool.


Trace IDs: Following the Thread

A single heartbeat can run a dozen operations in 45 seconds: self-healing, planning, automating, reflecting, remembering. Without a trace ID, debugging is like piecing together a crime scene from scattered witnesses who don’t agree on the timeline.

With a trace ID, you get a complete story. Every operation in a single autonomous run is linked under one ID. You can see what happened — in order, from start to finish.

When a heartbeat runs, it might:

  1. Self-heal 2 skills
  2. Advance 3 objective steps
  3. Execute 2 automations
  4. Reflect on 7 days of performance
  5. Save 4 memories

That’s 12 operations across multiple tables. Without a correlation ID, debugging is archaeology: piecing together timestamps and hoping they align.

The Solution

Every autonomous run generates a trace ID that flows through the entire operation chain:

import { generateTraceId } from '../_shared/trace.ts';

const traceId = generateTraceId('hb'); // hb_m3k9f2_a7x2p1

The trace ID is:

  • Human-readable — prefix tells you the surface (hb = heartbeat, op = operate)
  • Sortable — timestamp component enables chronological ordering
  • Unique — random suffix prevents collisions
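One way to get all three properties is to join the prefix, a base-36 timestamp, and a short random suffix. The sketch below is an assumption about the shape, not the contents of `_shared/trace.ts`: `makeTraceId` is a hypothetical name, and the real generator may encode the timestamp differently (the example ID in the text has a shorter timestamp component than this sketch produces).

```typescript
// Hypothetical generator: <prefix>_<base36 millisecond timestamp>_<6 random base36 chars>
function makeTraceId(prefix: string, now: number = Date.now()): string {
  const timestamp = now.toString(36);
  // Math.random().toString(36) looks like "0.k3j9x2..."; drop the "0." and keep
  // six characters, padding in the rare case the string is shorter.
  const suffix = Math.random().toString(36).slice(2, 8).padEnd(6, '0');
  return `${prefix}_${timestamp}_${suffix}`;
}

const heartbeatTrace = makeTraceId('hb');
const operateTrace = makeTraceId('op');
console.log(heartbeatTrace, operateTrace);
```

Because base-36 digits sort in ASCII order and the timestamp comes before the random part, IDs that share a prefix sort chronologically as plain strings for as long as the timestamp keeps the same number of characters.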

Propagation

The trace ID is passed through every function call and stored in every activity log:

// Heartbeat creates trace
const traceId = generateTraceId('hb');

// Passed to reasoning engine
const result = await reason({
  ...config,
  metadata: { traceId },
});

// Stored in activity logs
await supabase.from('agent_activity').insert({
  skill_name: 'advance_plan',
  conversation_id: traceId,  // Reuses conversation_id for trace correlation
  status: 'success',
  token_usage: usage,
});

Querying by Trace

“Show me everything that happened in the last heartbeat”:

SELECT skill_name, status, duration_ms, token_usage, created_at
FROM agent_activity
WHERE conversation_id = 'hb_m3k9f2_a7x2p1'
ORDER BY created_at;

This returns the complete story of a single autonomous run — every skill called, every result, every failure — in chronological order.


The Activity Log: Structured Observability

Every agent action is logged to agent_activity with a consistent schema:

// Row shape for agent_activity (uuid and timestamptz columns arrive as strings)
interface AgentActivity {
  id: string;                   // uuid
  agent: 'flowpilot' | 'visitor_chat';
  skill_name: string;           // What was attempted
  skill_id: string | null;      // uuid link to skill definition
  status: 'success' | 'error' | 'pending_approval' | 'skipped';
  input: unknown;               // json: what was sent (sanitized)
  output: unknown;              // json: what came back
  error_message: string | null; // If failed, why
  token_usage: {                // Cost tracking
    prompt_tokens: number;
    completion_tokens: number;
    total_tokens: number;
  };
  duration_ms: number;          // Performance tracking
  conversation_id: string;      // Trace ID for correlation
  created_at: string;           // timestamptz
}

What This Enables

  1. Cost attribution — Which skills consume the most tokens?
  2. Performance monitoring — Which skills are slowest?
  3. Failure tracking — Which skills fail most often? (feeds self-healing)
  4. Audit trail — What did the agent do, when, and why?
  5. Trace reconstruction — Follow a single autonomous run end-to-end
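The first three questions reduce to one grouping pass over activity rows. The following sketch assumes the rows have already been fetched from agent_activity; `summarizeBySkill` and `SkillStats` are hypothetical names, and 'send_newsletter' is an invented skill used only as sample data.

```typescript
type ActivityRow = {
  skill_name: string;
  status: 'success' | 'error' | 'pending_approval' | 'skipped';
  token_usage: { total_tokens: number } | null;
  duration_ms: number | null;
};

type SkillStats = {
  calls: number;
  errorRate: number;     // fraction of calls with status 'error'
  totalTokens: number;   // cost attribution
  avgDurationMs: number; // performance monitoring
};

function summarizeBySkill(rows: ActivityRow[]): Map<string, SkillStats> {
  const totals = new Map<string, { calls: number; errors: number; tokens: number; ms: number }>();
  for (const row of rows) {
    const t = totals.get(row.skill_name) ?? { calls: 0, errors: 0, tokens: 0, ms: 0 };
    t.calls += 1;
    if (row.status === 'error') t.errors += 1;
    t.tokens += row.token_usage?.total_tokens ?? 0;
    t.ms += row.duration_ms ?? 0;
    totals.set(row.skill_name, t);
  }

  const stats = new Map<string, SkillStats>();
  totals.forEach((t, skill) => {
    stats.set(skill, {
      calls: t.calls,
      errorRate: t.errors / t.calls,
      totalTokens: t.tokens,
      avgDurationMs: t.ms / t.calls,
    });
  });
  return stats;
}

const stats = summarizeBySkill([
  { skill_name: 'advance_plan', status: 'success', token_usage: { total_tokens: 1200 }, duration_ms: 800 },
  { skill_name: 'advance_plan', status: 'error', token_usage: { total_tokens: 300 }, duration_ms: 200 },
  { skill_name: 'send_newsletter', status: 'success', token_usage: { total_tokens: 500 }, duration_ms: 1000 },
]);
console.log(stats.get('advance_plan'));
```

The same aggregation could be written as a SQL GROUP BY; doing it in application code is shown here only because it makes the fields involved explicit.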

Self-Healing: Observability as Input

The activity log isn’t just for humans. The agent reads its own logs during the self-healing phase of every heartbeat:

SELF-HEAL phase:
  1. Query agent_activity for last 3 days
  2. Group by skill_name
  3. Find skills with 3+ consecutive failures
  4. Auto-disable failing skills
  5. Disable linked automations
  6. Inject healing report into prompt

This closes the observability loop: the agent monitors itself and acts on what it sees. A failing skill doesn’t just generate alerts — it gets quarantined automatically.
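Steps 2 and 3 of the self-heal phase can be sketched as a pure function over activity rows. Two assumptions are baked in and may not match FlowPilot's actual rule: "consecutive" is read as the most recent runs of a skill, and any non-error status ends a streak. `findFailingSkills` is a hypothetical name, 'send_newsletter' is invented sample data, and the rows are assumed to be pre-filtered to the last 3 days.

```typescript
type ActivityRow = {
  skill_name: string;
  status: 'success' | 'error' | 'pending_approval' | 'skipped';
  created_at: string; // ISO 8601 timestamp
};

// Returns skills whose most recent `threshold` or more runs all failed.
function findFailingSkills(rows: ActivityRow[], threshold = 3): string[] {
  const newestFirst = [...rows].sort(
    (a, b) => Date.parse(b.created_at) - Date.parse(a.created_at),
  );

  const streaks = new Map<string, number>();
  const streakEnded = new Set<string>(); // skills where a non-error run was seen

  for (const row of newestFirst) {
    if (streakEnded.has(row.skill_name)) continue;
    if (row.status === 'error') {
      streaks.set(row.skill_name, (streaks.get(row.skill_name) ?? 0) + 1);
    } else {
      streakEnded.add(row.skill_name); // older failures no longer count
    }
  }

  const failing: string[] = [];
  streaks.forEach((count, skill) => {
    if (count >= threshold) failing.push(skill);
  });
  return failing;
}

const sample: ActivityRow[] = [
  { skill_name: 'send_newsletter', status: 'error', created_at: '2025-01-01T10:00:00Z' },
  { skill_name: 'send_newsletter', status: 'error', created_at: '2025-01-01T10:30:00Z' },
  { skill_name: 'send_newsletter', status: 'error', created_at: '2025-01-01T11:00:00Z' },
  { skill_name: 'advance_plan', status: 'error', created_at: '2025-01-01T10:00:00Z' },
  { skill_name: 'advance_plan', status: 'error', created_at: '2025-01-01T10:30:00Z' },
  { skill_name: 'advance_plan', status: 'success', created_at: '2025-01-01T11:00:00Z' },
];

const failing = findFailingSkills(sample); // ['send_newsletter']
```

Here advance_plan is not flagged because its latest run succeeded, which keeps a skill that has already recovered from being quarantined.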


The Engine Room Dashboard

All observability data surfaces in the admin UI through the “Engine Room” — a real-time view of agent operations:

Panel          Data Source                 Shows
Activity Feed  agent_activity              Recent actions with status, duration, tokens
Token Spend    agent_activity.token_usage  Cumulative cost by skill and time period
Skill Health   agent_activity aggregated   Success rates, failure streaks
Active Locks   agent_locks                 Currently held lanes
Memory Usage   agent_memory count          Total memories by category
Objectives     agent_objectives            Progress on active goals

The Engine Room answers the operator’s core question: “What is my agent doing right now, and is it working?”


Tool Hallucination Recovery

LLMs sometimes “hallucinate” tool calls — requesting tools that don’t exist or passing malformed parameters. Without recovery, this crashes the reasoning loop and leaves the agent in an undefined state.

FlowPilot handles this with a structured recovery pattern:

1. LLM calls non-existent tool
   → Catch the "unknown tool" error
   → Do NOT abort the session

2. Inject correction message into conversation:
   "Tool 'X' doesn't exist. Available tools: [list of valid tool names]
    Please try again with one of the available tools."

3. Re-enter reasoning loop (max 2 retries)

4. If still failing after retries:
   → Log error with full details (tool name attempted, parameters, context)
   → Exit gracefully with summary of what was accomplished
   → Flag the stall in agent_activity for review

Why This Happens

Hallucinated tool calls are more common than you’d expect, especially when:

  • The agent has been given context about tools that no longer exist (stale skill definitions)
  • The model infers a tool should exist based on domain knowledge (“surely there’s a send_sms skill”)
  • The model confuses skill names across similar domains

The Recovery Loop

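// Sketch: requestToolCall (asks the model for its next tool call) is a
// hypothetical helper; executeTool, skills, and conversation come from the
// surrounding reasoning loop.
const MAX_TOOL_RETRIES = 2;

let call = await requestToolCall(conversation);
let result = await executeTool(call.toolName, call.params);

for (let attempt = 0; attempt < MAX_TOOL_RETRIES && result.error; attempt++) {
  if (result.error === 'UNKNOWN_TOOL') {
    const availableTools = skills.map(s => s.name).join(', ');
    conversation.push({
      role: 'system',
      content: `Tool '${call.toolName}' does not exist. Available tools: ${availableTools}. Please use one of these.`
    });
  } else if (result.error === 'INVALID_PARAMS') {
    // Look up the skill the model tried to call so its schema can be shown
    const skill = skills.find(s => s.name === call.toolName);
    conversation.push({
      role: 'system',
      content: `Tool call failed: ${result.error}. Schema: ${JSON.stringify(skill?.tool_definition)}`
    });
  } else {
    break; // Some other failure: not a hallucinated call, handle it elsewhere
  }

  // Re-enter the reasoning loop so the model can issue a corrected call
  call = await requestToolCall(conversation);
  result = await executeTool(call.toolName, call.params);
}

if (result.error) {
  // Retries exhausted: log the attempted tool, parameters, and context to
  // agent_activity, then exit gracefully with a summary
}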

The recovery message is injected as a system role message — not a user message — so the agent understands it as infrastructure feedback, not user input.


The Anti-Patterns

Anti-Pattern                     Consequence                                   Fix
No concurrency control           Race conditions, duplicate work               Lane-based locking
No trace IDs                     Can’t debug autonomous runs                   Generate and propagate trace IDs
Unstructured logs                console.log everywhere, no queryable data     Structured activity log
Logs for humans only             Agent can’t learn from its failures           Self-healing reads activity log
No TTL on locks                  Crashed process holds lock forever            Auto-expiry with TTL
Over-engineering (Redis, Kafka)  Complexity without benefit for single-tenant  PostgreSQL is sufficient

Concurrency and observability aren’t glamorous. They’re plumbing. But plumbing is what separates a demo from a product. Without it, your agent works in the lab and fails in production. With it, you can sleep while your agent runs — and know exactly what it did when you wake up.

Next: the skills ecosystem and how capabilities are organized.
