Overview
This node sends messages to a Kafka topic. It can publish the raw input data as JSON or send a custom message string, attach message keys and headers, and optionally integrate with a Confluent Schema Registry for schema validation and encoding. Supported compression formats are none, GZIP, and Snappy.
Common scenarios include:
- Publishing event data from workflows to Kafka topics for downstream processing.
- Sending structured messages validated against schemas in a Schema Registry.
- Adding custom headers or keys to messages for routing or partitioning.
- Compressing messages to optimize network usage.
Practical examples:
- Sending JSON payloads received from previous nodes directly to a Kafka topic.
- Encoding messages with Avro schemas stored in a Schema Registry before publishing.
- Using message keys to ensure ordering or partition affinity in Kafka consumers.
- Adding custom headers for metadata like correlation IDs or timestamps.
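Conceptually, publishing reduces to building a kafkajs-style message record from the node's parameters. A minimal sketch in plain JavaScript (the helper name `buildKafkaMessage` and its parameter shape are illustrative, not the node's actual internals):

```javascript
// Build a kafkajs-style message record ({ key?, value, headers? })
// from node-style parameters. Illustrative sketch only.
function buildKafkaMessage({ sendInputData, inputJson, message, useKey, key, headers }) {
  // When "Send Input Data" is enabled, the incoming item is serialized
  // as JSON; otherwise the custom message string is used verbatim.
  const value = sendInputData ? JSON.stringify(inputJson) : message;
  const record = { value };
  if (useKey) record.key = key;          // partition affinity / ordering
  if (headers) record.headers = headers; // flat key-value metadata
  return record;
}

// Example: publish the incoming item with a correlation-ID header.
const record = buildKafkaMessage({
  sendInputData: true,
  inputJson: { orderId: 42, status: 'created' },
  useKey: true,
  key: 'order-42',
  headers: { 'correlation-id': 'abc-123' },
});
```

The resulting record is what ultimately goes into the `messages` array of a producer send call.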
Properties
| Name | Meaning |
|---|---|
| Topic | Name of the Kafka topic (queue) to publish the message to. |
| Send Input Data | Whether to send the incoming node data as JSON to Kafka (true), or use a custom message string (false). |
| Message | The custom message string to send when not sending input data. |
| JSON Parameters | Whether header parameters are provided as JSON (true) or via UI fields (false). |
| Use Schema Registry | Whether to encode messages using Confluent Schema Registry. |
| Schema Registry URL | URL of the Confluent Schema Registry service (required if using schema registry). |
| Use Key | Whether to include a message key. |
| Key | The message key string (required if using key). |
| Event Name | Namespace and name of the schema in the Schema Registry, e.g., "namespace.name" (required if using schema registry). |
| Headers | Custom message headers as key-value pairs (UI input, used if JSON Parameters is false). |
| Headers (JSON) | Custom message headers as a flat JSON object (used if JSON Parameters is true). |
| Options: Acks | Whether the producer waits for acknowledgement from all replicas (boolean). |
| Options: Compression | Compression format for sending data: None, GZIP, or Snappy. |
| Options: Timeout | Time in milliseconds to await a response from Kafka. |
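Taken together, these options map onto the argument of a kafkajs-style `producer.send()` call. A hedged sketch of that mapping in plain JavaScript (the numeric compression codes mirror kafkajs's `CompressionTypes` enum and are hardcoded here as an assumption to keep the sketch self-contained; the helper name is illustrative):

```javascript
// Map the node's Options onto a kafkajs-style producer.send() payload.
// Compression codes follow kafkajs's CompressionTypes enum (assumed:
// None = 0, GZIP = 1, Snappy = 2).
const COMPRESSION = { none: 0, gzip: 1, snappy: 2 };

function buildSendOptions({ topic, messages, acks, compression = 'none', timeout = 30000 }) {
  if (!(compression in COMPRESSION)) {
    throw new Error(`Unsupported compression: ${compression}`);
  }
  return {
    topic,
    messages,
    // acks: -1 waits for all in-sync replicas; 1 waits for the leader only.
    acks: acks ? -1 : 1,
    compression: COMPRESSION[compression],
    timeout, // milliseconds to await a response from Kafka
  };
}
```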
Output
The node outputs an array of objects representing the result of the batch send operation to Kafka. Each object typically contains metadata about the sent messages, such as topic, partition, offset, and success status.
If no messages were sent, it returns a single object indicating success.
The output is available in the `json` field of each item.
No binary data is produced by this node.
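A sketch of how per-partition broker metadata might be shaped into the node's output items (field names are assumed from kafkajs's `RecordMetadata` shape; the helper name is illustrative, not the node's actual code):

```javascript
// Shape kafkajs-style RecordMetadata entries into output items,
// each carrying its result under a `json` field.
function toOutputItems(recordMetadata) {
  if (!recordMetadata || recordMetadata.length === 0) {
    // Nothing was sent: a single object indicating success.
    return [{ json: { success: true } }];
  }
  return recordMetadata.map((m) => ({
    json: {
      topic: m.topicName,
      partition: m.partition,
      offset: m.baseOffset,
      success: m.errorCode === 0, // errorCode 0 means the broker accepted the batch
    },
  }));
}
```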
Dependencies
- Requires a Kafka cluster accessible via configured brokers.
- Requires valid credentials for Kafka connection, including optional SASL authentication.
- Optionally requires access to a Confluent Schema Registry service if schema validation/encoding is enabled.
- Uses the `kafkajs` library for Kafka communication.
- Uses `@kafkajs/confluent-schema-registry` for schema registry integration.
- Uses the `kafkajs-snappy` codec if Snappy compression is selected.
Troubleshooting
- Authentication errors: If username or password is missing when authentication is enabled, the node throws an error. Ensure credentials are correctly set.
- Schema Registry errors: If the schema registry URL or event name is incorrect, or the registry is unreachable, encoding fails; verify the URL and the "namespace.name" event name.
- Invalid headers JSON: When using JSON parameters for headers, invalid JSON syntax causes an error. Validate JSON format before running.
- Timeouts: If Kafka does not respond within the specified timeout, the operation may fail. Adjust the timeout option as needed.
- Empty topic or message: Ensure the topic name is provided and message content is not empty when not sending input data.
- Compression issues: Selecting unsupported compression formats or misconfiguration can cause failures; use supported options only.
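For the invalid-headers failure mode above, a quick pre-flight check can catch malformed input before the send is attempted. A sketch (the function name is illustrative):

```javascript
// Validate that a headers string parses to a flat JSON object of string
// values, as expected when "JSON Parameters" is enabled.
function parseHeadersJson(raw) {
  let parsed;
  try {
    parsed = JSON.parse(raw);
  } catch (e) {
    throw new Error(`Headers are not valid JSON: ${e.message}`);
  }
  if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
    throw new Error('Headers must be a flat JSON object');
  }
  for (const [key, value] of Object.entries(parsed)) {
    if (typeof value !== 'string') {
      throw new Error(`Header "${key}" must have a string value`);
    }
  }
  return parsed;
}
```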