EdgeX 规则引擎教程

概览

源 (Source)SQL (业务逻辑处理)目标 (Sink)
  • 源(Source):流式数据的数据源,例如来自于 MQTT 服务器的数据。在 EdgeX 的场景下,数据源就是 EdgeX 消息总线(EdgeX message bus),可以是来自于 ZeroMQ 或者 MQTT 服务器;
  • SQL:SQL 是你流式数据处理指定业务逻辑的地方,eKuiper 提供了 SQL 语句可以对数据进行抽取、过滤和转换;
  • 目标(Sink):目标用于将分析结果发送到特定的目标。例如,将分析结果发送到另外的 MQTT 服务器,或者一个 HTTP Rest 地址;

EdgeX Foundry 规则引擎教程 - 图1

使用 eKuiper,一般需要完成以下三个步骤。

  • 创建流,就是你定义数据源的地方
  • 写规则
    • 为数据分析写 SQL
    • 指定一个保存分析结果的目标
  • 部署,并且运行规则

该教程描述如何使用 eKuiper 处理来自于 EdgeX 消息总线的数据。

eKuiper EdgeX 集成

在不同的微服务之间,EdgeX 使用消息总线进行数据交换。它包含了一个抽象的消息总线接口,并分别实现了 ZeroMQ 与 MQTT,在不同的微服务之间信息交互的支持。eKuiper 和 EdgeX 的集成工作包含了以下三部分,

  1. CREATE STREAM demo (temperature bigint) WITH (FORMAT="JSON"...)

EdgeX Foundry 规则引擎教程 - 图2

迁移到 EdgeX V2

eKuiper v1.2.1 之后的版本将仅支持 EdgeX v2 ( Ireland 及之后的版本 ),并引入以下突破性变化。

Core contract Serviceedgex.yamlserviceServerDeviceDeviceName

运行 EdgeX Docker 实例

打开 EdgeX compose 项目,并且下载 Ireland 版本的 Docker compose file,然后启动所有的 EdgeX 容器。

  1. # docker-compose -f ./docker-compose-no-secty.yml up -d --build
docker ps
  1. $ docker ps
  2. CONTAINER ID IMAGE COMMAND CREATED
  3. STATUS PORTS NAMES
  4. c7cb2c07dc4f nexus3.edgexfoundry.org:10004/device-virtual:latest "/device-virtual --c…" 13 minutes ago Up 13 minutes 127.0.0.1:59900->59900/tcp edgex-device-virtual
  5. d7089087c301 nexus3.edgexfoundry.org:10004/device-rest:latest "/device-rest --cp=c…" 13 minutes ago Up 13 minutes 127.0.0.1:59986->59986/tcp edgex-device-rest
  6. 32cd339157e2 nexus3.edgexfoundry.org:10004/app-service-configurable:latest "/app-service-config…" 13 minutes ago Up 13 minutes 48095/tcp, 127.0.0.1:59701->59701/tcp edgex-app-rules-engine
  7. 62c2174d4b45 nexus3.edgexfoundry.org:10004/sys-mgmt-agent:latest "/sys-mgmt-agent -cp…" 13 minutes ago Up 13 minutes 127.0.0.1:58890->58890/tcp edgex-sys-mgmt-agent
  8. 5b9f9cfb4307 nexus3.edgexfoundry.org:10004/core-data:latest "/core-data -cp=cons…" 13 minutes ago Up 13 minutes 127.0.0.1:5563->5563/tcp, 127.0.0.1:59880->59880/tcp edgex-core-data
  9. b455b06e2e7c nexus3.edgexfoundry.org:10004/core-command:latest "/core-command -cp=c…" 13 minutes ago Up 13 minutes 127.0.0.1:59882->59882/tcp edgex-core-command
  10. 6de994ce09d6 nexus3.edgexfoundry.org:10004/core-metadata:latest "/core-metadata -cp=…" 13 minutes ago Up 13 minutes 127.0.0.1:59881->59881/tcp edgex-core-metadata
  11. 1b62bf57dd34 nexus3.edgexfoundry.org:10004/support-notifications:latest "/support-notificati…" 13 minutes ago Up 13 minutes 127.0.0.1:59860->59860/tcp edgex-support-notifications
  12. 38776815a286 nexus3.edgexfoundry.org:10004/support-scheduler:latest "/support-scheduler …" 13 minutes ago Up 13 minutes 127.0.0.1:59861->59861/tcp edgex-support-scheduler
  13. 5176ddff9f08 emqx/kuiper:1.2.1-alpine "/usr/bin/docker-ent…" 13 minutes ago Up 13 minutes 9081/tcp, 20498/tcp, 127.0.0.1:59720->59720/tcp edgex-kuiper
  14. c78419bc5096 consul:1.9.5 "docker-entrypoint.s…" 13 minutes ago Up 13 minutes 8300-8302/tcp, 8301-8302/udp, 8600/tcp, 8600/udp, 127.0.0.1:8500->8500/tcp edgex-core-consul
  15. d4b236a7b561 redis:6.2.4-alpine "docker-entrypoint.s…" 13 minutes ago Up 13 minutes 127.0.0.1:6379->6379/tcp edgex-redis

