码迷,mamicode.com
首页 > Web开发 > 详细

FCL 系列 - 4. FCL.Net.dll

时间:2015-01-11 19:06:12      阅读:410      评论:0      收藏:0      [点我收藏+]

标签:

 

使用.NET 的 Socket 对象,Select 模型。

自定义封包: 包长度+消息体

客户端-服务端架构,有心跳包机制。

 

 

客户端源代码:

using System;
using System.Collections.Generic;
using System.Text;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Windows.Forms;

using Wagwei.FCL.Net.Implementation;

namespace Wagwei.FCL.Net.Socket
{
    /// <summary> Socket 客户端类<remarks>
    /// <para></para>
    /// </remarks>
    /// </summary>
    [DeveloperInfo("Wagwei")]
    public class ClientOp
    {
        /* ###### Basic client information ###### */
        // Connection target and credentials; Logon reads these (and may overwrite them from its arguments).
        public string m_sServerIP { get; set; }
        public int m_nPort { get; set; }
        public string m_sUserName { get; set; }
        public string m_sUserPwd { get; set; }


        /* ###### Client socket ###### */
        // Created by Logon; the auto-connect worker sets it to null when the link is considered lost.
        public System.Net.Sockets.Socket m_socketClient;


        /* ###### Receiving messages ###### */
        Thread m_threadRecv;// worker thread running ReceiveText
        // Gate that ReceiveText waits on at the top of every loop iteration (set = receiving enabled).
        ManualResetEvent m_mreReceive = new ManualResetEvent(false);
        // Set by ReceiveCallBackSize once the length prefix is complete; also set by the auto-connect worker before a reconnect.
        ManualResetEvent m_mreReceiveDone1 = new ManualResetEvent(false);
        // Set by ReceiveCallbackData once the message body is complete; also set by the auto-connect worker before a reconnect.
        ManualResetEvent m_mreReceiveDone2 = new ManualResetEvent(false);
        public int m_nReceivePollingInterval = 100;// receive polling interval; passed to Thread.Sleep, i.e. milliseconds


        /* ###### Auto-connect ###### */
        Thread m_threadAutoConnect;// worker thread for heartbeat / reconnect
        // Gate that the auto-connect worker waits on at the top of every loop iteration.
        ManualResetEvent m_mreAutoConnect = new ManualResetEvent(false);
        public int m_nAutoConnectPollingInterval = 5000;// reconnect/heartbeat polling interval; passed to Thread.Sleep, i.e. milliseconds
        int m_nKeepAliveSeconds = 20;//30;// maximum allowed time without a heartbeat, in seconds
        DateTime m_datatimeKeepAlive = DateTime.Now; // local time the last heartbeat was received (or a reconnect was attempted)

        /* ###### Message event ###### */
        public delegate void DelegateHaveText(Text text);
        public event DelegateHaveText m_eventHaveText;

        // Received TEXT messages waiting to be dispatched by HaveText; guarded by _objLockQueueHaveText.
        Queue<Text> _queueHaveText;
        object _objLockQueueHaveText;

        Thread m_threadHaveText;
        // Gate that HaveText waits on at the top of every loop iteration.
        ManualResetEvent m_mreHaveText = new ManualResetEvent(false);

        /* ###### Informational / error notification event ###### */
        public delegate void DelegateInfo(string info);
        public event DelegateInfo m_eventInfo;

        /// <summary>
        /// Static initializer. Currently does nothing: the license check below is commented out.
        /// </summary>
        static ClientOp()
        {
            //Wagwei.FCL.Core.Implementation.Internal.WagLicense.Check();
        }

        /// <summary>
        /// Creates a client with an empty pending-message queue and the lock object that guards it.
        /// </summary>
        public ClientOp()
        {
            this._objLockQueueHaveText = new object();
            this._queueHaveText = new Queue<Text>();
        }

        /// <summary>
        /// Creates a client and stores the connection target and credentials used later by Logon.
        /// </summary>
        /// <param name="serverIP">Server IP address (parsed by Logon with IPAddress.Parse).</param>
        /// <param name="port">Server TCP port.</param>
        /// <param name="userName">User name sent as the sender of the logon packet.</param>
        /// <param name="userPwd">Password sent in the logon packet.</param>
        public ClientOp(string serverIP, int port, string userName, string userPwd)
            : this()
        {
            this.m_sUserName = userName;
            this.m_sUserPwd = userPwd;
            this.m_sServerIP = serverIP;
            this.m_nPort = port;
        }

        /// <summary>
        /// Connects to the server over TCP and sends the logon packet.
        /// </summary>
        /// <param name="args">
        /// Optional overrides in the order: server IP, user name, password, port.
        /// They are applied only when at least four values are supplied; otherwise the
        /// current property values are used.
        /// </param>
        /// <exception cref="Exception">
        /// Thrown when connecting or sending fails with a <see cref="SocketException"/>;
        /// the original exception is available as InnerException.
        /// </exception>
        public void Logon(params string[] args)
        {
            // A caller passing an explicit null array must not crash on args.Length.
            if (args != null && args.Length > 3)
            {
                m_sServerIP = args[0];
                m_sUserName = args[1];
                m_sUserPwd = args[2];
                m_nPort = int.Parse(args[3]);
            }

            IPAddress iPAdrr = IPAddress.Parse(m_sServerIP);
            IPEndPoint iPEndP = new IPEndPoint(iPAdrr, m_nPort);
            System.Net.Sockets.Socket socket = null;
            try
            {
                socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                // Published before Connect (as before) so the worker threads see a non-null,
                // not-yet-connected socket while the connection attempt is in progress.
                m_socketClient = socket;
                socket.Connect(iPEndP);

                // Winsock2 defines an IO control code, KeepAliveValues, that controls TCP keep-alive probes and their interval.
                // The default interval is two hours; it could be tuned like this:
                //m_SocketClient.IOControl(IOControlCode.KeepAliveValues, BitConverter.GetBytes(7200), null);//24 * 60 * 60

                socket.ReceiveBufferSize = SocketCommon.m_ReceiveBufferSize;
                socket.SendBufferSize = SocketCommon.m_SendBufferSize;

                Text text = new Text();
                text.m_sFrom = m_sUserName;
                text.m_asTo = new string[] { "server" };
                text.m_enumCmdType = CMD_SERVER.LOGON;
                text.m_sUserPwd = m_sUserPwd;
                text.m_datetimeSend = DateTime.Now;

                socket.Send(BinarySerializer.Serialize(text, SocketCommon.m_PacketLenSize, SocketCommon.m_PacketLenSize));
            }
            catch (SocketException ex)
            {
                // Release the OS handle of the failed socket; a closed socket still reports
                // Connected == false, which is what the polling threads check.
                if (socket != null)
                {
                    socket.Close();
                }
                // Same exception type and message as before, but keep the original as InnerException.
                throw new Exception(ex.Message, ex);
            }
        }

        /// <summary>
        /// Starts or stops the background worker that sends heartbeats and reconnects when the link is lost.
        /// </summary>
        /// <param name="flg">
        /// true: create and start the worker if none exists (no-op when one already exists);
        /// false: pause the gate, abort the worker and forget it.
        /// </param>
        public void EnabledAutoConnect(bool flg)
        {
            if (!flg)
            {
                this.m_mreAutoConnect.Reset();
                if (this.m_threadAutoConnect != null)
                {
                    this.m_threadAutoConnect.Abort();
                    this.m_threadAutoConnect = null;
                }
                return;
            }

            if (this.m_threadAutoConnect != null)
            {
                return;
            }

            ThreadStart worker = delegate
            {
                for (; ; )
                {
                    this.m_mreAutoConnect.WaitOne();

                    // Seconds since the last heartbeat was seen (or since the last reconnect attempt).
                    double silentSeconds = DateTime.Now.Subtract(this.m_datatimeKeepAlive).TotalSeconds;

                    if (silentSeconds >= this.m_nKeepAliveSeconds || this.m_socketClient == null)
                    {
                        // Heartbeat overdue or no socket: release a receiver that may be blocked
                        // on either completion event, then try to log on again.
                        this.m_mreReceiveDone1.Set();
                        this.m_mreReceiveDone2.Set();

                        this.m_datatimeKeepAlive = DateTime.Now;
                        try
                        {
                            this.Logon();
                            this.SetEventInfo("成功连接到服务器端 0x10001000");
                        }
                        catch (Exception ex)
                        {
                            string sErr = "AutoConnect: " + ex.Message;
                            this.SetEventInfo(sErr);
                            SocketCommon.AppendLog("receive.log", sErr);
                        }
                        continue;
                    }

                    // Link looks healthy: send a heartbeat; drop the socket if that is not possible.
                    bool heartbeatSent = false;
                    if (this.m_socketClient.Connected)
                    {
                        Text heartbeat = new Text();
                        heartbeat.m_sFrom = this.m_sUserName;
                        heartbeat.m_asTo = new string[] { "server" };
                        heartbeat.m_enumCmdType = CMD_SERVER.KEEP_ALIVE;
                        heartbeat.m_datetimeSend = DateTime.Now;
                        heartbeatSent = this.SendText(heartbeat) == 1;
                    }
                    if (!heartbeatSent)
                    {
                        this.m_socketClient = null;
                    }

                    Thread.Sleep(this.m_nAutoConnectPollingInterval);
                }
            };

            this.m_threadAutoConnect = new Thread(worker);
            this.m_threadAutoConnect.Start();
            this.m_mreAutoConnect.Set();
        }

        /// <summary>
        /// Enables or pauses message receiving.
        /// </summary>
        /// <param name="flag">
        /// true: resume the receive thread if it is alive, otherwise (re)create and start it;
        /// false: pause receiving by resetting the gate event (the thread is left in place).
        /// </param>
        public void EnabledReceive(bool flag)
        {
            if (!flag)
            {
                this.m_mreReceive.Reset();
                return;
            }

            Thread existing = this.m_threadRecv;
            if (existing != null && existing.IsAlive)
            {
                // Worker is still running: just open the gate.
                this.m_mreReceive.Set();
                return;
            }

            // No worker, or a dead one: replace it with a fresh thread.
            this.m_mreReceive.Reset();
            if (existing != null)
            {
                existing.Abort();
            }
            this.m_threadRecv = new Thread(ReceiveText);
            this.m_threadRecv.Start();
            this.m_mreReceive.Set();
        }

        /// <summary>
        /// Enables or pauses dispatching of queued messages to m_eventHaveText subscribers.
        /// </summary>
        /// <param name="flag">
        /// true: resume the dispatch thread if it is alive, otherwise (re)create and start it;
        /// false: pause dispatching by resetting the gate event.
        /// </param>
        public void EnabledHaveText(bool flag)
        {
            if (!flag)
            {
                this.m_mreHaveText.Reset();
                return;
            }

            Thread existing = this.m_threadHaveText;
            if (existing != null && existing.IsAlive)
            {
                // Worker is still running: just open the gate.
                this.m_mreHaveText.Set();
                return;
            }

            // No worker, or a dead one: replace it with a fresh thread.
            this.m_mreHaveText.Reset();
            if (existing != null)
            {
                existing.Abort();
            }
            this.m_threadHaveText = new Thread(HaveText);
            this.m_threadHaveText.Start();
            this.m_mreHaveText.Set();
        }

        /// <summary>
        /// Dispatch loop: while the gate is open, takes one queued message at a time and raises
        /// m_eventHaveText for it; sleeps 5 ms when the queue is empty.
        /// </summary>
        void HaveText()
        {
            for (; ; )
            {
                this.m_mreHaveText.WaitOne();

                bool gotOne = false;
                Text pending = default(Text);
                lock (_objLockQueueHaveText)
                {
                    if (_queueHaveText.Count > 0)
                    {
                        pending = _queueHaveText.Dequeue();
                        gotOne = true;
                    }
                }

                if (!gotOne)
                {
                    // Nothing queued: back off briefly instead of spinning.
                    Thread.Sleep(5);
                    continue;
                }

                // The handler runs outside the lock so a slow subscriber does not block the receiver.
                if (this.m_eventHaveText != null)
                {
                    this.m_eventHaveText(pending);
                }
            }
        }

        /// <summary>
        /// Sends a message asynchronously. Do not call at a very high rate.
        /// </summary>
        /// <param name="text">Message to serialize and send.</param>
        /// <exception cref="Exception">Thrown when there is no socket or it is not connected.</exception>
        /// <remarks>
        /// Failures while starting the send are reported through m_eventInfo and
        /// "exceptions.log" rather than thrown.
        /// </remarks>
        public void SendTextAsync(Text text)
        {
            // Take one snapshot of the field: the auto-connect worker may set it to null at
            // any time, which previously surfaced here as a NullReferenceException.
            System.Net.Sockets.Socket socket = this.m_socketClient;
            if (socket == null || !socket.Connected)
            {
                throw new Exception("SendTextAsync: 与服务器已断开连接");
            }
            byte[] byteData = BinarySerializer.Serialize(text, SocketCommon.m_PacketLenSize, SocketCommon.m_PacketLenSize);
            try
            {
                socket.BeginSend(byteData,
                    0,
                    byteData.Length,
                    SocketFlags.None,
                    new AsyncCallback(SendCallback),
                    socket);
            }
            catch (Exception ex)
            {
                string sErr = "SendTextAsync: " + ex.Message;
                this.SetEventInfo(sErr);
                SocketCommon.AppendLog("exceptions.log", sErr);
            }
        }

        /// <summary>
        /// Sends a message synchronously. Do not call at a very high rate.
        /// </summary>
        /// <param name="text">Message to serialize and send.</param>
        /// <returns>
        /// -1 when there is no socket; 0 when the socket is not connected or not currently
        /// writable; 1 when the send succeeded; 2 when the send threw (the error is reported
        /// through m_eventInfo and "exceptions.log").
        /// </returns>
        public int SendText(Text text)
        {
            // Work on one snapshot of the field: the auto-connect worker may null it between
            // the checks below, which previously caused a NullReferenceException.
            System.Net.Sockets.Socket socket = this.m_socketClient;
            if (socket == null)
            {
                return -1;
            }

            // Poll waits up to 1000 microseconds for the socket to become writable.
            if (!socket.Connected || !socket.Poll(1000, SelectMode.SelectWrite))
            {
                return 0;
            }

            try
            {
                socket.Send(BinarySerializer.Serialize(
                                                        text,
                                                        SocketCommon.m_PacketLenSize,
                                                        SocketCommon.m_PacketLenSize));
            }
            catch (Exception ex)
            {
                string sErr = "SendText: " + ex.Message;
                this.SetEventInfo(sErr);
                SocketCommon.AppendLog("exceptions.log", sErr);
                return 2;
            }
            return 1;
        }

        /// <summary>
        /// Completion callback for SendTextAsync: finishes the asynchronous send.
        /// </summary>
        /// <param name="ar">Async result whose AsyncState is the socket passed to BeginSend.</param>
        void SendCallback(IAsyncResult ar)
        {
            try
            {
                System.Net.Sockets.Socket client = (System.Net.Sockets.Socket)ar.AsyncState;
                client.EndSend(ar);
            }
            catch (Exception ex)
            {
                // This runs on a callback thread where no caller can catch a rethrown exception,
                // so report the failure the same way the receive callbacks do.
                string sErr = "SendCallback: " + ex.Message;
                this.SetEventInfo(sErr);
                SocketCommon.AppendLog("exceptions.log", sErr);
            }
        }

        /// <summary>
        /// Receive loop: while the gate is open, reads one framed packet at a time
        /// (length prefix of SocketCommon.m_PacketLenSize bytes, then the body), deserializes
        /// it and either queues it for HaveText (TEXT) or refreshes the heartbeat time (KEEP_ALIVE).
        /// </summary>
        /// <remarks>
        /// Errors are reported through m_eventInfo and "receive.log"; the loop never exits on its own.
        /// NOTE(review): the completeness checks assume RecvState is a reference type whose
        /// counters are updated by the callbacks on the same instance — confirm against its declaration.
        /// </remarks>
        void ReceiveText()
        {
            while (true)
            {
                this.m_mreReceive.WaitOne();

                try
                {
                    // One snapshot per iteration: the auto-connect worker may replace or null the
                    // field at any time, and both reads of a packet must use the same socket.
                    System.Net.Sockets.Socket socket = this.m_socketClient;
                    if (socket == null || !socket.Connected)
                    {
                        System.Threading.Thread.Sleep(this.m_nReceivePollingInterval);
                        continue;
                    }

                    // Poll waits up to 1000 microseconds for readable data.
                    if (!socket.Poll(1000, SelectMode.SelectRead))
                    {
                        continue;
                    }

                    // Note: a receive may return fewer bytes than requested; the callbacks keep
                    // re-issuing reads until the requested count has arrived.
                    RecvState state = new RecvState();
                    state.m_socketWork = socket;

                    m_mreReceiveDone1.Reset();
                    m_mreReceiveDone2.Reset();

                    socket.BeginReceive(state.m_arraySize,
                                        0,
                                        SocketCommon.m_PacketLenSize,
                                        SocketFlags.None,
                                        new AsyncCallback(ReceiveCallBackSize),
                                        state);

                    m_mreReceiveDone1.WaitOne();

                    // The auto-connect worker also sets this event to unblock us before a
                    // reconnect; in that case the prefix is incomplete and must not be parsed.
                    if (state.m_nLenSize != SocketCommon.m_PacketLenSize)
                    {
                        continue;
                    }

                    int bodyLength = BitConverter.ToInt32(state.m_arraySize, 0);
                    if (bodyLength <= 0 || bodyLength > state.m_arrayData.Length)
                    {
                        // A zero-length read would never complete and an oversized one cannot fit the buffer.
                        string sBad = "ReceiveText: invalid packet length " + bodyLength;
                        this.SetEventInfo(sBad);
                        SocketCommon.AppendLog("receive.log", sBad);
                        continue;
                    }

                    state.m_nLenAllData = bodyLength;
                    socket.BeginReceive(state.m_arrayData,
                                        0,
                                        bodyLength,
                                        SocketFlags.None,
                                        new AsyncCallback(ReceiveCallbackData),
                                        state);

                    m_mreReceiveDone2.WaitOne();

                    // Same early-wake case as above, for the body.
                    if (state.m_nlenData != bodyLength)
                    {
                        continue;
                    }

                    /* @@@@@@ New message @@@@@@ */
                    byte[] arrayRealData = new byte[bodyLength];
                    Array.Copy(state.m_arrayData, arrayRealData, bodyLength);

                    Text text = (Text)BinarySerializer.Deserialize(arrayRealData);
                    if (text.m_enumCmdType.Equals(CMD_SERVER.TEXT))
                    {
                        // Queue instead of raising the event here: a blocking handler would
                        // delay the processing of the next heartbeat.
                        if (this.m_eventHaveText != null)
                        {
                            lock (_objLockQueueHaveText)
                            {
                                _queueHaveText.Enqueue(text);
                            }
                        }
                    }
                    else if (text.m_enumCmdType.Equals(CMD_SERVER.KEEP_ALIVE))
                    {
                        // Use local time rather than text.m_datetimeSend: the server clock may differ from ours.
                        this.m_datatimeKeepAlive = DateTime.Now;
                    }
                    /* @@@@@@@@@@@@@@@@@@@@@@ */
                }
                catch (Exception ex)
                {
                    string sErr = "ReceiveText: " + ex.Message;
                    this.SetEventInfo(sErr);
                    SocketCommon.AppendLog("receive.log", sErr);

                    // Do not leave the loop: the receiver must survive transient errors.
                    continue;
                }
            }
        }

        /// <summary>
        /// Completion callback for the length-prefix read. Accumulates received bytes and, once
        /// SocketCommon.m_PacketLenSize bytes have arrived, stores the decoded body length in the
        /// state and signals m_mreReceiveDone1. A short read re-issues a read for the remainder.
        /// </summary>
        /// <param name="ar">Async result whose AsyncState is the RecvState passed to BeginReceive.</param>
        /// <remarks>
        /// On end-of-stream or error the event is deliberately not signalled; the problem is
        /// reported through m_eventInfo and "exceptions.log".
        /// </remarks>
        private void ReceiveCallBackSize(IAsyncResult ar)
        {
            try
            {
                RecvState state = (RecvState)ar.AsyncState;
                System.Net.Sockets.Socket client = state.m_socketWork;
                int bytesRead = client.EndReceive(ar);
                if (bytesRead > 0)
                {
                    state.m_nLenSize += bytesRead;
                    if (state.m_nLenSize == SocketCommon.m_PacketLenSize)
                    {
                        // Store the length before signalling so the waiting thread never sees a stale value.
                        state.m_nLenAllData = BitConverter.ToInt32(state.m_arraySize, 0);
                        m_mreReceiveDone1.Set();
                    }
                    else
                    {
                        // Partial prefix: keep reading into the size buffer on the SAME socket and
                        // come back to THIS callback (the data callback would count these bytes as body).
                        client.BeginReceive(state.m_arraySize,
                                            state.m_nLenSize,
                                            SocketCommon.m_PacketLenSize - state.m_nLenSize,
                                            SocketFlags.None,
                                            new AsyncCallback(ReceiveCallBackSize),
                                            state);
                    }
                }
                else if (bytesRead == 0)
                {
                    string sErr = "ReceiveCallBackSize: the peer has performed an orderly shutdown";
                    this.SetEventInfo(sErr);
                    SocketCommon.AppendLog("exceptions.log", sErr);
                }
                else
                {
                    string sErr = "ReceiveCallBackSize: an error occurred";
                    this.SetEventInfo(sErr);
                    SocketCommon.AppendLog("exceptions.log", sErr);
                }
            }
            catch (Exception e)
            {
                // EndReceive throws when the socket was closed or reset; nothing can catch an
                // exception escaping this callback, so log it like ReceiveCallbackData does.
                string sErr = "ReceiveCallBackSize: " + e.Message;
                this.SetEventInfo(sErr);
                SocketCommon.AppendLog("exceptions.log", sErr);
            }
        }

        /// <summary>
        /// Completion callback for the message-body read. Accumulates received bytes and signals
        /// m_mreReceiveDone2 once m_nLenAllData bytes have arrived; a short read re-issues a read
        /// for the remainder.
        /// </summary>
        /// <param name="ar">Async result whose AsyncState is the RecvState passed to BeginReceive.</param>
        /// <remarks>
        /// End-of-stream, errors and exceptions are reported through m_eventInfo and
        /// "exceptions.log"; the event is not signalled in those cases.
        /// </remarks>
        private void ReceiveCallbackData(IAsyncResult ar)
        {
            try
            {
                RecvState recv = (RecvState)ar.AsyncState;
                System.Net.Sockets.Socket sock = recv.m_socketWork;
                int received = sock.EndReceive(ar);

                if (received <= 0)
                {
                    string problem = (received == 0)
                        ? "ReceiveCallbackData: the peer has performed an orderly shutdown"
                        : "ReceiveCallbackData: an error occurred";
                    this.SetEventInfo(problem);
                    SocketCommon.AppendLog("exceptions.log", problem);
                    return;
                }

                recv.m_nlenData += received;
                if (recv.m_nlenData == recv.m_nLenAllData)
                {
                    // Whole body received: wake the receive loop.
                    m_mreReceiveDone2.Set();
                    return;
                }

                // Body still incomplete: read the remaining bytes into the same buffer.
                sock.BeginReceive(recv.m_arrayData,
                                  recv.m_nlenData,
                                  recv.m_nLenAllData - recv.m_nlenData,
                                  SocketFlags.None,
                                  new AsyncCallback(ReceiveCallbackData),
                                  recv);
            }
            catch (Exception failure)
            {
                string message = "ReceiveCallbackData: " + failure.Message;
                this.SetEventInfo(message);
                SocketCommon.AppendLog("exceptions.log", message);
            }
        }

        /// <summary>
        /// Raises m_eventInfo with the given informational or error text, if anyone is subscribed.
        /// </summary>
        /// <param name="info">Text passed to the subscribers.</param>
        private void SetEventInfo(string info)
        {
            if (this.m_eventInfo == null)
            {
                return;
            }
            this.m_eventInfo(info);
        }

        /// <summary>
        /// Shuts the client down: stops auto-connect, pauses and aborts the receive and
        /// dispatch threads, then closes the socket.
        /// </summary>
        public void Close()
        {
            this.EnabledAutoConnect(false);

            // Receive worker: close its gate, then abort it.
            this.m_mreReceive.Reset();
            Thread receiver = this.m_threadRecv;
            if (receiver != null)
            {
                receiver.Abort();
            }

            // Dispatch worker: close its gate, then abort it.
            this.m_mreHaveText.Reset();
            Thread dispatcher = this.m_threadHaveText;
            if (dispatcher != null)
            {
                dispatcher.Abort();
            }

            System.Net.Sockets.Socket socket = this.m_socketClient;
            if (socket != null)
            {
                socket.Close();
            }
        }
    }


}

 

服务端源代码:

using System;
using System.Collections.Generic;
using System.Text;
using System.Net.Sockets;
using System.Net;
using System.Threading;
using System.Windows.Forms;

using Wagwei.FCL.Net.Implementation;

namespace Wagwei.FCL.Net.Socket
{
    //特别说明:
    //(1) 当客户端连接过多, 消息转发量过大时候, 会导致传输的数据包严重延时, 这也导致了心跳异常, 此时禁用心跳则不会持续报错
    //(2) 


