communiti-message-aggregator

Intelligently batch and combine messages within a specified time window for n8n

Package Information

Downloads: 0 weekly / 5 monthly
Latest Version: 1.0.1
Author: Communiti Node Team

Documentation

Message Aggregator - n8n Community Node

🚀 Intelligently batch and combine messages within a specified time window

✨ Features

  • Smart Message Batching: Automatically groups messages by any field (threadId, userId, etc.)
  • Configurable Wait Time: Set custom time windows (1-300 seconds) for message collection
  • Multiple Combine Methods: Join messages with newlines, spaces, or custom separators
  • Auto Database Setup: Automatically creates required Supabase tables
  • Statistics Tracking: Monitor aggregation performance and usage
  • Cleanup Management: Automatic cleanup of processed messages

🎯 Use Cases

  • Chat Applications: Combine rapid-fire messages from users
  • Notification Systems: Batch notifications before sending
  • Data Processing: Group related events for batch processing
  • API Optimization: Reduce API calls by processing messages in batches

📦 Installation

npm install n8n-nodes-message-aggregator

🔧 Setup

1. Create Supabase Credentials

  1. Go to your Supabase project dashboard
  2. Navigate to Settings > API
  3. Copy your:
    • Project URL: https://your-project.supabase.co
    • Service Role Key: (for table creation)
    • Anon Key: (for regular operations)

2. Add Credentials in n8n

  1. Go to Credentials in n8n
  2. Click Add Credential
  3. Search for "Supabase Message Aggregator API"
  4. Fill in your Supabase details

3. Setup Database Tables

  1. Add Message Aggregator node to your workflow
  2. Set Operation to "Setup Database"
  3. Configure table prefix and schema (optional)
  4. Execute the node to create required tables

🚀 Usage

Basic Message Aggregation

  1. Add Message Aggregator node to your workflow
  2. Set Operation to "Aggregate Messages"
  3. Configure:
    • Wait Time: How long to wait for additional messages (default: 15 seconds)
    • Group By Field: Field to group messages by (e.g., "threadId")
    • Message Field: Field containing message content (e.g., "message")
    • Combine Method: How to join messages (newline, space, custom)

Example Workflow

Webhook → Message Aggregator → Send to AI/API

Input Messages:

[
  {"threadId": "123", "message": "Hello"},
  {"threadId": "123", "message": "How are you?"},
  {"threadId": "123", "message": "I need help"}
]

Output (after 15 seconds):

{
  "threadId": "123",
  "message": "Hello\nHow are you?\nI need help",
  "messageCount": 3,
  "aggregatedAt": "2024-01-15T10:30:00Z"
}
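
The grouping and joining behind this example can be sketched roughly as follows. This is an illustrative sketch, not the node's actual source: the `aggregate` helper and the `Aggregated` type are hypothetical names, and the real node also buffers messages in Supabase and waits for the time window, which is omitted here.

```typescript
// Hypothetical helper for illustration only; not taken from the node's source.
type Item = Record<string, unknown>;

interface Aggregated {
  groupKey: string;      // value of the "Group By Field"
  message: string;       // combined message text
  messageCount: number;  // number of messages in the batch
}

function aggregate(
  items: Item[],
  groupByField: string,
  messageField: string,
  separator: string,
): Aggregated[] {
  // Collect message texts per group key, preserving arrival order.
  const groups = new Map<string, string[]>();
  for (const item of items) {
    const key = String(item[groupByField]);
    const text = String(item[messageField]);
    const bucket = groups.get(key);
    if (bucket) {
      bucket.push(text);
    } else {
      groups.set(key, [text]);
    }
  }

  // Join each group's texts with the chosen separator.
  const result: Aggregated[] = [];
  groups.forEach((texts, groupKey) => {
    result.push({ groupKey, message: texts.join(separator), messageCount: texts.length });
  });
  return result;
}

const batches = aggregate(
  [
    { threadId: "123", message: "Hello" },
    { threadId: "123", message: "How are you?" },
    { threadId: "123", message: "I need help" },
  ],
  "threadId",
  "message",
  "\n",
);
console.log(batches);
```

With the three input messages above, this yields a single batch for thread "123" whose message is the three texts joined by newlines.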

⚙️ Configuration Options

Aggregate Messages

Parameter          Type      Default      Description
Wait Time          Number    15           Seconds to wait for additional messages
Group By Field     String    "threadId"   Field to group messages by
Message Field      String    "message"    Field containing message content
Combine Method     Options   "newline"    How to join messages
Custom Separator   String    " | "        Custom separator (if selected)
Max Messages       Number    50           Maximum messages per batch
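
As a rough sketch, the parameters above could be modeled like this. The key names follow the JSON examples later in this README, except `maxMessages`, which is an assumed name. Clamping the wait time to the 1-300 second range listed under Features is also an assumption; the node may reject out-of-range values instead.

```typescript
// Illustrative model of the documented parameters; not the node's actual types.
type CombineMethod = "newline" | "space" | "custom";

interface AggregateOptions {
  waitTime: number;          // seconds
  groupByField: string;
  messageField: string;
  combineMethod: CombineMethod;
  customSeparator: string;   // only used when combineMethod is "custom"
  maxMessages: number;       // assumed key name for "Max Messages"
}

// Defaults as listed in the table above.
const DEFAULTS: AggregateOptions = {
  waitTime: 15,
  groupByField: "threadId",
  messageField: "message",
  combineMethod: "newline",
  customSeparator: " | ",
  maxMessages: 50,
};

function resolveOptions(overrides: Partial<AggregateOptions> = {}): AggregateOptions {
  const merged: AggregateOptions = { ...DEFAULTS, ...overrides };
  // Assumption: out-of-range wait times are clamped to 1-300 seconds.
  merged.waitTime = Math.min(300, Math.max(1, merged.waitTime));
  return merged;
}

function separatorFor(options: AggregateOptions): string {
  switch (options.combineMethod) {
    case "newline":
      return "\n";
    case "space":
      return " ";
    case "custom":
      return options.customSeparator;
  }
}

console.log(separatorFor(resolveOptions({ combineMethod: "custom" })));
```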

Setup Database

Parameter         Type      Default      Description
Table Prefix      String    "msg_agg_"   Prefix for created tables
Database Schema   String    "public"     Database schema to use
Enable RLS        Boolean   true         Enable Row Level Security

📊 Database Tables

The node automatically creates two tables:

msg_agg_buffer

Temporary storage for incoming messages

  • id: Primary key
  • group_key: Grouping field value
  • message_content: Message content
  • original_data: Full original message data
  • created_at: Message timestamp
  • expires_at: When message expires
  • workflow_id: n8n workflow ID
  • execution_id: n8n execution ID
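
A buffer row with these columns might be built as in the sketch below. The column types, the `toBufferRow` helper, and the rule that expires_at equals created_at plus the wait time are all assumptions made for illustration. The id column is left out on the assumption that the database generates it.

```typescript
// Illustrative row shape; column types are assumed, not taken from the node's schema.
interface BufferRow {
  group_key: string;
  message_content: string;
  original_data: Record<string, unknown>;
  created_at: string;  // ISO 8601 timestamp
  expires_at: string;  // ISO 8601 timestamp
  workflow_id: string;
  execution_id: string;
}

// Hypothetical helper: maps one incoming item to a buffer row.
function toBufferRow(
  item: Record<string, unknown>,
  groupByField: string,
  messageField: string,
  waitTimeSeconds: number,
  context: { workflowId: string; executionId: string },
  now: Date = new Date(),
): BufferRow {
  // Assumption: a message expires once the wait window has elapsed.
  const expires = new Date(now.getTime() + waitTimeSeconds * 1000);
  return {
    group_key: String(item[groupByField]),
    message_content: String(item[messageField]),
    original_data: item,
    created_at: now.toISOString(),
    expires_at: expires.toISOString(),
    workflow_id: context.workflowId,
    execution_id: context.executionId,
  };
}

const row = toBufferRow(
  { threadId: "123", message: "Hello" },
  "threadId",
  "message",
  15,
  { workflowId: "wf-1", executionId: "ex-1" }, // placeholder IDs
);
console.log(row);
```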

msg_agg_stats

Statistics and monitoring

  • id: Primary key
  • workflow_id: n8n workflow ID
  • group_key: Grouping field value
  • message_count: Number of messages in batch
  • wait_time_seconds: Wait time used
  • combined_length: Length of combined message
  • processed_at: Processing timestamp

🔒 Security

  • Row Level Security: Automatically enabled on created tables
  • Credential Management: Secure storage of Supabase credentials
  • Data Isolation: Each workflow execution is isolated

🛠️ Development

Building

npm run build

Linting

npm run lint
npm run lintfix

Testing

npm test

📝 Examples

Chat Message Batching

Perfect for chatbots that need to process multiple rapid messages:

{
  "waitTime": 10,
  "groupByField": "userId",
  "messageField": "text",
  "combineMethod": "newline"
}

Notification Aggregation

Batch notifications before sending emails:

{
  "waitTime": 60,
  "groupByField": "recipientEmail",
  "messageField": "notificationText",
  "combineMethod": "custom",
  "customSeparator": "\n• "
}
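
To see what this separator produces, the sketch below joins three made-up notification texts the way a plain separator join would. It assumes the node inserts the separator only between messages, as `Array.prototype.join` does. In that case the first item gets no bullet, so a downstream step may want to prepend one.

```typescript
// Sample notification texts are invented for illustration.
const notifications: string[] = ["Build passed", "Deploy finished", "Invoice paid"];
const customSeparator = "\n• ";

// A plain join puts the separator between items only.
const combined = notifications.join(customSeparator);

// Prepending a bullet gives every line the same form.
const emailBody = "• " + combined;

console.log(combined);
console.log(emailBody);
```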

Event Processing

Group related events for batch processing:

{
  "waitTime": 30,
  "groupByField": "eventSource",
  "messageField": "eventData",
  "combineMethod": "space"
}

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

📄 License

MIT License - see LICENSE file for details.

🆘 Support


Made with ❤️ for the n8n community
