跳到主要内容
跳到主要内容

将 EMQX 与 ClickHouse 集成

连接 EMQX

EMQX 是一个开源的 MQTT 代理,具有高性能的实时消息处理引擎,支持大规模物联网设备的事件流转。作为最具可扩展性的 MQTT 代理,EMQX 可以帮助您以任何规模连接任何设备。随时随地移动和处理您的物联网数据。

EMQX Cloud 是一个托管于 EMQ 的物联网领域的 MQTT 消息中间件产品。作为全球首个完全托管的 MQTT 5.0 云消息服务,EMQX Cloud 提供了一站式运维共址和独特的隔离环境以支持 MQTT 消息服务。在万物互联的时代,EMQX Cloud 可以帮助您快速构建物联网领域的行业应用,轻松收集、传输、计算和持久化 IoT 数据。

凭借云服务提供商提供的基础设施,EMQX Cloud 服务于世界各地数十个国家和地区,为 5G 和互联网应用提供低成本、安全且可靠的云服务。

假设

  • 您熟悉 MQTT 协议,该协议被设计为一个极其轻量级的发布/订阅消息传输协议。
  • 您正在使用 EMQX 或 EMQX Cloud 作为实时消息处理引擎,支持大规模物联网设备的事件流转。
  • 您已经准备好了一个 Clickhouse Cloud 实例来持久化设备数据。
  • 我们使用 MQTT X 作为 MQTT 客户端测试工具来连接 EMQX Cloud 的部署并发布 MQTT 数据。或者其他连接到 MQTT 代理的方法也可以完成这一工作。

获取您的 ClickHouse Cloud 服务

在此设置过程中,我们在 AWS 的 N. Virginia (us-east-1) 部署了 ClickHouse 实例,同时也在同一地区部署了 EMQX Cloud 实例。

在设置过程中,您还需要注意连接设置。在本教程中,我们选择了“任何地方”,但是如果您申请特定位置,您需要将从 EMQX Cloud 部署获取的 NAT gateway IP 地址添加到白名单中。

然后,您需要保存您的用户名和密码以备将来使用。

之后,您将获得一个正在运行的 Clickhouse 实例。点击“连接”以获取 Clickhouse Cloud 的实例连接地址。

点击“连接到 SQL 控制台”以创建与 EMQX Cloud 集成的数据库和表。

您可以参考以下 SQL 语句,或根据实际情况修改 SQL。

在 EMQX Cloud 上创建 MQTT 服务

在 EMQX Cloud 上创建一个专用的 MQTT 代理只需几个点击。

获取账户

EMQX Cloud 为每个账户提供 14 天的免费试用,包括标准部署和专业部署。

如果您是 EMQX Cloud 新用户,请访问 EMQX Cloud 注册 页面,并点击开始免费注册一个账户。

创建 MQTT 集群

登录后,点击账户菜单下的“云控制台”,您将能看到创建新部署的绿色按钮。

在本教程中,我们将使用专业部署,因为只有 Pro 版本提供数据集成功能,可以直接将 MQTT 数据发送到 ClickHouse 而无需编写一行代码。

选择 Pro 版本,选择 N.Virginia 区域,然后点击 立即创建。几分钟后,您将获得一个完全托管的 MQTT 代理:

现在点击面板以进入集群视图。在该仪表板上,您将看到 MQTT 代理的概览。

添加客户端凭据

EMQX Cloud 默认不允许匿名连接,因此您需要添加客户端凭据,以便使用 MQTT 客户端工具向此代理发送数据。

点击左侧菜单中的“身份验证与 ACL”,然后在子菜单中点击“身份验证”。点击右侧的“添加”按钮并为稍后 MQTT 连接提供用户名和密码。我们将使用 emqxxxxxxx 作为用户名和密码。

点击“确认”,现在我们有一个完全托管的 MQTT 代理准备就绪。

启用 NAT 网关

在我们开始设置 ClickHouse 集成之前,我们需要先启用 NAT 网关。默认情况下,MQTT 代理部署在私有 VPC 中,无法通过公共网络发送数据到第三方系统。

返回概览页面,向下滚动到页面底部,您将看到 NAT 网关小部件。点击订阅按钮并按照说明进行操作。请注意,NAT 网关是一个增值服务,但它也提供 14 天的免费试用。

创建完成后,您将在小部件中找到公共 IP 地址。请注意,如果您在 ClickHouse Cloud 设置过程中选择了“从特定位置连接”,您需要将此 IP 地址添加到白名单中。

将 EMQX Cloud 与 ClickHouse Cloud 集成

EMQX Cloud 数据集成 用于配置处理和响应 EMQX 消息流和设备事件的规则。数据集成不仅提供了清晰且灵活的“可配置”架构解决方案,还简化了开发过程,提高了用户可用性,并降低了业务系统与 EMQX Cloud 之间的耦合度。它还为自定义 EMQX Cloud 专有功能提供了出色的基础设施。

EMQX Cloud 提供 30 多个与流行数据系统的原生集成。ClickHouse 是其中之一。

创建 ClickHouse 资源

点击左侧菜单中的“数据集成”,然后点击“查看所有资源”。您将在数据持久性部分找到 ClickHouse,或者可以搜索 ClickHouse。

点击 ClickHouse 卡片以创建新资源。

  • 注意:为该资源添加注释。
  • 服务器地址:这是您的 ClickHouse Cloud 服务的地址,请务必不要忘记端口。
  • 数据库名称:emqx,我们在之前的步骤中创建的。
  • 用户:连接到您的 ClickHouse Cloud 服务的用户名。
  • 密钥:连接的密码。

创建新规则

在创建资源时,您会看到一个弹出窗口,点击“新建”将引导您进入规则创建页面。

EMQX 提供了强大的 规则引擎,可以在将原始 MQTT 消息发送到第三方系统之前进行转换和丰富。

以下是本教程中使用的规则:

它将从 temp_hum/emqx 主题中读取消息,并通过添加 client_id、主题和时间戳信息来丰富 JSON 对象。

因此,您发送到主题的原始 JSON:

您可以使用 SQL 测试来测试并查看结果。

现在点击“下一步”按钮。这一步是告诉 EMQX Cloud 如何将经过处理的数据插入到您的 ClickHouse 数据库中。

添加响应行动

如果您只有一个资源,您不需要修改“资源”和“行动类型”。 您只需设置 SQL 模板。以下是本教程使用的示例:

这是一个插入数据到 Clickhouse 的模板,您可以看到这里使用了变量。

查看规则详情

点击“确认”和“查看详情”。现在,一切应该都设置好了。您可以在规则详情页面看到数据集成的工作情况。

所有发送到 temp_hum/emqx 主题的 MQTT 消息将持久化到您的 ClickHouse Cloud 数据库中。

将数据保存到 ClickHouse

我们将模拟温度和湿度数据,并通过 MQTT X 将这些数据报告到 EMQX Cloud,然后使用 EMQX Cloud 数据集成将数据保存到 ClickHouse Cloud。

发布 MQTT 消息到 EMQX Cloud

您可以使用任何 MQTT 客户端或 SDK 发布消息。在本教程中,我们将使用 MQTT X,这是 EMQ 提供的用户友好的 MQTT 客户端应用程序。

在 MQTTX 上点击“新连接”并填写连接表单:

  • 名称:连接名称。可以使用您想要的任何名称。
  • 主机:MQTT 代理的连接地址。您可以在 EMQX Cloud 概览页面获取它。
  • 端口:MQTT 代理连接端口。您可以在 EMQX Cloud 概览页面获取它。
  • 用户名/密码:使用上面创建的凭据,本教程中应为 emqxxxxxxx

点击右上角的“连接”按钮,连接应该建立。

现在您可以使用该工具向 MQTT 代理发送消息。 输入:

  1. 将有效负载格式设置为“JSON”。
  2. 设置主题:temp_hum/emqx(我们刚在规则中设置的主题)
  3. JSON 主体:

点击右侧的发送按钮。您可以更改温度值并向 MQTT 代理发送更多数据。

发送到 EMQX Cloud 的数据应由规则引擎处理并自动插入到 ClickHouse Cloud 中。

查看规则监控

检查规则监控并在成功数量上加一。

检查已持久化的数据

现在是查看 ClickHouse Cloud 中数据的时候了。理想情况下,您使用 MQTTX 发送的数据将通过 EMQX Cloud 传输,并凭借原生数据集成功能持久化到 ClickHouse Cloud 的数据库中。

您可以连接到 ClickHouse Cloud 面板上的 SQL 控制台,或使用任何客户端工具从 ClickHouse 中获取数据。在本教程中,我们使用了 SQL 控制台。 通过执行 SQL:

总结

您没有编写任何代码,现在 MQTT 数据从 EMQX Cloud 移动到 ClickHouse Cloud。有了 EMQX Cloud 和 ClickHouse Cloud,您无需管理基础设施,专注于编写物联网应用程序,数据安全地存储在 ClickHouse Cloud 中。