[10865] Make DB code thread-safe. Original patch by Machiavelli and Kero99.

Signed-off-by: Ambal <pogrebniak@gala.net>
This commit is contained in:
Ambal 2010-12-12 11:34:26 +02:00
parent 9b3535f803
commit af66b470a8
8 changed files with 53 additions and 26 deletions

View file

@ -264,6 +264,8 @@ bool DatabaseMysql::Execute(const char *sql)
// don't use queued execution if it has not been initialized // don't use queued execution if it has not been initialized
if (!m_threadBody) return DirectExecute(sql); if (!m_threadBody) return DirectExecute(sql);
ACE_Guard<ACE_Thread_Mutex> _lock(nMutex);
tranThread = ACE_Based::Thread::current(); // owner of this transaction tranThread = ACE_Based::Thread::current(); // owner of this transaction
TransactionQueues::iterator i = m_tranQueues.find(tranThread); TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL) if (i != m_tranQueues.end() && i->second != NULL)
@ -341,6 +343,8 @@ bool DatabaseMysql::BeginTransaction()
return true; // transaction started return true; // transaction started
} }
ACE_Guard<ACE_Thread_Mutex> _lock(nMutex);
tranThread = ACE_Based::Thread::current(); // owner of this transaction tranThread = ACE_Based::Thread::current(); // owner of this transaction
TransactionQueues::iterator i = m_tranQueues.find(tranThread); TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL) if (i != m_tranQueues.end() && i->second != NULL)
@ -369,15 +373,17 @@ bool DatabaseMysql::CommitTransaction()
return _res; return _res;
} }
ACE_Guard<ACE_Thread_Mutex> _lock(nMutex);
tranThread = ACE_Based::Thread::current(); tranThread = ACE_Based::Thread::current();
TransactionQueues::iterator i = m_tranQueues.find(tranThread); TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL) if (i != m_tranQueues.end() && i->second != NULL)
{ {
m_threadBody->Delay(i->second); m_threadBody->Delay(i->second);
i->second = NULL; m_tranQueues.erase(i);
return true; return true;
} }
else
return false; return false;
} }
@ -397,13 +403,16 @@ bool DatabaseMysql::RollbackTransaction()
return _res; return _res;
} }
ACE_Guard<ACE_Thread_Mutex> _lock(nMutex);
tranThread = ACE_Based::Thread::current(); tranThread = ACE_Based::Thread::current();
TransactionQueues::iterator i = m_tranQueues.find(tranThread); TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL) if (i != m_tranQueues.end() && i->second != NULL)
{ {
delete i->second; delete i->second;
i->second = NULL; m_tranQueues.erase(i);
} }
return true; return true;
} }

View file

@ -65,7 +65,8 @@ class MANGOS_DLL_SPEC DatabaseMysql : public Database
// must be call before finish thread run // must be call before finish thread run
void ThreadEnd(); void ThreadEnd();
private: private:
ACE_Thread_Mutex mMutex; ACE_Thread_Mutex mMutex; // For thread safe operations between core and mySQL server
ACE_Thread_Mutex nMutex; // For thread safe operations on m_transQueues
ACE_Based::Thread * tranThread; ACE_Based::Thread * tranThread;

View file

@ -24,6 +24,14 @@ SqlDelayThread::SqlDelayThread(Database* db) : m_dbEngine(db), m_running(true)
{ {
} }
SqlDelayThread::~SqlDelayThread()
{
//empty SQL queue before exiting
SqlOperation* s = NULL;
while (m_sqlQueue.next(s))
delete s;
}
void SqlDelayThread::run() void SqlDelayThread::run()
{ {
#ifndef DO_POSTGRESQL #ifndef DO_POSTGRESQL
@ -39,14 +47,15 @@ void SqlDelayThread::run()
{ {
// if the running state gets turned off while sleeping // if the running state gets turned off while sleeping
// empty the queue before exiting // empty the queue before exiting
ACE_Based::Thread::Sleep(loopSleepms); ACE_Based::Thread::Sleep(loopSleepms);
SqlOperation* s = NULL; SqlOperation* s = NULL;
while (m_sqlQueue.next(s)) while (m_sqlQueue.next(s))
{ {
s->Execute(m_dbEngine); s->Execute(m_dbEngine);
delete s; delete s;
} }
if((loopCounter++) >= pingEveryLoop) if((loopCounter++) >= pingEveryLoop)
{ {
loopCounter = 0; loopCounter = 0;

View file

@ -36,9 +36,9 @@ class SqlDelayThread : public ACE_Based::Runnable
Database* m_dbEngine; ///< Pointer to used Database engine Database* m_dbEngine; ///< Pointer to used Database engine
volatile bool m_running; volatile bool m_running;
SqlDelayThread();
public: public:
SqlDelayThread(Database* db); SqlDelayThread(Database* db);
~SqlDelayThread();
///< Put sql statement to delay queue ///< Put sql statement to delay queue
bool Delay(SqlOperation* sql) { m_sqlQueue.add(sql); return true; } bool Delay(SqlOperation* sql) { m_sqlQueue.add(sql); return true; }

View file

@ -31,8 +31,10 @@ void SqlStatement::Execute(Database *db)
void SqlTransaction::Execute(Database *db) void SqlTransaction::Execute(Database *db)
{ {
ACE_Guard<ACE_Thread_Mutex> _lock(m_Mutex);
if(m_queue.empty()) if(m_queue.empty())
return; return;
db->DirectExecute("START TRANSACTION"); db->DirectExecute("START TRANSACTION");
while(!m_queue.empty()) while(!m_queue.empty())
{ {
@ -42,17 +44,20 @@ void SqlTransaction::Execute(Database *db)
if(!db->DirectExecute(sql)) if(!db->DirectExecute(sql))
{ {
delete [] sql; delete [] sql;
db->DirectExecute("ROLLBACK"); db->DirectExecute("ROLLBACK");
while(!m_queue.empty()) while(!m_queue.empty())
{ {
delete [] (const_cast<char*>(m_queue.front())); delete [] (const_cast<char*>(m_queue.front()));
m_queue.pop(); m_queue.pop();
} }
return; return;
} }
delete [] sql; delete [] sql;
} }
db->DirectExecute("COMMIT"); db->DirectExecute("COMMIT");
} }

View file

@ -55,9 +55,18 @@ class SqlTransaction : public SqlOperation
{ {
private: private:
std::queue<const char *> m_queue; std::queue<const char *> m_queue;
ACE_Thread_Mutex m_Mutex;
public: public:
SqlTransaction() {} SqlTransaction() {}
void DelayExecute(const char *sql) { m_queue.push(mangos_strdup(sql)); } void DelayExecute(const char *sql)
{
char* _sql = mangos_strdup(sql);
if (_sql)
{
ACE_Guard<ACE_Thread_Mutex> _lock(m_Mutex);
m_queue.push(_sql);
}
}
void Execute(Database *db); void Execute(Database *db);
}; };

View file

@ -55,27 +55,18 @@ namespace ACE_Based
//! Adds an item to the queue. //! Adds an item to the queue.
void add(const T& item) void add(const T& item)
{ {
lock(); ACE_Guard<LockType> g(this->_lock);
//ASSERT(!this->_canceled);
// throw Cancellation_Exception();
_queue.push_back(item); _queue.push_back(item);
unlock();
} }
//! Gets the next result in the queue, if any. //! Gets the next result in the queue, if any.
bool next(T& result) bool next(T& result)
{ {
ACE_Guard<LockType> g(this->_lock); ACE_GUARD_RETURN (LockType, g, this->_lock, false);
if (_queue.empty()) if (_queue.empty())
return false; return false;
//ASSERT (!_queue.empty() || !this->_canceled);
// throw Cancellation_Exception();
result = _queue.front(); result = _queue.front();
_queue.pop_front(); _queue.pop_front();
@ -85,7 +76,7 @@ namespace ACE_Based
template<class Checker> template<class Checker>
bool next(T& result, Checker& check) bool next(T& result, Checker& check)
{ {
ACE_Guard<LockType> g(this->_lock); ACE_GUARD_RETURN (LockType, g, this->_lock, false);
if (_queue.empty()) if (_queue.empty())
return false; return false;
@ -111,18 +102,14 @@ namespace ACE_Based
//! Cancels the queue. //! Cancels the queue.
void cancel() void cancel()
{ {
lock(); ACE_Guard<LockType> g(this->_lock);
_canceled = true; _canceled = true;
unlock();
} }
//! Checks if the queue is cancelled. //! Checks if the queue is cancelled.
bool cancelled() bool cancelled()
{ {
ACE_Guard<LockType> g(this->_lock); ACE_Guard<LockType> g(this->_lock);
return _canceled; return _canceled;
} }
@ -137,6 +124,13 @@ namespace ACE_Based
{ {
this->_lock.release(); this->_lock.release();
} }
///! Checks if we're empty or not with locks held
bool empty()
{
ACE_Guard<LockType> g(this->_lock);
return _queue.empty();
}
}; };
} }
#endif #endif

View file

@ -1,4 +1,4 @@
#ifndef __REVISION_NR_H__ #ifndef __REVISION_NR_H__
#define __REVISION_NR_H__ #define __REVISION_NR_H__
#define REVISION_NR "10864" #define REVISION_NR "10865"
#endif // __REVISION_NR_H__ #endif // __REVISION_NR_H__