n8n Nodes: Bozonx Redis Sugar

Community nodes for n8n to work with Redis, offering cache, streams, queues, and list management:

  • Redis Cache: read/write JSON values with optional TTL
  • Redis Pub: append entries to Redis Streams (XADD)
  • Redis Sub Trigger: consume entries from Redis Streams (XREAD)
  • Redis Queue Add: add jobs to a Redis-based queue for sequential processing
  • Redis Queue Trigger: process jobs from a Redis-based queue
  • Redis Queue Complete: mark queue job as complete and release lock
  • Redis List Sugar: manage simple lists with YAML/JSON parsing

n8n is a fair-code licensed workflow automation platform: https://n8n.io/

Installation

Follow the official community nodes installation guide: https://docs.n8n.io/integrations/community-nodes/installation/

Package name: n8n-nodes-bozonx-redis-sugar

Credentials

All nodes use the same Redis credentials:

  • Host (default: localhost)
  • Port (default: 6379)
  • Username (optional)
  • Password (optional)
  • TLS (boolean)
  • DB Index (default: 0)

You can use expressions and environment variables, e.g., {{$env.REDIS_HOST}}, {{$env.REDIS_PASSWORD}}.


Nodes

1) Redis Cache

Read and write JSON data to Redis with optional TTL (Time To Live).

Parameters:

  • Mode: Write or Read
  • Key: Redis key name
  • Payload Type (Write only): YAML/JSON or Data Fields
  • TTL: Expiration time (0 = no expiration). Units: seconds, minutes, hours, days

Output:

  • Write: { ok: true, key, ttlSeconds, data }
  • Read (hit): { found: true, key, data }
  • Read (miss): { found: false, key }

Details:

  • YAML/JSON mode: Input is parsed as YAML (a superset of JSON) first; if that fails, the node falls back to JSON parsing.
  • Data Fields mode: Define multiple typed fields (string, number, boolean, json, null). JSON values are validated.
  • TTL: Set to 0 for no expiration.
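The ttlSeconds field in the Write output suggests that the chosen unit is normalized to seconds before the key is stored. A minimal sketch of that conversion, assuming a hypothetical helper named ttlToSeconds (the name and the rounding rule are illustrative, not taken from the package):

```typescript
// Hypothetical helper: the unit names mirror the TTL units listed above.
type TtlUnit = "seconds" | "minutes" | "hours" | "days";

const SECONDS_PER_UNIT: Record<TtlUnit, number> = {
  seconds: 1,
  minutes: 60,
  hours: 3600,
  days: 86400,
};

// Returns the TTL in whole seconds; 0 means "no expiration".
function ttlToSeconds(value: number, unit: TtlUnit): number {
  if (!isFinite(value) || value < 0) {
    throw new Error(`Invalid TTL: ${value}`);
  }
  // Rounding down is an assumption made for this sketch.
  return Math.floor(value * SECONDS_PER_UNIT[unit]);
}

const oneDay = ttlToSeconds(24, "hours"); // 86400
const noExpiry = ttlToSeconds(0, "days"); // 0, i.e. the key never expires
```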

2) Redis Pub (Streams)

Append entries to Redis Streams using XADD.

Parameters:

  • Event Name: Redis stream key
  • Payload Mode: YAML/JSON or Key-Value
  • Stream Trimming: MAXLEN ~ 1000000 (optional)
  • Stream TTL: EXPIRE 86400 seconds (optional)

Output:

{
  "payload": "<parsed_payload>",
  "_stream": "mystream",
  "_id": "1728666000000-0"
}

Details:

  • YAML/JSON mode: Sends the payload as a single field named data. If the input is empty, the incoming item is used (stringified).
  • Key-Value mode: Define typed fields. Invalid JSON values are rejected.
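To make the parameters above concrete, the following sketch assembles the raw Redis command arguments that a YAML/JSON-mode publish could translate to. The option names (streamKey, payload, maxLen) and both function names are hypothetical; only the Redis keywords (XADD, MAXLEN, ~, *, EXPIRE) are real, and the node's actual implementation may differ:

```typescript
// Hypothetical option names; only the Redis keywords are real.
interface PubOptions {
  streamKey: string;
  payload: unknown;
  maxLen?: number; // approximate trimming threshold, e.g. 1000000
}

function buildXaddArgs(opts: PubOptions): string[] {
  const args = ["XADD", opts.streamKey];
  if (opts.maxLen !== undefined) {
    // "~" asks Redis for approximate (cheaper) trimming.
    args.push("MAXLEN", "~", String(opts.maxLen));
  }
  // "*" lets Redis generate the entry ID, such as 1728666000000-0.
  // Serializing the payload as JSON into the "data" field is an assumption.
  args.push("*", "data", JSON.stringify(opts.payload));
  return args;
}

// Optional stream TTL, sent as a separate EXPIRE command.
function buildExpireArgs(streamKey: string, ttlSeconds: number): string[] {
  return ["EXPIRE", streamKey, String(ttlSeconds)];
}
```

With trimming enabled, buildXaddArgs yields XADD mystream MAXLEN ~ 1000000 * data followed by the JSON string.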

3) Redis Sub Trigger (Streams)

Consume entries from Redis Streams using XREAD.

Parameters:

  • Stream Key: Redis stream to listen to
  • Allowed Stale (Seconds): If > 0, emits recent entries on start; 0 = only new entries
  • Blocking Read: BLOCK 10000 (10 seconds)
  • Batch Size: COUNT 50

Output:

{
  "payload": "<parsed_payload_or_fields>",
  "_stream": "mystream",
  "_id": "1728666000000-0"
}

Details:

  • Persists the last stream ID in workflow static data to resume from the correct position.
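One way to picture how the start position could be chosen: stream entry IDs begin with a millisecond timestamp, so "recent entries" can be expressed as an ID derived from the current time. The sketch below is an assumption about the logic, not the package's code; initialStreamId and buildXreadArgs are hypothetical names, and the precedence of the saved ID over Allowed Stale is a guess:

```typescript
// Hypothetical helper: picks the ID that XREAD should read after.
function initialStreamId(
  savedId: string | undefined,
  allowedStaleSeconds: number,
  nowMs: number
): string {
  // Resume from the position persisted in workflow static data, if any.
  if (savedId) return savedId;
  if (allowedStaleSeconds > 0) {
    // IDs look like <ms-timestamp>-<seq>, so this selects entries newer than the cutoff.
    return `${nowMs - allowedStaleSeconds * 1000}-0`;
  }
  // "$" means: only entries added after the read starts.
  return "$";
}

// COUNT and BLOCK use the values listed under Parameters.
function buildXreadArgs(streamKey: string, startId: string): string[] {
  return ["XREAD", "COUNT", "50", "BLOCK", "10000", "STREAMS", streamKey, startId];
}
```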

4) Redis Queue Add

Add jobs to a Redis-based queue for strictly sequential processing. Jobs with the same Queue Identifier are processed one-by-one; different identifiers run in parallel.

Parameters (all required except Queue Identifier):

  • Callback ID (required): Unique identifier linking this node to a specific Redis Queue Trigger. Must match exactly.
  • Queue Identifier (optional): Creates separate queues (e.g., per user: user-123). Jobs with the same identifier are sequential. Do not use "##" in this value.
  • Data (YAML/JSON): Job payload. Leave empty to use the incoming item.
  • Lock TTL (seconds): Maximum job execution time. If exceeded, the lock expires and the next job can start.
  • Response Timeout (seconds): How long to wait for the job result before throwing an error.
  • Include Other Input Fields: Merge input data into output (default: false)

Output:

{
  "result": {
    "callbackId": "my-queue",
    "queueIdentifier": "user-456",
    "jobId": "user-456##1735308000000##0##abc12",
    "batchIndex": 0,
    "isFirstInBatch": true,
    "isLastInBatch": true,
    "queuePosition": 1,
    "isFirstInQueue": false,
    "data": { "result": "from worker" },
    "isQueueEmpty": false
  }
}

Output Fields:

  • callbackId: Callback identifier
  • queueIdentifier: Queue identifier (or null)
  • jobId: Unique job ID (format: queueIdentifier##executionId##itemIndex##random)
  • batchIndex, isFirstInBatch, isLastInBatch: Batch position info
  • queuePosition: Position in queue at enqueue time (snapshot, may change)
  • isFirstInQueue: Whether this was the first job at enqueue time (snapshot)
  • data: Result data from the worker
  • isQueueEmpty: Whether the queue is now empty

Important:

  • queuePosition and isFirstInQueue are snapshots taken when the job is added. They do not update if jobs are deferred or re-queued.
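The jobId format also explains why "##" is not allowed in the Queue Identifier: it is the separator between the four parts. A small sketch of building and parsing such IDs follows; buildJobId and parseJobId are hypothetical helpers written for illustration, and how the package handles an empty Queue Identifier is not covered here:

```typescript
const SEP = "##";

interface JobIdParts {
  queueIdentifier: string;
  executionId: string;
  itemIndex: number;
  random: string;
}

function buildJobId(p: JobIdParts): string {
  // A separator inside the identifier would make the ID ambiguous to parse.
  if (p.queueIdentifier.indexOf(SEP) !== -1) {
    throw new Error('Queue Identifier must not contain "##"');
  }
  return [p.queueIdentifier, p.executionId, String(p.itemIndex), p.random].join(SEP);
}

function parseJobId(jobId: string): JobIdParts {
  const parts = jobId.split(SEP);
  if (parts.length !== 4) throw new Error(`Malformed job ID: ${jobId}`);
  const [queueIdentifier = "", executionId = "", itemIndex = "", random = ""] = parts;
  return { queueIdentifier, executionId, itemIndex: Number(itemIndex), random };
}

// Parsing the sample ID from the output above recovers "user-456" and item index 0.
const parsed = parseJobId("user-456##1735308000000##0##abc12");
```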

5) Redis Queue Trigger

Process jobs from a Redis-based queue. Automatically discovers and consumes jobs from all active queue identifiers.

Parameters:

  • Callback ID (required): Identifier linking this trigger to Redis Queue Add nodes. Must match exactly.
  • Polling Interval (ms): Delay before retrying when a lock is busy (default: 1000ms)

Output:

{
  "callbackId": "my-queue",
  "data": { "your": "job data" },
  "jobId": "user-456##1735308000000##0##abc12",
  "queueIdentifier": "user-456",
  "batchIndex": 0,
  "isFirstInBatch": true,
  "isLastInBatch": true,
  "queuePosition": 1,
  "isFirstInQueue": false
}

Details:

  • Uses BLPOP for efficient, event-driven job consumption.
  • Supports multiple queue identifiers in parallel.
  • Requires Redis 6.0.6+ for queue position tracking (LPOS command).

6) Redis Queue Complete

ESSENTIAL NODE: Releases the queue lock and allows the next job to start. Place this at the end of your queue processing workflow.

Parameters:

  • Callback ID (required): Default: ={{ $json.callbackId }}
  • Job ID (required): Default: ={{ $json.jobId }}
  • Include Other Input Fields: Merge input data into output (default: false)

Output:

{
  "result": {
    "data": { "your": "processed data" },
    "isQueueEmpty": true,
    "callbackId": "my-queue",
    "queueIdentifier": "user-456",
    "jobId": "user-456##1735308000000##0##abc12",
    "batchIndex": 0,
    "isFirstInBatch": true,
    "isLastInBatch": true,
    "queuePosition": 0,
    "isFirstInQueue": true
  }
}

Output Fields:

  • isQueueEmpty: true if this was the last job (queue is now empty)
  • Other fields: Passed through from the trigger for convenience

Critical Warning:

  • Always call this node, even on errors! Use n8n's Error Trigger or Error Stub to ensure the lock is released.
  • If not called, the queue will freeze until the Lock TTL expires.

Understanding the Queue System

Strictly Sequential Execution

  • Within a single Callback ID + Queue Identifier, only one job runs at a time.
  • New jobs wait in the queue until the current job calls Redis Queue Complete.
  • Different Queue Identifiers (e.g., user-123, user-456) process in parallel.

The "Leaked Lock" Problem

If a workflow crashes before Redis Queue Complete is called, the lock remains in Redis and the queue freezes.

Solutions:

  1. Set a reasonable Lock TTL: Match your actual job duration (e.g., 60s for a 10s job). The queue auto-unfreezes after TTL expires.
  2. Error Handling: Use n8n's Error Trigger to call Redis Queue Complete even on failure.
  3. Manual Fix: Delete the lock key in Redis: lock:{callbackId}:{queueIdentifier}.
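The effect of Lock TTL can be illustrated without a Redis server. The sketch below is an in-memory stand-in that mimics a lock set only when absent and given an expiry, which is a common Redis locking pattern; whether the package uses exactly this mechanism is an assumption, and LockTable is a hypothetical class. Only the key layout comes from the manual-fix step above:

```typescript
// Lock key layout taken from the manual-fix step above.
function lockKey(callbackId: string, queueIdentifier: string): string {
  return `lock:${callbackId}:${queueIdentifier}`;
}

// In-memory stand-in for Redis: maps lock key -> expiry time in ms.
class LockTable {
  private expiresAt: Record<string, number> = {};

  // Succeeds only if no live (unexpired) lock exists for the key.
  acquire(key: string, ttlSeconds: number, nowMs: number): boolean {
    const current = this.expiresAt[key];
    if (current !== undefined && current > nowMs) return false;
    this.expiresAt[key] = nowMs + ttlSeconds * 1000;
    return true;
  }

  // What Redis Queue Complete is expected to do: drop the lock early.
  release(key: string): void {
    delete this.expiresAt[key];
  }
}

const locks = new LockTable();
const emailLock = lockKey("emails", "user-123"); // "lock:emails:user-123"
const first = locks.acquire(emailLock, 60, 0);          // true: first job takes the lock
const blocked = locks.acquire(emailLock, 60, 30000);    // false: leaked lock still held
const recovered = locks.acquire(emailLock, 60, 60000);  // true: TTL expired, queue unfreezes
```

A shorter Lock TTL shortens the frozen window after a crash, but a TTL below the real job duration would let two jobs overlap, which is why the TTL should comfortably exceed the expected run time.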

7) Redis List Sugar

Manage sophisticated lists of objects in Redis with built-in JSON/YAML parsing and multi-field matching logic.

Key Features:

  • Strict Object Mode: Every element must be a valid JSON object.
  • Smart Parsing: Accepts input data in YAML or JSON format.
  • Multi-Field Matching: Delete, Update, and Filter (Get Range) operations match elements that contain ALL specified fields from your search object.
  • Batch Add: Add a single object or an array of objects in one operation.
  • Atomic Patch: Update specific fields in matching objects without overwriting the entire item.

Operations

  • Add Element: Adds one or more objects to the list. Supports YAML/JSON strings or arrays of objects.
    • Example (Single): { "id": 1, "name": "Item 1" }
    • Example (Batch): [{ "id": 2 }, { "id": 3 }] (Each will be a separate list item)
  • Delete Elements: Removes ALL objects that match your criteria.
    • Matching logic: If you search for { "status": "expired" }, all objects containing that field and value will be removed.
  • Update Elements: Updates objects matching your criteria.
    • Patch (Merge Fields): Merges new fields into the existing object.
    • Replace (Overwrite): Replaces the entire object.
  • Get Range / All Elements: Retrieves objects. Optionally filters them using the multi-field matching logic.
  • Clear List: Deletes the entire Redis key.

Matching Example

If your Redis list contains:

  1. {"id": 101, "category": "books", "price": 10}
  2. {"id": 102, "category": "toys", "price": 10}
  3. {"id": 103, "category": "books", "price": 20}

Searching for {"category": "books"} will match items 1 and 3.
Searching for {"category": "books", "price": 10} will match only item 1.
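The matching rule can be sketched in a few lines. This is an illustrative reimplementation, not the package's code: matches and patchMatching are hypothetical names, and comparing values through JSON.stringify is a simplification chosen for brevity.

```typescript
type Item = Record<string, unknown>;

// True when `item` contains every field of `search` with an equal value.
function matches(item: Item, search: Item): boolean {
  return Object.keys(search).every(
    (k) => k in item && JSON.stringify(item[k]) === JSON.stringify(search[k])
  );
}

// Patch (Merge Fields): overlay the new fields on each matching item.
function patchMatching(list: Item[], search: Item, patch: Item): Item[] {
  return list.map((item) => (matches(item, search) ? { ...item, ...patch } : item));
}

const catalog: Item[] = [
  { id: 101, category: "books", price: 10 },
  { id: 102, category: "toys", price: 10 },
  { id: 103, category: "books", price: 20 },
];

// Matches items 101 and 103.
const books = catalog.filter((i) => matches(i, { category: "books" }));
// Matches only item 101.
const cheapBooks = catalog.filter((i) => matches(i, { category: "books", price: 10 }));
// Item 102 gains onSale: true; the others are returned unchanged.
const patched = patchMatching(catalog, { category: "toys" }, { onSale: true });
```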


Quick Start

Cache Example

  1. Add Redis credentials in n8n
  2. Redis Cache (Write): Set key, payload, TTL
  3. Redis Cache (Read): Read the same key

Streams Example

  1. Redis Pub: Set stream key and payload
  2. Redis Sub Trigger: Listen to the same stream

Queue Example

Workflow 1 (Producer):

  • Schedule Trigger → Redis Queue Add
    • Callback ID: emails
    • Queue Identifier: ={{ $json.userId }}
    • Data: { "email": "user@example.com" }

Workflow 2 (Consumer):

  • Redis Queue Trigger (Callback ID: emails)
    → Send Email
    → Redis Queue Complete
    → IF node: ={{ $json.result.isQueueEmpty }}
      • True → Send summary email
      • False → End

Usage Examples

Redis Cache (Write)

  • Mode: Write
  • Key: cache:user:42
  • Payload Type: YAML/JSON
  • Data: { "id": 42, "name": "Ada" }
  • TTL: 24 hours

Redis Cache (Read)

  • Mode: Read
  • Key: cache:user:42

Redis Pub

  • Stream Key: events:main
  • Payload Mode: YAML/JSON
  • Payload: (leave empty to use incoming item)

Redis Sub Trigger

  • Stream Key: events:main
  • Allowed Stale: 0 (only new entries)

Redis Queue Add

  • Callback ID: image-processor (required)
  • Queue Identifier: ={{ $json.userId }} (e.g., user-123)
  • Data: { "url": "https://example.com/image.jpg", "format": "webp" }
  • Lock TTL: 120 seconds
  • Response Timeout: 300 seconds

Redis Queue Trigger

  • Callback ID: image-processor (must match producer)
  • Polling Interval: 1000 ms

Redis Queue Complete

  • Callback ID: ={{ $json.callbackId }}
  • Job ID: ={{ $json.jobId }}

Redis List Sugar

  • Operation: Update Elements
  • Update Mode: Patch (Merge Fields)
  • List ID: active_users
  • Search Value: {"status": "expired"}
  • Value: {"status": "archived", "updatedAt": "2024-01-01"}
  • TTL: 3600 (refreshes whole list expiration to 1 hour)

Development

# Install dependencies
pnpm install

# Lint
pnpm lint

# Build
pnpm build

# Watch mode
pnpm dev
# or
pnpm run build:watch

Local Redis (for testing)

The package includes a compose.yaml for local development:

  • Redis: localhost:6379
  • RedisInsight UI: http://localhost:5540

# Start
docker compose -f compose.yaml up -d

# Stop
docker compose -f compose.yaml down

Connection params:

  • Host: localhost
  • Port: 6379
  • Username: (empty)
  • Password: (empty)
  • TLS: false
  • DB Index: 0

Quick test:

redis-cli -h localhost -p 6379 ping

Docker networking:

  • If n8n runs in the same Compose network, use redis as the host.
  • If separate networks, expose Redis and use the reachable host.

Publishing

  • n8n Community Node flow: pnpm release:n8n
  • Plain npm publish: See dev_docs/publish.md

Before publishing, n8n-node prerelease runs automatically.


Requirements

  • n8n: 1.60.0+
  • node-redis: 5.x
  • Redis Server: 6.0.6+ (required for queue nodes)

Compatibility

Built and tested with:

  • n8n 1.60.0+
  • node-redis 5.x
  • Redis 6.0.6+

Note: Redis Queue nodes require Redis 6.0.6+ due to the LPOS command used for queue position tracking.


License

MIT
