Messaging System
Detailed design for the persistent message bus, event listener, stream processing, and WebSocket broadcast pipeline.
Source files:
- teaparty/messaging/conversations.py -- SqliteMessageBus, conversation types and state
- teaparty/messaging/bus.py -- EventBus (in-process async pub/sub)
- teaparty/messaging/listener.py -- BusEventListener (Unix socket IPC for Send/Reply)
- teaparty/teams/stream.py -- Stream event classification and live relay
- teaparty/bridge/message_relay.py -- MessageRelay (WebSocket broadcast to dashboard)
SqliteMessageBus
SqliteMessageBus is the persistence layer for human-agent and agent-agent
conversations. Each agent has its own SQLite database at
{teaparty_home}/management/agents/{agent-name}/{agent-name}-messages.db,
located by the agent_bus_path() helper.
Schema
Three primary tables:
- `messages` -- (id TEXT PK, conversation TEXT, sender TEXT, content TEXT, timestamp REAL). Indexed on (conversation, timestamp) for efficient range queries.
- `conversations` -- (id TEXT PK, type TEXT, state TEXT, created_at REAL, awaiting_input INTEGER). Tracks conversation lifecycle and input-request flags.
- `agent_contexts` -- (context_id TEXT PK, initiator_agent_id, recipient_agent_id, parent_context_id, session_id, status, pending_count INTEGER, created_at, conversation_status, agent_worktree_path). Tracks agent-to-agent dispatch contexts for fan-out/fan-in coordination.
All databases use WAL mode for concurrent read safety.
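As a minimal sketch, the first two tables might be created as below, assuming the column lists above (the real DDL in conversations.py may add constraints; agent_contexts follows the same pattern):

```python
# Sketch only: the messages/conversations tables as described above, opened in WAL mode.
import sqlite3

def init_bus_db(path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")  # concurrent read safety
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS messages (
            id           TEXT PRIMARY KEY,
            conversation TEXT NOT NULL,
            sender       TEXT NOT NULL,
            content      TEXT NOT NULL,
            timestamp    REAL NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_messages_conv_ts
            ON messages (conversation, timestamp);
        CREATE TABLE IF NOT EXISTS conversations (
            id             TEXT PRIMARY KEY,
            type           TEXT NOT NULL,
            state          TEXT NOT NULL,
            created_at     REAL NOT NULL,
            awaiting_input INTEGER NOT NULL DEFAULT 0
        );
    """)
    return conn
```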
Conversation types
The ConversationType enum defines ten conversation scopes:
| Type | Prefix | Persistence | Scope |
|---|---|---|---|
| OFFICE_MANAGER | `om:` | Indefinite | One per human |
| PROJECT_MANAGER | `pm:` | Indefinite | One per project+human |
| PROJECT_SESSION | `session:` | Session | Closes when session ends |
| SUBTEAM | `team:` | Dispatch | One per dispatch |
| JOB | `job:` | Job | One per project+job |
| TASK | `task:` | Task | One per project+job+task |
| PROXY_REVIEW | `proxy:` | Indefinite | One per decider |
| LIAISON | `liaison:` | Session | Requester+target |
| CONFIG_LEAD | `config:` | Indefinite | One per entity-scope |
| PROJECT_LEAD | `lead:` | Indefinite | One per project lead |
Conversation IDs are namespaced: make_conversation_id(type, qualifier) produces
e.g. om:primus or session:20260327-143000.
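A sketch of the helper's shape, assuming the prefixes in the table above (the real enum defines all ten types; only two are shown):

```python
# Illustrative sketch of make_conversation_id in teaparty/messaging/conversations.py.
from enum import Enum

class ConversationType(Enum):
    OFFICE_MANAGER = "om"
    PROJECT_SESSION = "session"
    # ... the remaining eight types use the prefixes listed above

def make_conversation_id(conv_type: ConversationType, qualifier: str) -> str:
    return f"{conv_type.value}:{qualifier}"

assert make_conversation_id(ConversationType.OFFICE_MANAGER, "primus") == "om:primus"
```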
Conversation state
ConversationState is either ACTIVE or CLOSED. The bus rejects writes to
closed conversations with a ValueError. The awaiting_input flag signals that
the orchestrator is waiting for human input; the bridge polls this to surface
input-requested events.
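A sketch of the write guard, assuming the schema sketched earlier; the stored state value and the insert shape are assumptions (the real method is the bus.send(conv_id, sender, content) call shown later):

```python
import time
import uuid

def send(conn, conversation_id: str, sender: str, content: str) -> str:
    """Sketch of the closed-conversation guard on writes."""
    row = conn.execute(
        "SELECT state FROM conversations WHERE id = ?", (conversation_id,)
    ).fetchone()
    if row is None or row[0] == "CLOSED":  # stored enum name is an assumption
        raise ValueError(f"conversation {conversation_id} is closed")
    msg_id = uuid.uuid4().hex
    conn.execute(
        "INSERT INTO messages (id, conversation, sender, content, timestamp) "
        "VALUES (?, ?, ?, ?, ?)",
        (msg_id, conversation_id, sender, content, time.time()),
    )
    conn.commit()
    return msg_id
```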
Agent contexts and fan-in
The agent_contexts table tracks agent-to-agent dispatch. When an agent fans out
to N workers, each child context is created atomically with create_sub_context(),
which increments the parent's pending_count in the same transaction. As each
worker replies, decrement_pending_count() lowers the counter; when it reaches
zero, the fan-in is complete and the caller is re-invoked.
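A sketch of the two helpers under the column assumptions above; bodies are illustrative, but the single-transaction increment is the property that matters:

```python
import time

def create_sub_context(conn, parent_id: str, child_id: str,
                       initiator: str, recipient: str) -> None:
    # Child insert and parent counter bump happen in one transaction.
    with conn:
        conn.execute(
            "INSERT INTO agent_contexts (context_id, initiator_agent_id,"
            " recipient_agent_id, parent_context_id, status, pending_count, created_at)"
            " VALUES (?, ?, ?, ?, 'open', 0, ?)",
            (child_id, initiator, recipient, parent_id, time.time()),
        )
        conn.execute(
            "UPDATE agent_contexts SET pending_count = pending_count + 1"
            " WHERE context_id = ?",
            (parent_id,),
        )

def decrement_pending_count(conn, parent_id: str) -> int:
    with conn:
        conn.execute(
            "UPDATE agent_contexts SET pending_count = pending_count - 1"
            " WHERE context_id = ?",
            (parent_id,),
        )
        (remaining,) = conn.execute(
            "SELECT pending_count FROM agent_contexts WHERE context_id = ?",
            (parent_id,),
        ).fetchone()
    return remaining  # zero means fan-in is complete
```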
EventBus (in-process pub/sub)
EventBus in bus.py is a lightweight async pub/sub for orchestrator-to-bridge
communication within the same process. It defines 20 event types:
- Session lifecycle: `SESSION_STARTED`, `SESSION_COMPLETED`
- Dispatch lifecycle: `DISPATCH_STARTED`, `DISPATCH_COMPLETED`
- Phase lifecycle: `PHASE_STARTED`, `PHASE_COMPLETED`
- State: `STATE_CHANGED`
- Streaming: `STREAM_DATA`, `STREAM_ERROR`
- Input: `INPUT_REQUESTED`, `INPUT_RECEIVED`
- Control: `WITHDRAW`, `INTERVENE`
- Health: `FAILURE`, `LOG`, `API_OVERLOADED`
- Cost: `COST_WARNING`, `COST_LIMIT`, `CONTEXT_WARNING`, `TURN_COST`
Each Event carries type, data dict, session_id, and timestamp.
Subscribers are async callbacks; failures are logged but do not propagate.
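A minimal sketch of that contract (names are illustrative, not the real API surface):

```python
# Async pub/sub with per-subscriber error isolation, as described above.
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)

@dataclass
class Event:
    type: str
    data: dict[str, Any]
    session_id: str
    timestamp: float = field(default_factory=time.time)

class EventBus:
    def __init__(self) -> None:
        self._subscribers: list[Callable[[Event], Awaitable[None]]] = []

    def subscribe(self, callback: Callable[[Event], Awaitable[None]]) -> None:
        self._subscribers.append(callback)

    async def publish(self, event: Event) -> None:
        for callback in self._subscribers:
            try:
                await callback(event)
            except Exception:  # failures are logged but do not propagate
                logger.exception("subscriber failed for %s", event.type)
```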
The InputRequest dataclass describes what the orchestrator needs: type
(approval, prompt, dialog, failure), CfA state, optional artifact path,
bridge_text summary, and human-readable options.
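A sketch of the dataclass as described; field names beyond those listed, and all types, are assumptions:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class InputRequest:
    type: str                     # "approval" | "prompt" | "dialog" | "failure"
    cfa_state: str                # CfA state (field name assumed)
    artifact_path: Optional[str]  # optional artifact to review
    bridge_text: str              # summary shown on the bridge
    options: list[str]            # human-readable options
```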
BusEventListener (Unix socket IPC)
BusEventListener bridges MCP tool calls (Send, CloseConversation) and
bridge-triggered interjections to bus operations via Unix domain sockets.
The orchestrator starts three sockets before launching Claude Code:
| Socket | Purpose |
|---|---|
| `send.sock` | Receives Send(member, composite, context_id) calls |
| `close.sock` | Receives CloseConversation calls |
| `interject.sock` | Receives bridge-triggered interjections (--resume) |
There is no agent-facing Reply tool — replies are inferred from subprocess
exit (see Reply flow below).
Send flow
- Agent calls Send via MCP; the MCP server connects to `send.sock`.
- Listener receives `{type: "send", member, composite, context_id}`.
- If `context_id` refers to an existing open conversation, the recipient's prior session is resumed (not spawned fresh) and `{status: "queued", context_id}` is returned immediately -- non-blocking.
- If the conversation is closed, an error is returned immediately.
- For new conversations, a context record is created synchronously in the bus and `_spawn_and_record(...)` is awaited. Send blocks until the recipient subprocess completes (or is detached), and returns `{status: "ok", context_id, result: <recipient output>}` with the inline result.
Context IDs follow the format agent:{initiator}:{recipient}:{uuid4}. The UUID
suffix ensures parallel Send calls to the same recipient produce distinct contexts.
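A sketch of the MCP-server side of this exchange: connect to the socket, send one JSON payload, read one JSON response. The payload shape follows the flow above; the newline-delimited framing is an assumption:

```python
import asyncio
import json

async def send_via_socket(socket_path: str, member: str, composite: str,
                          context_id: str | None = None) -> dict:
    reader, writer = await asyncio.open_unix_connection(socket_path)
    payload = {"type": "send", "member": member, "composite": composite,
               "context_id": context_id}
    writer.write(json.dumps(payload).encode() + b"\n")
    await writer.drain()
    response = json.loads(await reader.readline())  # e.g. {"status": "ok", ...}
    writer.close()
    await writer.wait_closed()
    return response
```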
Reply flow and fan-in
There is no separate Reply MCP tool — the recipient subprocess exiting IS the reply signal.
- `_spawn_and_record()` runs the recipient's `spawn_fn` and captures its returned result text.
- When `spawn_fn` returns, the listener calls `trigger_reply(context_id, result_text)`, which closes the agent context record and injects the result into the caller's conversation history.
- The parent's `pending_count` is decremented. When it reaches zero, `reinvoke_fn` triggers the caller's re-invocation.
- Per-agent `asyncio.Lock` instances serialize concurrent `--resume` calls for the same agent, preventing race conditions in fan-in scenarios.
Turn-end is the canonical reply signal — the classifier
(teaparty/cfa/actors.py:_classify_review) categorizes the final response
instead of the agent emitting a structured verdict.
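An illustrative composition of the reply path; the function names come from the flow above, but the signatures and bodies here are assumptions:

```python
async def _spawn_and_record(bus, context_id: str, parent_context_id: str,
                            spawn_fn, reinvoke_fn) -> str:
    result_text = await spawn_fn()               # recipient runs to completion
    bus.trigger_reply(context_id, result_text)   # close context, inject result
    if bus.decrement_pending_count(parent_context_id) == 0:
        await reinvoke_fn()                      # fan-in complete: re-invoke caller
    return result_text
```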
Stream processing
teaparty/teams/stream.py provides two key functions for real-time event handling.
_classify_event
Maps raw stream-json event dicts to (sender, content) pairs:
- `assistant` events: iterates content blocks -- `thinking` blocks yield `('thinking', text)`, `text` blocks yield `(agent_role, text)`, `tool_use` blocks yield `('tool_use', JSON)`.
- `tool_use`/`tool_result` events: yield their respective sender labels.
- `user` events: extracts `tool_result` blocks from content.
- `system` events: yield `('system', JSON)`.
- `result` events: yield `('cost', JSON)` with token/cost stats.
Deduplication: tool_use and tool_result events are tracked by ID in
seen_tool_use and seen_tool_result sets, preventing duplicate entries
when the same tool call appears in both assistant and standalone events.
NON_CONVERSATIONAL_SENDERS (`thinking`, `tool_use`, `tool_result`, `system`,
`orchestrator`, `state`, `cost`, `log`) filters internal trace events out of
conversational history.
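A condensed sketch of the assistant-event branch, including the ID-based dedup; the nested event shape is assumed from stream-json output as described above:

```python
import json

def classify_assistant(event: dict, agent_role: str,
                       seen_tool_use: set[str]) -> list[tuple[str, str]]:
    pairs: list[tuple[str, str]] = []
    for block in event.get("message", {}).get("content", []):
        if block.get("type") == "thinking":
            pairs.append(("thinking", block.get("thinking", "")))
        elif block.get("type") == "text":
            pairs.append((agent_role, block.get("text", "")))
        elif block.get("type") == "tool_use":
            if block.get("id") in seen_tool_use:
                continue  # already seen as a standalone tool_use event
            seen_tool_use.add(block.get("id"))
            pairs.append(("tool_use", json.dumps(block)))
    return pairs
```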
_make_live_stream_relay
Returns (callback, events) for real-time streaming to the message bus.
The callback processes each stream-json event synchronously: for every
(sender, content) pair from _classify_event, it writes immediately to
the bus via bus.send(conv_id, sender, content) and appends to the events list
for post-processing.
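A sketch of that shape, with the classifier passed in (the real factory's signature may differ):

```python
def make_live_stream_relay(bus, conv_id: str, classify) -> tuple:
    events: list[dict] = []

    def callback(event: dict) -> None:
        events.append(event)                    # keep raw event for post-processing
        for sender, content in classify(event):
            bus.send(conv_id, sender, content)  # immediate write to the message bus

    return callback, events
```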
WebSocket delivery: MessageRelay
MessageRelay in teaparty/bridge/message_relay.py polls per-session message
buses and delivers events to subscribed dashboard connections via WebSocket.
Architecture
- Holds a shared `bus_registry: dict[session_id, SqliteMessageBus]` (managed by the StatePoller).
- Tracks `_subscriptions: dict[connection, dict[conversation_id, cursor]]`, mapping each WebSocket connection to the set of conversations it is following, each with its own cursor. Cursors are opaque `"{ts:.9f}:{id}"` strings so that ties on timestamp are broken by message id.
- Tracks `_awaiting: dict[conversation_id, session_id]` to avoid redundant input_requested events and to emit `escalation_cleared` on True→False transitions.
- Uses a single `asyncio.Lock` to serialize the fetch-and-subscribe atomicity contract documented in chat-delivery.
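The cursor format makes the ordering explicit; a sketch of construction and comparison (`cursor_key` is a hypothetical helper, not named in the source):

```python
def make_cursor(timestamp: float, message_id: str) -> str:
    # "{ts:.9f}:{id}" -- timestamp first, message id breaks ties
    return f"{timestamp:.9f}:{message_id}"

def cursor_key(cursor: str) -> tuple[float, str]:
    ts, _, msg_id = cursor.partition(":")
    return (float(ts), msg_id)  # compare numerically, then by message id
```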
Poll cycle
poll_once() iterates subscriptions rather than broadcasting globally:
- For each `(connection, conversation_id)` subscription, fetches messages with cursor > last-seen cursor.
- Delivers new messages only to the owning connection as `{type: "message", id, conversation_id, sender, content, timestamp}`.
- Queries `conversations_awaiting_input()` for flag changes.
- For newly-awaiting conversations, fetches the latest orchestrator message as the question text and broadcasts `{type: "input_requested", session_id, conversation_id, question}` to connections subscribed to that conversation.
- On True→False transitions, emits `{type: "escalation_cleared", session_id, conversation_id}` so the UI can dismiss its pending prompt.
run() loops poll_once() at a configurable interval (default 1 second).
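A heavily simplified sketch of the loop and the message-delivery half of a cycle; `bus_for()`, `messages_after()`, and `send_json()` are hypothetical stand-ins for the real registry lookup, bus query, and WebSocket API:

```python
import asyncio

async def poll_once(relay) -> None:
    async with relay.lock:  # fetch-and-subscribe atomicity contract
        for connection, cursors in relay.subscriptions.items():
            for conversation_id, cursor in list(cursors.items()):
                bus = relay.bus_for(conversation_id)  # hypothetical lookup
                for msg in bus.messages_after(conversation_id, cursor):
                    await connection.send_json({
                        "type": "message", "id": msg.id,
                        "conversation_id": conversation_id,
                        "sender": msg.sender, "content": msg.content,
                        "timestamp": msg.timestamp,
                    })
                    cursors[conversation_id] = f"{msg.timestamp:.9f}:{msg.id}"

async def run(relay, interval: float = 1.0) -> None:
    while True:
        await poll_once(relay)
        await asyncio.sleep(interval)
```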
Event types emitted
| Event type | Payload | Trigger |
|---|---|---|
| `message` | id, conversation_id, sender, content, timestamp | New message in a subscribed conversation |
| `input_requested` | session_id, conversation_id, question | awaiting_input flag set on a conversation |
| `escalation_cleared` | session_id, conversation_id | awaiting_input flag cleared (escalation resolved) |