Overview
This node acts as a trigger that consumes messages from a NATS JetStream stream using a specified consumer. It listens for new messages on the configured stream and consumer, then triggers workflows with the received message data. This is useful in event-driven architectures where workflows need to react to real-time events published on JetStream streams.
Common scenarios include:
- Processing order events in e-commerce systems.
- Reacting to sensor or IoT device data streams.
- Handling asynchronous notifications or logs from distributed systems.
For example, you could configure this node to listen to an "EVENTS" stream with a consumer named "order-processor" to automatically start workflows whenever new order events arrive.
Properties
| Name | Meaning |
|---|---|
| Stream Name | Name of the JetStream stream to consume from. The stream must already exist before consuming. |
| Consumer Name | Name of the JetStream consumer to use for pulling messages. The consumer must already exist. |
| Options | Collection of options controlling message fetching: |
| - Max Messages | Maximum number of messages to fetch per pull request (used if Max Bytes is not set). Higher values improve throughput but use more memory. |
| - Max Bytes | Maximum bytes to fetch per pull request. Takes priority over Max Messages if set. Set to 0 to disable. |
| - Expires (Seconds) | Timeout in seconds for each pull request before it expires. Default is 30 seconds. |
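
The precedence between Max Bytes and Max Messages described in the table can be sketched as a small helper. This is only an illustration, not the node's actual code: `FetchOptions`, `PullRequest`, and `buildPullRequest` are hypothetical names, and converting the timeout to milliseconds is an assumption about how the value is handed to the client library.

```typescript
// Hypothetical shape of the node's "Options" collection.
interface FetchOptions {
  maxMessages: number;
  maxBytes?: number; // 0 or undefined means "disabled"
  expiresSeconds?: number; // documented default: 30
}

// Hypothetical shape of a single pull request.
interface PullRequest {
  expires: number; // assumed to be milliseconds
  max_bytes?: number;
  max_messages?: number;
}

function buildPullRequest(options: FetchOptions): PullRequest {
  const expires = (options.expiresSeconds ?? 30) * 1000;
  const maxBytes = options.maxBytes ?? 0;

  // Max Bytes takes priority over Max Messages when it is set.
  if (maxBytes > 0) {
    return { expires, max_bytes: maxBytes };
  }
  // Otherwise the request is limited by message count.
  return { expires, max_messages: options.maxMessages };
}
```

With `maxBytes` left at 0, only the message limit is sent; setting it to a positive value replaces the message limit entirely in this sketch.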
Output
The node outputs JSON objects representing individual JetStream messages. Each output item includes:
- The parsed message payload.
- Metadata fields such as:
  - seq: The sequence number of the message.
  - stream: The name of the stream.
  - consumer: The name of the consumer.
  - redelivered: Boolean indicating if the message was redelivered.
  - redeliveryCount: Number of times the message has been redelivered.
If a message contains binary data, that data is included in the JSON output, but the current implementation focuses on JSON message content.
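To make the output shape concrete, here is a minimal sketch of how one item might be assembled. The metadata field names come from the list above; the `data` field name, the `toOutputItem` helper, and the fallback to the raw string when the payload is not valid JSON are assumptions made for illustration, not confirmed behavior of the node.

```typescript
// Metadata fields listed in the Output section.
interface MessageMetadata {
  seq: number;
  stream: string;
  consumer: string;
  redelivered: boolean;
  redeliveryCount: number;
}

// Hypothetical output item: metadata plus the parsed payload.
interface OutputItem extends MessageMetadata {
  data: unknown; // "data" is an assumed field name
}

function toOutputItem(rawPayload: string, meta: MessageMetadata): OutputItem {
  let data: unknown;
  try {
    data = JSON.parse(rawPayload);
  } catch {
    // Assumption: keep the raw text when the payload is not JSON.
    data = rawPayload;
  }
  return { data, ...meta };
}

// Example: an order event that has been redelivered.
const example = toOutputItem('{"orderId": 42}', {
  seq: 7,
  stream: "EVENTS",
  consumer: "order-processor",
  redelivered: true,
  redeliveryCount: 2,
});
```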
Dependencies
- Requires a connection to a NATS JetStream server.
- Needs credentials providing access to the NATS server (an API key or authentication token).
- Uses the official NATS client libraries bundled with the node.
- The target JetStream stream and consumer must be pre-created and properly configured on the NATS server.
Troubleshooting
- Stream Not Found: If the specified stream does not exist, the node throws an error prompting you to create the stream first.
- Consumer Not Found: If the specified consumer does not exist on the stream, the error tells you to create the consumer first.
- Connection Issues: Errors related to connection loss, disconnection, or async errors are logged. Ensure network connectivity and correct credentials.
- Message Processing Errors: If processing a message fails, the node logs the error and negatively acknowledges the message to allow redelivery.
- Timeouts: Adjust the "Expires (Seconds)" option if pull requests time out too quickly or take too long.
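The message processing behavior described above (log the error, then negatively acknowledge so the server can redeliver) follows a common pattern that can be sketched as below. `AckableMessage` and `handleMessage` are hypothetical names, the message is reduced to its `ack()` and `nak()` methods, and acknowledging on success is an assumption about the node rather than something stated here.

```typescript
// Minimal view of a JetStream message: only the two acknowledgement calls.
interface AckableMessage {
  ack(): void;
  nak(): void;
}

// Runs the handler; returns true if the message was acknowledged,
// false if it was negatively acknowledged for redelivery.
function handleMessage<M extends AckableMessage>(
  msg: M,
  handler: (msg: M) => void,
): boolean {
  try {
    handler(msg);
    msg.ack(); // assumed: acknowledge after successful processing
    return true;
  } catch (err) {
    console.error("Message processing failed:", err);
    msg.nak(); // ask the server to redeliver the message
    return false;
  }
}
```

A message that keeps failing shows up again with redelivered set to true and a growing redeliveryCount, which is a useful signal when debugging a handler.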