Long-Running CrewAI Tasks That Survive Restarts
Multi-hour CrewAI tasks lose all progress on crash or context window reset. Learn how to add session tracking and checkpointing with Aegis Memory.
The Problem: Multi-Hour Tasks That Vanish
You have a CrewAI pipeline that runs for two hours. It researches a topic, generates a report, validates data sources, and compiles a final document. At step 47 of 60, the process crashes. Maybe the API rate-limited you. Maybe the machine ran out of memory. Maybe the context window just reset.
The result is always the same: all progress is lost. The agent starts over from step 1.
This is not a theoretical problem. Multi-hour agentic tasks are common in production — data migration pipelines, codebase refactoring, compliance audits, research synthesis. Any task that takes longer than a single context window is vulnerable.
In this tutorial, you will add session tracking and checkpointing to a CrewAI pipeline using Aegis Memory. When the agent crashes, it resumes exactly where it left off.
Prerequisites
pip install aegis-memory crewai
You will need a running Aegis Memory server:
docker run -d -p 8741:8741 quantifylabs/aegis-memory:latest
Step 1: Define the Long-Running Task
Let us build a realistic example: a multi-step code audit pipeline. The crew audits a codebase for security vulnerabilities, generates fixes, and verifies the patches.
from crewai import Agent, Task, Crew
from aegis_memory.integrations.crewai import AegisCrewMemory, AegisAgentMemory
# Initialize Aegis crew memory
crew_memory = AegisCrewMemory(
api_key="your-aegis-key",
namespace="security-audit",
default_scope="agent-shared"
)
Step 2: Create a Session for the Pipeline
Before the crew starts working, create a session that tracks overall progress. This session persists outside the LLM context window, so it survives crashes.
from aegis_memory.client import AegisClient
client = AegisClient(base_url="http://localhost:8741", api_key="your-aegis-key")
SESSION_ID = "audit-2026-01-15"
# Define all the work items upfront
ALL_ITEMS = [
"scan-dependencies",
"analyze-auth-module",
"analyze-api-endpoints",
"analyze-data-layer",
"generate-fix-auth",
"generate-fix-api",
"generate-fix-data",
"verify-fixes",
"compile-report"
]
# Create the session
client.create_session(
session_id=SESSION_ID,
agent_id="audit-lead"
)
# Set initial state
client.update_session(
session_id=SESSION_ID,
completed_items=[],
in_progress_item=ALL_ITEMS[0],
next_items=ALL_ITEMS[1:],
blocked_items=[],
summary="Starting security audit pipeline",
status="in_progress"
)
Step 3: Build Resume-Aware Agents
The key insight is that each agent should check the session state before starting work. If the item it is responsible for is already completed, it skips ahead.
def get_remaining_work(client, session_id):
"""Check session state and return what still needs to be done."""
session = client.get_session(session_id=session_id)
completed = set(session.get("completed_items", []))
    in_progress = session.get("in_progress_item")
return {
"completed": completed,
"in_progress": in_progress,
"next_items": session.get("next_items", []),
"blocked_items": session.get("blocked_items", []),
"summary": session.get("summary", ""),
"status": session.get("status", "unknown")
}
def run_step(client, session_id, item, agent_fn):
"""Run a single step with checkpointing."""
state = get_remaining_work(client, session_id)
# Skip if already completed
if item in state["completed"]:
print(f"Skipping {item} -- already completed")
return "skipped"
    # Announce and mark the item as in progress
    print(f"Running {item}...")
    client.set_in_progress(session_id=session_id, item=item)
try:
# Execute the actual work
result = agent_fn()
# Mark complete on success
client.mark_complete(session_id=session_id, item=item)
        # Recompute what's left (relies on the module-level ALL_ITEMS list),
        # then update the session with the latest context
        remaining = [i for i in ALL_ITEMS
                     if i not in state["completed"] and i != item]
client.update_session(
session_id=session_id,
completed_items=list(state["completed"]) + [item],
in_progress_item=remaining[0] if remaining else None,
next_items=remaining[1:] if len(remaining) > 1 else [],
blocked_items=[],
summary=f"Completed {item}. Result: {result[:200]}",
status="in_progress" if remaining else "completed"
)
return result
except Exception as e:
# Update session with error context so the next run knows what happened
client.update_session(
session_id=session_id,
completed_items=list(state["completed"]),
in_progress_item=item,
next_items=state["next_items"],
blocked_items=[],
summary=f"Failed on {item}: {str(e)[:200]}",
status="in_progress"
)
raise
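Before wiring this into CrewAI, it helps to see the checkpointing contract in isolation. The sketch below uses a hand-rolled in-memory stand-in for the client (the `FakeAegisClient` class and `run_items` helper are illustrative, not part of the Aegis API) to show that a crashed run resumes without repeating completed items:

```python
# Illustrative only: FakeAegisClient mimics just the calls the
# checkpointing loop makes. The real AegisClient persists over HTTP.
class FakeAegisClient:
    def __init__(self):
        self.sessions = {}

    def create_session(self, session_id, agent_id):
        self.sessions[session_id] = {"completed_items": [],
                                     "agent_id": agent_id,
                                     "in_progress_item": None}

    def get_session(self, session_id):
        return self.sessions[session_id]

    def set_in_progress(self, session_id, item):
        self.sessions[session_id]["in_progress_item"] = item

    def mark_complete(self, session_id, item):
        session = self.sessions[session_id]
        session["completed_items"].append(item)
        session["in_progress_item"] = None


def run_items(client, session_id, items, worker):
    """Run each item once, skipping anything already checkpointed."""
    done = set(client.get_session(session_id)["completed_items"])
    for item in items:
        if item in done:
            continue  # resume path: this item finished before the crash
        client.set_in_progress(session_id, item)
        worker(item)  # may raise -- the checkpoint survives
        client.mark_complete(session_id, item)


client = FakeAegisClient()
client.create_session("demo", agent_id="lead")

executed = []
crashed = {"already": False}

def flaky(item):
    # Crash exactly once, on item "c", to simulate a mid-run failure
    if item == "c" and not crashed["already"]:
        crashed["already"] = True
        raise RuntimeError("simulated crash")
    executed.append(item)

try:
    run_items(client, "demo", ["a", "b", "c"], flaky)
except RuntimeError:
    pass  # the process "restarts" here

run_items(client, "demo", ["a", "b", "c"], flaky)
print(executed)  # ['a', 'b', 'c'] -- "a" and "b" ran exactly once
```

The same shape of test is useful in CI: you can exercise the resume logic without a server or an LLM in the loop.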
Step 4: Wire Up the CrewAI Agents
Now create the CrewAI agents that use this checkpointing infrastructure.
# Create agents with Aegis memory
scanner_memory = AegisAgentMemory(
crew_memory=crew_memory,
agent_id="scanner",
scope="agent-shared"
)
fixer_memory = AegisAgentMemory(
crew_memory=crew_memory,
agent_id="fixer",
scope="agent-shared"
)
verifier_memory = AegisAgentMemory(
crew_memory=crew_memory,
agent_id="verifier",
scope="agent-shared"
)
scanner = Agent(
role="Security Scanner",
goal="Identify security vulnerabilities in the codebase",
backstory="Expert in OWASP Top 10 and common security anti-patterns"
)
fixer = Agent(
role="Security Fixer",
goal="Generate patches for identified vulnerabilities",
backstory="Senior developer specializing in secure coding practices"
)
verifier = Agent(
role="Patch Verifier",
goal="Verify that security patches are correct and complete",
backstory="QA engineer with deep security testing experience"
)
Step 5: Run the Pipeline with Checkpointing
def run_audit_pipeline():
"""Run the full audit pipeline with crash recovery."""
state = get_remaining_work(client, SESSION_ID)
if state["status"] == "completed":
print("Audit already completed!")
return
print(f"Resuming from: {state['in_progress'] or 'beginning'}")
print(f"Already completed: {state['completed']}")
# Phase 1: Scanning
scan_items = [
"scan-dependencies",
"analyze-auth-module",
"analyze-api-endpoints",
"analyze-data-layer"
]
for item in scan_items:
def scan_task(scan_item=item):
task = Task(
description=f"Perform {scan_item} security analysis",
expected_output="List of vulnerabilities found",
agent=scanner
)
crew = Crew(agents=[scanner], tasks=[task])
result = crew.kickoff()
# Store findings in shared memory
scanner_memory.save(
value=f"Scan results for {scan_item}: {result}",
metadata={"phase": "scanning", "item": scan_item}
)
return str(result)
run_step(client, SESSION_ID, item, scan_task)
# Phase 2: Fixing
fix_items = [
"generate-fix-auth",
"generate-fix-api",
"generate-fix-data"
]
for item in fix_items:
def fix_task(fix_item=item):
# Read scan results from shared memory
module = fix_item.replace("generate-fix-", "")
findings = fixer_memory.search(
query=f"vulnerabilities in {module}",
limit=5
)
task = Task(
description=f"Generate security patches based on: {findings}",
expected_output="Code patches for identified vulnerabilities",
agent=fixer
)
crew = Crew(agents=[fixer], tasks=[task])
result = crew.kickoff()
fixer_memory.save(
value=f"Patches for {module}: {result}",
metadata={"phase": "fixing", "module": module}
)
return str(result)
run_step(client, SESSION_ID, item, fix_task)
# Phase 3: Verification
    run_step(client, SESSION_ID, "verify-fixes", verify_all_fixes)
    run_step(client, SESSION_ID, "compile-report", compile_report)
def verify_all_fixes():
"""Verify all patches are correct."""
patches = verifier_memory.search(query="security patches", limit=10)
task = Task(
description=f"Verify these security patches: {patches}",
expected_output="Verification results for each patch",
agent=verifier
)
crew = Crew(agents=[verifier], tasks=[task])
return str(crew.kickoff())
def compile_report():
"""Compile the final audit report."""
all_findings = scanner_memory.search(query="vulnerabilities", limit=20)
all_patches = fixer_memory.search(query="patches", limit=20)
verification = verifier_memory.search(query="verification", limit=10)
task = Task(
description=f"Compile audit report from: findings={all_findings}, "
f"patches={all_patches}, verification={verification}",
expected_output="Complete security audit report in markdown",
agent=scanner
)
crew = Crew(agents=[scanner], tasks=[task])
return str(crew.kickoff())
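Crashes are not the only failure mode from the problem statement: rate limits and flaky infrastructure often recover on their own, and should not cost you a full restart. A small retry wrapper composes cleanly with `run_step` (this is a sketch, not part of the Aegis or CrewAI APIs; the `retryable` exception tuple is an assumption you should tune to the errors your stack actually raises):

```python
import time

def with_retries(fn, max_attempts=3, base_delay=1.0,
                 retryable=(TimeoutError, ConnectionError)):
    """Retry fn with exponential backoff on transient errors.

    Non-transient errors (and the final failed attempt) propagate,
    so the checkpointing except-branch still records the failure.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except retryable:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

# Demonstrate: a call that fails twice with a transient error, then succeeds.
attempts = {"n": 0}

def flaky_call():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TimeoutError("simulated rate limit")
    return "ok"

result = with_retries(flaky_call, max_attempts=4, base_delay=0.01)
print(result, attempts["n"])  # ok 3
```

In the pipeline, wrap the agent function before handing it to `run_step`, e.g. `run_step(client, SESSION_ID, item, lambda: with_retries(scan_task))`, so a rate-limit blip retries in place while a real failure still lands in the session summary.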
Step 6: Add Feature Tracking
Session progress tells you which steps have been executed. Feature tracking tells you which capabilities have been delivered and verified. For a security audit, features map to vulnerability categories.
# Define features that map to audit deliverables
client.create_feature(
feature_id="auth-hardening",
description="All authentication vulnerabilities identified and patched",
session_id=SESSION_ID,
category="security",
test_steps=[
"SQL injection in login endpoint patched",
"Session tokens use secure random generation",
"Password hashing uses bcrypt with cost >= 12",
"Rate limiting on authentication endpoints"
]
)
client.create_feature(
feature_id="api-security",
description="All API endpoint vulnerabilities resolved",
session_id=SESSION_ID,
category="security",
test_steps=[
"Input validation on all endpoints",
"CORS configuration restricts origins",
"API keys are not exposed in responses",
"Rate limiting on public endpoints"
]
)
# As the verifier confirms fixes, update feature status
client.update_feature(
feature_id="auth-hardening",
status="in_progress",
passes=[
"SQL injection in login endpoint patched",
"Session tokens use secure random generation"
],
verified_by="verifier"
)
# When all test steps pass
client.mark_feature_complete(
feature_id="auth-hardening",
verified_by="verifier"
)
Querying Feature Status
At any point, you can check overall progress across all features:
# List all features for this session
all_features = client.list_features(session_id=SESSION_ID)
for feature in all_features:
print(f"{feature['feature_id']}: {feature['status']}")
# Filter to just in-progress features
in_progress = client.list_features(
session_id=SESSION_ID,
status="in_progress"
)
print(f"{len(in_progress)} features still in progress")
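If you want a single progress figure rather than a raw status list, a small pure-Python helper over the dicts `list_features` returns is enough (the dict shape here is assumed from the examples above):

```python
def feature_progress(features):
    """Summarize status counts and completion for list_features() results.

    Assumes each feature dict carries "feature_id" and "status" keys,
    as in the examples above.
    """
    counts = {}
    for feature in features:
        status = feature.get("status", "unknown")
        counts[status] = counts.get(status, 0) + 1
    total = len(features)
    done = counts.get("completed", 0)
    return {"counts": counts, "done": done, "total": total,
            "percent": round(100 * done / total) if total else 0}

# Example with the two features defined in Step 6:
sample = [
    {"feature_id": "auth-hardening", "status": "completed"},
    {"feature_id": "api-security", "status": "in_progress"},
]
summary = feature_progress(sample)
print(summary["percent"])  # 50
```

A one-line percentage like this is handy for dashboards and for the session summary itself.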
Step 7: Handle Crash Recovery
The entire point of this setup is crash recovery. Here is what happens when the process restarts:
def main():
"""Entry point that handles fresh starts and resumption."""
try:
# Try to get existing session
session = client.get_session(session_id=SESSION_ID)
completed = session.get("completed_items", [])
print(f"Resuming audit. {len(completed)}/{len(ALL_ITEMS)} "
f"steps completed.")
print(f"Last status: {session.get('summary', 'Unknown')}")
except Exception:
# No existing session -- fresh start
print("Starting fresh audit")
client.create_session(
session_id=SESSION_ID,
agent_id="audit-lead"
)
client.update_session(
session_id=SESSION_ID,
completed_items=[],
in_progress_item=ALL_ITEMS[0],
next_items=ALL_ITEMS[1:],
blocked_items=[],
summary="Starting security audit pipeline",
status="in_progress"
)
# Run the pipeline -- it will skip completed steps automatically
run_audit_pipeline()
# Check feature completion
features = client.list_features(session_id=SESSION_ID)
incomplete = [f for f in features if f.get("status") != "completed"]
if incomplete:
print(f"Warning: {len(incomplete)} features not fully verified")
else:
print("All features verified. Audit complete.")
if __name__ == "__main__":
main()
When this script runs after a crash, the output looks like:
Resuming audit. 5/9 steps completed.
Last status: Completed generate-fix-auth. Result: Patched SQL injection...
Skipping scan-dependencies -- already completed
Skipping analyze-auth-module -- already completed
Skipping analyze-api-endpoints -- already completed
Skipping analyze-data-layer -- already completed
Skipping generate-fix-auth -- already completed
Running generate-fix-api...
The agent picks up exactly where it left off. No repeated work. No lost context.
Best Practices for Session Tracking
Use Descriptive Session IDs
Session IDs should encode enough information to be useful in logs and debugging:
# Good: encodes what, when, and scope
session_id = "security-audit-payments-2026-01-15"
# Bad: opaque identifier
session_id = "abc123"
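To keep that convention consistent across runs, build the ID with a tiny helper (`make_session_id` is illustrative, not part of the Aegis API):

```python
from datetime import date

def make_session_id(task, scope, run_date=None):
    """Build a descriptive session ID: <task>-<scope>-<YYYY-MM-DD>."""
    run_date = run_date or date.today()
    return f"{task}-{scope}-{run_date.isoformat()}"

SESSION_ID = make_session_id("security-audit", "payments", date(2026, 1, 15))
print(SESSION_ID)  # security-audit-payments-2026-01-15
```

Note that date-based IDs also give you a fresh session per day; pass an explicit `run_date` when you need to resume yesterday's run.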
Update Summaries Frequently
The session summary is your crash recovery context. Write it as if your future self will read it with no other context:
client.update_session(
session_id=SESSION_ID,
completed_items=["scan-dependencies", "analyze-auth-module"],
in_progress_item="analyze-api-endpoints",
next_items=["analyze-data-layer", "generate-fix-auth"],
blocked_items=[],
summary="Found 3 critical vulns in auth module: SQL injection in "
"login, weak session tokens, no rate limiting. Dependencies "
"scan clean except lodash 4.17.15 (prototype pollution). "
"Starting API endpoint analysis.",
status="in_progress"
)
Combine Session Progress with Feature Tracking
Session progress tracks the process (which steps have run). Feature tracking tracks the product (which deliverables are verified). Use both:
# Session tracks steps
client.mark_complete(session_id=SESSION_ID, item="verify-fixes")
# Features track deliverables
client.mark_feature_complete(
feature_id="auth-hardening",
verified_by="verifier"
)
# Together they answer two questions:
# "What has the agent done?" -> session progress
# "What has been delivered and verified?" -> feature tracking
Use Agent Memory for Cross-Step Context
Session tracking tells you which steps are done. Agent memory preserves the detailed findings from each step so downstream steps can use them:
# Scanner stores detailed findings
scanner_memory.save(
value="AUTH-001: SQL injection in /api/login via username parameter. "
"Parameterized queries not used. Severity: Critical.",
metadata={"severity": "critical", "module": "auth", "vuln_id": "AUTH-001"}
)
# Fixer retrieves them to generate patches
findings = fixer_memory.search(
query="critical vulnerabilities auth module",
limit=10
)
Summary
Long-running CrewAI tasks do not have to be fragile. By adding session tracking and feature verification through Aegis Memory, you get:
- Crash recovery — Resume from the last checkpoint, not from scratch
- Progress visibility — Know exactly where a multi-hour pipeline stands
- Feature verification — Auditable proof that deliverables meet criteria
- Cross-step context — Agents share findings through persistent memory
The total overhead is minimal: a few API calls per step to update session state. The payoff is a pipeline that survives crashes, context resets, and infrastructure failures without losing hours of work.