消息推送服务
服务器推送目前流行就是私信、发布/订阅等模式,基本上都是基于会话映射,消息对列等技术实现的;高性能、分布式可以如下解决:会话映射可采用redis cluster等技术实现,消息对列可使用kafka等分布式消息队列方案实现。
APM.Server基于简单
1 static ConcurrentDictionary<string, Session> _sessionDic = new ConcurrentDictionary<string, Session>();
和
1 private static ConcurrentQueue<Message> _messageQueue = new ConcurrentQueue<Message>();
实现。
部分代码如下:
1 /// <summary>
2 /// 消息转发
3 /// </summary>
4 private void ForwardMsg()
5 {
6 try
7 {
8 var msg = MessageQueue.Dequeue();
9 if (msg != null)
10 {
11 switch (msg.Type)
12 {
13 case (byte)MessageType.Sub:
14 if (!msg.IsMuti)
15 {
16 if (!SessionDic.Exists(msg.SessionID, msg.SessionID))
17 SessionDic.Set(this._server, msg.SessionID, msg.SessionID);
18 }
19 if (!SessionDic.Exists(msg.SessionID, msg.Sender))
20 SessionDic.Set(this._server, msg.Sender, msg.SessionID);
21 break;
22 case (byte)MessageType.Unsub:
23 if (!msg.IsMuti)
24 {
25 if (SessionDic.Exists(msg.SessionID, msg.SessionID))
26 SessionDic.Del(msg.SessionID, msg.SessionID);
27 }
28 if (SessionDic.Exists(msg.SessionID, msg.Sender))
29 SessionDic.Del(msg.Sender, msg.SessionID);
30 break;
31 default:
32 var session = SessionDic.Get(msg.SessionID);
33 if (session != null)
34 {
35 var remotes = session.UserTokenDic.List.Where(b => b.ID != msg.Sender).ToList();
36 if (remotes != null && remotes.Count > 0)
37 {
38 Parallel.For(0, remotes.Count, i =>
39 {
40 this._server.SendMsg(remotes[i], Message.Serialize(msg));
41 });
42 }
43 }
44 this.OnMessage?.Invoke(msg);
45 break;
46 }
47
48 }
49 }
50 catch { }
51 }
1 /// <summary>
2 /// 消息转发
3 /// </summary>
4 private void ForwardMsg()
5 {
6 try
7 {
8 var msg = MessageQueue.Dequeue();
9 if (msg != null)
10 {
11 switch (msg.Type)
12 {
13 case (byte)MessageType.Sub:
14 if (!msg.IsMuti)
15 {
16 if (!SessionDic.Exists(msg.SessionID, msg.SessionID))
17 SessionDic.Set(this._server, msg.SessionID, msg.SessionID);
18 }
19 if (!SessionDic.Exists(msg.SessionID, msg.Sender))
20 SessionDic.Set(this._server, msg.Sender, msg.SessionID);
21 break;
22 case (byte)MessageType.Unsub:
23 if (!msg.IsMuti)
24 {
25 if (SessionDic.Exists(msg.SessionID, msg.SessionID))
26 SessionDic.Del(msg.SessionID, msg.SessionID);
27 }
28 if (SessionDic.Exists(msg.SessionID, msg.Sender))
29 SessionDic.Del(msg.Sender, msg.SessionID);
30 break;
31 default:
32 var session = SessionDic.Get(msg.SessionID);
33 if (session != null)
34 {
35 var remotes = session.UserTokenDic.List.Where(b => b.ID != msg.Sender).ToList();
36 if (remotes != null && remotes.Count > 0)
37 {
38 Parallel.For(0, remotes.Count, i =>
39 {
40 this._server.SendMsg(remotes[i], Message.Serialize(msg));
41 });
42 }
43 }
44 this.OnMessage?.Invoke(msg);
45 break;
46 }
47
48 }
49 }
50 catch { }
51 }
异步tcp通信——APM.Core 解包
异步tcp通信——APM.Server 消息推送服务的实现
异步tcp通信——APM.ConsoleDemo
转载请标明本文来源:http://www.cnblogs.com/yswenli/
更多内容欢迎star作者的github:https://github.com/yswenli/APM
