运行环境:

  • JDK 8+

  • Maven 3.0+

  • Redis

技术栈:

  • SpringBoot 2.0+

  • Redis (Lettuce客户端,RedisTemplate模板方法)

  • Netty 4.1+

  • MQTT 3.1.1

IDE:

  • IDEA或者Eclipse

  • Lombok插件

简介

近年来,物联网高歌猛进,美国有“工业互联网”,德国有“工业4.0”,我国也有“中国制造2025”,这背后都是云计算、大数据。据波士顿咨询报告,单单中国制造业,云计算、大数据、人工智能等新技术就能为其带来高达6万亿的额外附加值。

国内外巨头纷纷驻足工业互联网,国外如亚马逊AWS、微软Azure,国内则是三大电信运营商、百度云、华为、金山云等,其中腾讯云、阿里云最甚,还拉来了传统制造大佬,国内巨头纷纷在物联网上布局。在2018云栖-深圳峰会上,阿里巴巴资深副总裁,阿里云总裁胡晓明宣布阿里巴巴将正式进军IoT。胡晓明表示,IoT是阿里巴巴集团继电商、金融、物流、云计算之后的一条新的主赛道。

IOT技术窥探

以上这些内容,作者作为一个开发人员,并不是一个投资人员和创业先锋。并不太关系这些具体细节。我所关心的是如何用技术去实现或者模拟一个支持百万链接的IOT服务器,并不严谨,仅做大家参考。

关于为什么选用下图的中间件或者对MQTT不太了解的话,可以阅读我之前的2篇文章:

技术轮廓图

快速入门

运行测试

  1. cd netty-iot
  2. 运行 NettyIotApplication
  3. 启动Eclipse Paho,并填写用户名和密码,即可连接。
  4. 另起一个Eclipse Paho,订阅随意主题,例如test。另一个Eclipse Paho发布主题test。即可收到消息。
  5. 取消主题订阅,再次发布消息。就收不到消息。

有了前面2篇文章的铺垫并学习了MQTT V3.1.1 协议,说了那么多,手痒痒的很。

You build it, You run it!

项目结构介绍

Redis

安装

体验 Redis 需要使用 Linux 或者 Mac 环境,如果是 Windows 可以考虑使用虚拟机。主要方式有四种:

  • 使用 Docker 安装。

  • 通过 Github 源码编译。

  • 直接安装 apt-get install(Ubuntu)、yum install(RedHat) 或者 brew install(Mac)。

  • 如果读者懒于安装操作,也可以使用网页版的 Web Redis 直接体验。

具体操作如下:

Docker 方式

Github 源码编译方式

直接安装方式

使用

除了支持常见的ORM框架外,更是对常用的中间件提供了非常好封装,随着的到来,支持的组件越来越丰富,也越来越成熟,其中对的支持不仅仅是丰富了它的API,更是替换掉底层的依赖,取而代之换成了

下列的是Redis所对应的操作方式

  • opsForValue: 对应 String(字符串)

  • opsForZSet: 对应 ZSet(有序集合)

  • opsForHash: 对应 Hash(哈希)

  • opsForList: 对应 List(列表)

  • opsForSet: 对应 Set(集合)

  • opsForGeo: 对应 GEO(地理位置)

我主要使用opsForValue,opsForHashopsForZSet,对于字符串。我推荐使用StringRedisTemplate

以下对于opsForValue和opsForHash的基础操作,我在这里简短的讲解一下。

Redis的Hash数据机构

Redis的散列可以让用户将多个键值对存储到一个Redis键里面。 public interface HashOperations<H,HK,HV> HashOperations提供一系列方法操作hash:

Redis的Set数据结构

Redis的Set是string类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。 Redis 中 集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。

 

MQTT

MQTT是一种轻量级的发布/订阅消息传递协议,最初由IBM和Arcom(后来成为Eurotech的一部分)于1998年左右创建。现在,MQTT 3.1.1规范已由OASIS联盟标准化。

客户端下载

