## Overview
This node is a Kafka event type trigger designed to consume messages from a specified Kafka topic. It listens for messages on that topic and filters them by a specific event type, allowing workflows to react only to relevant events. This is particularly useful in event-driven architectures where different services communicate via Kafka topics.
Common scenarios include:
- Triggering workflows when a particular event (e.g., "CustomerOnboardingApprovedEvent") occurs.
- Processing messages from Kafka topics in real-time with options for message ordering and offset committing.
- Filtering Kafka messages by event type to reduce unnecessary processing.
Practical example:
A workflow could use this node to listen to a "user-events" topic and trigger only when the event type is "UserRegistered", enabling automated onboarding processes or notifications.
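The filtering described above can be sketched as a small predicate. This is an illustrative helper, not the node's actual implementation; the function name and the assumption that the event type lives in a top-level `type` field follow the Troubleshooting notes later in this document.

```javascript
// Sketch: decide whether an incoming Kafka message should trigger the
// workflow, based on a case-insensitive match against its "type" field.
// `matchesEventType` is a hypothetical helper for illustration.
function matchesEventType(rawMessage, eventType) {
  let payload;
  try {
    payload = JSON.parse(rawMessage);
  } catch {
    return false; // a non-JSON message can never match an event type
  }
  return (
    typeof payload.type === "string" &&
    payload.type.toLowerCase() === eventType.toLowerCase()
  );
}

// Only "UserRegistered" events pass, regardless of letter case.
console.log(matchesEventType('{"type":"UserRegistered","id":7}', "userregistered")); // true
console.log(matchesEventType('{"type":"OrderPlaced"}', "UserRegistered"));           // false
```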
## Properties
| Name | Meaning |
|---|---|
| Topic | Name of the Kafka topic (queue) to consume messages from. |
| Group ID | Unique identifier for the consumer group; must be unique per workflow to manage Kafka consumer state correctly. |
| Event Type | The specific event type string to filter incoming messages by (case-insensitive). Only messages matching this event type will trigger the workflow. |
| Options | Collection of optional settings: |
| - Allow Topic Creation | Whether to allow the topic to be created automatically if it does not yet exist. |
| - Auto Commit Threshold | Number of messages after which the consumer commits offsets automatically. |
| - Auto Commit Interval | Time interval in milliseconds after which the consumer commits offsets automatically. |
| - Heartbeat Interval | Interval in milliseconds for heartbeats to keep the consumer session active; must be less than Session Timeout. |
| - Max Number of Requests | Maximum number of unacknowledged requests allowed on a single connection; a value of 0 means no limit. |
| - Read Messages From Beginning | Whether to read messages from the beginning of the topic or only new messages. |
| - Parallel Processing | Whether to process messages in parallel (true) or sequentially to maintain order (false). |
| - Only Message | Whether to return only the message content or include additional metadata. |
| - Return Headers | Whether to include Kafka message headers in the output. |
| - Session Timeout | Time in milliseconds to wait for a response before timing out the consumer session. |
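Since the node is built on kafkajs (see Dependencies), the options above plausibly map onto that library's consumer settings. The kafkajs field names below (`sessionTimeout`, `heartbeatInterval`, `maxInFlightRequests`, `allowAutoTopicCreation`, `fromBeginning`, `autoCommitThreshold`, `autoCommitInterval`) are real, but this exact mapping is an assumption, not the node's source:

```javascript
// Sketch: assembling kafkajs consumer settings from the node's Options.
// The comments name the corresponding property from the table above.
function buildConsumerConfig(opts) {
  return {
    consumer: {
      groupId: opts.groupId,                     // Group ID
      sessionTimeout: opts.sessionTimeout,       // Session Timeout (ms)
      heartbeatInterval: opts.heartbeatInterval, // Heartbeat Interval (ms)
      // Max Number of Requests: 0 is translated to null, i.e. "no limit"
      maxInFlightRequests: opts.maxRequests === 0 ? null : opts.maxRequests,
      allowAutoTopicCreation: opts.allowTopicCreation, // Allow Topic Creation
    },
    subscribe: {
      topic: opts.topic,                     // Topic
      fromBeginning: opts.readFromBeginning, // Read Messages From Beginning
    },
    run: {
      autoCommitThreshold: opts.autoCommitThreshold, // commit every N messages
      autoCommitInterval: opts.autoCommitInterval,   // commit every N ms
    },
  };
}

const cfg = buildConsumerConfig({
  groupId: "workflow-42",
  topic: "user-events",
  sessionTimeout: 30000,
  heartbeatInterval: 3000, // must stay below sessionTimeout
  maxRequests: 0,
  allowTopicCreation: false,
  readFromBeginning: false,
  autoCommitThreshold: 100,
  autoCommitInterval: 5000,
});
console.log(cfg.consumer.maxInFlightRequests); // null (0 means "no limit")
```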
## Output
The node outputs JSON data representing the consumed Kafka messages filtered by the specified event type.
- If Only Message is true (the default), the output contains only the parsed message content under the `json` field.
- If Only Message is false, the output includes an object with:
  - `message`: the parsed message content,
  - `headers` (optional): an object containing the Kafka message headers as UTF-8 strings, if Return Headers is enabled,
  - `topic`: the Kafka topic name.
The output is emitted as an array with one element per message received.
The node does not output binary data.
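The two output shapes can be sketched as follows. This is an illustrative reconstruction, assuming headers arrive as Buffers (which is how kafkajs delivers them); `buildOutputItem` is a hypothetical name:

```javascript
// Sketch: build one output item per consumed Kafka message.
function buildOutputItem({ value, headers, topic }, options) {
  const message = JSON.parse(value.toString());
  if (options.onlyMessage) {
    return { json: message }; // Only Message: just the parsed payload
  }
  const item = { json: { message, topic } };
  if (options.returnHeaders) {
    // Return Headers: decode each header Buffer to a UTF-8 string
    item.json.headers = Object.fromEntries(
      Object.entries(headers ?? {}).map(([k, v]) => [k, v.toString("utf8")])
    );
  }
  return item;
}

const raw = {
  value: Buffer.from('{"type":"UserRegistered","id":7}'),
  headers: { "trace-id": Buffer.from("abc123") },
  topic: "user-events",
};
console.log(buildOutputItem(raw, { onlyMessage: true }));
// → { json: { type: 'UserRegistered', id: 7 } }
console.log(buildOutputItem(raw, { onlyMessage: false, returnHeaders: true }).json.headers);
// → { 'trace-id': 'abc123' }
```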
## Dependencies
- Requires a Kafka cluster accessible via brokers specified in credentials.
- Needs an API key credential or username/password for authentication if SASL is enabled.
- Uses the `kafkajs` library internally to connect and consume messages.
- Requires proper configuration of Kafka credentials in n8n, including brokers, client ID, SSL, and authentication details.
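The credential fields translate into a kafkajs client configuration of roughly this shape. The kafkajs field names (`clientId`, `brokers`, `ssl`, `sasl`) are real; whether n8n maps its credential fields exactly this way is an assumption, and `buildClientConfig` is a hypothetical helper:

```javascript
// Sketch: the kafkajs client config that Kafka credentials would
// plausibly produce; the result would be passed to `new Kafka(config)`.
function buildClientConfig(creds) {
  const config = {
    clientId: creds.clientId, // e.g. "n8n-kafka-trigger"
    brokers: creds.brokers,   // e.g. ["kafka-1:9092", "kafka-2:9092"]
    ssl: creds.ssl,
  };
  if (creds.authentication) {
    // SASL needs both username and password; omitting either is the
    // "Authentication errors" case described under Troubleshooting.
    config.sasl = {
      mechanism: creds.saslMechanism ?? "plain", // or "scram-sha-256"/"scram-sha-512"
      username: creds.username,
      password: creds.password,
    };
  }
  return config;
}

const config = buildClientConfig({
  clientId: "n8n-kafka-trigger",
  brokers: ["localhost:9092"],
  ssl: true,
  authentication: true,
  username: "svc-user",
  password: "secret",
});
console.log(config.sasl.mechanism); // "plain"
```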
## Troubleshooting
- Authentication errors: Ensure username and password are provided if authentication is enabled; missing credentials will cause errors.
- Connection issues: Verify broker addresses and network connectivity to Kafka cluster.
- Message filtering: If no messages appear, check that the configured event type matches (case-insensitively) the `type` property inside the Kafka message's JSON payload.
- Offset committing: Misconfiguring the auto commit threshold or interval may lead to duplicate or missed messages.
- Session timeout: If heartbeats are not sent frequently enough (heartbeat interval greater than or equal to session timeout), the consumer session may expire, causing disconnections.
- Parallel processing: Disabling parallel processing ensures message order but may slow down throughput; enable it for higher performance if order is not critical.
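The session-timeout pitfall above lends itself to a quick pre-flight check. This is an illustrative sketch (`validateTimeouts` is a hypothetical name); as a rule of thumb, the heartbeat interval is usually set to about a third of the session timeout:

```javascript
// Sketch: catch the heartbeat/session-timeout misconfiguration before
// starting the consumer. Returns an error message, or null if safe.
function validateTimeouts({ heartbeatInterval, sessionTimeout }) {
  if (heartbeatInterval >= sessionTimeout) {
    return `heartbeatInterval (${heartbeatInterval} ms) must be lower than sessionTimeout (${sessionTimeout} ms)`;
  }
  return null; // configuration is safe
}

console.log(validateTimeouts({ heartbeatInterval: 3000, sessionTimeout: 30000 }));  // null
console.log(validateTimeouts({ heartbeatInterval: 30000, sessionTimeout: 30000 })); // prints the error message
```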