连接重用

eKuiper 1.4.0EdgeX Jakarta
  1. environment:
  2. CONNECTION__EDGEX__REDISMSGBUS__PORT: 6379
  3. CONNECTION__EDGEX__REDISMSGBUS__PROTOCOL: redis
  4. CONNECTION__EDGEX__REDISMSGBUS__SERVER: edgex-redis
  5. CONNECTION__EDGEX__REDISMSGBUS__TYPE: redis
  6. EDGEX__DEFAULT__CONNECTIONSELECTOR: edgex.redisMsgBus
  1. environment:
  2. CONNECTION__EDGEX__MQTTMSGBUS__PORT: 1883
  3. CONNECTION__EDGEX__MQTTMSGBUS__PROTOCOL: tcp
  4. CONNECTION__EDGEX__MQTTMSGBUS__SERVER: edgex-mqtt
  5. CONNECTION__EDGEX__MQTTMSGBUS__TYPE: mqtt
  6. CONNECTION__EDGEX__MQTTMSGBUS__OPTIONAL__USERNAME: username
  7. CONNECTION__EDGEX__MQTTMSGBUS__OPTIONAL__PASSWORD: password
  8. EDGEX__DEFAULT__CONNECTIONSELECTOR: edgex.mqttMsgBus

做完这些修改后,请参考这篇文档了解如何使用连接重用功能

使用 Redis 作为 KV 存储

1.4.0docker-composerulesengineenvironment1.4.0
  1. environment:
  2. KUIPER__STORE__TYPE: redis
  3. KUIPER__STORE__REDIS__HOST: edgex-redis
  4. KUIPER__STORE__REDIS__PORT: 6379
  5. KUIPER__STORE__REDIS__PASSWORD: ""

注意: 这个功能仅适用于 redis 工作在非安全模式时

原生 (native) 方式运行

zeromqzeromqzeromqEdgeXmake pkg_with_edgex

创建流

该步骤是创建一个可以从 EdgeX 消息总线进行数据消费的流。有两种方法来支持管理流,你可以选择喜欢的方式。

方式1: 使用 Rest API

597209081
$eKuiper_server
  1. curl -X POST \
  2. http://$eKuiper_server:59720/streams \
  3. -H 'Content-Type: application/json' \
  4. -d '{
  5. "sql": "create stream demo() WITH (FORMAT=\"JSON\", TYPE=\"edgex\")"
  6. }'

关于其它 API,请参考该文档.

方式2: 使用 eKuiper 命令行

使用以下命令,进入运行中的 eKuiper docker 实例。

  1. docker exec -it kuiper /bin/sh
demo
  1. bin/kuiper create stream demo'() WITH (FORMAT="JSON", TYPE="edgex")'

其它命令行,请参考该文档。


CREATE STREAMetc/sources/edgex.yamlcat etc/sources/edgex.yaml
  1. #Global Edgex configurations
  2. default:
  3. protocol: tcp
  4. server: localhost
  5. port: 5566
  6. topic: events
  7. .....

更多关于配置文件的信息,请参考该文档.

创建规则

让我们创建一条规则,将分析结果发送至 MQTT 服务器,关于 MQTT 目标的相关配置,请参考这个链接。与创建流的过程类似,你可以选择使用 REST 或者命令行来管理规则。

events
broker.emqx.ioresult

选项1: 使用 Rest API

  1. curl -X POST \
  2. http://$eKuiper_server:9081/rules \
  3. -H 'Content-Type: application/json' \
  4. -d '{
  5. "id": "rule1",
  6. "sql": "SELECT * FROM demo",
  7. "actions": [
  8. {
  9. "mqtt": {
  10. "server": "tcp://broker.emqx.io:1883",
  11. "topic": "result",
  12. "clientId": "demo_001"
  13. }
  14. },
  15. {
  16. "log":{}
  17. }
  18. ]
  19. }

选项2: 使用 eKuiper 命令行

rule.txt
  1. {
  2. "sql": "SELECT * from demo",
  3. "actions": [
  4. {
  5. "mqtt": {
  6. "server": "tcp://broker.emqx.io:1883",
  7. "topic": "result",
  8. "clientId": "demo_001"
  9. }
  10. },
  11. {
  12. "log":{}
  13. }
  14. ]
  15. }

在运行的容器中,执行以下命令。

  1. # bin/kuiper create rule rule1 -f rule.txt
  2. Connecting to 127.0.0.1:20498...
  3. Creating a new rule from file rule.txt.
  4. Rule rule1 was created successfully, please use 'cli getstatus rule rule1' command to get rule status.

