Overview
The Kafka Wait Event node listens to a specified Kafka topic and waits for messages that match optional filtering criteria, with a configurable timeout. It is designed to pause workflow execution until a relevant Kafka event arrives or the timeout expires.
This node is useful in scenarios where workflows depend on asynchronous events published to Kafka topics, such as:
- Waiting for user activity or system events before proceeding.
- Synchronizing processes based on external Kafka messages.
- Filtering specific event types or keys from Kafka streams.
For example, you can configure it to wait for a message with a particular key or a JSON field matching an expected value, enabling event-driven automation triggered by Kafka messages.
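The matching described above can be sketched roughly as follows. This is an illustration only, not the node's actual source: the `WaitFilters` and `IncomingMessage` shapes and the `matchesFilters` helper are hypothetical names, and `resolvePath` is a deliberately simplified stand-in that handles plain dotted paths such as `$.event.type`, not the full JSONPath syntax.

```typescript
// Hypothetical shapes for illustration; the node's internal types are not documented here.
interface WaitFilters {
  messageKey?: string;    // exact match on the message key
  valuePath?: string;     // simplified dotted path, e.g. "$.event.type"
  expectedValue?: string; // compared against the resolved value as a string
}

interface IncomingMessage {
  key: string | null;
  value: unknown; // message content, assumed to be parsed already
}

// Simplified stand-in for a JSONPath evaluator: supports only "$.a.b.c" style paths.
function resolvePath(data: unknown, path: string): unknown {
  const segments = path
    .replace(/^\$\.?/, "")
    .split(".")
    .filter((segment) => segment.length > 0);
  let current: unknown = data;
  for (const segment of segments) {
    if (current === null || typeof current !== "object") {
      return undefined;
    }
    current = (current as Record<string, unknown>)[segment];
  }
  return current;
}

// Returns true when the message passes every filter that is configured.
function matchesFilters(message: IncomingMessage, filters: WaitFilters): boolean {
  if (filters.messageKey !== undefined && message.key !== filters.messageKey) {
    return false;
  }
  if (filters.valuePath !== undefined) {
    const actual = resolvePath(message.value, filters.valuePath);
    if (actual === undefined) {
      return false;
    }
    if (filters.expectedValue !== undefined && String(actual) !== filters.expectedValue) {
      return false;
    }
  }
  return true;
}
```

With no filters configured, every message matches, which corresponds to the filters being optional.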
Properties
| Name | Meaning |
|---|---|
| Topic Name | The Kafka topic to listen to. Supports expressions for dynamic topic names. |
| Group ID | Consumer group ID for this instance. Supports expressions for dynamic grouping. |
| Timeout (Seconds) | Maximum time to wait for a matching message before timing out. Supports expressions. |
| Read From Beginning | Whether to read messages from the beginning of the topic (true/false). |
| Filter Options | Collection of filters to narrow down messages: |
| - Message Key Filter | Exact match filter on the message key. Supports expressions. |
| - Message Value Filter | JSONPath expression to filter message content. Supports expressions. |
| - Expected Value | Expected value for the JSONPath filter to match. Supports expressions. |
| Advanced Options | Additional consumer configuration options: |
| - Heartbeat Interval (Ms) | Frequency of heartbeat signals sent to Kafka (default 3000 ms). |
| - Session Timeout (Ms) | Consumer session timeout duration (default 30000 ms). |
| - Max Wait Time (Ms) | Maximum wait time for fetch requests (default 5000 ms). |
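
As a rough sketch of how these properties could translate into consumer settings, the helper below maps them onto plain objects. The `NodeParameters` and `ConsumerSettings` types and the `buildConsumerSettings` function are hypothetical; only the option names `groupId`, `heartbeatInterval`, `sessionTimeout`, `maxWaitTimeInMs`, `topic`, and `fromBeginning` are taken from `kafkajs`. How the node itself wires these values is an assumption.

```typescript
// Hypothetical parameter shape mirroring the properties table above.
interface NodeParameters {
  topicName: string;
  groupId: string;
  timeoutSeconds: number;
  readFromBeginning: boolean;
  heartbeatIntervalMs?: number;
  sessionTimeoutMs?: number;
  maxWaitTimeMs?: number;
}

interface ConsumerSettings {
  consumer: {
    groupId: string;
    heartbeatInterval: number;
    sessionTimeout: number;
    maxWaitTimeInMs: number;
  };
  subscription: { topic: string; fromBeginning: boolean };
  timeoutMs: number;
}

function buildConsumerSettings(params: NodeParameters): ConsumerSettings {
  const topic = params.topicName.trim();
  const groupId = params.groupId.trim();
  // Expressions may resolve to empty strings, so both values are checked up front.
  if (topic === "" || groupId === "") {
    throw new Error("Topic name or Group ID empty after evaluation");
  }
  return {
    consumer: {
      groupId,
      // Defaults follow the values listed in the table.
      heartbeatInterval: params.heartbeatIntervalMs ?? 3000,
      sessionTimeout: params.sessionTimeoutMs ?? 30000,
      maxWaitTimeInMs: params.maxWaitTimeMs ?? 5000,
    },
    subscription: { topic, fromBeginning: params.readFromBeginning },
    // The node-level timeout is configured in seconds; timers use milliseconds.
    timeoutMs: params.timeoutSeconds * 1000,
  };
}
```

Under these assumptions, `consumer` has the shape that `kafka.consumer(...)` accepts and `subscription` the shape that `consumer.subscribe(...)` accepts in `kafkajs`.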
Output
The node outputs a single item containing the Kafka message details when a matching message is received within the timeout period. The output JSON structure includes:
- `topic`: The Kafka topic name.
- `partition`: Partition number of the message.
- `offset`: Offset of the message in the partition.
- `key`: The message key as a string (if present).
- `value`: The parsed message content: a JSON object if parseable, otherwise the raw string.
- `timestamp`: Timestamp of the message.
- `headers`: Any headers included with the Kafka message.
If no matching message arrives within the timeout, the node throws an error unless configured to continue on failure.
The node does not output binary data.
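A minimal sketch of how such an output item could be assembled is shown below. The `DecodedMessage` and `OutputItem` types and the `buildOutputItem` helper are hypothetical, and the sketch assumes the key, value, and headers have already been decoded from buffers into strings.

```typescript
// Hypothetical input shape: a Kafka message whose buffers were already decoded to strings.
interface DecodedMessage {
  key: string | null;
  value: string | null;
  offset: string;
  timestamp: string;
  headers: Record<string, string>;
}

interface OutputItem {
  topic: string;
  partition: number;
  offset: string;
  key?: string;
  value: unknown;
  timestamp: string;
  headers: Record<string, string>;
}

// Parse the value as JSON when possible; otherwise keep the raw string.
function parseValue(raw: string | null): unknown {
  if (raw === null) {
    return null;
  }
  try {
    return JSON.parse(raw);
  } catch {
    return raw;
  }
}

function buildOutputItem(topic: string, partition: number, message: DecodedMessage): OutputItem {
  const item: OutputItem = {
    topic,
    partition,
    offset: message.offset,
    value: parseValue(message.value),
    timestamp: message.timestamp,
    headers: message.headers,
  };
  // The key is only included when the message actually carries one.
  if (message.key !== null) {
    item.key = message.key;
  }
  return item;
}
```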
Dependencies
- Requires a Kafka broker connection configured via credentials including brokers list, client ID, and optionally authentication details.
- Uses the `kafkajs` library for Kafka consumer functionality.
- Uses the `jsonpath` library to evaluate JSONPath expressions for message filtering.
- Requires proper Kafka topic permissions and network access to the Kafka cluster.
Troubleshooting
- Timeout waiting for Kafka message: Occurs if no message matching the filters arrives within the specified timeout. Increase the timeout, or verify the filters and confirm that the topic is receiving messages.
- Topic name or Group ID empty after evaluation: Ensure expressions resolve to non-empty strings.
- Failed to connect to Kafka after 3 attempts: Check Kafka broker addresses, network connectivity, and credentials.
- Error processing Kafka message: Could be due to invalid JSON in message value or issues evaluating JSONPath filters. Verify message format and filter correctness.
- If the node fails but "Continue On Fail" is enabled, errors are returned in the output JSON under `error`.
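
The timeout and "Continue On Fail" behaviour described in this section can be sketched as below. Both helpers, `waitWithTimeout` and `toErrorItem`, are hypothetical names used for illustration, and the exact error item the node produces is an assumption beyond the `error` field mentioned above.

```typescript
// Rejects with a timeout error if `work` has not settled within `timeoutMs`.
function waitWithTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => {
      reject(new Error("Timeout waiting for Kafka message"));
    }, timeoutMs);
  });
  // Whichever promise settles first wins; the timer is always cleaned up afterwards.
  return Promise.race([work, timeout]).finally(() => {
    if (timer !== undefined) {
      clearTimeout(timer);
    }
  });
}

// With "Continue On Fail" enabled, an error becomes output data instead of failing the run.
function toErrorItem(err: unknown): { error: string } {
  const message = err instanceof Error ? err.message : String(err);
  return { error: message };
}
```

A caller could, for example, wrap the promise that resolves on the first matching message in `waitWithTimeout`, and in the rejection path either rethrow or return `toErrorItem(err)` depending on the "Continue On Fail" setting.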