运行环境:
JDK 8+
Maven 3.0+
Redis
技术栈:
SpringBoot 2.0+
Redis (Lettuce客户端,RedisTemplate模板方法)
Netty 4.1+
MQTT 3.1.1
IDE:
IDEA或者Eclipse
Lombok插件
近年来,物联网高歌猛进,美国有“工业互联网”,德国有“工业4.0”,我国也有“中国制造2025”,这背后都是云计算、大数据。据波士顿咨询报告,单单中国制造业,云计算、大数据、人工智能等新技术就能为其带来高达6万亿的额外附加值。
国内外巨头纷纷驻足工业互联网,国外如亚马逊AWS、微软Azure,国内则是三大电信运营商、百度云、华为、金山云等,其中腾讯云、阿里云最甚,还拉来了传统制造大佬,国内巨头纷纷在物联网上布局。在2018云栖-深圳峰会上,阿里巴巴资深副总裁,阿里云总裁胡晓明宣布阿里巴巴将正式进军IoT。胡晓明表示,IoT是阿里巴巴集团继电商、金融、物流、云计算之后的一条新的主赛道。
IOT技术窥探以上这些内容,作者作为一个开发人员,并不是一个投资人员和创业先锋。并不太关系这些具体细节。我所关心的是如何用技术去实现或者模拟一个支持百万链接的IOT服务器,并不严谨,仅做大家参考。
关于为什么选用下图的中间件或者对MQTT不太了解的话,可以阅读我之前的2篇文章:
技术轮廓图
快速入门运行测试
- cd netty-iot
- 运行 NettyIotApplication
- 启动Eclipse Paho,并填写用户名和密码,即可连接。
- 另起一个Eclipse Paho,订阅随意主题,例如test。另一个Eclipse Paho发布主题test。即可收到消息。
- 取消主题订阅,再次发布消息。就收不到消息。
有了前面2篇文章的铺垫并学习了MQTT V3.1.1 协议,说了那么多,手痒痒的很。
You build it, You run it!
项目结构介绍
Redis
安装
体验 Redis 需要使用 Linux 或者 Mac 环境,如果是 Windows 可以考虑使用虚拟机。主要方式有四种:
使用 Docker 安装。
通过 Github 源码编译。
直接安装 apt-get install(Ubuntu)、yum install(RedHat) 或者 brew install(Mac)。
如果读者懒于安装操作,也可以使用网页版的 Web Redis 直接体验。
具体操作如下:
Docker 方式
Github 源码编译方式
直接安装方式
使用
除了支持常见的ORM框架外,更是对常用的中间件提供了非常好封装,随着的到来,支持的组件越来越丰富,也越来越成熟,其中对的支持不仅仅是丰富了它的API,更是替换掉底层的依赖,取而代之换成了
下列的是Redis所对应的操作方式
opsForValue: 对应 String(字符串)
opsForZSet: 对应 ZSet(有序集合)
opsForHash: 对应 Hash(哈希)
opsForList: 对应 List(列表)
opsForSet: 对应 Set(集合)
opsForGeo: 对应 GEO(地理位置)
我主要使用opsForValue,opsForHash和opsForZSet,对于字符串。我推荐使用StringRedisTemplate。
以下对于opsForValue和opsForHash的基础操作,我在这里简短的讲解一下。
Redis的Hash数据机构
Redis的散列可以让用户将多个键值对存储到一个Redis键里面。 public interface HashOperations<H,HK,HV> HashOperations提供一系列方法操作hash:
Redis的Set数据结构
Redis的Set是string类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。 Redis 中 集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。
MQTT
MQTT是一种轻量级的发布/订阅消息传递协议,最初由IBM和Arcom(后来成为Eurotech的一部分)于1998年左右创建。现在,MQTT 3.1.1规范已由OASIS联盟标准化。
客户端下载
对于MQTT客户端,我选用Eclipse Paho,Eclipse Paho项目提供针对物联网(IoT)的新的,现有的和新兴的应用程序的MQTT和MQTT-SN消息传递协议的开源客户端实现。具体下载地址,大家根据自己的操作系统自行下载。
MQTT控制报文
Connect
让我们对照着MQTT 3.1.1协议来实现客户端Connect协议。
// 消息解码器出现异常
if (msg.decoderResult().isFailure()) {
Throwable cause = msg.decoderResult().cause();
if (cause instanceof MqttUnacceptableProtocolVersionException) {
// 不支持的协议版本
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
} else if (cause instanceof MqttIdentifierRejectedException) {
// 不合格的clientId
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
channel.close();
return;
}
if (StrUtil.isBlank(msg.payload().clientIdentifier())) {
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
String username = msg.payload().userName();
String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8);
if (!grozaAuthService.checkValid(username,password)) {
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
channel.writeAndFlush(connAckMessage);
channel.close();
return;
}
if (grozaSessionStoreService.containsKey(msg.payload().clientIdentifier())){
SessionStore sessionStore = grozaSessionStoreService.get(msg.payload().clientIdentifier());
Channel previous = sessionStore.getChannel();
Boolean cleanSession = sessionStore.isCleanSession();
if (cleanSession){
grozaSessionStoreService.remove(msg.payload().clientIdentifier());
grozaSubscribeStoreService.removeForClient(msg.payload().clientIdentifier());
grozaDupPublishMessageStoreService.removeByClient(msg.payload().clientIdentifier());
grozaDupPubRelMessageStoreService.removeByClient(msg.payload().clientIdentifier());
}
previous.close();
}
SessionStore sessionStore = new SessionStore(msg.payload().clientIdentifier(), channel, msg.variableHeader().isCleanSession(), null);
if (msg.variableHeader().isWillFlag()){
MqttPublishMessage willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.valueOf(msg.variableHeader().willQos()),msg.variableHeader().isWillRetain(),0),
new MqttPublishVariableHeader(msg.payload().willTopic(),0),
Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes())
);
sessionStore.setWillMessage(willMessage);
}
if (msg.variableHeader().keepAliveTimeSeconds() > 0){
if (channel.pipeline().names().contains("idle")){
channel.pipeline().remove("idle");
}
channel.pipeline().addFirst("idle",new IdleStateHandler(0, 0, Math.round(msg.variableHeader().keepAliveTimeSeconds() * 1.5f)));
}
至此存储会话消息及返回接受客户端连接 将clientId存储到channel的map中
grozaSessionStoreService.put(msg.payload().clientIdentifier(),sessionStore);
channel.attr(AttributeKey.valueOf("clientId")).set(msg.payload().clientIdentifier());
Boolean sessionPresent = grozaSessionStoreService.containsKey(msg.payload().clientIdentifier()) && !msg.variableHeader().isCleanSession();
MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK,false,MqttQoS.AT_MOST_ONCE,false,0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED,sessionPresent),
null
);
channel.writeAndFlush(okResp);
if (!msg.variableHeader().isCleanSession()){
List<DupPublishMessageStore> dupPublishMessageStoreList = grozaDupPublishMessageStoreService.get(msg.payload().clientIdentifier());
List<DupPubRelMessageStore> dupPubRelMessageStoreList = grozaDupPubRelMessageStoreService.get(msg.payload().clientIdentifier());
dupPublishMessageStoreList.forEach(dupPublishMessageStore -> {
MqttPublishMessage publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH,true,MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()),false,0),
new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(),dupPublishMessageStore.getMessageId()),
Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes())
);
channel.writeAndFlush(publishMessage);
});
dupPubRelMessageStoreList.forEach(dupPubRelMessageStore -> {
MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREL,true,MqttQoS.AT_MOST_ONCE,false,0),
MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()),
null
);
channel.writeAndFlush(pubRelMessage);
});
}
用户名密码认证
其他关于Netty实现高性能IOT服务器(Groza)之精尽代码篇中详解到这里就结束了。
原创不易,如果感觉不错,希望给个推荐!您的支持是我写作的最大动力!
下文会带大家推进Netty实现MQTT协议的IOT服务器。
版权声明: