diff --git a/src/game/CharacterHandler.cpp b/src/game/CharacterHandler.cpp index c5c244deb..64405b43b 100644 --- a/src/game/CharacterHandler.cpp +++ b/src/game/CharacterHandler.cpp @@ -949,8 +949,10 @@ void WorldSession::HandleChangePlayerNameOpcodeCallBack(QueryResult *result, uin delete result; + CharacterDatabase.BeginTransaction(); CharacterDatabase.PExecute("UPDATE characters set name = '%s', at_login = at_login & ~ %u WHERE guid ='%u'", newname.c_str(), uint32(AT_LOGIN_RENAME), guidLow); CharacterDatabase.PExecute("DELETE FROM character_declinedname WHERE guid ='%u'", guidLow); + CharacterDatabase.CommitTransaction(); sLog.outChar("Account: %d (IP: %s) Character:[%s] (guid:%u) Changed name to: %s", session->GetAccountId(), session->GetRemoteAddress().c_str(), oldname.c_str(), guidLow, newname.c_str()); diff --git a/src/game/ObjectMgr.cpp b/src/game/ObjectMgr.cpp index 779bae587..309bb2f1d 100644 --- a/src/game/ObjectMgr.cpp +++ b/src/game/ObjectMgr.cpp @@ -6797,9 +6797,12 @@ void ObjectMgr::LoadWeatherZoneChances() void ObjectMgr::SaveCreatureRespawnTime(uint32 loguid, uint32 instance, time_t t) { mCreatureRespawnTimes[MAKE_PAIR64(loguid,instance)] = t; + + CharacterDatabase.BeginTransaction(); CharacterDatabase.PExecute("DELETE FROM creature_respawn WHERE guid = '%u' AND instance = '%u'", loguid, instance); if(t) CharacterDatabase.PExecute("INSERT INTO creature_respawn VALUES ( '%u', '" UI64FMTD "', '%u' )", loguid, uint64(t), instance); + CharacterDatabase.CommitTransaction(); } void ObjectMgr::DeleteCreatureData(uint32 guid) @@ -6815,9 +6818,12 @@ void ObjectMgr::DeleteCreatureData(uint32 guid) void ObjectMgr::SaveGORespawnTime(uint32 loguid, uint32 instance, time_t t) { mGORespawnTimes[MAKE_PAIR64(loguid,instance)] = t; + + CharacterDatabase.BeginTransaction(); CharacterDatabase.PExecute("DELETE FROM gameobject_respawn WHERE guid = '%u' AND instance = '%u'", loguid, instance); if(t) CharacterDatabase.PExecute("INSERT INTO gameobject_respawn VALUES ( '%u', '" UI64FMTD "', '%u' )", loguid, uint64(t), instance); + CharacterDatabase.CommitTransaction(); } void ObjectMgr::DeleteRespawnTimeForInstance(uint32 instance) @@ -6842,8 +6848,10 @@ void ObjectMgr::DeleteRespawnTimeForInstance(uint32 instance) mCreatureRespawnTimes.erase(itr); } + CharacterDatabase.BeginTransaction(); CharacterDatabase.PExecute("DELETE FROM creature_respawn WHERE instance = '%u'", instance); CharacterDatabase.PExecute("DELETE FROM gameobject_respawn WHERE instance = '%u'", instance); + CharacterDatabase.CommitTransaction(); } void ObjectMgr::DeleteGOData(uint32 guid) diff --git a/src/game/World.cpp b/src/game/World.cpp index 32e9f564b..fe8554559 100644 --- a/src/game/World.cpp +++ b/src/game/World.cpp @@ -89,7 +89,6 @@ World::World() m_startTime=m_gameTime; m_maxActiveSessionCount = 0; m_maxQueuedSessionCount = 0; - m_resultQueue = NULL; m_NextDailyQuestReset = 0; m_NextWeeklyQuestReset = 0; m_scheduledScripts = 0; @@ -133,8 +132,6 @@ World::~World() VMAP::VMapFactory::clear(); - if(m_resultQueue) delete m_resultQueue; - //TODO free addSessQueue } @@ -1938,13 +1935,14 @@ void World::ProcessCliCommands() void World::InitResultQueue() { - m_resultQueue = new SqlResultQueue; - CharacterDatabase.SetResultQueue(m_resultQueue); } void World::UpdateResultQueue() { - m_resultQueue->Update(); + //process async result queues + CharacterDatabase.ProcessResultQueue(); + WorldDatabase.ProcessResultQueue(); + LoginDatabase.ProcessResultQueue(); } void World::UpdateRealmCharCount(uint32 accountId) @@ -1960,8 +1958,11 @@ void World::_UpdateRealmCharCount(QueryResult *resultCharCount, uint32 accountId Field *fields = resultCharCount->Fetch(); uint32 charCount = fields[0].GetUInt32(); delete resultCharCount; + + LoginDatabase.BeginTransaction(); LoginDatabase.PExecute("DELETE FROM realmcharacters WHERE acctid= '%u' AND realmid = '%u'", accountId, realmID); LoginDatabase.PExecute("INSERT INTO realmcharacters (numchars, acctid, realmid) VALUES (%u, %u, %u)", charCount, accountId, realmID); + LoginDatabase.CommitTransaction(); } } diff --git a/src/game/World.h b/src/game/World.h index a57e5bb39..fc35fc474 100644 --- a/src/game/World.h +++ b/src/game/World.h @@ -657,7 +657,6 @@ class World // CLI command holder to be thread safe ACE_Based::LockedQueue cliCmdQueue; - SqlResultQueue *m_resultQueue; // next daily quests reset time time_t m_NextDailyQuestReset; diff --git a/src/mangosd/Master.cpp b/src/mangosd/Master.cpp index 5bbc16882..d3255b4b4 100644 --- a/src/mangosd/Master.cpp +++ b/src/mangosd/Master.cpp @@ -416,15 +416,16 @@ bool Master::_StartDB() { ///- Get world database info from configuration file std::string dbstring = sConfig.GetStringDefault("WorldDatabaseInfo", ""); + int nConnections = sConfig.GetIntDefault("WorldDatabaseConnections", 1); if(dbstring.empty()) { sLog.outError("Database not specified in configuration file"); return false; } - sLog.outString("World Database: %s", dbstring.c_str()); + sLog.outString("World Database: %s, total connections: %i", dbstring.c_str(), nConnections + 1); ///- Initialise the world database - if(!WorldDatabase.Initialize(dbstring.c_str())) + if(!WorldDatabase.Initialize(dbstring.c_str(), nConnections)) { sLog.outError("Cannot connect to world database %s",dbstring.c_str()); return false; @@ -438,6 +439,7 @@ bool Master::_StartDB() } dbstring = sConfig.GetStringDefault("CharacterDatabaseInfo", ""); + nConnections = sConfig.GetIntDefault("CharacterDatabaseConnections", 1); if(dbstring.empty()) { sLog.outError("Character Database not specified in configuration file"); @@ -446,10 +448,10 @@ bool Master::_StartDB() WorldDatabase.HaltDelayThread(); return false; } - sLog.outString("Character Database: %s", dbstring.c_str()); + sLog.outString("Character Database: %s, total connections: %i", dbstring.c_str(), nConnections + 1); ///- Initialise the Character database - if(!CharacterDatabase.Initialize(dbstring.c_str())) + if(!CharacterDatabase.Initialize(dbstring.c_str(), nConnections)) { sLog.outError("Cannot connect to Character database %s",dbstring.c_str()); @@ -468,6 +470,7 @@ bool Master::_StartDB() ///- Get login database info from configuration file dbstring = sConfig.GetStringDefault("LoginDatabaseInfo", ""); + nConnections = sConfig.GetIntDefault("LoginDatabaseConnections", 1); if(dbstring.empty()) { sLog.outError("Login database not specified in configuration file"); @@ -479,8 +482,8 @@ bool Master::_StartDB() } ///- Initialise the login database - sLog.outString("Login Database: %s", dbstring.c_str() ); - if(!LoginDatabase.Initialize(dbstring.c_str())) + sLog.outString("Login Database: %s, total connections: %i", dbstring.c_str(), nConnections + 1); + if(!LoginDatabase.Initialize(dbstring.c_str(), nConnections)) { sLog.outError("Cannot connect to login database %s",dbstring.c_str()); diff --git a/src/mangosd/mangosd.conf.dist.in b/src/mangosd/mangosd.conf.dist.in index dd0bb971c..0536296c5 100644 --- a/src/mangosd/mangosd.conf.dist.in +++ b/src/mangosd/mangosd.conf.dist.in @@ -37,6 +37,14 @@ ConfVersion=2010100901 # hostname;port;username;password;database # .;/path/to/unix_socket/DIRECTORY or . for default path;username;password;database - use Unix sockets at Unix/Linux # +# LoginDatabaseConnections +# WorldDatabaseConnections +# CharacterDatabaseConnections +# Amount of connections to database which will be used for SELECT queries. Maximum 16 connections per database. +# Please, note, for data consistency only one connection for each database is used for transactions and async SELECTs. +# So formula to find out how many connections will be established: X = ¹_connections + 1 +# Default: 1 connection for SELECT statements +# # MaxPingTime # Settings for maximum database-ping interval (minutes between pings) # @@ -57,6 +65,9 @@ LogsDir = "" LoginDatabaseInfo = "127.0.0.1;3306;mangos;mangos;realmd" WorldDatabaseInfo = "127.0.0.1;3306;mangos;mangos;mangos" CharacterDatabaseInfo = "127.0.0.1;3306;mangos;mangos;characters" +LoginDatabaseConnections = 1 +WorldDatabaseConnections = 1 +CharacterDatabaseConnections = 1 MaxPingTime = 30 WorldServerPort = 8085 BindIP = "0.0.0.0" diff --git a/src/realmd/Main.cpp b/src/realmd/Main.cpp index 9b1398dd2..6e501542b 100644 --- a/src/realmd/Main.cpp +++ b/src/realmd/Main.cpp @@ -302,7 +302,7 @@ extern int main(int argc, char **argv) { loopCounter = 0; DETAIL_LOG("Ping MySQL to keep connection alive"); - delete LoginDatabase.Query("SELECT 1 FROM realmlist LIMIT 1"); + LoginDatabase.Ping(); } #ifdef WIN32 if (m_ServiceStatus == 0) stopEvent = true; diff --git a/src/shared/Database/Database.cpp b/src/shared/Database/Database.cpp index 9a05b32fc..d114c2c95 100644 --- a/src/shared/Database/Database.cpp +++ b/src/shared/Database/Database.cpp @@ -18,19 +18,29 @@ #include "DatabaseEnv.h" #include "Config/Config.h" +#include "Database/SqlOperations.h" #include #include #include +#define MIN_CONNECTION_POOL_SIZE 1 +#define MAX_CONNECTION_POOL_SIZE 16 + Database::~Database() { + HaltDelayThread(); /*Delete objects*/ + delete m_pResultQueue; + delete m_pAsyncConn; + + for (int i = 0; i < m_pQueryConnections.size(); ++i) + delete m_pQueryConnections[i]; } -bool Database::Initialize(const char *) +bool Database::Initialize(const char * infoString, int nConns /*= 1*/) { - // Enable logging of SQL commands (usally only GM commands) + // Enable logging of SQL commands (usually only GM commands) // (See method: PExecuteLog) m_logSQL = sConfig.GetBoolDefault("LogSQL", false); m_logsDir = sConfig.GetStringDefault("LogsDir",""); @@ -41,9 +51,74 @@ bool Database::Initialize(const char *) } m_pingIntervallms = sConfig.GetIntDefault ("MaxPingTime", 30) * (MINUTE * 1000); + + //create DB connections + + //setup connection pool size + if(nConns < MIN_CONNECTION_POOL_SIZE) + m_nQueryConnPoolSize = MIN_CONNECTION_POOL_SIZE; + else if(nConns > MAX_CONNECTION_POOL_SIZE) + m_nQueryConnPoolSize = MAX_CONNECTION_POOL_SIZE; + else + m_nQueryConnPoolSize = nConns; + + //create connection pool for sync requests + for (int i = 0; i < m_nQueryConnPoolSize; ++i) + { + SqlConnection * pConn = CreateConnection(); + if(!pConn->Initialize(infoString)) + { + delete pConn; + return false; + } + + m_pQueryConnections.push_back(pConn); + } + + //create and initialize connection for async requests + m_pAsyncConn = CreateConnection(); + if(!m_pAsyncConn->Initialize(infoString)) + return false; + + InitDelayThread(); return true; } +SqlDelayThread * Database::CreateDelayThread() +{ + assert(m_pAsyncConn); + return new SqlDelayThread(this, m_pAsyncConn); +} + +void Database::InitDelayThread() +{ + assert(!m_delayThread); + + m_pResultQueue = new SqlResultQueue; + //New delay thread for delay execute + m_threadBody = CreateDelayThread(); // will deleted at m_delayThread delete + m_delayThread = new ACE_Based::Thread(m_threadBody); +} + +void Database::HaltDelayThread() +{ + if (!m_threadBody || !m_delayThread) return; + + m_threadBody->Stop(); //Stop event + m_delayThread->wait(); //Wait for flush to DB + delete m_delayThread; //This also deletes m_threadBody + m_delayThread = NULL; + m_threadBody = NULL; + + //stop async result queue + if(m_pResultQueue) + { + m_pResultQueue->Update(); + delete m_pResultQueue; + m_pResultQueue = NULL; + } +} + void Database::ThreadStart() { } @@ -52,17 +127,52 @@ void Database::ThreadEnd() { } +void Database::ProcessResultQueue() +{ + if(m_pResultQueue) + m_pResultQueue->Update(); +} + void Database::escape_string(std::string& str) { if(str.empty()) return; char* buf = new char[str.size()*2+1]; - escape_string(buf,str.c_str(),str.size()); + //we don't care what connection to use - escape string will be the same + m_pQueryConnections[0]->escape_string(buf,str.c_str(),str.size()); str = buf; delete[] buf; } +SqlConnection * Database::getQueryConnection() +{ + int nCount = 0; + + if(m_nQueryCounter == long(1 << 31)) + m_nQueryCounter = 0; + else + nCount = ++m_nQueryCounter; + + return m_pQueryConnections[nCount % m_nQueryConnPoolSize]; +} + +void Database::Ping() +{ + const char * sql = "SELECT 1"; + + { + SqlConnection::Lock guard(m_pAsyncConn); + delete guard->Query(sql); + } + + for (int i = 0; i < m_nQueryConnPoolSize; ++i) + { + SqlConnection::Lock guard(m_pQueryConnections[i]); + delete guard->Query(sql); + } +} + bool Database::PExecuteLog(const char * format,...) { if (!format) @@ -107,12 +217,6 @@ bool Database::PExecuteLog(const char * format,...) return Execute(szQuery); } -void Database::SetResultQueue(SqlResultQueue * queue) -{ - m_queryQueues[ACE_Based::Thread::current()] = queue; - -} - QueryResult* Database::PQuery(const char *format,...) { if(!format) return NULL; @@ -151,6 +255,29 @@ QueryNamedResult* Database::PQueryNamed(const char *format,...) return QueryNamed(szQuery); } +bool Database::Execute(const char *sql) +{ + if (!m_pAsyncConn) + return false; + + SqlTransaction * pTrans = m_TransStorage->get(); + if(pTrans) + { + //add SQL request to trans queue + pTrans->DelayExecute(sql); + } + else + { + // Simple sql statement + pTrans = new SqlTransaction; + pTrans->DelayExecute(sql); + + m_threadBody->Delay(pTrans); + } + + return true; +} + bool Database::PExecute(const char * format,...) { if (!format) @@ -171,6 +298,18 @@ bool Database::PExecute(const char * format,...) return Execute(szQuery); } +bool Database::DirectExecute(const char* sql) +{ + if(!m_pAsyncConn) + return false; + + SqlTransaction trans; + trans.DelayExecute(sql); + + trans.Execute(m_pAsyncConn); + return true; +} + bool Database::DirectPExecute(const char * format,...) { if (!format) @@ -191,6 +330,62 @@ bool Database::DirectPExecute(const char * format,...) return DirectExecute(szQuery); } +bool Database::BeginTransaction() +{ + if (!m_pAsyncConn) + return false; + + //initiate transaction on current thread + //currently we do not support queued transactions + m_TransStorage->init(); + return true; +} + +bool Database::CommitTransaction() +{ + if (!m_pAsyncConn) + return false; + + //check if we have pending transaction + if(!m_TransStorage->get()) + return false; + + //add SqlTransaction to the async queue + m_threadBody->Delay(m_TransStorage->detach()); + return true; +} + +bool Database::CommitTransactionDirect() +{ + if (!m_pAsyncConn) + return false; + + //check if we have pending transaction + if(!m_TransStorage->get()) + return false; + + //directly execute SqlTransaction + SqlTransaction * pTrans = m_TransStorage->detach(); + pTrans->Execute(m_pAsyncConn); + delete pTrans; + + return true; +} + +bool Database::RollbackTransaction() +{ + if (!m_pAsyncConn) + return false; + + if(!m_TransStorage->get()) + return false; + + //remove scheduled transaction + m_TransStorage->reset(); + + return true; +} + bool Database::CheckRequiredField( char const* table_name, char const* required_name ) { // check required field @@ -278,3 +473,31 @@ bool Database::CheckRequiredField( char const* table_name, char const* required_ return false; } + +Database::TransHelper::~TransHelper() +{ + reset(); +} + +SqlTransaction * Database::TransHelper::init() +{ + MANGOS_ASSERT(!m_pTrans); //if we will get a nested transaction request - we MUST fix code!!! + m_pTrans = new SqlTransaction; + return m_pTrans; +} + +SqlTransaction * Database::TransHelper::detach() +{ + SqlTransaction * pRes = m_pTrans; + m_pTrans = NULL; + return pRes; +} + +void Database::TransHelper::reset() +{ + if(m_pTrans) + { + delete m_pTrans; + m_pTrans = NULL; + } +} diff --git a/src/shared/Database/Database.h b/src/shared/Database/Database.h index 4731ed188..5b3ff7f49 100644 --- a/src/shared/Database/Database.h +++ b/src/shared/Database/Database.h @@ -22,39 +22,87 @@ #include "Threading.h" #include "Utilities/UnorderedMapSet.h" #include "Database/SqlDelayThread.h" +#include +#include "Policies/ThreadingModel.h" +#include +#include class SqlTransaction; class SqlResultQueue; class SqlQueryHolder; -typedef UNORDERED_MAP TransactionQueues; -typedef UNORDERED_MAP QueryQueues; - #define MAX_QUERY_LEN 32*1024 +// +class MANGOS_DLL_SPEC SqlConnection +{ + public: + virtual ~SqlConnection() {} + + //method for initializing DB connection + virtual bool Initialize(const char *infoString) = 0; + //public methods for making queries + virtual QueryResult* Query(const char *sql) = 0; + virtual QueryNamedResult* QueryNamed(const char *sql) = 0; + + //public methods for making requests + virtual bool Execute(const char *sql) = 0; + + //escape string generation + virtual unsigned long escape_string(char *to, const char *from, unsigned long length) { strncpy(to,from,length); return length; } + + // nothing do if DB not support transactions + virtual bool BeginTransaction() { return true; } + virtual bool CommitTransaction() { return true; } + // can't rollback without transaction support + virtual bool RollbackTransaction() { return true; } + + //SqlConnection object lock + class Lock + { + public: + Lock(SqlConnection * conn) : m_pConn(conn) { m_pConn->m_mutex.acquire(); } + ~Lock() { m_pConn->m_mutex.release(); } + + SqlConnection * operator->() const { return m_pConn; } + + private: + SqlConnection * const m_pConn; + }; + + private: + typedef ACE_Recursive_Thread_Mutex LOCK_TYPE; + LOCK_TYPE m_mutex; +}; + class MANGOS_DLL_SPEC Database { - protected: - Database() : m_threadBody(NULL), m_delayThread(NULL) {}; - - TransactionQueues m_tranQueues; ///< Transaction queues from diff. threads - QueryQueues m_queryQueues; ///< Query queues from diff threads - SqlDelayThread* m_threadBody; ///< Pointer to delay sql executer (owned by m_delayThread) - ACE_Based::Thread* m_delayThread; ///< Pointer to executer thread - public: - virtual ~Database(); - virtual bool Initialize(const char *infoString); - virtual void InitDelayThread() = 0; - virtual void HaltDelayThread() = 0; + virtual bool Initialize(const char *infoString, int nConns = 1); + virtual void InitDelayThread(); + virtual void HaltDelayThread(); + + /// Synchronous DB queries + inline QueryResult* Query(const char *sql) + { + SqlConnection::Lock guard(getQueryConnection()); + return guard->Query(sql); + } + + inline QueryNamedResult* QueryNamed(const char *sql) + { + SqlConnection::Lock guard(getQueryConnection()); + return guard->QueryNamed(sql); + } - virtual QueryResult* Query(const char *sql) = 0; QueryResult* PQuery(const char *format,...) ATTR_PRINTF(2,3); - virtual QueryNamedResult* QueryNamed(const char *sql) = 0; QueryNamedResult* PQueryNamed(const char *format,...) ATTR_PRINTF(2,3); + bool DirectExecute(const char* sql); + bool DirectPExecute(const char *format,...) ATTR_PRINTF(2,3); + /// Async queries and query holders, implemented in DatabaseImpl.h // Query / member @@ -95,30 +143,21 @@ class MANGOS_DLL_SPEC Database template bool DelayQueryHolder(Class *object, void (Class::*method)(QueryResult*, SqlQueryHolder*, ParamType1), SqlQueryHolder *holder, ParamType1 param1); - virtual bool Execute(const char *sql) = 0; + bool Execute(const char *sql); bool PExecute(const char *format,...) ATTR_PRINTF(2,3); - virtual bool DirectExecute(const char* sql) = 0; - bool DirectPExecute(const char *format,...) ATTR_PRINTF(2,3); // Writes SQL commands to a LOG file (see mangosd.conf "LogSQL") bool PExecuteLog(const char *format,...) ATTR_PRINTF(2,3); - virtual bool BeginTransaction() // nothing do if DB not support transactions - { - return true; - } - virtual bool CommitTransaction() // nothing do if DB not support transactions - { - return true; - } - virtual bool RollbackTransaction() // can't rollback without transaction support - { - return false; - } + bool BeginTransaction(); + bool CommitTransaction(); + bool RollbackTransaction(); + //for sync transaction execution + bool CommitTransactionDirect(); - virtual operator bool () const = 0; + operator bool () const { return m_pQueryConnections.size() && m_pAsyncConn != 0; } - virtual unsigned long escape_string(char *to, const char *from, unsigned long length) { strncpy(to,from,length); return length; } + //escape string generation void escape_string(std::string& str); // must be called before first query in thread (one time for thread using one from existing Database objects) @@ -126,13 +165,76 @@ class MANGOS_DLL_SPEC Database // must be called before finish thread run (one time for thread using one from existing Database objects) virtual void ThreadEnd(); - // sets the result queue of the current thread, be careful what thread you call this from - void SetResultQueue(SqlResultQueue * queue); + // set database-wide result queue. also we should use object-bases and not thread-based result queues + void ProcessResultQueue(); bool CheckRequiredField(char const* table_name, char const* required_name); - uint32 GetPingIntervall() { return m_pingIntervallms;} + uint32 GetPingIntervall() { return m_pingIntervallms; } + + //function to ping database connections + void Ping(); + + protected: + Database() : m_pAsyncConn(NULL), m_pResultQueue(NULL), m_threadBody(NULL), m_delayThread(NULL), + m_logSQL(false), m_pingIntervallms(0), m_nQueryConnPoolSize(1) + { + m_nQueryCounter = -1; + } + + //factory method to create SqlConnection objects + virtual SqlConnection * CreateConnection() = 0; + //factory method to create SqlDelayThread objects + virtual SqlDelayThread * CreateDelayThread(); + + class TransHelper + { + public: + TransHelper() : m_pTrans(NULL) {} + ~TransHelper(); + + //initializes new SqlTransaction object + SqlTransaction * init(); + //gets pointer on current transaction object. Returns NULL if transaction was not initiated + SqlTransaction * get() const { return m_pTrans; } + //detaches SqlTransaction object allocated by init() function + //next call to get() function will return NULL! + //do not forget to destroy obtained SqlTransaction object! + SqlTransaction * detach(); + //destroyes SqlTransaction allocated by init() function + void reset(); + + private: + SqlTransaction * m_pTrans; + }; + + //per-thread based storage for SqlTransaction object initialization - no locking is required + typedef ACE_TSS DBTransHelperTSS; + Database::DBTransHelperTSS m_TransStorage; + + ///< DB connections + + //round-robin connection selection + SqlConnection * getQueryConnection(); + //for now return one single connection for async requests + SqlConnection * getAsyncConnection() const { return m_pAsyncConn; } + + //connection helper counters + int m_nQueryConnPoolSize; //current size of query connection pool + ACE_Atomic_Op m_nQueryCounter; //counter for connection selection + + //lets use pool of connections for sync queries + typedef std::vector< SqlConnection * > SqlConnectionContainer; + SqlConnectionContainer m_pQueryConnections; + + //only one single DB connection for transactions + SqlConnection * m_pAsyncConn; + + SqlResultQueue * m_pResultQueue; ///< Transaction queues from diff. threads + SqlDelayThread * m_threadBody; ///< Pointer to delay sql executer (owned by m_delayThread) + ACE_Based::Thread * m_delayThread; ///< Pointer to executer thread private: + bool m_logSQL; std::string m_logsDir; uint32 m_pingIntervallms; diff --git a/src/shared/Database/DatabaseImpl.h b/src/shared/Database/DatabaseImpl.h index 44072eca3..963b15150 100644 --- a/src/shared/Database/DatabaseImpl.h +++ b/src/shared/Database/DatabaseImpl.h @@ -21,16 +21,8 @@ /// Function body definitions for the template function members of the Database class -#define ASYNC_QUERY_BODY(sql, queue_itr) \ - if (!sql) return false; \ - \ - QueryQueues::iterator queue_itr; \ - \ - { \ - ACE_Based::Thread * queryThread = ACE_Based::Thread::current(); \ - queue_itr = m_queryQueues.find(queryThread); \ - if (queue_itr == m_queryQueues.end()) return false; \ - } +#define ASYNC_QUERY_BODY(sql) if (!sql || !m_pResultQueue) return false; +#define ASYNC_DELAYHOLDER_BODY(holder) if (!holder || !m_pResultQueue) return false; #define ASYNC_PQUERY_BODY(format, szQuery) \ if(!format) return false; \ @@ -51,49 +43,38 @@ } \ } -#define ASYNC_DELAYHOLDER_BODY(holder, queue_itr) \ - if (!holder) return false; \ - \ - QueryQueues::iterator queue_itr; \ - \ - { \ - ACE_Based::Thread * queryThread = ACE_Based::Thread::current(); \ - queue_itr = m_queryQueues.find(queryThread); \ - if (queue_itr == m_queryQueues.end()) return false; \ - } - // -- Query / member -- template bool Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*), const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method), m_pResultQueue)); } template bool Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*, ParamType1), ParamType1 param1, const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, param1), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, param1), m_pResultQueue)); } template bool Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*, ParamType1, ParamType2), ParamType1 param1, ParamType2 param2, const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, param1, param2), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, param1, param2), m_pResultQueue)); } template bool Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*, ParamType1, ParamType2, ParamType3), ParamType1 param1, ParamType2 param2, ParamType3 param3, const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, param1, param2, param3), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, param1, param2, param3), m_pResultQueue)); } // -- Query / static -- @@ -102,24 +83,24 @@ template bool Database::AsyncQuery(void (*method)(QueryResult*, ParamType1), ParamType1 param1, const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback(method, (QueryResult*)NULL, param1), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback(method, (QueryResult*)NULL, param1), m_pResultQueue)); } template bool Database::AsyncQuery(void (*method)(QueryResult*, ParamType1, ParamType2), ParamType1 param1, ParamType2 param2, const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback(method, (QueryResult*)NULL, param1, param2), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback(method, (QueryResult*)NULL, param1, param2), m_pResultQueue)); } template bool Database::AsyncQuery(void (*method)(QueryResult*, ParamType1, ParamType2, ParamType3), ParamType1 param1, ParamType2 param2, ParamType3 param3, const char *sql) { - ASYNC_QUERY_BODY(sql, itr) - return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback(method, (QueryResult*)NULL, param1, param2, param3), itr->second)); + ASYNC_QUERY_BODY(sql) + return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback(method, (QueryResult*)NULL, param1, param2, param3), m_pResultQueue)); } // -- PQuery / member -- @@ -188,16 +169,16 @@ template bool Database::DelayQueryHolder(Class *object, void (Class::*method)(QueryResult*, SqlQueryHolder*), SqlQueryHolder *holder) { - ASYNC_DELAYHOLDER_BODY(holder, itr) - return holder->Execute(new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, holder), m_threadBody, itr->second); + ASYNC_DELAYHOLDER_BODY(holder) + return holder->Execute(new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, holder), m_threadBody, m_pResultQueue); } template bool Database::DelayQueryHolder(Class *object, void (Class::*method)(QueryResult*, SqlQueryHolder*, ParamType1), SqlQueryHolder *holder, ParamType1 param1) { - ASYNC_DELAYHOLDER_BODY(holder, itr) - return holder->Execute(new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, holder, param1), m_threadBody, itr->second); + ASYNC_DELAYHOLDER_BODY(holder) + return holder->Execute(new MaNGOS::QueryCallback(object, method, (QueryResult*)NULL, holder, param1), m_threadBody, m_pResultQueue); } #undef ASYNC_QUERY_BODY diff --git a/src/shared/Database/DatabaseMysql.cpp b/src/shared/Database/DatabaseMysql.cpp index c0d390283..9e596bc8d 100644 --- a/src/shared/Database/DatabaseMysql.cpp +++ b/src/shared/Database/DatabaseMysql.cpp @@ -23,10 +23,10 @@ #include "Platform/Define.h" #include "Threading.h" #include "DatabaseEnv.h" -#include "Database/MySQLDelayThread.h" -#include "Database/SqlOperations.h" #include "Timer.h" +size_t DatabaseMysql::db_count = 0; + void DatabaseMysql::ThreadStart() { mysql_thread_init(); @@ -37,9 +37,7 @@ void DatabaseMysql::ThreadEnd() mysql_thread_end(); } -size_t DatabaseMysql::db_count = 0; - -DatabaseMysql::DatabaseMysql() : Database(), mMysql(0) +DatabaseMysql::DatabaseMysql() { // before first connection if( db_count++ == 0 ) @@ -61,31 +59,40 @@ DatabaseMysql::~DatabaseMysql() if (m_delayThread) HaltDelayThread(); - if (mMysql) - mysql_close(mMysql); + //destroy SqlConnection objects + if(m_pQueryConnections.size()) + { + for (int i = 0; i < m_pQueryConnections.size(); ++i) + delete m_pQueryConnections[i]; + + m_pQueryConnections.clear(); + } + + if(m_pAsyncConn) + { + delete m_pAsyncConn; + m_pAsyncConn = NULL; + } //Free Mysql library pointers for last ~DB if(--db_count == 0) mysql_library_end(); } -bool DatabaseMysql::Initialize(const char *infoString) +SqlConnection * DatabaseMysql::CreateConnection() +{ + return new MySQLConnection(); +} + +bool MySQLConnection::Initialize(const char *infoString) { - - if(!Database::Initialize(infoString)) - return false; - - tranThread = NULL; - MYSQL *mysqlInit; - mysqlInit = mysql_init(NULL); + MYSQL * mysqlInit = mysql_init(NULL); if (!mysqlInit) { sLog.outError( "Could not initialize Mysql connection" ); return false; } - InitDelayThread(); - Tokens tokens = StrSplit(infoString, ";"); Tokens::iterator iter; @@ -108,7 +115,7 @@ bool DatabaseMysql::Initialize(const char *infoString) database = *iter++; mysql_options(mysqlInit,MYSQL_SET_CHARSET_NAME,"utf8"); - #ifdef WIN32 +#ifdef WIN32 if(host==".") // named pipe use option (Windows) { unsigned int opt = MYSQL_PROTOCOL_PIPE; @@ -121,7 +128,7 @@ bool DatabaseMysql::Initialize(const char *infoString) port = atoi(port_or_socket.c_str()); unix_socket = 0; } - #else +#else if(host==".") // socket use option (Unix/Linux) { unsigned int opt = MYSQL_PROTOCOL_SOCKET; @@ -135,7 +142,7 @@ bool DatabaseMysql::Initialize(const char *infoString) port = atoi(port_or_socket.c_str()); unix_socket = 0; } - #endif +#endif mMysql = mysql_real_connect(mysqlInit, host.c_str(), user.c_str(), password.c_str(), database.c_str(), port, unix_socket, 0); @@ -155,16 +162,19 @@ bool DatabaseMysql::Initialize(const char *infoString) // This is wrong since mangos use transactions, // autocommit is turned of during it. // Setting it to on makes atomic updates work - if (!mysql_autocommit(mMysql, 1)) - DETAIL_LOG("AUTOCOMMIT SUCCESSFULLY SET TO 1"); + // --- + // if you want atomic updates to work - USE TRANSACTIONS!!! + // no need to mess up with autocommit mode which might degrade server performance! + if (!mysql_autocommit(mMysql, 0)) + DETAIL_LOG("AUTOCOMMIT SUCCESSFULLY SET TO 0"); else - DETAIL_LOG("AUTOCOMMIT NOT SET TO 1"); + DETAIL_LOG("AUTOCOMMIT NOT SET TO 0"); /*-------------------------------------*/ // set connection properties to UTF8 to properly handle locales for different // server configs - core sends data in UTF8, so MySQL must expect UTF8 too - PExecute("SET NAMES `utf8`"); - PExecute("SET CHARACTER SET `utf8`"); + Execute("SET NAMES `utf8`"); + Execute("SET CHARACTER SET `utf8`"); return true; } @@ -177,33 +187,27 @@ bool DatabaseMysql::Initialize(const char *infoString) } } -bool DatabaseMysql::_Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **pFields, uint64* pRowCount, uint32* pFieldCount) +bool MySQLConnection::_Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **pFields, uint64* pRowCount, uint32* pFieldCount) { if (!mMysql) return 0; + uint32 _s = WorldTimer::getMSTime(); + + if(mysql_query(mMysql, sql)) { - // guarded block for thread-safe mySQL request - ACE_Guard query_connection_guard(mMutex); - - uint32 _s = WorldTimer::getMSTime(); - - if(mysql_query(mMysql, sql)) - { - sLog.outErrorDb( "SQL: %s", sql ); - sLog.outErrorDb("query ERROR: %s", mysql_error(mMysql)); - return false; - } - else - { - DEBUG_FILTER_LOG(LOG_FILTER_SQL_TEXT, "[%u ms] SQL: %s", WorldTimer::getMSTimeDiff(_s,WorldTimer::getMSTime()), sql ); - } - - *pResult = mysql_store_result(mMysql); - *pRowCount = mysql_affected_rows(mMysql); - *pFieldCount = mysql_field_count(mMysql); - // end guarded block + sLog.outErrorDb( "SQL: %s", sql ); + sLog.outErrorDb("query ERROR: %s", mysql_error(mMysql)); + return false; } + else + { + DEBUG_FILTER_LOG(LOG_FILTER_SQL_TEXT, "[%u ms] SQL: %s", WorldTimer::getMSTimeDiff(_s,WorldTimer::getMSTime()), sql ); + } + + *pResult = mysql_store_result(mMysql); + *pRowCount = mysql_affected_rows(mMysql); + *pFieldCount = mysql_field_count(mMysql); if (!*pResult ) return false; @@ -218,7 +222,7 @@ bool DatabaseMysql::_Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **p return true; } -QueryResult* DatabaseMysql::Query(const char *sql) +QueryResult* MySQLConnection::Query(const char *sql) { MYSQL_RES *result = NULL; MYSQL_FIELD *fields = NULL; @@ -231,11 +235,10 @@ QueryResult* DatabaseMysql::Query(const char *sql) QueryResultMysql *queryResult = new QueryResultMysql(result, fields, rowCount, fieldCount); queryResult->NextRow(); - return queryResult; } -QueryNamedResult* DatabaseMysql::QueryNamed(const char *sql) +QueryNamedResult* MySQLConnection::QueryNamed(const char *sql) { MYSQL_RES *result = NULL; MYSQL_FIELD *fields = NULL; @@ -252,44 +255,15 @@ QueryNamedResult* DatabaseMysql::QueryNamed(const char *sql) QueryResultMysql *queryResult = new QueryResultMysql(result, fields, rowCount, fieldCount); queryResult->NextRow(); - return new QueryNamedResult(queryResult,names); } -bool DatabaseMysql::Execute(const char *sql) -{ - if (!mMysql) - return false; - - // don't use queued execution if it has not been initialized - if (!m_threadBody) return DirectExecute(sql); - - ACE_Guard _lock(nMutex); - - tranThread = ACE_Based::Thread::current(); // owner of this transaction - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - { // Statement for transaction - i->second->DelayExecute(sql); - } - else - { - // Simple sql statement - m_threadBody->Delay(new SqlStatement(sql)); - } - - return true; -} - -bool DatabaseMysql::DirectExecute(const char* sql) +bool MySQLConnection::Execute(const char* sql) { if (!mMysql) return false; { - // guarded block for thread-safe mySQL request - ACE_Guard query_connection_guard(mMutex); - uint32 _s = WorldTimer::getMSTime(); if(mysql_query(mMysql, sql)) @@ -308,7 +282,7 @@ bool DatabaseMysql::DirectExecute(const char* sql) return true; } -bool DatabaseMysql::_TransactionCmd(const char *sql) +bool MySQLConnection::_TransactionCmd(const char *sql) { if (mysql_query(mMysql, sql)) { @@ -323,100 +297,22 @@ bool DatabaseMysql::_TransactionCmd(const char *sql) return true; } -bool DatabaseMysql::BeginTransaction() +bool MySQLConnection::BeginTransaction() { - if (!mMysql) - return false; - - // don't use queued execution if it has not been initialized - if (!m_threadBody) - { - if (tranThread == ACE_Based::Thread::current()) - return false; // huh? this thread already started transaction - - mMutex.acquire(); - if (!_TransactionCmd("START TRANSACTION")) - { - mMutex.release(); // can't start transaction - return false; - } - return true; // transaction started - } - - ACE_Guard _lock(nMutex); - - tranThread = ACE_Based::Thread::current(); // owner of this transaction - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - // If for thread exists queue and also contains transaction - // delete that transaction (not allow trans in trans) - delete i->second; - - m_tranQueues[tranThread] = new SqlTransaction(); - - return true; + return _TransactionCmd("START TRANSACTION"); } -bool DatabaseMysql::CommitTransaction() +bool MySQLConnection::CommitTransaction() { - if (!mMysql) - return false; - - // don't use queued execution if it has not been initialized - if (!m_threadBody) - { - if (tranThread != ACE_Based::Thread::current()) - return false; - bool _res = _TransactionCmd("COMMIT"); - tranThread = NULL; - mMutex.release(); - return _res; - } - - ACE_Guard _lock(nMutex); - - tranThread = ACE_Based::Thread::current(); - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - { - m_threadBody->Delay(i->second); - m_tranQueues.erase(i); - return true; - } - - return false; + return _TransactionCmd("COMMIT"); } -bool DatabaseMysql::RollbackTransaction() +bool MySQLConnection::RollbackTransaction() { - if (!mMysql) - return false; - - // don't use queued execution if it has not been initialized - if (!m_threadBody) - { - if (tranThread != ACE_Based::Thread::current()) - return false; - bool _res = _TransactionCmd("ROLLBACK"); - tranThread = NULL; - mMutex.release(); - return _res; - } - - ACE_Guard _lock(nMutex); - - tranThread = ACE_Based::Thread::current(); - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - { - delete i->second; - m_tranQueues.erase(i); - } - - return true; + return _TransactionCmd("ROLLBACK"); } -unsigned long DatabaseMysql::escape_string(char *to, const char *from, unsigned long length) +unsigned long MySQLConnection::escape_string(char *to, const char *from, unsigned long length) { if (!mMysql || !to || !from || !length) return 0; @@ -424,23 +320,4 @@ unsigned long DatabaseMysql::escape_string(char *to, const char *from, unsigned return(mysql_real_escape_string(mMysql, to, from, length)); } -void DatabaseMysql::InitDelayThread() -{ - assert(!m_delayThread); - - //New delay thread for delay execute - m_threadBody = new MySQLDelayThread(this); // will deleted at m_delayThread delete - m_delayThread = new ACE_Based::Thread(m_threadBody); -} - -void DatabaseMysql::HaltDelayThread() -{ - if (!m_threadBody || !m_delayThread) return; - - m_threadBody->Stop(); //Stop event - m_delayThread->wait(); //Wait for flush to DB - delete m_delayThread; //This also deletes m_threadBody - m_delayThread = NULL; - m_threadBody = NULL; -} #endif diff --git a/src/shared/Database/DatabaseMysql.h b/src/shared/Database/DatabaseMysql.h index 51ebd4f68..a9d920e73 100644 --- a/src/shared/Database/DatabaseMysql.h +++ b/src/shared/Database/DatabaseMysql.h @@ -34,6 +34,31 @@ #include #endif +class MANGOS_DLL_SPEC MySQLConnection : public SqlConnection +{ + public: + MySQLConnection() : mMysql(NULL) {} + ~MySQLConnection() { mysql_close(mMysql); } + + bool Initialize(const char *infoString); + + QueryResult* Query(const char *sql); + QueryNamedResult* QueryNamed(const char *sql); + bool Execute(const char *sql); + + unsigned long escape_string(char *to, const char *from, unsigned long length); + + bool BeginTransaction(); + bool CommitTransaction(); + bool RollbackTransaction(); + + private: + bool _TransactionCmd(const char *sql); + bool _Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **pFields, uint64* pRowCount, uint32* pFieldCount); + + MYSQL *mMysql; +}; + class MANGOS_DLL_SPEC DatabaseMysql : public Database { friend class MaNGOS::OperatorNew; @@ -44,38 +69,18 @@ class MANGOS_DLL_SPEC DatabaseMysql : public Database //! Initializes Mysql and connects to a server. /*! infoString should be formated like hostname;username;password;database. */ - bool Initialize(const char *infoString); - void InitDelayThread(); - void HaltDelayThread(); - QueryResult* Query(const char *sql); - QueryNamedResult* QueryNamed(const char *sql); - bool Execute(const char *sql); - bool DirectExecute(const char* sql); - bool BeginTransaction(); - bool CommitTransaction(); - bool RollbackTransaction(); - - operator bool () const { return mMysql != NULL; } - - unsigned long escape_string(char *to, const char *from, unsigned long length); - using Database::escape_string; // must be call before first query in thread void ThreadStart(); // must be call before finish thread run void ThreadEnd(); + + protected: + virtual SqlConnection * CreateConnection(); + private: - ACE_Thread_Mutex mMutex; // For thread safe operations between core and mySQL server - ACE_Thread_Mutex nMutex; // For thread safe operations on m_transQueues - - ACE_Based::Thread * tranThread; - - MYSQL *mMysql; - static size_t db_count; - - bool _TransactionCmd(const char *sql); - bool _Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **pFields, uint64* pRowCount, uint32* pFieldCount); }; + #endif #endif diff --git a/src/shared/Database/DatabasePostgre.cpp b/src/shared/Database/DatabasePostgre.cpp index d25a099ee..0edfe55d4 100644 --- a/src/shared/Database/DatabasePostgre.cpp +++ b/src/shared/Database/DatabasePostgre.cpp @@ -23,18 +23,16 @@ #include "Platform/Define.h" #include "Threading.h" #include "DatabaseEnv.h" -#include "Database/PGSQLDelayThread.h" #include "Database/SqlOperations.h" #include "Timer.h" size_t DatabasePostgre::db_count = 0; -DatabasePostgre::DatabasePostgre() : Database(), mPGconn(NULL) +DatabasePostgre::DatabasePostgre() { // before first connection if( db_count++ == 0 ) { - if (!PQisthreadsafe()) { sLog.outError("FATAL ERROR: PostgreSQL libpq isn't thread-safe."); @@ -45,26 +43,32 @@ DatabasePostgre::DatabasePostgre() : Database(), mPGconn(NULL) DatabasePostgre::~DatabasePostgre() { - if (m_delayThread) HaltDelayThread(); - if( mPGconn ) + //destroy SqlConnection objects + if(m_pQueryConnections.size()) { - PQfinish(mPGconn); - mPGconn = NULL; + for (int i = 0; i < m_pQueryConnections.size(); ++i) + delete m_pQueryConnections[i]; + + m_pQueryConnections.clear(); + } + + if(m_pAsyncConn) + { + delete m_pAsyncConn; + m_pAsyncConn = NULL; } } -bool DatabasePostgre::Initialize(const char *infoString) +SqlConnection * DatabasePostgre::CreateConnection() { - if(!Database::Initialize(infoString)) - return false; - - tranThread = NULL; - - InitDelayThread(); + return new PostgreSQLConnection(); +} +bool PostgreSQLConnection::Initialize(const char *infoString) +{ Tokens tokens = StrSplit(infoString, ";"); Tokens::iterator iter; @@ -98,26 +102,18 @@ bool DatabasePostgre::Initialize(const char *infoString) mPGconn = NULL; return false; } - else - { - sLog.outDetail( "Connected to Postgre database at %s", - host.c_str()); - sLog.outString( "PostgreSQL server ver: %d",PQserverVersion(mPGconn)); - return true; - } + sLog.outDetail( "Connected to Postgre database at %s", host.c_str()); + sLog.outString( "PostgreSQL server ver: %d", PQserverVersion(mPGconn)); + return true; } -bool DatabasePostgre::_Query(const char *sql, PGresult** pResult, uint64* pRowCount, uint32* pFieldCount) +bool PostgreSQLConnection::_Query(const char *sql, PGresult** pResult, uint64* pRowCount, uint32* pFieldCount) { if (!mPGconn) - return 0; + return false; - // guarded block for thread-safe request - ACE_Guard query_connection_guard(mMutex); - #ifdef MANGOS_DEBUG - uint32 _s = getMSTime(); - #endif + uint32 _s = WorldTimer::getMSTime(); // Send the query *pResult = PQexec(mPGconn, sql); if(!*pResult ) @@ -132,9 +128,7 @@ bool DatabasePostgre::_Query(const char *sql, PGresult** pResult, uint64* pRowCo } else { - #ifdef MANGOS_DEBUG - sLog.outDebug("[%u ms] SQL: %s", getMSTime() - _s, sql ); - #endif + DEBUG_FILTER_LOG(LOG_FILTER_SQL_TEXT, "[%u ms] SQL: %s", WorldTimer::getMSTimeDiff(_s,WorldTimer::getMSTime()), sql ); } *pRowCount = PQntuples(*pResult); @@ -146,13 +140,14 @@ bool DatabasePostgre::_Query(const char *sql, PGresult** pResult, uint64* pRowCo PQclear(*pResult); return false; } + return true; } -QueryResult* DatabasePostgre::Query(const char *sql) +QueryResult* PostgreSQLConnection::Query(const char *sql) { if (!mPGconn) - return 0; + return NULL; PGresult* result = NULL; uint64 rowCount = 0; @@ -162,15 +157,15 @@ QueryResult* DatabasePostgre::Query(const char *sql) return NULL; QueryResultPostgre * queryResult = new QueryResultPostgre(result, rowCount, fieldCount); - queryResult->NextRow(); + queryResult->NextRow(); return queryResult; } -QueryNamedResult* DatabasePostgre::QueryNamed(const char *sql) +QueryNamedResult* PostgreSQLConnection::QueryNamed(const char *sql) { if (!mPGconn) - return 0; + return NULL; PGresult* result = NULL; uint64 rowCount = 0; @@ -184,67 +179,35 @@ QueryNamedResult* DatabasePostgre::QueryNamed(const char *sql) names[i] = PQfname(result, i); QueryResultPostgre * queryResult = new QueryResultPostgre(result, rowCount, fieldCount); - queryResult->NextRow(); + queryResult->NextRow(); return new QueryNamedResult(queryResult,names); } -bool DatabasePostgre::Execute(const char *sql) +bool PostgreSQLConnection::Execute(const char *sql) { - if (!mPGconn) return false; - // don't use queued execution if it has not been initialized - if (!m_threadBody) - return DirectExecute(sql); + uint32 _s = WorldTimer::getMSTime(); - tranThread = ACE_Based::Thread::current(); // owner of this transaction - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - { // Statement for transaction - i->second->DelayExecute(sql); + PGresult *res = PQexec(mPGconn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + sLog.outErrorDb( "SQL: %s", sql ); + sLog.outErrorDb( "SQL %s", PQerrorMessage(mPGconn) ); + return false; } else { - // Simple sql statement - m_threadBody->Delay(new SqlStatement(sql)); + DEBUG_FILTER_LOG(LOG_FILTER_SQL_TEXT, "[%u ms] SQL: %s", WorldTimer::getMSTimeDiff(_s,WorldTimer::getMSTime()), sql ); } + PQclear(res); return true; } -bool DatabasePostgre::DirectExecute(const char* sql) -{ - if (!mPGconn) - return false; - { - // guarded block for thread-safe request - ACE_Guard query_connection_guard(mMutex); - #ifdef MANGOS_DEBUG - uint32 _s = getMSTime(); - #endif - PGresult *res = PQexec(mPGconn, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - sLog.outErrorDb( "SQL: %s", sql ); - sLog.outErrorDb( "SQL %s", PQerrorMessage(mPGconn) ); - return false; - } - else - { - #ifdef MANGOS_DEBUG - sLog.outDebug("[%u ms] SQL: %s", getMSTime() - _s, sql ); - #endif - } - PQclear(res); - - // end guarded block - } - return true; -} - -bool DatabasePostgre::_TransactionCmd(const char *sql) +bool PostgreSQLConnection::_TransactionCmd(const char *sql) { if (!mPGconn) return false; @@ -263,88 +226,22 @@ bool DatabasePostgre::_TransactionCmd(const char *sql) return true; } -bool DatabasePostgre::BeginTransaction() +bool PostgreSQLConnection::BeginTransaction() { - if (!mPGconn) - return false; - // don't use queued execution if it has not been initialized - if (!m_threadBody) - { - if (tranThread == ACE_Based::Thread::current()) - return false; // huh? this thread already started transaction - mMutex.acquire(); - if (!_TransactionCmd("START TRANSACTION")) - { - mMutex.release(); // can't start transaction - return false; - } - return true; - } - // transaction started - tranThread = ACE_Based::Thread::current(); // owner of this transaction - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - // If for thread exists queue and also contains transaction - // delete that transaction (not allow trans in trans) - delete i->second; - - m_tranQueues[tranThread] = new SqlTransaction(); - - return true; + return _TransactionCmd("START TRANSACTION"); } -bool DatabasePostgre::CommitTransaction() +bool PostgreSQLConnection::CommitTransaction() { - if (!mPGconn) - return false; - - // don't use queued execution if it has not been initialized - if (!m_threadBody) - { - if (tranThread != ACE_Based::Thread::current()) - return false; - bool _res = _TransactionCmd("COMMIT"); - tranThread = NULL; - mMutex.release(); - return _res; - } - tranThread = ACE_Based::Thread::current(); - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - { - m_threadBody->Delay(i->second); - i->second = NULL; - return true; - } - else - return false; + return _TransactionCmd("COMMIT"); } -bool DatabasePostgre::RollbackTransaction() -{ - if (!mPGconn) - return false; - // don't use queued execution if it has not been initialized - if (!m_threadBody) - { - if (tranThread != ACE_Based::Thread::current()) - return false; - bool _res = _TransactionCmd("ROLLBACK"); - tranThread = NULL; - mMutex.release(); - return _res; - } - tranThread = ACE_Based::Thread::current(); - TransactionQueues::iterator i = m_tranQueues.find(tranThread); - if (i != m_tranQueues.end() && i->second != NULL) - { - delete i->second; - i->second = NULL; - } - return true; +bool PostgreSQLConnection::RollbackTransaction() +{ + return _TransactionCmd("ROLLBACK"); } -unsigned long DatabasePostgre::escape_string(char *to, const char *from, unsigned long length) +unsigned long PostgreSQLConnection::escape_string(char *to, const char *from, unsigned long length) { if (!mPGconn || !to || !from || !length) return 0; @@ -352,23 +249,4 @@ unsigned long DatabasePostgre::escape_string(char *to, const char *from, unsigne return PQescapeString(to, from, length); } -void DatabasePostgre::InitDelayThread() -{ - assert(!m_delayThread); - - //New delay thread for delay execute - m_threadBody = new PGSQLDelayThread(this); // Will be deleted on m_delayThread delete - m_delayThread = new ACE_Based::Thread(m_threadBody); -} - -void DatabasePostgre::HaltDelayThread() -{ - if (!m_threadBody || !m_delayThread) return; - - m_threadBody->Stop(); //Stop event - m_delayThread->wait(); //Wait for flush to DB - delete m_delayThread; //This also deletes m_threadBody - m_delayThread = NULL; - m_threadBody = NULL; -} #endif diff --git a/src/shared/Database/DatabasePostgre.h b/src/shared/Database/DatabasePostgre.h index cb5d9c65c..19f0c4681 100644 --- a/src/shared/Database/DatabasePostgre.h +++ b/src/shared/Database/DatabasePostgre.h @@ -19,7 +19,11 @@ #ifndef _DatabasePostgre_H #define _DatabasePostgre_H +#include "Common.h" +#include "Database.h" #include "Policies/Singleton.h" +#include "ace/Thread_Mutex.h" +#include "ace/Guard_T.h" #include #ifdef WIN32 @@ -30,7 +34,32 @@ #include #endif -class DatabasePostgre : public Database +class MANGOS_DLL_SPEC PostgreSQLConnection : public SqlConnection +{ + public: + PostgreSQLConnection() : mPGconn(NULL) {} + ~PostgreSQLConnection() { PQfinish(mPGconn); } + + bool Initialize(const char *infoString); + + QueryResult* Query(const char *sql); + QueryNamedResult* QueryNamed(const char *sql); + bool Execute(const char *sql); + + unsigned long escape_string(char *to, const char *from, unsigned long length); + + bool BeginTransaction(); + bool CommitTransaction(); + bool RollbackTransaction(); + + private: + bool _TransactionCmd(const char *sql); + bool _Query(const char *sql, PGresult **pResult, uint64* pRowCount, uint32* pFieldCount); + + PGconn *mPGconn; +}; + +class MANGOS_DLL_SPEC DatabasePostgre : public Database { friend class MaNGOS::OperatorNew; @@ -40,30 +69,11 @@ class DatabasePostgre : public Database //! Initializes Postgres and connects to a server. /*! infoString should be formated like hostname;username;password;database. */ - bool Initialize(const char *infoString); - void InitDelayThread(); - void HaltDelayThread(); - QueryResult* Query(const char *sql); - QueryNamedResult* QueryNamed(const char *sql); - bool Execute(const char *sql); - bool DirectExecute(const char* sql); - bool BeginTransaction(); - bool CommitTransaction(); - bool RollbackTransaction(); - operator bool () const { return mPGconn != NULL; } + protected: + virtual SqlConnection * CreateConnection(); - unsigned long escape_string(char *to, const char *from, unsigned long length); - using Database::escape_string; private: - ACE_Thread_Mutex mMutex; - ACE_Based::Thread * tranThread; - - PGconn *mPGconn; - static size_t db_count; - - bool _TransactionCmd(const char *sql); - bool _Query(const char *sql, PGresult **pResult, uint64* pRowCount, uint32* pFieldCount); }; #endif diff --git a/src/shared/Database/SqlDelayThread.cpp b/src/shared/Database/SqlDelayThread.cpp index 2504772be..b15b405db 100644 --- a/src/shared/Database/SqlDelayThread.cpp +++ b/src/shared/Database/SqlDelayThread.cpp @@ -20,7 +20,7 @@ #include "Database/SqlOperations.h" #include "DatabaseEnv.h" -SqlDelayThread::SqlDelayThread(Database* db) : m_dbEngine(db), m_running(true) +SqlDelayThread::SqlDelayThread(Database* db, SqlConnection* conn) : m_dbEngine(db), m_dbConnection(conn), m_running(true) { } @@ -40,7 +40,7 @@ void SqlDelayThread::run() const uint32 loopSleepms = 10; - const uint32 pingEveryLoop = m_dbEngine->GetPingIntervall()/loopSleepms; + const uint32 pingEveryLoop = m_dbEngine->GetPingIntervall() / loopSleepms; uint32 loopCounter = 0; while (m_running) @@ -52,14 +52,14 @@ void SqlDelayThread::run() SqlOperation* s = NULL; while (m_sqlQueue.next(s)) { - s->Execute(m_dbEngine); + s->Execute(m_dbConnection); delete s; } if((loopCounter++) >= pingEveryLoop) { loopCounter = 0; - delete m_dbEngine->Query("SELECT 1"); + m_dbEngine->Ping(); } } diff --git a/src/shared/Database/SqlDelayThread.h b/src/shared/Database/SqlDelayThread.h index 03e4d65d9..167f5763c 100644 --- a/src/shared/Database/SqlDelayThread.h +++ b/src/shared/Database/SqlDelayThread.h @@ -26,6 +26,7 @@ class Database; class SqlOperation; +class SqlConnection; class SqlDelayThread : public ACE_Based::Runnable { @@ -34,10 +35,11 @@ class SqlDelayThread : public ACE_Based::Runnable private: SqlQueue m_sqlQueue; ///< Queue of SQL statements Database* m_dbEngine; ///< Pointer to used Database engine + SqlConnection * m_dbConnection; ///< Pointer to DB connection volatile bool m_running; public: - SqlDelayThread(Database* db); + SqlDelayThread(Database* db, SqlConnection* conn); ~SqlDelayThread(); ///< Put sql statement to delay queue diff --git a/src/shared/Database/SqlOperations.cpp b/src/shared/Database/SqlOperations.cpp index 09d9bc64d..013cd6f48 100644 --- a/src/shared/Database/SqlOperations.cpp +++ b/src/shared/Database/SqlOperations.cpp @@ -21,54 +21,60 @@ #include "DatabaseEnv.h" #include "DatabaseImpl.h" +#define LOCK_DB_CONN(conn) SqlConnection::Lock guard(conn) + /// ---- ASYNC STATEMENTS / TRANSACTIONS ---- -void SqlStatement::Execute(Database *db) +void SqlStatement::Execute(SqlConnection *conn) { /// just do it - db->DirectExecute(m_sql); + LOCK_DB_CONN(conn); + conn->Execute(m_sql); } -void SqlTransaction::Execute(Database *db) +SqlTransaction::~SqlTransaction() +{ + while(!m_queue.empty()) + { + delete [] (const_cast(m_queue.back())); + m_queue.pop_back(); + } +} + +void SqlTransaction::Execute(SqlConnection *conn) { - ACE_Guard _lock(m_Mutex); if(m_queue.empty()) return; - db->DirectExecute("START TRANSACTION"); - while(!m_queue.empty()) + LOCK_DB_CONN(conn); + + conn->BeginTransaction(); + + const int nItems = m_queue.size(); + for (int i = 0; i < nItems; ++i) { - char *sql = const_cast(m_queue.front()); - m_queue.pop(); + const char *sql = m_queue[i]; - if(!db->DirectExecute(sql)) + if(!conn->Execute(sql)) { - delete [] sql; - - db->DirectExecute("ROLLBACK"); - while(!m_queue.empty()) - { - delete [] (const_cast(m_queue.front())); - m_queue.pop(); - } - + conn->RollbackTransaction(); return; } - - delete [] sql; } - db->DirectExecute("COMMIT"); + conn->CommitTransaction(); } /// ---- ASYNC QUERIES ---- -void SqlQuery::Execute(Database *db) +void SqlQuery::Execute(SqlConnection *conn) { if(!m_callback || !m_queue) return; + + LOCK_DB_CONN(conn); /// execute the query and store the result in the callback - m_callback->SetResult(db->Query(m_sql)); + m_callback->SetResult(conn->Query(m_sql)); /// add the callback to the sql result queue of the thread it originated from m_queue->add(m_callback); } @@ -184,19 +190,19 @@ void SqlQueryHolder::SetSize(size_t size) m_queries.resize(size); } -void SqlQueryHolderEx::Execute(Database *db) +void SqlQueryHolderEx::Execute(SqlConnection *conn) { if(!m_holder || !m_callback || !m_queue) return; + LOCK_DB_CONN(conn); /// we can do this, we are friends std::vector &queries = m_holder->m_queries; - for(size_t i = 0; i < queries.size(); i++) { /// execute all queries in the holder and pass the results char const *sql = queries[i].first; - if(sql) m_holder->SetResult(i, db->Query(sql)); + if(sql) m_holder->SetResult(i, conn->Query(sql)); } /// sync with the caller thread diff --git a/src/shared/Database/SqlOperations.h b/src/shared/Database/SqlOperations.h index e3ae0213e..ac53e5df5 100644 --- a/src/shared/Database/SqlOperations.h +++ b/src/shared/Database/SqlOperations.h @@ -29,13 +29,14 @@ /// ---- BASE --- class Database; +class SqlConnection; class SqlDelayThread; class SqlOperation { public: virtual void OnRemove() { delete this; } - virtual void Execute(Database *db) = 0; + virtual void Execute(SqlConnection *conn) = 0; virtual ~SqlOperation() {} }; @@ -48,26 +49,25 @@ class SqlStatement : public SqlOperation public: SqlStatement(const char *sql) : m_sql(mangos_strdup(sql)){} ~SqlStatement() { char* tofree = const_cast(m_sql); delete [] tofree; } - void Execute(Database *db); + void Execute(SqlConnection *conn); }; class SqlTransaction : public SqlOperation { private: - std::queue m_queue; - ACE_Thread_Mutex m_Mutex; + std::vector m_queue; + public: SqlTransaction() {} + ~SqlTransaction(); + void DelayExecute(const char *sql) { char* _sql = mangos_strdup(sql); - if (_sql) - { - ACE_Guard _lock(m_Mutex); - m_queue.push(_sql); - } + m_queue.push_back(_sql); } - void Execute(Database *db); + + void Execute(SqlConnection *conn); }; /// ---- ASYNC QUERIES ---- @@ -95,7 +95,7 @@ class SqlQuery : public SqlOperation SqlQuery(const char *sql, MaNGOS::IQueryCallback * callback, SqlResultQueue * queue) : m_sql(mangos_strdup(sql)), m_callback(callback), m_queue(queue) {} ~SqlQuery() { char* tofree = const_cast(m_sql); delete [] tofree; } - void Execute(Database *db); + void Execute(SqlConnection *conn); }; class SqlQueryHolder @@ -124,6 +124,6 @@ class SqlQueryHolderEx : public SqlOperation public: SqlQueryHolderEx(SqlQueryHolder *holder, MaNGOS::IQueryCallback * callback, SqlResultQueue * queue) : m_holder(holder), m_callback(callback), m_queue(queue) {} - void Execute(Database *db); + void Execute(SqlConnection *conn); }; #endif //__SQLOPERATIONS_H diff --git a/src/shared/revision_nr.h b/src/shared/revision_nr.h index 01c70eaca..39a3de96c 100644 --- a/src/shared/revision_nr.h +++ b/src/shared/revision_nr.h @@ -1,4 +1,4 @@ #ifndef __REVISION_NR_H__ #define __REVISION_NR_H__ - #define REVISION_NR "11044" + #define REVISION_NR "11045" #endif // __REVISION_NR_H__