2023-04-03 11:04:31 +08:00

277 lines
8.5 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.IO;
using System.Net.Sockets;
namespace BF
{
internal partial class TCPChannel
{
private uint sendSeq = 0;
private uint sendSeqTmp;
private MemoryStream sendStream;
private NetSafeQueue<NetOutgoingMessage> sendMessageQueue;
private void InitializeSend()
{
// sendSeq = 0;
sendStream = new MemoryStream(ownerConnection.configuration.SendBufferCapacity);
sendMessageQueue = new NetSafeQueue<NetOutgoingMessage>(10);
}
private void ResetSendStream()
{
sendStream.Position = 0L;
sendStream.SetLength(0L);
}
private bool IsCanSendData()
{
return actualStatus == NetConnectStatus.VerifyConnecting || actualStatus == NetConnectStatus.Connected || actualStatus == NetConnectStatus.Reconnecting || actualStatus == NetConnectStatus.Authing;
}
/// <summary>
/// 只用于外部send外部准备发送消息
/// group=0为ugate预留通信消息不需要seq
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
private NetSendResult SendInternal(NetOutgoingMessage message)
{
//group == 0 不需要seq
if (message.Group != 0)
{
sendSeq++;
message.Seq = sendSeq;
}
else
{
message.Seq = 0;
}
#if BF_DEBUG
NetStatistics.SetSendDataSeq(ownerConnection.configuration.UniqueIdentifier, sendSeq);
#endif
if (actualStatus != NetConnectStatus.Connected)
{
//是否应该缓存消息,等待连接成功后发送???
CacheWaitingSendMessages(message);
return NetSendResult.Queued;
}
#if BF_DEBUG
string connectId = ownerConnection.configuration.UniqueIdentifier;
var result = NetStatistics.CheckEnableSendData(connectId);
if (result)
{
sendMessageQueue.Enqueue(message);
}
else
{
NetStatistics.EnqueueCacheSendMessage(connectId, message);
}
#else
//queue message, waiting send
sendMessageQueue.Enqueue(message);
#endif
return NetSendResult.Queued;
}
/// <summary>
/// 内部直接发送消息消息发送
/// </summary>
/// <param name="message"></param>
private void EnqueueRawSendMessage(NetOutgoingMessage message)
{
#if BF_DEBUG
string connectId = ownerConnection.configuration.UniqueIdentifier;
if (NetStatistics.CheckEnableSendData(connectId))
{
sendMessageQueue.Enqueue(message);
}
else
{
NetStatistics.EnqueueCacheSendMessage(connectId, message);
}
#else
sendMessageQueue.Enqueue(message);
#endif
}
private void SendUpdate()
{
if (sendStream.Length > 0)
{
//sending, return
return;
}
if (sendMessageQueue.Count <= 0)
{
return;
}
while (sendMessageQueue.Count > 0)
{
bool result = sendMessageQueue.TryDequeue(out NetOutgoingMessage outgoingMessage);
if (!result)
{
NetException.Assert(false, "SafeQueue dequeue error for sendMessageQueue.");
break;
}
// outgoingMessage.Seq = sendSeq;
// BFLog.Log($"SendUpdate : {outgoingMessage.Group}, {outgoingMessage.Cmd}, {outgoingMessage.Seq}");
result = messageService.Serialize(outgoingMessage, sendStream);
if (!result)
{
//serialize failed
NetException.Assert(true, "Serialized failure");
LogError(NetErrorCode.DataParseError, "Serialized failure.");
break;
}
sendSeqTmp = outgoingMessage.Seq;
// BFLog.Log("sendSeqTmp = " + sendSeqTmp.ToString());
// BFLog.Log("outgoingMessage.Data.Length = " + outgoingMessage.Data.Length.ToString());
CacheAlreadySendMessage(outgoingMessage);
}
sendStream.Position = 0L;
StartSendAsync();
}
private void StartSendAsync()
{
if (!IsCanSendData())
{
return;
}
try
{
outArgs.SocketFlags = SocketFlags.None;
byte[] buffer = sendStream.GetBuffer();
// BFLog.Log("StartSendAsync Length = " + sendStream.Length.ToString());
outArgs.SetBuffer(buffer, (int) sendStream.Position,
(int) (sendStream.Length - sendStream.Position));
}
catch (Exception exception)
{
throw new NetException(exception.ToString());
}
try
{
if (socket.SendAsync(outArgs))
{
return;
}
OnSendComplete(outArgs);
}
catch (SocketException exception)
{
//An error occurred when attempting to access the socket. See remarks section below.
HandleSocketError(exception.SocketErrorCode, $"Begin send data socket error. {exception}");
}
catch (Exception exception)
{
//The Socket has been closed.
LogError(NetErrorCode.BeginSendError ,$"Begin send data failed. {exception}");
}
}
private void OnSendComplete(SocketAsyncEventArgs args)
{
if (!IsCanSendData())
{
return;
}
if (socket == null)
{
return;
}
if (args.SocketError != SocketError.Success)
{
//LogError((int)args.SocketError , "Socket send complete, Error !");
HandleSocketError(args.SocketError, "Socket send complete, Error !");
return;
}
if (args.BytesTransferred <= 0)
{
CreateServerDisconnectMessage();
return;
}
sendStream.Position += args.BytesTransferred;
if (sendStream.Position < sendStream.Length)
{
StartSendAsync();
return;
}
ResetSendStream();
// BFLog.Log("Send Update Round success!");
}
/// <summary>
/// 缓存内部重连时,外部发送的消息;等待重连成功后,继续发送;只缓存游戏消息;
/// group=0为ugate预留通信消息不缓存
/// </summary>
/// <param name="message"></param>
private void CacheWaitingSendMessages(NetOutgoingMessage message)
{
if (message.Group == 0)
{
return;
}
waitingSendMessagesQueue.Enqueue(message);
}
/// <summary>
/// 缓存已经写入到sendStream的消息
/// group=0为ugate预留通信消息不缓存
/// </summary>
/// <param name="message">already send message</param>
private void CacheAlreadySendMessage(NetOutgoingMessage message)
{
if (message.Group == 0 || message.Group == HeartBeat_Req_Group || message.Group == Reconnect_Req_Group || message.Group == Login_Req_Group)
{
ownerConnection.Recycle(message);
return;
}
while(alreadySendMessagesQueue.Count >= ownerConnection.configuration.AlreadySendMessageCacheCount)
{
alreadySendMessagesQueue.TryDequeue(out var tempMsg);
ownerConnection.Recycle(tempMsg);
}
alreadySendMessagesQueue.Enqueue(message);
}
private void CacheAlreadyInSendMessageQueueMessage(NetSafeQueue<NetOutgoingMessage> messageQueue)
{
if (messageQueue == null)
{
return;
}
while (messageQueue.Count > 0)
{
messageQueue.TryDequeue(out var message);
CacheAlreadySendMessage(message);
}
}
}
}