vdr
1.7.27
|
00001 /* 00002 * ringbuffer.c: A ring buffer 00003 * 00004 * See the main source file 'vdr.c' for copyright information and 00005 * how to reach the author. 00006 * 00007 * Parts of this file were inspired by the 'ringbuffy.c' from the 00008 * LinuxDVB driver (see linuxtv.org). 00009 * 00010 * $Id: ringbuffer.c 2.3 2009/11/22 11:14:36 kls Exp $ 00011 */ 00012 00013 #include "ringbuffer.h" 00014 #include <stdlib.h> 00015 #include <unistd.h> 00016 #include "tools.h" 00017 00018 // --- cRingBuffer ----------------------------------------------------------- 00019 00020 #define OVERFLOWREPORTDELTA 5 // seconds between reports 00021 #define PERCENTAGEDELTA 10 00022 #define PERCENTAGETHRESHOLD 70 00023 00024 cRingBuffer::cRingBuffer(int Size, bool Statistics) 00025 { 00026 size = Size; 00027 statistics = Statistics; 00028 getThreadTid = 0; 00029 maxFill = 0; 00030 lastPercent = 0; 00031 putTimeout = getTimeout = 0; 00032 lastOverflowReport = 0; 00033 overflowCount = overflowBytes = 0; 00034 } 00035 00036 cRingBuffer::~cRingBuffer() 00037 { 00038 if (statistics) 00039 dsyslog("buffer stats: %d (%d%%) used", maxFill, maxFill * 100 / (size - 1)); 00040 } 00041 00042 void cRingBuffer::UpdatePercentage(int Fill) 00043 { 00044 if (Fill > maxFill) 00045 maxFill = Fill; 00046 int percent = Fill * 100 / (Size() - 1) / PERCENTAGEDELTA * PERCENTAGEDELTA; // clamp down to nearest quantum 00047 if (percent != lastPercent) { 00048 if (percent >= PERCENTAGETHRESHOLD && percent > lastPercent || percent < PERCENTAGETHRESHOLD && lastPercent >= PERCENTAGETHRESHOLD) { 00049 dsyslog("buffer usage: %d%% (tid=%d)", percent, getThreadTid); 00050 lastPercent = percent; 00051 } 00052 } 00053 } 00054 00055 void cRingBuffer::WaitForPut(void) 00056 { 00057 if (putTimeout) 00058 readyForPut.Wait(putTimeout); 00059 } 00060 00061 void cRingBuffer::WaitForGet(void) 00062 { 00063 if (getTimeout) 00064 readyForGet.Wait(getTimeout); 00065 } 00066 00067 void cRingBuffer::EnablePut(void) 00068 { 00069 if (putTimeout && Free() > Size() / 3) 00070 readyForPut.Signal(); 00071 } 00072 00073 void cRingBuffer::EnableGet(void) 00074 { 00075 if (getTimeout && Available() > Size() / 3) 00076 readyForGet.Signal(); 00077 } 00078 00079 void cRingBuffer::SetTimeouts(int PutTimeout, int GetTimeout) 00080 { 00081 putTimeout = PutTimeout; 00082 getTimeout = GetTimeout; 00083 } 00084 00085 void cRingBuffer::ReportOverflow(int Bytes) 00086 { 00087 overflowCount++; 00088 overflowBytes += Bytes; 00089 if (time(NULL) - lastOverflowReport > OVERFLOWREPORTDELTA) { 00090 esyslog("ERROR: %d ring buffer overflow%s (%d bytes dropped)", overflowCount, overflowCount > 1 ? "s" : "", overflowBytes); 00091 overflowCount = overflowBytes = 0; 00092 lastOverflowReport = time(NULL); 00093 } 00094 } 00095 00096 // --- cRingBufferLinear ----------------------------------------------------- 00097 00098 #ifdef DEBUGRINGBUFFERS 00099 #define MAXRBLS 30 00100 #define DEBUGRBLWIDTH 45 00101 00102 cRingBufferLinear *cRingBufferLinear::RBLS[MAXRBLS] = { NULL }; 00103 00104 void cRingBufferLinear::AddDebugRBL(cRingBufferLinear *RBL) 00105 { 00106 for (int i = 0; i < MAXRBLS; i++) { 00107 if (!RBLS[i]) { 00108 RBLS[i] = RBL; 00109 break; 00110 } 00111 } 00112 } 00113 00114 void cRingBufferLinear::DelDebugRBL(cRingBufferLinear *RBL) 00115 { 00116 for (int i = 0; i < MAXRBLS; i++) { 00117 if (RBLS[i] == RBL) { 00118 RBLS[i] = NULL; 00119 break; 00120 } 00121 } 00122 } 00123 00124 void cRingBufferLinear::PrintDebugRBL(void) 00125 { 00126 bool printed = false; 00127 for (int i = 0; i < MAXRBLS; i++) { 00128 cRingBufferLinear *p = RBLS[i]; 00129 if (p) { 00130 printed = true; 00131 int lh = p->lastHead; 00132 int lt = p->lastTail; 00133 int h = lh * DEBUGRBLWIDTH / p->Size(); 00134 int t = lt * DEBUGRBLWIDTH / p->Size(); 00135 char buf[DEBUGRBLWIDTH + 10]; 00136 memset(buf, '-', DEBUGRBLWIDTH); 00137 if (lt <= lh) 00138 memset(buf + t, '*', max(h - t, 1)); 00139 else { 00140 memset(buf, '*', h); 00141 memset(buf + t, '*', DEBUGRBLWIDTH - t); 00142 } 00143 buf[t] = '<'; 00144 buf[h] = '>'; 00145 buf[DEBUGRBLWIDTH] = 0; 00146 printf("%2d %s %8d %8d %s\n", i, buf, p->lastPut, p->lastGet, p->description); 00147 } 00148 } 00149 if (printed) 00150 printf("\n"); 00151 } 00152 #endif 00153 00154 cRingBufferLinear::cRingBufferLinear(int Size, int Margin, bool Statistics, const char *Description) 00155 :cRingBuffer(Size, Statistics) 00156 { 00157 description = Description ? strdup(Description) : NULL; 00158 tail = head = margin = Margin; 00159 gotten = 0; 00160 buffer = NULL; 00161 if (Size > 1) { // 'Size - 1' must not be 0! 00162 if (Margin <= Size / 2) { 00163 buffer = MALLOC(uchar, Size); 00164 if (!buffer) 00165 esyslog("ERROR: can't allocate ring buffer (size=%d)", Size); 00166 Clear(); 00167 } 00168 else 00169 esyslog("ERROR: invalid margin for ring buffer (%d > %d)", Margin, Size / 2); 00170 } 00171 else 00172 esyslog("ERROR: invalid size for ring buffer (%d)", Size); 00173 #ifdef DEBUGRINGBUFFERS 00174 lastHead = head; 00175 lastTail = tail; 00176 lastPut = lastGet = -1; 00177 AddDebugRBL(this); 00178 #endif 00179 } 00180 00181 cRingBufferLinear::~cRingBufferLinear() 00182 { 00183 #ifdef DEBUGRINGBUFFERS 00184 DelDebugRBL(this); 00185 #endif 00186 free(buffer); 00187 free(description); 00188 } 00189 00190 int cRingBufferLinear::DataReady(const uchar *Data, int Count) 00191 { 00192 return Count >= margin ? Count : 0; 00193 } 00194 00195 int cRingBufferLinear::Available(void) 00196 { 00197 int diff = head - tail; 00198 return (diff >= 0) ? diff : Size() + diff - margin; 00199 } 00200 00201 void cRingBufferLinear::Clear(void) 00202 { 00203 tail = head = margin; 00204 #ifdef DEBUGRINGBUFFERS 00205 lastHead = head; 00206 lastTail = tail; 00207 lastPut = lastGet = -1; 00208 #endif 00209 maxFill = 0; 00210 EnablePut(); 00211 } 00212 00213 int cRingBufferLinear::Read(int FileHandle, int Max) 00214 { 00215 int Tail = tail; 00216 int diff = Tail - head; 00217 int free = (diff > 0) ? diff - 1 : Size() - head; 00218 if (Tail <= margin) 00219 free--; 00220 int Count = -1; 00221 errno = EAGAIN; 00222 if (free > 0) { 00223 if (0 < Max && Max < free) 00224 free = Max; 00225 Count = safe_read(FileHandle, buffer + head, free); 00226 if (Count > 0) { 00227 int Head = head + Count; 00228 if (Head >= Size()) 00229 Head = margin; 00230 head = Head; 00231 if (statistics) { 00232 int fill = head - Tail; 00233 if (fill < 0) 00234 fill = Size() + fill; 00235 else if (fill >= Size()) 00236 fill = Size() - 1; 00237 UpdatePercentage(fill); 00238 } 00239 } 00240 } 00241 #ifdef DEBUGRINGBUFFERS 00242 lastHead = head; 00243 lastPut = Count; 00244 #endif 00245 EnableGet(); 00246 if (free == 0) 00247 WaitForPut(); 00248 return Count; 00249 } 00250 00251 int cRingBufferLinear::Read(cUnbufferedFile *File, int Max) 00252 { 00253 int Tail = tail; 00254 int diff = Tail - head; 00255 int free = (diff > 0) ? diff - 1 : Size() - head; 00256 if (Tail <= margin) 00257 free--; 00258 int Count = -1; 00259 errno = EAGAIN; 00260 if (free > 0) { 00261 if (0 < Max && Max < free) 00262 free = Max; 00263 Count = File->Read(buffer + head, free); 00264 if (Count > 0) { 00265 int Head = head + Count; 00266 if (Head >= Size()) 00267 Head = margin; 00268 head = Head; 00269 if (statistics) { 00270 int fill = head - Tail; 00271 if (fill < 0) 00272 fill = Size() + fill; 00273 else if (fill >= Size()) 00274 fill = Size() - 1; 00275 UpdatePercentage(fill); 00276 } 00277 } 00278 } 00279 #ifdef DEBUGRINGBUFFERS 00280 lastHead = head; 00281 lastPut = Count; 00282 #endif 00283 EnableGet(); 00284 if (free == 0) 00285 WaitForPut(); 00286 return Count; 00287 } 00288 00289 int cRingBufferLinear::Put(const uchar *Data, int Count) 00290 { 00291 if (Count > 0) { 00292 int Tail = tail; 00293 int rest = Size() - head; 00294 int diff = Tail - head; 00295 int free = ((Tail < margin) ? rest : (diff > 0) ? diff : Size() + diff - margin) - 1; 00296 if (statistics) { 00297 int fill = Size() - free - 1 + Count; 00298 if (fill >= Size()) 00299 fill = Size() - 1; 00300 UpdatePercentage(fill); 00301 } 00302 if (free > 0) { 00303 if (free < Count) 00304 Count = free; 00305 if (Count >= rest) { 00306 memcpy(buffer + head, Data, rest); 00307 if (Count - rest) 00308 memcpy(buffer + margin, Data + rest, Count - rest); 00309 head = margin + Count - rest; 00310 } 00311 else { 00312 memcpy(buffer + head, Data, Count); 00313 head += Count; 00314 } 00315 } 00316 else 00317 Count = 0; 00318 #ifdef DEBUGRINGBUFFERS 00319 lastHead = head; 00320 lastPut = Count; 00321 #endif 00322 EnableGet(); 00323 if (Count == 0) 00324 WaitForPut(); 00325 } 00326 return Count; 00327 } 00328 00329 uchar *cRingBufferLinear::Get(int &Count) 00330 { 00331 int Head = head; 00332 if (getThreadTid <= 0) 00333 getThreadTid = cThread::ThreadId(); 00334 int rest = Size() - tail; 00335 if (rest < margin && Head < tail) { 00336 int t = margin - rest; 00337 memcpy(buffer + t, buffer + tail, rest); 00338 tail = t; 00339 rest = Head - tail; 00340 } 00341 int diff = Head - tail; 00342 int cont = (diff >= 0) ? diff : Size() + diff - margin; 00343 if (cont > rest) 00344 cont = rest; 00345 uchar *p = buffer + tail; 00346 if ((cont = DataReady(p, cont)) > 0) { 00347 Count = gotten = cont; 00348 return p; 00349 } 00350 WaitForGet(); 00351 return NULL; 00352 } 00353 00354 void cRingBufferLinear::Del(int Count) 00355 { 00356 if (Count > gotten) { 00357 esyslog("ERROR: invalid Count in cRingBufferLinear::Del: %d (limited to %d)", Count, gotten); 00358 Count = gotten; 00359 } 00360 if (Count > 0) { 00361 int Tail = tail; 00362 Tail += Count; 00363 gotten -= Count; 00364 if (Tail >= Size()) 00365 Tail = margin; 00366 tail = Tail; 00367 EnablePut(); 00368 } 00369 #ifdef DEBUGRINGBUFFERS 00370 lastTail = tail; 00371 lastGet = Count; 00372 #endif 00373 } 00374 00375 // --- cFrame ---------------------------------------------------------------- 00376 00377 cFrame::cFrame(const uchar *Data, int Count, eFrameType Type, int Index, uint32_t Pts) 00378 { 00379 count = abs(Count); 00380 type = Type; 00381 index = Index; 00382 pts = Pts; 00383 if (Count < 0) 00384 data = (uchar *)Data; 00385 else { 00386 data = MALLOC(uchar, count); 00387 if (data) 00388 memcpy(data, Data, count); 00389 else 00390 esyslog("ERROR: can't allocate frame buffer (count=%d)", count); 00391 } 00392 next = NULL; 00393 } 00394 00395 cFrame::~cFrame() 00396 { 00397 free(data); 00398 } 00399 00400 // --- cRingBufferFrame ------------------------------------------------------ 00401 00402 cRingBufferFrame::cRingBufferFrame(int Size, bool Statistics) 00403 :cRingBuffer(Size, Statistics) 00404 { 00405 head = NULL; 00406 currentFill = 0; 00407 } 00408 00409 cRingBufferFrame::~cRingBufferFrame() 00410 { 00411 Clear(); 00412 } 00413 00414 void cRingBufferFrame::Clear(void) 00415 { 00416 Lock(); 00417 cFrame *p; 00418 while ((p = Get()) != NULL) 00419 Drop(p); 00420 Unlock(); 00421 EnablePut(); 00422 EnableGet(); 00423 } 00424 00425 bool cRingBufferFrame::Put(cFrame *Frame) 00426 { 00427 if (Frame->Count() <= Free()) { 00428 Lock(); 00429 if (head) { 00430 Frame->next = head->next; 00431 head->next = Frame; 00432 head = Frame; 00433 } 00434 else { 00435 head = Frame->next = Frame; 00436 } 00437 currentFill += Frame->Count(); 00438 Unlock(); 00439 EnableGet(); 00440 return true; 00441 } 00442 return false; 00443 } 00444 00445 cFrame *cRingBufferFrame::Get(void) 00446 { 00447 Lock(); 00448 cFrame *p = head ? head->next : NULL; 00449 Unlock(); 00450 return p; 00451 } 00452 00453 void cRingBufferFrame::Delete(cFrame *Frame) 00454 { 00455 currentFill -= Frame->Count(); 00456 delete Frame; 00457 } 00458 00459 void cRingBufferFrame::Drop(cFrame *Frame) 00460 { 00461 Lock(); 00462 if (head) { 00463 if (Frame == head->next) { 00464 if (head->next != head) { 00465 head->next = Frame->next; 00466 Delete(Frame); 00467 } 00468 else { 00469 Delete(head); 00470 head = NULL; 00471 } 00472 } 00473 else 00474 esyslog("ERROR: attempt to drop wrong frame from ring buffer!"); 00475 } 00476 Unlock(); 00477 EnablePut(); 00478 } 00479 00480 int cRingBufferFrame::Available(void) 00481 { 00482 Lock(); 00483 int av = currentFill; 00484 Unlock(); 00485 return av; 00486 }