[10848] New system for parallelizing client packet processing. Handle WorldSession updates in Map::Update() where we are safe to proceed. Thank you for all your feedback!

Signed-off-by: Ambal <pogrebniak@gala.net>
This commit is contained in:
Ambal 2010-12-09 20:20:58 +02:00
parent 4e72ead2fb
commit 5f539117a4
8 changed files with 1447 additions and 1322 deletions

View file

@ -461,6 +461,20 @@ bool Map::loaded(const GridPair &p) const
void Map::Update(const uint32 &t_diff) void Map::Update(const uint32 &t_diff)
{ {
/// update worldsessions for existing players
for(m_mapRefIter = m_mapRefManager.begin(); m_mapRefIter != m_mapRefManager.end(); ++m_mapRefIter)
{
Player* plr = m_mapRefIter->getSource();
if(plr && plr->IsInWorld())
{
//plr->Update(t_diff);
WorldSession * pSession = plr->GetSession();
MapSessionFilter updater(pSession);
pSession->Update(t_diff, updater);
}
}
/// update players at tick /// update players at tick
for(m_mapRefIter = m_mapRefManager.begin(); m_mapRefIter != m_mapRefManager.end(); ++m_mapRefIter) for(m_mapRefIter = m_mapRefManager.begin(); m_mapRefIter != m_mapRefManager.end(); ++m_mapRefIter)
{ {

File diff suppressed because it is too large Load diff

View file

@ -1359,12 +1359,20 @@ enum SessionStatus
STATUS_UNHANDLED ///< We don' handle this opcode yet STATUS_UNHANDLED ///< We don' handle this opcode yet
}; };
enum PacketProcessing
{
PROCESS_INPLACE = 0, //process packet whenever we receive it - mostly for non-handled or non-implemented packets
PROCESS_THREADUNSAFE, //packet is not thread-safe - process it in World::UpdateSessions()
PROCESS_THREADSAFE //packet is thread-safe - process it in Map::Update()
};
class WorldPacket; class WorldPacket;
struct OpcodeHandler struct OpcodeHandler
{ {
char const* name; char const* name;
SessionStatus status; SessionStatus status;
PacketProcessing packetProcessing;
void (WorldSession::*handler)(WorldPacket& recvPacket); void (WorldSession::*handler)(WorldPacket& recvPacket);
}; };

View file

@ -1882,11 +1882,14 @@ void World::UpdateSessions( uint32 diff )
next = itr; next = itr;
++next; ++next;
///- and remove not active sessions from the list ///- and remove not active sessions from the list
if(!itr->second->Update(diff)) // As interval = 0 WorldSession * pSession = itr->second;
WorldSessionFilter updater(pSession);
if(!pSession->Update(diff, updater)) // As interval = 0
{ {
RemoveQueuedSession (itr->second); RemoveQueuedSession(pSession);
delete itr->second;
m_sessions.erase(itr); m_sessions.erase(itr);
delete pSession;
} }
} }
} }

View file

