n8n Nodes: Bozonx Redis Sugar
Community nodes for n8n to work with Redis, offering cache, streams, queues, and list management:
- Redis Cache: read/write JSON values with optional TTL
- Redis Pub: append entries to Redis Streams (XADD)
- Redis Sub Trigger: consume entries from Redis Streams (XREAD)
- Redis Queue Add: add jobs to a Redis-based queue for sequential processing
- Redis Queue Trigger: process jobs from a Redis-based queue
- Redis Queue Complete: mark queue job as complete and release lock
- Redis List Sugar: manage simple lists with YAML/JSON parsing
n8n is a fair-code licensed workflow automation platform: https://n8n.io/
Installation
Follow the official community nodes installation guide: https://docs.n8n.io/integrations/community-nodes/installation/
Package name: n8n-nodes-bozonx-redis-sugar
Credentials
All nodes use the same Redis credentials:
- Host (default: localhost)
- Port (default: 6379)
- Username (optional)
- Password (optional)
- TLS (boolean)
- DB Index (default: 0)
You can use expressions and environment variables, e.g., {{$env.REDIS_HOST}}, {{$env.REDIS_PASSWORD}}.
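As an illustration of how these fields and their defaults fit together, the sketch below assembles a connection URL in the redis[s]://[[username][:password]@][host][:port][/db-number] form that node-redis accepts. The RedisCredentials interface and buildRedisUrl helper are hypothetical names invented for this example; the package may build its connection differently.

```typescript
// Hypothetical shape mirroring the credential fields listed above.
interface RedisCredentials {
  host?: string;
  port?: number;
  username?: string;
  password?: string;
  tls?: boolean;
  db?: number;
}

// Builds a redis:// (or rediss:// when TLS is on) URL, applying the documented defaults.
function buildRedisUrl(c: RedisCredentials = {}): string {
  const scheme = c.tls ? "rediss" : "redis";
  const host = c.host ?? "localhost";
  const port = c.port ?? 6379;
  const db = c.db ?? 0;
  const user = encodeURIComponent(c.username ?? "");
  const pass =
    c.password !== undefined && c.password !== ""
      ? `:${encodeURIComponent(c.password)}`
      : "";
  // Only emit the "user:pass@" part when at least one of them is set.
  const auth = user !== "" || pass !== "" ? `${user}${pass}@` : "";
  return `${scheme}://${auth}${host}:${port}/${db}`;
}
```

With no fields set, the helper falls back to localhost, port 6379, and DB index 0.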
Nodes
1) Redis Cache
Read and write JSON data to Redis with optional TTL (Time To Live).
Parameters:
- Mode: Write or Read
- Key: Redis key name
- Payload Type (Write only): YAML/JSON or Data Fields
- TTL: Expiration time (0 = no expiration). Units: seconds, minutes, hours, days
Output:
- Write: { ok: true, key, ttlSeconds, data }
- Read (hit): { found: true, key, data }
- Read (miss): { found: false, key }
Details:
- YAML/JSON mode: Input is parsed as YAML first (YAML is a superset of JSON); if that fails, the node falls back to JSON parsing.
- Data Fields mode: Define multiple typed fields (string, number, boolean, json, null). JSON values are validated.
- TTL: Set to 0 for no expiration.
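The TTL value and unit presumably end up as the ttlSeconds reported in the Write output. A minimal sketch of that conversion, assuming 0 means "no expiration" as stated above; the toTtlSeconds helper and TtlUnit type are hypothetical and not taken from the package source.

```typescript
type TtlUnit = "seconds" | "minutes" | "hours" | "days";

const SECONDS_PER_UNIT: Record<TtlUnit, number> = {
  seconds: 1,
  minutes: 60,
  hours: 3600,
  days: 86400,
};

// Returns the TTL in whole seconds, or null when no expiration should be set.
function toTtlSeconds(value: number, unit: TtlUnit): number | null {
  if (!Number.isFinite(value) || value < 0) {
    throw new Error(`Invalid TTL: ${value}`);
  }
  if (value === 0) {
    return null; // 0 = no expiration
  }
  return Math.round(value * SECONDS_PER_UNIT[unit]);
}
```

For example, a TTL of 24 hours corresponds to 86400 seconds.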
2) Redis Pub (Streams)
Append entries to Redis Streams using XADD.
Parameters:
- Event Name: Redis stream key
- Payload Mode: YAML/JSON or Key-Value
- Stream Trimming: MAXLEN ~ 1000000 (optional)
- Stream TTL: EXPIRE 86400 seconds (optional)
Output:
{
"payload": "<parsed_payload>",
"_stream": "mystream",
"_id": "1728666000000-0"
}
Details:
- YAML/JSON mode: Sends a single field, data. If input is empty, uses the incoming item (stringified).
- Key-Value mode: Define typed fields. Invalid JSON values are rejected.
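To make the parameters concrete, the sketch below builds the raw Redis commands that the YAML/JSON mode plausibly maps to: an XADD with approximate trimming and a single data field, followed by an EXPIRE on the stream key. The helper names buildXaddArgs and buildExpireArgs are hypothetical; how the node actually issues these commands is not shown in this README.

```typescript
// Builds the argument list for: XADD <stream> [MAXLEN ~ <n>] * data <json>
function buildXaddArgs(stream: string, payload: unknown, maxLen?: number): string[] {
  const args = ["XADD", stream];
  if (maxLen !== undefined) {
    args.push("MAXLEN", "~", String(maxLen)); // "~" requests approximate trimming
  }
  // "*" lets Redis generate the entry ID; the payload goes into one field named "data".
  args.push("*", "data", JSON.stringify(payload));
  return args;
}

// Builds the argument list for: EXPIRE <stream> <seconds>
function buildExpireArgs(stream: string, ttlSeconds: number): string[] {
  return ["EXPIRE", stream, String(ttlSeconds)];
}
```

The resulting arrays can be joined with spaces to try the commands in redis-cli (quoting the JSON argument as needed).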
3) Redis Sub Trigger (Streams)
Consume entries from Redis Streams using XREAD.
Parameters:
- Stream Key: Redis stream to listen to
- Allowed Stale (Seconds): If > 0, emits recent entries on start; 0 = only new entries
- Blocking Read: BLOCK 10000 (10 seconds)
- Batch Size: COUNT 50
Output:
{
"payload": "<parsed_payload_or_fields>",
"_stream": "mystream",
"_id": "1728666000000-0"
}
Details:
- Persists the last stream ID in workflow static data to resume from the correct position.
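The following sketch shows one way the starting position and the XREAD call could be derived from these parameters. It assumes that Allowed Stale is translated into a millisecond-timestamp stream ID and that 0 maps to the special ID $ (only new entries); both helper names are hypothetical and the real node may resume differently.

```typescript
// Picks the ID to start reading from when no last ID has been persisted yet.
function initialStreamId(allowedStaleSeconds: number, nowMs: number): string {
  if (allowedStaleSeconds <= 0) {
    return "$"; // only entries added after the read starts
  }
  // Stream IDs have the form <ms-timestamp>-<sequence>, so a time window maps to an ID.
  return `${nowMs - allowedStaleSeconds * 1000}-0`;
}

// Builds the argument list for: XREAD COUNT <n> BLOCK <ms> STREAMS <stream> <lastId>
function buildXreadArgs(
  stream: string,
  lastId: string,
  blockMs: number = 10000,
  count: number = 50,
): string[] {
  return ["XREAD", "COUNT", String(count), "BLOCK", String(blockMs), "STREAMS", stream, lastId];
}
```

After each batch, the ID of the last received entry would replace lastId, which is what persisting the last stream ID makes possible across restarts.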
4) Redis Queue Add
Add jobs to a Redis-based queue for strictly sequential processing. Jobs with the same Queue Identifier are processed one-by-one; different identifiers run in parallel.
Parameters (all required except Queue Identifier):
- Callback ID (required): Unique identifier linking this node to a specific Redis Queue Trigger. Must match exactly.
- Queue Identifier (optional): Creates separate queues (e.g., per user: user-123). Jobs with the same identifier are sequential. Do not use "##" in this value.
- Data (YAML/JSON): Job payload. Leave empty to use the incoming item.
- Lock TTL (seconds): Maximum job execution time. If exceeded, the lock expires and the next job can start.
- Response Timeout (seconds): How long to wait for the job result before throwing an error.
- Include Other Input Fields: Merge input data into output (default: false)
Output:
{
"result": {
"callbackId": "my-queue",
"queueIdentifier": "user-456",
"jobId": "user-456##1735308000000##0##abc12",
"batchIndex": 0,
"isFirstInBatch": true,
"isLastInBatch": true,
"queuePosition": 1,
"isFirstInQueue": false,
"data": { "result": "from worker" },
"isQueueEmpty": false
}
}
Output Fields:
- callbackId: Callback identifier
- queueIdentifier: Queue identifier (or null)
- jobId: Unique job ID (format: queueIdentifier##executionId##itemIndex##random)
- batchIndex, isFirstInBatch, isLastInBatch: Batch position info
- queuePosition: Position in queue at enqueue time (snapshot, may change)
- isFirstInQueue: Whether this was the first job at enqueue time (snapshot)
- data: Result data from the worker
- isQueueEmpty: Whether the queue is now empty
Important:
queuePosition and isFirstInQueue are snapshots taken when the job is added. They do not update if jobs are deferred or re-queued.
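The jobId format also explains why "##" must not appear in the Queue Identifier: the separator would make the ID ambiguous. A minimal parsing sketch, assuming the four-part format listed above; parseJobId and ParsedJobId are hypothetical names, not part of the package.

```typescript
interface ParsedJobId {
  queueIdentifier: string;
  executionId: string;
  itemIndex: number;
  random: string;
}

// Splits a job ID of the form queueIdentifier##executionId##itemIndex##random.
function parseJobId(jobId: string): ParsedJobId {
  const parts = jobId.split("##");
  if (parts.length !== 4) {
    // A queue identifier containing "##" would produce extra parts and land here.
    throw new Error(`Unexpected job ID format: ${jobId}`);
  }
  const [queueIdentifier = "", executionId = "", itemIndexRaw = "", random = ""] = parts;
  const itemIndex = Number.parseInt(itemIndexRaw, 10);
  if (Number.isNaN(itemIndex)) {
    throw new Error(`Item index is not a number: ${itemIndexRaw}`);
  }
  return { queueIdentifier, executionId, itemIndex, random };
}
```

Applied to the sample ID from the output above, the first part is the queue identifier user-456 and the third part is the item index 0.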
5) Redis Queue Trigger
Process jobs from a Redis-based queue. Automatically discovers and consumes jobs from all active queue identifiers.
Parameters:
- Callback ID (required): Identifier linking this trigger to Redis Queue Add nodes. Must match exactly.
- Polling Interval (ms): Delay before retrying when a lock is busy (default: 1000ms)
Output:
{
"callbackId": "my-queue",
"data": { "your": "job data" },
"jobId": "user-456##1735308000000##0##abc12",
"queueIdentifier": "user-456",
"batchIndex": 0,
"isFirstInBatch": true,
"isLastInBatch": true,
"queuePosition": 1,
"isFirstInQueue": false
}
Details:
- Uses BLPOP for efficient, event-driven job consumption.
- Supports multiple queue identifiers in parallel.
- Requires Redis 6.0.6+ for queue position tracking (LPOS command).
6) Redis Queue Complete
ESSENTIAL NODE: Releases the queue lock and allows the next job to start. Place this at the end of your queue processing workflow.
Parameters:
- Callback ID (required): Default: ={{ $json.callbackId }}
- Job ID (required): Default: ={{ $json.jobId }}
- Include Other Input Fields: Merge input data into output (default: false)
Output:
{
"result": {
"data": { "your": "processed data" },
"isQueueEmpty": true,
"callbackId": "my-queue",
"queueIdentifier": "user-456",
"jobId": "user-456##1735308000000##0##abc12",
"batchIndex": 0,
"isFirstInBatch": true,
"isLastInBatch": true,
"queuePosition": 0,
"isFirstInQueue": true
}
}
Output Fields:
- isQueueEmpty: true if this was the last job (queue is now empty)
- Other fields: Passed through from the trigger for convenience
Critical Warning:
- Always call this node, even on errors! Use n8n's Error Trigger or Error Stub to ensure the lock is released.
- If not called, the queue will freeze until the Lock TTL expires.
Understanding the Queue System
Strictly Sequential Execution
- Within a single Callback ID + Queue Identifier, only one job runs at a time.
- New jobs wait in the queue until the current job calls Redis Queue Complete.
- Different Queue Identifiers (e.g., user-123, user-456) process in parallel.
The "Leaked Lock" Problem
If a workflow crashes before Redis Queue Complete is called, the lock remains in Redis and the queue freezes.
Solutions:
- Set a reasonable Lock TTL: Choose a value comfortably above your actual job duration (e.g., 60s for a 10s job). The queue unfreezes automatically once the TTL expires.
- Error Handling: Use n8n's Error Trigger to call Redis Queue Complete even on failure.
- Manual Fix: Delete the lock key in Redis: lock:{callbackId}:{queueIdentifier}.
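The leaked-lock behavior can be illustrated with a small in-memory model. This is only a sketch: it assumes the lock behaves like a key that is set if absent and expires after the Lock TTL, and it takes the current time as a parameter so the timeline is explicit. LockTable, lockKey, and demoLeakedLock are hypothetical names; the node's actual Redis commands are not documented here.

```typescript
// In-memory stand-in for a Redis lock key with an expiry (a simplified model).
class LockTable {
  private expiresAt = new Map<string, number>();

  // Returns true if the lock was free (or expired) at nowMs and is now held.
  acquire(key: string, ttlSeconds: number, nowMs: number): boolean {
    const current = this.expiresAt.get(key);
    if (current !== undefined && current > nowMs) {
      return false; // still held by an earlier job
    }
    this.expiresAt.set(key, nowMs + ttlSeconds * 1000);
    return true;
  }

  // Models Redis Queue Complete releasing the lock before the TTL runs out.
  release(key: string): void {
    this.expiresAt.delete(key);
  }
}

// Key pattern taken from the Manual Fix note above.
function lockKey(callbackId: string, queueIdentifier: string): string {
  return `lock:${callbackId}:${queueIdentifier}`;
}

// Timeline: job 1 crashes without releasing; job 2 must wait for the TTL.
function demoLeakedLock(): boolean[] {
  const locks = new LockTable();
  const key = lockKey("emails", "user-123");
  const results: boolean[] = [];
  results.push(locks.acquire(key, 60, 0)); // job 1 starts, then crashes
  results.push(locks.acquire(key, 60, 10_000)); // job 2 blocked: lock leaked
  results.push(locks.acquire(key, 60, 61_000)); // TTL expired, job 2 starts
  locks.release(key); // job 2 calls Redis Queue Complete
  results.push(locks.acquire(key, 60, 62_000)); // job 3 starts immediately
  return results;
}
```

In this model, a shorter Lock TTL shortens the freeze after a crash, but a TTL below the real job duration would let two jobs overlap.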
7) Redis List Sugar
Manage sophisticated lists of objects in Redis with built-in JSON/YAML parsing and multi-field matching logic.
Key Features:
- Strict Object Mode: All elements are enforced to be valid JSON objects.
- Smart Parsing: Input data in YAML or JSON format.
- Multi-Field Matching: Delete, Update, and Filter (Get Range) operations match elements that contain ALL specified fields from your search object.
- Batch Add: Add a single object or an array of objects in one operation.
- Atomic Patch: Update specific fields in matching objects without overwriting the entire item.
Operations
- Add Element: Adds one or more objects to the list. Supports YAML/JSON strings or arrays of objects.
  - Example (Single): { "id": 1, "name": "Item 1" }
  - Example (Batch): [{ "id": 2 }, { "id": 3 }] (each becomes a separate list item)
- Delete Elements: Removes ALL objects that match your criteria.
  - Matching logic: If you search for { "status": "expired" }, all objects containing that field and value will be removed.
- Update Elements: Updates objects matching your criteria.
- Patch (Merge Fields): Merges new fields into the existing object.
- Replace (Overwrite): Replaces the entire object.
- Get Range / All Elements: Retrieves objects. Optionally filters them using the multi-field matching logic.
- Clear List: Deletes the entire Redis key.
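The difference between the two update modes can be sketched with plain objects. This assumes Patch performs a shallow merge (new fields overwrite same-named existing fields) and Replace discards the old object entirely; whether the node merges nested objects deeply is not stated here. StoredObject, patchItem, and replaceItem are hypothetical names.

```typescript
type StoredObject = Record<string, unknown>;

// Patch (Merge Fields): keep existing fields, overwrite or add the given ones.
function patchItem(existing: StoredObject, value: StoredObject): StoredObject {
  return { ...existing, ...value };
}

// Replace (Overwrite): the stored object becomes exactly the given value.
function replaceItem(_existing: StoredObject, value: StoredObject): StoredObject {
  return { ...value };
}
```

Patching { "id": 1, "status": "expired" } with { "status": "archived" } keeps id, while replacing drops it.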
Matching Example
If your Redis list contains:
- {"id": 101, "category": "books", "price": 10}
- {"id": 102, "category": "toys", "price": 10}
- {"id": 103, "category": "books", "price": 20}
Searching for {"category": "books"} will match items 1 and 3.
Searching for {"category": "books", "price": 10} will match only item 1.
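The matching rule above can be expressed as a small predicate: an element matches when every field of the search object is present with an equal value. The sketch compares values by their JSON serialization, which is an assumption made for this example; CatalogItem, matchesAll, and filterMatching are hypothetical names.

```typescript
type CatalogItem = Record<string, unknown>;

// True when the item contains ALL fields of the search object with equal values.
function matchesAll(item: CatalogItem, search: CatalogItem): boolean {
  return Object.entries(search).every(
    ([field, expected]) => JSON.stringify(item[field]) === JSON.stringify(expected),
  );
}

function filterMatching(items: CatalogItem[], search: CatalogItem): CatalogItem[] {
  return items.filter((item) => matchesAll(item, search));
}

// The three list elements from the example above.
const exampleItems: CatalogItem[] = [
  { id: 101, category: "books", price: 10 },
  { id: 102, category: "toys", price: 10 },
  { id: 103, category: "books", price: 20 },
];

const booksOnly = filterMatching(exampleItems, { category: "books" });
const cheapBooks = filterMatching(exampleItems, { category: "books", price: 10 });
```

Here booksOnly holds the elements with ids 101 and 103, and cheapBooks holds only the element with id 101.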
Quick Start
Cache Example
- Add Redis credentials in n8n
- Redis Cache (Write): Set key, payload, TTL
- Redis Cache (Read): Read the same key
Streams Example
- Redis Pub: Set stream key and payload
- Redis Sub Trigger: Listen to the same stream
Queue Example
Workflow 1 (Producer):
- Schedule Trigger → Redis Queue Add
  - Callback ID: emails
  - Queue Identifier: ={{ $json.userId }}
  - Data: { "email": "user@example.com" }
Workflow 2 (Consumer):
- Redis Queue Trigger (Callback ID: emails)
- → Send Email
- → Redis Queue Complete
- → IF node: ={{ $json.result.isQueueEmpty }}
  - True → Send summary email
  - False → End
Usage Examples
Redis Cache (Write)
- Mode: Write
- Key: cache:user:42
- Payload Type: YAML/JSON
- Data: { "id": 42, "name": "Ada" }
- TTL: 24 hours
Redis Cache (Read)
- Mode: Read
- Key: cache:user:42
Redis Pub
- Stream Key: events:main
- Payload Mode: YAML/JSON
- Payload: (leave empty to use incoming item)
Redis Sub Trigger
- Stream Key: events:main
- Allowed Stale: 0 (only new entries)
Redis Queue Add
- Callback ID: image-processor (required)
- Queue Identifier: ={{ $json.userId }} (e.g., user-123)
- Data: { "url": "https://example.com/image.jpg", "format": "webp" }
- Lock TTL: 120 seconds
- Response Timeout: 300 seconds
Redis Queue Trigger
- Callback ID: image-processor (must match producer)
- Polling Interval: 1000 ms
Redis Queue Complete
- Callback ID: ={{ $json.callbackId }}
- Job ID: ={{ $json.jobId }}
Redis List Sugar
- Operation: Update Elements
- Update Mode: Patch (Merge Fields)
- List ID: active_users
- Search Value: {"status": "expired"}
- Value: {"status": "archived", "updatedAt": "2024-01-01"}
- TTL: 3600 (refreshes whole list expiration to 1 hour)
Development
# Install dependencies
pnpm install
# Lint
pnpm lint
# Build
pnpm build
# Watch mode
pnpm dev
# or
pnpm run build:watch
Local Redis (for testing)
The package includes a compose.yaml for local development:
- Redis: localhost:6379
- RedisInsight UI: http://localhost:5540
# Start
docker compose -f compose.yaml up -d
# Stop
docker compose -f compose.yaml down
Connection params:
- Host: localhost
- Port: 6379
- Username: (empty)
- Password: (empty)
- TLS: false
- DB Index: 0
Quick test:
redis-cli -h localhost -p 6379 ping
Docker networking:
- If n8n runs in the same Compose network, use redis as the host.
- If n8n and Redis are on separate networks, expose the Redis port and use a host that n8n can reach.
Publishing
- n8n Community Node flow: pnpm release:n8n
- Plain npm publish: See dev_docs/publish.md
Before publishing, n8n-node prerelease runs automatically.
Requirements
- n8n: 1.60.0+
- node-redis: 5.x
- Redis Server: 6.0.6+ (required for queue nodes)
Compatibility
Built and tested with:
- n8n 1.60.0+
- node-redis 5.x
- Redis 6.0.6+
Note: Redis Queue nodes require Redis 6.0.6+ due to the LPOS command used for queue position tracking.
License
MIT