规则引擎概述
规则引擎是对于处理复杂的事件具有灵活配置和高度定制化的特点。
您可以使用规则引擎的Filter、Enrichment和Transform节点通过设备和相关资产发出输入消息。
您可以使用规则引擎的Action、Externala节点触发各种操作与通信。
概念
规则引擎消息
规则引擎消息可以被被序列化并有着规定的数据结构同时可以表示系统中的各种消息。
例如:
设备遥测、属性更新或RPC调用;
实体生命周期事件: created、updated、deleted、assigned、unassigned、属性更新;
设备状态事件: connected, disconnected, active, inactive, etc;
其他事件。
规则引擎消息包含以下信息:
消息ID:基于时间的通用唯一标识符;
消息发起者:Device,Asset或其他Entity标识符;
消息类型:遥测或不活动的事件等;
消息负载:消息payload的JSON字符串;
元数据:键值对的列表以及与消息有关的其他数据.
规则节点
规则节点是规则引擎的基本组件每次处理单个输入消息并生成一个或多个输出消息。
规则节点是规则引擎的主要逻辑单元。
规则节点可以是Filter、Enrichment、Transform输入消息或者是执行Action与External节点对外部系统进行通信。
规则节点关系
规则节点之间存在关联性每个节点都有对应关系类型,用于标识关系的逻辑标签。
当规则节点生成输出消息时,它总是将消息路由到下一个指定的节点并通过关系类型进行关联。
表示成功与否的规则节点关系是Success和Failure。
表示逻辑运算的规则节点可以是True或False。
一些特定的规则节点可能使用完全不同的关系类型例如:“Post Telemetry”、“Attributes Updated”、“Entity Created”等。
规则链
规则链是规则节点及其关系的逻辑组;例如:下面的规则链将:
将所有遥测消息保存到数据库中;
如果消息中的温度字段高于50度,则发出“高温警报”;
如果消息中的温度字段低于-40度,则发出“低温警报”;
如果在脚本中发生逻辑或语法错误时,则无法执行温度脚本检查控制台记录。
租户管理员可以定义一个Root Rule Chain还可以定义多个其他规则链。根规则链处理所有输入的消息,并将其转发到其他规则链以进行其他处理。
例如:
如果消息中的温度字段高于50度,则发出“高温警报”;
如果消息中的温度字段小于50度,则清除“高温警报”
将有关“已创建”和“已清除”警报的事件转发到外部规则链,该规则链处理向相应用户的通知。
消息处理结果
有三种消息处理结果:成功、失败和超时。
当消息被规则引擎中所有节点处理成功,那么该消息将被标注为”Success”。
当消息被规则引擎中任一节点处理失败,那么此消息将被标记”Failure”。
当处理超过配置的阈值时将消息标记为“Timeout”。
可能存在的情况参见下图:
如果”Transformation”脚本失败则该消息不会标记为”Failed”,因为存在与”Failure”关系连接到”Save to DB”节点。
如果”Transformation”脚本成功则将通过REST API调用将其推送到”External System”。
如果外部系统阻塞则REST API调用可能会“等待”一段时间。
假设消息处理超时为20秒忽略Transformation脚本的执行的小于1毫秒时间。
如果”External System”在20秒内回复则消息将被成功处理。
如果”Save to DB”调用成功则消息将被成功处理。
如果外部系统在20秒内未答复将消息标记为”timed-out”。
如果”Save to DB”调用失败将该消息将标记为失败。
规则引擎队列
规则引擎在启动时订阅队列并轮询消息,总是有”Main”主题作为消息的入口。
您可以在Thingsboard.yml或环境变量配置多个队列,配置完成后您可以用”Checkpoint”节点将消息放置到另一个主题,并自动确认当前主题中的对应消息。
队列的定义由以下参数组成:
name - 用于统计和记录;
topic - 队列实现用于生成和使用消息;
poll-interval - 如果没有新消息到达则两次轮询之间的持续时间(以毫秒为单位);
partitions - 队列关联的分区数用于扩展并行处理的消息数;
pack-processing-timeout - 处理消费者返回的特定消息包时间间隔(以毫秒为单位);
submit-strategy - 定义向规则引擎提交消息的逻辑和顺序,请参阅下面的段落。
processing-strategy - 定义消息确认的逻辑,请参阅下面的段落。
提交策略
规则引擎服务不断轮询主题一旦有返回消息它就会创建TbMsgPackProcessingContext对象。
有5种策略控制如何提交TbMsgPackProcessingContex消息到规则链:
BURST - 所有消息按到达的先后顺序提交到规则链。
BATCH - 使用”queue.rule-engine.queues[queue index].batch-size” 配置参数将消息分组切片在确认之前的切片之前不会提交新切片。
SEQUENTIAL_BY_ORIGINATOR - 消息在特定实体(消息发起者)内按顺序提交消息;例如:在确认设备A的上一个消息之前不会提交设备A的新消息。
SEQUENTIAL_BY_TENANT - 消息在租户(消息发起者的所有者)内按顺序提交消息;例如:在确认租户A的上一个消息之前不会提交租户A的新消息。
SEQUENTIAL - 这是一个处理相当慢的消息处理,消息按顺序提交在确认上一个消息之前不会提交新消息。
处理策略
有5种策略控制消息失败或超时的重新处理方式:
SKIP_ALL_FAILURES - 如果忽略所有故障和超时会导致消息丢失;例如:如果DB关闭消息将不会保存可以标记为”acknowledged(已确认)”并从队列中删除,此策略是为了兼容以前的版本和开发及演示。
RETRY_ALL - 重试处理所有消息;如果100条消息中有1条将失败策略将重新处理(重新提交到规则引擎)100条消息。
RETRY_FAILED - 重试处理所有失败的消息;如果每100条消息中有1条失败策略将仅重新处理(重新提交到规则引擎)1条消息同时超时的消息将不会重新处理。
RETRY_TIMED_OUT - 重试处理所有超时消息;如果每100条消息中有1条超时策略将仅重新处理(重新提交到规则引擎)1条消息同时失败的消息将不会被重新处理。
RETRY_FAILED_AND_TIMED_OUT - 重试处理所有失败和超时的消息。
所有”RETRY*“策略都支持配置参数:
retries - 重试次数,0表示无限。
failure-percentage - 如果失败或超时少于消息的X百分比则跳过重试;
pause-between-retries - 在重试之前在线程中等待时间(以秒为单位);
默认队列
基于提交和处理策略的不同队列配置了三个默认:Main、HighPriority和SequentialByOriginator。
规则引擎处理主题中的消息并可以选择使用”Checkpoint”规则节点将其放入其他主题;默认情况下主题只忽略失败的消息这样做的目的是为了让以前的版本向后兼容,但是您需要自行承担重新配置带来的风险。
请注意:如果由于规则节点脚本中的某些故障而有未处理的消息则可能会阻止处理下一条消息。
我们已经设计了特定的仪表板监视规则引擎的处理和故障。
HighPriority主题用于传递警报或处理步骤在发生故障的情况下HighPriority主题中的消息会不断进行重新处理直到消息处理成功为止。
如果SMTP服务器或外部系统中断规则引擎将重试消息发送直到处理完成为止。
如果您想确保正确的消息处理顺序SequentialByOriginator主题很重要对来自同一实体的消息将按照到达队列的先后顺序进行处理,在确认相同实体ID的上一条消息之前规则引擎不会向规则链提交新消息。
预定义消息类型
预定义消息类型列表:
类型
显示名称
描述
元数据
payload
POST_ATTRIBUTES_REQUEST
属性发布
发布设备客户端属性 (参见属性API)
deviceName - 设备名称, deviceType - 设备类型
键/值
{
"currentState": "IDLE"
}
POST_TELEMETRY_REQUEST
遥测发布
发布设备遥测数据(参见遥测api)
deviceName - 设备名称, deviceType - 设备类型, ts - 时间戳 (毫秒)
键/值
{
"temperature": 22.7
}
TO_SERVER_RPC_REQUEST
RPC Request from Device
设备RPC请求(参见客户端rpc api)
deviceName - 设备名称, deviceType - 设备类型, requestId - RPC请求Id
包含方法和参数的json:
{
"method": "getTime",
"params": { "param1": "val1" }
}
RPC_CALL_FROM_SERVER_TO_DEVICE
服务端RPC响应
响应RPC请求(参见服务端rpc api)
requestUUID - sustem表示内部应答的请求id, expirationTime - 请求过期时间, oneway - 指定请求类型: true - 无响应, false - 有响应
包含方法和参数的json:
{
"method": "getGpioStatus",
"params": { "param1": "val1" }
}
ACTIVITY_EVENT
活动事件
表明设备处于活动状态的事件
deviceName - 设备名称, deviceType - 设备类型
包含设备活动信息的json:
{
"active": true,
"lastConnectTime": 1526979083267,
"lastActivityTime": 1526979083270,
"lastDisconnectTime": 1526978493963,
"lastInactivityAlarmTime": 1526978512339,
"inactivityTimeout": 10000
}
INACTIVITY_EVENT
不活动事件
表示设备处理非活动状态的事件
deviceName - 设备名称, deviceType - 设备类型
设备活动信息的json活动事件 payload
CONNECT_EVENT
连接事件
设备连接时的事件
deviceName - 设备名称, deviceType - 设备类型
设备活动信息的json活动事件 payload
DISCONNECT_EVENT
断开事件
设备断开连接产生的事件
deviceName - 设备名称, deviceType - 设备类型
设备活动信息的json活动事件 payload
ENTITY_CREATED
实体创建
实体创建产生的事件
userName - 实体创建的用户名, userId - 用户Id
实体详细信息的json:
{
"id": {
"entityType": "DEVICE",
"id": "efc4b9e0-5d0f-11e8-8559-37a7f8cdca74"
},
"createdTime": 1526918366334,
...
"name": "my-device",
"type": "temp-sensor"
}
ENTITY_UPDATED
实体更新
更新实体产生的事件
userName - 更新实体的用户名, userId - 用户Id
实体详细信息的json:参见实体创建 payload
ENTITY_DELETED
实体删除
删除实体产生的事件
userName - 删除实体的用户名, userId - 用户Id
实体详细信息的json:参见实体创建 payload
ENTITY_ASSIGNED
实体分配
实体分配给客户时生的事件
userName - 分配实体的用户名, userId - 用户Id, assignedCustomerName -分配的客户名, assignedCustomerId - 客户Id
实体详细信息的json:参见实体创建 payload
ENTITY_UNASSIGNED
取消实体分配
取消实体对客户分配时产生的事件
userName - 取消分配操作的用户名, userId - 用户Id, unassignedCustomerName - 取消配客户名称, unassignedCustomerId - 取消配客户Id
实体详细信息的json:参见实体创建 payload
ADDED_TO_ENTITY_GROUP
添加分组
将实体添加到实体分组时产生的事件。 仅用于ThingsBoard PE。
userName - 操作的用户名, userId - 用户Id, addedToEntityGroupName - 分组名称, addedToEntityGroupId - 分组Id
payload为空
REMOVED_FROM_ENTITY_GROUP
移除分组
移除分组。仅用于ThingsBoard PE。
userName - 操作的用户名, userId - 用户Id, removedFromEntityGroupName - 分组名称, removedFromEntityGroupId - 分组Id
payload为空
ATTRIBUTES_UPDATED
属性更新
实体属性更新时产生的事件
userName - 操作的用户名, userId - 用户Id, scope - 属性更新作用 ( SERVER_SCOPE或SHARED_SCOPE)
键/值json:
{
"softwareVersion": "1.2.3"
}
ATTRIBUTES_DELETED
属性删除
实体属性删除时产生的事件
userName - 操作的用户名, userId - 用户Id, scope - 属性删除作用 (SERVER_SCOPE或SHARED_SCOPE)
已删除的属性的keys列表:
{
"attributes": ["modelNumber", "serial"]
}
ALARM
警报事件
创建、更新或删除警报时产生的事件
消息发起者元数据中的所有字段 isNewAlarm - 创建了一个新的Alram,则为true isExistingAlarm - 已存在警报,则为true isClearedAlarm - 清除了警报,则为true
创建警报的json详细信息:
{
"tenantId": {
...
},
"type": "High Temperature Alarm",
"originator": {
...
},
"severity": "CRITICAL",
"status": "CLEARED_UNACK",
"startTs": 1526985698000,
"endTs": 1526985698000,
"ackTs": 0,
"clearTs": 1526985712000,
"details": {
"temperature": 70,
"ts": 1526985696000
},
"propagate": true,
"id": "33cd8999-5dac-11e8-bbab-ad47060c9431",
"createdTime": 1526985698000,
"name": "High Temperature Alarm"
}
REST_API_REQUEST
REST API请求到规则引擎
执行REST API调用时产生的事件
requestUUID - 请求id, expirationTime - 请求过期时间
json请求的playload
规则节点类型
根据其性质将所有可用规则节点分组:
Filter Nodes用于消息过滤和路由;
Enrichment Nodes用于更新传入消息的元数据;
Transformation Nodes用于更改传入的消息字段,例如Originator, Type, Payload, Metadata;
Action Nodes根据传入的消息执行各种动作;
External Nodes用于与外部系统进行交互.
配置
每一个规则节点具有特定的参数配置,例如:Filter节点可以通过自定义JS函数。External节点可以通过参数配置实现外部邮件服务器连接设置
可以通过在“规则链”编辑器中双击节点来打开“规则节点”配置窗口:
Javascript函数
一些规则节点具有特定的UI功能,允许用户测试JS函数。单击Test Filter Function后,您将看到JS编辑器,可使用该编辑器替换输入参数并验证函数的输出。
你可以定义:
Message Type 左上角.
Message payload 左侧中间.
Metadata 右上角.
JS script 实际脚本.
点击Test按钮将在右侧Output返回值
规则引擎统计
ThingsBoard已经为“规则引擎”统计信息准备了“默认”仪表板。
将为每个租户自动加载此仪表板。统计信息收集默认情况下处于启用状态,并通过配置属性进行控制。
您可能会在下面的仪表板上注意到有关处理错误及其原因的见解:
调试
启用调试后,只要相应的关系类型,用户就可以查看传入和传出消息的信息。请参阅下图,获取示例调试消息视图:
导入导出
您可以将规则链导出为JSON格式,并将其导入到相同或其他ThingsBoard实例。 为了导出规则链,您应该导航到Rule Chains页面,然后单击位于特定规则链卡上的导出按钮。
类似地,要导入规则链,您应该导航到Rules Chains页面,然后单击屏幕右下角的大“ +”按钮,然后单击导入按钮。
最后更新于
这有帮助吗?