00001 #include <algorithm>
00002 #include <cassert>
00003
00004 #include "DeviceReadBuffer.h"
00005 #include "mythcontext.h"
00006 #include "tspacket.h"
00007 #include "compat.h"
00008
00009 #ifndef USING_MINGW
00010 #include <sys/poll.h>
00011 #endif
00012
// Set to a non-zero value to compile in the ring buffer fill statistics
// that IncrWritePointer() gathers and ReportStats() prints.
00014 #define REPORT_RING_STATS 0
00015
// Log message prefixes; both embed the current videodevice member, so
// they can only be used inside DeviceReadBuffer member functions.
00016 #define LOC QString("DevRdB(%1): ").arg(videodevice)
00017 #define LOC_ERR QString("DevRdB(%1) Error: ").arg(videodevice)
00018
00019 DeviceReadBuffer::DeviceReadBuffer(ReaderPausedCB *cb, bool use_poll)
00020 : videodevice(QString::null), _stream_fd(-1),
00021 readerPausedCB(cb),
00022
00023
00024 run(false), running(false),
00025 eof(false), error(false),
00026 request_pause(false), paused(false),
00027 using_poll(use_poll),
00028
00029 size(0), used(0),
00030 dev_read_size(0), min_read(0),
00031
00032 buffer(NULL), readPtr(NULL),
00033 writePtr(NULL), endPtr(NULL),
00034
00035
00036 max_used(0), avg_used(0),
00037 avg_cnt(0)
00038 {
00039 }
00040
00041 DeviceReadBuffer::~DeviceReadBuffer()
00042 {
00043 if (buffer)
00044 delete[] buffer;
00045 }
00046
00047 bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd)
00048 {
00049 QMutexLocker locker(&lock);
00050
00051 if (buffer)
00052 delete[] buffer;
00053
00054 videodevice = streamName;
00055 _stream_fd = streamfd;
00056
00057
00058 eof = false;
00059 error = false;
00060 request_pause = false;
00061 paused = false;
00062
00063 size = gContext->GetNumSetting("HDRingbufferSize",
00064 50 * TSPacket::SIZE) * 1024;
00065 used = 0;
00066 dev_read_size = TSPacket::SIZE * (using_poll ? 256 : 48);
00067 min_read = TSPacket::SIZE * 4;
00068
00069 buffer = new unsigned char[size + TSPacket::SIZE];
00070 readPtr = buffer;
00071 writePtr = buffer;
00072 endPtr = buffer + size;
00073
00074
00075 if (!buffer)
00076 return false;
00077 memset(buffer, 0xFF, size + TSPacket::SIZE);
00078
00079
00080 max_used = 0;
00081 avg_used = 0;
00082 avg_cnt = 0;
00083 lastReport.start();
00084
00085 VERBOSE(VB_RECORD, LOC + QString("buffer size %1 KB").arg(size/1024));
00086
00087 return true;
00088 }
00089
00090 void DeviceReadBuffer::Start(void)
00091 {
00092 lock.lock();
00093 bool was_running = running;
00094 lock.unlock();
00095 if (was_running)
00096 {
00097 VERBOSE(VB_IMPORTANT, LOC_ERR + "Start(): Already running.");
00098 SetRequestPause(false);
00099 return;
00100 }
00101
00102 pthread_create(&thread, NULL, boot_ringbuffer, this);
00103 }
00104
00105 void DeviceReadBuffer::Reset(const QString &streamName, int streamfd)
00106 {
00107 QMutexLocker locker(&lock);
00108
00109 videodevice = streamName;
00110 _stream_fd = streamfd;
00111
00112 used = 0;
00113 readPtr = buffer;
00114 writePtr = buffer;
00115 }
00116
00117 void DeviceReadBuffer::Stop(void)
00118 {
00119 bool was_running = IsRunning();
00120 lock.lock();
00121 run = false;
00122 lock.unlock();
00123
00124 if (!was_running)
00125 {
00126 VERBOSE(VB_IMPORTANT, LOC_ERR + "Stop(): Not running.");
00127 return;
00128 }
00129
00130 pthread_join(thread, NULL);
00131 }
00132
00133 void DeviceReadBuffer::SetRequestPause(bool req)
00134 {
00135 QMutexLocker locker(&lock);
00136 request_pause = req;
00137 }
00138
00139 void DeviceReadBuffer::SetPaused(bool val)
00140 {
00141 lock.lock();
00142 paused = val;
00143 lock.unlock();
00144 if (val)
00145 pauseWait.wakeAll();
00146 else
00147 unpauseWait.wakeAll();
00148 }
00149
00150 bool DeviceReadBuffer::IsPaused(void) const
00151 {
00152 QMutexLocker locker(&lock);
00153 return paused;
00154 }
00155
00156 bool DeviceReadBuffer::WaitForUnpause(int timeout)
00157 {
00158 if (IsPaused())
00159 unpauseWait.wait(timeout);
00160 return IsPaused();
00161 }
00162
00163 bool DeviceReadBuffer::IsPauseRequested(void) const
00164 {
00165 QMutexLocker locker(&lock);
00166 return request_pause;
00167 }
00168
00169 bool DeviceReadBuffer::IsRunning(void) const
00170 {
00171 QMutexLocker locker(&lock);
00172 return running;
00173 }
00174
00175 uint DeviceReadBuffer::GetUnused(void) const
00176 {
00177 QMutexLocker locker(&lock);
00178 return size - used;
00179 }
00180
00181 uint DeviceReadBuffer::GetUsed(void) const
00182 {
00183 QMutexLocker locker(&lock);
00184 return used;
00185 }
00186
00187 uint DeviceReadBuffer::GetContiguousUnused(void) const
00188 {
00189 QMutexLocker locker(&lock);
00190 return endPtr - writePtr;
00191 }
00192
00193 void DeviceReadBuffer::IncrWritePointer(uint len)
00194 {
00195 QMutexLocker locker(&lock);
00196 used += len;
00197 writePtr += len;
00198 writePtr = (writePtr == endPtr) ? buffer : writePtr;
00199 #if REPORT_RING_STATS
00200 max_used = max(used, max_used);
00201 avg_used = ((avg_used * avg_cnt) + used) / ++avg_cnt;
00202 #endif
00203 }
00204
00205 void DeviceReadBuffer::IncrReadPointer(uint len)
00206 {
00207 QMutexLocker locker(&lock);
00208 used -= len;
00209 readPtr += len;
00210 readPtr = (readPtr == endPtr) ? buffer : readPtr;
00211 assert(readPtr <= endPtr);
00212 }
00213
00214 void *DeviceReadBuffer::boot_ringbuffer(void *arg)
00215 {
00216 ((DeviceReadBuffer*) arg)->fill_ringbuffer();
00217 return NULL;
00218 }
00219
00220 void DeviceReadBuffer::fill_ringbuffer(void)
00221 {
00222 uint errcnt = 0;
00223
00224 lock.lock();
00225 run = true;
00226 running = true;
00227 lock.unlock();
00228
00229 while (run)
00230 {
00231 if (!HandlePausing())
00232 continue;
00233
00234 if (!IsOpen())
00235 {
00236 usleep(5000);
00237 continue;
00238 }
00239
00240 if (using_poll && !Poll())
00241 continue;
00242
00243
00244 size_t read_size =
00245 min(dev_read_size, (size_t) WaitForUnused(TSPacket::SIZE));
00246
00247
00248 if (read_size)
00249 {
00250 ssize_t len = read(_stream_fd, writePtr, read_size);
00251 if (!CheckForErrors(len, errcnt))
00252 {
00253 if (errcnt > 5)
00254 break;
00255 else
00256 continue;
00257 }
00258 errcnt = 0;
00259 IncrWritePointer(len);
00260 }
00261 }
00262
00263 lock.lock();
00264 running = false;
00265 lock.unlock();
00266 }
00267
00268 bool DeviceReadBuffer::HandlePausing(void)
00269 {
00270 if (IsPauseRequested())
00271 {
00272 SetPaused(true);
00273
00274 if (readerPausedCB)
00275 readerPausedCB->ReaderPaused(_stream_fd);
00276
00277 usleep(5000);
00278 return false;
00279 }
00280 else if (IsPaused())
00281 {
00282 Reset(videodevice, _stream_fd);
00283 SetPaused(false);
00284 }
00285 return true;
00286 }
00287
00288 bool DeviceReadBuffer::Poll(void) const
00289 {
00290 #ifdef USING_MINGW
00291 #warning mingw DeviceReadBuffer::Poll
00292 VERBOSE(VB_IMPORTANT, LOC_ERR +
00293 "mingw DeviceReadBuffer::Poll is not implemented");
00294 return false;
00295 #else
00296 bool retval = true;
00297 while (true)
00298 {
00299 struct pollfd polls;
00300 polls.fd = _stream_fd;
00301 polls.events = POLLIN;
00302 polls.revents = 0;
00303
00304 int ret = poll(&polls, 1 , 10 );
00305 if (IsPauseRequested() || !IsOpen() || !run)
00306 {
00307 retval = false;
00308 break;
00309 }
00310
00311 if (ret > 0)
00312 break;
00313 if ((-1 == ret) && (EOVERFLOW == errno))
00314 break;
00315
00316 if ((-1 == ret) && ((EAGAIN == errno) || (EINTR == errno)))
00317 continue;
00318 if (ret == 0)
00319 continue;
00320
00321 usleep(2500);
00322 }
00323 return retval;
00324 #endif
00325 }
00326
00327 bool DeviceReadBuffer::CheckForErrors(ssize_t len, uint &errcnt)
00328 {
00329 #ifdef USING_MINGW
00330 #warning mingw DeviceReadBuffer::CheckForErrors
00331 VERBOSE(VB_IMPORTANT, LOC_ERR +
00332 "mingw DeviceReadBuffer::CheckForErrors is not implemented");
00333 return false;
00334 #else
00335 if (len < 0)
00336 {
00337 if (EINTR == errno)
00338 return false;
00339 if (EAGAIN == errno)
00340 {
00341 usleep(2500);
00342 return false;
00343 }
00344 if (EOVERFLOW == errno)
00345 {
00346 VERBOSE(VB_IMPORTANT, LOC_ERR + "Driver buffers overflowed");
00347 return false;
00348 }
00349
00350 VERBOSE(VB_IMPORTANT, LOC_ERR +
00351 QString("Problem reading fd(%1)").arg(_stream_fd) + ENO);
00352
00353 if (++errcnt > 5)
00354 {
00355 lock.lock();
00356 error = true;
00357 lock.unlock();
00358 return false;
00359 }
00360
00361 usleep(500);
00362 return false;
00363 }
00364 else if (len == 0)
00365 {
00366 if (++errcnt > 5)
00367 {
00368 VERBOSE(VB_IMPORTANT, LOC +
00369 QString("End-Of-File? fd(%1)").arg(_stream_fd));
00370
00371 lock.lock();
00372 eof = true;
00373 lock.unlock();
00374
00375 return false;
00376 }
00377 usleep(500);
00378 return false;
00379 }
00380 return true;
00381 #endif
00382 }
00383
00390 uint DeviceReadBuffer::Read(unsigned char *buf, const uint count)
00391 {
00392 uint avail = WaitForUsed(min(count, (uint)min_read));
00393 size_t cnt = min(count, avail);
00394
00395 if (!cnt)
00396 return 0;
00397
00398 if (readPtr + cnt > endPtr)
00399 {
00400
00401 size_t len = endPtr - readPtr;
00402 if (len)
00403 {
00404 memcpy(buf, readPtr, len);
00405 buf += len;
00406 IncrReadPointer(len);
00407 }
00408 if (cnt > len)
00409 {
00410 len = cnt - len;
00411 memcpy(buf, readPtr, len);
00412 IncrReadPointer(len);
00413 }
00414 }
00415 else
00416 {
00417 memcpy(buf, readPtr, cnt);
00418 IncrReadPointer(cnt);
00419 }
00420
00421 #if REPORT_RING_STATS
00422 ReportStats();
00423 #endif
00424
00425 return cnt;
00426 }
00427
00432 uint DeviceReadBuffer::WaitForUnused(uint needed) const
00433 {
00434 size_t unused = GetUnused();
00435 size_t contig = GetContiguousUnused();
00436
00437 if (contig > TSPacket::SIZE)
00438 {
00439 while (unused < needed)
00440 {
00441 unused = GetUnused();
00442 if (IsPauseRequested() || !IsOpen() || !run)
00443 return 0;
00444 usleep(5000);
00445 }
00446 if (IsPauseRequested() || !IsOpen() || !run)
00447 return 0;
00448 contig = GetContiguousUnused();
00449 }
00450
00451 return min(contig, unused);
00452 }
00453
00458 uint DeviceReadBuffer::WaitForUsed(uint needed) const
00459 {
00460 size_t avail = GetUsed();
00461 while ((needed > avail) && running)
00462 {
00463 {
00464 QMutexLocker locker(&lock);
00465 avail = used;
00466 if (request_pause || error || eof)
00467 return 0;
00468 }
00469 usleep(5000);
00470 }
00471 return avail;
00472 }
00473
00474 void DeviceReadBuffer::ReportStats(void)
00475 {
00476 #if REPORT_RING_STATS
00477 if (lastReport.elapsed() > 20*1000 )
00478 {
00479 QMutexLocker locker(&lock);
00480 double rsize = 100.0 / size;
00481 QString msg = QString("fill avg(%1%) ").arg(avg_used*rsize,3,'f',0);
00482 msg += QString("fill max(%2%) ").arg(max_used*rsize,3,'f',0);
00483 msg += QString("samples(%3)").arg(avg_cnt);
00484
00485 avg_used = 0;
00486 avg_cnt = 0;
00487 max_used = 0;
00488 lastReport.start();
00489
00490 VERBOSE(VB_IMPORTANT, LOC + msg);
00491 }
00492 #endif
00493 }