[8463] Fixed race conditions in LockedQueue.

Signed-off-by: ApoC <apoc@nymfe.net>
This commit is contained in:
XTZGZoReX 2009-08-30 23:23:46 +02:00 committed by ApoC
parent 7f444189c6
commit 66ffd80ed2
6 changed files with 42 additions and 86 deletions

View file

@ -112,8 +112,9 @@ World::~World()
m_weathers.clear(); m_weathers.clear();
while (!cliCmdQueue.empty()) CliCommandHolder* command;
delete cliCmdQueue.next(); while (cliCmdQueue.next(command))
delete command;
VMAP::VMapFactory::clear(); VMAP::VMapFactory::clear();
@ -1993,11 +1994,9 @@ void World::SendServerMessage(ServerMessageType type, const char *text, Player*
void World::UpdateSessions( uint32 diff ) void World::UpdateSessions( uint32 diff )
{ {
///- Add new sessions ///- Add new sessions
while(!addSessQueue.empty()) WorldSession* sess;
{ while(addSessQueue.next(sess))
WorldSession* sess = addSessQueue.next ();
AddSession_ (sess); AddSession_ (sess);
}
///- Then send an update signal to remaining ones ///- Then send an update signal to remaining ones
for (SessionMap::iterator itr = m_sessions.begin(), next; itr != m_sessions.end(); itr = next) for (SessionMap::iterator itr = m_sessions.begin(), next; itr != m_sessions.end(); itr = next)
@ -2021,25 +2020,20 @@ void World::UpdateSessions( uint32 diff )
// This handles the issued and queued CLI commands // This handles the issued and queued CLI commands
void World::ProcessCliCommands() void World::ProcessCliCommands()
{ {
if (cliCmdQueue.empty()) CliCommandHolder::Print* zprint = NULL;
return;
CliCommandHolder::Print* zprint; CliCommandHolder* command;
while (cliCmdQueue.next(command))
while (!cliCmdQueue.empty())
{ {
sLog.outDebug("CLI command under processing..."); sLog.outDebug("CLI command under processing...");
CliCommandHolder *command = cliCmdQueue.next();
zprint = command->m_print; zprint = command->m_print;
CliHandler(zprint).ParseCommands(command->m_command); CliHandler(zprint).ParseCommands(command->m_command);
delete command; delete command;
} }
// print the console message here so it looks right // print the console message here so it looks right
zprint("mangos>"); if (zprint)
zprint("mangos>");
} }
void World::InitResultQueue() void World::InitResultQueue()

View file

@ -69,11 +69,9 @@ WorldSession::~WorldSession()
} }
///- empty incoming packet queue ///- empty incoming packet queue
while(!_recvQueue.empty()) WorldPacket* packet;
{ while(_recvQueue.next(packet))
WorldPacket *packet = _recvQueue.next ();
delete packet; delete packet;
}
} }
void WorldSession::SizeError(WorldPacket const& packet, uint32 size) const void WorldSession::SizeError(WorldPacket const& packet, uint32 size) const
@ -163,10 +161,9 @@ bool WorldSession::Update(uint32 /*diff*/)
{ {
///- Retrieve packets from the receive queue and call the appropriate handlers ///- Retrieve packets from the receive queue and call the appropriate handlers
/// not proccess packets if socket already closed /// not proccess packets if socket already closed
while (!_recvQueue.empty() && m_Socket && !m_Socket->IsClosed ()) WorldPacket* packet;
while (_recvQueue.next(packet) && m_Socket && !m_Socket->IsClosed ())
{ {
WorldPacket *packet = _recvQueue.next();
/*#if 1 /*#if 1
sLog.outError( "MOEP: %s (0x%.4X)", sLog.outError( "MOEP: %s (0x%.4X)",
LookupOpcodeName(packet->GetOpcode()), LookupOpcodeName(packet->GetOpcode()),

View file

@ -26,7 +26,6 @@ SqlDelayThread::SqlDelayThread(Database* db) : m_dbEngine(db), m_running(true)
void SqlDelayThread::run() void SqlDelayThread::run()
{ {
SqlOperation* s;
#ifndef DO_POSTGRESQL #ifndef DO_POSTGRESQL
mysql_thread_init(); mysql_thread_init();
#endif #endif
@ -36,9 +35,9 @@ void SqlDelayThread::run()
// if the running state gets turned off while sleeping // if the running state gets turned off while sleeping
// empty the queue before exiting // empty the queue before exiting
ACE_Based::Thread::Sleep(10); ACE_Based::Thread::Sleep(10);
while (!m_sqlQueue.empty()) SqlOperation* s;
while (m_sqlQueue.next(s))
{ {
s = m_sqlQueue.next();
s->Execute(m_dbEngine); s->Execute(m_dbEngine);
delete s; delete s;
} }

View file

@ -71,9 +71,9 @@ void SqlQuery::Execute(Database *db)
void SqlResultQueue::Update() void SqlResultQueue::Update()
{ {
/// execute the callbacks waiting in the synchronization queue /// execute the callbacks waiting in the synchronization queue
while(!empty()) MaNGOS::IQueryCallback* callback;
while (next(callback))
{ {
MaNGOS::IQueryCallback * callback = next();
callback->Execute(); callback->Execute();
delete callback; delete callback;
} }

View file

@ -30,99 +30,65 @@ namespace ACE_Based
template <class T, class LockType, typename StorageType=std::deque<T> > template <class T, class LockType, typename StorageType=std::deque<T> >
class LockedQueue class LockedQueue
{ {
//! Serialize access to the Queue //! Lock access to the queue.
LockType _lock; LockType _lock;
//! Storage backing the queue //! Storage backing the queue.
StorageType _queue; StorageType _queue;
//! Cancellation flag //! Cancellation flag.
volatile bool _canceled; /*volatile*/ bool _canceled;
public: public:
//! Create a LockedQueue //! Create a LockedQueue.
LockedQueue() : _canceled(false) {} LockedQueue() : _canceled(false) {}
//! Destroy a LockedQueue //! Destroy a LockedQueue.
virtual ~LockedQueue() { } virtual ~LockedQueue() { }
/** //! Adds an item to the queue.
* @see Queue::add(const T& item)
*/
void add(const T& item) void add(const T& item)
{ {
ACE_Guard<LockType> g(this->_lock); ACE_Guard<LockType> g(this->_lock);
ASSERT(!this->_canceled); //ASSERT(!this->_canceled);
// throw Cancellation_Exception(); // throw Cancellation_Exception();
this->_queue.push_back(item); _queue.push_back(item);
} }
/** //! Gets the next result in the queue, if any.
* @see Queue::next() bool next(T& result)
*/
T next()
{ {
ACE_Guard<LockType> g(this->_lock); ACE_Guard<LockType> g(this->_lock);
ASSERT (!_queue.empty() || !this->_canceled); if (_queue.empty())
return false;
//ASSERT (!_queue.empty() || !this->_canceled);
// throw Cancellation_Exception(); // throw Cancellation_Exception();
T item = this->_queue.front(); result = _queue.front();
this->_queue.pop_front(); _queue.pop_front();
return item; return true;
} }
T front() //! Cancels the queue.
{
ACE_Guard<LockType> g(this->_lock);
ASSERT (!this->_queue.empty());
// throw NoSuchElement_Exception();
return this->_queue.front();
}
/**
* @see Queue::cancel()
*/
void cancel() void cancel()
{ {
ACE_Guard<LockType> g(this->_lock); ACE_Guard<LockType> g(this->_lock);
this->_canceled = true; _canceled = true;
} }
/** //! Checks if the queue is cancelled.
* @see Queue::isCanceled() bool cancelled()
*/
bool isCanceled()
{
// Faster check since the queue will not become un-canceled
if(this->_canceled)
return true;
ACE_Guard<LockType> g(this->_lock);
return this->_canceled;
}
/**
* @see Queue::size()
*/
size_t size()
{ {
ACE_Guard<LockType> g(this->_lock); ACE_Guard<LockType> g(this->_lock);
return this->_queue.size();
}
bool empty() return _canceled;
{
ACE_Guard<LockType> g(this->_lock);
return this->_queue.empty();
} }
}; };
} }

View file

@ -1,4 +1,4 @@
#ifndef __REVISION_NR_H__ #ifndef __REVISION_NR_H__
#define __REVISION_NR_H__ #define __REVISION_NR_H__
#define REVISION_NR "8462" #define REVISION_NR "8463"
#endif // __REVISION_NR_H__ #endif // __REVISION_NR_H__