using System; using System.IO; using System.Net.Sockets; namespace BF { internal partial class TCPChannel { private uint sendSeq = 0; private uint sendSeqTmp; private MemoryStream sendStream; private NetSafeQueue sendMessageQueue; private void InitializeSend() { // sendSeq = 0; sendStream = new MemoryStream(ownerConnection.configuration.SendBufferCapacity); sendMessageQueue = new NetSafeQueue(10); } private void ResetSendStream() { sendStream.Position = 0L; sendStream.SetLength(0L); } private bool IsCanSendData() { return actualStatus == NetConnectStatus.VerifyConnecting || actualStatus == NetConnectStatus.Connected || actualStatus == NetConnectStatus.Reconnecting || actualStatus == NetConnectStatus.Authing; } /// /// 只用于外部send,外部准备发送消息; /// group=0为ugate预留通信消息,不需要seq; /// /// /// private NetSendResult SendInternal(NetOutgoingMessage message) { //group == 0 不需要seq if (message.Group != 0) { sendSeq++; message.Seq = sendSeq; } else { message.Seq = 0; } #if BF_DEBUG NetStatistics.SetSendDataSeq(ownerConnection.configuration.UniqueIdentifier, sendSeq); #endif if (actualStatus != NetConnectStatus.Connected) { //是否应该缓存消息,等待连接成功后发送??? CacheWaitingSendMessages(message); return NetSendResult.Queued; } #if BF_DEBUG string connectId = ownerConnection.configuration.UniqueIdentifier; var result = NetStatistics.CheckEnableSendData(connectId); if (result) { sendMessageQueue.Enqueue(message); } else { NetStatistics.EnqueueCacheSendMessage(connectId, message); } #else //queue message, waiting send sendMessageQueue.Enqueue(message); #endif return NetSendResult.Queued; } /// /// 内部直接发送消息消息发送 /// /// private void EnqueueRawSendMessage(NetOutgoingMessage message) { #if BF_DEBUG string connectId = ownerConnection.configuration.UniqueIdentifier; if (NetStatistics.CheckEnableSendData(connectId)) { sendMessageQueue.Enqueue(message); } else { NetStatistics.EnqueueCacheSendMessage(connectId, message); } #else sendMessageQueue.Enqueue(message); #endif } private void SendUpdate() { if (sendStream.Length > 0) { //sending, return return; } if (sendMessageQueue.Count <= 0) { return; } while (sendMessageQueue.Count > 0) { bool result = sendMessageQueue.TryDequeue(out NetOutgoingMessage outgoingMessage); if (!result) { NetException.Assert(false, "SafeQueue dequeue error for sendMessageQueue."); break; } // outgoingMessage.Seq = sendSeq; // BFLog.Log($"SendUpdate : {outgoingMessage.Group}, {outgoingMessage.Cmd}, {outgoingMessage.Seq}"); result = messageService.Serialize(outgoingMessage, sendStream); if (!result) { //serialize failed NetException.Assert(true, "Serialized failure"); LogError(NetErrorCode.DataParseError, "Serialized failure."); break; } sendSeqTmp = outgoingMessage.Seq; // BFLog.Log("sendSeqTmp = " + sendSeqTmp.ToString()); // BFLog.Log("outgoingMessage.Data.Length = " + outgoingMessage.Data.Length.ToString()); CacheAlreadySendMessage(outgoingMessage); } sendStream.Position = 0L; StartSendAsync(); } private void StartSendAsync() { if (!IsCanSendData()) { return; } try { outArgs.SocketFlags = SocketFlags.None; byte[] buffer = sendStream.GetBuffer(); // BFLog.Log("StartSendAsync Length = " + sendStream.Length.ToString()); outArgs.SetBuffer(buffer, (int) sendStream.Position, (int) (sendStream.Length - sendStream.Position)); } catch (Exception exception) { throw new NetException(exception.ToString()); } try { if (socket.SendAsync(outArgs)) { return; } OnSendComplete(outArgs); } catch (SocketException exception) { //An error occurred when attempting to access the socket. See remarks section below. HandleSocketError(exception.SocketErrorCode, $"Begin send data socket error. {exception}"); } catch (Exception exception) { //The Socket has been closed. LogError(NetErrorCode.BeginSendError ,$"Begin send data failed. {exception}"); } } private void OnSendComplete(SocketAsyncEventArgs args) { if (!IsCanSendData()) { return; } if (socket == null) { return; } if (args.SocketError != SocketError.Success) { //LogError((int)args.SocketError , "Socket send complete, Error !"); HandleSocketError(args.SocketError, "Socket send complete, Error !"); return; } if (args.BytesTransferred <= 0) { CreateServerDisconnectMessage(); return; } sendStream.Position += args.BytesTransferred; if (sendStream.Position < sendStream.Length) { StartSendAsync(); return; } ResetSendStream(); // BFLog.Log("Send Update Round success!"); } /// /// 缓存内部重连时,外部发送的消息;等待重连成功后,继续发送;只缓存游戏消息; /// group=0为ugate预留通信消息,不缓存 /// /// private void CacheWaitingSendMessages(NetOutgoingMessage message) { if (message.Group == 0) { return; } waitingSendMessagesQueue.Enqueue(message); } /// /// 缓存已经写入到sendStream的消息; /// group=0为ugate预留通信消息,不缓存 /// /// already send message private void CacheAlreadySendMessage(NetOutgoingMessage message) { if (message.Group == 0 || message.Group == HeartBeat_Req_Group || message.Group == Reconnect_Req_Group || message.Group == Login_Req_Group) { ownerConnection.Recycle(message); return; } while(alreadySendMessagesQueue.Count >= ownerConnection.configuration.AlreadySendMessageCacheCount) { alreadySendMessagesQueue.TryDequeue(out var tempMsg); ownerConnection.Recycle(tempMsg); } alreadySendMessagesQueue.Enqueue(message); } private void CacheAlreadyInSendMessageQueueMessage(NetSafeQueue messageQueue) { if (messageQueue == null) { return; } while (messageQueue.Count > 0) { messageQueue.TryDequeue(out var message); CacheAlreadySendMessage(message); } } } }