@ -39,6 +39,47 @@
#include "Auth/HMACSHA1.h" #include "Auth/HMACSHA1.h"
#include "zlib/zlib.h" #include "zlib/zlib.h"
bool MapSessionFilter::Process(WorldPacket * packet)
{
OpcodeHandler const& opHandle = opcodeTable[packet->GetOpcode()];
//let's check if our opcode can be really processed in Map::Update()
if(opHandle.packetProcessing == PROCESS_INPLACE)
return true;
//we do not process thread-unsafe packets
if(opHandle.packetProcessing == PROCESS_THREADUNSAFE)
return false;
Player * plr = m_pSession->GetPlayer();
if(!plr)
return false;
//in Map::Update() we do not process packets where player is not in world!
return plr->IsInWorld();
}
//we should process ALL packets when player is not in world/logged in
//OR packet handler is not thread-safe!
bool WorldSessionFilter::Process(WorldPacket* packet)
{
OpcodeHandler const& opHandle = opcodeTable[packet->GetOpcode()];
//check if packet handler is supposed to be safe
if(opHandle.packetProcessing == PROCESS_INPLACE)
return true;
//thread-unsafe packets should be processed in World::UpdateSessions()
if(opHandle.packetProcessing == PROCESS_THREADUNSAFE)
return true;
//no player attached? -> our client! ^^
Player * plr = m_pSession->GetPlayer();
if(!plr)
return true;
//lets process all packets for non-in-the-world player
return (plr->IsInWorld() == false);
}
/// WorldSession constructor /// WorldSession constructor
WorldSession::WorldSession(uint32 id, WorldSocket *sock, AccountTypes sec, uint8 expansion, time_t mute_time, LocaleConstant locale) : WorldSession::WorldSession(uint32 id, WorldSocket *sock, AccountTypes sec, uint8 expansion, time_t mute_time, LocaleConstant locale) :
LookingForGroup_auto_join(false), LookingForGroup_auto_add(false), m_muteTime(mute_time), LookingForGroup_auto_join(false), LookingForGroup_auto_add(false), m_muteTime(mute_time),
@ -158,12 +199,12 @@ void WorldSession::LogUnprocessedTail(WorldPacket *packet)
} }
/// Update the WorldSession (triggered by World update) /// Update the WorldSession (triggered by World update)
bool WorldSession::Update(uint32 /*diff*/) bool WorldSession::Update(uint32 diff, PacketFilter& updater)
{ {
///- Retrieve packets from the receive queue and call the appropriate handlers ///- Retrieve packets from the receive queue and call the appropriate handlers
/// not proccess packets if socket already closed /// not process packets if socket already closed
WorldPacket* packet; WorldPacket* packet;
while (m_Socket && !m_Socket->IsClosed() && _recvQueue.next(packet)) while (m_Socket && !m_Socket->IsClosed() && _recvQueue.next(packet, updater))
{ {
/*#if 1 /*#if 1
sLog.outError( "MOEP: %s (0x%.4X)", sLog.outError( "MOEP: %s (0x%.4X)",
@ -266,6 +307,10 @@ bool WorldSession::Update(uint32 /*diff*/)
m_Socket = NULL; m_Socket = NULL;
} }
//check if we are safe to proceed with logout
//logout procedure should happen only in World::UpdateSessions() method!!!
if(updater.ProcessLogout())
{
///- If necessary, log the player out ///- If necessary, log the player out
time_t currTime = time(NULL); time_t currTime = time(NULL);
if (!m_Socket || (ShouldLogOut(currTime) && !m_playerLoading)) if (!m_Socket || (ShouldLogOut(currTime) && !m_playerLoading))
@ -273,6 +318,7 @@ bool WorldSession::Update(uint32 /*diff*/)
if (!m_Socket) if (!m_Socket)
return false; //Will remove this session from the world session map return false; //Will remove this session from the world session map
}
return true; return true;
} }

View file

@ -143,6 +143,43 @@ enum TutorialDataState
TUTORIALDATA_NEW = 2 TUTORIALDATA_NEW = 2
}; };
//class to deal with packet processing
//allows to determine if next packet is safe to be processed
class PacketFilter
{
public:
explicit PacketFilter(WorldSession * pSession) : m_pSession(pSession) {}
virtual ~PacketFilter() {}
virtual bool Process(WorldPacket * packet) { return true; }
virtual bool ProcessLogout() const { return true; }
protected:
WorldSession * const m_pSession;
};
//process only thread-safe packets in Map::Update()
class MapSessionFilter : public PacketFilter
{
public:
explicit MapSessionFilter(WorldSession * pSession) : PacketFilter(pSession) {}
~MapSessionFilter() {}
virtual bool Process(WorldPacket * packet);
//in Map::Update() we do not process player logout!
virtual bool ProcessLogout() const { return false; }
};
//class used to filer only thread-unsafe packets from queue
//in order to update only be used in World::UpdateSessions()
class WorldSessionFilter : public PacketFilter
{
public:
explicit WorldSessionFilter(WorldSession * pSession) : PacketFilter(pSession) {}
~WorldSessionFilter() {}
virtual bool Process(WorldPacket* packet);
};
/// Player session in the World /// Player session in the World
class MANGOS_DLL_SPEC WorldSession class MANGOS_DLL_SPEC WorldSession
{ {
@ -203,7 +240,8 @@ class MANGOS_DLL_SPEC WorldSession
void KickPlayer(); void KickPlayer();
void QueuePacket(WorldPacket* new_packet); void QueuePacket(WorldPacket* new_packet);
bool Update(uint32 diff);
bool Update(uint32 diff, PacketFilter& updater);
/// Handle the authentication waiting queue (to be completed) /// Handle the authentication waiting queue (to be completed)
void SendAuthWaitQue(uint32 position); void SendAuthWaitQue(uint32 position);

View file

@ -82,6 +82,22 @@ namespace ACE_Based
return true; return true;
} }
template<class Checker>
bool next(T& result, Checker& check)
{
ACE_Guard<LockType> g(this->_lock);
if (_queue.empty())
return false;
result = _queue.front();
if(!check.Process(result))
return false;
_queue.pop_front();
return true;
}
//! Peeks at the top of the queue. Remember to unlock after use. //! Peeks at the top of the queue. Remember to unlock after use.
T& peek() T& peek()
{ {

View file

@ -1,4 +1,4 @@
#ifndef __REVISION_NR_H__ #ifndef __REVISION_NR_H__
#define __REVISION_NR_H__ #define __REVISION_NR_H__
#define REVISION_NR "10847" #define REVISION_NR "10848"
#endif // __REVISION_NR_H__ #endif // __REVISION_NR_H__