    //高并发指标: 1. 并发连接数 2. 每秒可创建多少连接 3. 其他

    /// <summary>Socket服务端类</summary>
    [DeveloperInfo("Wagwei")]
    public class ServerOp
    {
        public int m_nListeningInterval = 1200;// listening interval; the original author recommends more than 500 ms
        // Client sockets keyed by the sender name (m_sFrom) of the logon packet; Listener locks on it while updating.
        public Dictionary<string, System.Net.Sockets.Socket> m_dicSocket;

        Thread m_threadListener; // listener thread (runs Listener)
        Thread m_threadReceiver; // receive (direct forwarding) thread (runs ReceiveText)
        Thread m_threadSend;     // indirect forwarding thread (runs SwitchSendText)

        // Gate events toggled by EnabledListener / EnabledReceiver / EnabledSend.
        ManualResetEvent m_mreListener = new ManualResetEvent(false);
        ManualResetEvent m_mreReceiver = new ManualResetEvent(false);
        ManualResetEvent m_mreSend = new ManualResetEvent(false);

        System.Net.Sockets.Socket m_socketListener; // listening socket

        List<Text> m_listText;// messages not yet processed

        //List<User> m_listUser;// all clients currently online
        // Raised by Listener with (sender name, "logon") when a client is registered.
        public delegate void DelegateClientAccount (string clientName,string type);
        public event DelegateClientAccount m_eventClientAccount;

        // ****** Messages (to server) ******
        public delegate void DelegateServerHaveText(Text text);
        public event DelegateServerHaveText m_eventServerHaveText;

        // Messages waiting to be dispatched by HaveText; guarded by m_objLockQueueHaveText.
        Queue<Text> m_queueHaveText;
        object m_objLockQueueHaveText;

        Thread m_threadHaveText;
        // Gate that HaveText waits on at the top of every loop iteration.
        ManualResetEvent m_mreHaveText = new ManualResetEvent(false);

        // ****** Notifications ******
        public delegate void DelegateInfo(string info);
        public event DelegateInfo m_eventInfo;

        // ****** Verification ******
        // Consulted by Listener during logon; when no handler is attached the logon is accepted.
        public delegate bool DelegateVerify();
        public event DelegateVerify m_eventVerify;

        /// <summary>
        /// Static initializer. Currently does nothing: the license check below is commented out.
        /// </summary>
        static ServerOp()
        {
            //Wagwei.FCL.Core.Implementation.Internal.WagLicense.Check();
        }

        /// <summary>
        /// Creates a server with an empty pending-message queue and the lock object that guards it.
        /// </summary>
        public ServerOp()
        {
            this.m_objLockQueueHaveText = new object();
            this.m_queueHaveText = new Queue<Text>();
        }

        /// <summary>
        /// Initializes the server: binds and starts the listening socket, creates the client
        /// table and message list, and starts the listener, receiver and forwarding threads.
        /// </summary>
        /// <param name="ip">IP address to bind to.</param>
        /// <param name="port">Port to bind to; 0 selects a random port.</param>
        public void Init(string ip, int port)
        {
            IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Parse(ip), port);

            this.m_socketListener = new System.Net.Sockets.Socket(
                AddressFamily.InterNetwork,
                SocketType.Stream,
                ProtocolType.Tcp);
            this.m_socketListener.Bind(localEndPoint);
            this.m_socketListener.Listen(120); // backlog of pending connections

            this.m_socketListener.ReceiveBufferSize = SocketCommon.m_ReceiveBufferSize;
            this.m_socketListener.SendBufferSize = SocketCommon.m_SendBufferSize;

            this.m_dicSocket = new Dictionary<string, System.Net.Sockets.Socket>(0);
            this.m_listText = new List<Text>();

            // Accepts incoming connections.
            this.m_threadListener = new Thread(Listener)
            {
                Name = "thread_fcl_net_socket_listener"
            };
            this.m_threadListener.Start();

            // Receives messages from clients.
            this.m_threadReceiver = new Thread(ReceiveText)
            {
                Name = "thread_fcl_net_socket_receiver"
            };
            this.m_threadReceiver.Start();

            // Forwards messages.
            this.m_threadSend = new Thread(SwitchSendText)
            {
                Name = "thread_fcl_net_socket_send"
            };
            this.m_threadSend.Start();
        }

        /// <summary>
        /// Opens or closes the gate the listener thread waits on.
        /// </summary>
        /// <param name="enable">true to let the listener run; false to pause it.</param>
        public void EnabledListener(bool enable)
        {
            if (!enable)
            {
                this.m_mreListener.Reset();
                return;
            }
            this.m_mreListener.Set();
        }

        /// <summary>
        /// Sets or resets the receiver gate event (m_mreReceiver).
        /// </summary>
        /// <param name="enabled">true to set the event; false to reset it.</param>
        public void EnabledReceiver(bool enabled)
        {
            if (!enabled)
            {
                this.m_mreReceiver.Reset();
                return;
            }
            this.m_mreReceiver.Set();
        }

        /// <summary>
        /// Sets or resets the forwarding gate event (m_mreSend).
        /// </summary>
        /// <param name="enabled">true to set the event; false to reset it.</param>
        public void EnabledSend(bool enabled)
        {
            if (!enabled)
            {
                this.m_mreSend.Reset();
                return;
            }
            this.m_mreSend.Set();
        }

        #region to server

        /// <summary>
        /// Enables or pauses dispatching of queued messages to m_eventServerHaveText subscribers.
        /// </summary>
        /// <param name="flag">
        /// true: resume the dispatch thread if it is alive, otherwise (re)create and start it;
        /// false: pause dispatching by resetting the gate event.
        /// </param>
        public void EnabledHaveText(bool flag)
        {
            if (!flag)
            {
                this.m_mreHaveText.Reset();
                return;
            }

            Thread existing = this.m_threadHaveText;
            if (existing != null && existing.IsAlive)
            {
                // Worker is still running: just open the gate.
                this.m_mreHaveText.Set();
                return;
            }

            // No worker, or a dead one: replace it with a fresh thread.
            this.m_mreHaveText.Reset();
            if (existing != null)
            {
                existing.Abort();
            }
            this.m_threadHaveText = new Thread(HaveText);
            this.m_threadHaveText.Start();
            this.m_mreHaveText.Set();
        }

