00001
00002
00003
00004
00005
00006
00007
00008
00010
00011 #include "threadpool.h"
00012 #include "util.h"
00013 #include "upnp.h"
00014
00017
00018
00019
00022
00024
00026
00027 CEvent::CEvent( bool bInitiallyOwn )
00028 {
00029 m_bSignaled = bInitiallyOwn;
00030 }
00031
00033
00035
00036 CEvent::~CEvent()
00037 {
00038 }
00039
00041
00043
00044 bool CEvent::SetEvent ()
00045 {
00046 m_mutex.lock();
00047 m_bSignaled = true;
00048
00049 m_wait.wakeAll();
00050
00051 m_mutex.unlock();
00052
00053 return( true );
00054 }
00055
00057
00059
00060 bool CEvent::ResetEvent ()
00061 {
00062 m_mutex.lock();
00063 m_bSignaled = false;
00064 m_mutex.unlock();
00065
00066 return( true );
00067 }
00068
00070
00072
00073 bool CEvent::IsSignaled()
00074 {
00075 m_mutex.lock();
00076 bool bSignaled = m_bSignaled;
00077 m_mutex.unlock();
00078
00079 return( bSignaled );
00080 }
00081
00083
00085
00086 bool CEvent::WaitForEvent( unsigned long time )
00087 {
00088 m_mutex.lock();
00089
00090 if (m_bSignaled)
00091 {
00092 m_mutex.unlock();
00093 return true;
00094 }
00095
00096 bool ret = m_wait.wait(&m_mutex, time);
00097
00098 m_mutex.unlock();
00099
00100 return ret;
00101 }
00102
00105
00106
00107
00110
00112
00114
00115 WorkerThread::WorkerThread( ThreadPool *pThreadPool, const QString &sName )
00116 {
00117 m_bInitialized = false;
00118 m_bTermRequested = false;
00119 m_pThreadPool = pThreadPool;
00120 m_sName = sName;
00121 m_nIdleTimeoutMS = 60000;
00122 m_bAllowTimeout = false;
00123
00124 }
00125
00127
00129
00130 WorkerThread::~WorkerThread()
00131 {
00132 m_bTermRequested = true;
00133
00134 m_WorkAvailable.SetEvent();
00135
00136 wait();
00137 }
00138
00140
00142
00143 bool WorkerThread::WaitForInitialized( unsigned long msecs )
00144 {
00145 m_mutex.lock();
00146 bool bInitialized = m_bInitialized;
00147 m_mutex.unlock();
00148
00149 if (bInitialized)
00150 return true;
00151
00152 return( m_Initialized.WaitForEvent( msecs ));
00153 }
00154
00156
00158
00159 void WorkerThread::SignalWork()
00160 {
00161 m_WorkAvailable.SetEvent();
00162 }
00163
00165
00167
00168 void WorkerThread::SetTimeout( long nIdleTimeout )
00169 {
00170
00171
00172
00173 m_nIdleTimeoutMS = nIdleTimeout;
00174
00175 if (m_nIdleTimeoutMS == -1 )
00176 m_bAllowTimeout = false;
00177 else
00178 m_bAllowTimeout = true;
00179 }
00180
00182
00184
00185 void WorkerThread::run( void )
00186 {
00187 m_mutex.lock();
00188 m_bInitialized = true;
00189 m_mutex.unlock();
00190
00191 m_Initialized.SetEvent();
00192
00193 MythTimer timer;
00194
00195 timer.start();
00196
00197 while ( !m_bTermRequested )
00198 {
00199 if (m_bAllowTimeout && (timer.elapsed() > m_nIdleTimeoutMS) )
00200 break;
00201
00202 if (m_WorkAvailable.WaitForEvent(500))
00203 {
00204 m_WorkAvailable.ResetEvent();
00205
00206 if ( !m_bTermRequested )
00207 {
00208 try
00209 {
00210 ProcessWork();
00211
00212 timer.restart();
00213 }
00214 catch(...)
00215 {
00216 VERBOSE( VB_IMPORTANT, QString( "WorkerThread::Run( %1 ) - Unexpected Exception." )
00217 .arg( m_sName ));
00218 }
00219
00220 m_pThreadPool->ThreadAvailable( this );
00221 }
00222 }
00223 }
00224
00225 if (m_pThreadPool != NULL )
00226 {
00227 m_pThreadPool->ThreadTerminating( this );
00228 m_pThreadPool = NULL;
00229 }
00230
00231 VERBOSE( VB_UPNP, QString( "WorkerThread:Run - Exiting: %1" ).arg( m_sName ));
00232 }
00233
00236
00237
00238
00241
00243
00245
00246 ThreadPool::ThreadPool( const QString &sName )
00247 {
00248 m_sName = sName;
00249
00250 m_lstThreads .setAutoDelete( false );
00251 m_lstAvailableThreads.setAutoDelete( false );
00252
00253 m_nInitialThreadCount = UPnp::g_pConfig->GetValue( "ThreadPool/" + m_sName + "/Initial", 1 );
00254 m_nMaxThreadCount = UPnp::g_pConfig->GetValue( "ThreadPool/" + m_sName + "/Max" , 5 );
00255 m_nIdleTimeout = UPnp::g_pConfig->GetValue( "ThreadPool/" + m_sName + "/Timeout", 60000 );
00256
00257 m_nInitialThreadCount = min( m_nInitialThreadCount, m_nMaxThreadCount );
00258
00259 }
00260
00262
00264
00265 ThreadPool::~ThreadPool( )
00266 {
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277 WorkerThreadList::iterator it = m_lstThreads.begin();
00278
00279 while (it != m_lstThreads.end() )
00280 {
00281 WorkerThread *pThread = *it;
00282
00283 if (pThread != NULL)
00284 delete pThread;
00285
00286 it = m_lstThreads.erase( it );
00287 }
00288
00289
00290 }
00291
00293
00295
00296 void ThreadPool::InitializeThreads()
00297 {
00298
00299
00300
00301
00302 for (long nIdx = 0; nIdx < m_nInitialThreadCount; nIdx ++ )
00303 AddWorkerThread( true, -1 );
00304
00305 }
00306
00308
00310
00311 WorkerThread *ThreadPool::GetWorkerThread()
00312 {
00313 WorkerThread *pThread = NULL;
00314 long nThreadCount= 0;
00315
00316 while (pThread == NULL)
00317 {
00318
00319
00320
00321
00322 m_mList.lock();
00323 {
00324 if ( m_lstAvailableThreads.count() > 0)
00325 {
00326 pThread = m_lstAvailableThreads.getFirst();
00327
00328 m_lstAvailableThreads.removeFirst();
00329 }
00330
00331 nThreadCount = m_lstThreads.count();
00332 }
00333 m_mList.unlock();
00334
00335 if (pThread == NULL)
00336 {
00337
00338
00339
00340
00341
00342 if ( nThreadCount < m_nMaxThreadCount)
00343 pThread = AddWorkerThread( false, m_nIdleTimeout );
00344 else
00345 {
00346 if (m_threadAvail.wait( 5000 ) == false )
00347 return( NULL );
00348 }
00349 }
00350 }
00351
00352 return( pThread );
00353 }
00354
00356
00358
00359 WorkerThread *ThreadPool::AddWorkerThread( bool bMakeAvailable, long nTimeout )
00360 {
00361 QString sName = m_sName + "_WorkerThread";
00362
00363 VERBOSE( VB_UPNP, QString( "ThreadPool:AddWorkerThread - %1" ).arg( sName ));
00364
00365 WorkerThread *pThread = CreateWorkerThread( this, sName );
00366
00367 if (pThread != NULL)
00368 {
00369 pThread->SetTimeout( nTimeout );
00370 pThread->start();
00371
00372 if (pThread->WaitForInitialized( 5000 ))
00373 {
00374
00375
00376
00377
00378 m_mList.lock();
00379 {
00380
00381 m_lstThreads.append( pThread );
00382
00383 if (bMakeAvailable)
00384 {
00385 m_lstAvailableThreads.append( pThread );
00386
00387 m_threadAvail.wakeAll();
00388 }
00389 }
00390 m_mList.unlock();
00391
00392 }
00393 else
00394 {
00395
00396
00397
00398
00399
00400
00401
00402 delete pThread;
00403 pThread = NULL;
00404 }
00405 }
00406
00407 return( pThread );
00408 }
00409
00411
00413
00414 void ThreadPool::ThreadAvailable ( WorkerThread *pThread )
00415 {
00416 m_mList.lock();
00417 m_lstAvailableThreads.prepend( pThread );
00418 m_mList.unlock();
00419
00420 m_threadAvail.wakeAll();
00421 }
00422
00424
00426
00427 void ThreadPool::ThreadTerminating ( WorkerThread *pThread )
00428 {
00429 m_mList.lock();
00430 {
00431 m_lstAvailableThreads.remove( pThread );
00432
00433
00434 }
00435 m_mList.unlock();
00436 }
00437