Overview
This node listens for AMQP 1.0 messages on a specified queue or topic. It is a trigger node: it continuously receives messages from an AMQP broker and emits them into the workflow for further processing.
Common scenarios where this node is beneficial include:
- Integrating with message brokers that support the AMQP 1.0 protocol (e.g., Apache Qpid, Azure Service Bus).
- Reacting to real-time events or data streams published on queues or topics.
- Building event-driven workflows that process incoming messages asynchronously.
Practical examples:
- Listening to order messages on a queue named `order-queue` to trigger order processing workflows.
- Subscribing to a durable topic subscription for persistent event handling in distributed systems.
- Consuming telemetry data from IoT devices sent via AMQP topics.
Properties
| Name | Meaning |
|---|---|
| Queue / Topic | The name of the queue or topic to listen to for incoming AMQP messages. Example format: topic://sourcename.something. |
| Clientname | Client identifier used for durable/persistent topic subscriptions or queues. Leave empty for non-durable subscriptions. |
| Subscription | Name of the subscription for durable/persistent topic subscriptions or queues. Leave empty for non-durable subscriptions. |
| Options | Collection of additional options: |
| - Container ID | Identifier passed to the underlying AMQP client backend as container_id. |
| - Convert Body To String | Whether to convert JSON body content from a byte array to a string. Useful for services like Azure Service Bus. |
| - JSON Parse Body | Whether to parse the message body as a JSON object after receiving it. |
| - Messages per Cicle | Number of messages to pull from the bus in each cycle (batch size). |
| - Only Body | Whether to return only the body property of the message instead of the full message object. |
| - Parallel Processing | Whether to process multiple messages in parallel or sequentially. |
| - Reconnect | Whether to automatically reconnect if the connection to the AMQP broker is lost. |
| - Reconnect Limit | Maximum number of automatic reconnect attempts before giving up. |
| - Sleep Time | Milliseconds to wait/sleep after each message pulling cycle. |
Output
The node outputs an array of JSON objects representing the received AMQP messages.
- By default, each output item contains the full message object including metadata and body.
- If the "Only Body" option is enabled, the output contains only the `body` property of the message.
- If "JSON Parse Body" is enabled, the body is parsed into a JavaScript object.
- If "Convert Body To String" is enabled, the body content (if a byte array) is converted to a string before parsing or output.
The node does not output binary data explicitly; all message bodies are handled as JSON or strings depending on configuration.
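The way the three output-related options combine can be illustrated with a minimal sketch. The option names (`convertBodyToString`, `jsonParseBody`, `onlyBody`), the `AmqpMessage` shape, and the `shapeOutput` function are hypothetical and chosen for illustration; the sketch also assumes the byte array arrives as a plain array of numbers, which may differ from what the node actually receives.

```typescript
// Hypothetical option names mirroring the three output-related options.
interface OutputOptions {
  convertBodyToString?: boolean;
  jsonParseBody?: boolean;
  onlyBody?: boolean;
}

// Assumed message shape: a body plus arbitrary metadata fields.
interface AmqpMessage {
  body: unknown;
  [key: string]: unknown;
}

// True when the value looks like a byte array (assumption: number[]).
function isByteArray(value: unknown): value is number[] {
  return Array.isArray(value) && value.every((v) => typeof v === "number");
}

function shapeOutput(message: AmqpMessage, options: OutputOptions): unknown {
  let body: unknown = message.body;

  // 1. Convert a byte-array body to a string before any parsing.
  //    Note: this per-byte conversion is only safe for single-byte characters.
  if (options.convertBodyToString && isByteArray(body)) {
    body = body.map((b) => String.fromCharCode(b)).join("");
  }

  // 2. Parse a string body as JSON; throws if the body is not valid JSON.
  if (options.jsonParseBody && typeof body === "string") {
    body = JSON.parse(body);
  }

  // 3. Return only the body, or the full message with the processed body.
  return options.onlyBody ? body : { ...message, body };
}
```

With all three options enabled, a body holding the bytes of `{"id":7}` would come out as the object `{ id: 7 }`; with none enabled, the message passes through unchanged.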
Dependencies
- Requires an AMQP 1.0 compatible message broker.
- Needs credentials for connecting to the AMQP broker, including hostname, port, username, password, and optionally TLS settings.
- Uses the `rhea` library internally for AMQP protocol handling.
- Supports TLS connections with certificate and key options if configured.
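As a rough illustration of how the credentials and connection-related options might translate into a connection options object for `rhea`, consider the sketch below. The `AmqpCredentials` and `NodeOptions` shapes and the `buildConnectOptions` helper are hypothetical; the output field names (`host`, `port`, `username`, `password`, `transport`, `container_id`, `reconnect`, `reconnect_limit`) follow rhea's connection options, but the mapping itself is an assumption, not the node's confirmed implementation.

```typescript
// Hypothetical credential shape based on the fields listed above.
interface AmqpCredentials {
  hostname: string;
  port: number;
  username?: string;
  password?: string;
  tls?: { cert?: string; key?: string };
}

// Hypothetical names for the connection-related node options.
interface NodeOptions {
  containerId?: string;
  reconnect?: boolean;
  reconnectLimit?: number;
}

// Subset of rhea connection options used in this sketch.
interface ConnectOptions {
  host: string;
  port: number;
  username?: string;
  password?: string;
  transport?: "tls" | "tcp";
  cert?: string;
  key?: string;
  container_id?: string;
  reconnect?: boolean;
  reconnect_limit?: number;
}

function buildConnectOptions(
  creds: AmqpCredentials,
  opts: NodeOptions = {}
): ConnectOptions {
  const result: ConnectOptions = { host: creds.hostname, port: creds.port };

  if (creds.username) result.username = creds.username;
  if (creds.password) result.password = creds.password;

  // Switch to TLS only when TLS settings are present (assumption).
  if (creds.tls) {
    result.transport = "tls";
    if (creds.tls.cert) result.cert = creds.tls.cert;
    if (creds.tls.key) result.key = creds.tls.key;
  }

  if (opts.containerId) result.container_id = opts.containerId;
  if (opts.reconnect !== undefined) result.reconnect = opts.reconnect;
  if (opts.reconnectLimit !== undefined) result.reconnect_limit = opts.reconnectLimit;

  return result;
}
```

An object built this way could then be handed to rhea's `connect()` function. Optional fields are left out rather than set to `undefined`, so the library's own defaults would apply to anything not configured.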
Troubleshooting
- "Queue or Topic required!": This error occurs if the "Queue / Topic" property is left empty. Ensure you specify a valid queue or topic name.
- Connection issues or no messages received: Check network connectivity to the AMQP broker and verify credentials. Enable reconnect options to handle transient disconnections.
- Timeout when manually triggering: When running the node manually, it waits up to 15 seconds for a message before aborting. In active workflows, it listens indefinitely.
- Message body parsing errors: If "JSON Parse Body" is enabled but the body is not valid JSON, parsing will fail. Disable this option or ensure messages contain valid JSON.
- Durable subscription misconfiguration: For durable topic subscriptions, both "Clientname" and "Subscription" must be set. Leaving either empty results in non-durable subscriptions.
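Several of the checks described in this section can be expressed as a small sketch. The `TriggerParams` shape and the `checkParams` and `tryParseJson` helpers are hypothetical names introduced for illustration; only the error message text and the durable-subscription rule are taken from the descriptions above.

```typescript
// Hypothetical parameter shape for the trigger's main properties.
interface TriggerParams {
  sink: string; // the "Queue / Topic" value
  clientname?: string;
  subscription?: string;
}

// Validates the parameters and reports whether the subscription is durable.
function checkParams(params: TriggerParams): { durable: boolean } {
  if (params.sink.trim() === "") {
    throw new Error("Queue or Topic required!");
  }
  // Durable only when BOTH Clientname and Subscription are non-empty.
  const durable = Boolean(params.clientname) && Boolean(params.subscription);
  return { durable };
}

type ParseResult =
  | { ok: true; value: unknown }
  | { ok: false; error: string };

// Parses a body as JSON without throwing, so invalid messages can be
// inspected or skipped instead of failing the run.
function tryParseJson(body: string): ParseResult {
  try {
    return { ok: true, value: JSON.parse(body) };
  } catch (err) {
    return { ok: false, error: err instanceof Error ? err.message : String(err) };
  }
}
```

For example, `checkParams({ sink: "order-queue", clientname: "worker-1" })` would report `durable: false`, because "Subscription" is missing.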
Links and References
- AMQP 1.0 Protocol
- Rhea AMQP Client Library
- Azure Service Bus AMQP Documentation (relevant for body conversion options)