        /// <summary>
        /// Dispatch loop: while the gate is open, takes one queued message at a time and raises
        /// m_eventServerHaveText for it; sleeps 5 ms when the queue is empty.
        /// </summary>
        void HaveText()
        {
            for (; ; )
            {
                this.m_mreHaveText.WaitOne();

                bool gotOne = false;
                Text pending = default(Text);
                lock (m_objLockQueueHaveText)
                {
                    if (m_queueHaveText.Count > 0)
                    {
                        pending = m_queueHaveText.Dequeue();
                        gotOne = true;
                    }
                }

                if (!gotOne)
                {
                    // Nothing queued: back off briefly instead of spinning.
                    Thread.Sleep(5);
                    continue;
                }

                // The handler runs outside the lock so a slow subscriber does not block producers.
                if (this.m_eventServerHaveText != null)
                {
                    this.m_eventServerHaveText(pending);
                }
            }
        }

        #endregion

        /// <summary>监听连接</summary>
        void Listener()
        {
            //System.Net.Sockets.Socket currentSocket = null;
            while (true)
            {
                this.m_mreListener.WaitOne();

                try
                {
                    if (this.m_socketListener.Poll(1000, SelectMode.SelectRead))
                    {
                        System.Net.Sockets.Socket client = this.m_socketListener.Accept();//接受一个请求

                        //currentSocket = client;

                        client.SendBufferSize = SocketCommon.m_SendBufferSize;
                        client.ReceiveBufferSize = SocketCommon.m_ReceiveBufferSize;

                        if (!client.Poll(1000, SelectMode.SelectRead))
                        {
                            client.Disconnect(false);
                            client.Close();
                            goto END;
                        }

                        byte[] byteData = new byte[SocketCommon.m_PacketSize];
                        byte[] byteSize = new byte[SocketCommon.m_PacketLenSize];
                        int lenSize = client.Receive(byteSize, SocketCommon.m_PacketLenSize, SocketFlags.None);
                        int lenData = BitConverter.ToInt32(byteSize, 0);
                        client.Receive(byteData, lenData, SocketFlags.None);
                        byte[] arrayRealData = new byte[lenData];

                        for (int i = 0; i < lenData; i++)
                        {
                            arrayRealData[i] = byteData[i];
                        }

                        Text text = (Text)BinarySerializer.Deserialize(arrayRealData);

                        //身份验证
                        if (text.m_enumCmdType.Equals(CMD_SERVER.LOGON)
                            && text.m_sFrom != "server"
                            && (m_eventVerify == null || m_eventVerify()))
                        {
                        }
                        else
                        {
                            client.Disconnect(false);
                            client.Close();
                            goto END;
                        }

                        lock (this.m_dicSocket)
                        {
                            //重复登录
                            if (this.m_dicSocket.ContainsKey(text.m_sFrom))
                            {
                                if (this.m_dicSocket[text.m_sFrom].Poll(1000, SelectMode.SelectWrite))
                                {
                                    Text textToClient = new Text();
                                    textToClient.m_sFrom = "server";
                                    textToClient.m_asTo = new string[] { text.m_sFrom };
                                    textToClient.m_enumCmdType = CMD_LOGON_RESULT.FORCED_OFFLINE;
                                    textToClient.m_objContent = "被迫下线";
                                    this.m_dicSocket[text.m_sFrom].Send(BinarySerializer.Serialize(textToClient,
                                        SocketCommon.m_PacketLenSize,
                                        SocketCommon.m_PacketLenSize));
                                }
                                this.m_dicSocket[text.m_sFrom].Disconnect(false);
                                this.m_dicSocket[text.m_sFrom].Close();
                                this.m_dicSocket.Remove(text.m_sFrom);

                                //User userFound = this.m_listUser.Find(delegate(User user) { return user.m_sUserName == text.m_sFrom; });
                                //if (string.IsNullOrEmpty(userFound.m_sUserName))//struct为值类型, 不能与null比较
                                //{
                                //    this.m_listUser.Remove(userFound);
                                //}
                            }

                            //非重复登录
                            this.m_dicSocket.Add(text.m_sFrom, client);

                            if (m_eventClientAccount != null)
                            {
                                m_eventClientAccount(text.m_sFrom, "logon");
                            }

                            //立即发送心跳给客户端
                            //... 或者客户端主动发送心跳
                        }
                        //用户登记
                        //this.m_listUser.Add(new User(text.m_sFrom, DateTime.Now));

                        //服务器发送一条成功连接信息给客户端
                        //if (client.Poll(1000, SelectMode.SelectWrite))
                        //{
                        //    Text textToClient = new Text();
                        //    textToClient._from = "server";
                        //    textToClient._to = new string[] { text._from };
                        //    textToClient._cmd_type = CMD_LOGON_RESULT.SUCCESS;
                        //    textToClient._content = "成功连接到服务器";
                        //    client.Send(BinarySerializer.Serialize(textToClient, Common.m_PacketLenSize, Common.m_PacketLenSize));
                        //}

                        continue;
                    }
                    else
                    {
                        System.Threading.Thread.Sleep(m_nListeningInterval);
                        continue;
                    }
                }
                catch (Exception ex)
                {
                    string s = ex.ToString();
                    this.SetEventOnRunError(ex.Message);
                    SocketCommon.AppendLog("listener.log", ex.ToString());
                    goto END;
                }
            END:
                System.Threading.Thread.Sleep(m_nListeningInterval);
            }
        }

        /// <summary>
        /// Receiver thread loop. While <c>m_mreReceiver</c> is signaled it snapshots the registered
        /// client sockets, selects the readable ones, reads one length-prefixed packet from each and
        /// dispatches it: recipients that are currently registered get their own single-recipient
        /// copy forwarded immediately; a packet addressed to "server" is queued for the server-side
        /// handler (when one is subscribed) and a keep-alive packet is answered on the sending socket.
        /// Recipients that are neither registered nor "server" are dropped.
        /// </summary>
        /// <remarks>
        /// Any exception is reported through <c>SetEventOnRunError</c>, written to "receive.log",
        /// and the socket being serviced at that moment is handed to <c>RemoveAbnormal</c>; the
        /// loop then carries on. The method never returns normally.
        /// </remarks>
        void ReceiveText()
        {
            while (true)
            {
                this.m_mreReceiver.WaitOne();

                // Socket that gets dropped if the step currently in progress throws.
                System.Net.Sockets.Socket socketCurrent = null;

                try
                {
                    // Snapshot under the lock so Select/Receive run without holding it.
                    List<System.Net.Sockets.Socket> readable = new List<System.Net.Sockets.Socket>();

                    lock (this.m_dicSocket)
                    {
                        foreach (System.Net.Sockets.Socket s in this.m_dicSocket.Values)
                        {
                            readable.Add(s);
                        }
                    }

                    if (readable.Count == 0)
                    {
                        System.Threading.Thread.Sleep(5);
                        continue;
                    }

                    // Select trims the list in place down to the sockets that are readable.
                    System.Net.Sockets.Socket.Select(readable, null, null, 1000);

                    foreach (System.Net.Sockets.Socket socketOne in readable)
                    {
                        socketCurrent = socketOne;

                        // Frame layout: fixed-size length prefix followed by that many body bytes.
                        byte[] header = new byte[SocketCommon.m_PacketLenSize];
                        ReceiveExactly(socketOne, header);

                        int bodySize = BitConverter.ToInt32(header, 0);

                        // The length comes from the peer: reject it explicitly instead of
                        // letting it surface as an argument error from the socket call.
                        if (bodySize <= 0 || bodySize > SocketCommon.m_PacketSize)
                        {
                            throw new InvalidOperationException("invalid packet length: " + bodySize);
                        }

                        byte[] body = new byte[bodySize];
                        ReceiveExactly(socketOne, body);

                        Text text = (Text)BinarySerializer.Deserialize(body);

                        // Registered recipients, kept as name/socket pairs so every forwarded
                        // copy is addressed to the recipient it is actually sent to.
                        List<string> forwardNames = new List<string>();
                        List<System.Net.Sockets.Socket> forwardSockets = new List<System.Net.Sockets.Socket>();

                        lock (this.m_dicSocket)
                        {
                            foreach (string recipient in text.m_asTo)
                            {
                                System.Net.Sockets.Socket registered;

                                if (this.m_dicSocket.TryGetValue(recipient, out registered))
                                {
                                    forwardNames.Add(recipient);
                                    forwardSockets.Add(registered);
                                    continue;
                                }

                                // Not registered and not the server itself: nothing to do
                                // (there is no offline queue for such recipients).
                                if (recipient != "server")
                                {
                                    continue;
                                }

                                // Addressed to the server: hand it to the server-side consumer.
                                if (this.m_eventServerHaveText != null)
                                {
                                    lock (m_objLockQueueHaveText)
                                    {
                                        m_queueHaveText.Enqueue(text);
                                    }
                                }

                                // Keep-alive: echo one straight back to the sender.
                                if (text.m_enumCmdType.Equals(CMD_SERVER.KEEP_ALIVE))
                                {
                                    Text textKeepAlive = new Text();
                                    textKeepAlive.m_sFrom = "server";
                                    textKeepAlive.m_asTo = new string[] { text.m_sFrom };
                                    textKeepAlive.m_enumCmdType = CMD_SERVER.KEEP_ALIVE;
                                    textKeepAlive.m_datetimeSend = DateTime.Now;

                                    socketOne.Send(BinarySerializer.Serialize(textKeepAlive,
                                        SocketCommon.m_PacketLenSize,
                                        SocketCommon.m_PacketLenSize));
                                }
                            }
                        }

                        for (int i = 0; i < forwardSockets.Count; i++)
                        {
                            System.Net.Sockets.Socket target = forwardSockets[i];

                            // From here on a failure belongs to the target, not to the sender.
                            socketCurrent = target;

                            // A target that is not writable right now is skipped.
                            if (!target.Poll(1000, SelectMode.SelectWrite))
                            {
                                continue;
                            }

                            Text copy = new Text();
                            copy.m_sFrom = text.m_sFrom;
                            copy.m_sFromIp = text.m_sFromIp;
                            copy.m_asTo = new string[] { forwardNames[i] };
                            copy.m_datetimeSend = text.m_datetimeSend;
                            copy.m_typeContent = text.m_typeContent;
                            copy.m_objContent = text.m_objContent;
                            copy.m_sContentRemark = text.m_sContentRemark;
                            copy.m_sUserPwd = text.m_sUserPwd;
                            copy.m_enumCmdType = text.m_enumCmdType;

                            target.Send(BinarySerializer.Serialize(copy,
                                SocketCommon.m_PacketLenSize,
                                SocketCommon.m_PacketLenSize));
                        }
                    }
                }
                catch (Exception ex)
                {
                    string detail = ex.ToString();
                    this.SetEventOnRunError("ReceiveText: " + detail);
                    this.RemoveAbnormal(socketCurrent);
                    SocketCommon.AppendLog("receive.log", detail);

                    continue;
                }
            }
        }

        /// <summary>
        /// Blocks until <paramref name="buffer"/> has been filled completely from
        /// <paramref name="socket"/>.
        /// </summary>
        /// <param name="socket">Socket to read from.</param>
        /// <param name="buffer">Destination; its length is the number of bytes to read.</param>
        /// <exception cref="InvalidOperationException">
        /// The peer shut the connection down before the buffer was full.
        /// </exception>
        private static void ReceiveExactly(System.Net.Sockets.Socket socket, byte[] buffer)
        {
            int received = 0;

            while (received < buffer.Length)
            {
                int read = socket.Receive(buffer, received, buffer.Length - received, SocketFlags.None);

                // A zero-byte read means the peer closed the connection; without this
                // check the loop would spin forever on a half-received frame.
                if (read == 0)
                {
                    throw new InvalidOperationException("the peer has performed an orderly shutdown");
                }

                received += read;
            }
        }

        /// <summary>
        /// Adds one text to <c>m_listText</c> while holding that list's lock.
        /// </summary>
        /// <param name="text">The text to append.</param>
        public void AppendOneText(Text text)
        {
            // The list instance itself is the monitor guarding it.
            object gate = this.m_listText;

            lock (gate)
            {
                this.m_listText.Add(text);
            }
        }

        /// <summary>
        /// Sender thread loop. While <c>m_mreSend</c> is signaled it drains the texts queued in
        /// <c>m_listText</c> and sends every registered recipient its own single-recipient copy.
        /// </summary>
        /// <remarks>
        /// Each text is attempted once: a recipient that is not registered, or whose socket is
        /// not writable within the poll timeout, is skipped. The queue is drained up front so
        /// that no socket I/O happens while the queue lock is held and a failure cannot cause
        /// already-delivered texts to be sent again. The method never returns normally.
        /// </remarks>
        void SwitchSendText()
        {
            while (true)
            {
                this.m_mreSend.WaitOne();

                // Take everything that is pending and release the queue lock right away.
                List<Text> batch = new List<Text>();

                lock (this.m_listText)
                {
                    foreach (Text queued in this.m_listText)
                    {
                        batch.Add(queued);
                    }

                    this.m_listText.Clear();
                }

                if (batch.Count == 0)
                {
                    System.Threading.Thread.Sleep(5);
                    continue;
                }

                foreach (Text text in batch)
                {
                    try
                    {
                        int count = text.m_asTo.Length;

                        for (int i = 0; i < count; i++)
                        {
                            this.SendQueuedCopy(text, text.m_asTo[i]);
                        }
                    }
                    catch (Exception ex)
                    {
                        // Malformed text (e.g. no recipient list): report it and move on
                        // instead of retrying it forever.
                        this.ReportSendFailure(ex, null);
                    }
                }
            }
        }

        /// <summary>
        /// Sends <paramref name="text"/> to one recipient if that recipient is registered and
        /// its socket is writable; a failure is reported and drops only that recipient's socket.
        /// </summary>
        /// <param name="text">The queued text to copy.</param>
        /// <param name="recipient">User name the copy is addressed to.</param>
        private void SendQueuedCopy(Text text, string recipient)
        {
            System.Net.Sockets.Socket socket = null;

            try
            {
                // m_dicSocket is mutated by other threads under this lock,
                // so the lookup has to take it as well.
                lock (this.m_dicSocket)
                {
                    this.m_dicSocket.TryGetValue(recipient, out socket);
                }

                if (socket == null || !socket.Poll(2000, SelectMode.SelectWrite))
                {
                    return;
                }

                Text copy = new Text();
                copy.m_sFrom = text.m_sFrom;
                copy.m_sFromIp = text.m_sFromIp;
                copy.m_asTo = new string[] { recipient };
                copy.m_datetimeSend = text.m_datetimeSend;
                copy.m_typeContent = text.m_typeContent;
                copy.m_objContent = text.m_objContent;
                copy.m_sContentRemark = text.m_sContentRemark;
                copy.m_sUserPwd = text.m_sUserPwd;
                copy.m_enumCmdType = text.m_enumCmdType;

                socket.Send(BinarySerializer.Serialize(copy,
                    SocketCommon.m_PacketLenSize,
                    SocketCommon.m_PacketLenSize));
            }
            catch (Exception ex)
            {
                this.ReportSendFailure(ex, socket);
            }
        }

        /// <summary>
        /// Reports a send-side failure: raises the info event, drops the socket involved
        /// (if any) and appends the exception to "send.log".
        /// </summary>
        /// <param name="ex">The exception that was caught.</param>
        /// <param name="socket">Socket in use when the failure happened, or <c>null</c>.</param>
        private void ReportSendFailure(Exception ex, System.Net.Sockets.Socket socket)
        {
            this.SetEventOnRunError(ex.Message);
            this.RemoveAbnormal(socket);
            SocketCommon.AppendLog("send.log", ex.ToString());
        }



        /// <summary>
        /// Drops a misbehaving client socket: unregisters every user name mapped to it in
        /// <c>m_dicSocket</c>, raises <c>m_eventClientAccount(name, "logoff")</c> for each of
        /// them, and closes the socket so the connection does not linger.
        /// </summary>
        /// <param name="currentSocket">Socket to drop; <c>null</c> is ignored.</param>
        private void RemoveAbnormal(System.Net.Sockets.Socket currentSocket)
        {
            if (currentSocket == null)
            {
                return;
            }

            try
            {
                lock (this.m_dicSocket)
                {
                    // Collect the keys first: the dictionary cannot be modified while it is
                    // being enumerated.
                    List<string> listKey = new List<string>();

                    foreach (KeyValuePair<string, System.Net.Sockets.Socket> item in this.m_dicSocket)
                    {
                        if (item.Value == currentSocket)
                        {
                            listKey.Add(item.Key);
                        }
                    }

                    foreach (string key in listKey)
                    {
                        this.m_dicSocket.Remove(key);

                        if (m_eventClientAccount != null)
                        {
                            m_eventClientAccount(key, "logoff");
                        }
                    }
                }
            }
            finally
            {
                // Unregistering alone would leave the connection open until the
                // socket object is finalized.
                currentSocket.Close();
            }
        }

        /// <summary>
        /// Passes a message to the <c>m_eventInfo</c> subscribers, if there are any.
        /// </summary>
        /// <param name="error">Message text handed to the subscribers.</param>
        private void SetEventOnRunError(string error)
        {
            // Nobody subscribed: nothing to raise.
            if (this.m_eventInfo == null)
            {
                return;
            }

            this.m_eventInfo.Invoke(error);
        }

        /// <summary>
        /// Shuts the server down: blocks and aborts the listener, receiver, sender and
        /// have-text threads, closes the listening socket, discards the queued texts, and
        /// closes and unregisters every client socket.
        /// </summary>
        public void Close()
        {
            // Reset each gate before aborting so a loop that is about to start
            // another pass waits instead.
            this.m_mreListener.Reset();
            if (this.m_threadListener != null)
            {
                this.m_threadListener.Abort();
            }

            this.m_mreReceiver.Reset();
            if (this.m_threadReceiver != null)
            {
                this.m_threadReceiver.Abort();
            }

            this.m_mreSend.Reset();
            if (this.m_threadSend != null)
            {
                this.m_threadSend.Abort();
            }

            this.m_mreHaveText.Reset();
            if (this.m_threadHaveText != null)
            {
                this.m_threadHaveText.Abort();
            }

            if (this.m_socketListener != null)
            {
                this.m_socketListener.Close();
            }

            // The worker threads may still be unwinding, so use the same locks they do.
            lock (this.m_listText)
            {
                this.m_listText.Clear();
            }

            //this.m_listUser.Clear();

            lock (this.m_dicSocket)
            {
                // Close the client connections before forgetting them; clearing the
                // dictionary alone would leave them open.
                foreach (System.Net.Sockets.Socket client in this.m_dicSocket.Values)
                {
                    if (client != null)
                    {
                        client.Close();
                    }
                }

                this.m_dicSocket.Clear();
            }
        }



    }
}

 

 

 

 

 

 

技术分享

 

FCL 系列 - 4. FCL.Net.dll

标签:

原文地址:http://www.cnblogs.com/wagwei/p/4216846.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!