00001 #include <cstdio>
00002 #include <cstdlib>
00003 #include <unistd.h>
00004 #include <fcntl.h>
00005 #include <cassert>
00006 #include <cerrno>
00007 #include <sys/time.h>
00008 #include <sys/types.h>
00009 #include <sys/stat.h>
00010 #include <ctime>
00011 #include <cmath>
00012
00013 #include "fifowriter.h"
00014 #include "mythcontext.h"
00015 #include "compat.h"
00016
00017 #include "config.h"
00018 #ifdef CONFIG_DARWIN
00019 #include <sys/aio.h>
00020 #endif
00021
00022 #include <iostream>
00023 using namespace std;
00024
00025 FIFOWriter::FIFOWriter(int count, bool sync)
00026 {
00027 num_fifos = count;
00028 usesync = sync;
00029 maxblksize = new long[count];
00030 killwr = new int[count];
00031 fbcount = new int[count];
00032 fifo_buf = new struct fifo_buf *[count];
00033 fb_inptr = new struct fifo_buf *[count];
00034 fb_outptr = new struct fifo_buf *[count];
00035 fifothrds = new pthread_t[count];
00036 fifo_lock = new pthread_mutex_t [count];
00037 empty_cond = new pthread_cond_t[count];
00038 full_cond = new pthread_cond_t[count];
00039 for (int i = 0; i < count; i++)
00040 {
00041 pthread_cond_init(&empty_cond[i], NULL);
00042 pthread_cond_init(&full_cond[i], NULL);
00043 pthread_mutex_init(&fifo_lock[i], NULL);
00044 }
00045 filename = new QString [count];
00046 fbdesc = new QString [count];
00047 }
00048
00049 FIFOWriter::~FIFOWriter()
00050 {
00051 for (int i = 0; i <num_fifos; i++)
00052 {
00053 killwr[i] = 1;
00054
00055 pthread_mutex_lock(&fifo_lock[i]);
00056 pthread_cond_signal(&empty_cond[i]);
00057 pthread_mutex_unlock(&fifo_lock[i]);
00058
00059 pthread_join(fifothrds[i], NULL);
00060
00061 pthread_cond_destroy(&empty_cond[i]);
00062 pthread_cond_destroy(&full_cond[i]);
00063 pthread_mutex_destroy(&fifo_lock[i]);
00064 }
00065 delete [] maxblksize;
00066 delete [] fifo_buf;
00067 delete [] fb_inptr;
00068 delete [] fb_outptr;
00069 delete [] fifothrds;
00070 delete [] full_cond;
00071 delete [] empty_cond;
00072 delete [] fifo_lock;
00073 delete [] filename;
00074 delete [] fbdesc;
00075 delete [] killwr;
00076 delete [] fbcount;
00077 }
00078
00079 int FIFOWriter::FIFOInit(int id, QString desc, QString name, long size,
00080 int num_bufs)
00081 {
00082 if (id < 0 || id >= num_fifos)
00083 return false;
00084 if (mkfifo(name.ascii(),S_IREAD | S_IWRITE | S_IRGRP | S_IROTH) == -1)
00085 {
00086 cerr << "Couldn't create fifo for file: " << name.ascii() << endl;
00087 perror(NULL);
00088 return false;
00089 }
00090 VERBOSE(VB_GENERAL, QString("Created %1 fifo: %2").arg(desc).arg(name));
00091 maxblksize[id] = size;
00092 filename[id] = name;
00093 fbdesc[id] = desc;
00094 killwr[id] = 0;
00095 fbcount[id] = (usesync) ? 2 : num_bufs;
00096 fifo_buf[id] = new struct fifo_buf;
00097 struct fifo_buf *fifoptr = fifo_buf[id];
00098 for (int i = 0; i < fbcount[id]; i++)
00099 {
00100 fifoptr->data = new unsigned char[maxblksize[id]];
00101 if (i == fbcount[id] - 1)
00102 fifoptr->next = fifo_buf[id];
00103 else
00104 fifoptr->next = new struct fifo_buf;
00105 fifoptr = fifoptr->next;
00106 }
00107 fb_inptr[id] = fifo_buf[id];
00108 fb_outptr[id] = fifo_buf[id];
00109
00110 cur_id = id;
00111
00112 pthread_create(&fifothrds[id], NULL, FIFOStartThread, this);
00113 while (cur_id >= 0)
00114 usleep(50);
00115 if (cur_id == -1)
00116 return true;
00117 else
00118 return false;
00119 }
00120
00121 void *FIFOWriter::FIFOStartThread(void *param)
00122 {
00123 FIFOWriter *fifo = (FIFOWriter *)param;
00124 fifo->FIFOWriteThread();
00125
00126 return NULL;
00127 }
00128
00129 void FIFOWriter::FIFOWriteThread(void)
00130 {
00131 int id = cur_id;
00132 int fd = -1;
00133 pthread_mutex_lock(&fifo_lock[id]);
00134 cur_id = -1;
00135 while (1)
00136 {
00137 if (fb_inptr[id] == fb_outptr[id])
00138 pthread_cond_wait(&empty_cond[id],&fifo_lock[id]);
00139 pthread_mutex_unlock(&fifo_lock[id]);
00140 if (killwr[id])
00141 break;
00142 if (fd == -1)
00143 fd = open(filename[id].ascii(), O_WRONLY| O_SYNC);
00144 write(fd, fb_outptr[id]->data, fb_outptr[id]->blksize);
00145 pthread_mutex_lock(&fifo_lock[id]);
00146 fb_outptr[id] = fb_outptr[id]->next;
00147 pthread_cond_signal(&full_cond[id]);
00148 }
00149
00150 if (fd != -1)
00151 close(fd);
00152
00153 unlink(filename[id]);
00154
00155 while (fifo_buf[id]->next != fifo_buf[id])
00156 {
00157 struct fifo_buf *tmpfifo = fifo_buf[id]->next->next;
00158 delete [] fifo_buf[id]->next->data;
00159 delete fifo_buf[id]->next;
00160 fifo_buf[id]->next = tmpfifo;
00161 }
00162 delete [] fifo_buf[id]->data;
00163 delete fifo_buf[id];
00164 }
00165
00166 void FIFOWriter::FIFOWrite(int id, void *buffer, long blksize)
00167 {
00168 pthread_mutex_lock(&fifo_lock[id]);
00169 while (fb_inptr[id]->next == fb_outptr[id])
00170 {
00171 bool blocking = false;
00172 if (!usesync)
00173 {
00174 for(int i = 0; i < num_fifos; i++)
00175 {
00176 if (i == id)
00177 continue;
00178 if (fb_inptr[i] == fb_outptr[i])
00179 blocking = true;
00180 }
00181 }
00182
00183 if (blocking)
00184 {
00185 struct fifo_buf *tmpfifo;
00186 tmpfifo = fb_inptr[id]->next;
00187 fb_inptr[id]->next = new struct fifo_buf;
00188 fb_inptr[id]->next->data = new unsigned char[maxblksize[id]];
00189 fb_inptr[id]->next->next = tmpfifo;
00190 QString msg = QString("allocating additonal buffer for : %1(%2)")
00191 .arg(fbdesc[id]).arg(++fbcount[id]);
00192 VERBOSE(VB_FILE, msg);
00193 }
00194 else
00195 {
00196 struct timespec timeout;
00197 struct timeval now;
00198 gettimeofday(&now, NULL);
00199 timeout.tv_sec = now.tv_sec + 1;
00200 timeout.tv_nsec = now.tv_usec * 1000;
00201 pthread_cond_timedwait(&full_cond[id], &fifo_lock[id], &timeout);
00202 }
00203 }
00204 if (blksize > maxblksize[id])
00205 {
00206 delete [] fb_inptr[id]->data;
00207 fb_inptr[id]->data = new unsigned char[blksize];
00208 }
00209 memcpy(fb_inptr[id]->data,buffer,blksize);
00210 fb_inptr[id]->blksize = blksize;
00211 fb_inptr[id] = fb_inptr[id]->next;
00212 pthread_cond_signal(&empty_cond[id]);
00213 pthread_mutex_unlock(&fifo_lock[id]);
00214 }
00215
00216 void FIFOWriter::FIFODrain(void)
00217 {
00218 int count = 0;
00219 while (count < num_fifos)
00220 {
00221 count = 0;
00222 for (int i = 0; i < num_fifos; i++)
00223 {
00224 if (fb_inptr[i] == fb_outptr[i])
00225 {
00226 killwr[i] = 1;
00227 pthread_mutex_lock(&fifo_lock[i]);
00228 pthread_cond_signal(&empty_cond[i]);
00229 pthread_mutex_unlock(&fifo_lock[i]);
00230 count++;
00231 }
00232 }
00233 usleep(1000);
00234 }
00235 }