diff --git a/src/game/WorldSocket.cpp b/src/game/WorldSocket.cpp index e407a04ea..5bc447d25 100644 --- a/src/game/WorldSocket.cpp +++ b/src/game/WorldSocket.cpp @@ -111,6 +111,9 @@ m_OverSpeedPings (0), m_LastPingTime (ACE_Time_Value::zero) { reference_counting_policy ().value (ACE_Event_Handler::Reference_Counting_Policy::ENABLED); + + msg_queue()->high_water_mark(8*1024*1024); + msg_queue()->low_water_mark(8*1024*1024); } WorldSocket::~WorldSocket (void) @@ -124,10 +127,6 @@ WorldSocket::~WorldSocket (void) closing_ = true; peer ().close (); - - WorldPacket* pct; - while (m_PacketQueue.dequeue_head (pct) == 0) - delete pct; } bool WorldSocket::IsClosed (void) const @@ -187,18 +186,35 @@ int WorldSocket::SendPacket (const WorldPacket& pct) sWorldLog.Log ("\n\n"); } - if (iSendPacket (pct) == -1) + ServerPktHeader header(pct.size()+2, pct.GetOpcode()); + m_Crypt.EncryptSend ( header.header, header.getHeaderLength()); + + if (m_OutBuffer->space () >= pct.size () + header.getHeaderLength() && msg_queue()->is_empty()) { - WorldPacket* npct; + // Put the packet on the buffer. + if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1) + ACE_ASSERT (false); - ACE_NEW_RETURN (npct, WorldPacket (pct), -1); + if (!pct.empty ()) + if (m_OutBuffer->copy ((char*) pct.contents (), pct.size ()) == -1) + ACE_ASSERT (false); + } + else + { + // Enqueue the packet. + ACE_Message_Block* mb; - // NOTE maybe check of the size of the queue can be good ? - // to make it bounded instead of unbounded - if (m_PacketQueue.enqueue_tail (npct) == -1) + ACE_NEW_RETURN(mb, ACE_Message_Block(pct.size () + header.getHeaderLength()), -1); + + mb->copy((char*) header.header, header.getHeaderLength()); + + if (!pct.empty ()) + mb->copy((const char*)pct.contents(), pct.size ()); + + if(msg_queue()->enqueue_tail(mb,(ACE_Time_Value*)&ACE_Time_Value::zero) == -1) { - delete npct; - sLog.outError ("WorldSocket::SendPacket: m_PacketQueue.enqueue_tail failed"); + sLog.outError("WorldSocket::SendPacket enqueue_tail"); + mb->release(); return -1; } } @@ -323,7 +339,7 @@ int WorldSocket::handle_output (ACE_HANDLE) const size_t send_len = m_OutBuffer->length (); if (send_len == 0) - return cancel_wakeup_output (Guard); + return handle_output_queue (Guard); #ifdef MSG_NOSIGNAL ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL); @@ -353,15 +369,73 @@ int WorldSocket::handle_output (ACE_HANDLE) { m_OutBuffer->reset (); - if (!iFlushPacketQueue ()) - return cancel_wakeup_output (Guard); - else - return schedule_wakeup_output (Guard); + return handle_output_queue (Guard); } ACE_NOTREACHED (return 0); } +int WorldSocket::handle_output_queue (GuardType& g) +{ + if(msg_queue()->is_empty()) + return cancel_wakeup_output(g); + + ACE_Message_Block *mblk; + + if(msg_queue()->dequeue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1) + { + sLog.outError("WorldSocket::handle_output_queue dequeue_head"); + return -1; + } + + const size_t send_len = mblk->length (); + +#ifdef MSG_NOSIGNAL + ssize_t n = peer ().send (mblk->rd_ptr (), send_len, MSG_NOSIGNAL); +#else + ssize_t n = peer ().send (mblk->rd_ptr (), send_len); +#endif // MSG_NOSIGNAL + + if (n == 0) + { + mblk->release(); + + return -1; + } + else if (n == -1) + { + if (errno == EWOULDBLOCK || errno == EAGAIN) + { + msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero); + return schedule_wakeup_output (g); + } + + mblk->release(); + return -1; + } + else if (n < send_len) //now n > 0 + { + mblk->rd_ptr (static_cast (n)); + + if (msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero) == -1) + { + sLog.outError("WorldSocket::handle_output_queue enqueue_head"); + mblk->release(); + return -1; + } + + return schedule_wakeup_output (g); + } + else //now n == send_len + { + mblk->release(); + + return msg_queue()->is_empty() ? cancel_wakeup_output(g) : ACE_Event_Handler::WRITE_MASK; + } + + ACE_NOTREACHED(return -1); +} + int WorldSocket::handle_close (ACE_HANDLE h, ACE_Reactor_Mask) { // Critical section @@ -389,10 +463,15 @@ int WorldSocket::Update (void) if (closing_) return -1; - if (m_OutActive || m_OutBuffer->length () == 0) + if (m_OutActive || (m_OutBuffer->length () == 0 && msg_queue()->is_empty())) return 0; - return handle_output (get_handle ()); + int ret; + do + ret = handle_output (get_handle ()); + while( ret > 0 ); + + return ret; } int WorldSocket::handle_input_header (void) @@ -986,53 +1065,3 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket) packet << ping; return SendPacket (packet); } - -int WorldSocket::iSendPacket (const WorldPacket& pct) -{ - ServerPktHeader header(pct.size()+2, pct.GetOpcode()); - if (m_OutBuffer->space () < pct.size () + header.getHeaderLength()) - { - errno = ENOBUFS; - return -1; - } - - - m_Crypt.EncryptSend ( header.header, header.getHeaderLength()); - - if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1) - ACE_ASSERT (false); - - if (!pct.empty ()) - if (m_OutBuffer->copy ((char*) pct.contents (), pct.size ()) == -1) - ACE_ASSERT (false); - - return 0; -} - -bool WorldSocket::iFlushPacketQueue () -{ - WorldPacket *pct; - bool haveone = false; - - while (m_PacketQueue.dequeue_head (pct) == 0) - { - if (iSendPacket (*pct) == -1) - { - if (m_PacketQueue.enqueue_head (pct) == -1) - { - delete pct; - sLog.outError ("WorldSocket::iFlushPacketQueue m_PacketQueue->enqueue_head"); - return false; - } - - break; - } - else - { - haveone = true; - delete pct; - } - } - - return haveone; -} diff --git a/src/game/WorldSocket.h b/src/game/WorldSocket.h index 8fd4229e5..ee0a3c2b0 100644 --- a/src/game/WorldSocket.h +++ b/src/game/WorldSocket.h @@ -101,9 +101,6 @@ class WorldSocket : protected WorldHandler typedef ACE_Thread_Mutex LockType; typedef ACE_Guard GuardType; - /// Queue for storing packets for which there is no space. - typedef ACE_Unbounded_Queue< WorldPacket* > PacketQueueT; - /// Check if socket is closed. bool IsClosed (void) const; @@ -159,6 +156,9 @@ class WorldSocket : protected WorldHandler int cancel_wakeup_output (GuardType& g); int schedule_wakeup_output (GuardType& g); + /// Drain the queue if its not empty. + int handle_output_queue (GuardType& g); + /// process one incoming packet. /// @param new_pct received packet ,note that you need to delete it. int ProcessIncoming (WorldPacket* new_pct); @@ -169,16 +169,6 @@ class WorldSocket : protected WorldHandler /// Called by ProcessIncoming() on CMSG_PING. int HandlePing (WorldPacket& recvPacket); - /// Try to write WorldPacket to m_OutBuffer ,return -1 if no space - /// Need to be called with m_OutBufferLock lock held - int iSendPacket (const WorldPacket& pct); - - /// Flush m_PacketQueue if there are packets in it - /// Need to be called with m_OutBufferLock lock held - /// @return true if it wrote to the buffer ( AKA you need - /// to mark the socket for output ). - bool iFlushPacketQueue (); - private: /// Time in which the last ping was received ACE_Time_Value m_LastPingTime; @@ -218,10 +208,6 @@ class WorldSocket : protected WorldHandler /// Size of the m_OutBuffer. size_t m_OutBufferSize; - /// Here are stored packets for which there was no space on m_OutBuffer, - /// this allows not-to kick player if its buffer is overflowed. - PacketQueueT m_PacketQueue; - /// True if the socket is registered with the reactor for output bool m_OutActive; diff --git a/src/shared/revision_nr.h b/src/shared/revision_nr.h index 7a3a0deb0..1b0b36a74 100644 --- a/src/shared/revision_nr.h +++ b/src/shared/revision_nr.h @@ -1,4 +1,4 @@ #ifndef __REVISION_NR_H__ #define __REVISION_NR_H__ - #define REVISION_NR "7021" + #define REVISION_NR "7022" #endif // __REVISION_NR_H__