A2A Protocol - TaskManager Module Documentation
1. Module Overview
The TaskManager module is a core component of the A2A (Agent-to-Agent) Protocol that provides comprehensive task lifecycle management capabilities. This module enables the creation, tracking, updating, and monitoring of tasks according to the A2A specification, serving as a foundational building block for agent communication and coordination.
Purpose and Design Rationale
The TaskManager is designed to address the need for standardized task management in multi-agent systems. It provides:
- State Management: Formalized task states with valid transition rules
- Event-driven Architecture: Real-time notifications for task lifecycle changes
- Data Integrity: Input validation, size limits, and task expiry mechanisms
- Concurrency Control: Limits on maximum concurrent tasks
- Audit Trail: Complete history of state transitions and updates
This module intentionally separates protocol primitives from authentication and authorization, leaving those concerns to integrators who can implement appropriate security measures at the transport or application layer.
2. Architecture and Components
Core Architecture
The TaskManager module follows a simple yet robust architecture centered around the TaskManager class, which extends Node.js's EventEmitter to provide event-driven capabilities.
Key Components
1. TaskManager Class
The central class that orchestrates all task management operations. It maintains an in-memory store of tasks and provides methods for manipulating them.
2. State Management Constants
VALID_STATES: Array of all possible task statesTERMINAL_STATES: States from which no further transitions are allowedVALID_TRANSITIONS: Object defining allowed state transitionsMAX_TASKS: Default maximum concurrent tasks (1000)DEFAULT_EXPIRY_MS: Default task expiry time (24 hours)
3. Task Lifecycle and State Management
Task States and Transitions
The TaskManager implements a formal state machine with well-defined states and transition rules.
State Definitions
| State | Description | Can Transition To |
|---|---|---|
submitted |
Initial state when a task is first created | working, failed, canceled |
working |
Task is actively being processed | input-required, completed, failed, canceled |
input-required |
Task is paused waiting for additional input | working, failed, canceled |
completed |
Terminal - Task finished successfully | (None) |
failed |
Terminal - Task encountered an error | (None) |
canceled |
Terminal - Task was canceled by user | (None) |
4. Core API Reference
Constructor
const { TaskManager } = require('./src/protocols/a2a/task-manager');
const taskManager = new TaskManager({
maxTasks: 500, // Optional: Maximum concurrent tasks (default: 1000)
expiryMs: 86400000, // Optional: Task expiry in milliseconds (default: 24 hours)
maxInputSize: 1048576 // Optional: Max input+metadata size in bytes (default: 1MB)
});
Methods
createTask(params)
Creates a new task with the specified parameters.
Parameters:
params.skill(string, required): Skill ID to invokeparams.input(object, optional): Task input dataparams.metadata(object, optional): Arbitrary metadata
Returns:
object: The created task object
Events Emitted:
task:created: Emitted with the created task
Example:
try {
const task = taskManager.createTask({
skill: 'data-analysis',
input: { dataset: 'sales-2023', format: 'json' },
metadata: { priority: 'high', requester: 'agent-123' }
});
console.log('Task created:', task.id);
} catch (error) {
console.error('Failed to create task:', error.message);
}
Throws:
- Error if
skillparameter is missing - Error if combined input+metadata size exceeds
maxInputSize - Error if maximum task limit is reached
updateTask(taskId, update)
Updates a task's state, output, artifacts, or message.
Parameters:
taskId(string, required): ID of the task to updateupdate(object, required): Update object containing:state(string, optional): New state (must be a valid transition)output(any, optional): Task output dataartifacts(Array, optional): Additional artifacts to appendmessage(string, optional): Status message
Returns:
object: The updated task object
Events Emitted:
task:stateChange: Emitted with{ taskId, from, to }when state changestask:updated: Emitted with the updated task
Example:
try {
const updatedTask = taskManager.updateTask(taskId, {
state: 'working',
message: 'Processing started...'
});
console.log('Task updated:', updatedTask.state);
} catch (error) {
console.error('Failed to update task:', error.message);
}
Throws:
- Error if task is not found
- Error if task is in a terminal state
- Error if state transition is invalid
getTask(taskId)
Retrieves a task by its ID.
Parameters:
taskId(string, required): ID of the task to retrieve
Returns:
object|null: The task object if found,nullotherwise
Example:
const task = taskManager.getTask(taskId);
if (task) {
console.log('Task found:', task);
} else {
console.log('Task not found');
}
cancelTask(taskId)
Cancels a task (transitions to canceled state).
Parameters:
taskId(string, required): ID of the task to cancel
Returns:
object: The canceled task object
Events Emitted:
task:stateChange: Emitted with{ taskId, from, to: 'canceled' }
Example:
try {
const canceledTask = taskManager.cancelTask(taskId);
console.log('Task canceled:', canceledTask.id);
} catch (error) {
console.error('Failed to cancel task:', error.message);
}
Throws:
- Error if task is not found
- Error if task is already in a terminal state
listTasks(filter)
Lists all tasks, optionally filtered by state or skill.
Parameters:
filter(object, optional): Filter criteria:state(string, optional): Filter by task stateskill(string, optional): Filter by skill ID
Returns:
Array: Array of matching task objects
Example:
// List all tasks
const allTasks = taskManager.listTasks();
// List only working tasks
const workingTasks = taskManager.listTasks({ state: 'working' });
// List tasks for a specific skill
const analysisTasks = taskManager.listTasks({ skill: 'data-analysis' });
size()
Returns the current number of tasks.
Returns:
number: Number of tasks currently stored
Example:
console.log(`Current task count: ${taskManager.size()}`);
destroy()
Clears all tasks and removes all event listeners.
Example:
taskManager.destroy();
5. Event System
The TaskManager emits events that can be listened to for real-time updates:
Event Types
| Event | Payload | Description |
|---|---|---|
task:created |
task object |
Emitted when a new task is created |
task:stateChange |
{ taskId, from, to } |
Emitted when a task's state changes |
task:updated |
task object |
Emitted when any task update occurs |
Event Listening Example
// Listen for task creation
taskManager.on('task:created', (task) => {
console.log('New task created:', task.id);
});
// Listen for state changes
taskManager.on('task:stateChange', ({ taskId, from, to }) => {
console.log(`Task ${taskId} changed from ${from} to ${to}`);
});
// Listen for any task updates
taskManager.on('task:updated', (task) => {
console.log('Task updated:', task);
});
6. Task Object Structure
A task object contains the following properties:
{
id: "uuid-v4-string", // Unique task identifier
skill: "skill-id", // Skill ID
state: "submitted", // Current state
input: { ... }, // Input data (or null)
output: { ... }, // Output data (or null)
artifacts: [], // Array of artifacts
metadata: { ... }, // Custom metadata
history: [ // State transition history
{
state: "submitted",
timestamp: "2023-01-01T00:00:00.000Z"
}
],
createdAt: "2023-01-01T00:00:00.000Z", // Creation time
updatedAt: "2023-01-01T00:00:00.000Z" // Last update time
}
7. Usage Examples
Complete Workflow Example
const { TaskManager } = require('./src/protocols/a2a/task-manager');
// Initialize TaskManager
const taskManager = new TaskManager({ maxTasks: 100 });
// Set up event listeners
taskManager.on('task:created', (task) => {
console.log(`Task ${task.id} created for skill ${task.skill}`);
});
taskManager.on('task:stateChange', ({ taskId, from, to }) => {
console.log(`Task ${taskId}: ${from} → ${to}`);
});
// Create a task
const task = taskManager.createTask({
skill: 'document-processing',
input: { documentId: 'doc-123', options: { extractText: true } },
metadata: { department: 'legal', priority: 'medium' }
});
console.log('Created task:', task.id);
// Simulate processing
setTimeout(() => {
// Start working on the task
taskManager.updateTask(task.id, {
state: 'working',
message: 'Starting document analysis...'
});
}, 1000);
setTimeout(() => {
// Request input if needed
taskManager.updateTask(task.id, {
state: 'input-required',
message: 'Please specify which sections to extract'
});
}, 2000);
setTimeout(() => {
// Resume working after receiving input
taskManager.updateTask(task.id, {
state: 'working',
message: 'Resuming processing with user input...'
});
}, 3000);
setTimeout(() => {
// Complete the task
taskManager.updateTask(task.id, {
state: 'completed',
output: {
extractedText: '...',
summary: 'Document analysis complete',
sections: ['intro', 'conclusion']
},
artifacts: [{ type: 'summary', url: '/reports/doc-123-summary.pdf' }],
message: 'Processing completed successfully'
});
}, 4000);
Error Handling Example
try {
const task = taskManager.createTask({
skill: 'data-processing',
input: veryLargeObject // Might exceed size limit
});
} catch (error) {
if (error.message.includes('Input size')) {
console.error('Input too large, please reduce payload size');
} else if (error.message.includes('Maximum task limit')) {
console.error('System busy, please try again later');
} else {
console.error('Error creating task:', error.message);
}
}
// Safe update pattern
function safeUpdate(taskId, update) {
try {
return taskManager.updateTask(taskId, update);
} catch (error) {
if (error.message.includes('Task not found')) {
console.warn('Task may have been deleted or expired');
} else if (error.message.includes('terminal state')) {
console.warn('Task already completed/failed/canceled');
} else if (error.message.includes('Invalid transition')) {
console.warn('Invalid state transition attempted');
}
throw error;
}
}
8. Integration with Other A2A Protocol Components
The TaskManager is designed to work seamlessly with other A2A Protocol components:
A2AClient Integration
The TaskManager can be used alongside the A2AClient to implement complete A2A workflows:
// TaskManager manages local task state
const taskManager = new TaskManager();
// A2AClient handles communication with other agents
const a2aClient = new A2AClient({ taskManager });
// Create task locally, then send to remote agent
const task = taskManager.createTask({ skill: 'remote-analysis' });
a2aClient.sendTask(remoteAgentId, task);
SSEStream Integration
For real-time streaming of task events to clients, integrate with SSEStream:
const sseStream = new SSEStream();
// Forward TaskManager events to SSE stream
taskManager.on('task:created', (task) => {
sseStream.send('task.created', task);
});
taskManager.on('task:updated', (task) => {
sseStream.send('task.updated', task);
});
AgentCard Integration
Use AgentCard to validate task compatibility with agent capabilities:
const agentCard = new AgentCard(agentInfo);
function createCompatibleTask(agentCard, params) {
if (agentCard.hasSkill(params.skill)) {
return taskManager.createTask(params);
}
throw new Error(`Agent does not support skill: ${params.skill}`);
}
9. Security Considerations
Important security notes from the module documentation:
Authentication and authorization are the integrator's responsibility.
This module provides the A2A protocol primitives (task lifecycle, state management, event emission) but does not include an auth layer. Integrators should add middleware or guards at the transport/HTTP layer before invoking TaskManager methods.
Recommended Security Practices
- Input Validation: Always validate and sanitize inputs before passing to TaskManager methods
- Authentication: Implement proper authentication at the API layer
- Authorization: Check permissions before allowing task operations
- Rate Limiting: Add rate limiting to prevent abuse
- Input Size: The built-in
maxInputSizehelps prevent memory exhaustion attacks - Task Expiry: Configure appropriate
expiryMsto prevent memory leaks
10. Edge Cases and Limitations
Edge Cases
- Concurrent Task Limits: When
_maxTasksis reached,createTask()will throw an error - Task Expiry: Tasks are automatically pruned when they exceed
expiryMs(checked duringcreateTask()) - Terminal State Updates: Once a task reaches a terminal state, no further updates are allowed
- Deep Copy Behavior: All returned task objects are deep copies to prevent accidental mutations
- Input Size Calculation: Both
inputandmetadatacontribute to the size limit
Known Limitations
- In-Memory Storage: Tasks are stored in memory only - no persistence across process restarts
- Single Process: Not designed for multi-process or distributed deployments without additional coordination
- No Built-in Persistence: Integrators must implement their own persistence if needed
- Event Listener Leaks: Always call
destroy()when finished to clean up event listeners - No Task Dependencies: Does not natively support task dependencies or workflows
Persistence Considerations
Since TaskManager uses in-memory storage, for production use you may want to:
- Implement Periodic Snapshots: Serialize tasks to disk/database periodically
- Use Event Sourcing: Persist all events to rebuild state if needed
- Integrate with a Database: Wrap TaskManager with database read/write operations
Example persistence wrapper:
class PersistentTaskManager extends TaskManager {
constructor(db, opts) {
super(opts);
this.db = db;
this._loadFromDB();
// Persist changes
this.on('task:created', (task) => this._saveTask(task));
this.on('task:updated', (task) => this._saveTask(task));
}
async _loadFromDB() {
const tasks = await this.db.getAllTasks();
tasks.forEach(task => {
this._tasks.set(task.id, task);
});
}
async _saveTask(task) {
await this.db.saveTask(task);
}
}
11. Performance Considerations
Memory Usage
- Each task is stored in memory - monitor
size()and configure_maxTasksappropriately - Larger
maxInputSizevalues allow bigger payloads but increase memory footprint - Task expiry (
_expiryMs) helps manage memory by removing old tasks
Time Complexity
| Operation | Time Complexity |
|---|---|
createTask() |
O(n) due to _pruneExpired() |
updateTask() |
O(1) |
getTask() |
O(1) |
cancelTask() |
O(1) |
listTasks() |
O(n) |
size() |
O(1) |
Optimization Tips
- Task Expiry: Set appropriate
expiryMsto keep the task list manageable - Batch Operations: For bulk operations, consider extending the class with custom methods
- Event Listener Efficiency: Keep event listener callbacks lightweight
- Sharding: For very large scale, consider sharding by skill or task type
12. Related Modules
For more information about related A2A Protocol components, see:
- A2A Protocol - AgentCard: Agent capability discovery and validation
- A2A Protocol - A2AClient: Client for A2A communication
- A2A Protocol - SSEStream: Server-Sent Events streaming for task updates
13. Constants Reference
The module exports these constants for convenience:
const {
TaskManager,
VALID_STATES,
TERMINAL_STATES,
VALID_TRANSITIONS,
MAX_TASKS
} = require('./src/protocols/a2a/task-manager');
VALID_STATES:['submitted', 'working', 'input-required', 'completed', 'failed', 'canceled']TERMINAL_STATES:['completed', 'failed', 'canceled']MAX_TASKS:1000VALID_TRANSITIONS: Object defining valid state transitions (see source for details)