[11442] Cleanup codestyle in WorldSocketMgr.cpp & WorldSocketMgr.h

Signed-off-by: VladimirMangos <vladimir@getmangos.com>
This commit is contained in:
Den 2011-05-08 20:00:34 +04:00 committed by VladimirMangos
parent d0a0479749
commit d98b9b9670
3 changed files with 97 additions and 114 deletions

View file

@ -52,8 +52,7 @@
class ReactorRunnable : protected ACE_Task_Base class ReactorRunnable : protected ACE_Task_Base
{ {
public: public:
ReactorRunnable() :
ReactorRunnable () :
m_Reactor (0), m_Reactor (0),
m_Connections (0), m_Connections (0),
m_ThreadId (-1) m_ThreadId (-1)
@ -62,48 +61,48 @@ class ReactorRunnable : protected ACE_Task_Base
#if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL) #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
imp = new ACE_Dev_Poll_Reactor (); imp = new ACE_Dev_Poll_Reactor();
imp->max_notify_iterations (128); imp->max_notify_iterations(128);
imp->restart (1); imp->restart(1);
#else #else
imp = new ACE_TP_Reactor (); imp = new ACE_TP_Reactor();
imp->max_notify_iterations (128); imp->max_notify_iterations(128);
#endif #endif
m_Reactor = new ACE_Reactor (imp, 1); m_Reactor = new ACE_Reactor(imp, 1);
} }
virtual ~ReactorRunnable () virtual ~ReactorRunnable()
{ {
Stop (); Stop();
Wait (); Wait();
if (m_Reactor) if (m_Reactor)
delete m_Reactor; delete m_Reactor;
} }
void Stop () void Stop()
{ {
m_Reactor->end_reactor_event_loop (); m_Reactor->end_reactor_event_loop();
} }
int Start () int Start()
{ {
if (m_ThreadId != -1) if (m_ThreadId != -1)
return -1; return -1;
return (m_ThreadId = activate ()); return (m_ThreadId = activate());
} }
void Wait () { ACE_Task_Base::wait (); } void Wait() { ACE_Task_Base::wait(); }
long Connections () long Connections()
{ {
return static_cast<long> (m_Connections.value ()); return static_cast<long> (m_Connections.value());
} }
int AddSocket (WorldSocket* sock) int AddSocket (WorldSocket* sock)
@ -118,47 +117,46 @@ class ReactorRunnable : protected ACE_Task_Base
return 0; return 0;
} }
ACE_Reactor* GetReactor () ACE_Reactor* GetReactor()
{ {
return m_Reactor; return m_Reactor;
} }
protected: protected:
void AddNewSockets()
void AddNewSockets ()
{ {
ACE_GUARD (ACE_Thread_Mutex, Guard, m_NewSockets_Lock); ACE_GUARD (ACE_Thread_Mutex, Guard, m_NewSockets_Lock);
if (m_NewSockets.empty ()) if (m_NewSockets.empty())
return; return;
for (SocketSet::const_iterator i = m_NewSockets.begin (); i != m_NewSockets.end (); ++i) for (SocketSet::const_iterator i = m_NewSockets.begin(); i != m_NewSockets.end(); ++i)
{ {
WorldSocket* sock = (*i); WorldSocket* sock = (*i);
if (sock->IsClosed ()) if (sock->IsClosed())
{ {
sock->RemoveReference (); sock->RemoveReference();
--m_Connections; --m_Connections;
} }
else else
m_Sockets.insert (sock); m_Sockets.insert(sock);
} }
m_NewSockets.clear (); m_NewSockets.clear();
} }
virtual int svc () virtual int svc()
{ {
DEBUG_LOG ("Network Thread Starting"); DEBUG_LOG ("Network Thread Starting");
WorldDatabase.ThreadStart (); WorldDatabase.ThreadStart();
MANGOS_ASSERT (m_Reactor); MANGOS_ASSERT(m_Reactor);
SocketSet::iterator i, t; SocketSet::iterator i, t;
while (!m_Reactor->reactor_event_loop_done ()) while (!m_Reactor->reactor_event_loop_done())
{ {
// dont be too smart to move this outside the loop // dont be too smart to move this outside the loop
// the run_reactor_event_loop will modify interval // the run_reactor_event_loop will modify interval
@ -167,25 +165,25 @@ class ReactorRunnable : protected ACE_Task_Base
if (m_Reactor->run_reactor_event_loop (interval) == -1) if (m_Reactor->run_reactor_event_loop (interval) == -1)
break; break;
AddNewSockets (); AddNewSockets();
for (i = m_Sockets.begin (); i != m_Sockets.end ();) for (i = m_Sockets.begin(); i != m_Sockets.end();)
{ {
if ((*i)->Update () == -1) if ((*i)->Update() == -1)
{ {
t = i; t = i;
++i; ++i;
(*t)->CloseSocket (); (*t)->CloseSocket();
(*t)->RemoveReference (); (*t)->RemoveReference();
--m_Connections; --m_Connections;
m_Sockets.erase (t); m_Sockets.erase(t);
} }
else else
++i; ++i;
} }
} }
WorldDatabase.ThreadEnd (); WorldDatabase.ThreadEnd();
DEBUG_LOG ("Network Thread Exitting"); DEBUG_LOG ("Network Thread Exitting");
@ -206,17 +204,17 @@ class ReactorRunnable : protected ACE_Task_Base
ACE_Thread_Mutex m_NewSockets_Lock; ACE_Thread_Mutex m_NewSockets_Lock;
}; };
WorldSocketMgr::WorldSocketMgr () : WorldSocketMgr::WorldSocketMgr():
m_NetThreads (0), m_NetThreads(0),
m_NetThreadsCount (0), m_NetThreadsCount(0),
m_SockOutKBuff (-1), m_SockOutKBuff(-1),
m_SockOutUBuff (65536), m_SockOutUBuff(65536),
m_UseNoDelay (true), m_UseNoDelay(true),
m_Acceptor (0) m_Acceptor(0)
{ {
} }
WorldSocketMgr::~WorldSocketMgr () WorldSocketMgr::~WorldSocketMgr()
{ {
if (m_NetThreads) if (m_NetThreads)
delete [] m_NetThreads; delete [] m_NetThreads;
@ -225,8 +223,7 @@ WorldSocketMgr::~WorldSocketMgr ()
delete m_Acceptor; delete m_Acceptor;
} }
int int WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
{ {
m_UseNoDelay = sConfig.GetBoolDefault ("Network.TcpNodelay", true); m_UseNoDelay = sConfig.GetBoolDefault ("Network.TcpNodelay", true);
@ -242,44 +239,43 @@ WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
m_NetThreads = new ReactorRunnable[m_NetThreadsCount]; m_NetThreads = new ReactorRunnable[m_NetThreadsCount];
BASIC_LOG("Max allowed socket connections %d",ACE::max_handles ()); BASIC_LOG("Max allowed socket connections %d", ACE::max_handles());
// -1 means use default // -1 means use default
m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1); m_SockOutKBuff = sConfig.GetIntDefault("Network.OutKBuff", -1);
m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff", 65536); m_SockOutUBuff = sConfig.GetIntDefault("Network.OutUBuff", 65536);
if ( m_SockOutUBuff <= 0 ) if (m_SockOutUBuff <= 0)
{ {
sLog.outError ("Network.OutUBuff is wrong in your config file"); sLog.outError ("Network.OutUBuff is wrong in your config file");
return -1; return -1;
} }
WorldSocket::Acceptor *acc = new WorldSocket::Acceptor; WorldSocket::Acceptor* acc = new WorldSocket::Acceptor;
m_Acceptor = acc; m_Acceptor = acc;
ACE_INET_Addr listen_addr (port, address); ACE_INET_Addr listen_addr (port, address);
if (acc->open (listen_addr, m_NetThreads[0].GetReactor (), ACE_NONBLOCK) == -1) if (acc->open (listen_addr, m_NetThreads[0].GetReactor(), ACE_NONBLOCK) == -1)
{ {
sLog.outError ("Failed to open acceptor ,check if the port is free"); sLog.outError ("Failed to open acceptor, check if the port is free");
return -1; return -1;
} }
for (size_t i = 0; i < m_NetThreadsCount; ++i) for (size_t i = 0; i < m_NetThreadsCount; ++i)
m_NetThreads[i].Start (); m_NetThreads[i].Start();
return 0; return 0;
} }
int int WorldSocketMgr::StartNetwork (ACE_UINT16 port, std::string& address)
WorldSocketMgr::StartNetwork (ACE_UINT16 port, std::string& address)
{ {
m_addr = address; m_addr = address;
m_port = port; m_port = port;
if (!sLog.HasLogLevelOrHigher(LOG_LVL_DEBUG)) if (!sLog.HasLogLevelOrHigher(LOG_LVL_DEBUG))
ACE_Log_Msg::instance ()->priority_mask (LM_ERROR, ACE_Log_Msg::PROCESS); ACE_Log_Msg::instance()->priority_mask (LM_ERROR, ACE_Log_Msg::PROCESS);
if (StartReactiveIO (port, address.c_str()) == -1) if (StartReactiveIO (port, address.c_str()) == -1)
return -1; return -1;
@ -287,46 +283,40 @@ WorldSocketMgr::StartNetwork (ACE_UINT16 port, std::string& address)
return 0; return 0;
} }
void void WorldSocketMgr::StopNetwork()
WorldSocketMgr::StopNetwork ()
{ {
if (m_Acceptor) if (m_Acceptor)
{ {
WorldSocket::Acceptor* acc = dynamic_cast<WorldSocket::Acceptor*> (m_Acceptor); WorldSocket::Acceptor* acc = dynamic_cast<WorldSocket::Acceptor*>(m_Acceptor);
if (acc) if (acc)
acc->close (); acc->close();
} }
if (m_NetThreadsCount != 0) if (m_NetThreadsCount != 0)
{ {
for (size_t i = 0; i < m_NetThreadsCount; ++i) for (size_t i = 0; i < m_NetThreadsCount; ++i)
m_NetThreads[i].Stop (); m_NetThreads[i].Stop();
} }
Wait (); Wait();
} }
void void WorldSocketMgr::Wait()
WorldSocketMgr::Wait ()
{ {
if (m_NetThreadsCount != 0) if (m_NetThreadsCount != 0)
{ {
for (size_t i = 0; i < m_NetThreadsCount; ++i) for (size_t i = 0; i < m_NetThreadsCount; ++i)
m_NetThreads[i].Wait (); m_NetThreads[i].Wait();
} }
} }
int int WorldSocketMgr::OnSocketOpen(WorldSocket* sock)
WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
{ {
// set some options here // set some options here
if (m_SockOutKBuff >= 0) if (m_SockOutKBuff >= 0)
{ {
if (sock->peer ().set_option (SOL_SOCKET, if (sock->peer().set_option(SOL_SOCKET, SO_SNDBUF, (void*)&m_SockOutKBuff, sizeof(int)) == -1 && errno != ENOTSUP)
SO_SNDBUF,
(void*) & m_SockOutKBuff,
sizeof (int)) == -1 && errno != ENOTSUP)
{ {
sLog.outError ("WorldSocketMgr::OnSocketOpen set_option SO_SNDBUF"); sLog.outError ("WorldSocketMgr::OnSocketOpen set_option SO_SNDBUF");
return -1; return -1;
@ -338,12 +328,9 @@ WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
// Set TCP_NODELAY. // Set TCP_NODELAY.
if (m_UseNoDelay) if (m_UseNoDelay)
{ {
if (sock->peer ().set_option (ACE_IPPROTO_TCP, if (sock->peer().set_option(ACE_IPPROTO_TCP, TCP_NODELAY, (void*)&ndoption, sizeof (int)) == -1)
TCP_NODELAY,
(void*)&ndoption,
sizeof (int)) == -1)
{ {
sLog.outError ("WorldSocketMgr::OnSocketOpen: peer ().set_option TCP_NODELAY errno = %s", ACE_OS::strerror (errno)); sLog.outError("WorldSocketMgr::OnSocketOpen: peer().set_option TCP_NODELAY errno = %s", ACE_OS::strerror(errno));
return -1; return -1;
} }
} }
@ -356,14 +343,13 @@ WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
MANGOS_ASSERT (m_NetThreadsCount >= 1); MANGOS_ASSERT (m_NetThreadsCount >= 1);
for (size_t i = 1; i < m_NetThreadsCount; ++i) for (size_t i = 1; i < m_NetThreadsCount; ++i)
if (m_NetThreads[i].Connections () < m_NetThreads[min].Connections ()) if (m_NetThreads[i].Connections() < m_NetThreads[min].Connections())
min = i; min = i;
return m_NetThreads[min].AddSocket (sock); return m_NetThreads[min].AddSocket (sock);
} }
WorldSocketMgr* WorldSocketMgr* WorldSocketMgr::Instance()
WorldSocketMgr::Instance ()
{ {
return ACE_Singleton<WorldSocketMgr,ACE_Thread_Mutex>::instance(); return ACE_Singleton<WorldSocketMgr, ACE_Thread_Mutex>::instance();
} }

View file

@ -38,33 +38,31 @@ class ACE_Event_Handler;
/// Manages all sockets connected to peers and network threads /// Manages all sockets connected to peers and network threads
class WorldSocketMgr class WorldSocketMgr
{ {
public: public:
friend class WorldSocket; friend class WorldSocket;
friend class ACE_Singleton<WorldSocketMgr,ACE_Thread_Mutex>; friend class ACE_Singleton<WorldSocketMgr, ACE_Thread_Mutex>;
/// Start network, listen at address:port . /// Start network, listen at address:port .
int StartNetwork (ACE_UINT16 port, std::string& address); int StartNetwork(ACE_UINT16 port, std::string& address);
/// Stops all network threads, It will wait for all running threads . /// Stops all network threads, It will wait for all running threads .
void StopNetwork (); void StopNetwork();
/// Wait untill all network threads have "joined" . /// Wait untill all network threads have "joined" .
void Wait (); void Wait();
std::string& GetBindAddress() { return m_addr; } std::string& GetBindAddress() { return m_addr; }
ACE_UINT16 GetBindPort() { return m_port; } ACE_UINT16 GetBindPort() { return m_port; }
/// Make this class singleton . /// Make this class singleton .
static WorldSocketMgr* Instance (); static WorldSocketMgr* Instance();
private: private:
int OnSocketOpen(WorldSocket* sock); int OnSocketOpen(WorldSocket* sock);
int StartReactiveIO(ACE_UINT16 port, const char* address); int StartReactiveIO(ACE_UINT16 port, const char* address);
private: WorldSocketMgr();
WorldSocketMgr (); virtual ~WorldSocketMgr();
virtual ~WorldSocketMgr ();
ReactorRunnable* m_NetThreads; ReactorRunnable* m_NetThreads;
size_t m_NetThreadsCount; size_t m_NetThreadsCount;
@ -79,8 +77,7 @@ private:
ACE_Event_Handler* m_Acceptor; ACE_Event_Handler* m_Acceptor;
}; };
#define sWorldSocketMgr WorldSocketMgr::Instance () #define sWorldSocketMgr WorldSocketMgr::Instance()
#endif #endif
/// @} /// @}

View file

@ -1,4 +1,4 @@
#ifndef __REVISION_NR_H__ #ifndef __REVISION_NR_H__
#define __REVISION_NR_H__ #define __REVISION_NR_H__
#define REVISION_NR "11441" #define REVISION_NR "11442"
#endif // __REVISION_NR_H__ #endif // __REVISION_NR_H__