Package Information
Downloads: 1 weeklyĀ /Ā 16 monthly
Latest Version: 1.0.6
Author: Communiti Node Team
Documentation
n8n-nodes-communiti-message-aggregator
Intelligently aggregate messages with smart waiting strategies for n8n workflows.
Installation
npm install n8n-nodes-communiti-message-aggregator
Features
Processing Strategies
- Smart Wait: Wait for a period with no new messages before processing
- Immediate on Complete: Process immediately when detecting completion words
- Max Count: Process when reaching maximum message count
Key Capabilities
- Group messages by any field (user_id, thread_id, session_id, etc.)
- Multiple aggregation methods (newline, space, comma, custom separator)
- Persistent storage with Supabase database
- Automatic table creation and management
- Comprehensive output with metadata and processing details
š§ Setup
1. Create Supabase Credentials
- Go to your Supabase project dashboard
- Navigate to Settings > API
- Copy your:
- Project URL:
https://your-project.supabase.co - Service Role Key: (for table creation)
- Anon Key: (for regular operations)
- Project URL:
2. Add Credentials in n8n
- Go to Credentials in n8n
- Click Add Credential
- Search for "Supabase Message Aggregator API"
- Fill in your Supabase details
3. Setup Database Tables
Option 1: Automatic Setup (Recommended)
- Add Message Aggregator node to your workflow
- Enable Auto Create Tables option
- Set your desired Table Prefix (default: "message_aggregator")
- Execute the node - it will automatically create tables if they don't exist
Option 2: Manual Setup
- Open your Supabase SQL Editor
- Copy and run the SQL from
supabase-setup.sqlfile (included in the package) - Or run this SQL manually:
-- Create the stored procedure
CREATE OR REPLACE FUNCTION create_message_aggregator_tables(table_prefix TEXT)
RETURNS TEXT
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
BEGIN
-- Create buffer table
EXECUTE format('
CREATE TABLE IF NOT EXISTS %I_buffer (
id BIGSERIAL PRIMARY KEY,
group_key TEXT NOT NULL,
message TEXT NOT NULL,
metadata JSONB,
workflow_id TEXT,
execution_id TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
)', table_prefix);
-- Create indexes
EXECUTE format('CREATE INDEX IF NOT EXISTS idx_%I_buffer_group_key ON %I_buffer(group_key)', table_prefix, table_prefix);
EXECUTE format('CREATE INDEX IF NOT EXISTS idx_%I_buffer_created_at ON %I_buffer(created_at)', table_prefix, table_prefix);
-- Create stats table
EXECUTE format('
CREATE TABLE IF NOT EXISTS %I_stats (
id BIGSERIAL PRIMARY KEY,
group_key TEXT NOT NULL,
message_count INTEGER NOT NULL,
strategy TEXT,
trigger TEXT,
workflow_id TEXT,
processed_at TIMESTAMPTZ DEFAULT NOW()
)', table_prefix);
-- Create indexes
EXECUTE format('CREATE INDEX IF NOT EXISTS idx_%I_stats_workflow_id ON %I_stats(workflow_id)', table_prefix, table_prefix);
EXECUTE format('CREATE INDEX IF NOT EXISTS idx_%I_stats_processed_at ON %I_stats(processed_at)', table_prefix, table_prefix);
RETURN 'Tables created successfully for prefix: ' || table_prefix;
END;
$$;
-- Create default tables
SELECT create_message_aggregator_tables('message_aggregator');
Note: If automatic table creation fails, the node will provide the exact SQL commands in the console log for manual execution.
š Usage
Basic Message Aggregation
- Add Message Aggregator node to your workflow
- Set Operation to "Aggregate Messages"
- Configure:
- Wait Time: How long to wait for additional messages (default: 15 seconds)
- Group By Field: Field to group messages by (e.g., "threadId")
- Message Field: Field containing message content (e.g., "message")
- Combine Method: How to join messages (newline, space, custom)
Example Workflow
Webhook ā Message Aggregator ā Send to AI/API
Input Messages:
[
{"threadId": "123", "message": "Hello"},
{"threadId": "123", "message": "How are you?"},
{"threadId": "123", "message": "I need help"}
]
Output (after 15 seconds):
{
"threadId": "123",
"message": "Hello\nHow are you?\nI need help",
"messageCount": 3,
"aggregatedAt": "2024-01-15T10:30:00Z"
}
āļø Configuration Options
Aggregate Messages
| Parameter | Type | Default | Description |
|---|---|---|---|
| Wait Time | Number | 15 | Seconds to wait for additional messages |
| Group By Field | String | "threadId" | Field to group messages by |
| Message Field | String | "message" | Field containing message content |
| Combine Method | Options | "newline" | How to join messages |
| Custom Separator | String | " | " | Custom separator (if selected) |
| Max Messages | Number | 50 | Maximum messages per batch |
Setup Database
| Parameter | Type | Default | Description |
|---|---|---|---|
| Table Prefix | String | "msg_agg_" | Prefix for created tables |
| Database Schema | String | "public" | Database schema to use |
| Enable RLS | Boolean | true | Enable Row Level Security |
š Database Tables
The node automatically creates two tables:
msg_agg_buffer
Temporary storage for incoming messages
id: Primary keygroup_key: Grouping field valuemessage_content: Message contentoriginal_data: Full original message datacreated_at: Message timestampexpires_at: When message expiresworkflow_id: n8n workflow IDexecution_id: n8n execution ID
msg_agg_stats
Statistics and monitoring
id: Primary keyworkflow_id: n8n workflow IDgroup_key: Grouping field valuemessage_count: Number of messages in batchwait_time_seconds: Wait time usedcombined_length: Length of combined messageprocessed_at: Processing timestamp
š Security
- Row Level Security: Automatically enabled on created tables
- Credential Management: Secure storage of Supabase credentials
- Data Isolation: Each workflow execution is isolated
š ļø Development
Building
npm run build
Linting
npm run lint
npm run lintfix
Testing
npm test
š Examples
Chat Message Batching
Perfect for chatbots that need to process multiple rapid messages:
{
"waitTime": 10,
"groupByField": "userId",
"messageField": "text",
"combineMethod": "newline"
}
Notification Aggregation
Batch notifications before sending emails:
{
"waitTime": 60,
"groupByField": "recipientEmail",
"messageField": "notificationText",
"combineMethod": "custom",
"customSeparator": "\n⢠"
}
Event Processing
Group related events for batch processing:
{
"waitTime": 30,
"groupByField": "eventSource",
"messageField": "eventData",
"combineMethod": "space"
}
š¤ Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
š License
MIT License - see LICENSE file for details.
š Support
- Issues: GitHub Issues
- Documentation: Full Documentation
- Community: n8n Community
Made with ā¤ļø for the n8n community