stream-respond-extended

Stream Respond node for n8n with Stream Model support

Package Information

Downloads: 0 weekly / 15 monthly
Latest Version: 0.2.4
Author: Matt Waung

Documentation

Attribution

This project is based on the work from
Original Author: fictionking
Original Repository: https://github.com/fictionking/n8n-nodes-stream-respond.git

This version adds:

  • attempt to add JSON to item

n8n-nodes-stream-respond

Stream Respond Node
这个 n8n 社区节点包提供了 Stream Respond 节点,允许你在 n8n 工作流中实现流式响应功能,将数据流式传输到 Stream Trigger。




功能特性

  • 多种 Chunk 类型:支持发送不同类型的流事件(完整流、单个项目、开始事件、结束事件、错误事件)
  • 完整流模式:一键发送完整的流序列(begin + 所有items + end)
  • 智能内容处理:自动检测Item Content是否为表达式,若是固定值则仅发送一次,避免重复发送
  • 灵活的内容配置:为每种流事件类型配置自定义内容
  • JSON 输出格式:支持发送结构化 JSON 数据(包括状态信息、元数据等)
  • 延迟控制:可设置发送每个流事件后的延迟时间
  • 版本兼容性检查:自动检查 n8n 版本是否支持流式功能

重要说明

n8n 的 sendChunk API 仅支持以下 chunk 类型:

  • begin - 流开始事件
  • item - 内容块(用于发送实际数据,包括状态信息)
  • end - 流结束事件
  • error - 错误事件

如需发送状态数据,请使用 item 类型配合 JSON 输出格式。

节点说明

Stream Respond 节点

功能:将数据流式传输到 Stream Trigger,支持多种流事件类型。

输入

  • Main 输入:包含要流式传输的数据项

参数

  • Output Format:选择输出格式
    • Text Only:发送纯文本字符串
    • JSON:发送结构化 JSON 数据(支持元数据)
  • Chunk Type:选择要发送的流事件类型
    • Complete (Begin + Items + End):发送完整流序列
    • Item:发送单个内容块(如文本 token、状态数据)
    • Begin:指示流的开始
    • End:指示流的结束
    • Error:向聊天发送错误事件
  • Begin Content:为开始事件定义字符串内容(仅在 Complete 或 Begin 模式下可见)
  • Item Content:为每个项目块定义字符串内容(仅在 Complete 或 Item 模式下可见)。如果内容是固定值(非表达式),将仅发送一次;如果是表达式(以=开头),则为每个输入数据项发送一次
  • End Content:为结束事件定义字符串内容(仅在 Complete 或 End 模式下可见)
  • Error Content:为错误事件定义字符串内容(仅在 Error 模式下可见)
  • JSON Metadata:附加的 JSON 元数据(仅在 JSON 输出格式下可见)。可使用 n8n 表达式如 ={{$json.field}}。对于 OpenWebUI,确保包含 output 或其他识别字段
  • Status Message Style:将消息格式化为状态更新样式(添加引用块和图标)。适用于工作流进度通知
  • Delay (ms):发送每个块后的等待时间

输出

  • Main 输出:原始输入数据(未修改)

核心工作原理

该节点通过 n8n 的 sendChunk API 实现流式数据传输,主要功能包括:

  1. 版本兼容性检查:确保当前 n8n 版本支持流式功能
  2. Chunk 构建:根据配置的事件类型和内容构建流数据
  3. 事件发送:使用 sendChunk API 发送流事件
  4. 延迟处理:支持在发送流事件之间添加延迟
  5. 错误处理:支持发送错误事件,并处理可能的执行错误

完整流模式(Complete)

在 Complete 模式下,节点会按照以下顺序发送流事件:

  1. 发送 begin 事件(带有配置的 Begin Content)
  2. 为每个输入数据项发送 item 事件(带有配置的 Item Content)
  3. 发送 end 事件(带有配置的 End Content)

安装

方法 1:直接安装(推荐)

在 n8n 中,转到 "Settings > Community Nodes",搜索并安装 n8n-nodes-stream-respond 包。

方法 2:手动安装

  1. 克隆此仓库
git clone https://github.com/yourusername/n8n-nodes-stream-respond.git
cd n8n-nodes-stream-respond
  1. 安装依赖
npm install
  1. 构建项目
npm run build
  1. 链接到 n8n
npm link
  1. 在 n8n 中启用开发模式
n8n start --dev

使用示例

示例 1:实现 AI 流式响应

  1. 添加一个 Chat Trigger 节点,启用 "Response Mode: Streaming"
  2. 添加一个 AI 节点(如 OpenAI Completions),配置生成文本
  3. 添加 Stream Respond 节点,设置:
    • Chunk Type:Complete (Begin + Items + End)
    • Begin Content:"Response started..."
    • Item Content:{{$json.text}}(假设 AI 节点输出包含 text 字段)
    • End Content:"Response completed!"
    • Delay:10(毫秒)
  4. 连接 AI 节点到 Stream Respond 节点的 Main 输入
  5. 执行工作流,观察流式响应效果

示例 2:发送状态数据(OpenWebUI 兼容)

  1. 添加一个 Webhook Trigger 节点,配置为 Streaming 模式

  2. 添加 Stream Respond 节点,设置:

    • Output Format:JSON
    • Chunk Type:Item
    • Item Content:留空
    • JSON Metadata:
    {
      "output": "Processing your request...",
      "type": "status",
      "status": "in_progress",
      "done": false
    }
    

    注意:OpenWebUI 会从以下字段提取内容:contenttextoutputmessagedeltadataresponseresult。确保至少包含其中一个字段。

  3. 执行工作流,OpenWebUI 将显示 "Processing your request..." 并可访问其他元数据

示例 3:发送状态更新到 OpenWebUI 状态栏

要求:使用修改后的 OpenWebUI pipeline(openwebui/n8n-stream-pipe.py v2.3.0+)

详细设置说明请参见:openwebui/SETUP_GUIDE.md

在工作流的不同阶段发送状态更新:

  1. 添加一个 Webhook Trigger 节点,配置为 Streaming 模式

  2. 在工作流开始时添加 Stream Respond 节点:

    • Output Format:JSON
    • Chunk Type:Item
    • Item Content:Starting workflow...
    • Status Message Style:✓ Enabled
    • JSON Metadata:{}
  3. 在工具调用后添加另一个 Stream Respond 节点:

    • Output Format:JSON
    • Chunk Type:Item
    • Item Content:=Called tools: {{ $json.toolNames.join(', ') }}
    • Status Message Style:✓ Enabled
    • JSON Metadata:{}
  4. 在工作流结束前添加最后一个 Stream Respond 节点:

    • Output Format:JSON
    • Chunk Type:Item
    • Item Content:Processing complete!
    • Status Message Style:✓ Enabled
    • JSON Metadata:{}

效果(修改 pipeline 后)

  • 状态消息出现在 OpenWebUI 顶部状态栏
  • 与 "Calling N8N Agent..." 和 "Thinking..." 样式相同
  • 不会出现在聊天内容中

效果(不修改 pipeline)

  • 状态消息会以引用块格式出现在聊天中
  • 带有 🔄 图标,与普通消息区分开来

示例 4:发送单个流事件

  1. 添加一个 Webhook Trigger 节点,配置为 Streaming 模式
  2. 添加一个 Code 节点,生成单个数据项
  3. 添加 Stream Respond 节点,设置:
    • Output Format:Text Only
    • Chunk Type:Item
    • Item Content:"This is a single stream item"
  4. 连接 Code 节点到 Stream Respond 节点
  5. 执行工作流,观察单个流事件的发送

配置字段说明

Chunk Type

选择要发送的流事件类型,决定了节点的行为:

  • Complete:发送完整的流序列(begin + items + end)
  • Item:为每个输入数据项发送一个 item 事件
  • Begin:发送一个 begin 事件,指示流的开始
  • End:发送一个 end 事件,指示流的结束
  • Error:发送一个 error 事件,用于报告错误

Content 字段

根据选择的 Chunk Type,节点会显示对应的 Content 字段:

  • Begin Content:begin 事件的内容
  • Item Content:item 事件的内容,可使用 n8n 表达式(如 {{$json.field}})。智能处理:如果是固定值将仅发送一次,表达式则为每个项目发送一次
  • End Content:end 事件的内容
  • Error Content:error 事件的内容

Delay

设置发送每个流事件后的延迟时间(毫秒),用于控制流的速度。

开发

如果你想为此项目做贡献或修改代码:

  1. 启动开发模式
npm run dev
  1. 代码检查
npm run lint
  1. 自动修复代码问题
npm run lint:fix
  1. 构建项目
npm run build

技术栈

  • TypeScript
  • n8n-workflow

兼容性

该节点需要 n8n 版本支持 sendChunk API。如果你的 n8n 版本不支持此功能,节点会显示错误提示。

License

MIT

Discussion