112 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			C#
		
	
	
			
		
		
	
	
			112 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			C#
		
	
	
using System;
 | 
						|
using DotNetty.Buffers;
 | 
						|
using DotNetty.Common;
 | 
						|
using dotNetty_kcp.thread;
 | 
						|
 | 
						|
namespace dotNetty_kcp
 | 
						|
{
 | 
						|
    public class ReadTask : ITask
 | 
						|
    {
 | 
						|
        private Ukcp kcp;
 | 
						|
 | 
						|
        private static readonly ThreadLocalPool<ReadTask> RECYCLER =
 | 
						|
            new ThreadLocalPool<ReadTask>(handle => new ReadTask(handle));
 | 
						|
 | 
						|
        private readonly ThreadLocalPool.Handle recyclerHandle;
 | 
						|
 | 
						|
        private ReadTask(ThreadLocalPool.Handle recyclerHandle)
 | 
						|
        {
 | 
						|
            this.recyclerHandle = recyclerHandle;
 | 
						|
        }
 | 
						|
 | 
						|
        public static ReadTask New(Ukcp kcp)
 | 
						|
        {
 | 
						|
            ReadTask readTask = RECYCLER.Take();
 | 
						|
            readTask.kcp = kcp;
 | 
						|
            return readTask;
 | 
						|
        }
 | 
						|
 | 
						|
        public void execute()
 | 
						|
        {
 | 
						|
            CodecOutputList<IByteBuffer> bufList = null;
 | 
						|
            try {
 | 
						|
                //Thread.sleep(1000);
 | 
						|
                //查看连接状态
 | 
						|
                if (!kcp.isActive()) {
 | 
						|
                    return;
 | 
						|
                }
 | 
						|
                bool hasKcpMessage = false;
 | 
						|
                long current = kcp.currentMs();
 | 
						|
                var readQueue = kcp.ReadQueue;
 | 
						|
                IByteBuffer byteBuf = null;
 | 
						|
                for (;;)
 | 
						|
                {
 | 
						|
                    if (!readQueue.TryDequeue(out byteBuf))
 | 
						|
                    {
 | 
						|
                        break;
 | 
						|
                    }
 | 
						|
                    hasKcpMessage = true;
 | 
						|
                    kcp.input(byteBuf, current);
 | 
						|
                    byteBuf.Release();
 | 
						|
                }
 | 
						|
                if (!hasKcpMessage) {
 | 
						|
                    return;
 | 
						|
                }
 | 
						|
                if (kcp.isStream()) {
 | 
						|
                    while (kcp.canRecv()) {
 | 
						|
                        if (bufList == null) {
 | 
						|
                            bufList = CodecOutputList<IByteBuffer>.NewInstance();
 | 
						|
                        }
 | 
						|
                        kcp.receive(bufList);
 | 
						|
                    }
 | 
						|
                    int size = bufList.Count;
 | 
						|
                    for (int i = 0; i < size; i++)
 | 
						|
                    {
 | 
						|
                        byteBuf = bufList[i];
 | 
						|
                        readBytebuf(byteBuf,current);
 | 
						|
                    }
 | 
						|
                } else {
 | 
						|
                    while (kcp.canRecv()) {
 | 
						|
                        IByteBuffer recvBuf = kcp.mergeReceive();
 | 
						|
                        readBytebuf(recvBuf,current);
 | 
						|
                    }
 | 
						|
                }
 | 
						|
                //判断写事件
 | 
						|
                if (!kcp.WriteQueue.IsEmpty&&kcp.canSend(false)) {
 | 
						|
                    kcp.notifyWriteEvent();
 | 
						|
                }
 | 
						|
            } catch (Exception e) {
 | 
						|
                kcp.KcpListener.handleException(e,kcp);
 | 
						|
                Console.WriteLine(e);
 | 
						|
            } finally {
 | 
						|
                release();
 | 
						|
                bufList?.Return();
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
 | 
						|
        private void readBytebuf(IByteBuffer buf,long current)
 | 
						|
        {
 | 
						|
            kcp.LastRecieveTime = current;
 | 
						|
            try
 | 
						|
            {
 | 
						|
                kcp.getKcpListener().handleReceive(buf, kcp);
 | 
						|
            }
 | 
						|
            catch (Exception throwable)
 | 
						|
            {
 | 
						|
                kcp.getKcpListener().handleException(throwable, kcp);
 | 
						|
            }
 | 
						|
            finally
 | 
						|
            {
 | 
						|
                buf.Release();
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
        private void release()
 | 
						|
        {
 | 
						|
            kcp.ReadProcessing.Set(false);
 | 
						|
            kcp = null;
 | 
						|
            recyclerHandle.Release(this);
 | 
						|
        }
 | 
						|
    }
 | 
						|
} |