mirror of
https://github.com/mangosfour/server.git
synced 2025-12-15 01:37:00 +00:00
[7022] Added support for packets > 64 kb
This commit is contained in:
parent
dfa29a883f
commit
a865eb6010
3 changed files with 102 additions and 87 deletions
|
|
@ -111,6 +111,9 @@ m_OverSpeedPings (0),
|
||||||
m_LastPingTime (ACE_Time_Value::zero)
|
m_LastPingTime (ACE_Time_Value::zero)
|
||||||
{
|
{
|
||||||
reference_counting_policy ().value (ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
|
reference_counting_policy ().value (ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
|
||||||
|
|
||||||
|
msg_queue()->high_water_mark(8*1024*1024);
|
||||||
|
msg_queue()->low_water_mark(8*1024*1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
WorldSocket::~WorldSocket (void)
|
WorldSocket::~WorldSocket (void)
|
||||||
|
|
@ -124,10 +127,6 @@ WorldSocket::~WorldSocket (void)
|
||||||
closing_ = true;
|
closing_ = true;
|
||||||
|
|
||||||
peer ().close ();
|
peer ().close ();
|
||||||
|
|
||||||
WorldPacket* pct;
|
|
||||||
while (m_PacketQueue.dequeue_head (pct) == 0)
|
|
||||||
delete pct;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool WorldSocket::IsClosed (void) const
|
bool WorldSocket::IsClosed (void) const
|
||||||
|
|
@ -187,18 +186,35 @@ int WorldSocket::SendPacket (const WorldPacket& pct)
|
||||||
sWorldLog.Log ("\n\n");
|
sWorldLog.Log ("\n\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (iSendPacket (pct) == -1)
|
ServerPktHeader header(pct.size()+2, pct.GetOpcode());
|
||||||
|
m_Crypt.EncryptSend ( header.header, header.getHeaderLength());
|
||||||
|
|
||||||
|
if (m_OutBuffer->space () >= pct.size () + header.getHeaderLength() && msg_queue()->is_empty())
|
||||||
{
|
{
|
||||||
WorldPacket* npct;
|
// Put the packet on the buffer.
|
||||||
|
if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1)
|
||||||
|
ACE_ASSERT (false);
|
||||||
|
|
||||||
ACE_NEW_RETURN (npct, WorldPacket (pct), -1);
|
if (!pct.empty ())
|
||||||
|
if (m_OutBuffer->copy ((char*) pct.contents (), pct.size ()) == -1)
|
||||||
|
ACE_ASSERT (false);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Enqueue the packet.
|
||||||
|
ACE_Message_Block* mb;
|
||||||
|
|
||||||
// NOTE maybe check of the size of the queue can be good ?
|
ACE_NEW_RETURN(mb, ACE_Message_Block(pct.size () + header.getHeaderLength()), -1);
|
||||||
// to make it bounded instead of unbounded
|
|
||||||
if (m_PacketQueue.enqueue_tail (npct) == -1)
|
mb->copy((char*) header.header, header.getHeaderLength());
|
||||||
|
|
||||||
|
if (!pct.empty ())
|
||||||
|
mb->copy((const char*)pct.contents(), pct.size ());
|
||||||
|
|
||||||
|
if(msg_queue()->enqueue_tail(mb,(ACE_Time_Value*)&ACE_Time_Value::zero) == -1)
|
||||||
{
|
{
|
||||||
delete npct;
|
sLog.outError("WorldSocket::SendPacket enqueue_tail");
|
||||||
sLog.outError ("WorldSocket::SendPacket: m_PacketQueue.enqueue_tail failed");
|
mb->release();
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -323,7 +339,7 @@ int WorldSocket::handle_output (ACE_HANDLE)
|
||||||
const size_t send_len = m_OutBuffer->length ();
|
const size_t send_len = m_OutBuffer->length ();
|
||||||
|
|
||||||
if (send_len == 0)
|
if (send_len == 0)
|
||||||
return cancel_wakeup_output (Guard);
|
return handle_output_queue (Guard);
|
||||||
|
|
||||||
#ifdef MSG_NOSIGNAL
|
#ifdef MSG_NOSIGNAL
|
||||||
ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL);
|
ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL);
|
||||||
|
|
@ -353,15 +369,73 @@ int WorldSocket::handle_output (ACE_HANDLE)
|
||||||
{
|
{
|
||||||
m_OutBuffer->reset ();
|
m_OutBuffer->reset ();
|
||||||
|
|
||||||
if (!iFlushPacketQueue ())
|
return handle_output_queue (Guard);
|
||||||
return cancel_wakeup_output (Guard);
|
|
||||||
else
|
|
||||||
return schedule_wakeup_output (Guard);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ACE_NOTREACHED (return 0);
|
ACE_NOTREACHED (return 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int WorldSocket::handle_output_queue (GuardType& g)
|
||||||
|
{
|
||||||
|
if(msg_queue()->is_empty())
|
||||||
|
return cancel_wakeup_output(g);
|
||||||
|
|
||||||
|
ACE_Message_Block *mblk;
|
||||||
|
|
||||||
|
if(msg_queue()->dequeue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1)
|
||||||
|
{
|
||||||
|
sLog.outError("WorldSocket::handle_output_queue dequeue_head");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
const size_t send_len = mblk->length ();
|
||||||
|
|
||||||
|
#ifdef MSG_NOSIGNAL
|
||||||
|
ssize_t n = peer ().send (mblk->rd_ptr (), send_len, MSG_NOSIGNAL);
|
||||||
|
#else
|
||||||
|
ssize_t n = peer ().send (mblk->rd_ptr (), send_len);
|
||||||
|
#endif // MSG_NOSIGNAL
|
||||||
|
|
||||||
|
if (n == 0)
|
||||||
|
{
|
||||||
|
mblk->release();
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
else if (n == -1)
|
||||||
|
{
|
||||||
|
if (errno == EWOULDBLOCK || errno == EAGAIN)
|
||||||
|
{
|
||||||
|
msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero);
|
||||||
|
return schedule_wakeup_output (g);
|
||||||
|
}
|
||||||
|
|
||||||
|
mblk->release();
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
else if (n < send_len) //now n > 0
|
||||||
|
{
|
||||||
|
mblk->rd_ptr (static_cast<size_t> (n));
|
||||||
|
|
||||||
|
if (msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero) == -1)
|
||||||
|
{
|
||||||
|
sLog.outError("WorldSocket::handle_output_queue enqueue_head");
|
||||||
|
mblk->release();
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return schedule_wakeup_output (g);
|
||||||
|
}
|
||||||
|
else //now n == send_len
|
||||||
|
{
|
||||||
|
mblk->release();
|
||||||
|
|
||||||
|
return msg_queue()->is_empty() ? cancel_wakeup_output(g) : ACE_Event_Handler::WRITE_MASK;
|
||||||
|
}
|
||||||
|
|
||||||
|
ACE_NOTREACHED(return -1);
|
||||||
|
}
|
||||||
|
|
||||||
int WorldSocket::handle_close (ACE_HANDLE h, ACE_Reactor_Mask)
|
int WorldSocket::handle_close (ACE_HANDLE h, ACE_Reactor_Mask)
|
||||||
{
|
{
|
||||||
// Critical section
|
// Critical section
|
||||||
|
|
@ -389,10 +463,15 @@ int WorldSocket::Update (void)
|
||||||
if (closing_)
|
if (closing_)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
if (m_OutActive || m_OutBuffer->length () == 0)
|
if (m_OutActive || (m_OutBuffer->length () == 0 && msg_queue()->is_empty()))
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
return handle_output (get_handle ());
|
int ret;
|
||||||
|
do
|
||||||
|
ret = handle_output (get_handle ());
|
||||||
|
while( ret > 0 );
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int WorldSocket::handle_input_header (void)
|
int WorldSocket::handle_input_header (void)
|
||||||
|
|
@ -986,53 +1065,3 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket)
|
||||||
packet << ping;
|
packet << ping;
|
||||||
return SendPacket (packet);
|
return SendPacket (packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
int WorldSocket::iSendPacket (const WorldPacket& pct)
|
|
||||||
{
|
|
||||||
ServerPktHeader header(pct.size()+2, pct.GetOpcode());
|
|
||||||
if (m_OutBuffer->space () < pct.size () + header.getHeaderLength())
|
|
||||||
{
|
|
||||||
errno = ENOBUFS;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
m_Crypt.EncryptSend ( header.header, header.getHeaderLength());
|
|
||||||
|
|
||||||
if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1)
|
|
||||||
ACE_ASSERT (false);
|
|
||||||
|
|
||||||
if (!pct.empty ())
|
|
||||||
if (m_OutBuffer->copy ((char*) pct.contents (), pct.size ()) == -1)
|
|
||||||
ACE_ASSERT (false);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool WorldSocket::iFlushPacketQueue ()
|
|
||||||
{
|
|
||||||
WorldPacket *pct;
|
|
||||||
bool haveone = false;
|
|
||||||
|
|
||||||
while (m_PacketQueue.dequeue_head (pct) == 0)
|
|
||||||
{
|
|
||||||
if (iSendPacket (*pct) == -1)
|
|
||||||
{
|
|
||||||
if (m_PacketQueue.enqueue_head (pct) == -1)
|
|
||||||
{
|
|
||||||
delete pct;
|
|
||||||
sLog.outError ("WorldSocket::iFlushPacketQueue m_PacketQueue->enqueue_head");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
haveone = true;
|
|
||||||
delete pct;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return haveone;
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -101,9 +101,6 @@ class WorldSocket : protected WorldHandler
|
||||||
typedef ACE_Thread_Mutex LockType;
|
typedef ACE_Thread_Mutex LockType;
|
||||||
typedef ACE_Guard<LockType> GuardType;
|
typedef ACE_Guard<LockType> GuardType;
|
||||||
|
|
||||||
/// Queue for storing packets for which there is no space.
|
|
||||||
typedef ACE_Unbounded_Queue< WorldPacket* > PacketQueueT;
|
|
||||||
|
|
||||||
/// Check if socket is closed.
|
/// Check if socket is closed.
|
||||||
bool IsClosed (void) const;
|
bool IsClosed (void) const;
|
||||||
|
|
||||||
|
|
@ -159,6 +156,9 @@ class WorldSocket : protected WorldHandler
|
||||||
int cancel_wakeup_output (GuardType& g);
|
int cancel_wakeup_output (GuardType& g);
|
||||||
int schedule_wakeup_output (GuardType& g);
|
int schedule_wakeup_output (GuardType& g);
|
||||||
|
|
||||||
|
/// Drain the queue if its not empty.
|
||||||
|
int handle_output_queue (GuardType& g);
|
||||||
|
|
||||||
/// process one incoming packet.
|
/// process one incoming packet.
|
||||||
/// @param new_pct received packet ,note that you need to delete it.
|
/// @param new_pct received packet ,note that you need to delete it.
|
||||||
int ProcessIncoming (WorldPacket* new_pct);
|
int ProcessIncoming (WorldPacket* new_pct);
|
||||||
|
|
@ -169,16 +169,6 @@ class WorldSocket : protected WorldHandler
|
||||||
/// Called by ProcessIncoming() on CMSG_PING.
|
/// Called by ProcessIncoming() on CMSG_PING.
|
||||||
int HandlePing (WorldPacket& recvPacket);
|
int HandlePing (WorldPacket& recvPacket);
|
||||||
|
|
||||||
/// Try to write WorldPacket to m_OutBuffer ,return -1 if no space
|
|
||||||
/// Need to be called with m_OutBufferLock lock held
|
|
||||||
int iSendPacket (const WorldPacket& pct);
|
|
||||||
|
|
||||||
/// Flush m_PacketQueue if there are packets in it
|
|
||||||
/// Need to be called with m_OutBufferLock lock held
|
|
||||||
/// @return true if it wrote to the buffer ( AKA you need
|
|
||||||
/// to mark the socket for output ).
|
|
||||||
bool iFlushPacketQueue ();
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// Time in which the last ping was received
|
/// Time in which the last ping was received
|
||||||
ACE_Time_Value m_LastPingTime;
|
ACE_Time_Value m_LastPingTime;
|
||||||
|
|
@ -218,10 +208,6 @@ class WorldSocket : protected WorldHandler
|
||||||
/// Size of the m_OutBuffer.
|
/// Size of the m_OutBuffer.
|
||||||
size_t m_OutBufferSize;
|
size_t m_OutBufferSize;
|
||||||
|
|
||||||
/// Here are stored packets for which there was no space on m_OutBuffer,
|
|
||||||
/// this allows not-to kick player if its buffer is overflowed.
|
|
||||||
PacketQueueT m_PacketQueue;
|
|
||||||
|
|
||||||
/// True if the socket is registered with the reactor for output
|
/// True if the socket is registered with the reactor for output
|
||||||
bool m_OutActive;
|
bool m_OutActive;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
#ifndef __REVISION_NR_H__
|
#ifndef __REVISION_NR_H__
|
||||||
#define __REVISION_NR_H__
|
#define __REVISION_NR_H__
|
||||||
#define REVISION_NR "7021"
|
#define REVISION_NR "7022"
|
||||||
#endif // __REVISION_NR_H__
|
#endif // __REVISION_NR_H__
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue