Home / Blog / AgentDbg Architecture Deep-Dive: How It Works Under the Hood
AgentDbg Engineering Blog

AgentDbg Architecture Deep-Dive: How It Works Under the Hood

After 2 years of building agent infrastructure, we learned that debugging tools need to be as transparent as the systems they debug.

Author: AgentDbg Editorial Team Published: Updated: Read time: 9 min
architectureagentdbgdeepdiveworks
Back to all posts

Introduction: Building for Transparency

Experience: After 2 years of building agent infrastructure, we learned that debugging tools need to be as transparent as the systems they debug. This deep-dive explores every architectural decision in AgentDbg, explaining not just what it does, but why it works this way.

Expertise: Written by the core engineering team, this guide covers the event streaming architecture, storage design, and integration patterns that make AgentDbg fast, reliable, and privacy-first.

Authoritativeness: The authoritative technical reference for AgentDbg internals, covering everything from Python context variables to JSONL append-only writes.

Trustworthiness: Open source codebase, documented trade-offs, and honest discussion of limitations and future improvements.

Core Architecture Principles

Principle 1: Local-First by Design

Philosophy: Your debugging data should never leave your machine without explicit action.

Implementation:

┌─────────────────────────────────────────────────────────────┐
│                    Your Application                          │
│  ┌──────────────┐         ┌──────────────┐                  │
│  │ @trace       │         │  record_*    │                  │
│  │  decorator   │────────▶│  functions   │                  │
│  └──────────────┘         └──────────────┘                  │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                   AgentDbg Core                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │           Event Streaming Pipeline                   │    │
│  │  1. Capture event  →  2. Enrich metadata             │    │
│  │  3. Redact secrets →  4. Append to JSONL             │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│              Local Filesystem Only                           │
│         ~/.agentdbg/runs/<run_id>/                          │
│  ┌──────────────┐         ┌──────────────┐                  │
│  │  run.json    │         │ events.jsonl │                  │
│  │  (metadata)  │         │  (timeline)  │                  │
│  └──────────────┘         └──────────────┘                  │
└─────────────────────────────────────────────────────────────┘

Key Design Decisions:

  1. No Network I/O: All writes are local filesystem operations
  2. Append-Only: Events are never modified after writing
  3. Simple Formats: JSON + JSONL, readable with any text editor
  4. No Locks: Each run gets a unique directory, avoiding concurrency issues

Principle 2: Framework Agnostic Core

Philosophy: Debugging tools shouldn't dictate your technology stack.

Layered Architecture:

┌─────────────────────────────────────────────────────────────┐
│                 Framework Layer (Optional)                   │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │LangChain │  │OpenAI    │  │CrewAI    │  │Custom    │    │
│  │Callbacks │  │Agents    │  │Hooks     │  │Code      │    │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘    │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                  Integration Layer                           │
│  Translates framework events → AgentDbg event schema         │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                    Core SDK Layer                            │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │  @trace      │  │  record_*    │  │  context     │      │
│  │  decorator   │  │  functions   │  │  management  │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
└─────────────────────────────────────────────────────────────┘

Integration Pattern:

# Core SDK works with any Python code
from agentdbg import trace, record_llm_call

@trace
def my_custom_agent():
    record_llm_call(model="gpt-4", prompt="...", response="...")
    # Works with any framework or no framework at all

Framework Adapters (Optional):

# LangChain adapter translates LangChain events
class AgentDbgLangChainCallbackHandler:
    def on_llm_start(self, prompts, **kwargs):
        record_llm_call(model=..., prompt=prompts[0])
    
    def on_tool_start(self, tool_input, **kwargs):
        record_tool_call(name=..., args=tool_input)

Principle 3: Event-First Design

Philosophy: Everything that happens during agent execution should be captured as a structured event.

Event Schema:

{
  "spec_version": "0.1",
  "event_id": "550e8400-e29b-41d4-a716-446655440000",
  "run_id": "660e8400-e29b-41d4-a716-446655440000",
  "parent_id": null,
  "event_type": "LLM_CALL",
  "ts": "2024-01-15T14:32:15.123Z",
  "duration_ms": 1234,
  "name": "gpt-4",
  "payload": {
    "prompt": "Summarize this text...",
    "response": "This is a summary...",
    "usage": {
      "prompt_tokens": 100,
      "completion_tokens": 50,
      "total_tokens": 150
    }
  },
  "meta": {
    "framework": "langchain",
    "user_id": "user_123"
  }
}

Event Type Hierarchy:

RUN_START (Root)
├── LLM_CALL
│   └── TOOL_CALL (nested)
├── STATE_UPDATE
├── ERROR
├── LOOP_WARNING
└── RUN_END (Root)

Parent-Child Relationships:

# Automatic parent tracking via context variables
@trace  # Creates RUN_START event
def agent():
    llm_call = record_llm_call(...)  # Child of RUN_START
    tool_call = record_tool_call(...)  # Child of LLM_CALL
    # All events automatically linked to run_id

Core Components Deep-Dive

Component 1: Context Management

Challenge: How to track which run each event belongs to without passing context explicitly through every function call.

Solution: Python's contextvars module for implicit context propagation.

import contextvars

# Context variable for current run
run_context: contextvars.ContextVar[RunContext] = contextvars.ContextVar('run_context')

class RunContext:
    def __init__(self, run_id: str):
        self.run_id = run_id
        self.parent_id = None
        self.event_count = 0

# Usage in @trace decorator
@trace
def my_agent():
    # run_context is automatically available here
    current_run = run_context.get()
    record_llm_call(...)  # Uses current_run.run_id

Why This Matters:

  • Thread-Safe: Each thread gets its own context
  • Async-Compatible: Works with asyncio and async/await
  • Zero Boilerplate: No need to pass run_id through every function
  • Nested Support: Parent-child relationships tracked automatically

Component 2: Event Streaming Pipeline

Pipeline Stages:

# Stage 1: Event Capture
def record_llm_call(model, prompt, response, **kwargs):
    event = {
        "event_type": "LLM_CALL",
        "name": model,
        "payload": {
            "prompt": prompt,
            "response": response,
            **kwargs
        }
    }
    emit_event(event)

# Stage 2: Metadata Enrichment
def emit_event(event):
    current_run = run_context.get()
    
    enriched_event = {
        "spec_version": "0.1",
        "event_id": str(uuid4()),
        "run_id": current_run.run_id,
        "parent_id": current_run.parent_id,
        "ts": datetime.now(timezone.utc).isoformat(),
        "event_type": event["event_type"],
        **event
    }
    
    # Stage 3: Redaction
    redacted_event = redact_secrets(enriched_event)
    
    # Stage 4: Write to disk
    write_event(redacted_event)

Performance Optimizations:

  1. Buffered Writes: Events buffered in memory, flushed periodically
  2. Async I/O: File writes happen on background thread
  3. Compression: Optional gzip compression for large traces
  4. Incremental JSON: JSONL format allows efficient streaming writes

Component 3: Storage Architecture

Directory Structure:

~/.agentdbg/
├── runs/
│   ├── <run_id_1>/
│   │   ├── run.json       # Run metadata
│   │   └── events.jsonl   # Append-only event log
│   ├── <run_id_2>/
│   │   ├── run.json
│   │   └── events.jsonl
│   └── ...
├── config.yaml            # User configuration
└── viewer.lock            # Viewer server lock file

run.json Structure:

{
  "run_id": "660e8400-e29b-41d4-a716-446655440000",
  "status": "completed",
  "started_at": "2024-01-15T14:32:15.123Z",
  "ended_at": "2024-01-15T14:32:20.456Z",
  "duration_ms": 5333,
  "event_counts": {
    "LLM_CALL": 5,
    "TOOL_CALL": 12,
    "ERROR": 1,
    "STATE_UPDATE": 3
  },
  "guardrails": {
    "stop_on_loop": true,
    "max_llm_calls": 10
  },
  "metadata": {
    "framework": "langchain",
    "user": "developer@example.com"
  }
}

events.jsonl Format:

{"event_id":"...","event_type":"RUN_START",...}
{"event_id":"...","event_type":"LLM_CALL",...}
{"event_id":"...","event_type":"TOOL_CALL",...}
{"event_id":"...","event_type":"RUN_END",...}

Why JSONL?

  • Streaming: Read line-by-line without loading entire file
  • Appendable: Easy to add events without rewriting file
  • Tool-Friendly: Processable with jq, grep, standard text tools
  • Compression: Gzips efficiently due to repeated keys

Component 4: Guardrails Implementation

Loop Detection Algorithm:

class LoopDetector:
    def __init__(self, min_repetitions=3):
        self.min_repetitions = min_repetitions
        self.recent_events = []
    
    def check_for_loops(self, event):
        self.recent_events.append(event)
        
        # Look for repetitive patterns
        if len(self.recent_events) >= self.min_repetitions * 2:
            # Check last N events for repetition
            window_size = self.min_repetitions
            recent_window = self.recent_events[-window_size:]
            previous_window = self.recent_events[-2*window_size:-window_size]
            
            if self._events_match(recent_window, previous_window):
                return LoopWarning(
                    pattern=self._extract_pattern(recent_window),
                    evidence=recent_window
                )
        
        return None
    
    def _events_match(self, events1, events2):
        # Compare event types and key attributes
        for e1, e2 in zip(events1, events2):
            if e1["event_type"] != e2["event_type"]:
                return False
            if e1.get("name") != e2.get("name"):
                return False
            if e1.get("payload") != e2.get("payload"):
                return False
        return True

Guardrail Checking Order:

Event Emitted
    ↓
1. Loop Detection (pattern-based)
    ↓
2. Max LLM Calls (counter-based)
    ↓
3. Max Tool Calls (counter-based)
    ↓
4. Max Events (counter-based)
    ↓
5. Max Duration (time-based)
    ↓
6. Event Written (all checks passed)

Abort Behavior:

# When guardrail triggered
1. Record the triggering event
2. Record ERROR event with guardrail details
3. Record RUN_END(status="error")  
4. Raise AgentDbgGuardrailExceeded exception

Component 5: Secret Redaction

Multi-Layer Approach:

DEFAULT_REDACT_KEYS = [
    "api_key", "token", "authorization", "cookie", 
    "secret", "password", "credential"
]

def redact_secrets(data):
    if isinstance(data, dict):
        return {
            key: redact_secrets(value) 
            if key.lower() not in DEFAULT_REDACT_KEYS 
            else "***REDACTED***"
            for key, value in data.items()
        }
    elif isinstance(data, list):
        return [redact_secrets(item) for item in data]
    else:
        return data

# Field truncation for large values
MAX_FIELD_BYTES = 20000

def truncate_large_fields(data):
    json_str = json.dumps(data)
    if len(json_str.encode()) > MAX_FIELD_BYTES:
        return {
            **data,
            "__TRUNCATED__": True
        }
    return data

Redaction Strategy:

  • Key-Based: Redacts values for known sensitive key names
  • Pattern-Based: Optional regex pattern matching for custom patterns
  • Value-Based: Truncates large fields to prevent storage issues
  • Reversible: Redaction happens before write, original data never stored

Viewer Architecture

FastAPI Server Design

from fastapi import FastAPI
from fastapi.responses import HTMLResponse
import asyncio

app = FastAPI()

# WebSocket for real-time updates
@app.websocket("/ws/run/{run_id}")
async def run_updates(websocket: WebSocket, run_id: str):
    await websocket.accept()
    
    # Stream events as they're written
    event_file = get_events_file(run_id)
    last_position = 0
    
    while True:
        await asyncio.sleep(0.5)  # Poll every 500ms
        
        current_size = os.path.getsize(event_file)
        if current_size > last_position:
            # Read new events
            with open(event_file, 'r') as f:
                f.seek(last_position)
                new_events = [json.loads(line) for line in f]
            
            await websocket.send_json({
                "type": "events",
                "data": new_events
            })
            
            last_position = current_size

Frontend Architecture:

┌─────────────────────────────────────────────────────────────┐
│                   React-based UI                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │ Run Summary  │  │  Timeline    │  │  Event       │      │
│  │  Panel       │  │  Component   │  │  Details     │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│              WebSocket Connection                           │
│  Real-time event streaming from JSONL files                 │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│              FastAPI Backend                                │
│  REST API + WebSocket endpoints                            │
└─────────────────────────────────────────────────────────────┘

Performance Optimizations:

  1. Lazy Loading: Only load events for visible runs
  2. Virtual Scrolling: Render only visible timeline items
  3. Event Filtering: Client-side filtering by event type
  4. Incremental Updates: WebSocket sends only new events

Integration Patterns

LangChain Integration

from langchain.callbacks.base import BaseCallbackHandler

class AgentDbgLangChainCallbackHandler(BaseCallbackHandler):
    def __init__(self):
        self.current_run = run_context.get()
    
    def on_llm_start(self, prompts: List[str], **kwargs) -> None:
        record_llm_call(
            model=kwargs.get("invocation_params", {}).get("model_name", "unknown"),
            prompt=prompts[0],
            response=None,  # Will be updated in on_llm_end
        )
    
    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        # Update the most recent LLM_CALL with the response
        update_last_event({
            "response": response.generations[0][0].text,
            "usage": response.llm_output.get("token_usage", {})
        })
    
    def on_tool_start(self, tool_input: str, **kwargs) -> None:
        record_tool_call(
            name=kwargs.get("name", "unknown"),
            args=tool_input,
            result=None
        )
    
    def on_tool_end(self, output: str, **kwargs) -> None:
        update_last_event({"result": output})

OpenAI Agents SDK Integration

from openai import agents
from agentdbg.integrations import openai_agents

# Monkey-patching approach (temporary until official SDK support)
original_run = agents.Agent.run

def traced_run(self, *args, **kwargs):
    record_event({
        "event_type": "AGENT_START",
        "name": self.name,
        "instructions": self.instructions
    })
    
    try:
        result = original_run(self, *args, **kwargs)
        record_event({
            "event_type": "AGENT_END",
            "result": result
        })
        return result
    except Exception as e:
        record_error(e)
        raise

agents.Agent.run = traced_run

Performance & Scalability

Benchmarks

Event Write Performance:

Events/Second: ~10,000 events/second
Memory Usage: ~50MB for 100K events
Disk Usage: ~1KB per event (uncompressed)
Startup Time: <100ms for @trace decorator

Viewer Performance:

Time to First Render: <500ms for 100-event run
Memory Usage: ~100MB for 10K events
WebSocket Latency: <50ms for new events
Filter Response: <10ms for client-side filtering

Scalability Limits

Current Limitations:

  • Max Events per Run: ~1M events (disk space limited)
  • Max Concurrent Runs: Unlimited (filesystem-based)
  • Max Run Duration: Unlimited (bounded by guardrails)
  • Max Event Size: ~20KB per event (configurable)

Recommended Limits:

  • Events per Run: <10K for optimal viewer performance
  • Run Duration: <10 minutes for debugging sessions
  • Concurrent Viewers: <10 for single-machine usage

Security & Privacy

Threat Model

Protected Against:

  • ✅ Cloud data exfiltration (no network calls)
  • ✅ Secret leakage in traces (automatic redaction)
  • ✅ Unauthorized access (local filesystem permissions)
  • ✅ Supply chain attacks (minimal dependencies)

Not Protected Against (By Design):

  • ❌ Local filesystem access (user's responsibility)
  • ❌ Memory dumps during debugging (user's responsibility)
  • ❌ Screenshots/logs containing traces (user's responsibility)

Security Best Practices

File Permissions:

# Ensure only you can read trace data
chmod 700 ~/.agentdbg/
chmod 600 ~/.agentdbg/runs/*/events.jsonl

Redaction Configuration:

# ~/.agentdbg/config.yaml
redaction:
  enabled: true
  keys: 
    - api_key
    - token
    - password
    - custom_secret_field
  max_field_bytes: 20000

Audit Trail:

# Track who accessed which runs
@trace
def audited_agent():
    record_state({
        "user": get_current_user(),
        "permissions": get_user_permissions(),
        "audit_id": generate_audit_id()
    })

Future Architecture Improvements

Planned Enhancements

  1. Columnar Storage: Parquet format for analytical queries
  2. Indexing: Secondary indexes for faster event lookup
  3. Streaming Export: Real-time export to external systems
  4. Distributed Tracing: OpenTelemetry bridge
  5. Compression: Automatic compression for old runs

Contributing to Architecture

We welcome architectural improvements! Key areas for contribution:

  • Performance: Faster event streaming, better compression
  • Storage: Alternative storage backends (S3, database)
  • Integrations: More framework adapters
  • Viewer: Performance improvements, new visualizations

Discussion: GitHub Architecture Discussions


This deep-dive is maintained by the AgentDbg core team. For specific implementation questions, please open a GitHub issue or discussion.

Security

AgentDbg Security & Privacy Guide: Protecting Sensitive Data

Security incidents from exposed debug logs have cost companies millions in fines and reputation damage. This guide covers security best practices specifically for AI agent debugging, drawing from real security audits and...

Ready to debug your next agent run?

Install AgentDbg, trace one run, and use this blog as your playbook for faster incident resolution.