
Kafka Snappy Trigger

Consume messages from a Kafka topic

Overview

This node is a Kafka consumer trigger that listens to messages from a specified Kafka topic. It connects to a Kafka cluster, subscribes to a topic, and emits incoming messages as workflow triggers in n8n. The node supports advanced options such as using the Confluent Schema Registry for message decoding, JSON parsing of messages, parallel or sequential processing, and returning message headers.

Common scenarios where this node is beneficial include:

  • Real-time data processing pipelines where events are published to Kafka topics.
  • Integrating Kafka-based messaging systems with other services via n8n workflows.
  • Consuming Avro or other schema-encoded messages using a schema registry.
  • Processing streams of messages with control over commit intervals and session timeouts.

Practical example:
A user wants to trigger an n8n workflow whenever a new order event is published to a Kafka topic named "orders". They configure this node to consume from the "orders" topic, parse the JSON message payload, and pass the order details downstream for further processing like sending notifications or updating databases.
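Conceptually, the per-message handling for this scenario can be sketched as below. This is a simplified illustration, not the node's actual source: `handleMessage` is a hypothetical helper, and the real node drives this logic through kafkajs.

```javascript
// Hypothetical sketch of how an incoming Kafka message becomes workflow data.
// kafkajs delivers message values as Buffers; with "JSON Parse Message" enabled,
// the node attempts JSON.parse and falls back to the raw string on failure.
function handleMessage(topic, rawValue, jsonParse) {
  const text = rawValue.toString('utf8');
  let message = text;
  if (jsonParse) {
    try {
      message = JSON.parse(text);
    } catch {
      // Not valid JSON: keep the raw string.
    }
  }
  return { message, topic };
}

// An "orders" event as it might arrive from Kafka:
const payload = Buffer.from('{"orderId": 42, "status": "created"}');
const item = handleMessage('orders', payload, true);
console.log(item.message.orderId); // 42
```

Downstream nodes then receive `item.message` (here, the parsed order) for notifications, database updates, and so on.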

Properties

Topic: Name of the Kafka topic to consume messages from.
Group ID: Identifier for the Kafka consumer group. Multiple consumers with the same group ID share the load of consuming messages.
Use Schema Registry: Whether to decode messages using the Confluent Schema Registry.
Schema Registry URL: URL of the Confluent Schema Registry service (required if "Use Schema Registry" is enabled).
Options: Collection of additional configuration options:

  • Allow Topic Creation: Whether to allow the topic to be created automatically if it does not yet exist.
  • Auto Commit Threshold: Number of messages after which the consumer commits offsets automatically.
  • Auto Commit Interval: Time interval in milliseconds after which the consumer commits offsets automatically.
  • Heartbeat Interval: Interval in milliseconds for sending heartbeats to keep the consumer session active. Must be less than Session Timeout.
  • Max Number of Requests: Maximum number of unacknowledged requests allowed on a single connection.
  • Read Messages From Beginning: Whether to start reading messages from the beginning of the topic or only new messages.
  • JSON Parse Message: Whether to attempt parsing the message value as JSON.
  • Parallel Processing: Whether to process messages in parallel (true) or sequentially to maintain order (false). Only available for node version >1.
  • Only Message: When JSON parsing is enabled, whether to return only the parsed message content instead of the full object with metadata.
  • Return Headers: Whether to include Kafka message headers in the output.
  • Session Timeout: Time in milliseconds to wait for a response before considering the session timed out.
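The options above correspond roughly to standard kafkajs settings, which split across the consumer, subscribe, and run calls. The mapping below is an illustrative assumption based on the kafkajs API, not the node's exact source:

```javascript
// Illustrative mapping of the node's options onto kafkajs configuration
// (an assumption based on the kafkajs API, not the node's exact source).
const options = {
  sessionTimeout: 30000,
  heartbeatInterval: 3000,        // must stay below sessionTimeout
  maxInFlightRequests: 1,         // "Max Number of Requests"
  allowAutoTopicCreation: false,  // "Allow Topic Creation"
  fromBeginning: false,           // "Read Messages From Beginning"
  autoCommitInterval: 5000,
  autoCommitThreshold: 100,
};

// Settings passed when creating the consumer, e.g. kafka.consumer({...}):
const consumerConfig = {
  groupId: 'n8n-kafka-trigger',   // hypothetical group ID
  sessionTimeout: options.sessionTimeout,
  heartbeatInterval: options.heartbeatInterval,
  maxInFlightRequests: options.maxInFlightRequests,
  allowAutoTopicCreation: options.allowAutoTopicCreation,
};

// Settings passed to consumer.subscribe({...}) and consumer.run({...}):
const subscribeConfig = { topic: 'orders', fromBeginning: options.fromBeginning };
const runConfig = {
  autoCommitInterval: options.autoCommitInterval,
  autoCommitThreshold: options.autoCommitThreshold,
};

console.log(consumerConfig.heartbeatInterval < consumerConfig.sessionTimeout); // true
```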

Output

The node outputs an array of JSON objects representing consumed Kafka messages. Each object contains:

  • message: The message payload, either as a string or parsed JSON object depending on settings.
  • topic: The Kafka topic name from which the message was received.
  • headers (optional): An object containing message headers as key-value pairs if "Return Headers" is enabled.

If "Only Message" is enabled along with JSON parsing, the output will be just the parsed message content without wrapping metadata.
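The difference between the two output shapes can be sketched as follows (`buildOutput` is a hypothetical helper for illustration):

```javascript
// Sketch of the two output shapes: full object with metadata vs. "Only Message".
function buildOutput(parsed, topic, onlyMessage) {
  return onlyMessage ? parsed : { message: parsed, topic };
}

const parsed = { orderId: 42 };
console.log(buildOutput(parsed, 'orders', false)); // { message: { orderId: 42 }, topic: 'orders' }
console.log(buildOutput(parsed, 'orders', true));  // { orderId: 42 }
```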

The node does not output binary data.
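kafkajs exposes header values as Buffers, so producing readable header key-value pairs involves a decode step along these lines (a sketch of the idea; the exact behavior of "Return Headers" may differ):

```javascript
// Decode kafkajs-style headers (values arrive as Buffers) into plain strings.
function decodeHeaders(rawHeaders) {
  const headers = {};
  for (const [key, value] of Object.entries(rawHeaders)) {
    headers[key] = value == null ? value : value.toString('utf8');
  }
  return headers;
}

const raw = {
  'event-type': Buffer.from('order.created'),
  'trace-id': Buffer.from('abc123'),
};
console.log(decodeHeaders(raw)); // { 'event-type': 'order.created', 'trace-id': 'abc123' }
```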

Dependencies

  • Requires access to a Kafka cluster with brokers specified in credentials.
  • Optionally requires access to a Confluent Schema Registry service if schema decoding is enabled.
  • Needs appropriate authentication credentials for Kafka (e.g., username/password) if SASL authentication is used.
  • Uses the kafkajs library and kafkajs-snappy codec for Kafka communication and compression support.
  • Requires n8n credentials configured with Kafka broker addresses, client ID, SSL, and authentication details.

Troubleshooting

  • Authentication errors: If username or password is missing when authentication is enabled, the node throws an error. Ensure credentials are complete.
  • Schema decoding failures: If schema registry URL is incorrect or unavailable, decoding messages will fail. Verify the URL and network connectivity.
  • Message parsing errors: Enabling JSON parsing may cause errors if messages are not valid JSON. Consider disabling JSON parsing or handling malformed messages.
  • Session timeout issues: Heartbeat interval must be set lower than session timeout; otherwise, the consumer session may expire unexpectedly.
  • Topic subscription problems: If the topic does not exist and "Allow Topic Creation" is false, the consumer may fail to subscribe. Enable topic creation or ensure the topic exists.
  • Parallel processing ordering: Disabling parallel processing ensures message order but may reduce throughput. Choose based on use case.
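The session-timeout pitfall above is easy to catch before deploying: heartbeats must fire comfortably faster than the session timeout, or the broker will evict the consumer. A small sanity check (hypothetical helper, for illustration):

```javascript
// Guard against the session-timeout pitfall: the heartbeat interval must be
// strictly less than the session timeout, ideally with a wide margin.
function validateTimeouts(heartbeatInterval, sessionTimeout) {
  if (heartbeatInterval >= sessionTimeout) {
    throw new Error(
      `heartbeatInterval (${heartbeatInterval} ms) must be less than ` +
      `sessionTimeout (${sessionTimeout} ms)`,
    );
  }
  // A common rule of thumb: heartbeat at most 1/3 of the session timeout.
  return heartbeatInterval <= sessionTimeout / 3;
}

console.log(validateTimeouts(3000, 30000));  // true (comfortable margin)
console.log(validateTimeouts(15000, 30000)); // false (valid, but a tight margin)
```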
