From 631ce36680d30d68c265368a515c369602ca850d Mon Sep 17 00:00:00 2001 From: Ambal Date: Wed, 19 Jan 2011 22:04:54 +0200 Subject: [PATCH] [11045] Rewrite internals of DB layer. Simplify code and use less locking. Spawn and use separate connections for sync and async DB requests. Implement database connection pool for SELECT queries. Up to maximum 16 connections supported. Disable 'autocommit' mode for MySQL. UPDATE YOUR CONFIGS! Defaults: LoginDatabaseConnections = 1 WorldDatabaseConnections = 1 CharacterDatabaseConnections = 1 If you are not using patch do not change the default settings - this is useless. You can try following option in your MySQL config to squeeze even more performance from your DB: [mysqld] transaction-isolation = READ-COMMITTED Great thanks to Undergarun, kero99 and selector for making tests and providing very useful feedback and DB statistics! Have fun :) Signed-off-by: Ambal --- src/game/CharacterHandler.cpp | 2 + src/game/ObjectMgr.cpp | 8 + src/game/World.cpp | 13 +- src/game/World.h | 1 - src/mangosd/Master.cpp | 15 +- src/mangosd/mangosd.conf.dist.in | 11 ++ src/realmd/Main.cpp | 2 +- src/shared/Database/Database.cpp | 241 ++++++++++++++++++++++- src/shared/Database/Database.h | 176 +++++++++++++---- src/shared/Database/DatabaseImpl.h | 59 ++---- src/shared/Database/DatabaseMysql.cpp | 245 ++++++------------------ src/shared/Database/DatabaseMysql.h | 55 +++--- src/shared/Database/DatabasePostgre.cpp | 224 +++++----------------- src/shared/Database/DatabasePostgre.h | 54 +++--- src/shared/Database/SqlDelayThread.cpp | 8 +- src/shared/Database/SqlDelayThread.h | 4 +- src/shared/Database/SqlOperations.cpp | 58 +++--- src/shared/Database/SqlOperations.h | 24 +-- src/shared/revision_nr.h | 2 +- 19 files changed, 655 insertions(+), 547 deletions(-) diff --git a/src/game/CharacterHandler.cpp b/src/game/CharacterHandler.cpp index c5c244deb..64405b43b 100644 --- a/src/game/CharacterHandler.cpp +++ b/src/game/CharacterHandler.cpp @@ -949,8 +949,10 @@ void WorldSession::HandleChangePlayerNameOpcodeCallBack(QueryResult *result, uin delete result; + CharacterDatabase.BeginTransaction(); CharacterDatabase.PExecute("UPDATE characters set name = '%s', at_login = at_login & ~ %u WHERE guid ='%u'", newname.c_str(), uint32(AT_LOGIN_RENAME), guidLow); CharacterDatabase.PExecute("DELETE FROM character_declinedname WHERE guid ='%u'", guidLow); + CharacterDatabase.CommitTransaction(); sLog.outChar("Account: %d (IP: %s) Character:[%s] (guid:%u) Changed name to: %s", session->GetAccountId(), session->GetRemoteAddress().c_str(), oldname.c_str(), guidLow, newname.c_str()); diff --git a/src/game/ObjectMgr.cpp b/src/game/ObjectMgr.cpp index 779bae587..309bb2f1d 100644 --- a/src/game/ObjectMgr.cpp +++ b/src/game/ObjectMgr.cpp @@ -6797,9 +6797,12 @@ void ObjectMgr::LoadWeatherZoneChances() void ObjectMgr::SaveCreatureRespawnTime(uint32 loguid, uint32 instance, time_t t) { mCreatureRespawnTimes[MAKE_PAIR64(loguid,instance)] = t; + + CharacterDatabase.BeginTransaction(); CharacterDatabase.PExecute("DELETE FROM creature_respawn WHERE guid = '%u' AND instance = '%u'", loguid, instance); if(t) CharacterDatabase.PExecute("INSERT INTO creature_respawn VALUES ( '%u', '" UI64FMTD "', '%u' )", loguid, uint64(t), instance); + CharacterDatabase.CommitTransaction(); } void ObjectMgr::DeleteCreatureData(uint32 guid) @@ -6815,9 +6818,12 @@ void ObjectMgr::DeleteCreatureData(uint32 guid) void ObjectMgr::SaveGORespawnTime(uint32 loguid, uint32 instance, time_t t) { mGORespawnTimes[MAKE_PAIR64(loguid,instance)] = t; + + CharacterDatabase.BeginTransaction(); CharacterDatabase.PExecute("DELETE FROM gameobject_respawn WHERE guid = '%u' AND instance = '%u'", loguid, instance); if(t) CharacterDatabase.PExecute("INSERT INTO gameobject_respawn VALUES ( '%u', '" UI64FMTD "', '%u' )", loguid, uint64(t), instance); + CharacterDatabase.CommitTransaction(); } void ObjectMgr::DeleteRespawnTimeForInstance(uint32 instance) @@ -6842,8 +6848,10 @@ void ObjectMgr::DeleteRespawnTimeForInstance(uint32 instance) mCreatureRespawnTimes.erase(itr); } + CharacterDatabase.BeginTransaction(); CharacterDatabase.PExecute("DELETE FROM creature_respawn WHERE instance = '%u'", instance); CharacterDatabase.PExecute("DELETE FROM gameobject_respawn WHERE instance = '%u'", instance); + CharacterDatabase.CommitTransaction(); } void ObjectMgr::DeleteGOData(uint32 guid) diff --git a/src/game/World.cpp b/src/game/World.cpp index 32e9f564b..fe8554559 100644 --- a/src/game/World.cpp +++ b/src/game/World.cpp @@ -89,7 +89,6 @@ World::World() m_startTime=m_gameTime; m_maxActiveSessionCount = 0; m_maxQueuedSessionCount = 0; - m_resultQueue = NULL; m_NextDailyQuestReset = 0; m_NextWeeklyQuestReset = 0; m_scheduledScripts = 0; @@ -133,8 +132,6 @@ World::~World() VMAP::VMapFactory::clear(); - if(m_resultQueue) delete m_resultQueue; - //TODO free addSessQueue } @@ -1938,13 +1935,14 @@ void World::ProcessCliCommands() void World::InitResultQueue() { - m_resultQueue = new SqlResultQueue; - CharacterDatabase.SetResultQueue(m_resultQueue); } void World::UpdateResultQueue() { - m_resultQueue->Update(); + //process async result queues + CharacterDatabase.ProcessResultQueue(); + WorldDatabase.ProcessResultQueue(); + LoginDatabase.ProcessResultQueue(); } void World::UpdateRealmCharCount(uint32 accountId) @@ -1960,8 +1958,11 @@ void World::_UpdateRealmCharCount(QueryResult *resultCharCount, uint32 accountId Field *fields = resultCharCount->Fetch(); uint32 charCount = fields[0].GetUInt32(); delete resultCharCount; + + LoginDatabase.BeginTransaction(); LoginDatabase.PExecute("DELETE FROM realmcharacters WHERE acctid= '%u' AND realmid = '%u'", accountId, realmID); LoginDatabase.PExecute("INSERT INTO realmcharacters (numchars, acctid, realmid) VALUES (%u, %u, %u)", charCount, accountId, realmID); + LoginDatabase.CommitTransaction(); } } diff --git a/src/game/World.h b/src/game/World.h index a57e5bb39..fc35fc474 100644 --- a/src/game/World.h +++ b/src/game/World.h @@ -657,7 +657,6 @@ class World // CLI command holder to be thread safe ACE_Based::LockedQueue cliCmdQueue; - SqlResultQueue *m_resultQueue; // next daily quests reset time time_t m_NextDailyQuestReset; diff --git a/src/mangosd/Master.cpp b/src/mangosd/Master.cpp index 5bbc16882..d3255b4b4 100644 --- a/src/mangosd/Master.cpp +++ b/src/mangosd/Master.cpp @@ -416,15 +416,16 @@ bool Master::_StartDB() { ///- Get world database info from configuration file std::string dbstring = sConfig.GetStringDefault("WorldDatabaseInfo", ""); + int nConnections = sConfig.GetIntDefault("WorldDatabaseConnections", 1); if(dbstring.empty()) { sLog.outError("Database not specified in configuration file"); return false; } - sLog.outString("World Database: %s", dbstring.c_str()); + sLog.outString("World Database: %s, total connections: %i", dbstring.c_str(), nConnections + 1); ///- Initialise the world database - if(!WorldDatabase.Initialize(dbstring.c_str())) + if(!WorldDatabase.Initialize(dbstring.c_str(), nConnections)) { sLog.outError("Cannot connect to world database %s",dbstring.c_str()); return false; @@ -438,6 +439,7 @@ bool Master::_StartDB() } dbstring = sConfig.GetStringDefault("CharacterDatabaseInfo", ""); + nConnections = sConfig.GetIntDefault("CharacterDatabaseConnections", 1); if(dbstring.empty()) { sLog.outError("Character Database not specified in configuration file"); @@ -446,10 +448,10 @@ bool Master::_StartDB() WorldDatabase.HaltDelayThread(); return false; } - sLog.outString("Character Database: %s", dbstring.c_str()); + sLog.outString("Character Database: %s, total connections: %i", dbstring.c_str(), nConnections + 1); ///- Initialise the Character database - if(!CharacterDatabase.Initialize(dbstring.c_str())) + if(!CharacterDatabase.Initialize(dbstring.c_str(), nConnections)) { sLog.outError("Cannot connect to Character database %s",dbstring.c_str()); @@ -468,6 +470,7 @@ bool Master::_StartDB() ///- Get login database info from configuration file dbstring = sConfig.GetStringDefault("LoginDatabaseInfo", ""); + nConnections = sConfig.GetIntDefault("LoginDatabaseConnections", 1); if(dbstring.empty()) { sLog.outError("Login database not specified in configuration file"); @@ -479,8 +482,8 @@ bool Master::_StartDB() } ///- Initialise the login database - sLog.outString("Login Database: %s", dbstring.c_str() ); - if(!LoginDatabase.Initialize(dbstring.c_str())) + sLog.outString("Login Database: %s, total connections: %i", dbstring.c_str(), nConnections + 1); + if(!LoginDatabase.Initialize(dbstring.c_str(), nConnections)) { sLog.outError("Cannot connect to login database %s",dbstring.c_str()); diff --git a/src/mangosd/mangosd.conf.dist.in b/src/mangosd/mangosd.conf.dist.in index dd0bb971c..0536296c5 100644 --- a/src/mangosd/mangosd.conf.dist.in +++ b/src/mangosd/mangosd.conf.dist.in @@ -37,6 +37,14 @@ ConfVersion=2010100901 # hostname;port;username;password;database # .;/path/to/unix_socket/DIRECTORY or . for default path;username;password;database - use Unix sockets at Unix/Linux # +# LoginDatabaseConnections +# WorldDatabaseConnections +# CharacterDatabaseConnections +# Amount of connections to database which will be used for SELECT queries. Maximum 16 connections per database. +# Please, note, for data consistency only one connection for each database is used for transactions and async SELECTs. +# So formula to find out how many connections will be established: X = ¹_connections + 1 +# Default: 1 connection for SELECT statements +# # MaxPingTime # Settings for maximum database-ping interval (minutes between pings) # @@ -57,6 +65,9 @@ LogsDir = "" LoginDatabaseInfo = "127.0.0.1;3306;mangos;mangos;realmd" WorldDatabaseInfo = "127.0.0.1;3306;mangos;mangos;mangos" CharacterDatabaseInfo = "127.0.0.1;3306;mangos;mangos;characters" +LoginDatabaseConnections = 1 +WorldDatabaseConnections = 1 +CharacterDatabaseConnections = 1 MaxPingTime = 30 WorldServerPort = 8085 BindIP = "0.0.0.0" diff --git a/src/realmd/Main.cpp b/src/realmd/Main.cpp index 9b1398dd2..6e501542b 100644 --- a/src/realmd/Main.cpp +++ b/src/realmd/Main.cpp @@ -302,7 +302,7 @@ extern int main(int argc, char **argv) { loopCounter = 0; DETAIL_LOG("Ping MySQL to keep connection alive"); - delete LoginDatabase.Query("SELECT 1 FROM realmlist LIMIT 1"); + LoginDatabase.Ping(); } #ifdef WIN32 if (m_ServiceStatus == 0) stopEvent = true; diff --git a/src/shared/Database/Database.cpp b/src/shared/Database/Database.cpp index 9a05b32fc..d114c2c95 100644 --- a/src/shared/Database/Database.cpp +++ b/src/shared/Database/Database.cpp @@ -18,19 +18,29 @@ #include "DatabaseEnv.h" #include "Config/Config.h" +#include "Database/SqlOperations.h" #include #include #include +#define MIN_CONNECTION_POOL_SIZE 1 +#define MAX_CONNECTION_POOL_SIZE 16 + Database::~Database() { + HaltDelayThread(); /*Delete objects*/ + delete m_pResultQueue; + delete m_pAsyncConn; + + for (int i = 0; i < m_pQueryConnections.size(); ++i) + delete m_pQueryConnections[i]; } -bool Database::Initialize(const char *) +bool Database::Initialize(const char * infoString, int nConns /*= 1*/) { - // Enable logging of SQL commands (usally only GM commands) + // Enable logging of SQL commands (usually only GM commands) // (See method: PExecuteLog) m_logSQL = sConfig.GetBoolDefault("LogSQL", false); m_logsDir = sConfig.GetStringDefault("LogsDir",""); @@ -41,9 +51,74 @@ bool Database::Initialize(const char *) } m_pingIntervallms = sConfig.GetIntDefault ("MaxPingTime", 30) * (MINUTE * 1000); + + //create DB connections + + //setup connection pool size + if(nConns < MIN_CONNECTION_POOL_SIZE) + m_nQueryConnPoolSize = MIN_CONNECTION_POOL_SIZE; + else if(nConns > MAX_CONNECTION_POOL_SIZE) + m_nQueryConnPoolSize = MAX_CONNECTION_POOL_SIZE; + else + m_nQueryConnPoolSize = nConns; + + //create connection pool for sync requests + for (int i = 0; i < m_nQueryConnPoolSize; ++i) + { + SqlConnection * pConn = CreateConnection(); + if(!pConn->Initialize(infoString)) + { + delete pConn; + return false; + } + + m_pQueryConnections.push_back(pConn); + } + + //create and initialize connection for async requests + m_pAsyncConn = CreateConnection(); + if(!m_pAsyncConn->Initialize(infoString)) + return false; + + InitDelayThread(); return true; } +SqlDelayThread * Database::CreateDelayThread() +{ + assert(m_pAsyncConn); + return new SqlDelayThread(this, m_pAsyncConn); +} + +void Database::InitDelayThread() +{ + assert(!m_delayThread); + + m_pResultQueue = new SqlResultQueue; + //New delay thread for delay execute + m_threadBody = CreateDelayThread(); // will deleted at m_delayThread delete + m_delayThread = new ACE_Based::Thread(m_threadBody); +} + +void Database::HaltDelayThread() +{ + if (!m_threadBody || !m_delayThread) return; + + m_threadBody->Stop(); //Stop event + m_delayThread->wait(); //Wait for flush to DB + delete m_delayThread; //This also deletes m_threadBody + m_delayThread = NULL; + m_threadBody = NULL; + + //stop async result queue + if(m_pResultQueue) + { + m_pResultQueue->Update(); + delete m_pResultQueue; + m_pResultQueue = NULL; + } +} + void Database::ThreadStart() { } @@ -52,17 +127,52 @@ void Database::ThreadEnd() { } +void Database::ProcessResultQueue() +{ + if(m_pResultQueue) + m_pResultQueue->Update(); +} + void Database::escape_string(std::string& str) { if(str.empty()) return; char* buf = new char[str.size()*2+1]; - escape_string(buf,str.c_str(),str.size()); + //we don't care what connection to use - escape string will be the same + m_pQueryConnections[0]->escape_string(buf,str.c_str(),str.size()); str = buf; delete[] buf; } +SqlConnection * Database::getQueryConnection() +{ + int nCount = 0; + + if(m_nQueryCounter == long(1 << 31)) + m_nQueryCounter = 0; + else + nCount = ++m_nQueryCounter; + + return m_pQueryConnections[nCount % m_nQueryConnPoolSize]; +} + +void Database::Ping() +{ + const char * sql = "SELECT 1"; + + { + SqlConnection::Lock guard(m_pAsyncConn); + delete guard->Query(sql); + } + + for (int i = 0; i < m_nQueryConnPoolSize; ++i) + { + SqlConnection::Lock guard(m_pQueryConnections[i]); + delete guard->Query(sql); + } +} + bool Database::PExecuteLog(const char * format,...) { if (!format) @@ -107,12 +217,6 @@ bool Database::PExecuteLog(const char * format,...) return Execute(szQuery); } -void Database::SetResultQueue(SqlResultQueue * queue) -{ - m_queryQueues[ACE_Based::Thread::current()] = queue; - -} - QueryResult* Database::PQuery(const char *format,...) { if(!format) return NULL; @@ -151,6 +255,29 @@ QueryNamedResult* Database::PQueryNamed(const char *format,...) return QueryNamed(szQuery); } +bool Database::Execute(const char *sql) +{ + if (!m_pAsyncConn) + return false; + + SqlTransaction * pTrans = m_TransStorage->get(); + if(pTrans) + { + //add SQL request to trans queue + pTrans->DelayExecute(sql); + } + else + { + // Simple sql statement + pTrans = new SqlTransaction; + pTrans->DelayExecute(sql); + + m_threadBody->Delay(pTrans); + } + + return true; +} + bool Database::PExecute(const char * format,...) { if (!format) @@ -171,6 +298,18 @@ bool Database::PExecute(const char * format,...) return Execute(szQuery); } +bool Database::DirectExecute(const char* sql) +{ + if(!m_pAsyncConn) + return false; + + SqlTransaction trans; + trans.DelayExecute(sql); + + trans.Execute(m_pAsyncConn); + return true; +} + bool Database::DirectPExecute(const char * format,...) { if (!format) @@ -191,6 +330,62 @@ bool Database::DirectPExecute(const char * format,...) return DirectExecute(szQuery); } +bool Database::BeginTransaction() +{ + if (!m_pAsyncConn) + return false; + + //initiate transaction on current thread + //currently we do not support queued transactions + m_TransStorage->init(); + return true; +} + +bool Database::CommitTransaction() +{ + if (!m_pAsyncConn) + return false; + + //check if we have pending transaction + if(!m_TransStorage->get()) + return false; + + //add SqlTransaction to the async queue + m_threadBody->Delay(m_TransStorage->detach()); + return true; +} + +bool Database::CommitTransactionDirect() +{ + if (!m_pAsyncConn) + return false; + + //check if we have pending transaction + if(!m_TransStorage->get()) + return false; + + //directly execute SqlTransaction + SqlTransaction * pTrans = m_TransStorage->detach(); + pTrans->Execute(m_pAsyncConn); + delete pTrans; + + return true; +} + +bool Database::RollbackTransaction() +{ + if (!m_pAsyncConn) + return false; + + if(!m_TransStorage->get()) + return false; + + //remove scheduled transaction + m_TransStorage->reset(); + + return true; +} + bool Database::CheckRequiredField( char const* table_name, char const* required_name ) { // check required field @@ -278,3 +473,31 @@ bool Database::CheckRequiredField( char const* table_name, char const* required_ return false; } + +Database::TransHelper::~TransHelper() +{ + reset(); +} + +SqlTransaction * Database::TransHelper::init() +{ + MANGOS_ASSERT(!m_pTrans); //if we will get a nested transaction request - we MUST fix code!!! + m_pTrans = new SqlTransaction; + return m_pTrans; +} + +SqlTransaction * Database::TransHelper::detach() +{ + SqlTransaction * pRes = m_pTrans; + m_pTrans = NULL; + return pRes; +} + +void Database::TransHelper::reset() +{ + if(m_pTrans) + { + delete m_pTrans; + m_pTrans = NULL; + } +} diff --git a/src/shared/Database/Database.h b/src/shared/Database/Database.h index 4731ed188..5b3ff7f49 100644 --- a/src/shared/Database/Database.h +++ b/src/shared/Database/Database.h @@ -22,39 +22,87 @@ #include "Threading.h" #include "Utilities/UnorderedMapSet.h" #include "Database/SqlDelayThread.h" +#include +#include "Policies/ThreadingModel.h" +#include +#include class SqlTransaction; class SqlResultQueue; class SqlQueryHolder; -typedef UNORDERED_MAP TransactionQueues; -typedef UNORDERED_MAP QueryQueues; - #define MAX_QUERY_LEN 32*1024 +// +class MANGOS_DLL_SPEC SqlConnection +{ + public: + virtual ~SqlConnection() {} + + //method for initializing DB connection + virtual bool Initialize(const char *infoString) = 0; + //public methods for making queries + virtual QueryResult* Query(const char *sql) = 0; + virtual QueryNamedResult* QueryNamed(const char *sql) = 0; + + //public methods for making requests + virtual bool Execute(const char *sql) = 0; + + //escape string generation + virtual unsigned long escape_string(char *to, const char *from, unsigned long length) { strncpy(to,from,length); return length; } + + // nothing do if DB not support transactions + virtual bool BeginTransaction() { return true; } + virtual bool CommitTransaction() { return true; } + // can't rollback without transaction support + virtual bool RollbackTransaction() { return true; } + + //SqlConnection object lock + class Lock + { + public: + Lock(SqlConnection * conn) : m_pConn(conn) { m_pConn->m_mutex.acquire(); } + ~Lock() { m_pConn->m_mutex.release(); } + + SqlConnection * operator->() const { return m_pConn; } + + private: + SqlConnection * const m_pConn; + }; + + private: + typedef ACE_Recursive_Thread_Mutex LOCK_TYPE; + LOCK_TYPE m_mutex; +}; + class MANGOS_DLL_SPEC Database { - protected: - Database() : m_threadBody(NULL), m_delayThread(NULL) {}; - - TransactionQueues m_tranQueues; ///< Transaction queues from diff. threads - QueryQueues m_queryQueues; ///< Query queues from diff threads - SqlDelayThread* m_threadBody; ///< Pointer to delay sql executer (owned by m_delayThread) - ACE_Based::Thread* m_delayThread; ///< Pointer to executer thread - public: - virtual ~Database(); - virtual bool Initialize(const char *infoString); - virtual void InitDelayThread() = 0; - virtual void HaltDelayThread() = 0; + virtual bool Initialize(const char *infoString, int nConns = 1); + virtual void InitDelayThread(); + virtual void HaltDelayThread(); + + /// Synchronous DB queries + inline QueryResult* Query(const char *sql) + { + SqlConnection::Lock guard(getQueryConnection()); + return guard->Query(sql); + } + + inline QueryNamedResult* QueryNamed(const char *sql) + { + SqlConnection::Lock guard(getQueryConnection()); + return guard->QueryNamed(sql); + } - virtual QueryResult* Query(const char *sql) = 0; QueryResult* PQuery(const char *format,...) ATTR_PRINTF(2,3); - virtual QueryNamedResult* QueryNamed(const char *sql) = 0; QueryNamedResult* PQueryNamed(const char *format,...) ATTR_PRINTF(2,3); + bool DirectExecute(const char* sql); + bool DirectPExecute(const char *format,...) ATTR_PRINTF(2,3); + /// Async queries and query holders, implemented in DatabaseImpl.h // Query / member @@ -95,30 +143,21 @@ class MANGOS_DLL_SPEC Database template bool DelayQueryHolder(Class *object, void (Class::*method)(QueryResult*, SqlQueryHolder*, ParamType1), SqlQueryHolder *holder, ParamType1 param1); - virtual bool Execute(const char *sql) = 0; + bool Execute(const char *sql); bool PExecute(const char *format,...) ATTR_PRINTF(2,3); - virtual bool DirectExecute(const char* sql) = 0; - bool DirectPExecute(const char *format,...) ATTR_PRINTF(2,3); // Writes SQL commands to a LOG file (see mangosd.conf "LogSQL") bool PExecuteLog(const char *format,...) ATTR_PRINTF(2,3); - virtual bool BeginTransaction() // nothing do if DB not support transactions - { - return true; - } - virtual bool CommitTransaction() // nothing do if DB not support transactions - { - return true; - } - virtual bool RollbackTransaction() // can't rollback without transaction support - { - return false; - } + bool BeginTransaction(); + bool CommitTransaction(); + bool RollbackTransaction(); + //for sync transaction execution + bool CommitTransactionDirect(); - virtual operator bool () const = 0; + operator bool () const { return m_pQueryConnections.size() && m_pAsyncConn != 0; } - virtual unsigned long escape_string(char *to, const char *from, unsigned long length) { strncpy(to,from,length); return length; } + //escape string generation void escape_string(std::string& str); // must be called before first query in thread (one time for thread using one from existing Database objects) @@ -126,13 +165,76 @@ class MANGOS_DLL_SPEC Database // must be called before finish thread run (one time for thread using one from existing Database objects) virtual void ThreadEnd(); - // sets the result queue of the current thread, be careful what thread you call this from - void SetResultQueue(SqlResultQueue * queue); + // set database-wide result queue. also we should use object-bases and not thread-based result queues + void ProcessResultQueue(); bool CheckRequiredField(char const* table_name, char const* required_name); - uint32 GetPingIntervall() { return m_pingIntervallms;} + uint32 GetPingIntervall() { return m_pingIntervallms; } + + //function to ping database connections + void Ping(); + + protected: + Database() : m_pAsyncConn(NULL), m_pResultQueue(NULL), m_threadBody(NULL), m_delayThread(NULL), + m_logSQL(false), m_pingIntervallms(0), m_nQueryConnPoolSize(1) + { + m_nQueryCounter = -1; + } + + //factory method to create SqlConnection objects + virtual SqlConnection * CreateConnection() = 0; + //factory method to create SqlDelayThread objects + virtual SqlDelayThread * CreateDelayThread(); + + class TransHelper + { + public: + TransHelper() : m_pTrans(NULL) {} + ~TransHelper(); + + //initializes new SqlTransaction object + SqlTransaction * init(); + //gets pointer on current transaction object. Returns NULL if transaction was not initiated + SqlTransaction * get() const { return m_pTrans; } + //detaches SqlTransaction object allocated by init() function + //next call to get() function will return NULL! + //do not forget to destroy obtained SqlTransaction object! + SqlTransaction * detach(); + //destroyes SqlTransaction allocated by init() function + void reset(); + + private: + SqlTransaction * m_pTrans; + }; + + //per-thread based storage for SqlTransaction object initialization - no locking is required + typedef ACE_TSS DBTransHelperTSS; + Database::DBTransHelperTSS m_TransStorage; + + ///< DB connections + + //round-robin connection selection + SqlConnection * getQueryConnection(); + //for now return one single connection for async requests + SqlConnection * getAsyncConnection() const { return m_pAsyncConn; } + + //connection helper counters + int m_nQueryConnPoolSize; //current size of query connection pool + ACE_Atomic_Op m_nQueryCounter; //counter for connection selection + + //lets use pool of connections for sync queries + typedef std::vector< SqlConnection * > SqlConnectionContainer; + SqlConnectionContainer m_pQueryConnections; + + //only one single DB connection for transactions + SqlConnection * m_pAsyncConn; + + SqlResultQueue * m_pResultQueue; ///< Transaction queues from diff. threads + SqlDelayThread * m_threadBody; ///< Pointer to delay sql executer (owned by m_delayThread) + ACE_Based::Thread * m_delayThread; ///< Pointer to executer thread private: + bool m_logSQL; std::string m_logsDir; uint32 m_pingIntervallms; diff --git a/src/shared/Database/DatabaseImpl.h b/src/shared/Database/DatabaseImpl.h index 44072eca3..963b15150 100644 --- a/src/shared/Database/DatabaseImpl.h +++ b/src/shared/Database/DatabaseImpl.h @@ -21,16 +21,8 @@ /// Function body definitions for the template function members of the Database class -#define ASYNC_QUERY_BODY(sql, queue_itr) \ - if (!sql) return false; \ - \ - QueryQueues::iterator queue_itr; \ - \ - { \ - ACE_Based::Thread * queryThread = ACE_Based::Thread::current(); \ - queue_itr = m_queryQueues.find(queryThread); \ - if (queue_itr == m_queryQueues.end()) return false; \ - } +#define ASYNC_QUERY_BODY(sql) if (!sql || !m_pResultQueue) return false; +#define ASYNC_DELAYHOLDER_BODY(holder) if (!holder || !m_pResultQueue) return false; #define ASYNC_PQUERY_BODY(format, szQuery) \ if(!format) return false; \ @@ -51,49 +43,38 @@ } \ } -#define ASYNC_DELAYHOLDER_BODY(holder, queue_itr) \ - if (!holder) return false; \ - \ - QueryQueues::iterator queue_itr; \ - \ - { \ - ACE_Based::Thread * queryThread = ACE_Based::Thread::current(); \ - queue_itr = m_queryQueues.find(queryThread); \ - if (queue_itr == m_queryQueues.end()) return false; \ - } - // -- Query / member -- template bool Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*), const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method), m_pResultQueue)); } template bool Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*, ParamType1), ParamType1 param1, const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, param1), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, param1), m_pResultQueue)); } template bool Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*, ParamType1, ParamType2), ParamType1 param1, ParamType2 param2, const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, param1, param2), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, param1, param2), m_pResultQueue)); } template bool Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*, ParamType1, ParamType2, ParamType3), ParamType1 param1, ParamType2 param2, ParamType3 param3, const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, param1, param2, param3), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, param1, param2, param3), m_pResultQueue)); } // -- Query / static -- @@ -102,24 +83,24 @@ template bool Database::AsyncQuery(void (*method)(QueryResult*, ParamType1), ParamType1 param1, const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback(method, (QueryResult*)NULL, param1), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback(method, (QueryResult*)NULL, param1), m_pResultQueue)); } template bool Database::AsyncQuery(void (*method)(QueryResult*, ParamType1, ParamType2), ParamType1 param1, ParamType2 param2, const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback(method, (QueryResult*)NULL, param1, param2), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback(method, (QueryResult*)NULL, param1, param2), m_pResultQueue)); } template bool Database::AsyncQuery(void (*method)(QueryResult*, ParamType1, ParamType2, ParamType3), ParamType1 param1, ParamType2 param2, ParamType3 param3, const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback(method, (QueryResult*)NULL, param1, param2, param3), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback(method, (QueryResult*)NULL, param1, param2, param3), m_pResultQueue)); } // -- PQuery / member -- @@ -188,16 +169,16 @@ template bool Database::DelayQueryHolder(Class *object, void (Class::*method)(QueryResult*, SqlQueryHolder*), SqlQueryHolder *holder) { - ASYNC_DELAYHOLDER_BODY(holder, itr) - return holder->Execute(new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, holder), m_threadBody, itr->second); + ASYNC_DELAYHOLDER_BODY(holder) + return holder->Execute(new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, holder), m_threadBody, m_pResultQueue); } template bool Database::DelayQueryHolder(Class *object, void (Class::*method)(QueryResult*, SqlQueryHolder*, ParamType1), SqlQueryHolder *holder, ParamType1 param1) { - ASYNC_DELAYHOLDER_BODY(holder, itr) - return holder->Execute(new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, holder, param1), m_threadBody, itr->second); + ASYNC_DELAYHOLDER_BODY(holder) + return holder->Execute(new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, holder, param1), m_threadBody, m_pResultQueue); } #undef ASYNC_QUERY_BODY diff --git a/src/shared/Database/DatabaseMysql.cpp b/src/shared/Database/DatabaseMysql.cpp index c0d390283..9e596bc8d 100644 --- a/src/shared/Database/DatabaseMysql.cpp +++ b/src/shared/Database/DatabaseMysql.cpp @@ -23,10 +23,10 @@ #include "Platform/Define.h" #include "Threading.h" #include "DatabaseEnv.h" -#include "Database/MySQLDelayThread.h" -#include "Database/SqlOperations.h" #include "Timer.h" +size_t DatabaseMysql::db_count = 0; + void DatabaseMysql::ThreadStart() { mysql_thread_init(); @@ -37,9 +37,7 @@ void DatabaseMysql::ThreadEnd() mysql_thread_end(); } -size_t DatabaseMysql::db_count = 0; - -DatabaseMysql::DatabaseMysql() : Database(), mMysql(0) +DatabaseMysql::DatabaseMysql() { // before first connection if( db_count++ == 0 ) @@ -61,31 +59,40 @@ DatabaseMysql::~DatabaseMysql() if (m_delayThread) HaltDelayThread(); - if (mMysql) - mysql_close(mMysql); + //destroy SqlConnection objects + if(m_pQueryConnections.size()) + { + for (int i = 0; i < m_pQueryConnections.size(); ++i) + delete m_pQueryConnections[i]; + + m_pQueryConnections.clear(); + } + + if(m_pAsyncConn) + { + delete m_pAsyncConn; + m_pAsyncConn = NULL; + } //Free Mysql library pointers for last ~DB if(--db_count == 0) mysql_library_end(); } -bool DatabaseMysql::Initialize(const char *infoString) +SqlConnection * DatabaseMysql::CreateConnection() +{ + return new MySQLConnection(); +} + +bool MySQLConnection::Initialize(const char *infoString) { - - if(!Database::Initialize(infoString)) - return false; - - tranThread = NULL; - MYSQL *mysqlInit; - mysqlInit = mysql_init(NULL); + MYSQL * mysqlInit = mysql_init(NULL); if (!mysqlInit) { sLog.outError( "Could not initialize Mysql connection" ); return false; } - InitDelayThread(); - Tokens tokens = StrSplit(infoString, ";"); Tokens::iterator iter; @@ -108,7 +115,7 @@ bool DatabaseMysql::Initialize(const char *infoString) database = *iter++; mysql_options(mysqlInit,MYSQL_SET_CHARSET_NAME,"utf8"); - #ifdef WIN32 +#ifdef WIN32 if(host==".") // named pipe use option (Windows) { unsigned int opt = MYSQL_PROTOCOL_PIPE; @@ -121,7 +128,7 @@ bool DatabaseMysql::Initialize(const char *infoString) port = atoi(port_or_socket.c_str()); unix_socket = 0; } - #else +#else if(host==".") // socket use option (Unix/Linux) { unsigned int opt = MYSQL_PROTOCOL_SOCKET; @@ -135,7 +142,7 @@ bool DatabaseMysql::Initialize(const char *infoString) port = atoi(port_or_socket.c_str()); unix_socket = 0; } - #endif +#endif mMysql = mysql_real_connect(mysqlInit, host.c_str(), user.c_str(), password.c_str(), database.c_str(), port, unix_socket, 0); @@ -155,16 +162,19 @@ bool DatabaseMysql::Initialize(const char *infoString) // This is wrong since mangos use transactions, // autocommit is turned of during it. // Setting it to on makes atomic updates work - if (!mysql_autocommit(mMysql, 1)) - DETAIL_LOG("AUTOCOMMIT SUCCESSFULLY SET TO 1"); + // --- + // if you want atomic updates to work - USE TRANSACTIONS!!! + // no need to mess up with autocommit mode which might degrade server performance! + if (!mysql_autocommit(mMysql, 0)) + DETAIL_LOG("AUTOCOMMIT SUCCESSFULLY SET TO 0"); else - DETAIL_LOG("AUTOCOMMIT NOT SET TO 1"); + DETAIL_LOG("AUTOCOMMIT NOT SET TO 0"); /*-------------------------------------*/ // set connection properties to UTF8 to properly handle locales for different // server configs - core sends data in UTF8, so MySQL must expect UTF8 too - PExecute("SET NAMES `utf8`"); - PExecute("SET CHARACTER SET `utf8`"); + Execute("SET NAMES `utf8`"); + Execute("SET CHARACTER SET `utf8`"); return true; } @@ -177,33 +187,27 @@ bool DatabaseMysql::Initialize(const char *infoString) } } -bool DatabaseMysql::_Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **pFields, uint64* pRowCount, uint32* pFieldCount) +bool MySQLConnection::_Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **pFields, uint64* pRowCount, uint32* pFieldCount) { if (!mMysql) return 0; + uint32 _s = WorldTimer::getMSTime(); + + if(mysql_query(mMysql, sql)) { - // guarded block for thread-safe mySQL request - ACE_Guard query_connection_guard(mMutex); - - uint32 _s = WorldTimer::getMSTime(); - - if(mysql_query(mMysql, sql)) - { - sLog.outErrorDb( "SQL: %s", sql ); - sLog.outErrorDb("query ERROR: %s", mysql_error(mMysql)); - return false; - } - else - { - DEBUG_FILTER_LOG(LOG_FILTER_SQL_TEXT, "[%u ms] SQL: %s", WorldTimer::getMSTimeDiff(_s,WorldTimer::getMSTime()), sql ); - } - - *pResult = mysql_store_result(mMysql); - *pRowCount = mysql_affected_rows(mMysql); - *pFieldCount = mysql_field_count(mMysql); - // end guarded block + sLog.outErrorDb( "SQL: %s", sql ); + sLog.outErrorDb("query ERROR: %s", mysql_error(mMysql)); + return false; } + else + { + DEBUG_FILTER_LOG(LOG_FILTER_SQL_TEXT, "[%u ms] SQL: %s", WorldTimer::getMSTimeDiff(_s,WorldTimer::getMSTime()), sql ); + } + + *pResult = mysql_store_result(mMysql); + *pRowCount = mysql_affected_rows(mMysql); + *pFieldCount = mysql_field_count(mMysql); if (!*pResult ) return false; @@ -218,7 +222,7 @@ bool DatabaseMysql::_Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **p return true; } -QueryResult* DatabaseMysql::Query(const char *sql) +QueryResult* MySQLConnection::Query(const char *sql) { MYSQL_RES *result = NULL; MYSQL_FIELD *fields = NULL; @@ -231,11 +235,10 @@ QueryResult* DatabaseMysql::Query(const char *sql) QueryResultMysql *queryResult = new QueryResultMysql(result, fields, rowCount, fieldCount); queryResult->NextRow(); - return queryResult; } -QueryNamedResult* DatabaseMysql::QueryNamed(const char *sql) +QueryNamedResult* MySQLConnection::QueryNamed(const char *sql) { MYSQL_RES *result = NULL; MYSQL_FIELD *fields = NULL; @@ -252,44 +255,15 @@ QueryNamedResult* DatabaseMysql::QueryNamed(const char *sql) QueryResultMysql *queryResult = new QueryResultMysql(result, fields, rowCount, fieldCount); queryResult->NextRow(); - return new QueryNamedResult(queryResult,names); } -bool DatabaseMysql::Execute(const char *sql) -{ - if (!mMysql) - return false; - - // don't use queued execution if it has not been initialized - if (!m_threadBody) return DirectExecute(sql); - - ACE_Guard _lock(nMutex); - - tranThread = ACE_Based::Thread::current(); // owner of this transaction - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - { // Statement for transaction - i->second->DelayExecute(sql); - } - else - { - // Simple sql statement - m_threadBody->Delay(new SqlStatement(sql)); - } - - return true; -} - -bool DatabaseMysql::DirectExecute(const char* sql) +bool MySQLConnection::Execute(const char* sql) { if (!mMysql) return false; { - // guarded block for thread-safe mySQL request - ACE_Guard query_connection_guard(mMutex); - uint32 _s = WorldTimer::getMSTime(); if(mysql_query(mMysql, sql)) @@ -308,7 +282,7 @@ bool DatabaseMysql::DirectExecute(const char* sql) return true; } -bool DatabaseMysql::_TransactionCmd(const char *sql) +bool MySQLConnection::_TransactionCmd(const char *sql) { if (mysql_query(mMysql, sql)) { @@ -323,100 +297,22 @@ bool DatabaseMysql::_TransactionCmd(const char *sql) return true; } -bool DatabaseMysql::BeginTransaction() +bool MySQLConnection::BeginTransaction() { - if (!mMysql) - return false; - - // don't use queued execution if it has not been initialized - if (!m_threadBody) - { - if (tranThread == ACE_Based::Thread::current()) - return false; // huh? this thread already started transaction - - mMutex.acquire(); - if (!_TransactionCmd("START TRANSACTION")) - { - mMutex.release(); // can't start transaction - return false; - } - return true; // transaction started - } - - ACE_Guard _lock(nMutex); - - tranThread = ACE_Based::Thread::current(); // owner of this transaction - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - // If for thread exists queue and also contains transaction - // delete that transaction (not allow trans in trans) - delete i->second; - - m_tranQueues[tranThread] = new SqlTransaction(); - - return true; + return _TransactionCmd("START TRANSACTION"); } -bool DatabaseMysql::CommitTransaction() +bool MySQLConnection::CommitTransaction() { - if (!mMysql) - return false; - - // don't use queued execution if it has not been initialized - if (!m_threadBody) - { - if (tranThread != ACE_Based::Thread::current()) - return false; - bool _res = _TransactionCmd("COMMIT"); - tranThread = NULL; - mMutex.release(); - return _res; - } - - ACE_Guard _lock(nMutex); - - tranThread = ACE_Based::Thread::current(); - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - { - m_threadBody->Delay(i->second); - m_tranQueues.erase(i); - return true; - } - - return false; + return _TransactionCmd("COMMIT"); } -bool DatabaseMysql::RollbackTransaction() +bool MySQLConnection::RollbackTransaction() { - if (!mMysql) - return false; - - // don't use queued execution if it has not been initialized - if (!m_threadBody) - { - if (tranThread != ACE_Based::Thread::current()) - return false; - bool _res = _TransactionCmd("ROLLBACK"); - tranThread = NULL; - mMutex.release(); - return _res; - } - - ACE_Guard _lock(nMutex); - - tranThread = ACE_Based::Thread::current(); - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - { - delete i->second; - m_tranQueues.erase(i); - } - - return true; + return _TransactionCmd("ROLLBACK"); } -unsigned long DatabaseMysql::escape_string(char *to, const char *from, unsigned long length) +unsigned long MySQLConnection::escape_string(char *to, const char *from, unsigned long length) { if (!mMysql || !to || !from || !length) return 0; @@ -424,23 +320,4 @@ unsigned long DatabaseMysql::escape_string(char *to, const char *from, unsigned return(mysql_real_escape_string(mMysql, to, from, length)); } -void DatabaseMysql::InitDelayThread() -{ - assert(!m_delayThread); - - //New delay thread for delay execute - m_threadBody = new MySQLDelayThread(this); // will deleted at m_delayThread delete - m_delayThread = new ACE_Based::Thread(m_threadBody); -} - -void DatabaseMysql::HaltDelayThread() -{ - if (!m_threadBody || !m_delayThread) return; - - m_threadBody->Stop(); //Stop event - m_delayThread->wait(); //Wait for flush to DB - delete m_delayThread; //This also deletes m_threadBody - m_delayThread = NULL; - m_threadBody = NULL; -} #endif diff --git a/src/shared/Database/DatabaseMysql.h b/src/shared/Database/DatabaseMysql.h index 51ebd4f68..a9d920e73 100644 --- a/src/shared/Database/DatabaseMysql.h +++ b/src/shared/Database/DatabaseMysql.h @@ -34,6 +34,31 @@ #include #endif +class MANGOS_DLL_SPEC MySQLConnection : public SqlConnection +{ + public: + MySQLConnection() : mMysql(NULL) {} + ~MySQLConnection() { mysql_close(mMysql); } + + bool Initialize(const char *infoString); + + QueryResult* Query(const char *sql); + QueryNamedResult* QueryNamed(const char *sql); + bool Execute(const char *sql); + + unsigned long escape_string(char *to, const char *from, unsigned long length); + + bool BeginTransaction(); + bool CommitTransaction(); + bool RollbackTransaction(); + + private: + bool _TransactionCmd(const char *sql); + bool _Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **pFields, uint64* pRowCount, uint32* pFieldCount); + + MYSQL *mMysql; +}; + class MANGOS_DLL_SPEC DatabaseMysql : public Database { friend class MaNGOS::OperatorNew; @@ -44,38 +69,18 @@ class MANGOS_DLL_SPEC DatabaseMysql : public Database //! Initializes Mysql and connects to a server. /*! infoString should be formated like hostname;username;password;database. */ - bool Initialize(const char *infoString); - void InitDelayThread(); - void HaltDelayThread(); - QueryResult* Query(const char *sql); - QueryNamedResult* QueryNamed(const char *sql); - bool Execute(const char *sql); - bool DirectExecute(const char* sql); - bool BeginTransaction(); - bool CommitTransaction(); - bool RollbackTransaction(); - - operator bool () const { return mMysql != NULL; } - - unsigned long escape_string(char *to, const char *from, unsigned long length); - using Database::escape_string; // must be call before first query in thread void ThreadStart(); // must be call before finish thread run void ThreadEnd(); + + protected: + virtual SqlConnection * CreateConnection(); + private: - ACE_Thread_Mutex mMutex; // For thread safe operations between core and mySQL server - ACE_Thread_Mutex nMutex; // For thread safe operations on m_transQueues - - ACE_Based::Thread * tranThread; - - MYSQL *mMysql; - static size_t db_count; - - bool _TransactionCmd(const char *sql); - bool _Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **pFields, uint64* pRowCount, uint32* pFieldCount); }; + #endif #endif diff --git a/src/shared/Database/DatabasePostgre.cpp b/src/shared/Database/DatabasePostgre.cpp index d25a099ee..0edfe55d4 100644 --- a/src/shared/Database/DatabasePostgre.cpp +++ b/src/shared/Database/DatabasePostgre.cpp @@ -23,18 +23,16 @@ #include "Platform/Define.h" #include "Threading.h" #include "DatabaseEnv.h" -#include "Database/PGSQLDelayThread.h" #include "Database/SqlOperations.h" #include "Timer.h" size_t DatabasePostgre::db_count = 0; -DatabasePostgre::DatabasePostgre() : Database(), mPGconn(NULL) +DatabasePostgre::DatabasePostgre() { // before first connection if( db_count++ == 0 ) { - if (!PQisthreadsafe()) { sLog.outError("FATAL ERROR: PostgreSQL libpq isn't thread-safe."); @@ -45,26 +43,32 @@ DatabasePostgre::DatabasePostgre() : Database(), mPGconn(NULL) DatabasePostgre::~DatabasePostgre() { - if (m_delayThread) HaltDelayThread(); - if( mPGconn ) + //destroy SqlConnection objects + if(m_pQueryConnections.size()) { - PQfinish(mPGconn); - mPGconn = NULL; + for (int i = 0; i < m_pQueryConnections.size(); ++i) + delete m_pQueryConnections[i]; + + m_pQueryConnections.clear(); + } + + if(m_pAsyncConn) + { + delete m_pAsyncConn; + m_pAsyncConn = NULL; } } -bool DatabasePostgre::Initialize(const char *infoString) +SqlConnection * DatabasePostgre::CreateConnection() { - if(!Database::Initialize(infoString)) - return false; - - tranThread = NULL; - - InitDelayThread(); + return new PostgreSQLConnection(); +} +bool PostgreSQLConnection::Initialize(const char *infoString) +{ Tokens tokens = StrSplit(infoString, ";"); Tokens::iterator iter; @@ -98,26 +102,18 @@ bool DatabasePostgre::Initialize(const char *infoString) mPGconn = NULL; return false; } - else - { - sLog.outDetail( "Connected to Postgre database at %s", - host.c_str()); - sLog.outString( "PostgreSQL server ver: %d",PQserverVersion(mPGconn)); - return true; - } + sLog.outDetail( "Connected to Postgre database at %s", host.c_str()); + sLog.outString( "PostgreSQL server ver: %d", PQserverVersion(mPGconn)); + return true; } -bool DatabasePostgre::_Query(const char *sql, PGresult** pResult, uint64* pRowCount, uint32* pFieldCount) +bool PostgreSQLConnection::_Query(const char *sql, PGresult** pResult, uint64* pRowCount, uint32* pFieldCount) { if (!mPGconn) - return 0; + return false; - // guarded block for thread-safe request - ACE_Guard query_connection_guard(mMutex); - #ifdef MANGOS_DEBUG - uint32 _s = getMSTime(); - #endif + uint32 _s = WorldTimer::getMSTime(); // Send the query *pResult = PQexec(mPGconn, sql); if(!*pResult ) @@ -132,9 +128,7 @@ bool DatabasePostgre::_Query(const char *sql, PGresult** pResult, uint64* pRowCo } else { - #ifdef MANGOS_DEBUG - sLog.outDebug("[%u ms] SQL: %s", getMSTime() - _s, sql ); - #endif + DEBUG_FILTER_LOG(LOG_FILTER_SQL_TEXT, "[%u ms] SQL: %s", WorldTimer::getMSTimeDiff(_s,WorldTimer::getMSTime()), sql ); } *pRowCount = PQntuples(*pResult); @@ -146,13 +140,14 @@ bool DatabasePostgre::_Query(const char *sql, PGresult** pResult, uint64* pRowCo PQclear(*pResult); return false; } + return true; } -QueryResult* DatabasePostgre::Query(const char *sql) +QueryResult* PostgreSQLConnection::Query(const char *sql) { if (!mPGconn) - return 0; + return NULL; PGresult* result = NULL; uint64 rowCount = 0; @@ -162,15 +157,15 @@ QueryResult* DatabasePostgre::Query(const char *sql) return NULL; QueryResultPostgre * queryResult = new QueryResultPostgre(result, rowCount, fieldCount); - queryResult->NextRow(); + queryResult->NextRow(); return queryResult; } -QueryNamedResult* DatabasePostgre::QueryNamed(const char *sql) +QueryNamedResult* PostgreSQLConnection::QueryNamed(const char *sql) { if (!mPGconn) - return 0; + return NULL; PGresult* result = NULL; uint64 rowCount = 0; @@ -184,67 +179,35 @@ QueryNamedResult* DatabasePostgre::QueryNamed(const char *sql) names[i] = PQfname(result, i); QueryResultPostgre * queryResult = new QueryResultPostgre(result, rowCount, fieldCount); - queryResult->NextRow(); + queryResult->NextRow(); return new QueryNamedResult(queryResult,names); } -bool DatabasePostgre::Execute(const char *sql) +bool PostgreSQLConnection::Execute(const char *sql) { - if (!mPGconn) return false; - // don't use queued execution if it has not been initialized - if (!m_threadBody) - return DirectExecute(sql); + uint32 _s = WorldTimer::getMSTime(); - tranThread = ACE_Based::Thread::current(); // owner of this transaction - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - { // Statement for transaction - i->second->DelayExecute(sql); + PGresult *res = PQexec(mPGconn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + sLog.outErrorDb( "SQL: %s", sql ); + sLog.outErrorDb( "SQL %s", PQerrorMessage(mPGconn) ); + return false; } else { - // Simple sql statement - m_threadBody->Delay(new SqlStatement(sql)); + DEBUG_FILTER_LOG(LOG_FILTER_SQL_TEXT, "[%u ms] SQL: %s", WorldTimer::getMSTimeDiff(_s,WorldTimer::getMSTime()), sql ); } + PQclear(res); return true; } -bool DatabasePostgre::DirectExecute(const char* sql) -{ - if (!mPGconn) - return false; - { - // guarded block for thread-safe request - ACE_Guard query_connection_guard(mMutex); - #ifdef MANGOS_DEBUG - uint32 _s = getMSTime(); - #endif - PGresult *res = PQexec(mPGconn, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - sLog.outErrorDb( "SQL: %s", sql ); - sLog.outErrorDb( "SQL %s", PQerrorMessage(mPGconn) ); - return false; - } - else - { - #ifdef MANGOS_DEBUG - sLog.outDebug("[%u ms] SQL: %s", getMSTime() - _s, sql ); - #endif - } - PQclear(res); - - // end guarded block - } - return true; -} - -bool DatabasePostgre::_TransactionCmd(const char *sql) +bool PostgreSQLConnection::_TransactionCmd(const char *sql) { if (!mPGconn) return false; @@ -263,88 +226,22 @@ bool DatabasePostgre::_TransactionCmd(const char *sql) return true; } -bool DatabasePostgre::BeginTransaction() +bool PostgreSQLConnection::BeginTransaction() { - if (!mPGconn) - return false; - // don't use queued execution if it has not been initialized - if (!m_threadBody) - { - if (tranThread == ACE_Based::Thread::current()) - return false; // huh? this thread already started transaction - mMutex.acquire(); - if (!_TransactionCmd("START TRANSACTION")) - { - mMutex.release(); // can't start transaction - return false; - } - return true; - } - // transaction started - tranThread = ACE_Based::Thread::current(); // owner of this transaction - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - // If for thread exists queue and also contains transaction - // delete that transaction (not allow trans in trans) - delete i->second; - - m_tranQueues[tranThread] = new SqlTransaction(); - - return true; + return _TransactionCmd("START TRANSACTION"); } -bool DatabasePostgre::CommitTransaction() +bool PostgreSQLConnection::CommitTransaction() { - if (!mPGconn) - return false; - - // don't use queued execution if it has not been initialized - if (!m_threadBody) - { - if (tranThread != ACE_Based::Thread::current()) - return false; - bool _res = _TransactionCmd("COMMIT"); - tranThread = NULL; - mMutex.release(); - return _res; - } - tranThread = ACE_Based::Thread::current(); - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - { - m_threadBody->Delay(i->second); - i->second = NULL; - return true; - } - else - return false; + return _TransactionCmd("COMMIT"); } -bool DatabasePostgre::RollbackTransaction() -{ - if (!mPGconn) - return false; - // don't use queued execution if it has not been initialized - if (!m_threadBody) - { - if (tranThread != ACE_Based::Thread::current()) - return false; - bool _res = _TransactionCmd("ROLLBACK"); - tranThread = NULL; - mMutex.release(); - return _res; - } - tranThread = ACE_Based::Thread::current(); - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - { - delete i->second; - i->second = NULL; - } - return true; +bool PostgreSQLConnection::RollbackTransaction() +{ + return _TransactionCmd("ROLLBACK"); } -unsigned long DatabasePostgre::escape_string(char *to, const char *from, unsigned long length) +unsigned long PostgreSQLConnection::escape_string(char *to, const char *from, unsigned long length) { if (!mPGconn || !to || !from || !length) return 0; @@ -352,23 +249,4 @@ unsigned long DatabasePostgre::escape_string(char *to, const char *from, unsigne return PQescapeString(to, from, length); } -void DatabasePostgre::InitDelayThread() -{ - assert(!m_delayThread); - - //New delay thread for delay execute - m_threadBody = new PGSQLDelayThread(this); // Will be deleted on m_delayThread delete - m_delayThread = new ACE_Based::Thread(m_threadBody); -} - -void DatabasePostgre::HaltDelayThread() -{ - if (!m_threadBody || !m_delayThread) return; - - m_threadBody->Stop(); //Stop event - m_delayThread->wait(); //Wait for flush to DB - delete m_delayThread; //This also deletes m_threadBody - m_delayThread = NULL; - m_threadBody = NULL; -} #endif diff --git a/src/shared/Database/DatabasePostgre.h b/src/shared/Database/DatabasePostgre.h index cb5d9c65c..19f0c4681 100644 --- a/src/shared/Database/DatabasePostgre.h +++ b/src/shared/Database/DatabasePostgre.h @@ -19,7 +19,11 @@ #ifndef _DatabasePostgre_H #define _DatabasePostgre_H +#include "Common.h" +#include "Database.h" #include "Policies/Singleton.h" +#include "ace/Thread_Mutex.h" +#include "ace/Guard_T.h" #include #ifdef WIN32 @@ -30,7 +34,32 @@ #include #endif -class DatabasePostgre : public Database +class MANGOS_DLL_SPEC PostgreSQLConnection : public SqlConnection +{ + public: + PostgreSQLConnection() : mPGconn(NULL) {} + ~PostgreSQLConnection() { PQfinish(mPGconn); } + + bool Initialize(const char *infoString); + + QueryResult* Query(const char *sql); + QueryNamedResult* QueryNamed(const char *sql); + bool Execute(const char *sql); + + unsigned long escape_string(char *to, const char *from, unsigned long length); + + bool BeginTransaction(); + bool CommitTransaction(); + bool RollbackTransaction(); + + private: + bool _TransactionCmd(const char *sql); + bool _Query(const char *sql, PGresult **pResult, uint64* pRowCount, uint32* pFieldCount); + + PGconn *mPGconn; +}; + +class MANGOS_DLL_SPEC DatabasePostgre : public Database { friend class MaNGOS::OperatorNew; @@ -40,30 +69,11 @@ class DatabasePostgre : public Database //! Initializes Postgres and connects to a server. /*! infoString should be formated like hostname;username;password;database. */ - bool Initialize(const char *infoString); - void InitDelayThread(); - void HaltDelayThread(); - QueryResult* Query(const char *sql); - QueryNamedResult* QueryNamed(const char *sql); - bool Execute(const char *sql); - bool DirectExecute(const char* sql); - bool BeginTransaction(); - bool CommitTransaction(); - bool RollbackTransaction(); - operator bool () const { return mPGconn != NULL; } + protected: + virtual SqlConnection * CreateConnection(); - unsigned long escape_string(char *to, const char *from, unsigned long length); - using Database::escape_string; private: - ACE_Thread_Mutex mMutex; - ACE_Based::Thread * tranThread; - - PGconn *mPGconn; - static size_t db_count; - - bool _TransactionCmd(const char *sql); - bool _Query(const char *sql, PGresult **pResult, uint64* pRowCount, uint32* pFieldCount); }; #endif diff --git a/src/shared/Database/SqlDelayThread.cpp b/src/shared/Database/SqlDelayThread.cpp index 2504772be..b15b405db 100644 --- a/src/shared/Database/SqlDelayThread.cpp +++ b/src/shared/Database/SqlDelayThread.cpp @@ -20,7 +20,7 @@ #include "Database/SqlOperations.h" #include "DatabaseEnv.h" -SqlDelayThread::SqlDelayThread(Database* db) : m_dbEngine(db), m_running(true) +SqlDelayThread::SqlDelayThread(Database* db, SqlConnection* conn) : m_dbEngine(db), m_dbConnection(conn), m_running(true) { } @@ -40,7 +40,7 @@ void SqlDelayThread::run() const uint32 loopSleepms = 10; - const uint32 pingEveryLoop = m_dbEngine->GetPingIntervall()/loopSleepms; + const uint32 pingEveryLoop = m_dbEngine->GetPingIntervall() / loopSleepms; uint32 loopCounter = 0; while (m_running) @@ -52,14 +52,14 @@ void SqlDelayThread::run() SqlOperation* s = NULL; while (m_sqlQueue.next(s)) { - s->Execute(m_dbEngine); + s->Execute(m_dbConnection); delete s; } if((loopCounter++) >= pingEveryLoop) { loopCounter = 0; - delete m_dbEngine->Query("SELECT 1"); + m_dbEngine->Ping(); } } diff --git a/src/shared/Database/SqlDelayThread.h b/src/shared/Database/SqlDelayThread.h index 03e4d65d9..167f5763c 100644 --- a/src/shared/Database/SqlDelayThread.h +++ b/src/shared/Database/SqlDelayThread.h @@ -26,6 +26,7 @@ class Database; class SqlOperation; +class SqlConnection; class SqlDelayThread : public ACE_Based::Runnable { @@ -34,10 +35,11 @@ class SqlDelayThread : public ACE_Based::Runnable private: SqlQueue m_sqlQueue; ///< Queue of SQL statements Database* m_dbEngine; ///< Pointer to used Database engine + SqlConnection * m_dbConnection; ///< Pointer to DB connection volatile bool m_running; public: - SqlDelayThread(Database* db); + SqlDelayThread(Database* db, SqlConnection* conn); ~SqlDelayThread(); ///< Put sql statement to delay queue diff --git a/src/shared/Database/SqlOperations.cpp b/src/shared/Database/SqlOperations.cpp index 09d9bc64d..013cd6f48 100644 --- a/src/shared/Database/SqlOperations.cpp +++ b/src/shared/Database/SqlOperations.cpp @@ -21,54 +21,60 @@ #include "DatabaseEnv.h" #include "DatabaseImpl.h" +#define LOCK_DB_CONN(conn) SqlConnection::Lock guard(conn) + /// ---- ASYNC STATEMENTS / TRANSACTIONS ---- -void SqlStatement::Execute(Database *db) +void SqlStatement::Execute(SqlConnection *conn) { /// just do it - db->DirectExecute(m_sql); + LOCK_DB_CONN(conn); + conn->Execute(m_sql); } -void SqlTransaction::Execute(Database *db) +SqlTransaction::~SqlTransaction() +{ + while(!m_queue.empty()) + { + delete [] (const_cast(m_queue.back())); + m_queue.pop_back(); + } +} + +void SqlTransaction::Execute(SqlConnection *conn) { - ACE_Guard _lock(m_Mutex); if(m_queue.empty()) return; - db->DirectExecute("START TRANSACTION"); - while(!m_queue.empty()) + LOCK_DB_CONN(conn); + + conn->BeginTransaction(); + + const int nItems = m_queue.size(); + for (int i = 0; i < nItems; ++i) { - char *sql = const_cast(m_queue.front()); - m_queue.pop(); + const char *sql = m_queue[i]; - if(!db->DirectExecute(sql)) + if(!conn->Execute(sql)) { - delete [] sql; - - db->DirectExecute("ROLLBACK"); - while(!m_queue.empty()) - { - delete [] (const_cast(m_queue.front())); - m_queue.pop(); - } - + conn->RollbackTransaction(); return; } - - delete [] sql; } - db->DirectExecute("COMMIT"); + conn->CommitTransaction(); } /// ---- ASYNC QUERIES ---- -void SqlQuery::Execute(Database *db) +void SqlQuery::Execute(SqlConnection *conn) { if(!m_callback || !m_queue) return; + + LOCK_DB_CONN(conn); /// execute the query and store the result in the callback - m_callback->SetResult(db->Query(m_sql)); + m_callback->SetResult(conn->Query(m_sql)); /// add the callback to the sql result queue of the thread it originated from m_queue->add(m_callback); } @@ -184,19 +190,19 @@ void SqlQueryHolder::SetSize(size_t size) m_queries.resize(size); } -void SqlQueryHolderEx::Execute(Database *db) +void SqlQueryHolderEx::Execute(SqlConnection *conn) { if(!m_holder || !m_callback || !m_queue) return; + LOCK_DB_CONN(conn); /// we can do this, we are friends std::vector &queries = m_holder->m_queries; - for(size_t i = 0; i < queries.size(); i++) { /// execute all queries in the holder and pass the results char const *sql = queries[i].first; - if(sql) m_holder->SetResult(i, db->Query(sql)); + if(sql) m_holder->SetResult(i, conn->Query(sql)); } /// sync with the caller thread diff --git a/src/shared/Database/SqlOperations.h b/src/shared/Database/SqlOperations.h index e3ae0213e..ac53e5df5 100644 --- a/src/shared/Database/SqlOperations.h +++ b/src/shared/Database/SqlOperations.h @@ -29,13 +29,14 @@ /// ---- BASE --- class Database; +class SqlConnection; class SqlDelayThread; class SqlOperation { public: virtual void OnRemove() { delete this; } - virtual void Execute(Database *db) = 0; + virtual void Execute(SqlConnection *conn) = 0; virtual ~SqlOperation() {} }; @@ -48,26 +49,25 @@ class SqlStatement : public SqlOperation public: SqlStatement(const char *sql) : m_sql(mangos_strdup(sql)){} ~SqlStatement() { char* tofree = const_cast(m_sql); delete [] tofree; } - void Execute(Database *db); + void Execute(SqlConnection *conn); }; class SqlTransaction : public SqlOperation { private: - std::queue m_queue; - ACE_Thread_Mutex m_Mutex; + std::vector m_queue; + public: SqlTransaction() {} + ~SqlTransaction(); + void DelayExecute(const char *sql) { char* _sql = mangos_strdup(sql); - if (_sql) - { - ACE_Guard _lock(m_Mutex); - m_queue.push(_sql); - } + m_queue.push_back(_sql); } - void Execute(Database *db); + + void Execute(SqlConnection *conn); }; /// ---- ASYNC QUERIES ---- @@ -95,7 +95,7 @@ class SqlQuery : public SqlOperation SqlQuery(const char *sql, MaNGOS::IQueryCallback * callback, SqlResultQueue * queue) : m_sql(mangos_strdup(sql)), m_callback(callback), m_queue(queue) {} ~SqlQuery() { char* tofree = const_cast(m_sql); delete [] tofree; } - void Execute(Database *db); + void Execute(SqlConnection *conn); }; class SqlQueryHolder @@ -124,6 +124,6 @@ class SqlQueryHolderEx : public SqlOperation public: SqlQueryHolderEx(SqlQueryHolder *holder, MaNGOS::IQueryCallback * callback, SqlResultQueue * queue) : m_holder(holder), m_callback(callback), m_queue(queue) {} - void Execute(Database *db); + void Execute(SqlConnection *conn); }; #endif //__SQLOPERATIONS_H diff --git a/src/shared/revision_nr.h b/src/shared/revision_nr.h index 01c70eaca..39a3de96c 100644 --- a/src/shared/revision_nr.h +++ b/src/shared/revision_nr.h @@ -1,4 +1,4 @@ #ifndef __REVISION_NR_H__ #define __REVISION_NR_H__ - #define REVISION_NR "11044" + #define REVISION_NR "11045" #endif // __REVISION_NR_H__