log/stream.log
  1. time="2021-07-08 01:03:08" level=info msg="Serving kuiper (version - 1.2.1) on port 20498, and restful api on http://0.0.0.0:59720. \n" file="server/server.go:144"
  2. Serving kuiper (version - 1.2.1) on port 20498, and restful api on http://0.0.0.0:59720.
  3. time="2021-07-08 01:08:14" level=info msg="Successfully subscribed to edgex messagebus topic rules-events." file="extensions/edgex_source.go:111" rule=rule1
  4. time="2021-07-08 01:08:14" level=info msg="The connection to server tcp://broker.emqx.io:1883 was established successfully" file="sinks/mqtt_sink.go:182" rule=rule1
  5. time="2021-07-08 01:08:20" level=info msg="sink result for rule rule1: [{\"Float32\":-2.4369560555943686e+38}]" file="sinks/log_sink.go:16" rule=rule1
  6. time="2021-07-08 01:08:20" level=info msg="sink result for rule rule1: [{\"Float64\":-1.488582e+308}]" file="sinks/log_sink.go:16" rule=rule1
  7. time="2021-07-08 01:08:20" level=info msg="sink result for rule rule1: [{\"Uint64\":9544048735510870974}]" file="sinks/log_sink.go:16" rule=rule1
  8. time="2021-07-08 01:08:20" level=info msg="sink result for rule rule1: [{\"Uint16\":33714}]" file="sinks/log_sink.go:16" rule=rule1
  9. time="2021-07-08 01:08:20" level=info msg="sink result for rule rule1: [{\"Uint8\":57}]" file="sinks/log_sink.go:16" rule=rule1
  10. time="2021-07-08 01:08:20" level=info msg="sink result for rule rule1: [{\"Uint32\":3860684797}]" file="sinks/log_sink.go:16" rule=rule1
  11. ...

监控分析结果

tcp://broker.emqx.io:1883mosquitto_sub
  1. # mosquitto_sub -h broker.emqx.io -t result
  2. [{"Bool":false}]
  3. [{"Int64":228212448717749920}]
  4. [{"Int8":-70}]
  5. [{"Int16":16748}]
  6. [{"Int32":728167766}]
  7. [{"Uint16":32311}]
  8. [{"Uint8":133}]
  9. [{"Uint64":16707883778643919729}]
  10. [{"Uint32":1453300043}]
  11. [{"Bool":false}]
  12. [{"Float32":1.3364580409833176e+37}]
  13. [{"Float64":8.638344e+306}]
  14. [{"Int64":-2517790659681968229}]
  15. [{"Int16":-31683}]
  16. [{"Int8":96}]
  17. [{"Int32":-1245869667}]
  18. ...

你也可以敲入以下的命令来查看规则执行的状态。相关的查看规则状态的 REST API 也有提供,请检查相关文档.

  1. # bin/kuiper getstatus rule rule1
  2. Connecting to 127.0.0.1:20498...
  3. {
  4. "source_demo_0_records_in_total": 29,
  5. "source_demo_0_records_out_total": 29,
  6. "source_demo_0_exceptions_total": 0,
  7. "source_demo_0_process_latency_ms": 0,
  8. "source_demo_0_buffer_length": 0,
  9. "source_demo_0_last_invocation": "2020-04-17T10:30:09.294337",
  10. "op_filter_0_records_in_total": 29,
  11. "op_filter_0_records_out_total": 21,
  12. "op_filter_0_exceptions_total": 0,
  13. "op_filter_0_process_latency_ms": 0,
  14. "op_filter_0_buffer_length": 0,
  15. "op_filter_0_last_invocation": "2020-04-17T10:30:09.294362",
  16. "op_project_0_records_in_total": 21,
  17. "op_project_0_records_out_total": 21,
  18. "op_project_0_exceptions_total": 0,
  19. "op_project_0_process_latency_ms": 0,
  20. "op_project_0_buffer_length": 0,
  21. "op_project_0_last_invocation": "2020-04-17T10:30:09.294382",
  22. "sink_mqtt_0_0_records_in_total": 21,
  23. "sink_mqtt_0_0_records_out_total": 21,
  24. "sink_mqtt_0_0_exceptions_total": 0,
  25. "sink_mqtt_0_0_process_latency_ms": 0,
  26. "sink_mqtt_0_0_buffer_length": 1,
  27. "sink_mqtt_0_0_last_invocation": "2020-04-17T10:30:09.294423"

总结

在本教程中,我们介绍了使用 EdgeX eKuiper 规则引擎的非常简单的例子,如果使用过程中发现任何问题,请到 EdgeX,或者 eKuiper Github 中报问题。

更多练习

目前的规则没有过滤发送给 eKuiper 的任何数据,那么如何过滤数据呢?请使用删除规则,然后试着更改一下 SQL 语句,完成更改后,重新部署规则。这时候如果监听 MQTT 服务的结果主题,检查一下相关的规则是否起作用?

扩展阅读