前言
WebSocketTCPWebSocketHttpTCP/IP四层模型WebSocketHTTPwswssWebSocket
实现
WebSocket集群ASP.NET CoregolangnginxredisServerpub/sub
Server
WebSocket
redis
nginx配置
nginx.conf
//上游服务器地址也就是websocket服务的真实地址
upstream wsbackend {
server 127.0.0.1:5001;
server 127.0.0.1:5678;
}
server {
listen 5000;
server_name localhost;
location ~/chat/{
//upstream地址
proxy_pass http://wsbackend;
proxy_connect_timeout 60s;
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
//记得转发避免踩坑
proxy_set_header Host $host;
proxy_http_version 1.1;
//http升级成websocket协议的头标识
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
}
}
ip_hash
一对一发送
首先介绍的就是一对一发送的情况,也就是我把消息发给你,聊天的时候私聊的情况。这里呢涉及到两种情况
pub/sub
WebSocketuser:用户唯一标识user:用户唯一标识freeredis
var builder = WebApplication.CreateBuilder(args);
//注册freeredis
builder.Services.AddSingleton(provider => {
var logger = provider.GetService<ILogger<WebSocketChannelHandler>>();
RedisClient cli = new RedisClient("127.0.0.1:6379");
cli.Notice += (s, e) => logger?.LogInformation(e.Log);
return cli;
});
//注册WebSocket具体操作的类
builder.Services.AddSingleton<WebSocketHandler>();
builder.Services.AddControllers();
var app = builder.Build();
var webSocketOptions = new WebSocketOptions
{
KeepAliveInterval = TimeSpan.FromMinutes(2)
};
//注册WebSocket中间件
app.UseWebSockets(webSocketOptions);
app.MapGet("/", () => "Hello World!");
app.MapControllers();
app.Run();
接下来我们定义一个Controller用来处理WebSocket请求
public class WebSocketController : ControllerBase
{
private readonly ILogger<WebSocketController> _logger;
private readonly WebSocketHandler _socketHandler;
public WebSocketController(ILogger<WebSocketController> logger, WebSocketHandler socketHandler, WebSocketChannelHandler webSocketChannelHandler)
{
_logger = logger;
_socketHandler = socketHandler;
}
//这里的id代表当前连接的客户端唯一标识比如用户唯一标识
[HttpGet("/chat/user/{id}")]
public async Task ChatUser(string id)
{
//判断是否是WebSocket请求
if (HttpContext.WebSockets.IsWebSocketRequest)
{
_logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");
var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
//处理请求相关
await _socketHandler.Handle(id, webSocket);
}
else
{
HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
}
}
}
这里的WebSocketHandler是用来处理具体逻辑用的,咱们看一下相关代码
public class WebSocketHandler:IDisposable
{
//存储当前服务用户的集合
private readonly UserConnection UserConnection = new();
//redis频道前缀
private readonly string userPrefix = "user:";
//用户对应的redis频道
private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();
private readonly ILogger<WebSocketHandler> _logger;
//redis客户端
private readonly RedisClient _redisClient;
public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
{
_logger = logger;
_redisClient = redisClient;
}
public async Task Handle(string id, WebSocket webSocket)
{
//把当前用户连接存储起来
_ = UserConnection.GetOrAdd(id, webSocket);
//订阅一个当前用户的频道
await SubMsg($"{userPrefix}{id}");
var buffer = new byte[1024 * 4];
//接收发送过来的消息,这个方法是阻塞的,如果没收到消息则一直阻塞
var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
//循环接收消息
while (webSocket.State == WebSocketState.Open)
{
try
{
//因为缓冲区长度是固定的所以要获取实际长度
string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');
//接收的到消息转换成实体
MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(msg);
//发送到其他客户端的数据
byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");
_logger.LogInformation($"user {id} send:{msgBody.Msg}");
//判断目标客户端是否在当前当前服务,如果在当前服务直接扎到目标连接直接发送
if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket))
{
if (targetSocket.State == WebSocketState.Open)
{
await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);
}
}
else
{
//如果要发送的目标端不在当前服务,则发送给目标redis端的频道
ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };
//目标的redis频道
_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));
}
//继续阻塞循环接收消息
receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, ex.Message);
break;
}
}
//循环结束意味着当前端已经退出
//从当前用户的集合移除当前用户
_ = UserConnection.TryRemove(id, out _);
//关闭当前WebSocket连接
await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);
//在当前订阅集合移除当前用户
_disposables.TryRemove($"{userPrefix}{id}", out var disposable);
//关闭当前用户的通道
disposable.Dispose();
}
private async Task SubMsg(string channel)
{
//订阅当前用户频道
var sub = _redisClient.Subscribe(channel, async (channel, data) => {
//接收过来当前频道数据,说明发送端不在当前服务
ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");
//在当前服务找到目标的WebSocket连接并发送消息
if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket))
{
if (targetSocket.State == WebSocketState.Open)
{
await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
}
}
});
//把redis订阅频道添加到集合中
_disposables.TryAdd(channel, sub);
}
//程序退出的时候取消当前服务订阅的redis频道
public void Dispose()
{
foreach (var disposable in _disposables)
{
disposable.Value.Dispose();
}
_disposables.Clear();
}
}
UserConnectionMsgBodyChannelMsgBody
//注册到当前服务的连接
public class UserConnection : IEnumerable<KeyValuePair<string, WebSocket>>
{
//存储用户唯一标识和WebSocket的对应关系
private ConcurrentDictionary<string, WebSocket> _users = new ConcurrentDictionary<string, WebSocket>();
//当前服务的用户数量
public int Count => _users.Count;
public WebSocket GetOrAdd(string userId, WebSocket webSocket)
{
return _users.GetOrAdd(userId, webSocket);
}
public bool TryGetValue(string userId, out WebSocket webSocket)
{
return _users.TryGetValue(userId, out webSocket);
}
public bool TryRemove(string userId, out WebSocket webSocket)
{
return _users.TryRemove(userId, out webSocket);
}
public void Clear()
{
_users.Clear();
}
public IEnumerator<KeyValuePair<string, WebSocket>> GetEnumerator()
{
return _users.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
}
//客户端消息
public class MsgBody
{
//目标用户标识
public string Id { get; set; }
//要发送的消息
public string Msg { get; set; }
}
//频道订阅消息
public class ChannelMsgBody
{
//用户标识
public string FromId { get; set; }
//目标用户标识,也就是要发送给谁
public string ToId { get; set; }
//要发送的消息
public string Msg { get; set; }
}
Postman1、2、3
群组发送
group:群组唯一标识
- 发送端可以不用考虑当前服务中的客户端连接,一股脑的交给redis把消息发布出去
- 如果有WebSocket服务中的用户订阅了当前分组则可以接受消息,获取组内的用户循环发送消息
展示一下代码实现的方式,首先是定义一个action用于表示群组的相关场景
//包含两个标识一个是组别标识一个是注册到组别的用户
[HttpGet("/chat/group/{groupId}/{userId}")]
public async Task ChatGroup(string groupId, string userId)
{
if (HttpContext.WebSockets.IsWebSocketRequest)
{
_logger.LogInformation($"group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");
var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
//调用HandleGroup处理群组相关的消息
await _socketHandler.HandleGroup(groupId, userId, webSocket);
}
else
{
HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
}
}
接下来看一下HandleGroup的相关逻辑,还是在WebSocketHandler类中,看一下代码实现
public class WebSocketHandler:IDisposable
{
private readonly UserConnection UserConnection = new();
private readonly GroupUser GroupUser = new();
private readonly SemaphoreSlim _lock = new(1, 1);
private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();
private readonly string groupPrefix = "group:";
private readonly ILogger<WebSocketHandler> _logger;
private readonly RedisClient _redisClient;
public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
{
_logger = logger;
_redisClient = redisClient;
}
public async Task HandleGroup(string groupId, string userId, WebSocket webSocket)
{
//因为群组的集合可能会存在很多用户一起访问所以限制访问数量
await _lock.WaitAsync();
//初始化群组容器 群唯一标识为key 群员容器为value
var currentGroup = GroupUser.Groups.GetOrAdd(groupId, new UserConnection { });
//当前用户加入当前群组
_ = currentGroup.GetOrAdd(userId, webSocket);
//只有有当前WebSocket服务的第一个加入当前组的时候才去订阅群组频道
//如果不限制的话则会出现如果当前WebSocket服务有多个用户在一个组内则会重复收到redis消息
if (currentGroup.Count == 1)
{
//订阅redis频道
await SubGroupMsg($"{groupPrefix}{groupId}");
}
_lock.Release();
var buffer = new byte[1024 * 4];
//阻塞接收WebSocket消息
var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
//服务不退出的话则一直等待接收
while (webSocket.State == WebSocketState.Open)
{
try
{
string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');
_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msg}");
//组装redis频道发布的消息,目标为群组标识
ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msg };
//通过redis发布消息
_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));
receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, ex.Message);
break;
}
}
//如果客户端退出则在当前群组集合删除当前用户
_ = currentGroup.TryRemove(userId, out _);
await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);
}
private async Task SubGroupMsg(string channel)
{
var sub = _redisClient.Subscribe(channel, async (channel, data) => {
ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");
//在当前WebSocket服务器找到当前群组里的用户
GroupUser.Groups.TryGetValue(msgBody.ToId, out var currentGroup);
//循环当前WebSocket服务器里的用户发送消息
foreach (var user in currentGroup)
{
//不用给自己发送了
if (user.Key == msgBody.FromId)
{
continue;
}
if (user.Value.State == WebSocketState.Open)
{
await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
}
}
});
//把当前频道加入订阅集合
_disposables.TryAdd(channel, sub);
}
}
GroupUser
public class GroupUser
{
//key为群组的唯一标识
public ConcurrentDictionary<string, UserConnection> Groups = new ConcurrentDictionary<string, UserConnection>();
}
演示一下把两个用户添加到一个群组内,然后发送接收消息的场景,用户u1发送
用户u2接收
发送所有人
发送给所有用户的逻辑比较简单,不用考虑到用户限制,只要用户连接到了WebSocket集群则都可以接收到这个消息,大致工作方式如下图所示这个比较简单,咱们直接看实现代码,首先是定义一个地址,用于发布消息
//把用户注册进去
[HttpGet("/chat/all/{id}")]
public async Task ChatAll(string id)
{
if (HttpContext.WebSockets.IsWebSocketRequest)
{
_logger.LogInformation($"all user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");
var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
await _socketHandler.HandleAll(id, webSocket);
}
else
{
HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
}
}
具体的实现逻辑还是在HandleGroup类里,是HandleAll方法,看一下具体实现
public class WebSocketHandler:IDisposable
{
private readonly UserConnection AllConnection = new();
private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();
private readonly string all = "all";
private readonly ILogger<WebSocketHandler> _logger;
private readonly RedisClient _redisClient;
public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
{
_logger = logger;
_redisClient = redisClient;
}
public async Task HandleAll(string id, WebSocket webSocket)
{
await _lock.WaitAsync();
//把用户加入用户集合
_ = AllConnection.GetOrAdd(id, webSocket);
//WebSocket集群中的每个服务只定义一次
if (AllConnection.Count == 1)
{
await SubAllMsg(all);
}
_lock.Release();
var buffer = new byte[1024 * 4];
//阻塞接收信息
var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
while (webSocket.State == WebSocketState.Open)
{
try
{
string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');
_logger.LogInformation($"user {id} send:{msg}");
//获取接收信息
ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msg };
//把消息通过redis发布到集群中的其他服务
_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));
receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, ex.Message);
break;
}
}
//用户退出则删除集合中的当前用户信息
_ = AllConnection.TryRemove(id, out _);
await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);
}
private async Task SubAllMsg(string channel)
{
var sub = _redisClient.Subscribe(channel, async (channel, data) => {
ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");
//接收到消息后遍历用户集合把消息发送给所有用户
foreach (var user in AllConnection)
{
//如果包含当前用户跳过
if (user.Key == msgBody.FromId)
{
continue;
}
if (user.Value.State == WebSocketState.Open)
{
await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
}
}
});
_disposables.TryAdd(channel, sub);
}
}
效果在这里就不展示了,和群组的效果是类似的,只是一个是部分用户,一个是全部的用户。
整合到一起
上面我们分别展示了一对一、群组、所有人的场景,但是实际使用的时候,每个用户只需要注册到WebSocket集群一次也就是保持一个连接即可,而不是一对一一个连接、注册群组一个连接、所有消息的时候一个连接。所以我们需要把上面的演示整合一下,一个用户只需要连接到WebSocket集群一次即可,至于发送给谁,加入什么群组,接收全部消息等都是连接后通过一些标识区分的,而不必每个类型的操作都注册一次,就和微信和QQ一样我只要登录了即可,至于其他操作都是靠数据标识区分的。接下来咱们就整合一下代码达到这个效果,大致的思路是
- 用户连接到WebSocket集群,把用户和连接保存到当前WebSocket服务器的用户集合中去。
- 一对一发送的时候,只需要在具体的服务器中找到具体的客户端发送消息
- 群组的时候,先把当前用户标识加入群组集合即可,接收消息的时候根据群组集合里的用户标识去用户集合里去拿具体的WebSocket连接发送消息
- 全员消息的时候,直接遍历集群中的每个WebSocket服务里的用户集合里的WebSocket连接训话发送消息
这样的话就保证了每个客户端用户在集群中只会绑定一个连接,首先还是单独定义一个action,用于让客户端用户连接上来,具体实现代码如下所示
public class WebSocketChannelController : ControllerBase
{
private readonly ILogger<WebSocketController> _logger;
private readonly WebSocketChannelHandler _webSocketChannelHandler;
public WebSocketChannelController(ILogger<WebSocketController> logger, WebSocketChannelHandler webSocketChannelHandler)
{
_logger = logger;
_webSocketChannelHandler = webSocketChannelHandler;
}
//只需要把当前用户连接到服务即可
[HttpGet("/chat/channel/{id}")]
public async Task Channel(string id)
{
if (HttpContext.WebSockets.IsWebSocketRequest)
{
_logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");
var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
await _webSocketChannelHandler.HandleChannel(id, webSocket);
}
else
{
HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
}
}
}
接下来看一下WebSocketChannelHandler类的HandleChannel方法实现,用于处理不同的消息,比如一对一、群组、全员消息等不同类型的消息
public class WebSocketChannelHandler : IDisposable
{
//用于存储当前WebSocket服务器链接上来的所有用户对应关系
private readonly UserConnection UserConnection = new();
//用于存储群组和用户关系,用户集合采用HashSet保证每个用户只加入一个群组一次
private readonly ConcurrentDictionary<string, HashSet<string>> GroupUser = new ConcurrentDictionary<string, HashSet<string>>();
private readonly SemaphoreSlim _lock = new(1, 1);
//存放redis订阅实例
private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();
//一对一redis频道前缀
private readonly string userPrefix = "user:";
//群组redis频道前缀
private readonly string groupPrefix = "group:";
//全员redis频道
private readonly string all = "all";
private readonly ILogger<WebSocketHandler> _logger;
private readonly RedisClient _redisClient;
public WebSocketChannelHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
{
_logger = logger;
_redisClient = redisClient;
}
public async Task HandleChannel(string id, WebSocket webSocket)
{
await _lock.WaitAsync();
//每次连接进来就添加到用户集合
_ = UserConnection.GetOrAdd(id, webSocket);
//每个WebSocket服务实例只需要订阅一次全员消息频道
await SubMsg($"{userPrefix}{id}");
if (UserConnection.Count == 1)
{
await SubAllMsg(all);
}
_lock.Release();
var buffer = new byte[1024 * 4];
//接收客户端消息
var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
while (webSocket.State == WebSocketState.Open)
{
try
{
string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');
//读取客户端消息
ChannelData channelData = JsonConvert.DeserializeObject<ChannelData>(msg);
//判断消息类型
switch (channelData.Method)
{
//一对一
case "One":
await HandleOne(id, channelData.MsgBody, receiveResult);
break;
//把用户加入群组
case "UserGroup":
await AddUserGroup(id, channelData.Group, webSocket);
break;
//处理群组消息
case "Group":
await HandleGroup(channelData.Group, id, webSocket, channelData.MsgBody);
break;
//处理全员消息
default:
await HandleAll(id, channelData.MsgBody);
break;
}
receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, ex.Message);
break;
}
}
await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);
//在群组中移除当前用户
foreach (var users in GroupUser.Values)
{
lock (users)
{
users.Remove(id);
}
}
//当前客户端用户退出则移除连接
_ = UserConnection.TryRemove(id, out _);
//取消用户频道订阅
_disposables.Remove($"{userPrefix}{id}", out var sub);
sub?.Dispose();
}
public void Dispose()
{
foreach (var disposable in _disposables)
{
disposable.Value.Dispose();
}
_disposables.Clear();
}
}
ChannelData
public class ChannelData
{
//消息类型 比如一对一 群组 全员
public string Method { get; set; }
//群组标识
public string Group { get; set; }
//消息体
public object MsgBody { get; set; }
}
类中并不会包含当前用户信息,因为连接到当前服务的时候已经提供了客户端唯一标识。结合上面的处理代码我们可以看出,客户端用户连接到WebSocket实例之后,先注册当前用户的redis订阅频道并且当前实例仅注册一次全员消息的redis频道,用于处理非当前实例注册客户端的一对一消息处理和全员消息处理,然后等待接收客户端消息,根据客户端消息的消息类型来判断是进行一对一、群组、或者全员的消息类型处理,它的工作流程入下图所示由代码和上面的流程图可知,它根据不同的标识去处理不同类型的消息,接下来我们可以看下每种消息类型的处理方式。
一对一处理
首先是一对一的消息处理情况,看一下具体的处理逻辑,首先是一对一发布消息
private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult)
{
MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(JsonConvert.SerializeObject(msg));
byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");
_logger.LogInformation($"user {id} send:{msgBody.Msg}");
//判断目标用户是否在当前WebSocket服务器
if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket))
{
if (targetSocket.State == WebSocketState.Open)
{
await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);
}
}
else
{
//如果不在当前服务器,则直接把消息发布到具体的用户频道去,由具体用户去订阅
ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };
_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));
}
}
接下来是用于处理订阅其他用户发送过来消息的逻辑,这个和整合之前的逻辑是一致的,在当前服务器中找到用户对应的连接,发送消息
private async Task SubMsg(string channel)
{
var sub = _redisClient.Subscribe(channel, async (channel, data) =>
{
ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");
if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket))
{
if (targetSocket.State == WebSocketState.Open)
{
await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
}
else
{
_ = UserConnection.TryRemove(msgBody.FromId, out _);
}
}
});
//把订阅实例加入集合
_disposables.TryAdd(channel, sub);
}
如果给某个用户发送消息则可以使用如下的消息格式
{"Method":"One", "MsgBody":{"Id":"2","Msg":"Hello"}}
Method为One代表着是私聊一对一的情况,消息体内Id为要发送给的具体用户标识和消息体。
群组处理
接下来看群组处理方式,这个和之前的逻辑是有出入的,首先是用户要先加入到某个群组然后才能接收群组消息或者在群组中发送消息,之前是一个用户对应多个连接,整合了之后集群中每个用户只关联唯一的一个WebSocket连接,首先看用户加入群组的逻辑
private async Task AddUserGroup(string user, string group, WebSocket webSocket)
{
//获取群组信息
var currentGroup = GroupUser.GetOrAdd(group, new HashSet<string>());
lock (currentGroup)
{
//把用户标识加入当前组
_ = currentGroup.Add(user);
}
//每个组的redis频道,在每台WebSocket服务器实例只注册一次订阅
if (currentGroup.Count == 1)
{
//订阅当前组消息
await SubGroupMsg($"{groupPrefix}{group}");
}
string addMsg = $"user 【{user}】 add to group 【{group}】";
byte[] sendByte = Encoding.UTF8.GetBytes(addMsg);
await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
//如果有用户加入群组,则通知其他群成员
ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = user, ToId = group, Msg = addMsg };
_redisClient.Publish($"{groupPrefix}{group}", JsonConvert.SerializeObject(channelMsgBody));
}
用户想要在群组内发消息,则必须先加入到一个具体的群组内,具体的加入群组的格式如下
{"Method":"UserGroup", "Group":"g1"}
Method为UserGroup代表着用户加入群组的业务类型,Group代表着你要加入的群组唯一标识。接下来就看下,用户发送群组消息的逻辑了
private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody)
{
//判断群组是否存在
var hasValue = GroupUser.TryGetValue(groupId, out var users);
if (!hasValue)
{
byte[] sendByte = Encoding.UTF8.GetBytes($"group【{groupId}】 not exists");
await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
return;
}
//只有加入到当前群组,才能在群组内发送消息
if (!users.Contains(userId))
{
byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{userId}】 not in 【{groupId}】");
await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
return;
}
_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msgBody}");
//发送群组消息
ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msgBody.ToString() };
_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));
}
加入群组之后则可以发送和接收群组内的消息了,给群组发送消息的格式如下
{"Method":"Group", "Group":"g1", "MsgBody":"Hi All"}
Method为Group代表着用户加入群组的业务类型,Group则代表你要发送到具体的群组的唯一标识,MsgBody则是发送到群组内的消息。最后再来看下订阅群组内消息的情况,也就是处理群组消息的逻辑
private async Task SubGroupMsg(string channel)
{
var sub = _redisClient.Subscribe(channel, async (channel, data) =>
{
//接收群组订阅消息
ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");
//获取当前服务器实例中当前群组的所有用户连接
GroupUser.TryGetValue(msgBody.ToId, out var currentGroup);
foreach (var user in currentGroup)
{
if (user == msgBody.FromId)
{
continue;
}
//通过群组内的用户标识去用户集合获取用户集合里的用户唯一连接发送消息
if (UserConnection.TryGetValue(user, out var targetSocket) && targetSocket.State == WebSocketState.Open)
{
await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
}
else
{
currentGroup.Remove(user);
}
}
});
_disposables.TryAdd(channel, sub);
}
全员消息处理
全员消息处理相对来说思路比较简单,因为当服务启动的时候就会监听redis的全员消息频道,这样的话具体的实现也就只包含发送和接收全员消息了,首先看一下全员消息发送的逻辑
private async Task HandleAll(string id, object msgBody)
{
_logger.LogInformation($"user {id} send:{msgBody}");
//直接给redis的全员频道发送消息
ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msgBody.ToString() };
_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));
}
全员消息的发送数据格式如下所示
{"Method":"All", "MsgBody":"Hello All"}
Method为All代表着全员消息类型,MsgBody则代表着具体消息。接收消息出里同样很简单,订阅redis全员消息频道,然后遍历当前WebSocket服务器实例内的所有用户获取连接发送消息,具体逻辑如下
private async Task SubAllMsg(string channel)
{
var sub = _redisClient.Subscribe(channel, async (channel, data) =>
{
ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");
//获取当前服务器实例内所有用户的连接
foreach (var user in UserConnection)
{
//不给自己发送消息,因为发送的时候可以通过具体的业务代码处理
if (user.Key == msgBody.FromId)
{
continue;
}
//给每个用户发送消息
if (user.Value.State == WebSocketState.Open)
{
await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
}
else
{
_ = UserConnection.TryRemove(user.Key, out _);
}
}
});
_disposables.TryAdd(channel, sub);
}
示例源码
github
仓库里还涉及到本人闲暇之余开源的其他仓库,由于本人能力有限难登大雅之堂,就不做广告了,有兴趣的同学可以自行浏览一下。
总结
ASP.NET CoreWebSocketgolang
- 首先是,利用可以构建WebSocket服务的框架,在当前服务实例中保存当前客户端用户和WebSocket的连接关系
- 如果消息的目标客户端不在当前服务器,可以利用redis频道、消息队列相关、甚至是数据库类的共享回话发送的消息,由目标服务器获取目标是否属于自己的ws会话
- 本文设计的思路使用的是无状态的方式,即WebSocket服务实例之间不存在直接的消息通信和相互的服务地址存储,当然也可以利用redis等存储在线用户信息等,这个可以参考具体业务自行设计
读万卷书,行万里路。在这个时刻都在变化点的环境里,唯有不断的进化自己,多接触多尝试不用的事物,多扩展自己的认知思维,方能构建自己的底层逻辑。毕竟越底层越抽象,越通用越抽象。面对未知的挑战,自身作为自己坚强的后盾,可能才会让自己更踏实。