LangGraph Supervisor Pattern with Aegis Coordination
Build a LangGraph supervisor that tracks what worker agents learn, aggregates insights, and coordinates multi-agent workflows through persistent memory.
The Supervisor Pattern
In LangGraph, the supervisor pattern uses a central orchestrator node to route work to specialized worker nodes. The supervisor decides which worker handles each sub-task, collects results, and determines when the overall task is complete.
The problem is coordination. Workers operate independently and have no shared memory. The supervisor sees their outputs but cannot track what they have learned across multiple invocations. When the graph runs again, all accumulated knowledge is gone.
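To see the gap concretely, here is a minimal, Aegis-free sketch (plain Python, no LangGraph required) of the problem: each invocation starts from a fresh state, so anything accumulated in run one is invisible to run two.

```python
# Minimal illustration of the problem (no LangGraph or Aegis required):
# each run starts from a fresh state dict, so nothing carries over.
def worker(state: dict) -> dict:
    """A toy worker that records what it 'learned' in local state."""
    state["notes"].append(f"insight about {state['topic']}")
    return state

run1 = worker({"topic": "AI tooling", "notes": []})
run2 = worker({"topic": "AI tooling", "notes": []})  # fresh state

print(run1["notes"])  # ['insight about AI tooling']
print(run2["notes"])  # ['insight about AI tooling'] -- re-derived, not remembered
```

Run two cannot build on run one; it can only repeat the same work. Persistent memory is what breaks this cycle.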
This tutorial adds persistent memory coordination to a LangGraph supervisor using AegisClient directly. There is no LangGraph-specific integration class — we use the core client, which gives you full control over scoping, voting, reflections, and session tracking.
What We Are Building
A research analysis supervisor with three worker nodes:
- Researcher — Finds and summarizes information on a topic
- Analyst — Analyzes findings for patterns and insights
- Writer — Produces a final report from the analysis
The supervisor coordinates these workers, and Aegis Memory provides:
- Cross-run memory so workers remember past research
- Scoped access so each worker has appropriate visibility
- Reflections so the system improves over time
- Session tracking so long research tasks survive interruptions
Prerequisites
```bash
pip install aegis-memory langgraph langchain-openai
```
Step 1: Define the Graph State
LangGraph uses typed state that flows between nodes. We extend it with memory context.
```python
from typing import TypedDict, Literal

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from aegis_memory.client import AegisClient

# Initialize the Aegis client
client = AegisClient(base_url="http://localhost:8741", api_key="your-aegis-key")

# Shared LLM for all agents
llm = ChatOpenAI(model="gpt-4o", temperature=0)


class ResearchState(TypedDict):
    topic: str
    session_id: str
    research_findings: str
    analysis: str
    report: str
    memory_context: str
    current_worker: str
    iteration: int
    max_iterations: int
    status: str
```
Step 2: Build the Supervisor Node
The supervisor queries Aegis Memory for relevant context from past runs, then decides which worker should act next.
```python
def supervisor_node(state: ResearchState) -> ResearchState:
    """Supervisor routes work and provides memory context."""
    topic = state["topic"]
    session_id = state["session_id"]
    iteration = state.get("iteration", 0)

    # Query past research on this topic from all agents
    past_research = client.query(
        query=f"research findings analysis {topic}",
        user_id="system",
        agent_id="supervisor",
        top_k=5
    )

    # Query the playbook for research strategy lessons
    playbook = client.query_playbook(
        query=f"research strategy {topic}",
        agent_id="supervisor",
        min_effectiveness=0.3
    )

    # Build memory context for workers
    memory_lines = []
    if past_research:
        memory_lines.append("Previous research on this topic:")
        for mem in past_research:
            memory_lines.append(f"  - {mem.get('content', '')[:200]}")
    if playbook:
        memory_lines.append("Lessons from past research:")
        for entry in playbook:
            memory_lines.append(f"  - {entry.get('content', '')[:200]}")
    memory_context = "\n".join(memory_lines) if memory_lines else "No prior research found."

    # Initialize session tracking on the first iteration
    if iteration == 0:
        client.create_session(
            session_id=session_id,
            agent_id="supervisor"
        )
        client.update_session(
            session_id=session_id,
            completed_items=[],
            in_progress_item="research",
            next_items=["analysis", "writing", "review"],
            blocked_items=[],
            summary=f"Starting research on: {topic}",
            status="in_progress"
        )

    return {
        **state,
        "memory_context": memory_context,
        "iteration": iteration + 1
    }
```
Step 3: Build the Worker Nodes
Each worker node uses the memory context provided by the supervisor and stores its outputs back into Aegis Memory with appropriate scoping.
Researcher Node
```python
def researcher_node(state: ResearchState) -> ResearchState:
    """Research worker finds and summarizes information."""
    topic = state["topic"]
    memory_context = state.get("memory_context", "")

    # Check for existing research from past sessions
    past_findings = client.query(
        query=f"research findings {topic}",
        user_id="system",
        agent_id="researcher",
        top_k=3
    )

    past_context = ""
    if past_findings:
        past_context = "Previously discovered:\n"
        for finding in past_findings:
            past_context += f"- {finding.get('content', '')[:200]}\n"
        past_context += "\nBuild on these findings. Do not repeat them.\n"

    prompt = f"""Research the following topic thoroughly:

Topic: {topic}

{past_context}
Context from supervisor:
{memory_context}

Provide detailed findings with sources and key data points.
Focus on new information not covered in previous research."""
    response = llm.invoke(prompt)
    findings = response.content

    # Store findings in agent-shared scope so the analyst can read them
    client.add(
        content=f"Research findings on {topic}: {findings[:1000]}",
        user_id="system",
        agent_id="researcher",
        scope="agent-shared",
        metadata={
            "type": "research-findings",
            "topic": topic,
            "session": state["session_id"]
        }
    )

    # Update session progress
    client.mark_complete(session_id=state["session_id"], item="research")
    client.set_in_progress(session_id=state["session_id"], item="analysis")

    return {**state, "research_findings": findings}
```
Analyst Node
```python
def analyst_node(state: ResearchState) -> ResearchState:
    """Analyst examines findings for patterns and insights."""
    topic = state["topic"]
    findings = state.get("research_findings", "")
    memory_context = state.get("memory_context", "")

    # Query for past analyses on similar topics
    past_analyses = client.query(
        query=f"analysis patterns insights {topic}",
        user_id="system",
        agent_id="analyst",
        top_k=3
    )

    past_context = ""
    if past_analyses:
        past_context = "Previous analyses found these patterns:\n"
        for analysis in past_analyses:
            past_context += f"- {analysis.get('content', '')[:200]}\n"

    prompt = f"""Analyze these research findings for patterns and insights:

Findings:
{findings}

{past_context}
Context from supervisor:
{memory_context}

Identify:
1. Key trends and patterns
2. Contradictions or gaps
3. Actionable insights
4. Areas needing further research"""
    response = llm.invoke(prompt)
    analysis = response.content

    # Store the analysis in shared scope
    client.add(
        content=f"Analysis of {topic}: {analysis[:1000]}",
        user_id="system",
        agent_id="analyst",
        scope="agent-shared",
        metadata={
            "type": "analysis",
            "topic": topic,
            "session": state["session_id"]
        }
    )

    # Update the session
    client.mark_complete(session_id=state["session_id"], item="analysis")
    client.set_in_progress(session_id=state["session_id"], item="writing")

    return {**state, "analysis": analysis}
```
Writer Node
```python
def writer_node(state: ResearchState) -> ResearchState:
    """Writer produces the final report."""
    topic = state["topic"]
    findings = state.get("research_findings", "")
    analysis = state.get("analysis", "")
    memory_context = state.get("memory_context", "")

    prompt = f"""Write a comprehensive research report.

Topic: {topic}

Research Findings:
{findings}

Analysis:
{analysis}

Context from past research:
{memory_context}

Structure:
1. Executive Summary
2. Key Findings
3. Analysis and Insights
4. Recommendations
5. Areas for Further Research"""
    response = llm.invoke(prompt)
    report = response.content

    # Store the final report in global scope so all agents benefit
    client.add(
        content=f"Research report on {topic}: {report[:1000]}",
        user_id="system",
        agent_id="writer",
        scope="global",
        metadata={
            "type": "report",
            "topic": topic,
            "session": state["session_id"]
        }
    )

    # Update the session
    client.mark_complete(session_id=state["session_id"], item="writing")

    return {**state, "report": report, "status": "complete"}
```
Step 4: Add the Review and Reflection Node
After the report is written, a review node evaluates quality and adds reflections.
```python
import re


def review_node(state: ResearchState) -> ResearchState:
    """Review the report and add reflections for future improvement."""
    report = state.get("report", "")
    topic = state["topic"]
    session_id = state["session_id"]

    prompt = f"""Evaluate this research report for quality:

{report}

Rate on:
1. Completeness (are key aspects covered?)
2. Accuracy (are claims supported?)
3. Actionability (are recommendations specific?)
4. Clarity (is it well-structured?)

Provide a quality score (1-10) and specific improvement suggestions."""
    response = llm.invoke(prompt)
    review = response.content

    # Parse the score defensively: look for "N/10" anywhere in the review,
    # defaulting to 0 if no score can be found
    match = re.search(r"(\d+)\s*/\s*10", review)
    score = int(match.group(1)) if match else 0

    # Add a reflection based on what worked and what did not
    if "improvement" in review.lower() or score < 7:
        client.add_reflection(
            content=f"Research report on {topic} needed improvements. "
                    f"Review feedback: {review[:500]}",
            agent_id="supervisor",
            namespace="research",
            error_pattern="Research report quality below threshold",
            correct_approach="Ensure researcher provides specific data "
                             "points and sources. Analyst should cross-"
                             "reference multiple sources. Writer should "
                             "include quantitative evidence.",
            applicable_contexts=["research", "report", topic],
            scope="global"
        )
    else:
        client.add_reflection(
            content=f"Research report on {topic} was high quality. "
                    f"Effective pattern: structured handoff from "
                    f"researcher to analyst to writer with shared memory.",
            agent_id="supervisor",
            namespace="research",
            error_pattern="N/A - success pattern",
            correct_approach="Use structured three-phase research: "
                             "gather, analyze, synthesize. Consult "
                             "the playbook before starting.",
            applicable_contexts=["research", "report", topic],
            scope="global"
        )

    # Vote on the memories that were used
    used_memories = client.query(
        query=f"research {topic}",
        user_id="system",
        agent_id="supervisor",
        top_k=5
    )
    for mem in used_memories:
        if "memory_id" in mem:
            client.vote(
                memory_id=mem["memory_id"],
                vote="helpful",
                voter_agent_id="supervisor",
                context=f"Used in research report on {topic}",
                task_id=session_id
            )

    # Mark the session complete
    client.mark_complete(session_id=session_id, item="review")
    client.update_session(
        session_id=session_id,
        completed_items=["research", "analysis", "writing", "review"],
        in_progress_item=None,
        next_items=[],
        blocked_items=[],
        summary=f"Research on {topic} complete. Report quality: "
                f"{review[:100]}",
        status="completed"
    )

    return {**state, "status": "reviewed"}
```
Step 5: Define the Router
The supervisor needs a routing function to decide which worker acts next.
```python
def route_to_worker(
    state: ResearchState,
) -> Literal["researcher", "analyst", "writer", "review", "__end__"]:
    """Route to the appropriate worker based on current state."""
    status = state.get("status", "")
    if status == "reviewed":
        return "__end__"
    if not state.get("research_findings"):
        return "researcher"
    elif not state.get("analysis"):
        return "analyst"
    elif not state.get("report"):
        return "writer"
    else:
        return "review"
```
Step 6: Assemble the Graph
```python
def build_research_graph():
    """Build the LangGraph research pipeline."""
    graph = StateGraph(ResearchState)

    # Add nodes
    graph.add_node("supervisor", supervisor_node)
    graph.add_node("researcher", researcher_node)
    graph.add_node("analyst", analyst_node)
    graph.add_node("writer", writer_node)
    graph.add_node("review", review_node)

    # Set the entry point
    graph.set_entry_point("supervisor")

    # The supervisor routes to workers
    graph.add_conditional_edges(
        "supervisor",
        route_to_worker,
        {
            "researcher": "researcher",
            "analyst": "analyst",
            "writer": "writer",
            "review": "review",
            "__end__": END
        }
    )

    # Workers always return to the supervisor for the next routing decision
    graph.add_edge("researcher", "supervisor")
    graph.add_edge("analyst", "supervisor")
    graph.add_edge("writer", "supervisor")
    graph.add_edge("review", END)

    return graph.compile()
```
Step 7: Run the Graph
```python
def run_research(topic, session_id=None):
    """Execute the research pipeline."""
    if session_id is None:
        session_id = f"research-{topic.lower().replace(' ', '-')[:30]}"

    graph = build_research_graph()

    # Check for an existing session (resume support)
    try:
        existing = client.get_session(session_id=session_id)
        print(f"Resuming session: {existing.get('summary', '')}")
    except Exception:
        print(f"Starting new research on: {topic}")

    initial_state = ResearchState(
        topic=topic,
        session_id=session_id,
        research_findings="",
        analysis="",
        report="",
        memory_context="",
        current_worker="",
        iteration=0,
        max_iterations=10,
        status=""
    )

    result = graph.invoke(initial_state)

    print("\n" + "=" * 60)
    print("Research Complete")
    print("=" * 60)
    print(f"\nReport:\n{result['report'][:500]}...")
    return result


# First run: no prior knowledge
result1 = run_research("impact of AI on software development")

# Second run on a related topic: benefits from memories of the first run
result2 = run_research("AI coding assistants market analysis")
```
On the second run, the supervisor's `client.query` call retrieves relevant findings from the first research project. The researcher builds on existing knowledge rather than starting from scratch, and the playbook provides strategy lessons learned from the first run.
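To make the carryover concrete, here is a small self-contained sketch of how the supervisor's context block for run two is assembled from run one's stored memories. The sample hit dicts are hand-made stand-ins shaped like `client.query` results; real values would come from the Aegis server.

```python
# Hand-made sample hits standing in for real client.query results
# (content plus metadata, as stored by the worker nodes above)
sample_hits = [
    {"content": "Research findings on impact of AI on software development: "
                "adoption is accelerating across enterprise teams...",
     "metadata": {"type": "research-findings"}},
    {"content": "Analysis of impact of AI on software development: "
                "productivity gains concentrate in routine coding tasks...",
     "metadata": {"type": "analysis"}},
]

# The same context-building loop the supervisor uses
memory_lines = ["Previous research on this topic:"]
for mem in sample_hits:
    memory_lines.append(f"  - {mem.get('content', '')[:200]}")
memory_context = "\n".join(memory_lines)

print(memory_context)
```

This is the `memory_context` string that run two's workers receive in their prompts, which is why the second researcher can build on the first project instead of repeating it.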
Scoping Strategy for LangGraph
Memory scoping in the LangGraph supervisor pattern follows a specific logic:
| Agent | Stores As | Reads From | Rationale |
|---|---|---|---|
| Supervisor | global | All scopes | Orchestrator needs full visibility |
| Researcher | agent-shared | agent-shared, global | Findings should be accessible to analyst and writer |
| Analyst | agent-shared | agent-shared, global | Analysis builds on shared research |
| Writer | global | All scopes | Final reports benefit everyone |
The supervisor uses global scope for reflections because the lessons it learns about research strategy apply to all future research, regardless of which specific workers are involved.
```python
# The supervisor stores global reflections
client.add_reflection(
    content="Multi-source research produces better reports than "
            "single-source deep dives",
    agent_id="supervisor",
    namespace="research",
    error_pattern="Shallow research from limited sources",
    correct_approach="Instruct the researcher to consult at least 3 "
                     "independent sources per claim",
    applicable_contexts=["research", "analysis", "report"],
    scope="global"
)

# A worker stores shared findings
client.add(
    content="Key finding about AI adoption rates...",
    user_id="system",
    agent_id="researcher",
    scope="agent-shared",
    metadata={"type": "finding", "topic": "ai-adoption"}
)
```
Handling Agent Handoffs
While LangGraph handles routing through the graph edges, Aegis handoffs provide semantic context transfer that persists across runs.
```python
# After the researcher finishes, create a handoff record
client.handoff(
    source_agent_id="researcher",
    target_agent_id="analyst",
    task_context="Research complete. Found 3 major trends: "
                 "1) AI coding tools adoption at 70% in enterprise, "
                 "2) Productivity gains of 30-50% reported, "
                 "3) Quality concerns around AI-generated code. "
                 "Priority: analyze the quality concern deeply."
)

# The analyst can query for handoff context in future runs
handoff_context = client.query(
    query="handoff from researcher about AI trends",
    user_id="system",
    agent_id="analyst",
    top_k=3
)
```
Adding Feature Tracking
For structured research deliverables, use feature tracking to ensure all sections of the report are verified.
```python
# Define report sections as features
client.create_feature(
    feature_id="exec-summary",
    description="Executive summary covering all key findings",
    session_id=session_id,
    category="report-section",
    test_steps=[
        "Summarizes all three identified trends",
        "Includes quantitative data points",
        "States clear recommendations"
    ]
)

client.create_feature(
    feature_id="methodology",
    description="Research methodology section",
    session_id=session_id,
    category="report-section",
    test_steps=[
        "Lists all sources consulted",
        "Describes analysis framework",
        "Notes limitations and biases"
    ]
)

# After review, mark sections as verified
client.update_feature(
    feature_id="exec-summary",
    status="in_progress",
    passes=[
        "Summarizes all three identified trends",
        "Includes quantitative data points"
    ],
    verified_by="supervisor"
)
```
Production Considerations
Error Handling in Nodes
Wrap each node in error handling that updates the session state, so crashes are recoverable:
```python
def safe_node(node_fn, node_name):
    """Wrap a node function with error handling and session updates."""
    def wrapper(state: ResearchState) -> ResearchState:
        try:
            return node_fn(state)
        except Exception as e:
            client.update_session(
                session_id=state["session_id"],
                completed_items=[],
                in_progress_item=node_name,
                next_items=[],
                blocked_items=[],
                summary=f"Error in {node_name}: {str(e)[:200]}",
                status="in_progress"
            )
            # Record a reflection about the failure
            client.add_reflection(
                content=f"Node {node_name} failed with: {str(e)[:200]}",
                agent_id="supervisor",
                namespace="research",
                error_pattern=f"{node_name} node failure",
                correct_approach="Add input validation and retry logic",
                applicable_contexts=[node_name, "error-handling"],
                scope="global"
            )
            raise
    return wrapper


# Use it when adding nodes
graph.add_node("researcher", safe_node(researcher_node, "researcher"))
graph.add_node("analyst", safe_node(analyst_node, "analyst"))
graph.add_node("writer", safe_node(writer_node, "writer"))
```
Limiting Memory Context Size
When querying for past research, be mindful of how much context you inject into prompts:
```python
# Limit the number of results and truncate content
past_research = client.query(
    query=f"research {topic}",
    user_id="system",
    agent_id="supervisor",
    top_k=3  # Keep it focused
)

# Truncate each memory to avoid bloating the prompt
for mem in past_research:
    mem["content"] = mem.get("content", "")[:300]
```
Playbook Filtering
Use `min_effectiveness` to ensure you only surface proven strategies:
```python
playbook = client.query_playbook(
    query="research methodology",
    agent_id="supervisor",
    min_effectiveness=0.5  # Only well-proven strategies
)
```
Summary
This tutorial demonstrated how to add persistent memory coordination to a LangGraph supervisor pattern using AegisClient. The key concepts are:
- Use `AegisClient` directly for LangGraph (there is no LangGraph-specific integration)
- Supervisor as memory coordinator — queries past research and distributes context to workers
- Scoped storage — workers store in `agent-shared`, final outputs in `global`
- Cross-run learning — reflections and playbook queries make the system smarter over time
- Session tracking — long research tasks survive process restarts
- Handoffs — semantic context transfer that persists beyond the current graph execution
The result is a research pipeline that accumulates knowledge across runs. The second research project benefits from the first. The tenth project benefits from all nine before it.