Notice
Recent Posts
Archives
Today
Total
«   2024/06   »
1
2 3 4 5 6 7 8
9 10 11 12 13 14 15
16 17 18 19 20 21 22
23 24 25 26 27 28 29
30
Recent Comments
관리 메뉴

우당탕탕 개발일지

서버에서 c# 프로그램 실행하기(3) : 데이터 송수신 본문

Server

서버에서 c# 프로그램 실행하기(3) : 데이터 송수신

devchop 2023. 9. 5. 22:17

 

 

서버와 클라이언트 연결은 성공했다 (Connection) 

이제 해야 할 것은 세션을 생성하여 데이터를 주고받는 것, 이 데이터를 타입별로 받아서 처리하는 작업이 필요하다.

abstract class 인 Session.cs 에서 데이터를 전송/수신하는 툴을 만든 다음, Session을 상속받은 자식 클래스에서는 딱 ArraySegment<byte> 형식의 데이터를 받아볼수 있도록 하였다 ( OnReceivePacket 함수)

 

Session

더보기

 

using System;
using System.Net;
using System.Net.Sockets;

namespace ServerCore
{
    public abstract class Session
    {
        Socket _socket;
        SocketAsyncEventArgs recvArgs = new SocketAsyncEventArgs();
        SocketAsyncEventArgs sendArgs = new SocketAsyncEventArgs();

        Queue<ArraySegment<byte>> _sendQueue = new Queue<ArraySegment<byte>>();
        List<ArraySegment<byte>> pendingList = new List<ArraySegment<byte>>();

        object pendingLock = new object();
        int _disconnected = 0;

        ReceiveBuffer _recvBuffer = new ReceiveBuffer(short.MaxValue);

        #region Handler
        public abstract void OnConnected(EndPoint? point);
        public abstract void OnDisConnected(EndPoint? point);
        public abstract void OnRecvPacket(ArraySegment<byte> buffer);
        public abstract void OnSend(int numOfBytes);
        #endregion

        public void Start(Socket socket)
        {
            _socket = socket;
            recvArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnRecvCompleted);
            sendArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendCompleted);

