From af66b470a863ad3f57a6c0fe9e5546400ad8789f Mon Sep 17 00:00:00 2001 From: Ambal Date: Sun, 12 Dec 2010 11:34:26 +0200 Subject: [PATCH] [10865] Make DB code thread-safe. Original patch by Machiavelli and Kero99. Signed-off-by: Ambal --- src/shared/Database/DatabaseMysql.cpp | 17 ++++++++++++---- src/shared/Database/DatabaseMysql.h | 3 ++- src/shared/Database/SqlDelayThread.cpp | 11 +++++++++- src/shared/Database/SqlDelayThread.h | 2 +- src/shared/Database/SqlOperations.cpp | 5 +++++ src/shared/Database/SqlOperations.h | 11 +++++++++- src/shared/LockedQueue.h | 28 ++++++++++---------------- src/shared/revision_nr.h | 2 +- 8 files changed, 53 insertions(+), 26 deletions(-) diff --git a/src/shared/Database/DatabaseMysql.cpp b/src/shared/Database/DatabaseMysql.cpp index 02d1193ab..8648b9766 100644 --- a/src/shared/Database/DatabaseMysql.cpp +++ b/src/shared/Database/DatabaseMysql.cpp @@ -264,6 +264,8 @@ bool DatabaseMysql::Execute(const char *sql) // don't use queued execution if it has not been initialized if (!m_threadBody) return DirectExecute(sql); + ACE_Guard _lock(nMutex); + tranThread = ACE_Based::Thread::current(); // owner of this transaction TransactionQueues::iterator i = m_tranQueues.find(tranThread); if (i != m_tranQueues.end() && i->second != NULL) @@ -341,6 +343,8 @@ bool DatabaseMysql::BeginTransaction() return true; // transaction started } + ACE_Guard _lock(nMutex); + tranThread = ACE_Based::Thread::current(); // owner of this transaction TransactionQueues::iterator i = m_tranQueues.find(tranThread); if (i != m_tranQueues.end() && i->second != NULL) @@ -369,16 +373,18 @@ bool DatabaseMysql::CommitTransaction() return _res; } + ACE_Guard _lock(nMutex); + tranThread = ACE_Based::Thread::current(); TransactionQueues::iterator i = m_tranQueues.find(tranThread); if (i != m_tranQueues.end() && i->second != NULL) { m_threadBody->Delay(i->second); - i->second = NULL; + m_tranQueues.erase(i); return true; } - else - return false; + + return false; } bool DatabaseMysql::RollbackTransaction() @@ -397,13 +403,16 @@ bool DatabaseMysql::RollbackTransaction() return _res; } + ACE_Guard _lock(nMutex); + tranThread = ACE_Based::Thread::current(); TransactionQueues::iterator i = m_tranQueues.find(tranThread); if (i != m_tranQueues.end() && i->second != NULL) { delete i->second; - i->second = NULL; + m_tranQueues.erase(i); } + return true; } diff --git a/src/shared/Database/DatabaseMysql.h b/src/shared/Database/DatabaseMysql.h index 43baddfcb..6d27f5a5b 100644 --- a/src/shared/Database/DatabaseMysql.h +++ b/src/shared/Database/DatabaseMysql.h @@ -65,7 +65,8 @@ class MANGOS_DLL_SPEC DatabaseMysql : public Database // must be call before finish thread run void ThreadEnd(); private: - ACE_Thread_Mutex mMutex; + ACE_Thread_Mutex mMutex; // For thread safe operations between core and mySQL server + ACE_Thread_Mutex nMutex; // For thread safe operations on m_transQueues ACE_Based::Thread * tranThread; diff --git a/src/shared/Database/SqlDelayThread.cpp b/src/shared/Database/SqlDelayThread.cpp index e1311007e..13dd30149 100644 --- a/src/shared/Database/SqlDelayThread.cpp +++ b/src/shared/Database/SqlDelayThread.cpp @@ -24,6 +24,14 @@ SqlDelayThread::SqlDelayThread(Database* db) : m_dbEngine(db), m_running(true) { } +SqlDelayThread::~SqlDelayThread() +{ + //empty SQL queue before exiting + SqlOperation* s = NULL; + while (m_sqlQueue.next(s)) + delete s; +} + void SqlDelayThread::run() { #ifndef DO_POSTGRESQL @@ -39,14 +47,15 @@ void SqlDelayThread::run() { // if the running state gets turned off while sleeping // empty the queue before exiting - ACE_Based::Thread::Sleep(loopSleepms); + SqlOperation* s = NULL; while (m_sqlQueue.next(s)) { s->Execute(m_dbEngine); delete s; } + if((loopCounter++) >= pingEveryLoop) { loopCounter = 0; diff --git a/src/shared/Database/SqlDelayThread.h b/src/shared/Database/SqlDelayThread.h index 0b3e633ef..f34ae2640 100644 --- a/src/shared/Database/SqlDelayThread.h +++ b/src/shared/Database/SqlDelayThread.h @@ -36,9 +36,9 @@ class SqlDelayThread : public ACE_Based::Runnable Database* m_dbEngine; ///< Pointer to used Database engine volatile bool m_running; - SqlDelayThread(); public: SqlDelayThread(Database* db); + ~SqlDelayThread(); ///< Put sql statement to delay queue bool Delay(SqlOperation* sql) { m_sqlQueue.add(sql); return true; } diff --git a/src/shared/Database/SqlOperations.cpp b/src/shared/Database/SqlOperations.cpp index 25933a32a..9003a01e3 100644 --- a/src/shared/Database/SqlOperations.cpp +++ b/src/shared/Database/SqlOperations.cpp @@ -31,8 +31,10 @@ void SqlStatement::Execute(Database *db) void SqlTransaction::Execute(Database *db) { + ACE_Guard _lock(m_Mutex); if(m_queue.empty()) return; + db->DirectExecute("START TRANSACTION"); while(!m_queue.empty()) { @@ -42,17 +44,20 @@ void SqlTransaction::Execute(Database *db) if(!db->DirectExecute(sql)) { delete [] sql; + db->DirectExecute("ROLLBACK"); while(!m_queue.empty()) { delete [] (const_cast(m_queue.front())); m_queue.pop(); } + return; } delete [] sql; } + db->DirectExecute("COMMIT"); } diff --git a/src/shared/Database/SqlOperations.h b/src/shared/Database/SqlOperations.h index fa9437cc0..bfef89331 100644 --- a/src/shared/Database/SqlOperations.h +++ b/src/shared/Database/SqlOperations.h @@ -55,9 +55,18 @@ class SqlTransaction : public SqlOperation { private: std::queue m_queue; + ACE_Thread_Mutex m_Mutex; public: SqlTransaction() {} - void DelayExecute(const char *sql) { m_queue.push(mangos_strdup(sql)); } + void DelayExecute(const char *sql) + { + char* _sql = mangos_strdup(sql); + if (_sql) + { + ACE_Guard _lock(m_Mutex); + m_queue.push(_sql); + } + } void Execute(Database *db); }; diff --git a/src/shared/LockedQueue.h b/src/shared/LockedQueue.h index 966abd70a..1d2087bfd 100644 --- a/src/shared/LockedQueue.h +++ b/src/shared/LockedQueue.h @@ -55,27 +55,18 @@ namespace ACE_Based //! Adds an item to the queue. void add(const T& item) { - lock(); - - //ASSERT(!this->_canceled); - // throw Cancellation_Exception(); - + ACE_Guard g(this->_lock); _queue.push_back(item); - - unlock(); } //! Gets the next result in the queue, if any. bool next(T& result) { - ACE_Guard g(this->_lock); + ACE_GUARD_RETURN (LockType, g, this->_lock, false); if (_queue.empty()) return false; - //ASSERT (!_queue.empty() || !this->_canceled); - // throw Cancellation_Exception(); - result = _queue.front(); _queue.pop_front(); @@ -85,7 +76,7 @@ namespace ACE_Based template bool next(T& result, Checker& check) { - ACE_Guard g(this->_lock); + ACE_GUARD_RETURN (LockType, g, this->_lock, false); if (_queue.empty()) return false; @@ -111,18 +102,14 @@ namespace ACE_Based //! Cancels the queue. void cancel() { - lock(); - + ACE_Guard g(this->_lock); _canceled = true; - - unlock(); } //! Checks if the queue is cancelled. bool cancelled() { ACE_Guard g(this->_lock); - return _canceled; } @@ -137,6 +124,13 @@ namespace ACE_Based { this->_lock.release(); } + + ///! Checks if we're empty or not with locks held + bool empty() + { + ACE_Guard g(this->_lock); + return _queue.empty(); + } }; } #endif diff --git a/src/shared/revision_nr.h b/src/shared/revision_nr.h index 34abfd6ca..d7435ce76 100644 --- a/src/shared/revision_nr.h +++ b/src/shared/revision_nr.h @@ -1,4 +1,4 @@ #ifndef __REVISION_NR_H__ #define __REVISION_NR_H__ - #define REVISION_NR "10864" + #define REVISION_NR "10865" #endif // __REVISION_NR_H__