Overview
This node acts as a trigger that consumes messages from NATS JetStream streams. It listens to a specified stream and consumer, pulling messages as they arrive and triggering workflows in n8n with the message data. This is useful for real-time event processing scenarios such as order processing, inventory updates, or any system where events are published to a JetStream stream and need to be handled asynchronously.
For example, you could use this node to listen to an "EVENTS" stream with a consumer named "order-processor" to automatically process new orders as they come in, updating databases or notifying other systems.
Properties
| Name | Meaning |
|---|---|
| Stream Name | The name of the existing JetStream stream to consume messages from (e.g., "EVENTS"). |
| Consumer Name | The name of the existing JetStream consumer to use for consuming messages (e.g., "order-processor"). |
| Options | Collection of options controlling message fetching behavior: |
| - Max Messages | Maximum number of messages to fetch per pull request (used if Max Bytes is not set). Default is 100. |
| - Max Bytes | Maximum bytes to fetch per pull request; takes priority over Max Messages. Set to 0 to disable. |
| - Expires | Timeout in seconds for each pull request. Default is 30 seconds. |
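
The way these options interact can be illustrated with a small sketch. This is not the node's actual source: the `FetchOptions` and `PullRequest` shapes, the `buildPullRequest` helper, and the conversion of Expires to milliseconds are assumptions made for illustration. Only the defaults (100 messages, 30 seconds) and the rule that Max Bytes takes priority over Max Messages come from the table above.

```typescript
// Hypothetical shape of the node's "Options" collection (names are illustrative).
interface FetchOptions {
  maxMessages?: number; // documented default: 100
  maxBytes?: number;    // 0 disables the byte limit
  expires?: number;     // seconds, documented default: 30
}

// Hypothetical pull-request parameters; a real NATS client may name these differently.
interface PullRequest {
  maxMessages?: number;
  maxBytes?: number;
  expiresMs: number; // assumption: the timeout is passed on in milliseconds
}

function buildPullRequest(options: FetchOptions = {}): PullRequest {
  const expiresMs = (options.expires ?? 30) * 1000;
  const maxBytes = options.maxBytes ?? 0;

  // Max Bytes takes priority: when it is set, Max Messages is ignored.
  if (maxBytes > 0) {
    return { maxBytes, expiresMs };
  }

  // Otherwise fall back to the message-count limit.
  return { maxMessages: options.maxMessages ?? 100, expiresMs };
}
```

For instance, `buildPullRequest({ maxMessages: 10, maxBytes: 2048 })` yields a request limited by bytes only, while `buildPullRequest()` falls back to 100 messages with a 30-second timeout.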
Output
The node outputs JSON objects representing individual JetStream messages. Each output item includes:
- The parsed message payload.
- Metadata fields such as:
  - seq: The sequence number of the message.
  - stream: The stream name.
  - consumer: The consumer name.
  - redelivered: Boolean indicating whether the message was redelivered.
  - redeliveryCount: Number of times the message has been redelivered.
Binary data received within a message would be included in the JSON output, but the current implementation focuses on JSON message content.
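
A rough sketch of what one output item might look like follows. The metadata field names (seq, stream, consumer, redelivered, redeliveryCount) are the ones listed above. The `data` key holding the parsed payload, the `toOutputItem` helper, and the fallback to the raw string for non-JSON payloads are assumptions for illustration, not confirmed behavior of the node.

```typescript
// Metadata fields as documented in the Output section.
interface MessageMetadata {
  seq: number;
  stream: string;
  consumer: string;
  redelivered: boolean;
  redeliveryCount: number;
}

// Hypothetical output item: the `data` key is an assumed name for the parsed payload.
interface JetStreamOutputItem extends MessageMetadata {
  data: unknown;
}

function toOutputItem(rawPayload: string, meta: MessageMetadata): JetStreamOutputItem {
  let data: unknown;
  try {
    // The node focuses on JSON content, so try to parse the payload first.
    data = JSON.parse(rawPayload);
  } catch {
    // Assumption: non-JSON payloads are passed through as the raw string.
    data = rawPayload;
  }
  return { data, ...meta };
}
```

Under these assumptions, a message `{"orderId":42}` read from the "EVENTS" stream by the "order-processor" consumer would produce an item with `data.orderId` equal to 42 alongside the metadata fields.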
Dependencies
- Requires a connection to a NATS server with JetStream enabled.
- Needs credentials providing access to the NATS server (an API key or authentication token).
- The specified JetStream stream and consumer must already exist before using this node.
- Uses bundled NATS client libraries internally.
- No additional environment variables are explicitly required beyond the credential setup.
Troubleshooting
- Stream Not Found Error: If the specified stream does not exist, the node throws an error telling you to create the stream first.
- Consumer Not Found Error: If the specified consumer does not exist on the stream, the node throws an error telling you to create the consumer first.
- Connection Issues: Connection loss, disconnection, and asynchronous errors are logged. Check network connectivity and verify that the credentials are correct.
- Message Processing Errors: If processing a message fails, the node logs the error and negatively acknowledges the message to allow redelivery.
- Timeouts: Adjust the "Expires" option if pull requests time out too quickly or take too long.
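
The message-processing behavior described above (log the error, then negatively acknowledge so the message can be redelivered) can be sketched as follows. The `AckableMessage` interface and the `processWithAck` helper are hypothetical names, and acknowledging on success is an assumption; the source only states what happens on failure.

```typescript
// Hypothetical minimal view of a JetStream message: only the two
// acknowledgement operations this sketch needs.
interface AckableMessage {
  ack(): void; // positive acknowledgement
  nak(): void; // negative acknowledgement, requests redelivery
}

// Runs the handler and acknowledges the message according to the outcome.
// Returns true on success and false when the handler threw.
function processWithAck<M extends AckableMessage>(
  msg: M,
  handler: (msg: M) => void,
  logError: (error: unknown) => void,
): boolean {
  try {
    handler(msg);
    msg.ack(); // assumption: successful messages are acknowledged
    return true;
  } catch (error) {
    logError(error); // the node logs the failure...
    msg.nak();       // ...and negatively acknowledges to allow redelivery
    return false;
  }
}
```

A redelivered message would then appear again with redelivered set to true and an increased redeliveryCount, which a workflow can use to detect repeated failures.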