            RegisterRecv();

        }

        #region Send

        public void Send(ArraySegment<byte> contents)
        {
            if (contents.Count == 0) return;

            lock (pendingLock)
            {
                _sendQueue.Enqueue(contents);
                if (pendingList.Count == 0) RegisterSend();
            }
        }

        public void Send(List<ArraySegment<byte>> contents)
        {
            if (contents == null || contents.Count == 0) return;
            lock (pendingLock)
            {
                foreach (var item in contents)
                {
                    _sendQueue.Enqueue(item);
                }
                if (pendingList.Count == 0) RegisterSend();
            }
        }

        void RegisterSend()
        {
            if (_disconnected == 1) return;
            //쌓여잇던 queue를 비우고 sendArgs에 넣어서 보내기.
            while (_sendQueue.Count > 0)
            {
                ArraySegment<byte> buff = _sendQueue.Dequeue();
                pendingList.Add(buff);
            }

            try
            {
                sendArgs.BufferList = pendingList;
                bool pending = _socket.SendAsync(sendArgs);
                if (!pending) OnSendCompleted(null, sendArgs);
            }
            catch (Exception e)
            {
                Console.WriteLine("Register send exceiption occurred.  " + e.Message);
            }

        }

        void OnSendCompleted(object? sender, SocketAsyncEventArgs args)
        {
            lock (pendingLock)
            {
                int sendBytes = args.BytesTransferred;
                if (sendBytes > 0 && args.SocketError == SocketError.Success)
                {
                    sendArgs.BufferList = null;
                    pendingList.Clear();

                    OnSend(sendBytes);
                    if (_sendQueue.Count > 0) RegisterSend();

                }
            }
        }
        #endregion

        #region Recv
        void RegisterRecv()
        {
            if (_disconnected == 1) return;

            _recvBuffer.Clean();
            ArraySegment<byte> writeSegment = _recvBuffer.WriteSegment;
            recvArgs.SetBuffer(writeSegment.Array, writeSegment.Offset, writeSegment.Count);

            try
            {
                bool pending = _socket.ReceiveAsync(recvArgs);
                if (!pending) OnRecvCompleted(null, recvArgs);
            }
            catch (Exception e)
            {
                Console.WriteLine("Register Recv exception occurred." + e.Message);
            }
        }

        void OnRecvCompleted(object? sender, SocketAsyncEventArgs args)
        {
            int recvBytes = args.BytesTransferred;
            if (recvBytes <= 0 || args.SocketError != SocketError.Success) return;

            try
            {
                bool writeSuccess = _recvBuffer.OnWrite(recvBytes);
                if (!writeSuccess)
                {
                    //무언가 문제가 발생한것임 
                    Disconnected();
                    return;
                }

                while (true)
                {
                    var packet = _recvBuffer.GetPacketData();
                    if (packet.Count == 0) break;

                    OnRecvPacket(packet);
                }

                RegisterRecv();
            }
            catch (Exception e)
            {
                Console.WriteLine("on receive fail. " + e.Message);
                Disconnected();
            }
        }


        int OnRecv(ArraySegment<byte> buffer)
        {
            int HeaderSize = 2;
            int processLen = 0;
            while (true)
            {
                if (buffer.Count < HeaderSize) break;

                int dataSize = BitConverter.ToInt16(buffer.Array, buffer.Offset);
                if (buffer.Count < dataSize) break; //not recved all.

                var validData = new ArraySegment<byte>(buffer.Array, buffer.Offset, dataSize);
                processLen += dataSize;

                OnRecvPacket(validData);
                buffer = new ArraySegment<byte>(buffer.Array, buffer.Offset + dataSize, buffer.Count - dataSize);
            }

            return processLen;
        }
        #endregion

        #region Connection
        public void Disconnected()
        {
            //중복호출 방지 
            if (Interlocked.Exchange(ref _disconnected, 1) == 1) return;

            OnDisConnected(_socket.RemoteEndPoint);
            _socket.Shutdown(SocketShutdown.Both);
            _socket.Close();
            Clear();

        }
        void Clear()
        {
            lock (pendingList)
            {
                _sendQueue.Clear();
                pendingList.Clear();
            }
        }
        #endregion
    }
}

 

 

Session.cs 에서 핵심 사항은 다음과 같다.

1. 초기화

세션을 시작할 때 RecvBuffer 와 SendBuffer 를 초기화하고, 수신을 받을 수 있도록 ReigsterRecv() 함수를 호출한다.

2.Receive

패킷 통신 특성상 , 데이터를 수신할 때 한번에 모든 데이터가 들어오지 않을 수 있다. 맨 앞 2byte 는 지금 보내고 있는 데이터의 총 크기를 보낸다 예를들어 [0,5,a,b,c] 라고 왔을 경우 , 발송된 패킷의 총 크기는 5byte이고(헤더까지 포함) 데이터는 abc 이다. 이를 보낼때 꼭 [0,5,a,b,c]가 한덩이로 올거라는 보장이 없다 [0,5] 와 [a,b] 와 [c] 3덩어리로 쪼개져서 각각 보내질수도 있는것이다. 이를 위해서 OnRecvCompleted() 함수에서 RecvBuffer에게 현재 처리가능한 완전한 packet의 모음을 요청한다. 모든 조각이 모였을 경우에만 recvBuffer에서 꺼내서 자식클래스에서 처리해준다.

3.Send

전송요청이 들어올 경우 바로 전송하는게 아니라 , SendBuffer에 담아놓는다. 한 번에 처리되는 전송프로세스는 하나이다. 만약 전송요청이 들어왔는데 현재 진행중인 프로세스가 없다면 바로 전송한다. 하지만 전송요청이 들어왔을때 이미 진행중인 프로세스가 있다면 (pendingList가 비워져있지 않다면) queue 에 잠시 적재해놓고 끝낸다. 전송 프로세스가 종료되었을때 , 만약 sendQueue 에 데이터가 있다면 전송프로세스 도중 전송요청이 들어왔다는 의미이다. 이럴 경우 다시 sendQueue에 있는 모든 데이터를 한번에 보내는 전송 프로세스를 다시 시작한다.

 

ReceiveBuffer

더보기
using System;
namespace ServerCore
{
    public class ReceiveBuffer
    {
        ArraySegment<byte> _buffer;

        int _readIndex;
        int _writeIndex;

        public ReceiveBuffer(int bufferSize)
        {
            _buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
        }

        public int DataSize { get { return _writeIndex - _readIndex; } }
        public int FreeSize { get { return _buffer.Count - _writeIndex; } }

        public ArraySegment<byte> ReadSegment
        {
            get { return new ArraySegment<byte>(_buffer.Array, _buffer.Offset + _readIndex, DataSize); }
        }
        public ArraySegment<byte> WriteSegment
        {

            get { return new ArraySegment<byte>(_buffer.Array, _buffer.Offset + _writeIndex, FreeSize); }
        }

        public void Clean()
        {
            int dataSize = DataSize;
            if (_buffer.Array == null) return;

            if (DataSize == 0)
            {
                _readIndex = 0;
            }
            else
            {
                Array.Copy(_buffer.Array, _buffer.Offset + _readIndex, _buffer.Array, _buffer.Offset, DataSize);
            }
        }

        //데이터 조립이 완료되어서 읽기처리를 끝냈을때 호출
        public bool OnRead(int numOfBytes)
        {
            if (numOfBytes > DataSize) return false;
            _readIndex += numOfBytes;
            return true;
        }

        //통신이 와서 버퍼에 write처리를 완료하였을때 호출
        public bool OnWrite(int numOfBytes)
        {
            if (numOfBytes > FreeSize) return false;
            _writeIndex += numOfBytes;
            return true;
        }



        public ArraySegment<byte> GetPacketData()
        {
            int HeaderSize = 2;

            if (_buffer.Array == null || DataSize < HeaderSize) return new ArraySegment<byte>();
            if (DataSize < HeaderSize) return new ArraySegment<byte>();

            int packetSize = BitConverter.ToInt16(_buffer.Array, _buffer.Offset + _readIndex);
            if (DataSize < packetSize) return new ArraySegment<byte>(); //not received yet

            var validData = new ArraySegment<byte>(_buffer.Array, _buffer.Offset + _readIndex, packetSize);

            OnRead(packetSize);
            return validData;
        }
    }
}

 

Session에서 사용하는 ReceiveBuffer 의 역할은 다음과 같다.

1. Buffer와 ReadIndex, WriteIndex

데이터를 수신하면 우선 buffer에 차례대로 넣는다. 효율성을 위해 ReadIndex 와 writeIndex를 놓는다. 데이터를 읽을 경우 writeIndex를 증가시킨다. 패킷의 조각모음이 끝나서 세션에서 읽기처리가 끝날경우는 readIndex를 추가한다.

[x,x,0,5,a,b,-,-,-,-,-,-,-] 현재 buffer가 이렇게 들어와있을 경우 , readIndex = 2이고 writeindex = 6이다. x,x 는 이전 패킷이 처리된것이고, 5byte값이 전부 들어오지 않았기때문에 패킷처리를 할 수 없는 상태이다. 마지막으로 패킷처리한 index가 x,x 이므로, index 2 에서 대기한다. writeIndex는 마지막으로 데이터가 들어온 버퍼위치를 저장한다. 여기서 나머지데이터 (c) 가 들어와서 [x,x,0,5,a,b,c,-,-,-,-] 가 될 경우, 패킷을 처리하고 readIndex = 7, writeIndex = 7 이 된다.

 

2.GetPacketData() 함수

현재 버퍼의 readIndex와 write인덱스를 참고하여 온전한 패킷 덩어리가 있을경우 패킷을 반환하고 readIndex를 패킷사이즈만큼 증가시키는 함수이다.