277 lines
8.5 KiB
C#
277 lines
8.5 KiB
C#
using System;
|
||
using System.IO;
|
||
using System.Net.Sockets;
|
||
|
||
namespace BF
|
||
{
|
||
internal partial class TCPChannel
|
||
{
|
||
// Sequence counter for outgoing messages: SendInternal increments it for every message whose Group is non-zero and stamps the new value onto that message.
private uint sendSeq = 0;
|
||
// Seq of the message most recently serialized into sendStream by SendUpdate. It is only written in this part of the file; any reader lives elsewhere.
private uint sendSeqTmp;
|
||
// Buffer that SendUpdate serializes outgoing messages into and StartSendAsync hands to the socket. SendUpdate treats a non-zero Length as "a send is still in progress".
private MemoryStream sendStream;
|
||
// Outgoing messages waiting to be serialized into sendStream by SendUpdate.
private NetSafeQueue<NetOutgoingMessage> sendMessageQueue;
|
||
|
||
/// <summary>
/// Creates the send-side state: the serialization buffer, sized from the
/// connection configuration, and the queue of messages waiting to be sent.
/// </summary>
/// <remarks>
/// sendSeq is not reset here (a reset used to be present but was commented out).
/// </remarks>
private void InitializeSend()
{
    var bufferCapacity = ownerConnection.configuration.SendBufferCapacity;

    sendStream = new MemoryStream(bufferCapacity);
    sendMessageQueue = new NetSafeQueue<NetOutgoingMessage>(10);
}
|
||
|
||
/// <summary>
/// Empties the send buffer so that it reports a length of zero and the next
/// write starts at the beginning.
/// </summary>
private void ResetSendStream()
{
    // Drop the buffered bytes, then rewind the cursor to the start.
    sendStream.SetLength(0L);
    sendStream.Position = 0L;
}
|
||
|
||
/// <summary>
/// Tells whether the current connection status allows data to be written to the socket.
/// </summary>
/// <returns>
/// true when the status is VerifyConnecting, Connected, Reconnecting or Authing;
/// false for every other status.
/// </returns>
private bool IsCanSendData()
{
    switch (actualStatus)
    {
        case NetConnectStatus.VerifyConnecting:
        case NetConnectStatus.Connected:
        case NetConnectStatus.Reconnecting:
        case NetConnectStatus.Authing:
            return true;

        default:
            return false;
    }
}
|
||
|
||
/// <summary>
/// Used only for sends requested from outside the channel: prepares a message
/// the caller wants to send.
/// Group 0 is reserved for ugate communication messages and needs no sequence number.
/// </summary>
/// <param name="message">The outgoing message; its Seq is overwritten here.</param>
/// <returns>Always <see cref="NetSendResult.Queued"/>.</returns>
private NetSendResult SendInternal(NetOutgoingMessage message)
{
    // Group 0 carries no sequence number; every other group gets the next one.
    if (message.Group == 0)
    {
        message.Seq = 0;
    }
    else
    {
        message.Seq = ++sendSeq;
    }

#if BF_DEBUG
    NetStatistics.SetSendDataSeq(ownerConnection.configuration.UniqueIdentifier, sendSeq);
#endif

    if (actualStatus == NetConnectStatus.Connected)
    {
        // Connected: hand the message to the send queue (in BF_DEBUG builds this
        // may be diverted to the NetStatistics cache instead).
        EnqueueRawSendMessage(message);
    }
    else
    {
        // Not connected: park the message until the connection is back.
        // CacheWaitingSendMessages ignores Group 0 messages.
        CacheWaitingSendMessages(message);
    }

    return NetSendResult.Queued;
}
|
||
|
||
/// <summary>
/// Internal direct send: puts the message on the send queue as-is, without
/// assigning a sequence number or checking the connection status.
/// </summary>
/// <param name="message">The message to queue.</param>
private void EnqueueRawSendMessage(NetOutgoingMessage message)
{
#if BF_DEBUG
    // Debug builds can hold sending back per connection; in that case the
    // message goes to the NetStatistics cache instead of the send queue.
    string connectId = ownerConnection.configuration.UniqueIdentifier;
    if (!NetStatistics.CheckEnableSendData(connectId))
    {
        NetStatistics.EnqueueCacheSendMessage(connectId, message);
        return;
    }
#endif

    sendMessageQueue.Enqueue(message);
}
|
||
|
||
/// <summary>
/// Drains the send queue into the send buffer and starts one asynchronous
/// socket send for the whole batch.
/// </summary>
/// <remarks>
/// Does nothing while a previous batch is still in the buffer or when the queue
/// is empty. Every successfully serialized message is passed to
/// CacheAlreadySendMessage. If dequeuing or serializing fails, the loop stops and
/// only the messages serialized so far are sent; no send is started when nothing
/// was serialized.
/// </remarks>
private void SendUpdate()
{
    if (sendStream.Length > 0)
    {
        // The previous batch is still being sent.
        return;
    }

    if (sendMessageQueue.Count <= 0)
    {
        return;
    }

    while (sendMessageQueue.Count > 0)
    {
        bool result = sendMessageQueue.TryDequeue(out NetOutgoingMessage outgoingMessage);
        if (!result)
        {
            NetException.Assert(false, "SafeQueue dequeue error for sendMessageQueue.");
            break;
        }

        // Remember where this message starts so a failed serialization can be rolled back.
        long lengthBeforeMessage = sendStream.Length;

        result = messageService.Serialize(outgoingMessage, sendStream);
        if (!result)
        {
            // Discard anything the failed serialization may have written, so a
            // partial frame is never put on the wire.
            sendStream.SetLength(lengthBeforeMessage);

            // NOTE(review): the dequeue failure above asserts with `false`, this one
            // with `true` - confirm whether that is intentional.
            NetException.Assert(true, "Serialized failure");
            LogError(NetErrorCode.DataParseError, "Serialized failure.");
            break;
        }

        sendSeqTmp = outgoingMessage.Seq;

        CacheAlreadySendMessage(outgoingMessage);
    }

    if (sendStream.Length <= 0)
    {
        // Nothing was serialized (the first message failed). A zero-byte send would
        // complete with BytesTransferred == 0, which OnSendComplete treats as a
        // server disconnect, so do not start one.
        return;
    }

    sendStream.Position = 0L;

    StartSendAsync();
}
|
||
|
||
/// <summary>
/// Sends the part of the send buffer between the current position and its end
/// through the socket, using the shared outArgs.
/// </summary>
/// <remarks>
/// Returns without doing anything when the connection status does not allow
/// sending. Failures while preparing the buffer are rethrown as NetException;
/// failures while starting the send are reported through HandleSocketError or LogError.
/// </remarks>
private void StartSendAsync()
{
    if (!IsCanSendData())
    {
        return;
    }

    try
    {
        outArgs.SocketFlags = SocketFlags.None;

        byte[] payload = sendStream.GetBuffer();
        int offset = (int) sendStream.Position;
        int remaining = (int) (sendStream.Length - sendStream.Position);
        outArgs.SetBuffer(payload, offset, remaining);
    }
    catch (Exception exception)
    {
        throw new NetException(exception.ToString());
    }

    try
    {
        // SendAsync returns false when the operation finished synchronously; in
        // that case no completion event is raised and the result is handled here.
        bool pending = socket.SendAsync(outArgs);
        if (!pending)
        {
            OnSendComplete(outArgs);
        }
    }
    catch (SocketException exception)
    {
        // An error occurred when attempting to access the socket.
        HandleSocketError(exception.SocketErrorCode, $"Begin send data socket error. {exception}");
    }
    catch (Exception exception)
    {
        // For example, the socket has been closed.
        LogError(NetErrorCode.BeginSendError, $"Begin send data failed. {exception}");
    }
}
|
||
|
||
/// <summary>
/// Handles the result of a socket send: reports errors, continues sending when
/// only part of the buffer went out, and clears the buffer once all of it was sent.
/// </summary>
/// <param name="args">The event args of the send operation that completed.</param>
private void OnSendComplete(SocketAsyncEventArgs args)
{
    // Ignore completions that arrive when sending is not allowed or the socket is gone.
    if (!IsCanSendData() || socket == null)
    {
        return;
    }

    SocketError sendStatus = args.SocketError;
    if (sendStatus != SocketError.Success)
    {
        HandleSocketError(sendStatus, "Socket send complete, Error !");
        return;
    }

    int sentBytes = args.BytesTransferred;
    if (sentBytes <= 0)
    {
        // A completed send that moved no bytes is treated as a disconnect by the server.
        CreateServerDisconnectMessage();
        return;
    }

    sendStream.Position += sentBytes;

    if (sendStream.Position < sendStream.Length)
    {
        // Partial send: push out the remainder.
        StartSendAsync();
    }
    else
    {
        // Whole batch sent: make the buffer available for the next SendUpdate.
        ResetSendStream();
    }
}
|
||
|
||
/// <summary>
/// Caches messages sent from outside while the channel is reconnecting internally,
/// so they can be sent once the reconnect succeeds. Only game messages are cached:
/// group 0 is reserved for ugate communication messages and is not cached.
/// </summary>
/// <param name="message">The message that could not be sent right away.</param>
private void CacheWaitingSendMessages(NetOutgoingMessage message)
{
    if (message.Group != 0)
    {
        waitingSendMessagesQueue.Enqueue(message);
    }
}
|
||
|
||
/// <summary>
/// Caches a message that has already been written into sendStream.
/// Group 0 (reserved for ugate communication messages) is not cached, and neither
/// are the heartbeat, reconnect and login request groups; those messages are
/// recycled immediately.
/// </summary>
/// <remarks>
/// When the cache already holds the configured number of messages, entries are
/// dequeued and recycled to make room before the new one is added.
/// </remarks>
/// <param name="message">already send message</param>
private void CacheAlreadySendMessage(NetOutgoingMessage message)
{
    if (message.Group == 0 || message.Group == HeartBeat_Req_Group || message.Group == Reconnect_Req_Group || message.Group == Login_Req_Group)
    {
        ownerConnection.Recycle(message);
        return;
    }

    while (alreadySendMessagesQueue.Count >= ownerConnection.configuration.AlreadySendMessageCacheCount)
    {
        // Stop when nothing could be removed. Otherwise a configured count of zero
        // or less keeps the loop condition true on an empty queue, and the loop
        // would spin forever recycling a default value.
        if (!alreadySendMessagesQueue.TryDequeue(out var evictedMessage))
        {
            break;
        }

        ownerConnection.Recycle(evictedMessage);
    }

    alreadySendMessagesQueue.Enqueue(message);
}
|
||
|
||
/// <summary>
/// Moves every message out of the given queue and passes it to
/// CacheAlreadySendMessage, which either caches or recycles it.
/// </summary>
/// <param name="messageQueue">The queue to drain; null is accepted and ignored.</param>
private void CacheAlreadyInSendMessageQueueMessage(NetSafeQueue<NetOutgoingMessage> messageQueue)
{
    if (messageQueue == null)
    {
        return;
    }

    while (messageQueue.Count > 0)
    {
        // A failed dequeue yields no message; passing that on would dereference
        // null in CacheAlreadySendMessage, so stop instead.
        if (!messageQueue.TryDequeue(out var message))
        {
            break;
        }

        CacheAlreadySendMessage(message);
    }
}
|
||
|
||
}
|
||
} |