[11045] Rewrite internals of DB layer. Simplify code and use less locking. Spawn and use separate connections for sync and async DB requests. Implement database connection pool for SELECT queries. Up to maximum 16 connections supported. Disable 'autocommit' mode for MySQL.

UPDATE YOUR CONFIGS!

Defaults:
LoginDatabaseConnections = 1
WorldDatabaseConnections = 1
CharacterDatabaseConnections = 1

If you are not using <mtmaps> patch do not change the default settings - this is useless. You can try following option in your MySQL config to squeeze even more performance from your DB:

[mysqld]
transaction-isolation = READ-COMMITTED

Great thanks to Undergarun, kero99 and selector for making tests and providing very useful feedback and DB statistics! Have fun :)

Signed-off-by: Ambal <pogrebniak@gala.net>
This commit is contained in:
Ambal 2011-01-19 22:04:54 +02:00
parent 9bc37afa28
commit 631ce36680
19 changed files with 655 additions and 547 deletions

View file

@ -949,8 +949,10 @@ void WorldSession::HandleChangePlayerNameOpcodeCallBack(QueryResult *result, uin
delete result; delete result;
CharacterDatabase.BeginTransaction();
CharacterDatabase.PExecute("UPDATE characters set name = '%s', at_login = at_login & ~ %u WHERE guid ='%u'", newname.c_str(), uint32(AT_LOGIN_RENAME), guidLow); CharacterDatabase.PExecute("UPDATE characters set name = '%s', at_login = at_login & ~ %u WHERE guid ='%u'", newname.c_str(), uint32(AT_LOGIN_RENAME), guidLow);
CharacterDatabase.PExecute("DELETE FROM character_declinedname WHERE guid ='%u'", guidLow); CharacterDatabase.PExecute("DELETE FROM character_declinedname WHERE guid ='%u'", guidLow);
CharacterDatabase.CommitTransaction();
sLog.outChar("Account: %d (IP: %s) Character:[%s] (guid:%u) Changed name to: %s", session->GetAccountId(), session->GetRemoteAddress().c_str(), oldname.c_str(), guidLow, newname.c_str()); sLog.outChar("Account: %d (IP: %s) Character:[%s] (guid:%u) Changed name to: %s", session->GetAccountId(), session->GetRemoteAddress().c_str(), oldname.c_str(), guidLow, newname.c_str());

View file

@ -6797,9 +6797,12 @@ void ObjectMgr::LoadWeatherZoneChances()
void ObjectMgr::SaveCreatureRespawnTime(uint32 loguid, uint32 instance, time_t t) void ObjectMgr::SaveCreatureRespawnTime(uint32 loguid, uint32 instance, time_t t)
{ {
mCreatureRespawnTimes[MAKE_PAIR64(loguid,instance)] = t; mCreatureRespawnTimes[MAKE_PAIR64(loguid,instance)] = t;
CharacterDatabase.BeginTransaction();
CharacterDatabase.PExecute("DELETE FROM creature_respawn WHERE guid = '%u' AND instance = '%u'", loguid, instance); CharacterDatabase.PExecute("DELETE FROM creature_respawn WHERE guid = '%u' AND instance = '%u'", loguid, instance);
if(t) if(t)
CharacterDatabase.PExecute("INSERT INTO creature_respawn VALUES ( '%u', '" UI64FMTD "', '%u' )", loguid, uint64(t), instance); CharacterDatabase.PExecute("INSERT INTO creature_respawn VALUES ( '%u', '" UI64FMTD "', '%u' )", loguid, uint64(t), instance);
CharacterDatabase.CommitTransaction();
} }
void ObjectMgr::DeleteCreatureData(uint32 guid) void ObjectMgr::DeleteCreatureData(uint32 guid)
@ -6815,9 +6818,12 @@ void ObjectMgr::DeleteCreatureData(uint32 guid)
void ObjectMgr::SaveGORespawnTime(uint32 loguid, uint32 instance, time_t t) void ObjectMgr::SaveGORespawnTime(uint32 loguid, uint32 instance, time_t t)
{ {
mGORespawnTimes[MAKE_PAIR64(loguid,instance)] = t; mGORespawnTimes[MAKE_PAIR64(loguid,instance)] = t;
CharacterDatabase.BeginTransaction();
CharacterDatabase.PExecute("DELETE FROM gameobject_respawn WHERE guid = '%u' AND instance = '%u'", loguid, instance); CharacterDatabase.PExecute("DELETE FROM gameobject_respawn WHERE guid = '%u' AND instance = '%u'", loguid, instance);
if(t) if(t)
CharacterDatabase.PExecute("INSERT INTO gameobject_respawn VALUES ( '%u', '" UI64FMTD "', '%u' )", loguid, uint64(t), instance); CharacterDatabase.PExecute("INSERT INTO gameobject_respawn VALUES ( '%u', '" UI64FMTD "', '%u' )", loguid, uint64(t), instance);
CharacterDatabase.CommitTransaction();
} }
void ObjectMgr::DeleteRespawnTimeForInstance(uint32 instance) void ObjectMgr::DeleteRespawnTimeForInstance(uint32 instance)
@ -6842,8 +6848,10 @@ void ObjectMgr::DeleteRespawnTimeForInstance(uint32 instance)
mCreatureRespawnTimes.erase(itr); mCreatureRespawnTimes.erase(itr);
} }
CharacterDatabase.BeginTransaction();
CharacterDatabase.PExecute("DELETE FROM creature_respawn WHERE instance = '%u'", instance); CharacterDatabase.PExecute("DELETE FROM creature_respawn WHERE instance = '%u'", instance);
CharacterDatabase.PExecute("DELETE FROM gameobject_respawn WHERE instance = '%u'", instance); CharacterDatabase.PExecute("DELETE FROM gameobject_respawn WHERE instance = '%u'", instance);
CharacterDatabase.CommitTransaction();
} }
void ObjectMgr::DeleteGOData(uint32 guid) void ObjectMgr::DeleteGOData(uint32 guid)

View file

@ -89,7 +89,6 @@ World::World()
m_startTime=m_gameTime; m_startTime=m_gameTime;
m_maxActiveSessionCount = 0; m_maxActiveSessionCount = 0;
m_maxQueuedSessionCount = 0; m_maxQueuedSessionCount = 0;
m_resultQueue = NULL;
m_NextDailyQuestReset = 0; m_NextDailyQuestReset = 0;
m_NextWeeklyQuestReset = 0; m_NextWeeklyQuestReset = 0;
m_scheduledScripts = 0; m_scheduledScripts = 0;
@ -133,8 +132,6 @@ World::~World()
VMAP::VMapFactory::clear(); VMAP::VMapFactory::clear();
if(m_resultQueue) delete m_resultQueue;
//TODO free addSessQueue //TODO free addSessQueue
} }
@ -1938,13 +1935,14 @@ void World::ProcessCliCommands()
void World::InitResultQueue() void World::InitResultQueue()
{ {
m_resultQueue = new SqlResultQueue;
CharacterDatabase.SetResultQueue(m_resultQueue);
} }
void World::UpdateResultQueue() void World::UpdateResultQueue()
{ {
m_resultQueue->Update(); //process async result queues
CharacterDatabase.ProcessResultQueue();
WorldDatabase.ProcessResultQueue();
LoginDatabase.ProcessResultQueue();
} }
void World::UpdateRealmCharCount(uint32 accountId) void World::UpdateRealmCharCount(uint32 accountId)
@ -1960,8 +1958,11 @@ void World::_UpdateRealmCharCount(QueryResult *resultCharCount, uint32 accountId
Field *fields = resultCharCount->Fetch(); Field *fields = resultCharCount->Fetch();
uint32 charCount = fields[0].GetUInt32(); uint32 charCount = fields[0].GetUInt32();
delete resultCharCount; delete resultCharCount;
LoginDatabase.BeginTransaction();
LoginDatabase.PExecute("DELETE FROM realmcharacters WHERE acctid= '%u' AND realmid = '%u'", accountId, realmID); LoginDatabase.PExecute("DELETE FROM realmcharacters WHERE acctid= '%u' AND realmid = '%u'", accountId, realmID);
LoginDatabase.PExecute("INSERT INTO realmcharacters (numchars, acctid, realmid) VALUES (%u, %u, %u)", charCount, accountId, realmID); LoginDatabase.PExecute("INSERT INTO realmcharacters (numchars, acctid, realmid) VALUES (%u, %u, %u)", charCount, accountId, realmID);
LoginDatabase.CommitTransaction();
} }
} }

View file

@ -657,7 +657,6 @@ class World
// CLI command holder to be thread safe // CLI command holder to be thread safe
ACE_Based::LockedQueue<CliCommandHolder*,ACE_Thread_Mutex> cliCmdQueue; ACE_Based::LockedQueue<CliCommandHolder*,ACE_Thread_Mutex> cliCmdQueue;
SqlResultQueue *m_resultQueue;
// next daily quests reset time // next daily quests reset time
time_t m_NextDailyQuestReset; time_t m_NextDailyQuestReset;

View file

@ -416,15 +416,16 @@ bool Master::_StartDB()
{ {
///- Get world database info from configuration file ///- Get world database info from configuration file
std::string dbstring = sConfig.GetStringDefault("WorldDatabaseInfo", ""); std::string dbstring = sConfig.GetStringDefault("WorldDatabaseInfo", "");
int nConnections = sConfig.GetIntDefault("WorldDatabaseConnections", 1);
if(dbstring.empty()) if(dbstring.empty())
{ {
sLog.outError("Database not specified in configuration file"); sLog.outError("Database not specified in configuration file");
return false; return false;
} }
sLog.outString("World Database: %s", dbstring.c_str()); sLog.outString("World Database: %s, total connections: %i", dbstring.c_str(), nConnections + 1);
///- Initialise the world database ///- Initialise the world database
if(!WorldDatabase.Initialize(dbstring.c_str())) if(!WorldDatabase.Initialize(dbstring.c_str(), nConnections))
{ {
sLog.outError("Cannot connect to world database %s",dbstring.c_str()); sLog.outError("Cannot connect to world database %s",dbstring.c_str());
return false; return false;
@ -438,6 +439,7 @@ bool Master::_StartDB()
} }
dbstring = sConfig.GetStringDefault("CharacterDatabaseInfo", ""); dbstring = sConfig.GetStringDefault("CharacterDatabaseInfo", "");
nConnections = sConfig.GetIntDefault("CharacterDatabaseConnections", 1);
if(dbstring.empty()) if(dbstring.empty())
{ {
sLog.outError("Character Database not specified in configuration file"); sLog.outError("Character Database not specified in configuration file");
@ -446,10 +448,10 @@ bool Master::_StartDB()
WorldDatabase.HaltDelayThread(); WorldDatabase.HaltDelayThread();
return false; return false;
} }
sLog.outString("Character Database: %s", dbstring.c_str()); sLog.outString("Character Database: %s, total connections: %i", dbstring.c_str(), nConnections + 1);
///- Initialise the Character database ///- Initialise the Character database
if(!CharacterDatabase.Initialize(dbstring.c_str())) if(!CharacterDatabase.Initialize(dbstring.c_str(), nConnections))
{ {
sLog.outError("Cannot connect to Character database %s",dbstring.c_str()); sLog.outError("Cannot connect to Character database %s",dbstring.c_str());
@ -468,6 +470,7 @@ bool Master::_StartDB()
///- Get login database info from configuration file ///- Get login database info from configuration file
dbstring = sConfig.GetStringDefault("LoginDatabaseInfo", ""); dbstring = sConfig.GetStringDefault("LoginDatabaseInfo", "");
nConnections = sConfig.GetIntDefault("LoginDatabaseConnections", 1);
if(dbstring.empty()) if(dbstring.empty())
{ {
sLog.outError("Login database not specified in configuration file"); sLog.outError("Login database not specified in configuration file");
@ -479,8 +482,8 @@ bool Master::_StartDB()
} }
///- Initialise the login database ///- Initialise the login database
sLog.outString("Login Database: %s", dbstring.c_str() ); sLog.outString("Login Database: %s, total connections: %i", dbstring.c_str(), nConnections + 1);
if(!LoginDatabase.Initialize(dbstring.c_str())) if(!LoginDatabase.Initialize(dbstring.c_str(), nConnections))
{ {
sLog.outError("Cannot connect to login database %s",dbstring.c_str()); sLog.outError("Cannot connect to login database %s",dbstring.c_str());

View file

@ -37,6 +37,14 @@ ConfVersion=2010100901
# hostname;port;username;password;database # hostname;port;username;password;database
# .;/path/to/unix_socket/DIRECTORY or . for default path;username;password;database - use Unix sockets at Unix/Linux # .;/path/to/unix_socket/DIRECTORY or . for default path;username;password;database - use Unix sockets at Unix/Linux
# #
# LoginDatabaseConnections
# WorldDatabaseConnections
# CharacterDatabaseConnections
# Amount of connections to database which will be used for SELECT queries. Maximum 16 connections per database.
# Please, note, for data consistency only one connection for each database is used for transactions and async SELECTs.
# So formula to find out how many connections will be established: X = ¹_connections + 1
# Default: 1 connection for SELECT statements
#
# MaxPingTime # MaxPingTime
# Settings for maximum database-ping interval (minutes between pings) # Settings for maximum database-ping interval (minutes between pings)
# #
@ -57,6 +65,9 @@ LogsDir = ""
LoginDatabaseInfo = "127.0.0.1;3306;mangos;mangos;realmd" LoginDatabaseInfo = "127.0.0.1;3306;mangos;mangos;realmd"
WorldDatabaseInfo = "127.0.0.1;3306;mangos;mangos;mangos" WorldDatabaseInfo = "127.0.0.1;3306;mangos;mangos;mangos"
CharacterDatabaseInfo = "127.0.0.1;3306;mangos;mangos;characters" CharacterDatabaseInfo = "127.0.0.1;3306;mangos;mangos;characters"
LoginDatabaseConnections = 1
WorldDatabaseConnections = 1
CharacterDatabaseConnections = 1
MaxPingTime = 30 MaxPingTime = 30
WorldServerPort = 8085 WorldServerPort = 8085
BindIP = "0.0.0.0" BindIP = "0.0.0.0"

View file

@ -302,7 +302,7 @@ extern int main(int argc, char **argv)
{ {
loopCounter = 0; loopCounter = 0;
DETAIL_LOG("Ping MySQL to keep connection alive"); DETAIL_LOG("Ping MySQL to keep connection alive");
delete LoginDatabase.Query("SELECT 1 FROM realmlist LIMIT 1"); LoginDatabase.Ping();
} }
#ifdef WIN32 #ifdef WIN32
if (m_ServiceStatus == 0) stopEvent = true; if (m_ServiceStatus == 0) stopEvent = true;

View file

@ -18,19 +18,29 @@
#include "DatabaseEnv.h" #include "DatabaseEnv.h"
#include "Config/Config.h" #include "Config/Config.h"
#include "Database/SqlOperations.h"
#include <ctime> #include <ctime>
#include <iostream> #include <iostream>
#include <fstream> #include <fstream>
#define MIN_CONNECTION_POOL_SIZE 1
#define MAX_CONNECTION_POOL_SIZE 16
Database::~Database() Database::~Database()
{ {
HaltDelayThread();
/*Delete objects*/ /*Delete objects*/
delete m_pResultQueue;
delete m_pAsyncConn;
for (int i = 0; i < m_pQueryConnections.size(); ++i)
delete m_pQueryConnections[i];
} }
bool Database::Initialize(const char *) bool Database::Initialize(const char * infoString, int nConns /*= 1*/)
{ {
// Enable logging of SQL commands (usally only GM commands) // Enable logging of SQL commands (usually only GM commands)
// (See method: PExecuteLog) // (See method: PExecuteLog)
m_logSQL = sConfig.GetBoolDefault("LogSQL", false); m_logSQL = sConfig.GetBoolDefault("LogSQL", false);
m_logsDir = sConfig.GetStringDefault("LogsDir",""); m_logsDir = sConfig.GetStringDefault("LogsDir","");
@ -41,9 +51,74 @@ bool Database::Initialize(const char *)
} }
m_pingIntervallms = sConfig.GetIntDefault ("MaxPingTime", 30) * (MINUTE * 1000); m_pingIntervallms = sConfig.GetIntDefault ("MaxPingTime", 30) * (MINUTE * 1000);
//create DB connections
//setup connection pool size
if(nConns < MIN_CONNECTION_POOL_SIZE)
m_nQueryConnPoolSize = MIN_CONNECTION_POOL_SIZE;
else if(nConns > MAX_CONNECTION_POOL_SIZE)
m_nQueryConnPoolSize = MAX_CONNECTION_POOL_SIZE;
else
m_nQueryConnPoolSize = nConns;
//create connection pool for sync requests
for (int i = 0; i < m_nQueryConnPoolSize; ++i)
{
SqlConnection * pConn = CreateConnection();
if(!pConn->Initialize(infoString))
{
delete pConn;
return false;
}
m_pQueryConnections.push_back(pConn);
}
//create and initialize connection for async requests
m_pAsyncConn = CreateConnection();
if(!m_pAsyncConn->Initialize(infoString))
return false;
InitDelayThread();
return true; return true;
} }
SqlDelayThread * Database::CreateDelayThread()
{
assert(m_pAsyncConn);
return new SqlDelayThread(this, m_pAsyncConn);
}
void Database::InitDelayThread()
{
assert(!m_delayThread);
m_pResultQueue = new SqlResultQueue;
//New delay thread for delay execute
m_threadBody = CreateDelayThread(); // will deleted at m_delayThread delete
m_delayThread = new ACE_Based::Thread(m_threadBody);
}
void Database::HaltDelayThread()
{
if (!m_threadBody || !m_delayThread) return;
m_threadBody->Stop(); //Stop event
m_delayThread->wait(); //Wait for flush to DB
delete m_delayThread; //This also deletes m_threadBody
m_delayThread = NULL;
m_threadBody = NULL;
//stop async result queue
if(m_pResultQueue)
{
m_pResultQueue->Update();
delete m_pResultQueue;
m_pResultQueue = NULL;
}
}
void Database::ThreadStart() void Database::ThreadStart()
{ {
} }
@ -52,17 +127,52 @@ void Database::ThreadEnd()
{ {
} }
void Database::ProcessResultQueue()
{
if(m_pResultQueue)
m_pResultQueue->Update();
}
void Database::escape_string(std::string& str) void Database::escape_string(std::string& str)
{ {
if(str.empty()) if(str.empty())
return; return;
char* buf = new char[str.size()*2+1]; char* buf = new char[str.size()*2+1];
escape_string(buf,str.c_str(),str.size()); //we don't care what connection to use - escape string will be the same
m_pQueryConnections[0]->escape_string(buf,str.c_str(),str.size());
str = buf; str = buf;
delete[] buf; delete[] buf;
} }
SqlConnection * Database::getQueryConnection()
{
int nCount = 0;
if(m_nQueryCounter == long(1 << 31))
m_nQueryCounter = 0;
else
nCount = ++m_nQueryCounter;
return m_pQueryConnections[nCount % m_nQueryConnPoolSize];
}
void Database::Ping()
{
const char * sql = "SELECT 1";
{
SqlConnection::Lock guard(m_pAsyncConn);
delete guard->Query(sql);
}
for (int i = 0; i < m_nQueryConnPoolSize; ++i)
{
SqlConnection::Lock guard(m_pQueryConnections[i]);
delete guard->Query(sql);
}
}
bool Database::PExecuteLog(const char * format,...) bool Database::PExecuteLog(const char * format,...)
{ {
if (!format) if (!format)
@ -107,12 +217,6 @@ bool Database::PExecuteLog(const char * format,...)
return Execute(szQuery); return Execute(szQuery);
} }
void Database::SetResultQueue(SqlResultQueue * queue)
{
m_queryQueues[ACE_Based::Thread::current()] = queue;
}
QueryResult* Database::PQuery(const char *format,...) QueryResult* Database::PQuery(const char *format,...)
{ {
if(!format) return NULL; if(!format) return NULL;
@ -151,6 +255,29 @@ QueryNamedResult* Database::PQueryNamed(const char *format,...)
return QueryNamed(szQuery); return QueryNamed(szQuery);
} }
bool Database::Execute(const char *sql)
{
if (!m_pAsyncConn)
return false;
SqlTransaction * pTrans = m_TransStorage->get();
if(pTrans)
{
//add SQL request to trans queue
pTrans->DelayExecute(sql);
}
else
{
// Simple sql statement
pTrans = new SqlTransaction;
pTrans->DelayExecute(sql);
m_threadBody->Delay(pTrans);
}
return true;
}
bool Database::PExecute(const char * format,...) bool Database::PExecute(const char * format,...)
{ {
if (!format) if (!format)
@ -171,6 +298,18 @@ bool Database::PExecute(const char * format,...)
return Execute(szQuery); return Execute(szQuery);
} }
bool Database::DirectExecute(const char* sql)
{
if(!m_pAsyncConn)
return false;
SqlTransaction trans;
trans.DelayExecute(sql);
trans.Execute(m_pAsyncConn);
return true;
}
bool Database::DirectPExecute(const char * format,...) bool Database::DirectPExecute(const char * format,...)
{ {
if (!format) if (!format)
@ -191,6 +330,62 @@ bool Database::DirectPExecute(const char * format,...)
return DirectExecute(szQuery); return DirectExecute(szQuery);
} }
bool Database::BeginTransaction()
{
if (!m_pAsyncConn)
return false;
//initiate transaction on current thread
//currently we do not support queued transactions
m_TransStorage->init();
return true;
}
bool Database::CommitTransaction()
{
if (!m_pAsyncConn)
return false;
//check if we have pending transaction
if(!m_TransStorage->get())
return false;
//add SqlTransaction to the async queue
m_threadBody->Delay(m_TransStorage->detach());
return true;
}
bool Database::CommitTransactionDirect()
{
if (!m_pAsyncConn)
return false;
//check if we have pending transaction
if(!m_TransStorage->get())
return false;
//directly execute SqlTransaction
SqlTransaction * pTrans = m_TransStorage->detach();
pTrans->Execute(m_pAsyncConn);
delete pTrans;
return true;
}
bool Database::RollbackTransaction()
{
if (!m_pAsyncConn)
return false;
if(!m_TransStorage->get())
return false;
//remove scheduled transaction
m_TransStorage->reset();
return true;
}
bool Database::CheckRequiredField( char const* table_name, char const* required_name ) bool Database::CheckRequiredField( char const* table_name, char const* required_name )
{ {
// check required field // check required field
@ -278,3 +473,31 @@ bool Database::CheckRequiredField( char const* table_name, char const* required_
return false; return false;
} }
Database::TransHelper::~TransHelper()
{
reset();
}
SqlTransaction * Database::TransHelper::init()
{
MANGOS_ASSERT(!m_pTrans); //if we will get a nested transaction request - we MUST fix code!!!
m_pTrans = new SqlTransaction;
return m_pTrans;
}
SqlTransaction * Database::TransHelper::detach()
{
SqlTransaction * pRes = m_pTrans;
m_pTrans = NULL;
return pRes;
}
void Database::TransHelper::reset()
{
if(m_pTrans)
{
delete m_pTrans;
m_pTrans = NULL;
}
}

View file

@ -22,39 +22,87 @@
#include "Threading.h" #include "Threading.h"
#include "Utilities/UnorderedMapSet.h" #include "Utilities/UnorderedMapSet.h"
#include "Database/SqlDelayThread.h" #include "Database/SqlDelayThread.h"
#include <ace/Recursive_Thread_Mutex.h>
#include "Policies/ThreadingModel.h"
#include <ace/TSS_T.h>
#include <ace/Atomic_Op.h>
class SqlTransaction; class SqlTransaction;
class SqlResultQueue; class SqlResultQueue;
class SqlQueryHolder; class SqlQueryHolder;
typedef UNORDERED_MAP<ACE_Based::Thread* , SqlTransaction*> TransactionQueues;
typedef UNORDERED_MAP<ACE_Based::Thread* , SqlResultQueue*> QueryQueues;
#define MAX_QUERY_LEN 32*1024 #define MAX_QUERY_LEN 32*1024
//
class MANGOS_DLL_SPEC SqlConnection
{
public:
virtual ~SqlConnection() {}
//method for initializing DB connection
virtual bool Initialize(const char *infoString) = 0;
//public methods for making queries
virtual QueryResult* Query(const char *sql) = 0;
virtual QueryNamedResult* QueryNamed(const char *sql) = 0;
//public methods for making requests
virtual bool Execute(const char *sql) = 0;
//escape string generation
virtual unsigned long escape_string(char *to, const char *from, unsigned long length) { strncpy(to,from,length); return length; }
// nothing do if DB not support transactions
virtual bool BeginTransaction() { return true; }
virtual bool CommitTransaction() { return true; }
// can't rollback without transaction support
virtual bool RollbackTransaction() { return true; }
//SqlConnection object lock
class Lock
{
public:
Lock(SqlConnection * conn) : m_pConn(conn) { m_pConn->m_mutex.acquire(); }
~Lock() { m_pConn->m_mutex.release(); }
SqlConnection * operator->() const { return m_pConn; }
private:
SqlConnection * const m_pConn;
};
private:
typedef ACE_Recursive_Thread_Mutex LOCK_TYPE;
LOCK_TYPE m_mutex;
};
class MANGOS_DLL_SPEC Database class MANGOS_DLL_SPEC Database
{ {
protected:
Database() : m_threadBody(NULL), m_delayThread(NULL) {};
TransactionQueues m_tranQueues; ///< Transaction queues from diff. threads
QueryQueues m_queryQueues; ///< Query queues from diff threads
SqlDelayThread* m_threadBody; ///< Pointer to delay sql executer (owned by m_delayThread)
ACE_Based::Thread* m_delayThread; ///< Pointer to executer thread
public: public:
virtual ~Database(); virtual ~Database();
virtual bool Initialize(const char *infoString); virtual bool Initialize(const char *infoString, int nConns = 1);
virtual void InitDelayThread() = 0; virtual void InitDelayThread();
virtual void HaltDelayThread() = 0; virtual void HaltDelayThread();
/// Synchronous DB queries
inline QueryResult* Query(const char *sql)
{
SqlConnection::Lock guard(getQueryConnection());
return guard->Query(sql);
}
inline QueryNamedResult* QueryNamed(const char *sql)
{
SqlConnection::Lock guard(getQueryConnection());
return guard->QueryNamed(sql);
}
virtual QueryResult* Query(const char *sql) = 0;
QueryResult* PQuery(const char *format,...) ATTR_PRINTF(2,3); QueryResult* PQuery(const char *format,...) ATTR_PRINTF(2,3);
virtual QueryNamedResult* QueryNamed(const char *sql) = 0;
QueryNamedResult* PQueryNamed(const char *format,...) ATTR_PRINTF(2,3); QueryNamedResult* PQueryNamed(const char *format,...) ATTR_PRINTF(2,3);
bool DirectExecute(const char* sql);
bool DirectPExecute(const char *format,...) ATTR_PRINTF(2,3);
/// Async queries and query holders, implemented in DatabaseImpl.h /// Async queries and query holders, implemented in DatabaseImpl.h
// Query / member // Query / member
@ -95,30 +143,21 @@ class MANGOS_DLL_SPEC Database
template<class Class, typename ParamType1> template<class Class, typename ParamType1>
bool DelayQueryHolder(Class *object, void (Class::*method)(QueryResult*, SqlQueryHolder*, ParamType1), SqlQueryHolder *holder, ParamType1 param1); bool DelayQueryHolder(Class *object, void (Class::*method)(QueryResult*, SqlQueryHolder*, ParamType1), SqlQueryHolder *holder, ParamType1 param1);
virtual bool Execute(const char *sql) = 0; bool Execute(const char *sql);
bool PExecute(const char *format,...) ATTR_PRINTF(2,3); bool PExecute(const char *format,...) ATTR_PRINTF(2,3);
virtual bool DirectExecute(const char* sql) = 0;
bool DirectPExecute(const char *format,...) ATTR_PRINTF(2,3);
// Writes SQL commands to a LOG file (see mangosd.conf "LogSQL") // Writes SQL commands to a LOG file (see mangosd.conf "LogSQL")
bool PExecuteLog(const char *format,...) ATTR_PRINTF(2,3); bool PExecuteLog(const char *format,...) ATTR_PRINTF(2,3);
virtual bool BeginTransaction() // nothing do if DB not support transactions bool BeginTransaction();
{ bool CommitTransaction();
return true; bool RollbackTransaction();
} //for sync transaction execution
virtual bool CommitTransaction() // nothing do if DB not support transactions bool CommitTransactionDirect();
{
return true;
}
virtual bool RollbackTransaction() // can't rollback without transaction support
{
return false;
}
virtual operator bool () const = 0; operator bool () const { return m_pQueryConnections.size() && m_pAsyncConn != 0; }
virtual unsigned long escape_string(char *to, const char *from, unsigned long length) { strncpy(to,from,length); return length; } //escape string generation
void escape_string(std::string& str); void escape_string(std::string& str);
// must be called before first query in thread (one time for thread using one from existing Database objects) // must be called before first query in thread (one time for thread using one from existing Database objects)
@ -126,13 +165,76 @@ class MANGOS_DLL_SPEC Database
// must be called before finish thread run (one time for thread using one from existing Database objects) // must be called before finish thread run (one time for thread using one from existing Database objects)
virtual void ThreadEnd(); virtual void ThreadEnd();
// sets the result queue of the current thread, be careful what thread you call this from // set database-wide result queue. also we should use object-bases and not thread-based result queues
void SetResultQueue(SqlResultQueue * queue); void ProcessResultQueue();
bool CheckRequiredField(char const* table_name, char const* required_name); bool CheckRequiredField(char const* table_name, char const* required_name);
uint32 GetPingIntervall() { return m_pingIntervallms;} uint32 GetPingIntervall() { return m_pingIntervallms; }
//function to ping database connections
void Ping();
protected:
Database() : m_pAsyncConn(NULL), m_pResultQueue(NULL), m_threadBody(NULL), m_delayThread(NULL),
m_logSQL(false), m_pingIntervallms(0), m_nQueryConnPoolSize(1)
{
m_nQueryCounter = -1;
}
//factory method to create SqlConnection objects
virtual SqlConnection * CreateConnection() = 0;
//factory method to create SqlDelayThread objects
virtual SqlDelayThread * CreateDelayThread();
class TransHelper
{
public:
TransHelper() : m_pTrans(NULL) {}
~TransHelper();
//initializes new SqlTransaction object
SqlTransaction * init();
//gets pointer on current transaction object. Returns NULL if transaction was not initiated
SqlTransaction * get() const { return m_pTrans; }
//detaches SqlTransaction object allocated by init() function
//next call to get() function will return NULL!
//do not forget to destroy obtained SqlTransaction object!
SqlTransaction * detach();
//destroyes SqlTransaction allocated by init() function
void reset();
private:
SqlTransaction * m_pTrans;
};
//per-thread based storage for SqlTransaction object initialization - no locking is required
typedef ACE_TSS<Database::TransHelper> DBTransHelperTSS;
Database::DBTransHelperTSS m_TransStorage;
///< DB connections
//round-robin connection selection
SqlConnection * getQueryConnection();
//for now return one single connection for async requests
SqlConnection * getAsyncConnection() const { return m_pAsyncConn; }
//connection helper counters
int m_nQueryConnPoolSize; //current size of query connection pool
ACE_Atomic_Op<ACE_Thread_Mutex, long> m_nQueryCounter; //counter for connection selection
//lets use pool of connections for sync queries
typedef std::vector< SqlConnection * > SqlConnectionContainer;
SqlConnectionContainer m_pQueryConnections;
//only one single DB connection for transactions
SqlConnection * m_pAsyncConn;
SqlResultQueue * m_pResultQueue; ///< Transaction queues from diff. threads
SqlDelayThread * m_threadBody; ///< Pointer to delay sql executer (owned by m_delayThread)
ACE_Based::Thread * m_delayThread; ///< Pointer to executer thread
private: private:
bool m_logSQL; bool m_logSQL;
std::string m_logsDir; std::string m_logsDir;
uint32 m_pingIntervallms; uint32 m_pingIntervallms;

View file

@ -21,16 +21,8 @@
/// Function body definitions for the template function members of the Database class /// Function body definitions for the template function members of the Database class
#define ASYNC_QUERY_BODY(sql, queue_itr) \ #define ASYNC_QUERY_BODY(sql) if (!sql || !m_pResultQueue) return false;
if (!sql) return false; \ #define ASYNC_DELAYHOLDER_BODY(holder) if (!holder || !m_pResultQueue) return false;
\
QueryQueues::iterator queue_itr; \
\
{ \
ACE_Based::Thread * queryThread = ACE_Based::Thread::current(); \
queue_itr = m_queryQueues.find(queryThread); \
if (queue_itr == m_queryQueues.end()) return false; \
}
#define ASYNC_PQUERY_BODY(format, szQuery) \ #define ASYNC_PQUERY_BODY(format, szQuery) \
if(!format) return false; \ if(!format) return false; \
@ -51,49 +43,38 @@
} \ } \
} }
#define ASYNC_DELAYHOLDER_BODY(holder, queue_itr) \
if (!holder) return false; \
\
QueryQueues::iterator queue_itr; \
\
{ \
ACE_Based::Thread * queryThread = ACE_Based::Thread::current(); \
queue_itr = m_queryQueues.find(queryThread); \
if (queue_itr == m_queryQueues.end()) return false; \
}
// -- Query / member -- // -- Query / member --
template<class Class> template<class Class>
bool bool
Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*), const char *sql) Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*), const char *sql)
{ {
ASYNC_QUERY_BODY(sql, itr) ASYNC_QUERY_BODY(sql)
return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback<Class>(object, method), itr->second)); return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback<Class>(object, method), m_pResultQueue));
} }
template<class Class, typename ParamType1> template<class Class, typename ParamType1>
bool bool
Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*, ParamType1), ParamType1 param1, const char *sql) Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*, ParamType1), ParamType1 param1, const char *sql)
{ {
ASYNC_QUERY_BODY(sql, itr) ASYNC_QUERY_BODY(sql)
return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback<Class, ParamType1>(object, method, (QueryResult*)NULL, param1), itr->second)); return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback<Class, ParamType1>(object, method, (QueryResult*)NULL, param1), m_pResultQueue));
} }
template<class Class, typename ParamType1, typename ParamType2> template<class Class, typename ParamType1, typename ParamType2>
bool bool
Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*, ParamType1, ParamType2), ParamType1 param1, ParamType2 param2, const char *sql) Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*, ParamType1, ParamType2), ParamType1 param1, ParamType2 param2, const char *sql)
{ {
ASYNC_QUERY_BODY(sql, itr) ASYNC_QUERY_BODY(sql)
return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback<Class, ParamType1, ParamType2>(object, method, (QueryResult*)NULL, param1, param2), itr->second)); return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback<Class, ParamType1, ParamType2>(object, method, (QueryResult*)NULL, param1, param2), m_pResultQueue));
} }
template<class Class, typename ParamType1, typename ParamType2, typename ParamType3> template<class Class, typename ParamType1, typename ParamType2, typename ParamType3>
bool bool
Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*, ParamType1, ParamType2, ParamType3), ParamType1 param1, ParamType2 param2, ParamType3 param3, const char *sql) Database::AsyncQuery(Class *object, void (Class::*method)(QueryResult*, ParamType1, ParamType2, ParamType3), ParamType1 param1, ParamType2 param2, ParamType3 param3, const char *sql)
{ {
ASYNC_QUERY_BODY(sql, itr) ASYNC_QUERY_BODY(sql)
return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback<Class, ParamType1, ParamType2, ParamType3>(object, method, (QueryResult*)NULL, param1, param2, param3), itr->second)); return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::QueryCallback<Class, ParamType1, ParamType2, ParamType3>(object, method, (QueryResult*)NULL, param1, param2, param3), m_pResultQueue));
} }
// -- Query / static -- // -- Query / static --
@ -102,24 +83,24 @@ template<typename ParamType1>
bool bool
Database::AsyncQuery(void (*method)(QueryResult*, ParamType1), ParamType1 param1, const char *sql) Database::AsyncQuery(void (*method)(QueryResult*, ParamType1), ParamType1 param1, const char *sql)
{ {
ASYNC_QUERY_BODY(sql, itr) ASYNC_QUERY_BODY(sql)
return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback<ParamType1>(method, (QueryResult*)NULL, param1), itr->second)); return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback<ParamType1>(method, (QueryResult*)NULL, param1), m_pResultQueue));
} }
template<typename ParamType1, typename ParamType2> template<typename ParamType1, typename ParamType2>
bool bool
Database::AsyncQuery(void (*method)(QueryResult*, ParamType1, ParamType2), ParamType1 param1, ParamType2 param2, const char *sql) Database::AsyncQuery(void (*method)(QueryResult*, ParamType1, ParamType2), ParamType1 param1, ParamType2 param2, const char *sql)
{ {
ASYNC_QUERY_BODY(sql, itr) ASYNC_QUERY_BODY(sql)
return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback<ParamType1, ParamType2>(method, (QueryResult*)NULL, param1, param2), itr->second)); return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback<ParamType1, ParamType2>(method, (QueryResult*)NULL, param1, param2), m_pResultQueue));
} }
template<typename ParamType1, typename ParamType2, typename ParamType3> template<typename ParamType1, typename ParamType2, typename ParamType3>
bool bool
Database::AsyncQuery(void (*method)(QueryResult*, ParamType1, ParamType2, ParamType3), ParamType1 param1, ParamType2 param2, ParamType3 param3, const char *sql) Database::AsyncQuery(void (*method)(QueryResult*, ParamType1, ParamType2, ParamType3), ParamType1 param1, ParamType2 param2, ParamType3 param3, const char *sql)
{ {
ASYNC_QUERY_BODY(sql, itr) ASYNC_QUERY_BODY(sql)
return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback<ParamType1, ParamType2, ParamType3>(method, (QueryResult*)NULL, param1, param2, param3), itr->second)); return m_threadBody->Delay(new SqlQuery(sql, new MaNGOS::SQueryCallback<ParamType1, ParamType2, ParamType3>(method, (QueryResult*)NULL, param1, param2, param3), m_pResultQueue));
} }
// -- PQuery / member -- // -- PQuery / member --
@ -188,16 +169,16 @@ template<class Class>
bool bool
Database::DelayQueryHolder(Class *object, void (Class::*method)(QueryResult*, SqlQueryHolder*), SqlQueryHolder *holder) Database::DelayQueryHolder(Class *object, void (Class::*method)(QueryResult*, SqlQueryHolder*), SqlQueryHolder *holder)
{ {
ASYNC_DELAYHOLDER_BODY(holder, itr) ASYNC_DELAYHOLDER_BODY(holder)
return holder->Execute(new MaNGOS::QueryCallback<Class, SqlQueryHolder*>(object, method, (QueryResult*)NULL, holder), m_threadBody, itr->second); return holder->Execute(new MaNGOS::QueryCallback<Class, SqlQueryHolder*>(object, method, (QueryResult*)NULL, holder), m_threadBody, m_pResultQueue);
} }
template<class Class, typename ParamType1> template<class Class, typename ParamType1>
bool bool
Database::DelayQueryHolder(Class *object, void (Class::*method)(QueryResult*, SqlQueryHolder*, ParamType1), SqlQueryHolder *holder, ParamType1 param1) Database::DelayQueryHolder(Class *object, void (Class::*method)(QueryResult*, SqlQueryHolder*, ParamType1), SqlQueryHolder *holder, ParamType1 param1)
{ {
ASYNC_DELAYHOLDER_BODY(holder, itr) ASYNC_DELAYHOLDER_BODY(holder)
return holder->Execute(new MaNGOS::QueryCallback<Class, SqlQueryHolder*, ParamType1>(object, method, (QueryResult*)NULL, holder, param1), m_threadBody, itr->second); return holder->Execute(new MaNGOS::QueryCallback<Class, SqlQueryHolder*, ParamType1>(object, method, (QueryResult*)NULL, holder, param1), m_threadBody, m_pResultQueue);
} }
#undef ASYNC_QUERY_BODY #undef ASYNC_QUERY_BODY

View file

@ -23,10 +23,10 @@
#include "Platform/Define.h" #include "Platform/Define.h"
#include "Threading.h" #include "Threading.h"
#include "DatabaseEnv.h" #include "DatabaseEnv.h"
#include "Database/MySQLDelayThread.h"
#include "Database/SqlOperations.h"
#include "Timer.h" #include "Timer.h"
size_t DatabaseMysql::db_count = 0;
void DatabaseMysql::ThreadStart() void DatabaseMysql::ThreadStart()
{ {
mysql_thread_init(); mysql_thread_init();
@ -37,9 +37,7 @@ void DatabaseMysql::ThreadEnd()
mysql_thread_end(); mysql_thread_end();
} }
size_t DatabaseMysql::db_count = 0; DatabaseMysql::DatabaseMysql()
DatabaseMysql::DatabaseMysql() : Database(), mMysql(0)
{ {
// before first connection // before first connection
if( db_count++ == 0 ) if( db_count++ == 0 )
@ -61,31 +59,40 @@ DatabaseMysql::~DatabaseMysql()
if (m_delayThread) if (m_delayThread)
HaltDelayThread(); HaltDelayThread();
if (mMysql) //destroy SqlConnection objects
mysql_close(mMysql); if(m_pQueryConnections.size())
{
for (int i = 0; i < m_pQueryConnections.size(); ++i)
delete m_pQueryConnections[i];
m_pQueryConnections.clear();
}
if(m_pAsyncConn)
{
delete m_pAsyncConn;
m_pAsyncConn = NULL;
}
//Free Mysql library pointers for last ~DB //Free Mysql library pointers for last ~DB
if(--db_count == 0) if(--db_count == 0)
mysql_library_end(); mysql_library_end();
} }
bool DatabaseMysql::Initialize(const char *infoString) SqlConnection * DatabaseMysql::CreateConnection()
{
return new MySQLConnection();
}
bool MySQLConnection::Initialize(const char *infoString)
{ {
MYSQL * mysqlInit = mysql_init(NULL);
if(!Database::Initialize(infoString))
return false;
tranThread = NULL;
MYSQL *mysqlInit;
mysqlInit = mysql_init(NULL);
if (!mysqlInit) if (!mysqlInit)
{ {
sLog.outError( "Could not initialize Mysql connection" ); sLog.outError( "Could not initialize Mysql connection" );
return false; return false;
} }
InitDelayThread();
Tokens tokens = StrSplit(infoString, ";"); Tokens tokens = StrSplit(infoString, ";");
Tokens::iterator iter; Tokens::iterator iter;
@ -108,7 +115,7 @@ bool DatabaseMysql::Initialize(const char *infoString)
database = *iter++; database = *iter++;
mysql_options(mysqlInit,MYSQL_SET_CHARSET_NAME,"utf8"); mysql_options(mysqlInit,MYSQL_SET_CHARSET_NAME,"utf8");
#ifdef WIN32 #ifdef WIN32
if(host==".") // named pipe use option (Windows) if(host==".") // named pipe use option (Windows)
{ {
unsigned int opt = MYSQL_PROTOCOL_PIPE; unsigned int opt = MYSQL_PROTOCOL_PIPE;
@ -121,7 +128,7 @@ bool DatabaseMysql::Initialize(const char *infoString)
port = atoi(port_or_socket.c_str()); port = atoi(port_or_socket.c_str());
unix_socket = 0; unix_socket = 0;
} }
#else #else
if(host==".") // socket use option (Unix/Linux) if(host==".") // socket use option (Unix/Linux)
{ {
unsigned int opt = MYSQL_PROTOCOL_SOCKET; unsigned int opt = MYSQL_PROTOCOL_SOCKET;
@ -135,7 +142,7 @@ bool DatabaseMysql::Initialize(const char *infoString)
port = atoi(port_or_socket.c_str()); port = atoi(port_or_socket.c_str());
unix_socket = 0; unix_socket = 0;
} }
#endif #endif
mMysql = mysql_real_connect(mysqlInit, host.c_str(), user.c_str(), mMysql = mysql_real_connect(mysqlInit, host.c_str(), user.c_str(),
password.c_str(), database.c_str(), port, unix_socket, 0); password.c_str(), database.c_str(), port, unix_socket, 0);
@ -155,16 +162,19 @@ bool DatabaseMysql::Initialize(const char *infoString)
// This is wrong since mangos use transactions, // This is wrong since mangos use transactions,
// autocommit is turned of during it. // autocommit is turned of during it.
// Setting it to on makes atomic updates work // Setting it to on makes atomic updates work
if (!mysql_autocommit(mMysql, 1)) // ---
DETAIL_LOG("AUTOCOMMIT SUCCESSFULLY SET TO 1"); // if you want atomic updates to work - USE TRANSACTIONS!!!
// no need to mess up with autocommit mode which might degrade server performance!
if (!mysql_autocommit(mMysql, 0))
DETAIL_LOG("AUTOCOMMIT SUCCESSFULLY SET TO 0");
else else
DETAIL_LOG("AUTOCOMMIT NOT SET TO 1"); DETAIL_LOG("AUTOCOMMIT NOT SET TO 0");
/*-------------------------------------*/ /*-------------------------------------*/
// set connection properties to UTF8 to properly handle locales for different // set connection properties to UTF8 to properly handle locales for different
// server configs - core sends data in UTF8, so MySQL must expect UTF8 too // server configs - core sends data in UTF8, so MySQL must expect UTF8 too
PExecute("SET NAMES `utf8`"); Execute("SET NAMES `utf8`");
PExecute("SET CHARACTER SET `utf8`"); Execute("SET CHARACTER SET `utf8`");
return true; return true;
} }
@ -177,33 +187,27 @@ bool DatabaseMysql::Initialize(const char *infoString)
} }
} }
bool DatabaseMysql::_Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **pFields, uint64* pRowCount, uint32* pFieldCount) bool MySQLConnection::_Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **pFields, uint64* pRowCount, uint32* pFieldCount)
{ {
if (!mMysql) if (!mMysql)
return 0; return 0;
uint32 _s = WorldTimer::getMSTime();
if(mysql_query(mMysql, sql))
{ {
// guarded block for thread-safe mySQL request sLog.outErrorDb( "SQL: %s", sql );
ACE_Guard<ACE_Thread_Mutex> query_connection_guard(mMutex); sLog.outErrorDb("query ERROR: %s", mysql_error(mMysql));
return false;
uint32 _s = WorldTimer::getMSTime();
if(mysql_query(mMysql, sql))
{
sLog.outErrorDb( "SQL: %s", sql );
sLog.outErrorDb("query ERROR: %s", mysql_error(mMysql));
return false;
}
else
{
DEBUG_FILTER_LOG(LOG_FILTER_SQL_TEXT, "[%u ms] SQL: %s", WorldTimer::getMSTimeDiff(_s,WorldTimer::getMSTime()), sql );
}
*pResult = mysql_store_result(mMysql);
*pRowCount = mysql_affected_rows(mMysql);
*pFieldCount = mysql_field_count(mMysql);
// end guarded block
} }
else
{
DEBUG_FILTER_LOG(LOG_FILTER_SQL_TEXT, "[%u ms] SQL: %s", WorldTimer::getMSTimeDiff(_s,WorldTimer::getMSTime()), sql );
}
*pResult = mysql_store_result(mMysql);
*pRowCount = mysql_affected_rows(mMysql);
*pFieldCount = mysql_field_count(mMysql);
if (!*pResult ) if (!*pResult )
return false; return false;
@ -218,7 +222,7 @@ bool DatabaseMysql::_Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **p
return true; return true;
} }
QueryResult* DatabaseMysql::Query(const char *sql) QueryResult* MySQLConnection::Query(const char *sql)
{ {
MYSQL_RES *result = NULL; MYSQL_RES *result = NULL;
MYSQL_FIELD *fields = NULL; MYSQL_FIELD *fields = NULL;
@ -231,11 +235,10 @@ QueryResult* DatabaseMysql::Query(const char *sql)
QueryResultMysql *queryResult = new QueryResultMysql(result, fields, rowCount, fieldCount); QueryResultMysql *queryResult = new QueryResultMysql(result, fields, rowCount, fieldCount);
queryResult->NextRow(); queryResult->NextRow();
return queryResult; return queryResult;
} }
QueryNamedResult* DatabaseMysql::QueryNamed(const char *sql) QueryNamedResult* MySQLConnection::QueryNamed(const char *sql)
{ {
MYSQL_RES *result = NULL; MYSQL_RES *result = NULL;
MYSQL_FIELD *fields = NULL; MYSQL_FIELD *fields = NULL;
@ -252,44 +255,15 @@ QueryNamedResult* DatabaseMysql::QueryNamed(const char *sql)
QueryResultMysql *queryResult = new QueryResultMysql(result, fields, rowCount, fieldCount); QueryResultMysql *queryResult = new QueryResultMysql(result, fields, rowCount, fieldCount);
queryResult->NextRow(); queryResult->NextRow();
return new QueryNamedResult(queryResult,names); return new QueryNamedResult(queryResult,names);
} }
bool DatabaseMysql::Execute(const char *sql) bool MySQLConnection::Execute(const char* sql)
{
if (!mMysql)
return false;
// don't use queued execution if it has not been initialized
if (!m_threadBody) return DirectExecute(sql);
ACE_Guard<ACE_Thread_Mutex> _lock(nMutex);
tranThread = ACE_Based::Thread::current(); // owner of this transaction
TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL)
{ // Statement for transaction
i->second->DelayExecute(sql);
}
else
{
// Simple sql statement
m_threadBody->Delay(new SqlStatement(sql));
}
return true;
}
bool DatabaseMysql::DirectExecute(const char* sql)
{ {
if (!mMysql) if (!mMysql)
return false; return false;
{ {
// guarded block for thread-safe mySQL request
ACE_Guard<ACE_Thread_Mutex> query_connection_guard(mMutex);
uint32 _s = WorldTimer::getMSTime(); uint32 _s = WorldTimer::getMSTime();
if(mysql_query(mMysql, sql)) if(mysql_query(mMysql, sql))
@ -308,7 +282,7 @@ bool DatabaseMysql::DirectExecute(const char* sql)
return true; return true;
} }
bool DatabaseMysql::_TransactionCmd(const char *sql) bool MySQLConnection::_TransactionCmd(const char *sql)
{ {
if (mysql_query(mMysql, sql)) if (mysql_query(mMysql, sql))
{ {
@ -323,100 +297,22 @@ bool DatabaseMysql::_TransactionCmd(const char *sql)
return true; return true;
} }
bool DatabaseMysql::BeginTransaction() bool MySQLConnection::BeginTransaction()
{ {
if (!mMysql) return _TransactionCmd("START TRANSACTION");
return false;
// don't use queued execution if it has not been initialized
if (!m_threadBody)
{
if (tranThread == ACE_Based::Thread::current())
return false; // huh? this thread already started transaction
mMutex.acquire();
if (!_TransactionCmd("START TRANSACTION"))
{
mMutex.release(); // can't start transaction
return false;
}
return true; // transaction started
}
ACE_Guard<ACE_Thread_Mutex> _lock(nMutex);
tranThread = ACE_Based::Thread::current(); // owner of this transaction
TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL)
// If for thread exists queue and also contains transaction
// delete that transaction (not allow trans in trans)
delete i->second;
m_tranQueues[tranThread] = new SqlTransaction();
return true;
} }
bool DatabaseMysql::CommitTransaction() bool MySQLConnection::CommitTransaction()
{ {
if (!mMysql) return _TransactionCmd("COMMIT");
return false;
// don't use queued execution if it has not been initialized
if (!m_threadBody)
{
if (tranThread != ACE_Based::Thread::current())
return false;
bool _res = _TransactionCmd("COMMIT");
tranThread = NULL;
mMutex.release();
return _res;
}
ACE_Guard<ACE_Thread_Mutex> _lock(nMutex);
tranThread = ACE_Based::Thread::current();
TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL)
{
m_threadBody->Delay(i->second);
m_tranQueues.erase(i);
return true;
}
return false;
} }
bool DatabaseMysql::RollbackTransaction() bool MySQLConnection::RollbackTransaction()
{ {
if (!mMysql) return _TransactionCmd("ROLLBACK");
return false;
// don't use queued execution if it has not been initialized
if (!m_threadBody)
{
if (tranThread != ACE_Based::Thread::current())
return false;
bool _res = _TransactionCmd("ROLLBACK");
tranThread = NULL;
mMutex.release();
return _res;
}
ACE_Guard<ACE_Thread_Mutex> _lock(nMutex);
tranThread = ACE_Based::Thread::current();
TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL)
{
delete i->second;
m_tranQueues.erase(i);
}
return true;
} }
unsigned long DatabaseMysql::escape_string(char *to, const char *from, unsigned long length) unsigned long MySQLConnection::escape_string(char *to, const char *from, unsigned long length)
{ {
if (!mMysql || !to || !from || !length) if (!mMysql || !to || !from || !length)
return 0; return 0;
@ -424,23 +320,4 @@ unsigned long DatabaseMysql::escape_string(char *to, const char *from, unsigned
return(mysql_real_escape_string(mMysql, to, from, length)); return(mysql_real_escape_string(mMysql, to, from, length));
} }
void DatabaseMysql::InitDelayThread()
{
assert(!m_delayThread);
//New delay thread for delay execute
m_threadBody = new MySQLDelayThread(this); // will deleted at m_delayThread delete
m_delayThread = new ACE_Based::Thread(m_threadBody);
}
void DatabaseMysql::HaltDelayThread()
{
if (!m_threadBody || !m_delayThread) return;
m_threadBody->Stop(); //Stop event
m_delayThread->wait(); //Wait for flush to DB
delete m_delayThread; //This also deletes m_threadBody
m_delayThread = NULL;
m_threadBody = NULL;
}
#endif #endif

View file

@ -34,6 +34,31 @@
#include <mysql.h> #include <mysql.h>
#endif #endif
class MANGOS_DLL_SPEC MySQLConnection : public SqlConnection
{
public:
MySQLConnection() : mMysql(NULL) {}
~MySQLConnection() { mysql_close(mMysql); }
bool Initialize(const char *infoString);
QueryResult* Query(const char *sql);
QueryNamedResult* QueryNamed(const char *sql);
bool Execute(const char *sql);
unsigned long escape_string(char *to, const char *from, unsigned long length);
bool BeginTransaction();
bool CommitTransaction();
bool RollbackTransaction();
private:
bool _TransactionCmd(const char *sql);
bool _Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **pFields, uint64* pRowCount, uint32* pFieldCount);
MYSQL *mMysql;
};
class MANGOS_DLL_SPEC DatabaseMysql : public Database class MANGOS_DLL_SPEC DatabaseMysql : public Database
{ {
friend class MaNGOS::OperatorNew<DatabaseMysql>; friend class MaNGOS::OperatorNew<DatabaseMysql>;
@ -44,38 +69,18 @@ class MANGOS_DLL_SPEC DatabaseMysql : public Database
//! Initializes Mysql and connects to a server. //! Initializes Mysql and connects to a server.
/*! infoString should be formated like hostname;username;password;database. */ /*! infoString should be formated like hostname;username;password;database. */
bool Initialize(const char *infoString);
void InitDelayThread();
void HaltDelayThread();
QueryResult* Query(const char *sql);
QueryNamedResult* QueryNamed(const char *sql);
bool Execute(const char *sql);
bool DirectExecute(const char* sql);
bool BeginTransaction();
bool CommitTransaction();
bool RollbackTransaction();
operator bool () const { return mMysql != NULL; }
unsigned long escape_string(char *to, const char *from, unsigned long length);
using Database::escape_string;
// must be call before first query in thread // must be call before first query in thread
void ThreadStart(); void ThreadStart();
// must be call before finish thread run // must be call before finish thread run
void ThreadEnd(); void ThreadEnd();
protected:
virtual SqlConnection * CreateConnection();
private: private:
ACE_Thread_Mutex mMutex; // For thread safe operations between core and mySQL server
ACE_Thread_Mutex nMutex; // For thread safe operations on m_transQueues
ACE_Based::Thread * tranThread;
MYSQL *mMysql;
static size_t db_count; static size_t db_count;
bool _TransactionCmd(const char *sql);
bool _Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD **pFields, uint64* pRowCount, uint32* pFieldCount);
}; };
#endif #endif
#endif #endif

View file

@ -23,18 +23,16 @@
#include "Platform/Define.h" #include "Platform/Define.h"
#include "Threading.h" #include "Threading.h"
#include "DatabaseEnv.h" #include "DatabaseEnv.h"
#include "Database/PGSQLDelayThread.h"
#include "Database/SqlOperations.h" #include "Database/SqlOperations.h"
#include "Timer.h" #include "Timer.h"
size_t DatabasePostgre::db_count = 0; size_t DatabasePostgre::db_count = 0;
DatabasePostgre::DatabasePostgre() : Database(), mPGconn(NULL) DatabasePostgre::DatabasePostgre()
{ {
// before first connection // before first connection
if( db_count++ == 0 ) if( db_count++ == 0 )
{ {
if (!PQisthreadsafe()) if (!PQisthreadsafe())
{ {
sLog.outError("FATAL ERROR: PostgreSQL libpq isn't thread-safe."); sLog.outError("FATAL ERROR: PostgreSQL libpq isn't thread-safe.");
@ -45,26 +43,32 @@ DatabasePostgre::DatabasePostgre() : Database(), mPGconn(NULL)
DatabasePostgre::~DatabasePostgre() DatabasePostgre::~DatabasePostgre()
{ {
if (m_delayThread) if (m_delayThread)
HaltDelayThread(); HaltDelayThread();
if( mPGconn ) //destroy SqlConnection objects
if(m_pQueryConnections.size())
{ {
PQfinish(mPGconn); for (int i = 0; i < m_pQueryConnections.size(); ++i)
mPGconn = NULL; delete m_pQueryConnections[i];
m_pQueryConnections.clear();
}
if(m_pAsyncConn)
{
delete m_pAsyncConn;
m_pAsyncConn = NULL;
} }
} }
bool DatabasePostgre::Initialize(const char *infoString) SqlConnection * DatabasePostgre::CreateConnection()
{ {
if(!Database::Initialize(infoString)) return new PostgreSQLConnection();
return false; }
tranThread = NULL;
InitDelayThread();
bool PostgreSQLConnection::Initialize(const char *infoString)
{
Tokens tokens = StrSplit(infoString, ";"); Tokens tokens = StrSplit(infoString, ";");
Tokens::iterator iter; Tokens::iterator iter;
@ -98,26 +102,18 @@ bool DatabasePostgre::Initialize(const char *infoString)
mPGconn = NULL; mPGconn = NULL;
return false; return false;
} }
else
{
sLog.outDetail( "Connected to Postgre database at %s",
host.c_str());
sLog.outString( "PostgreSQL server ver: %d",PQserverVersion(mPGconn));
return true;
}
sLog.outDetail( "Connected to Postgre database at %s", host.c_str());
sLog.outString( "PostgreSQL server ver: %d", PQserverVersion(mPGconn));
return true;
} }
bool DatabasePostgre::_Query(const char *sql, PGresult** pResult, uint64* pRowCount, uint32* pFieldCount) bool PostgreSQLConnection::_Query(const char *sql, PGresult** pResult, uint64* pRowCount, uint32* pFieldCount)
{ {
if (!mPGconn) if (!mPGconn)
return 0; return false;
// guarded block for thread-safe request uint32 _s = WorldTimer::getMSTime();
ACE_Guard<ACE_Thread_Mutex> query_connection_guard(mMutex);
#ifdef MANGOS_DEBUG
uint32 _s = getMSTime();
#endif
// Send the query // Send the query
*pResult = PQexec(mPGconn, sql); *pResult = PQexec(mPGconn, sql);
if(!*pResult ) if(!*pResult )
@ -132,9 +128,7 @@ bool DatabasePostgre::_Query(const char *sql, PGresult** pResult, uint64* pRowCo
} }
else else
{ {
#ifdef MANGOS_DEBUG DEBUG_FILTER_LOG(LOG_FILTER_SQL_TEXT, "[%u ms] SQL: %s", WorldTimer::getMSTimeDiff(_s,WorldTimer::getMSTime()), sql );
sLog.outDebug("[%u ms] SQL: %s", getMSTime() - _s, sql );
#endif
} }
*pRowCount = PQntuples(*pResult); *pRowCount = PQntuples(*pResult);
@ -146,13 +140,14 @@ bool DatabasePostgre::_Query(const char *sql, PGresult** pResult, uint64* pRowCo
PQclear(*pResult); PQclear(*pResult);
return false; return false;
} }
return true; return true;
} }
QueryResult* DatabasePostgre::Query(const char *sql) QueryResult* PostgreSQLConnection::Query(const char *sql)
{ {
if (!mPGconn) if (!mPGconn)
return 0; return NULL;
PGresult* result = NULL; PGresult* result = NULL;
uint64 rowCount = 0; uint64 rowCount = 0;
@ -162,15 +157,15 @@ QueryResult* DatabasePostgre::Query(const char *sql)
return NULL; return NULL;
QueryResultPostgre * queryResult = new QueryResultPostgre(result, rowCount, fieldCount); QueryResultPostgre * queryResult = new QueryResultPostgre(result, rowCount, fieldCount);
queryResult->NextRow();
queryResult->NextRow();
return queryResult; return queryResult;
} }
QueryNamedResult* DatabasePostgre::QueryNamed(const char *sql) QueryNamedResult* PostgreSQLConnection::QueryNamed(const char *sql)
{ {
if (!mPGconn) if (!mPGconn)
return 0; return NULL;
PGresult* result = NULL; PGresult* result = NULL;
uint64 rowCount = 0; uint64 rowCount = 0;
@ -184,67 +179,35 @@ QueryNamedResult* DatabasePostgre::QueryNamed(const char *sql)
names[i] = PQfname(result, i); names[i] = PQfname(result, i);
QueryResultPostgre * queryResult = new QueryResultPostgre(result, rowCount, fieldCount); QueryResultPostgre * queryResult = new QueryResultPostgre(result, rowCount, fieldCount);
queryResult->NextRow();
queryResult->NextRow();
return new QueryNamedResult(queryResult,names); return new QueryNamedResult(queryResult,names);
} }
bool DatabasePostgre::Execute(const char *sql) bool PostgreSQLConnection::Execute(const char *sql)
{ {
if (!mPGconn) if (!mPGconn)
return false; return false;
// don't use queued execution if it has not been initialized uint32 _s = WorldTimer::getMSTime();
if (!m_threadBody)
return DirectExecute(sql);
tranThread = ACE_Based::Thread::current(); // owner of this transaction PGresult *res = PQexec(mPGconn, sql);
TransactionQueues::iterator i = m_tranQueues.find(tranThread); if (PQresultStatus(res) != PGRES_COMMAND_OK)
if (i != m_tranQueues.end() && i->second != NULL) {
{ // Statement for transaction sLog.outErrorDb( "SQL: %s", sql );
i->second->DelayExecute(sql); sLog.outErrorDb( "SQL %s", PQerrorMessage(mPGconn) );
return false;
} }
else else
{ {
// Simple sql statement DEBUG_FILTER_LOG(LOG_FILTER_SQL_TEXT, "[%u ms] SQL: %s", WorldTimer::getMSTimeDiff(_s,WorldTimer::getMSTime()), sql );
m_threadBody->Delay(new SqlStatement(sql));
} }
PQclear(res);
return true; return true;
} }
bool DatabasePostgre::DirectExecute(const char* sql) bool PostgreSQLConnection::_TransactionCmd(const char *sql)
{
if (!mPGconn)
return false;
{
// guarded block for thread-safe request
ACE_Guard<ACE_Thread_Mutex> query_connection_guard(mMutex);
#ifdef MANGOS_DEBUG
uint32 _s = getMSTime();
#endif
PGresult *res = PQexec(mPGconn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
sLog.outErrorDb( "SQL: %s", sql );
sLog.outErrorDb( "SQL %s", PQerrorMessage(mPGconn) );
return false;
}
else
{
#ifdef MANGOS_DEBUG
sLog.outDebug("[%u ms] SQL: %s", getMSTime() - _s, sql );
#endif
}
PQclear(res);
// end guarded block
}
return true;
}
bool DatabasePostgre::_TransactionCmd(const char *sql)
{ {
if (!mPGconn) if (!mPGconn)
return false; return false;
@ -263,88 +226,22 @@ bool DatabasePostgre::_TransactionCmd(const char *sql)
return true; return true;
} }
bool DatabasePostgre::BeginTransaction() bool PostgreSQLConnection::BeginTransaction()
{ {
if (!mPGconn) return _TransactionCmd("START TRANSACTION");
return false;
// don't use queued execution if it has not been initialized
if (!m_threadBody)
{
if (tranThread == ACE_Based::Thread::current())
return false; // huh? this thread already started transaction
mMutex.acquire();
if (!_TransactionCmd("START TRANSACTION"))
{
mMutex.release(); // can't start transaction
return false;
}
return true;
}
// transaction started
tranThread = ACE_Based::Thread::current(); // owner of this transaction
TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL)
// If for thread exists queue and also contains transaction
// delete that transaction (not allow trans in trans)
delete i->second;
m_tranQueues[tranThread] = new SqlTransaction();
return true;
} }
bool DatabasePostgre::CommitTransaction() bool PostgreSQLConnection::CommitTransaction()
{ {
if (!mPGconn) return _TransactionCmd("COMMIT");
return false;
// don't use queued execution if it has not been initialized
if (!m_threadBody)
{
if (tranThread != ACE_Based::Thread::current())
return false;
bool _res = _TransactionCmd("COMMIT");
tranThread = NULL;
mMutex.release();
return _res;
}
tranThread = ACE_Based::Thread::current();
TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL)
{
m_threadBody->Delay(i->second);
i->second = NULL;
return true;
}
else
return false;
} }
bool DatabasePostgre::RollbackTransaction() bool PostgreSQLConnection::RollbackTransaction()
{ {
if (!mPGconn) return _TransactionCmd("ROLLBACK");
return false;
// don't use queued execution if it has not been initialized
if (!m_threadBody)
{
if (tranThread != ACE_Based::Thread::current())
return false;
bool _res = _TransactionCmd("ROLLBACK");
tranThread = NULL;
mMutex.release();
return _res;
}
tranThread = ACE_Based::Thread::current();
TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL)
{
delete i->second;
i->second = NULL;
}
return true;
} }
unsigned long DatabasePostgre::escape_string(char *to, const char *from, unsigned long length) unsigned long PostgreSQLConnection::escape_string(char *to, const char *from, unsigned long length)
{ {
if (!mPGconn || !to || !from || !length) if (!mPGconn || !to || !from || !length)
return 0; return 0;
@ -352,23 +249,4 @@ unsigned long DatabasePostgre::escape_string(char *to, const char *from, unsigne
return PQescapeString(to, from, length); return PQescapeString(to, from, length);
} }
void DatabasePostgre::InitDelayThread()
{
assert(!m_delayThread);
//New delay thread for delay execute
m_threadBody = new PGSQLDelayThread(this); // Will be deleted on m_delayThread delete
m_delayThread = new ACE_Based::Thread(m_threadBody);
}
void DatabasePostgre::HaltDelayThread()
{
if (!m_threadBody || !m_delayThread) return;
m_threadBody->Stop(); //Stop event
m_delayThread->wait(); //Wait for flush to DB
delete m_delayThread; //This also deletes m_threadBody
m_delayThread = NULL;
m_threadBody = NULL;
}
#endif #endif

View file

@ -19,7 +19,11 @@
#ifndef _DatabasePostgre_H #ifndef _DatabasePostgre_H
#define _DatabasePostgre_H #define _DatabasePostgre_H
#include "Common.h"
#include "Database.h"
#include "Policies/Singleton.h" #include "Policies/Singleton.h"
#include "ace/Thread_Mutex.h"
#include "ace/Guard_T.h"
#include <stdarg.h> #include <stdarg.h>
#ifdef WIN32 #ifdef WIN32
@ -30,7 +34,32 @@
#include <libpq-fe.h> #include <libpq-fe.h>
#endif #endif
class DatabasePostgre : public Database class MANGOS_DLL_SPEC PostgreSQLConnection : public SqlConnection
{
public:
PostgreSQLConnection() : mPGconn(NULL) {}
~PostgreSQLConnection() { PQfinish(mPGconn); }
bool Initialize(const char *infoString);
QueryResult* Query(const char *sql);
QueryNamedResult* QueryNamed(const char *sql);
bool Execute(const char *sql);
unsigned long escape_string(char *to, const char *from, unsigned long length);
bool BeginTransaction();
bool CommitTransaction();
bool RollbackTransaction();
private:
bool _TransactionCmd(const char *sql);
bool _Query(const char *sql, PGresult **pResult, uint64* pRowCount, uint32* pFieldCount);
PGconn *mPGconn;
};
class MANGOS_DLL_SPEC DatabasePostgre : public Database
{ {
friend class MaNGOS::OperatorNew<DatabasePostgre>; friend class MaNGOS::OperatorNew<DatabasePostgre>;
@ -40,30 +69,11 @@ class DatabasePostgre : public Database
//! Initializes Postgres and connects to a server. //! Initializes Postgres and connects to a server.
/*! infoString should be formated like hostname;username;password;database. */ /*! infoString should be formated like hostname;username;password;database. */
bool Initialize(const char *infoString);
void InitDelayThread();
void HaltDelayThread();
QueryResult* Query(const char *sql);
QueryNamedResult* QueryNamed(const char *sql);
bool Execute(const char *sql);
bool DirectExecute(const char* sql);
bool BeginTransaction();
bool CommitTransaction();
bool RollbackTransaction();
operator bool () const { return mPGconn != NULL; } protected:
virtual SqlConnection * CreateConnection();
unsigned long escape_string(char *to, const char *from, unsigned long length);
using Database::escape_string;
private: private:
ACE_Thread_Mutex mMutex;
ACE_Based::Thread * tranThread;
PGconn *mPGconn;
static size_t db_count; static size_t db_count;
bool _TransactionCmd(const char *sql);
bool _Query(const char *sql, PGresult **pResult, uint64* pRowCount, uint32* pFieldCount);
}; };
#endif #endif

View file

@ -20,7 +20,7 @@
#include "Database/SqlOperations.h" #include "Database/SqlOperations.h"
#include "DatabaseEnv.h" #include "DatabaseEnv.h"
SqlDelayThread::SqlDelayThread(Database* db) : m_dbEngine(db), m_running(true) SqlDelayThread::SqlDelayThread(Database* db, SqlConnection* conn) : m_dbEngine(db), m_dbConnection(conn), m_running(true)
{ {
} }
@ -40,7 +40,7 @@ void SqlDelayThread::run()
const uint32 loopSleepms = 10; const uint32 loopSleepms = 10;
const uint32 pingEveryLoop = m_dbEngine->GetPingIntervall()/loopSleepms; const uint32 pingEveryLoop = m_dbEngine->GetPingIntervall() / loopSleepms;
uint32 loopCounter = 0; uint32 loopCounter = 0;
while (m_running) while (m_running)
@ -52,14 +52,14 @@ void SqlDelayThread::run()
SqlOperation* s = NULL; SqlOperation* s = NULL;
while (m_sqlQueue.next(s)) while (m_sqlQueue.next(s))
{ {
s->Execute(m_dbEngine); s->Execute(m_dbConnection);
delete s; delete s;
} }
if((loopCounter++) >= pingEveryLoop) if((loopCounter++) >= pingEveryLoop)
{ {
loopCounter = 0; loopCounter = 0;
delete m_dbEngine->Query("SELECT 1"); m_dbEngine->Ping();
} }
} }

View file

@ -26,6 +26,7 @@
class Database; class Database;
class SqlOperation; class SqlOperation;
class SqlConnection;
class SqlDelayThread : public ACE_Based::Runnable class SqlDelayThread : public ACE_Based::Runnable
{ {
@ -34,10 +35,11 @@ class SqlDelayThread : public ACE_Based::Runnable
private: private:
SqlQueue m_sqlQueue; ///< Queue of SQL statements SqlQueue m_sqlQueue; ///< Queue of SQL statements
Database* m_dbEngine; ///< Pointer to used Database engine Database* m_dbEngine; ///< Pointer to used Database engine
SqlConnection * m_dbConnection; ///< Pointer to DB connection
volatile bool m_running; volatile bool m_running;
public: public:
SqlDelayThread(Database* db); SqlDelayThread(Database* db, SqlConnection* conn);
~SqlDelayThread(); ~SqlDelayThread();
///< Put sql statement to delay queue ///< Put sql statement to delay queue

View file

@ -21,54 +21,60 @@
#include "DatabaseEnv.h" #include "DatabaseEnv.h"
#include "DatabaseImpl.h" #include "DatabaseImpl.h"
#define LOCK_DB_CONN(conn) SqlConnection::Lock guard(conn)
/// ---- ASYNC STATEMENTS / TRANSACTIONS ---- /// ---- ASYNC STATEMENTS / TRANSACTIONS ----
void SqlStatement::Execute(Database *db) void SqlStatement::Execute(SqlConnection *conn)
{ {
/// just do it /// just do it
db->DirectExecute(m_sql); LOCK_DB_CONN(conn);
conn->Execute(m_sql);
} }
void SqlTransaction::Execute(Database *db) SqlTransaction::~SqlTransaction()
{
while(!m_queue.empty())
{
delete [] (const_cast<char*>(m_queue.back()));
m_queue.pop_back();
}
}
void SqlTransaction::Execute(SqlConnection *conn)
{ {
ACE_Guard<ACE_Thread_Mutex> _lock(m_Mutex);
if(m_queue.empty()) if(m_queue.empty())
return; return;
db->DirectExecute("START TRANSACTION"); LOCK_DB_CONN(conn);
while(!m_queue.empty())
conn->BeginTransaction();
const int nItems = m_queue.size();
for (int i = 0; i < nItems; ++i)
{ {
char *sql = const_cast<char*>(m_queue.front()); const char *sql = m_queue[i];
m_queue.pop();
if(!db->DirectExecute(sql)) if(!conn->Execute(sql))
{ {
delete [] sql; conn->RollbackTransaction();
db->DirectExecute("ROLLBACK");
while(!m_queue.empty())
{
delete [] (const_cast<char*>(m_queue.front()));
m_queue.pop();
}
return; return;
} }
delete [] sql;
} }
db->DirectExecute("COMMIT"); conn->CommitTransaction();
} }
/// ---- ASYNC QUERIES ---- /// ---- ASYNC QUERIES ----
void SqlQuery::Execute(Database *db) void SqlQuery::Execute(SqlConnection *conn)
{ {
if(!m_callback || !m_queue) if(!m_callback || !m_queue)
return; return;
LOCK_DB_CONN(conn);
/// execute the query and store the result in the callback /// execute the query and store the result in the callback
m_callback->SetResult(db->Query(m_sql)); m_callback->SetResult(conn->Query(m_sql));
/// add the callback to the sql result queue of the thread it originated from /// add the callback to the sql result queue of the thread it originated from
m_queue->add(m_callback); m_queue->add(m_callback);
} }
@ -184,19 +190,19 @@ void SqlQueryHolder::SetSize(size_t size)
m_queries.resize(size); m_queries.resize(size);
} }
void SqlQueryHolderEx::Execute(Database *db) void SqlQueryHolderEx::Execute(SqlConnection *conn)
{ {
if(!m_holder || !m_callback || !m_queue) if(!m_holder || !m_callback || !m_queue)
return; return;
LOCK_DB_CONN(conn);
/// we can do this, we are friends /// we can do this, we are friends
std::vector<SqlQueryHolder::SqlResultPair> &queries = m_holder->m_queries; std::vector<SqlQueryHolder::SqlResultPair> &queries = m_holder->m_queries;
for(size_t i = 0; i < queries.size(); i++) for(size_t i = 0; i < queries.size(); i++)
{ {
/// execute all queries in the holder and pass the results /// execute all queries in the holder and pass the results
char const *sql = queries[i].first; char const *sql = queries[i].first;
if(sql) m_holder->SetResult(i, db->Query(sql)); if(sql) m_holder->SetResult(i, conn->Query(sql));
} }
/// sync with the caller thread /// sync with the caller thread

View file

@ -29,13 +29,14 @@
/// ---- BASE --- /// ---- BASE ---
class Database; class Database;
class SqlConnection;
class SqlDelayThread; class SqlDelayThread;
class SqlOperation class SqlOperation
{ {
public: public:
virtual void OnRemove() { delete this; } virtual void OnRemove() { delete this; }
virtual void Execute(Database *db) = 0; virtual void Execute(SqlConnection *conn) = 0;
virtual ~SqlOperation() {} virtual ~SqlOperation() {}
}; };
@ -48,26 +49,25 @@ class SqlStatement : public SqlOperation
public: public:
SqlStatement(const char *sql) : m_sql(mangos_strdup(sql)){} SqlStatement(const char *sql) : m_sql(mangos_strdup(sql)){}
~SqlStatement() { char* tofree = const_cast<char*>(m_sql); delete [] tofree; } ~SqlStatement() { char* tofree = const_cast<char*>(m_sql); delete [] tofree; }
void Execute(Database *db); void Execute(SqlConnection *conn);
}; };
class SqlTransaction : public SqlOperation class SqlTransaction : public SqlOperation
{ {
private: private:
std::queue<const char *> m_queue; std::vector<const char *> m_queue;
ACE_Thread_Mutex m_Mutex;
public: public:
SqlTransaction() {} SqlTransaction() {}
~SqlTransaction();
void DelayExecute(const char *sql) void DelayExecute(const char *sql)
{ {
char* _sql = mangos_strdup(sql); char* _sql = mangos_strdup(sql);
if (_sql) m_queue.push_back(_sql);
{
ACE_Guard<ACE_Thread_Mutex> _lock(m_Mutex);
m_queue.push(_sql);
}
} }
void Execute(Database *db);
void Execute(SqlConnection *conn);
}; };
/// ---- ASYNC QUERIES ---- /// ---- ASYNC QUERIES ----
@ -95,7 +95,7 @@ class SqlQuery : public SqlOperation
SqlQuery(const char *sql, MaNGOS::IQueryCallback * callback, SqlResultQueue * queue) SqlQuery(const char *sql, MaNGOS::IQueryCallback * callback, SqlResultQueue * queue)
: m_sql(mangos_strdup(sql)), m_callback(callback), m_queue(queue) {} : m_sql(mangos_strdup(sql)), m_callback(callback), m_queue(queue) {}
~SqlQuery() { char* tofree = const_cast<char*>(m_sql); delete [] tofree; } ~SqlQuery() { char* tofree = const_cast<char*>(m_sql); delete [] tofree; }
void Execute(Database *db); void Execute(SqlConnection *conn);
}; };
class SqlQueryHolder class SqlQueryHolder
@ -124,6 +124,6 @@ class SqlQueryHolderEx : public SqlOperation
public: public:
SqlQueryHolderEx(SqlQueryHolder *holder, MaNGOS::IQueryCallback * callback, SqlResultQueue * queue) SqlQueryHolderEx(SqlQueryHolder *holder, MaNGOS::IQueryCallback * callback, SqlResultQueue * queue)
: m_holder(holder), m_callback(callback), m_queue(queue) {} : m_holder(holder), m_callback(callback), m_queue(queue) {}
void Execute(Database *db); void Execute(SqlConnection *conn);
}; };
#endif //__SQLOPERATIONS_H #endif //__SQLOPERATIONS_H

View file

@ -1,4 +1,4 @@
#ifndef __REVISION_NR_H__ #ifndef __REVISION_NR_H__
#define __REVISION_NR_H__ #define __REVISION_NR_H__
#define REVISION_NR "11044" #define REVISION_NR "11045"
#endif // __REVISION_NR_H__ #endif // __REVISION_NR_H__