对于MQTT客户端,我选用Eclipse Paho,Eclipse Paho项目提供针对物联网(IoT)的新的,现有的和新兴的应用程序的MQTT和MQTT-SN消息传递协议的开源客户端实现。具体下载地址,大家根据自己的操作系统自行下载。

MQTT控制报文

Connect

让我们对照着MQTT 3.1.1协议来实现客户端Connect协议。

// 消息解码器出现异常
if (msg.decoderResult().isFailure()) {
Throwable cause = msg.decoderResult().cause();
if (cause instanceof MqttUnacceptableProtocolVersionException) {
// 不支持的协议版本
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
} else if (cause instanceof MqttIdentifierRejectedException) {
// 不合格的clientId
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
channel.close();
return;
}
      
  if (StrUtil.isBlank(msg.payload().clientIdentifier())) {
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
          
   String username = msg.payload().userName();
String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8);
if (!grozaAuthService.checkValid(username,password)) {
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
  if (grozaSessionStoreService.containsKey(msg.payload().clientIdentifier())){
SessionStore sessionStore = grozaSessionStoreService.get(msg.payload().clientIdentifier());
Channel previous = sessionStore.getChannel();
Boolean cleanSession = sessionStore.isCleanSession();
if (cleanSession){
grozaSessionStoreService.remove(msg.payload().clientIdentifier());
grozaSubscribeStoreService.removeForClient(msg.payload().clientIdentifier());
grozaDupPublishMessageStoreService.removeByClient(msg.payload().clientIdentifier());
grozaDupPubRelMessageStoreService.removeByClient(msg.payload().clientIdentifier());
}
previous.close();
}
 SessionStore sessionStore = new SessionStore(msg.payload().clientIdentifier(), channel, msg.variableHeader().isCleanSession(), null);
if (msg.variableHeader().isWillFlag()){
MqttPublishMessage willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.valueOf(msg.variableHeader().willQos()),msg.variableHeader().isWillRetain(),0),
new MqttPublishVariableHeader(msg.payload().willTopic(),0),
Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes())
);
sessionStore.setWillMessage(willMessage);
}
 if (msg.variableHeader().keepAliveTimeSeconds() > 0){
if (channel.pipeline().names().contains("idle")){
channel.pipeline().remove("idle");
}
channel.pipeline().addFirst("idle",new IdleStateHandler(0, 0, Math.round(msg.variableHeader().keepAliveTimeSeconds() * 1.5f)));
}
至此存储会话消息及返回接受客户端连接 将clientId存储到channel的map中
grozaSessionStoreService.put(msg.payload().clientIdentifier(),sessionStore);
channel.attr(AttributeKey.valueOf("clientId")).set(msg.payload().clientIdentifier());
Boolean sessionPresent = grozaSessionStoreService.containsKey(msg.payload().clientIdentifier()) && !msg.variableHeader().isCleanSession();
MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK,false,MqttQoS.AT_MOST_ONCE,false,0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED,sessionPresent),
null
);
channel.writeAndFlush(okResp);
   if (!msg.variableHeader().isCleanSession()){
List<DupPublishMessageStore> dupPublishMessageStoreList = grozaDupPublishMessageStoreService.get(msg.payload().clientIdentifier());
List<DupPubRelMessageStore> dupPubRelMessageStoreList = grozaDupPubRelMessageStoreService.get(msg.payload().clientIdentifier());
dupPublishMessageStoreList.forEach(dupPublishMessageStore -> {
MqttPublishMessage publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH,true,MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()),false,0),
new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(),dupPublishMessageStore.getMessageId()),
Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes())
);
channel.writeAndFlush(publishMessage);
});
dupPubRelMessageStoreList.forEach(dupPubRelMessageStore -> {
MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREL,true,MqttQoS.AT_MOST_ONCE,false,0),
MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()),
null
);
channel.writeAndFlush(pubRelMessage);
});
}

用户名密码认证

其他

关于Netty实现高性能IOT服务器(Groza)之精尽代码篇中详解到这里就结束了。

原创不易,如果感觉不错,希望给个推荐!您的支持是我写作的最大动力!

下文会带大家推进Netty实现MQTT协议的IOT服务器。

版权声明: