Kafka Consumer

Consume messages from a Kafka topic within a workflow

Overview

This node consumes messages from a specified Kafka topic within an n8n workflow. It connects to a Kafka cluster as a consumer, subscribes to a topic, and collects messages for a configured wait time before outputting them. The node supports options such as reading messages from the beginning of the topic, parsing JSON messages, returning message headers, and committing offsets automatically.

Common scenarios where this node is beneficial include:

  • Integrating real-time data streams from Kafka into automation workflows.
  • Processing event-driven data pipelines where Kafka acts as the message broker.
  • Consuming messages for analytics, monitoring, or triggering downstream processes.

Practical example:

  • A user wants to consume all new messages from a Kafka topic named "orders" to trigger order processing workflows in n8n. They configure the node with the topic name, consumer group ID, and wait time to collect messages before continuing the workflow.
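The collect-then-output behavior described above can be sketched as follows. This is a simplified model, not the node's actual implementation: `collectForWaitTime` and the callback-based `source` object are hypothetical stand-ins for a real kafkajs consumer subscription.

```javascript
// Sketch: collect messages from a callback-based source for a fixed wait
// time, then resolve with everything received in that window. The `source`
// here is a hypothetical stand-in for a real kafkajs consumer.
function collectForWaitTime(source, waitTimeMs) {
  return new Promise((resolve) => {
    const collected = [];
    const unsubscribe = source.onMessage((msg) => collected.push(msg));
    setTimeout(() => {
      unsubscribe(); // stop listening once the wait time elapses
      resolve(collected);
    }, waitTimeMs);
  });
}

// A fake in-memory source, useful for trying the flow without a broker.
function makeFakeSource() {
  const listeners = [];
  return {
    onMessage(fn) {
      listeners.push(fn);
      return () => listeners.splice(listeners.indexOf(fn), 1);
    },
    emit(msg) {
      listeners.forEach((fn) => fn(msg));
    },
  };
}
```

With a 5000 ms wait time, every message emitted on the "orders" topic within those five seconds would end up in the resolved array; messages arriving later are ignored.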

Properties

  • Topic: Name of the Kafka topic to consume messages from.
  • Group ID: Identifier for the Kafka consumer group.
  • Wait Time: Duration in milliseconds to wait for messages before finishing consumption and returning results.
  • Use Schema Registry: Whether to use Confluent Schema Registry for message schema management (requires a URL if enabled).
  • Schema Registry URL: URL of the Confluent Schema Registry (required if "Use Schema Registry" is enabled).
  • Options: Collection of additional configuration options:
    - Allow Topic Creation: Allow the consumer to create the topic if it does not yet exist when subscribing to it.
    - Auto Commit Threshold: Number of messages after which the consumer commits offsets automatically.
    - Auto Commit Interval: Time interval in milliseconds after which the consumer commits offsets automatically.
    - Heartbeat Interval: Interval in milliseconds between heartbeats that keep the consumer session active (must be less than the session timeout).
    - Max Number of Requests: Maximum number of unacknowledged requests allowed on a single connection.
    - Read Messages From Beginning: Whether to read messages starting from the beginning of the topic.
    - JSON Parse Message: Whether to parse the consumed message value as JSON.
    - Only Message: If JSON parsing is enabled, whether to return only the parsed message content instead of the full metadata.
    - Return Headers: Whether to include Kafka message headers in the output.
    - Session Timeout: Time in milliseconds without a heartbeat after which the consumer is considered failed and removed from the group.
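As a rough illustration of how these properties relate to the underlying client, the mapping below shows how they could translate into kafkajs consumer, subscribe, and run options. The `props` shape is an assumption for illustration; the kafkajs option names on the right (`groupId`, `sessionTimeout`, `heartbeatInterval`, `maxInFlightRequests`, `allowAutoTopicCreation`, `fromBeginning`, `autoCommitThreshold`, `autoCommitInterval`) are real kafkajs options, but this is not the node's actual source code.

```javascript
// Hypothetical mapping from the node's properties (as documented above)
// to the option objects a kafkajs consumer accepts. Defaults shown here
// are illustrative, not the node's guaranteed defaults.
function toConsumerOptions(props) {
  return {
    consumer: {
      groupId: props.groupId,
      sessionTimeout: props.sessionTimeout ?? 30000,
      heartbeatInterval: props.heartbeatInterval ?? 3000,
      maxInFlightRequests: props.maxInFlightRequests ?? null,
      allowAutoTopicCreation: props.allowTopicCreation ?? false,
    },
    subscribe: {
      topic: props.topic,
      fromBeginning: props.readFromBeginning ?? false,
    },
    run: {
      autoCommitThreshold: props.autoCommitThreshold,
      autoCommitInterval: props.autoCommitInterval,
    },
  };
}
```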

Output

The node outputs an array of JSON objects representing the Kafka messages collected during the wait time. Each object's structure depends on the configured options:

  • If JSON parsing is disabled:

    {
      "message": "<string representation of the message value>",
      "topic": "<topic name>",
      // optionally
      "headers": { "<headerKey>": "<headerValue>", ... }
    }
    
  • If JSON parsing is enabled and "Only Message" is false:

    {
      "message": <parsed JSON object>,
      "topic": "<topic name>",
      // optionally
      "headers": { "<headerKey>": "<headerValue>", ... }
    }
    
  • If JSON parsing is enabled and "Only Message" is true:

    <parsed JSON object>
    

If no messages are received during the wait time, the node returns the input data unchanged.
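The branching above can be expressed as a small shaping function. This is an illustrative sketch of the documented behavior, not the node's actual code; the function and parameter names are assumptions.

```javascript
// Sketch: how one consumed Kafka message becomes one output item,
// following the structures documented above. Hypothetical helper.
function shapeOutput(rawValue, topic, headers, options) {
  let message = rawValue;
  if (options.jsonParseMessage) {
    try {
      message = JSON.parse(rawValue);
    } catch {
      // Invalid JSON: fall back to the raw string (parsing fails silently).
    }
    if (options.onlyMessage) {
      return message; // only the parsed content, no topic/header metadata
    }
  }
  const item = { message, topic };
  if (options.returnHeaders) {
    item.headers = headers;
  }
  return item;
}
```

For example, a raw value of `{"orderId": 42}` on topic "orders" with "JSON Parse Message" and "Only Message" enabled would yield just the parsed object, while disabling "Only Message" wraps it with the topic (and headers, if requested).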

The node does not output binary data.

Dependencies

  • Requires access to a Kafka cluster with brokers specified via credentials.
  • Supports optional integration with Confluent Schema Registry if enabled.
  • Requires valid Kafka credentials including brokers, client ID, and optionally username/password for SASL authentication.
  • The node uses the kafkajs library internally to connect and consume messages.

Troubleshooting

  • Authentication errors: If SASL authentication is enabled but username or password is missing, the node will throw an error. Ensure credentials include both username and password.
  • Timeouts: If the wait time is too short, the node may not receive any messages. Increase the "Wait Time" property to allow more time for message consumption.
  • JSON parsing failures: If "JSON Parse Message" is enabled but messages are not valid JSON, parsing will fail silently and the raw string will be returned.
  • Offset commit issues: Misconfiguration of auto commit threshold or interval can cause offsets not to be committed properly, leading to duplicate message consumption.
  • Schema Registry usage: If "Use Schema Registry" is enabled but the URL is incorrect or unreachable, the node may fail to decode messages properly.
  • Topic subscription errors: If the topic does not exist and "Allow Topic Creation" is false, the node may fail to subscribe.
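The SASL credential check mentioned in the first bullet can be sketched as a simple validation. This is a hypothetical helper, not the node's actual validation code, and the `authentication`, `username`, and `password` field names are assumptions about the credential shape.

```javascript
// Illustration of the SASL credential rule described above: when SASL
// authentication is enabled, both username and password must be present.
function validateSaslCredentials(credentials) {
  if (!credentials.authentication) {
    return; // SASL disabled: nothing to check
  }
  if (!credentials.username || !credentials.password) {
    throw new Error('SASL authentication requires both username and password');
  }
}
```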
