Why Services?
Actions handle one-off tasks. But what about:
- Persistent connections to Discord, Telegram, databases?
- Background jobs like monitoring, cleanup, scheduling?
- Shared resources like connection pools, caches?
Services fill this gap. They start with your agent and run continuously.
One instance per type. Services are singletons - getService('discord') always returns the same instance.
Available Service Types
| Service Type | Purpose | Plugin |
|---|
TRANSCRIPTION | Audio → Text | @elizaos/plugin-openai |
VIDEO | Video processing | @elizaos/plugin-video |
BROWSER | Web automation | @elizaos/plugin-browser |
PDF | PDF processing | @elizaos/plugin-pdf |
REMOTE_FILES | AWS S3, cloud storage | @elizaos/plugin-s3 |
WEB_SEARCH | Search queries | @elizaos/plugin-web-search |
EMAIL | Email sending | @elizaos/plugin-email |
TASK | Background tasks | @elizaos/core |
WALLET | Crypto wallets | @elizaos/plugin-evm |
MESSAGE_SERVICE | Real-time messaging | @elizaos/core |
Service System
Services are long-running background tasks that extend agent functionality beyond request-response patterns. They manage connections, handle events, and perform ongoing operations.
Service Interface
Abstract Service Class
abstract class Service {
static serviceType: ServiceType;
constructor(runtime?: IAgentRuntime) {}
abstract capabilityDescription: string;
config?: ServiceConfig;
static async start(runtime: IAgentRuntime): Promise<Service> {
// Return new instance of service
}
abstract stop(): Promise<void>;
}
Service Properties
- serviceType: Unique identifier for the service type
- capabilityDescription: Human-readable description of service capabilities
- config: Optional configuration object
- start: Static method to initialize and start the service
- stop: Instance method to gracefully shut down the service
Service Types
Core Service Types
The core package defines base service types:
const ServiceType = {
// Core services defined in @elizaos/core
TASK: 'task',
DATABASE: 'database',
// ... other core types
} as const;
Plugin Service Types
Plugins extend service types through module augmentation:
// Plugin extends ServiceType through module augmentation
declare module '@elizaos/core' {
interface ServiceTypeRegistry {
DISCORD: 'discord';
TELEGRAM: 'telegram';
TWITTER: 'twitter';
SEARCH: 'search';
IMAGE_GENERATION: 'image_generation';
TRANSCRIPTION: 'transcription';
// ... other plugin-specific types
}
}
Service Lifecycle
Lifecycle Phases
- Registration: Service registered with runtime during plugin initialization
- Queuing: Service queued for startup
- Initialization: Runtime prepares service environment
- Start: Service
start() method called
- Running: Service actively processing
- Stop: Graceful shutdown initiated
- Cleanup: Resources released
Common Service Patterns
Services that connect to external platforms:
class DiscordService extends Service {
static serviceType = 'discord' as const;
capabilityDescription = 'Discord bot integration';
private client: Discord.Client;
constructor(private runtime: IAgentRuntime) {
super(runtime);
}
static async start(runtime: IAgentRuntime): Promise<Service> {
const service = new DiscordService(runtime);
await service.initialize();
return service;
}
private async initialize() {
// Parse environment configuration
const token = this.runtime.getSetting("DISCORD_API_TOKEN");
if (!token) {
this.runtime.logger.warn("Discord token not found");
return;
}
// Initialize Discord client
this.client = new Discord.Client({
intents: [/* Discord intents */],
partials: [/* Discord partials */]
});
// Set up event handlers
this.setupEventHandlers();
// Connect to Discord
await this.client.login(token);
}
private setupEventHandlers() {
this.client.on('messageCreate', async (message) => {
// Convert Discord message to Memory format
const memory = await this.convertToMemory(message);
// Process through runtime
await this.runtime.processActions(memory, []);
});
}
async stop() {
await this.client?.destroy();
}
}
Background Task Service
Services that perform periodic or scheduled tasks:
class TaskService extends Service {
static serviceType = ServiceType.TASK;
capabilityDescription = 'Scheduled task execution';
private interval: NodeJS.Timer;
private readonly TICK_INTERVAL = 60000; // 1 minute
static async start(runtime: IAgentRuntime): Promise<Service> {
const service = new TaskService(runtime);
await service.startTimer();
return service;
}
private async startTimer() {
this.interval = setInterval(async () => {
await this.checkTasks();
}, this.TICK_INTERVAL);
}
private async checkTasks() {
try {
// Check for scheduled tasks
const tasks = await this.runtime.databaseAdapter.getTasks({
status: 'pending',
scheduledFor: { $lte: new Date() }
});
// Execute each task
for (const task of tasks) {
await this.executeTask(task);
}
} catch (error) {
this.runtime.logger.error('Task check failed:', error);
}
}
private async executeTask(task: Task) {
try {
// Mark task as running
task.status = 'running';
await this.runtime.databaseAdapter.updateTask(task);
// Execute task logic
await this.processTask(task);
// Mark task as complete
task.status = 'completed';
await this.runtime.databaseAdapter.updateTask(task);
} catch (error) {
task.status = 'failed';
task.error = error.message;
await this.runtime.databaseAdapter.updateTask(task);
}
}
async stop() {
if (this.interval) {
clearInterval(this.interval);
}
}
}
Data Service
Services that provide data access or caching:
class SearchService extends Service {
static serviceType = 'search' as const;
capabilityDescription = 'Web search capabilities';
private searchClient: SearchClient;
private cache: Map<string, CachedResult>;
static async start(runtime: IAgentRuntime): Promise<Service> {
const service = new SearchService(runtime);
await service.initialize();
return service;
}
private async initialize() {
const apiKey = this.runtime.getSetting('SEARCH_API_KEY');
this.searchClient = new SearchClient({
apiKey,
timeout: 5000
});
this.cache = new Map();
// Clear cache periodically
setInterval(() => this.clearOldCache(), 3600000); // 1 hour
}
async search(query: string): Promise<SearchResult[]> {
// Check cache
const cached = this.cache.get(query);
if (cached && !this.isExpired(cached)) {
return cached.results;
}
// Perform search
const results = await this.searchClient.search(query);
// Cache results
this.cache.set(query, {
results,
timestamp: Date.now()
});
return results;
}
private clearOldCache() {
const oneHourAgo = Date.now() - 3600000;
for (const [key, value] of this.cache.entries()) {
if (value.timestamp < oneHourAgo) {
this.cache.delete(key);
}
}
}
async stop() {
this.cache.clear();
await this.searchClient?.close();
}
}
Model Provider Service
Services that provide AI model access:
class OpenAIService extends Service {
static serviceType = 'openai' as const;
capabilityDescription = 'OpenAI model provider';
private client: OpenAI;
static async start(runtime: IAgentRuntime): Promise<Service> {
const service = new OpenAIService(runtime);
await service.initialize();
return service;
}
private async initialize() {
const apiKey = this.runtime.getSetting('OPENAI_API_KEY');
if (!apiKey) {
throw new Error('OpenAI API key not configured');
}
this.client = new OpenAI({ apiKey });
// Register model handlers
this.runtime.registerModel(
ModelType.TEXT_LARGE,
this.handleTextGeneration.bind(this),
'openai',
100 // priority
);
this.runtime.registerModel(
ModelType.TEXT_EMBEDDING,
this.handleEmbedding.bind(this),
'openai',
100
);
}
private async handleTextGeneration(params: GenerateTextParams) {
const response = await this.client.chat.completions.create({
model: params.model || 'gpt-4',
messages: params.messages,
temperature: params.temperature,
max_tokens: params.maxTokens
});
return response.choices[0].message;
}
private async handleEmbedding(params: EmbedParams) {
const response = await this.client.embeddings.create({
model: 'text-embedding-3-small',
input: params.input
});
return response.data[0].embedding;
}
async stop() {
// Cleanup if needed
}
}
Service Registration
Plugin Registration
Services are registered during plugin initialization:
export const discordPlugin: Plugin = {
name: 'discord',
services: [DiscordService],
init: async (config, runtime) => {
// Services auto-registered and started
}
};
Manual Registration
Services can also be registered manually:
await runtime.registerService(CustomService);
Service Management
Getting Services
Access services through the runtime:
// Get service by type
const discord = runtime.getService('discord');
// Type-safe service access
const searchService = runtime.getService<SearchService>('search');
const results = await searchService.search('elizaOS');
Service Communication
Services can interact with each other:
class NotificationService extends Service {
static serviceType = 'notification' as const;
capabilityDescription = 'Cross-platform notifications';
async notify(message: string) {
// Get Discord service
const discord = this.runtime.getService('discord');
if (discord) {
await discord.sendMessage(channelId, message);
}
// Get Telegram service
const telegram = this.runtime.getService('telegram');
if (telegram) {
await telegram.sendMessage(chatId, message);
}
// Get all services for broadcasting
const services = this.runtime.getAllServices();
for (const service of services) {
if (service.supportsNotifications) {
await service.notify(message);
}
}
}
}
Error Handling
Graceful Initialization
Handle missing configuration gracefully:
constructor(runtime: IAgentRuntime) {
super(runtime);
const token = runtime.getSetting("SERVICE_TOKEN");
if (!token) {
runtime.logger.warn("Service token not configured");
this.client = null;
return;
}
// Initialize with token
this.initializeClient(token);
}
Error Recovery
Implement retry logic and error recovery:
private async connectWithRetry(maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
await this.client.connect();
this.runtime.logger.info('Service connected successfully');
return;
} catch (error) {
this.runtime.logger.error(`Connection attempt ${i + 1} failed:`, error);
if (i < maxRetries - 1) {
const delay = Math.pow(2, i) * 1000; // Exponential backoff
await new Promise(resolve => setTimeout(resolve, delay));
} else {
throw error;
}
}
}
}
Graceful Shutdown
Ensure proper cleanup on service stop:
async stop() {
try {
// Stop accepting new work
this.accepting = false;
// Wait for ongoing work to complete
await this.waitForCompletion();
// Close connections
await this.client?.disconnect();
// Clear timers
if (this.interval) {
clearInterval(this.interval);
}
// Clear caches
this.cache?.clear();
this.runtime.logger.info('Service stopped gracefully');
} catch (error) {
this.runtime.logger.error('Error during service shutdown:', error);
// Force cleanup
this.forceCleanup();
}
}
Best Practices
Service Design
- Single Responsibility: Each service should have one clear purpose
- Stateless When Possible: Avoid maintaining state that could be lost
- Idempotent Operations: Operations should be safe to retry
- Resource Management: Clean up resources properly
- Error Isolation: Errors shouldn’t crash other services
Configuration
- Environment Variables: Use for sensitive configuration
- Graceful Defaults: Provide sensible defaults
- Validation: Validate configuration on startup
- Hot Reload: Support configuration updates without restart
- Async Operations: Use async/await for non-blocking operations
- Connection Pooling: Reuse connections when possible
- Caching: Cache frequently accessed data
- Rate Limiting: Respect external API limits
- Monitoring: Log performance metrics
Reliability
- Health Checks: Implement health check endpoints
- Circuit Breakers: Prevent cascade failures
- Retry Logic: Handle transient failures
- Graceful Degradation: Continue with reduced functionality
- Audit Logging: Log important operations
Common Services
| Service | Purpose | Example Plugin |
|---|
| Platform Services | Connect to chat platforms | Discord, Telegram, Twitter |
| Model Services | AI model providers | OpenAI, Anthropic, Ollama |
| Data Services | External data sources | Web search, SQL, APIs |
| Media Services | Process media | TTS, image generation, transcription |
| Background Services | Scheduled tasks | Task runner, cron jobs |
| Monitoring Services | System monitoring | Metrics, logging, alerting |
Model Context Protocol (MCP) Services
MCP (Model Context Protocol) allows your ElizaOS agent to use external tools and services. Think of it as giving your agent abilities like web search, file access, or API connections.
MCP Plugin Setup
bun add @elizaos/plugin-mcp
Add MCP to your character’s plugins:
export const character: Character = {
name: 'YourAgent',
plugins: [
'@elizaos/plugin-sql',
'@elizaos/plugin-bootstrap',
'@elizaos/plugin-mcp', // Add MCP plugin
// ... other plugins
],
// ... rest of configuration
};
MCP Server Types
MCP supports two types of servers:
1. STDIO Servers
STDIO servers run as local processes and communicate through standard input/output.
export const character: Character = {
name: 'WebSearchAgent',
plugins: ['@elizaos/plugin-mcp'],
settings: {
mcp: {
servers: {
firecrawl: {
type: 'stdio',
command: 'npx',
args: ['-y', 'firecrawl-mcp'],
env: {
// Optional: Add your Firecrawl API key if you have one
FIRECRAWL_API_KEY: process.env.FIRECRAWL_API_KEY || '',
},
},
},
},
},
system: 'You are a helpful assistant with web search capabilities.',
};
Capabilities:
- Search the web for current information
- Extract content from websites
- Perform deep research on topics
- Scrape structured data
2. SSE Servers
SSE (Server-Sent Events) servers connect to remote APIs through HTTP.
export const character: Character = {
name: 'APIAgent',
plugins: ['@elizaos/plugin-mcp'],
settings: {
mcp: {
servers: {
myApiServer: {
type: 'sse',
url: 'https://your-api-server.com/sse', // Replace with your SSE server URL
},
},
},
},
system: 'You are a helpful assistant with API access capabilities.',
};
Capabilities:
- Real-time data access
- API interactions
- Custom tool execution
- Dynamic resource fetching
Complete MCP Configuration Example
import { type Character } from '@elizaos/core';
export const character: Character = {
name: 'Eliza',
plugins: [
'@elizaos/plugin-sql',
...(process.env.ANTHROPIC_API_KEY ? ['@elizaos/plugin-anthropic'] : []),
...(process.env.OPENAI_API_KEY ? ['@elizaos/plugin-openai'] : []),
'@elizaos/plugin-bootstrap',
'@elizaos/plugin-mcp',
],
settings: {
mcp: {
servers: {
// STDIO server example - runs locally
firecrawl: {
type: 'stdio',
command: 'npx',
args: ['-y', 'firecrawl-mcp'],
env: {},
},
// SSE server example - connects to remote API
customApi: {
type: 'sse',
url: 'https://your-api.com/sse',
},
},
},
},
system: 'You are a helpful assistant with access to web search and API tools.',
bio: [
'Can search the web for information',
'Can connect to external APIs',
'Provides helpful responses',
],
};
Testing MCP Integration
- Start your agent:
- Ask your agent to use the tools:
- For web search: “Search for [topic]”
- For API tools: Use commands specific to your SSE server
MCP Troubleshooting
- Server not connecting: Check that the command/URL is correct
- Tools not available: Ensure
@elizaos/plugin-mcp is in your plugins array
- Permission errors: For STDIO servers, ensure the command can be executed
- CORS issues: For SSE servers, ensure proper CORS headers are configured
MCP Service Implementation
The MCP plugin internally creates services for each configured server:
class MCPService extends Service {
static serviceType = 'mcp' as const;
capabilityDescription = 'Model Context Protocol tool integration';
private servers: Map<string, MCPServer> = new Map();
async start(runtime: IAgentRuntime) {
const mcpConfig = runtime.getSetting('mcp');
for (const [name, config] of Object.entries(mcpConfig.servers)) {
if (config.type === 'stdio') {
await this.startSTDIOServer(name, config);
} else if (config.type === 'sse') {
await this.startSSEServer(name, config);
}
}
}
private async startSTDIOServer(name: string, config: STDIOConfig) {
const server = spawn(config.command, config.args, {
env: { ...process.env, ...config.env }
});
this.servers.set(name, server);
// Handle tool responses
server.stdout.on('data', (data) => {
this.handleToolResponse(name, data);
});
}
private async startSSEServer(name: string, config: SSEConfig) {
const eventSource = new EventSource(config.url);
eventSource.onmessage = (event) => {
this.handleToolResponse(name, event.data);
};
this.servers.set(name, eventSource);
}
async stop() {
for (const [name, server] of this.servers) {
if (server instanceof ChildProcess) {
server.kill();
} else if (server instanceof EventSource) {
server.close();
}
}
}
}
Service Testing
Unit Testing
Test services in isolation:
describe('SearchService', () => {
let service: SearchService;
let runtime: MockRuntime;
beforeEach(async () => {
runtime = createMockRuntime();
service = await SearchService.start(runtime);
});
afterEach(async () => {
await service.stop();
});
it('should cache search results', async () => {
const results1 = await service.search('test');
const results2 = await service.search('test');
expect(results1).toBe(results2); // Same object reference
});
});
Integration Testing
Test service interactions:
it('should notify through multiple channels', async () => {
const notificationService = runtime.getService('notification');
const discordSpy = jest.spyOn(discordService, 'sendMessage');
const telegramSpy = jest.spyOn(telegramService, 'sendMessage');
await notificationService.notify('Test message');
expect(discordSpy).toHaveBeenCalled();
expect(telegramSpy).toHaveBeenCalled();
});
Task Worker System
Task workers enable background task execution with scheduling, validation, and lifecycle management. Tasks are persisted in the database while workers are registered in-memory.
Task Interface
interface Task {
id?: UUID;
name: string; // Must match a registered TaskWorker.name
description: string;
roomId?: UUID;
worldId?: UUID;
entityId?: UUID;
tags: string[]; // Control tags: 'queue', 'repeat', etc.
metadata?: TaskMetadata;
updatedAt?: number;
}
interface TaskMetadata {
updateInterval?: number; // For recurring tasks (milliseconds)
options?: { name: string; description: string }[];
[key: string]: unknown;
}
TaskWorker Interface
interface TaskWorker {
name: string; // Unique identifier
execute: (
runtime: IAgentRuntime,
options: { [key: string]: unknown },
task: Task
) => Promise<void>;
validate?: (
runtime: IAgentRuntime,
message: Memory,
state: State
) => Promise<boolean>;
}
Registering Task Workers
// In your plugin or service initialization
runtime.registerTaskWorker({
name: 'SEND_DAILY_REPORT',
validate: async (runtime, message, state) => {
// Optional: Check if task should run
return true;
},
execute: async (runtime, options, task) => {
runtime.logger.info('Executing daily report task');
// Do the work
const report = await generateReport(runtime);
await sendReport(report, options.recipientId);
runtime.logger.info('Daily report sent');
}
});
Creating Tasks
// One-time task (deleted after execution)
await runtime.createTask({
name: 'SEND_DAILY_REPORT',
description: 'Send daily analytics report',
tags: ['queue'], // 'queue' = scheduled for execution
metadata: {
recipientId: 'user-123'
}
});
// Recurring task (keeps running)
await runtime.createTask({
name: 'SYNC_EXTERNAL_DATA',
description: 'Sync data from external API every hour',
tags: ['queue', 'repeat'], // 'repeat' = don't delete after execution
metadata: {
updateInterval: 1000 * 60 * 60 // 1 hour
}
});
Task Execution Flow
| Tag | Behavior |
|---|
queue | Task is eligible for execution |
repeat | Task persists after execution (recurring) |
immediate | Execute as soon as possible |
| Custom tags | For filtering and organization |
Runtime Task Methods
// Create a task
const taskId = await runtime.createTask(task);
// Get tasks by criteria
const tasks = await runtime.getTasks({
roomId: '...',
tags: ['queue'],
entityId: '...'
});
// Get specific task
const task = await runtime.getTask(taskId);
// Update task
await runtime.updateTask(taskId, {
metadata: { ...task.metadata, lastResult: 'success' }
});
// Delete task
await runtime.deleteTask(taskId);
// Get/register workers
runtime.registerTaskWorker(worker);
const worker = runtime.getTaskWorker('TASK_NAME');
Best Practices
- Idempotent execution: Tasks may run multiple times on failures
- Error handling: Catch errors in
execute() to prevent task service crashes
- Reasonable intervals: Don’t set
updateInterval too low (< 1000ms)
- Clean up: Delete completed one-time tasks to avoid database bloat
- Validation: Use
validate() to check preconditions before execution
See Also