00001 #include <unistd.h>
00002
00003 #include <qsqldatabase.h>
00004 #include <qsqlquery.h>
00005 #include <qdatetime.h>
00006 #include <qfileinfo.h>
00007 #include <qregexp.h>
00008
00009 #include <sys/types.h>
00010 #include <sys/stat.h>
00011
00012 #include <iostream>
00013 #include <cstdlib>
00014 using namespace std;
00015
00016 #include "exitcodes.h"
00017 #include "jobqueue.h"
00018 #include "programinfo.h"
00019 #include "libmyth/mythcontext.h"
00020 #include "libmyth/util.h"
00021 #include "NuppelVideoPlayer.h"
00022 #include "mythdbcon.h"
00023 #include "previewgenerator.h"
00024 #include "compat.h"
00025
00026 #define LOC QString("JobQueue: ")
00027 #define LOC_ERR QString("JobQueue Error: ")
00028
00029 JobQueue::JobQueue(bool master)
00030 {
00031 isMaster = master;
00032 m_hostname = gContext->GetHostName();
00033
00034 jobQueueCPU = gContext->GetNumSetting("JobQueueCPU", 0);
00035
00036 jobsRunning = 0;
00037
00038 #ifndef USING_VALGRIND
00039 queueThreadCondLock.lock();
00040 pthread_create(&queueThread, NULL, QueueProcesserThread, this);
00041 queueThreadCond.wait(&queueThreadCondLock);
00042 queueThreadCondLock.unlock();
00043 #else
00044 VERBOSE(VB_IMPORTANT, LOC_ERR + "The JobQueue has been disabled because "
00045 "you compiled with the --enable-valgrind option.");
00046 #endif // USING_VALGRIND
00047
00048 gContext->addListener(this);
00049 }
00050
00051 JobQueue::~JobQueue(void)
00052 {
00053 pthread_cancel(queueThread);
00054 pthread_join(queueThread, NULL);
00055
00056 gContext->removeListener(this);
00057 }
00058
00059 void JobQueue::customEvent(QCustomEvent *e)
00060 {
00061 if ((MythEvent::Type)(e->type()) == MythEvent::MythEventMessage)
00062 {
00063 MythEvent *me = (MythEvent *)e;
00064 QString message = me->Message();
00065
00066 if (message.left(9) == "LOCAL_JOB")
00067 {
00068
00069 QString msg;
00070 message = message.simplifyWhiteSpace();
00071 QStringList tokens = QStringList::split(" ", message);
00072 QString action = tokens[1];
00073 int jobType = JOB_UNKNOWN;
00074 int jobID = -1;
00075 QString chanid;
00076
00077 QDateTime starttime;
00078 QString detectionHost = "";
00079
00080 if (tokens[2] == "ID")
00081 {
00082 jobID = tokens[3].toInt();
00083 GetJobInfoFromID(jobID, jobType, chanid, starttime);
00084 }
00085 else
00086 {
00087 jobType = tokens[2].toInt();
00088 chanid = tokens[3];
00089 starttime = QDateTime::fromString(tokens[4], Qt::ISODate);
00090 detectionHost = tokens[5];
00091 }
00092
00093 if (jobType == JOB_UNKNOWN)
00094 {
00095 msg = QString("Unable to determine job info for message: "
00096 "%1. Program will not be flagged.")
00097 .arg(message);
00098 VERBOSE(VB_IMPORTANT, LOC_ERR + msg);
00099 return;
00100 }
00101
00102 QString key = GetJobQueueKey(chanid, starttime);
00103
00104 msg = QString("Received message '%1'").arg(message);
00105 VERBOSE(VB_JOBQUEUE, LOC + msg);
00106
00107 if (((action == "STOP") ||
00108 (action == "PAUSE") ||
00109 (action == "RESTART") ||
00110 (action == "RESUME" )) &&
00111 (jobControlFlags.contains(key)) &&
00112 (runningJobTypes.contains(key)) &&
00113 (runningJobTypes[key] == jobType))
00114 {
00115 controlFlagsLock.lock();
00116
00117 if (action == "STOP")
00118 *(jobControlFlags[key]) = JOB_STOP;
00119 else if (action == "PAUSE")
00120 *(jobControlFlags[key]) = JOB_PAUSE;
00121 else if (action == "RESUME")
00122 *(jobControlFlags[key]) = JOB_RUN;
00123 else if (action == "RESTART")
00124 *(jobControlFlags[key]) = JOB_RESTART;
00125
00126 controlFlagsLock.unlock();
00127 }
00128 }
00129 }
00130 }
00131
00132 void JobQueue::RunQueueProcesser()
00133 {
00134 queueThreadCondLock.lock();
00135 queueThreadCond.wakeAll();
00136 queueThreadCondLock.unlock();
00137
00138 RecoverQueue();
00139
00140 sleep(10);
00141
00142 ProcessQueue();
00143 }
00144
00145 void *JobQueue::QueueProcesserThread(void *param)
00146 {
00147 JobQueue *jobqueue = (JobQueue *)param;
00148 jobqueue->RunQueueProcesser();
00149
00150 return NULL;
00151 }
00152
00153 void JobQueue::ProcessQueue(void)
00154 {
00155 VERBOSE(VB_JOBQUEUE, LOC + "ProcessQueue() started");
00156
00157 QString chanid;
00158 QDateTime starttime;
00159 QDateTime schedruntime;
00160 QString startts;
00161 int type;
00162 int id;
00163 int cmds;
00164 int flags;
00165 int status;
00166 QString hostname;
00167 int sleepTime;
00168
00169 QMap<QString, int> jobStatus;
00170 int maxJobs;
00171 QString message;
00172 QMap<int, JobQueueEntry> jobs;
00173 bool atMax = false;
00174 bool inTimeWindow = true;
00175 bool startedJobAlready = false;
00176
00177 for (;;)
00178 {
00179 pthread_testcancel();
00180
00181 startedJobAlready = false;
00182 sleepTime = gContext->GetNumSetting("JobQueueCheckFrequency", 30);
00183 maxJobs = gContext->GetNumSetting("JobQueueMaxSimultaneousJobs", 3);
00184 VERBOSE(VB_JOBQUEUE, LOC +
00185 QString("Currently set to run up to %1 job(s) max.")
00186 .arg(maxJobs));
00187
00188 jobStatus.clear();
00189
00190 GetJobsInQueue(jobs);
00191
00192 if (jobs.size())
00193 {
00194 inTimeWindow = InJobRunWindow();
00195 jobsRunning = 0;
00196 for (unsigned int x = 0; x < jobs.size(); x++)
00197 {
00198 status = jobs[x].status;
00199 hostname = jobs[x].hostname;
00200
00201 if (((status == JOB_RUNNING) ||
00202 (status == JOB_STARTING) ||
00203 (status == JOB_PAUSED)) &&
00204 (hostname == m_hostname))
00205 jobsRunning++;
00206 }
00207
00208 message = QString("Currently Running %1 jobs.")
00209 .arg(jobsRunning);
00210 if (!inTimeWindow)
00211 {
00212 message += QString(" Jobs in Queue, but we are outside of the "
00213 "Job Queue time window, no new jobs can be "
00214 "started.");
00215 VERBOSE(VB_JOBQUEUE, LOC + message);
00216 }
00217 else if (jobsRunning >= maxJobs)
00218 {
00219 message += " (At Maximum, no new jobs can be started until "
00220 "a running job completes)";
00221
00222 if (!atMax)
00223 VERBOSE(VB_JOBQUEUE, LOC + message);
00224
00225 atMax = true;
00226 }
00227 else
00228 {
00229 VERBOSE(VB_JOBQUEUE, LOC + message);
00230 atMax = false;
00231 }
00232
00233
00234 for (unsigned int x = 0;
00235 (x < jobs.size()) && (jobsRunning < maxJobs); x++)
00236 {
00237 id = jobs[x].id;
00238 chanid = jobs[x].chanid;
00239 startts = jobs[x].startts;
00240 starttime = jobs[x].starttime;
00241 schedruntime = jobs[x].schedruntime;
00242 type = jobs[x].type;
00243 cmds = jobs[x].cmds;
00244 flags = jobs[x].flags;
00245 status = jobs[x].status;
00246 hostname = jobs[x].hostname;
00247
00248 QString key = GetJobQueueKey(chanid, startts);
00249
00250
00251 if ((inTimeWindow) &&
00252 (hostname != "") &&
00253 (hostname != m_hostname))
00254 {
00255
00256
00257
00258 jobStatus[key] = status;
00259
00260 message = QString("Skipping '%1' job for chanid "
00261 "%2 @ %3, should run on '%4' instead")
00262 .arg(JobText(type))
00263 .arg(chanid).arg(startts)
00264 .arg(hostname);
00265 VERBOSE(VB_JOBQUEUE, LOC + message);
00266 continue;
00267 }
00268
00269
00270 if ((inTimeWindow) &&
00271 (jobStatus.contains(key)) &&
00272 (!(jobStatus[key] & JOB_DONE)))
00273 {
00274 message = QString("Skipping '%1' job for chanid "
00275 "%2 @ %3, there is another job for "
00276 "this recording with a status of '%4'")
00277 .arg(JobText(type))
00278 .arg(chanid).arg(startts)
00279 .arg(StatusText(jobStatus[key]));
00280 VERBOSE(VB_JOBQUEUE, LOC + message);
00281 continue;
00282 }
00283
00284 jobStatus[key] = status;
00285
00286
00287 if ((inTimeWindow) && (!AllowedToRun(jobs[x])))
00288 {
00289 message = QString("Skipping '%1' job for chanid "
00290 "%2 @ %3, not allowed to run on this "
00291 "backend.")
00292 .arg(JobText(type))
00293 .arg(chanid).arg(startts);
00294 VERBOSE(VB_JOBQUEUE, LOC + message);
00295 continue;
00296 }
00297
00298
00299 if (schedruntime > QDateTime::currentDateTime())
00300 {
00301 message = QString("Skipping '%1' job for chanid "
00302 "%2 @ %3, this job is not scheduled to "
00303 "run until %4.")
00304 .arg(JobText(type))
00305 .arg(chanid).arg(startts)
00306 .arg(schedruntime.toString(Qt::ISODate));
00307 VERBOSE(VB_JOBQUEUE, LOC + message);
00308 continue;
00309 }
00310
00311 if (cmds & JOB_STOP)
00312 {
00313
00314
00315 if (status != JOB_QUEUED) {
00316 message = QString("Stopping '%1' job for "
00317 "chanid %2 @ %3")
00318 .arg(JobText(type))
00319 .arg(chanid).arg(startts);
00320 VERBOSE(VB_JOBQUEUE, LOC + message);
00321
00322 controlFlagsLock.lock();
00323 if (jobControlFlags.contains(key))
00324 *(jobControlFlags[key]) = JOB_STOP;
00325 controlFlagsLock.unlock();
00326
00327
00328 continue;
00329
00330
00331
00332
00333 } else {
00334 message = QString("Cancelling '%1' job for "
00335 "chanid %2 @ %3")
00336 .arg(JobText(type))
00337 .arg(chanid).arg(startts);
00338 VERBOSE(VB_JOBQUEUE, LOC + message);
00339
00340
00341
00342
00343
00344 if (!ChangeJobHost(id, m_hostname))
00345 {
00346 message = QString("Unable to clain '%1' job "
00347 "for chanid %2 @ %3.")
00348 .arg(JobText(type))
00349 .arg(chanid).arg(startts);
00350 VERBOSE(VB_JOBQUEUE, LOC_ERR + message);
00351 continue;
00352 }
00353
00354 ChangeJobStatus(id, JOB_CANCELLED, "");
00355 ChangeJobCmds(id, JOB_RUN);
00356 continue;
00357 }
00358 }
00359
00360 if ((cmds & JOB_PAUSE) && (status != JOB_QUEUED))
00361 {
00362 message = QString("Pausing '%1' job for chanid %2 @ %3")
00363 .arg(JobText(type))
00364 .arg(chanid).arg(startts);
00365 VERBOSE(VB_JOBQUEUE, LOC + message);
00366
00367 controlFlagsLock.lock();
00368 if (jobControlFlags.contains(key))
00369 *(jobControlFlags[key]) = JOB_PAUSE;
00370 controlFlagsLock.unlock();
00371
00372 ChangeJobCmds(id, JOB_RUN);
00373 continue;
00374 }
00375
00376 if ((cmds & JOB_RESTART) && (status != JOB_QUEUED))
00377 {
00378 message = QString("Restart '%1' job for chanid %2 @ %3")
00379 .arg(JobText(type))
00380 .arg(chanid).arg(startts);
00381 VERBOSE(VB_JOBQUEUE, LOC + message);
00382
00383 controlFlagsLock.lock();
00384 if (jobControlFlags.contains(key))
00385 *(jobControlFlags[key]) = JOB_RUN;
00386 controlFlagsLock.unlock();
00387
00388 ChangeJobCmds(id, JOB_RUN);
00389 continue;
00390 }
00391
00392 if (status != JOB_QUEUED)
00393 {
00394
00395 if (hostname == "")
00396 {
00397 message = QString("Resetting '%1' job for "
00398 "chanid %2 @ %3 to %4 status, "
00399 "because no hostname is set.")
00400 .arg(JobText(type))
00401 .arg(chanid).arg(startts)
00402 .arg(StatusText(JOB_QUEUED));
00403 VERBOSE(VB_JOBQUEUE, LOC + message);
00404
00405 ChangeJobStatus(id, JOB_QUEUED, "");
00406 ChangeJobCmds(id, JOB_RUN);
00407 }
00408 else if (inTimeWindow)
00409 {
00410 message = QString("Skipping '%1' job for chanid %2 @ "
00411 "%3, current job status is '%4'")
00412 .arg(JobText(type))
00413 .arg(chanid).arg(startts)
00414 .arg(StatusText(status));
00415 VERBOSE(VB_JOBQUEUE, LOC + message);
00416 }
00417 continue;
00418 }
00419
00420
00421 if (startedJobAlready)
00422 continue;
00423
00424 if ((inTimeWindow) &&
00425 (hostname == "") &&
00426 (!ChangeJobHost(id, m_hostname)))
00427 {
00428 message = QString("Unable to claim '%1' job "
00429 "for chanid %2 @ %3.")
00430 .arg(JobText(type))
00431 .arg(chanid).arg(startts);
00432 VERBOSE(VB_JOBQUEUE, LOC_ERR + message);
00433 continue;
00434 }
00435
00436 if (!inTimeWindow)
00437 {
00438 message = QString("Skipping '%1' job for chanid "
00439 "%2 @ %3, Current time is outside of the "
00440 "Job Queue processing window.")
00441 .arg(JobText(type))
00442 .arg(chanid).arg(startts);
00443 VERBOSE(VB_JOBQUEUE, LOC + message);
00444 continue;
00445 }
00446
00447 message = QString("Processing '%1' job for chanid "
00448 "%2 @ %3, current status is '%4'")
00449 .arg(JobText(type))
00450 .arg(chanid).arg(startts)
00451 .arg(StatusText(status));
00452 VERBOSE(VB_JOBQUEUE, LOC + message);
00453
00454 ProcessJob(id, type, chanid, starttime);
00455
00456 startedJobAlready = true;
00457 }
00458 }
00459
00460 if (startedJobAlready)
00461 sleep(5);
00462 else
00463 sleep(sleepTime);
00464 }
00465 }
00466
00467 bool JobQueue::QueueRecordingJobs(ProgramInfo *pinfo, int jobTypes)
00468 {
00469 if (!pinfo)
00470 return false;
00471
00472 if (jobTypes == JOB_NONE)
00473 jobTypes = pinfo->GetAutoRunJobs();
00474
00475 if (pinfo->chancommfree)
00476 jobTypes &= (~JOB_COMMFLAG);
00477
00478 if (jobTypes != JOB_NONE)
00479 {
00480 QString jobHost = "";
00481
00482 if (gContext->GetNumSetting("JobsRunOnRecordHost", 0))
00483 jobHost = pinfo->hostname;
00484
00485 return JobQueue::QueueJobs(jobTypes, pinfo->chanid, pinfo->recstartts,
00486 "", "", jobHost);
00487 }
00488 else
00489 return false;
00490
00491 return true;
00492 }
00493
00494 bool JobQueue::QueueJob(int jobType, QString chanid, QDateTime starttime,
00495 QString args, QString comment, QString host,
00496 int flags, int status, QDateTime schedruntime)
00497 {
00498 int tmpStatus = JOB_UNKNOWN;
00499 int tmpCmd = JOB_UNKNOWN;
00500 int jobID = -1;
00501
00502 if(!schedruntime.isValid())
00503 schedruntime = QDateTime::currentDateTime();
00504
00505 MSqlQuery query(MSqlQuery::InitCon());
00506
00507 query.prepare("SELECT status, id, cmds FROM jobqueue "
00508 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00509 "AND type = :JOBTYPE;");
00510 query.bindValue(":CHANID", chanid);
00511 query.bindValue(":STARTTIME", starttime);
00512 query.bindValue(":JOBTYPE", jobType);
00513
00514 query.exec();
00515
00516 if (!query.isActive())
00517 {
00518 MythContext::DBError("Error in JobQueue::QueueJob()", query);
00519 return false;
00520 }
00521 else
00522 {
00523 if ((query.size() > 0) && query.next())
00524 {
00525 tmpStatus = query.value(0).toInt();
00526 jobID = query.value(1).toInt();
00527 tmpCmd = query.value(2).toInt();
00528 }
00529 }
00530 switch (tmpStatus)
00531 {
00532 case JOB_UNKNOWN:
00533 break;
00534 case JOB_STARTING:
00535 case JOB_RUNNING:
00536 case JOB_PAUSED:
00537 case JOB_STOPPING:
00538 case JOB_ERRORING:
00539 case JOB_ABORTING:
00540 return false;
00541 default:
00542 DeleteJob(jobID);
00543 break;
00544 }
00545 if (! (tmpStatus & JOB_DONE) && (tmpCmd & JOB_STOP))
00546 return false;
00547
00548 query.prepare("INSERT INTO jobqueue (chanid, starttime, inserttime, type, "
00549 "status, statustime, schedruntime, hostname, args, comment, "
00550 "flags) "
00551 "VALUES (:CHANID, :STARTTIME, now(), :JOBTYPE, :STATUS, "
00552 "now(), :SCHEDRUNTIME, :HOST, :ARGS, :COMMENT, :FLAGS);");
00553
00554 query.bindValue(":CHANID", chanid);
00555 query.bindValue(":STARTTIME", starttime);
00556 query.bindValue(":JOBTYPE", jobType);
00557 query.bindValue(":STATUS", status);
00558 query.bindValue(":SCHEDRUNTIME", schedruntime);
00559 query.bindValue(":HOST", host);
00560 query.bindValue(":ARGS", args);
00561 query.bindValue(":COMMENT", comment);
00562 query.bindValue(":FLAGS", flags);
00563
00564 query.exec();
00565
00566 if (!query.isActive())
00567 {
00568 MythContext::DBError("Error in JobQueue::StartJob()", query);
00569 return false;
00570 }
00571
00572 return true;
00573 }
00574
00575 bool JobQueue::QueueJobs(int jobTypes, QString chanid, QDateTime starttime,
00576 QString args, QString comment, QString host)
00577 {
00578 if (gContext->GetNumSetting("AutoTranscodeBeforeAutoCommflag", 0))
00579 {
00580 if (jobTypes & JOB_TRANSCODE)
00581 QueueJob(JOB_TRANSCODE, chanid, starttime, args, comment, host);
00582 if (jobTypes & JOB_COMMFLAG)
00583 QueueJob(JOB_COMMFLAG, chanid, starttime, args, comment, host);
00584 }
00585 else
00586 {
00587 if (jobTypes & JOB_COMMFLAG)
00588 QueueJob(JOB_COMMFLAG, chanid, starttime, args, comment, host);
00589 if (jobTypes & JOB_TRANSCODE)
00590 {
00591 QDateTime schedruntime = QDateTime::currentDateTime();
00592
00593 int defer = gContext->GetNumSetting("DeferAutoTranscodeDays", 0);
00594 if (defer)
00595 {
00596 schedruntime = schedruntime.addDays(defer);
00597 schedruntime.setTime(QTime::QTime(0,0));
00598 }
00599
00600 QueueJob(JOB_TRANSCODE, chanid, starttime, args, comment, host,
00601 0, JOB_QUEUED, schedruntime);
00602 }
00603 }
00604
00605 if (jobTypes & JOB_USERJOB1)
00606 QueueJob(JOB_USERJOB1, chanid, starttime, args, comment, host);
00607 if (jobTypes & JOB_USERJOB2)
00608 QueueJob(JOB_USERJOB2, chanid, starttime, args, comment, host);
00609 if (jobTypes & JOB_USERJOB3)
00610 QueueJob(JOB_USERJOB3, chanid, starttime, args, comment, host);
00611 if (jobTypes & JOB_USERJOB4)
00612 QueueJob(JOB_USERJOB4, chanid, starttime, args, comment, host);
00613
00614 return true;
00615 }
00616
00617 int JobQueue::GetJobID(int jobType, QString chanid, QDateTime starttime)
00618 {
00619 MSqlQuery query(MSqlQuery::InitCon());
00620
00621 query.prepare("SELECT id FROM jobqueue "
00622 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00623 "AND type = :JOBTYPE;");
00624 query.bindValue(":CHANID", chanid);
00625 query.bindValue(":STARTTIME", starttime);
00626 query.bindValue(":JOBTYPE", jobType);
00627
00628 query.exec();
00629
00630 if (!query.isActive())
00631 {
00632 MythContext::DBError("Error in JobQueue::GetJobID()", query);
00633 return -1;
00634 }
00635 else
00636 {
00637 if ((query.size() > 0) && query.next())
00638 return query.value(0).toInt();
00639 }
00640
00641 return -1;
00642 }
00643
00644 bool JobQueue::GetJobInfoFromID(int jobID, int &jobType, QString &chanid,
00645 QDateTime &starttime)
00646 {
00647 MSqlQuery query(MSqlQuery::InitCon());
00648
00649 query.prepare("SELECT type, chanid, starttime FROM jobqueue "
00650 "WHERE id = :ID;");
00651
00652 query.bindValue(":ID", jobID);
00653
00654 query.exec();
00655
00656 if (!query.isActive())
00657 {
00658 MythContext::DBError("Error in JobQueue::GetJobID()", query);
00659 return false;
00660 }
00661 else
00662 {
00663 if ((query.size() > 0) && query.next())
00664 {
00665 jobType = query.value(0).toInt();
00666 chanid = query.value(1).toString();
00667 starttime = query.value(2).toDateTime();
00668 return true;
00669 }
00670 }
00671
00672 return false;
00673 }
00674
00675 bool JobQueue::GetJobInfoFromID(int jobID, int &jobType, QString &chanid,
00676 QString &starttime)
00677 {
00678 QDateTime tmpStarttime;
00679 bool result = JobQueue::GetJobInfoFromID(jobID, jobType, chanid,
00680 tmpStarttime);
00681 if (result)
00682 starttime = tmpStarttime.toString("yyyyMMddhhmmss");
00683
00684 return result;
00685 }
00686
00687 bool JobQueue::PauseJob(int jobID)
00688 {
00689 QString message = QString("GLOBAL_JOB PAUSE ID %1").arg(jobID);
00690 MythEvent me(message);
00691 gContext->dispatch(me);
00692
00693 return ChangeJobCmds(jobID, JOB_PAUSE);
00694 }
00695
00696 bool JobQueue::ResumeJob(int jobID)
00697 {
00698 QString message = QString("GLOBAL_JOB RESUME ID %1").arg(jobID);
00699 MythEvent me(message);
00700 gContext->dispatch(me);
00701
00702 return ChangeJobCmds(jobID, JOB_RESUME);
00703 }
00704
00705 bool JobQueue::RestartJob(int jobID)
00706 {
00707 QString message = QString("GLOBAL_JOB RESTART ID %1").arg(jobID);
00708 MythEvent me(message);
00709 gContext->dispatch(me);
00710
00711 return ChangeJobCmds(jobID, JOB_RESTART);
00712 }
00713
00714 bool JobQueue::StopJob(int jobID)
00715 {
00716 QString message = QString("GLOBAL_JOB STOP ID %1").arg(jobID);
00717 MythEvent me(message);
00718 gContext->dispatch(me);
00719
00720 return ChangeJobCmds(jobID, JOB_STOP);
00721 }
00722
00723 bool JobQueue::DeleteAllJobs(QString chanid, QDateTime starttime)
00724 {
00725 QString key = GetJobQueueKey(chanid, starttime);
00726 MSqlQuery query(MSqlQuery::InitCon());
00727 QString message;
00728
00729 query.prepare("UPDATE jobqueue SET status = :CANCELLED "
00730 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00731 "AND status = :QUEUED;");
00732
00733 query.bindValue(":CANCELLED", JOB_CANCELLED);
00734 query.bindValue(":CHANID", chanid);
00735 query.bindValue(":STARTTIME", starttime);
00736 query.bindValue(":QUEUED", JOB_QUEUED);
00737
00738 query.exec();
00739
00740 if (!query.isActive())
00741 MythContext::DBError("Cancel Pending Jobs", query);
00742
00743 query.prepare("UPDATE jobqueue SET cmds = :CMD "
00744 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00745 "AND status <> :CANCELLED;");
00746 query.bindValue(":CMD", JOB_STOP);
00747 query.bindValue(":CHANID", chanid);
00748 query.bindValue(":STARTTIME", starttime);
00749 query.bindValue(":CANCELLED", JOB_CANCELLED);
00750
00751 if (!query.exec())
00752 {
00753 MythContext::DBError("Stop Unfinished Jobs", query);
00754 return false;
00755 }
00756
00757
00758 bool jobsAreRunning = true;
00759 int totalSlept = 0;
00760 int maxSleep = 90;
00761 while (jobsAreRunning && totalSlept < maxSleep)
00762 {
00763 usleep(1000);
00764 query.prepare("SELECT id FROM jobqueue "
00765 "WHERE chanid = :CHANID and starttime = :STARTTIME "
00766 "AND status NOT IN "
00767 "(:FINISHED,:ABORTED,:ERRORED,:CANCELLED);");
00768 query.bindValue(":CHANID", chanid);
00769 query.bindValue(":STARTTIME", starttime);
00770 query.bindValue(":FINISHED", JOB_FINISHED);
00771 query.bindValue(":ABORTED", JOB_ABORTED);
00772 query.bindValue(":ERRORED", JOB_ERRORED);
00773 query.bindValue(":CANCELLED", JOB_CANCELLED);
00774
00775 query.exec();
00776
00777 if (!query.exec() || !query.isActive())
00778 {
00779 MythContext::DBError("Stop Unfinished Jobs", query);
00780 return false;
00781 }
00782
00783 if (query.size() == 0)
00784 {
00785 jobsAreRunning = false;
00786 break;
00787 }
00788 else if ((totalSlept % 5) == 0)
00789 {
00790 message = QString("Waiting on %1 jobs still running for "
00791 "chanid %2 @ %3").arg(query.size())
00792 .arg(chanid).arg(starttime.toString());
00793 VERBOSE(VB_JOBQUEUE, LOC + message);
00794 }
00795
00796 sleep(1);
00797 totalSlept++;
00798 }
00799
00800 if (totalSlept <= maxSleep)
00801 {
00802 query.prepare("DELETE FROM jobqueue "
00803 "WHERE chanid = :CHANID AND starttime = :STARTTIME;");
00804 query.bindValue(":CHANID", chanid);
00805 query.bindValue(":STARTTIME", starttime);
00806
00807 query.exec();
00808
00809 if (!query.isActive())
00810 MythContext::DBError("Delete All Jobs", query);
00811 }
00812 else
00813 {
00814 query.prepare("SELECT id, type, status, comment FROM jobqueue "
00815 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00816 "AND status <> :CANCELLED ORDER BY id;");
00817
00818 query.bindValue(":CHANID", chanid);
00819 query.bindValue(":STARTTIME", starttime);
00820 query.bindValue(":CANCELLED", JOB_CANCELLED);
00821
00822 if (!query.exec() || !query.isActive())
00823 {
00824 MythContext::DBError("Error in JobQueue::DeleteAllJobs(), Unable "
00825 "to query list of Jobs left in Queue.", query);
00826 return 0;
00827 }
00828
00829 VERBOSE(VB_IMPORTANT, LOC_ERR +
00830 QString( "In DeleteAllJobs: There are Jobs "
00831 "left in the JobQueue that are still running for "
00832 "chanid %1 @ %2.").arg(chanid)
00833 .arg(starttime.toString()));
00834
00835 if (query.numRowsAffected() > 0)
00836 {
00837 while (query.next())
00838 {
00839 VERBOSE(VB_IMPORTANT, LOC_ERR +
00840 QString("Job ID %1: '%2' with status '%3' and "
00841 "comment '%4'")
00842 .arg(query.value(0).toInt())
00843 .arg(JobText(query.value(1).toInt()))
00844 .arg(StatusText(query.value(2).toInt()))
00845 .arg(query.value(3).toString()));
00846 }
00847 }
00848
00849 return false;
00850 }
00851
00852 return true;
00853 }
00854
00855 bool JobQueue::DeleteJob(int jobID)
00856 {
00857 if (jobID < 0)
00858 return false;
00859
00860 MSqlQuery query(MSqlQuery::InitCon());
00861
00862 query.prepare("DELETE FROM jobqueue WHERE id = :ID;");
00863
00864 query.bindValue(":ID", jobID);
00865
00866 query.exec();
00867
00868 if (!query.isActive())
00869 {
00870 MythContext::DBError("Error in JobQueue::DeleteJob()", query);
00871 return false;
00872 }
00873
00874 return true;
00875 }
00876
00877 bool JobQueue::ChangeJobCmds(int jobID, int newCmds)
00878 {
00879 if (jobID < 0)
00880 return false;
00881
00882 MSqlQuery query(MSqlQuery::InitCon());
00883
00884 query.prepare("UPDATE jobqueue SET cmds = :CMDS WHERE id = :ID;");
00885
00886 query.bindValue(":CMDS", newCmds);
00887 query.bindValue(":ID", jobID);
00888
00889 query.exec();
00890
00891 if (!query.isActive())
00892 {
00893 MythContext::DBError("Error in JobQueue::ChangeJobCmds()", query);
00894 return false;
00895 }
00896
00897 return true;
00898 }
00899
00900 bool JobQueue::ChangeJobCmds(int jobType, QString chanid,
00901 QDateTime starttime, int newCmds)
00902 {
00903 MSqlQuery query(MSqlQuery::InitCon());
00904
00905 query.prepare("UPDATE jobqueue SET cmds = :CMDS WHERE type = :TYPE "
00906 "AND chanid = :CHANID AND starttime = :STARTTIME;");
00907
00908 query.bindValue(":CMDS", newCmds);
00909 query.bindValue(":TYPE", jobType);
00910 query.bindValue(":CHANID", chanid);
00911 query.bindValue(":STARTTIME", starttime);
00912
00913 query.exec();
00914
00915 if (!query.isActive())
00916 {
00917 MythContext::DBError("Error in JobQueue::ChangeJobCmds()", query);
00918 return false;
00919 }
00920
00921 return true;
00922 }
00923
00924 bool JobQueue::ChangeJobFlags(int jobID, int newFlags)
00925 {
00926 if (jobID < 0)
00927 return false;
00928
00929 MSqlQuery query(MSqlQuery::InitCon());
00930
00931 query.prepare("UPDATE jobqueue SET flags = :FLAGS WHERE id = :ID;");
00932
00933 query.bindValue(":FLAGS", newFlags);
00934 query.bindValue(":ID", jobID);
00935
00936 query.exec();
00937
00938 if (!query.isActive())
00939 {
00940 MythContext::DBError("Error in JobQueue::ChangeJobFlags()", query);
00941 return false;
00942 }
00943
00944 return true;
00945 }
00946
00947 bool JobQueue::ChangeJobStatus(int jobID, int newStatus, QString comment)
00948 {
00949 if (jobID < 0)
00950 return false;
00951
00952 VERBOSE(VB_JOBQUEUE, LOC + QString("ChangeJobStatus(%1, %2, '%3')")
00953 .arg(jobID).arg(StatusText(newStatus)).arg(comment));
00954
00955 MSqlQuery query(MSqlQuery::InitCon());
00956
00957 query.prepare("UPDATE jobqueue SET status = :STATUS, comment = :COMMENT "
00958 "WHERE id = :ID;");
00959
00960 query.bindValue(":STATUS", newStatus);
00961 query.bindValue(":COMMENT", comment);
00962 query.bindValue(":ID", jobID);
00963
00964 query.exec();
00965
00966 if (!query.isActive())
00967 {
00968 MythContext::DBError("Error in JobQueue::ChangeJobStatus()", query);
00969 return false;
00970 }
00971
00972 return true;
00973 }
00974
00975 bool JobQueue::ChangeJobComment(int jobID, QString comment)
00976 {
00977 if (jobID < 0)
00978 return false;
00979
00980 VERBOSE(VB_JOBQUEUE, LOC + QString("ChangeJobComment(%1, '%2')")
00981 .arg(jobID).arg(comment));
00982
00983 MSqlQuery query(MSqlQuery::InitCon());
00984
00985 query.prepare("UPDATE jobqueue SET comment = :COMMENT "
00986 "WHERE id = :ID;");
00987
00988 query.bindValue(":COMMENT", comment);
00989 query.bindValue(":ID", jobID);
00990
00991 query.exec();
00992
00993 if (!query.isActive())
00994 {
00995 MythContext::DBError("Error in JobQueue::ChangeJobComment()", query);
00996 return false;
00997 }
00998
00999 return true;
01000 }
01001
01002 bool JobQueue::ChangeJobArgs(int jobID, QString args)
01003 {
01004 if (jobID < 0)
01005 return false;
01006
01007 MSqlQuery query(MSqlQuery::InitCon());
01008
01009 query.prepare("UPDATE jobqueue SET args = :ARGS "
01010 "WHERE id = :ID;");
01011
01012 query.bindValue(":ARGS", args);
01013 query.bindValue(":ID", jobID);
01014
01015 query.exec();
01016
01017 if (!query.isActive())
01018 {
01019 MythContext::DBError("Error in JobQueue::ChangeJobArgs()", query);
01020 return false;
01021 }
01022
01023 return true;
01024 }
01025
01026 bool JobQueue::IsJobRunning(int jobType, QString chanid, QDateTime starttime)
01027 {
01028 int tmpStatus = GetJobStatus(jobType, chanid, starttime);
01029
01030 if ((tmpStatus != JOB_UNKNOWN) && (tmpStatus != JOB_QUEUED) &&
01031 (!(tmpStatus & JOB_DONE)))
01032 return true;
01033
01034 return false;
01035 }
01036
01037 bool JobQueue::IsJobRunning(int jobType, ProgramInfo *pginfo)
01038 {
01039 return JobQueue::IsJobRunning(jobType, pginfo->chanid, pginfo->recstartts);
01040 }
01041
01042 bool JobQueue::IsJobQueuedOrRunning(int jobType, QString chanid,
01043 QDateTime starttime)
01044 {
01045 int tmpStatus = GetJobStatus(jobType, chanid, starttime);
01046
01047 if ((tmpStatus != JOB_UNKNOWN) && (!(tmpStatus & JOB_DONE)))
01048 return true;
01049
01050 return false;
01051 }
01052
01053 bool JobQueue::IsJobQueued(int jobType, QString chanid,
01054 QDateTime starttime)
01055 {
01056 int tmpStatus = GetJobStatus(jobType, chanid, starttime);
01057
01058 if (tmpStatus & JOB_QUEUED)
01059 return true;
01060
01061 return false;
01062 }
01063
01064 QString JobQueue::JobText(int jobType)
01065 {
01066 switch (jobType)
01067 {
01068 case JOB_TRANSCODE: return tr("Transcode");
01069 case JOB_COMMFLAG: return tr("Flag Commercials");
01070 }
01071
01072 if (jobType & JOB_USERJOB)
01073 {
01074 QString settingName =
01075 QString("UserJobDesc%1").arg(UserJobTypeToIndex(jobType));
01076 return gContext->GetSetting(settingName, settingName);
01077 }
01078
01079 return tr("Unknown Job");
01080 }
01081
01082 QString JobQueue::StatusText(int status)
01083 {
01084 switch (status)
01085 {
01086 #define JOBSTATUS_STATUSTEXT(A,B,C) case A: return C;
01087 JOBSTATUS_MAP(JOBSTATUS_STATUSTEXT)
01088 default: break;
01089 }
01090 return tr("Undefined");
01091 }
01092
01093 QString JobQueue::GetJobQueueKey(QString chanid, QString startts)
01094 {
01095 return QString("%1_%2").arg(chanid).arg(startts);
01096 }
01097
01098 QString JobQueue::GetJobQueueKey(QString chanid, QDateTime starttime)
01099 {
01100 return JobQueue::GetJobQueueKey(chanid,
01101 starttime.toString("yyyyMMddhhmmss"));
01102 }
01103
01104 QString JobQueue::GetJobQueueKey(ProgramInfo *pginfo)
01105 {
01106 return JobQueue::GetJobQueueKey(pginfo->chanid, pginfo->recstartts);
01107 }
01108
01109 bool JobQueue::InJobRunWindow(int orStartsWithinMins)
01110 {
01111 QString queueStartTimeStr;
01112 QString queueEndTimeStr;
01113 QTime queueStartTime;
01114 QTime queueEndTime;
01115 QTime curTime = QTime::currentTime();
01116 bool inTimeWindow = false;
01117 orStartsWithinMins = orStartsWithinMins < 0 ? 0 : orStartsWithinMins;
01118
01119 queueStartTimeStr = gContext->GetSetting("JobQueueWindowStart", "00:00");
01120 queueEndTimeStr = gContext->GetSetting("JobQueueWindowEnd", "23:59");
01121
01122 VERBOSE(VB_JOBQUEUE, LOC +
01123 QString("Currently set to run new jobs from %1 to %2")
01124 .arg(queueStartTimeStr).arg(queueEndTimeStr));
01125
01126 queueStartTime = QTime::fromString(queueStartTimeStr);
01127 if (!queueStartTime.isValid())
01128 {
01129 VERBOSE(VB_IMPORTANT, "Invalid JobQueueWindowStart time, using 00:00");
01130 queueStartTime = QTime::QTime(0, 0);
01131 }
01132
01133 queueEndTime = QTime::fromString(queueEndTimeStr);
01134 if (!queueEndTime.isValid())
01135 {
01136 VERBOSE(VB_IMPORTANT, "Invalid JobQueueWindowEnd time, using 23:59");
01137 queueEndTime = QTime::QTime(23, 59);
01138 }
01139
01140 if ((queueStartTime <= curTime) && (curTime < queueEndTime))
01141 {
01142 inTimeWindow = true;
01143 }
01144 else if ((queueStartTime > queueEndTime) &&
01145 ((curTime < queueEndTime) || (queueStartTime <= curTime)))
01146 {
01147 inTimeWindow = true;
01148 }
01149 else if (orStartsWithinMins > 0)
01150 {
01151
01152 if (curTime <= queueStartTime)
01153 {
01154
01155 if (queueStartTime.secsTo(curTime) <= (orStartsWithinMins * 60))
01156 {
01157 VERBOSE(VB_JOBQUEUE, LOC +
01158 QString("Job run window will start within %1 minutes")
01159 .arg(orStartsWithinMins));
01160 inTimeWindow = true;
01161 }
01162 }
01163 else
01164 {
01165
01166 QDateTime curDateTime = QDateTime::currentDateTime();
01167 QDateTime startDateTime = QDateTime(QDate::currentDate(),
01168 queueStartTime).addDays(1);
01169
01170 if (curDateTime.secsTo(startDateTime) <= (orStartsWithinMins * 60))
01171 {
01172 VERBOSE(VB_JOBQUEUE, LOC + QString("Job run window will start "
01173 "within %1 minutes (tomorrow)")
01174 .arg(orStartsWithinMins));
01175 inTimeWindow = true;
01176 }
01177 }
01178 }
01179
01180 return inTimeWindow;
01181 }
01182
01183 bool JobQueue::HasRunningOrPendingJobs(int startingWithinMins)
01184 {
01185
01186
01187 QMap<int, JobQueueEntry> jobs;
01188 QMap<int, JobQueueEntry>::Iterator it;
01189 QDateTime maxSchedRunTime = QDateTime::currentDateTime();
01190 int tmpStatus = 0;
01191 bool checkForQueuedJobs = (startingWithinMins <= 0
01192 || InJobRunWindow(startingWithinMins));
01193
01194 if (checkForQueuedJobs && startingWithinMins > 0) {
01195 maxSchedRunTime = maxSchedRunTime.addSecs(startingWithinMins * 60);
01196 VERBOSE(VB_JOBQUEUE, LOC +
01197 QString("HasRunningOrPendingJobs: checking for jobs "
01198 "starting before: %1").arg(maxSchedRunTime.toString()));
01199 }
01200
01201 JobQueue::GetJobsInQueue(jobs, JOB_LIST_NOT_DONE);
01202
01203 if (jobs.size()) {
01204 for (it = jobs.begin(); it != jobs.end(); ++it)
01205 {
01206 tmpStatus = it.data().status;
01207 if (tmpStatus == JOB_RUNNING) {
01208 VERBOSE(VB_JOBQUEUE, LOC +
01209 QString("HasRunningOrPendingJobs: found running job"));
01210 return true;
01211 }
01212
01213 if (checkForQueuedJobs) {
01214 if ((tmpStatus != JOB_UNKNOWN) && (!(tmpStatus & JOB_DONE))) {
01215 if (startingWithinMins <= 0) {
01216 VERBOSE(VB_JOBQUEUE, LOC +
01217 QString("HasRunningOrPendingJobs: "
01218 "found pending job"));
01219 return true;
01220 }
01221 else if (it.data().schedruntime <= maxSchedRunTime) {
01222 VERBOSE(VB_JOBQUEUE, LOC +
01223 QString("HasRunningOrPendingJobs: found pending "
01224 "job scheduled to start at: %1")
01225 .arg(it.data().schedruntime.toString()));
01226 return true;
01227 }
01228 }
01229 }
01230 }
01231 }
01232 return false;
01233 }
01234
01235
01236 int JobQueue::GetJobsInQueue(QMap<int, JobQueueEntry> &jobs, int findJobs)
01237 {
01238 JobQueueEntry thisJob;
01239 MSqlQuery query(MSqlQuery::InitCon());
01240 QDateTime recentDate = QDateTime::currentDateTime().addSecs(-4 * 3600);
01241 int jobCount = 0;
01242 bool commflagWhileRecording =
01243 gContext->GetNumSetting("AutoCommflagWhileRecording", 0);
01244
01245 jobs.clear();
01246
01247 query.prepare("SELECT j.id, j.chanid, j.starttime, j.inserttime, j.type, "
01248 "j.cmds, j.flags, j.status, j.statustime, j.hostname, "
01249 "j.args, j.comment, r.endtime, j.schedruntime "
01250 "FROM jobqueue j, recorded r "
01251 "WHERE j.chanid = r.chanid AND j.starttime = r.starttime "
01252 "ORDER BY j.schedruntime, j.id;");
01253
01254 if (!query.exec() || !query.isActive())
01255 {
01256 MythContext::DBError("Error in JobQueue::GetJobs(), Unable to "
01257 "query list of Jobs in Queue.", query);
01258 return 0;
01259 }
01260
01261 VERBOSE(VB_JOBQUEUE, LOC +
01262 QString("GetJobsInQueue: findJobs search bitmask %1, "
01263 "found %2 total jobs")
01264 .arg(findJobs).arg(query.numRowsAffected()));
01265
01266
01267 if (query.numRowsAffected() > 0)
01268 {
01269 while (query.next())
01270 {
01271 bool wantThisJob = false;
01272
01273 thisJob.chanid = query.value(1).toString();
01274 thisJob.starttime = query.value(2).toDateTime();
01275 thisJob.schedruntime = query.value(13).toDateTime();
01276 thisJob.type = query.value(4).toInt();
01277 thisJob.status = query.value(7).toInt();
01278 thisJob.statustime = query.value(8).toDateTime();
01279 thisJob.startts = thisJob.starttime.toString("yyyyMMddhhmmss");
01280
01281 if ((query.value(12).toDateTime() > QDateTime::currentDateTime()) &&
01282 ((!commflagWhileRecording) ||
01283 (thisJob.type != JOB_COMMFLAG)))
01284 {
01285 VERBOSE(VB_JOBQUEUE, LOC +
01286 QString("GetJobsInQueue: Ignoring '%1' Job "
01287 "for %2 @ %3 in %4 state. Endtime in future.")
01288 .arg(JobText(thisJob.type))
01289 .arg(thisJob.chanid)
01290 .arg(thisJob.startts)
01291 .arg(StatusText(thisJob.status)));
01292 continue;
01293 }
01294
01295 if ((findJobs & JOB_LIST_ALL) ||
01296 ((findJobs & JOB_LIST_DONE) &&
01297 (thisJob.status & JOB_DONE)) ||
01298 ((findJobs & JOB_LIST_NOT_DONE) &&
01299 (!(thisJob.status & JOB_DONE))) ||
01300 ((findJobs & JOB_LIST_ERROR) &&
01301 (thisJob.status == JOB_ERRORED)) ||
01302 ((findJobs & JOB_LIST_RECENT) &&
01303 (thisJob.statustime > recentDate)))
01304 wantThisJob = true;
01305
01306 if (!wantThisJob)
01307 {
01308 VERBOSE(VB_JOBQUEUE, LOC +
01309 QString("GetJobsInQueue: Ignore '%1' Job for "
01310 "%2 @ %3 in %4 state.")
01311 .arg(JobText(thisJob.type))
01312 .arg(thisJob.chanid)
01313 .arg(thisJob.startts)
01314 .arg(StatusText(thisJob.status)));
01315 continue;
01316 }
01317
01318 VERBOSE(VB_JOBQUEUE, LOC +
01319 QString("GetJobsInQueue: Found '%1' Job for "
01320 "%2 @ %3 in %4 state.")
01321 .arg(JobText(thisJob.type))
01322 .arg(thisJob.chanid)
01323 .arg(thisJob.startts)
01324 .arg(StatusText(thisJob.status)));
01325
01326 thisJob.id = query.value(0).toInt();
01327 thisJob.inserttime = query.value(3).toDateTime();
01328 thisJob.cmds = query.value(5).toInt();
01329 thisJob.flags = query.value(6).toInt();
01330 thisJob.hostname = query.value(9).toString();
01331 thisJob.args = query.value(10).toString();
01332 thisJob.comment = query.value(11).toString();
01333
01334 if ((thisJob.type & JOB_USERJOB) &&
01335 (UserJobTypeToIndex(thisJob.type) == 0))
01336 {
01337 thisJob.type = JOB_NONE;
01338 VERBOSE(VB_JOBQUEUE, LOC +
01339 QString("GetJobsInQueue: Unknown Job Type: %1")
01340 .arg(thisJob.type));
01341 }
01342
01343 if (thisJob.type != JOB_NONE)
01344 jobs[jobCount++] = thisJob;
01345 }
01346 }
01347
01348 return jobCount;
01349 }
01350
01351 bool JobQueue::ChangeJobHost(int jobID, QString newHostname)
01352 {
01353 MSqlQuery query(MSqlQuery::InitCon());
01354
01355 if (newHostname != "")
01356 {
01357 query.prepare("UPDATE jobqueue SET hostname = :NEWHOSTNAME "
01358 "WHERE hostname = :EMPTY AND id = :ID;");
01359 query.bindValue(":NEWHOSTNAME", newHostname);
01360 query.bindValue(":EMPTY", "");
01361 query.bindValue(":ID", jobID);
01362 }
01363 else
01364 {
01365 query.prepare("UPDATE jobqueue SET hostname = :EMPTY "
01366 "WHERE id = :ID;");
01367 query.bindValue(":EMPTY", "");
01368 query.bindValue(":ID", jobID);
01369 }
01370
01371 if (!query.exec() || !query.isActive())
01372 {
01373 MythContext::DBError(QString("Error in JobQueue::ChangeJobHost(), "
01374 "Unable to set hostname to '%1' for "
01375 "job %2.").arg(newHostname).arg(jobID),
01376 query);
01377 return false;
01378 }
01379
01380 if (query.numRowsAffected() > 0)
01381 return true;
01382
01383 return false;
01384 }
01385
01386 bool JobQueue::AllowedToRun(JobQueueEntry job)
01387 {
01388 QString allowSetting;
01389
01390 if ((job.hostname != "") &&
01391 (job.hostname != m_hostname))
01392 return false;
01393
01394 if (job.type & JOB_USERJOB)
01395 {
01396 allowSetting =
01397 QString("JobAllowUserJob%1").arg(UserJobTypeToIndex(job.type));
01398 }
01399 else
01400 {
01401 switch (job.type)
01402 {
01403 case JOB_TRANSCODE: allowSetting = "JobAllowTranscode";
01404 break;
01405 case JOB_COMMFLAG: allowSetting = "JobAllowCommFlag";
01406 break;
01407 default: return false;
01408 }
01409 }
01410
01411 if (gContext->GetNumSetting(allowSetting, 1))
01412 return true;
01413
01414 return false;
01415 }
01416
01417 enum JobCmds JobQueue::GetJobCmd(int jobID)
01418 {
01419 MSqlQuery query(MSqlQuery::InitCon());
01420
01421 query.prepare("SELECT cmds FROM jobqueue WHERE id = :ID;");
01422
01423 query.bindValue(":ID", jobID);
01424
01425 query.exec();
01426
01427 if (query.isActive())
01428 {
01429 if ((query.size() > 0) && query.next())
01430 return (enum JobCmds)query.value(0).toInt();
01431 }
01432 else
01433 {
01434 MythContext::DBError("Error in JobQueue::GetJobCmd()", query);
01435 }
01436
01437 return JOB_RUN;
01438 }
01439
01440 QString JobQueue::GetJobArgs(int jobID)
01441 {
01442 MSqlQuery query(MSqlQuery::InitCon());
01443
01444 query.prepare("SELECT args FROM jobqueue WHERE id = :ID;");
01445
01446 query.bindValue(":ID", jobID);
01447
01448 query.exec();
01449
01450 if (query.isActive())
01451 {
01452 if ((query.numRowsAffected() > 0) && query.next())
01453 return query.value(0).toString();
01454 }
01455 else
01456 {
01457 MythContext::DBError("Error in JobQueue::GetJobArgs()", query);
01458 }
01459
01460 return QString("");
01461 }
01462
01463 enum JobFlags JobQueue::GetJobFlags(int jobID)
01464 {
01465 MSqlQuery query(MSqlQuery::InitCon());
01466
01467 query.prepare("SELECT flags FROM jobqueue WHERE id = :ID;");
01468
01469 query.bindValue(":ID", jobID);
01470
01471 query.exec();
01472
01473 if (query.isActive())
01474 {
01475 if ((query.size() > 0) && query.next())
01476 return (enum JobFlags)query.value(0).toInt();
01477 }
01478 else
01479 {
01480 MythContext::DBError("Error in JobQueue::GetJobFlags()", query);
01481 }
01482
01483 return JOB_NO_FLAGS;
01484 }
01485
01486 enum JobStatus JobQueue::GetJobStatus(int jobID)
01487 {
01488 MSqlQuery query(MSqlQuery::InitCon());
01489
01490 query.prepare("SELECT status FROM jobqueue WHERE id = :ID;");
01491
01492 query.bindValue(":ID", jobID);
01493
01494 query.exec();
01495
01496 if (query.isActive())
01497 {
01498 if ((query.size() > 0) && query.next())
01499 return (enum JobStatus)query.value(0).toInt();
01500 }
01501 else
01502 {
01503 MythContext::DBError("Error in JobQueue::GetJobStatus()", query);
01504 }
01505 return JOB_UNKNOWN;
01506 }
01507
01508 enum JobStatus JobQueue::GetJobStatus(int jobType, QString chanid,
01509 QDateTime startts)
01510 {
01511 MSqlQuery query(MSqlQuery::InitCon());
01512
01513 query.prepare("SELECT status FROM jobqueue WHERE type = :TYPE "
01514 "AND chanid = :CHANID AND starttime = :STARTTIME;");
01515
01516 query.bindValue(":TYPE", jobType);
01517 query.bindValue(":CHANID", chanid);
01518 query.bindValue(":STARTTIME", startts);
01519
01520 query.exec();
01521
01522 if (query.isActive())
01523 {
01524 if (query.size() > 0 && query.next())
01525 return (enum JobStatus)query.value(0).toInt();
01526 }
01527 else
01528 {
01529 MythContext::DBError("Error in JobQueue::GetJobStatus()", query);
01530 }
01531 return JOB_UNKNOWN;
01532 }
01533
01534 void JobQueue::RecoverQueue(bool justOld)
01535 {
01536 QMap<int, JobQueueEntry> jobs;
01537 QString msg;
01538
01539 msg = QString("RecoverQueue: Checking for unfinished jobs to "
01540 "recover.");
01541 VERBOSE(VB_JOBQUEUE, LOC + msg);
01542
01543 GetJobsInQueue(jobs);
01544
01545 if (jobs.size())
01546 {
01547 QMap<int, JobQueueEntry>::Iterator it;
01548 QDateTime oldDate = QDateTime::currentDateTime().addDays(-1);
01549 QString hostname = gContext->GetHostName();
01550 int tmpStatus;
01551 int tmpCmds;
01552
01553 for (it = jobs.begin(); it != jobs.end(); ++it)
01554 {
01555 tmpCmds = it.data().cmds;
01556 tmpStatus = it.data().status;
01557
01558 if (((tmpStatus == JOB_STARTING) ||
01559 (tmpStatus == JOB_RUNNING) ||
01560 (tmpStatus == JOB_PAUSED) ||
01561 (tmpCmds & JOB_STOP) ||
01562 (tmpStatus == JOB_STOPPING)) &&
01563 (((!justOld) &&
01564 (it.data().hostname == hostname)) ||
01565 (it.data().statustime < oldDate)))
01566 {
01567 msg = QString("RecoverQueue: Recovering '%1' %2 @ %3 "
01568 "from '%4' state.")
01569 .arg(JobText(it.data().type))
01570 .arg(it.data().chanid)
01571 .arg(it.data().startts)
01572 .arg(StatusText(it.data().status));
01573 VERBOSE(VB_JOBQUEUE, LOC + msg);
01574
01575 ChangeJobStatus(it.data().id, JOB_QUEUED, "");
01576 ChangeJobCmds(it.data().id, JOB_RUN);
01577 if (!gContext->GetNumSetting("JobsRunOnRecordHost", 0))
01578 ChangeJobHost(it.data().id, "");
01579 }
01580 else
01581 {
01582 msg = QString("RecoverQueue: Ignoring '%1' %2 @ %3 "
01583 "in '%4' state.")
01584 .arg(JobText(it.data().type))
01585 .arg(it.data().chanid)
01586 .arg(it.data().startts)
01587 .arg(StatusText(it.data().status));
01588
01589
01590 }
01591 }
01592 }
01593 }
01594
01595 void JobQueue::CleanupOldJobsInQueue()
01596 {
01597 MSqlQuery delquery(MSqlQuery::InitCon());
01598 QDateTime donePurgeDate = QDateTime::currentDateTime().addDays(-2);
01599 QDateTime errorsPurgeDate = QDateTime::currentDateTime().addDays(-4);
01600
01601 delquery.prepare("DELETE FROM jobqueue "
01602 "WHERE (status in (:FINISHED, :ABORTED, :CANCELLED) "
01603 "AND statustime < :DONEPURGEDATE) "
01604 "OR (status in (:ERRORED) "
01605 "AND statustime < :ERRORSPURGEDATE) ");
01606 delquery.bindValue(":FINISHED", JOB_FINISHED);
01607 delquery.bindValue(":ABORTED", JOB_ABORTED);
01608 delquery.bindValue(":CANCELLED", JOB_CANCELLED);
01609 delquery.bindValue(":ERRORED", JOB_ERRORED);
01610 delquery.bindValue(":DONEPURGEDATE", donePurgeDate);
01611 delquery.bindValue(":ERRORSPURGEDATE", errorsPurgeDate);
01612
01613 if (!delquery.exec() || !delquery.isActive())
01614 {
01615 MythContext::DBError("JobQueue::CleanupOldJobsInQueue: Error deleting "
01616 "old finished jobs.", delquery);
01617 }
01618 }
01619
01620 void JobQueue::ProcessJob(int id, int jobType, QString chanid,
01621 QDateTime starttime)
01622 {
01623 QString name = QString("jobqueue%1%2").arg(id).arg(rand());
01624
01625 QString key = GetJobQueueKey(chanid, starttime);
01626
01627 if (!MSqlQuery::testDBConnection())
01628 {
01629 VERBOSE(VB_JOBQUEUE, LOC_ERR +
01630 "ProcessJob(): Unable to open database connection");
01631 return;
01632 }
01633
01634 ChangeJobStatus(id, JOB_PENDING);
01635 ProgramInfo *pginfo = ProgramInfo::GetProgramFromRecorded(chanid,
01636 starttime);
01637
01638 if (!pginfo)
01639 {
01640 QString message = QString("Unable to retrieve "
01641 "program info for chanid %1, starttime %2")
01642 .arg(chanid).arg(starttime.toString());
01643 VERBOSE(VB_JOBQUEUE, LOC_ERR + message);
01644
01645 ChangeJobStatus(id, JOB_ERRORED,
01646 "Unable to retrieve Program Info from database");
01647
01648 return;
01649 }
01650
01651 controlFlagsLock.lock();
01652
01653 ChangeJobStatus(id, JOB_STARTING);
01654 runningJobTypes[key] = jobType;
01655 runningJobIDs[key] = id;
01656 runningJobDescs[key] = GetJobDescription(jobType);
01657 runningJobCommands[key] = GetJobCommand(id, jobType, pginfo);
01658
01659 if ((jobType == JOB_TRANSCODE) ||
01660 (runningJobCommands[key] == "mythtranscode"))
01661 {
01662 StartChildJob(TranscodeThread, pginfo);
01663 }
01664 else if ((jobType == JOB_COMMFLAG) ||
01665 (runningJobCommands[key] == "mythcommflag"))
01666 {
01667 StartChildJob(FlagCommercialsThread, pginfo);
01668 }
01669 else if (jobType & JOB_USERJOB)
01670 {
01671 StartChildJob(UserJobThread, pginfo);
01672 }
01673 else
01674 {
01675 ChangeJobStatus(id, JOB_ERRORED,
01676 "UNKNOWN JobType, unable to process!");
01677 runningJobTypes.erase(key);
01678 runningJobIDs.erase(key);
01679 runningJobDescs.erase(key);
01680 runningJobCommands.erase(key);
01681 }
01682
01683 controlFlagsLock.unlock();
01684 }
01685
01686 void JobQueue::StartChildJob(void *(*ChildThreadRoutine)(void *),
01687 ProgramInfo *tmpInfo)
01688 {
01689 m_pginfo = tmpInfo;
01690 childThreadStarted = false;
01691
01692 pthread_t childThread;
01693 pthread_attr_t attr;
01694 pthread_attr_init(&attr);
01695 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
01696 pthread_create(&childThread, &attr, ChildThreadRoutine, this);
01697 pthread_attr_destroy(&attr);
01698
01699 while (!childThreadStarted)
01700 usleep(50);
01701
01702 delete m_pginfo;
01703 m_pginfo = NULL;
01704 }
01705
01706 QString JobQueue::GetJobDescription(int jobType)
01707 {
01708 if (jobType == JOB_TRANSCODE)
01709 return "Transcode";
01710 else if (jobType == JOB_COMMFLAG)
01711 return "Commercial Flagging";
01712 else if (!(jobType & JOB_USERJOB))
01713 return "Unknown Job";
01714
01715 QString descSetting =
01716 QString("UserJobDesc%1").arg(UserJobTypeToIndex(jobType));
01717
01718 return gContext->GetSetting(descSetting, "Unknown Job");
01719 }
01720
01721 QString JobQueue::GetJobCommand(int id, int jobType, ProgramInfo *tmpInfo)
01722 {
01723 QString command = "";
01724 MSqlQuery query(MSqlQuery::InitCon());
01725
01726 if (jobType == JOB_TRANSCODE)
01727 {
01728 command = gContext->GetSetting("JobQueueTranscodeCommand");
01729 if (command.stripWhiteSpace().isEmpty())
01730 command = "mythtranscode";
01731
01732 if (command == "mythtranscode")
01733 return command;
01734 }
01735 else if (jobType == JOB_COMMFLAG)
01736 {
01737 command = gContext->GetSetting("JobQueueCommFlagCommand");
01738 if (command.stripWhiteSpace().isEmpty())
01739 command = "mythcommflag";
01740
01741 if (command == "mythcommflag")
01742 return command;
01743 }
01744 else if (jobType & JOB_USERJOB)
01745 {
01746 command = gContext->GetSetting(
01747 QString("UserJob%1").arg(UserJobTypeToIndex(jobType)), "");
01748 }
01749
01750 if (command != "")
01751 {
01752 QString pburl = tmpInfo->GetPlaybackURL(false, true);
01753 if (pburl.left(7) == "myth://")
01754 {
01755
01756 command.replace(QRegExp("%DIR%"), pburl);
01757 }
01758 else
01759 {
01760 QFileInfo dirInfo(pburl);
01761 command.replace(QRegExp("%DIR%"), dirInfo.dirPath());
01762 }
01763
01764 command.replace(QRegExp("%FILE%"), tmpInfo->GetRecordBasename(true));
01765 command.replace(QRegExp("%TITLE%"), tmpInfo->title);
01766 command.replace(QRegExp("%SUBTITLE%"), tmpInfo->subtitle);
01767 command.replace(QRegExp("%DESCRIPTION%"), tmpInfo->description);
01768 command.replace(QRegExp("%HOSTNAME%"), tmpInfo->hostname);
01769 command.replace(QRegExp("%CATEGORY%"), tmpInfo->category);
01770 command.replace(QRegExp("%RECGROUP%"), tmpInfo->recgroup);
01771 command.replace(QRegExp("%PLAYGROUP%"), tmpInfo->playgroup);
01772 command.replace(QRegExp("%CHANID%"), tmpInfo->chanid);
01773 command.replace(QRegExp("%STARTTIME%"),
01774 tmpInfo->recstartts.toString("yyyyMMddhhmmss"));
01775 command.replace(QRegExp("%ENDTIME%"),
01776 tmpInfo->recendts.toString("yyyyMMddhhmmss"));
01777 command.replace(QRegExp("%STARTTIMEISO%"),
01778 tmpInfo->recstartts.toString(Qt::ISODate));
01779 command.replace(QRegExp("%ENDTIMEISO%"),
01780 tmpInfo->recendts.toString(Qt::ISODate));
01781 command.replace(QRegExp("%PROGSTART%"),
01782 tmpInfo->startts.toString("yyyyMMddhhmmss"));
01783 command.replace(QRegExp("%PROGEND%"),
01784 tmpInfo->endts.toString("yyyyMMddhhmmss"));
01785 command.replace(QRegExp("%PROGSTARTISO%"),
01786 tmpInfo->startts.toString(Qt::ISODate));
01787 command.replace(QRegExp("%PROGENDISO%"),
01788 tmpInfo->endts.toString(Qt::ISODate));
01789 command.replace(QRegExp("%VERBOSELEVEL%"),
01790 QString("%1").arg(print_verbose_messages));
01791 command.replace(QRegExp("%JOBID%"), QString("%1").arg(id));
01792
01793 QString transProf;
01794 if (tmpInfo->transcoder == RecordingProfile::TranscoderAutodetect)
01795 transProf = "autodetect";
01796 else
01797 transProf = QString::number(tmpInfo->transcoder);
01798 command.replace(QRegExp("%TRANSPROFILE%"), transProf);
01799 }
01800
01801 return command;
01802 }
01803
01804 QString JobQueue::PrettyPrint(off_t bytes)
01805 {
01806
01807
01808 static const struct {
01809 const char *suffix;
01810 unsigned int max;
01811 int precision;
01812 } pptab[] = {
01813 { "bytes", 9999, 0 },
01814 { "kB", 999, 0 },
01815 { "MB", 999, 1 },
01816 { "GB", 999, 1 },
01817 { "TB", 999, 1 },
01818 { "PB", 999, 1 },
01819 { "EB", 999, 1 },
01820 { "ZB", 999, 1 },
01821 { "YB", 0, 0 },
01822 };
01823 unsigned int ii;
01824 float fbytes = bytes;
01825
01826 ii = 0;
01827 while (pptab[ii].max && fbytes > pptab[ii].max) {
01828 fbytes /= 1024;
01829 ii++;
01830 }
01831
01832 return QString("%1 %2")
01833 .arg(fbytes, 0, 'f', pptab[ii].precision)
01834 .arg(pptab[ii].suffix);
01835 }
01836
01837 void *JobQueue::TranscodeThread(void *param)
01838 {
01839 JobQueue *theTranscoder = (JobQueue *)param;
01840 theTranscoder->DoTranscodeThread();
01841
01842 return NULL;
01843 }
01844
01845 void JobQueue::DoTranscodeThread(void)
01846 {
01847 if (!m_pginfo)
01848 return;
01849
01850 ProgramInfo *program_info = new ProgramInfo(*m_pginfo);
01851 int controlTranscoding = JOB_RUN;
01852 QString subtitle = program_info->subtitle.isEmpty() ? "" :
01853 QString(" \"%1\"").arg(program_info->subtitle);
01854
01855 QString key = GetJobQueueKey(program_info);
01856 int jobID = runningJobIDs[key];
01857
01858 childThreadStarted = true;
01859
01860 ChangeJobStatus(jobID, JOB_RUNNING);
01861 bool hasCutlist = !!(program_info->getProgramFlags() & FL_CUTLIST);
01862 bool useCutlist = !!(GetJobFlags(jobID) & JOB_USE_CUTLIST) && hasCutlist;
01863
01864 controlFlagsLock.lock();
01865 jobControlFlags[key] = &controlTranscoding;
01866 controlFlagsLock.unlock();
01867
01868 int transcoder = program_info->transcoder;
01869 QString profilearg = transcoder == RecordingProfile::TranscoderAutodetect ?
01870 "autodetect" :
01871 QString::number(transcoder);
01872
01873 QString path;
01874 QString command;
01875
01876 if (runningJobCommands[key] == "mythtranscode")
01877 {
01878 path = gContext->GetInstallPrefix() + "/bin/mythtranscode";
01879 command = QString("%1 -j %2 -V %3 -p %4 %5")
01880 .arg(path).arg(jobID).arg(print_verbose_messages)
01881 .arg(profilearg.ascii()).arg(useCutlist ? "-l" : "");
01882 }
01883 else
01884 {
01885 command = runningJobCommands[key];
01886
01887 QStringList tokens = QStringList::split(" ", command);
01888 path = tokens[0];
01889 }
01890
01891 if (jobQueueCPU < 2)
01892 nice(17);
01893
01894 QString transcoderName;
01895 if (transcoder == RecordingProfile::TranscoderAutodetect)
01896 {
01897 transcoderName = "Autodetect";
01898 }
01899 else
01900 {
01901 MSqlQuery query(MSqlQuery::InitCon());
01902 query.prepare("SELECT name FROM recordingprofiles WHERE id = :ID;");
01903 query.bindValue(":ID", transcoder);
01904 query.exec();
01905 if (query.isActive() && query.size() > 0 && query.next())
01906 {
01907 transcoderName = query.value(0).toString();
01908 }
01909 else
01910 {
01911
01912 transcoderName = QString("Autodetect(%1)").arg(transcoder);
01913 }
01914 }
01915
01916 QString msg;
01917 bool retry = true;
01918 int retrylimit = 3;
01919 while (retry)
01920 {
01921 off_t origfilesize, filesize;
01922 struct stat st;
01923
01924 retry = false;
01925
01926 ChangeJobStatus(jobID, JOB_STARTING);
01927 program_info->SetTranscoded(TRANSCODING_RUNNING);
01928
01929 QString filename = program_info->GetPlaybackURL(false, true);
01930
01931 origfilesize = 0;
01932 filesize = 0;
01933
01934 if (stat(filename.ascii(), &st) == 0)
01935 origfilesize = st.st_size;
01936
01937 QString msg = QString("Transcode %1")
01938 .arg(StatusText(GetJobStatus(jobID)));
01939
01940 QString details = QString("%1%2: %3 (%4)")
01941 .arg(program_info->title.local8Bit())
01942 .arg(subtitle.local8Bit())
01943 .arg(transcoderName)
01944 .arg(PrettyPrint(origfilesize));
01945
01946 VERBOSE(VB_GENERAL, LOC + QString("%1 for %2").arg(msg).arg(details));
01947 gContext->LogEntry("transcode", LP_NOTICE, msg, details);
01948
01949 VERBOSE(VB_JOBQUEUE, LOC + QString("Running command: '%1'")
01950 .arg(command));
01951
01952 int result = myth_system(command);
01953 int status = GetJobStatus(jobID);
01954
01955 if ((result == MYTHSYSTEM__EXIT__EXECL_ERROR) ||
01956 (result == MYTHSYSTEM__EXIT__CMD_NOT_FOUND))
01957 {
01958 ChangeJobStatus(jobID, JOB_ERRORED,
01959 "ERROR: Unable to find mythtranscode, check backend logs.");
01960 program_info->SetTranscoded(TRANSCODING_NOT_TRANSCODED);
01961
01962 msg = QString("Transcode %1").arg(StatusText(GetJobStatus(jobID)));
01963 details = QString("%1%2: %3 does not exist or is not executable")
01964 .arg(program_info->title.local8Bit())
01965 .arg(subtitle.local8Bit())
01966 .arg(path);
01967
01968 VERBOSE(VB_IMPORTANT,
01969 LOC_ERR + QString("%1 for %2").arg(msg).arg(details));
01970 gContext->LogEntry("transcode", LP_WARNING, msg, details);
01971 }
01972 else if (result == TRANSCODE_EXIT_RESTART && retrylimit > 0)
01973 {
01974 VERBOSE(VB_JOBQUEUE, LOC + "Transcode command restarting");
01975 retry = true;
01976 retrylimit--;
01977
01978 program_info->SetTranscoded(TRANSCODING_NOT_TRANSCODED);
01979
01980 msg = QString("Transcode restarting");
01981 gContext->LogEntry("transcode", LP_NOTICE, msg, details);
01982 }
01983 else
01984 {
01985 if (status == JOB_FINISHED)
01986 {
01987 ChangeJobStatus(jobID, JOB_FINISHED, "Finished.");
01988 retry = false;
01989
01990 filename = program_info->GetPlaybackURL(false, true);
01991
01992 if (stat(filename.ascii(), &st) == 0)
01993 {
01994 filesize = st.st_size;
01995
01996 QString comment = QString("%1: %2 => %3")
01997 .arg(transcoderName)
01998 .arg(PrettyPrint(origfilesize))
01999 .arg(PrettyPrint(filesize));
02000 ChangeJobComment(jobID, comment);
02001
02002 if (filesize > 0)
02003 program_info->SetFilesize(filesize);
02004
02005 details = QString("%1%2: %3 (%4)")
02006 .arg(program_info->title)
02007 .arg(subtitle)
02008 .arg(transcoderName)
02009 .arg(PrettyPrint(filesize));
02010 }
02011 else
02012 {
02013 int saved = errno;
02014 QString comment = QString("couldn't stat \"%1\": %2")
02015 .arg(filename.ascii()).arg(strerror(saved));
02016 ChangeJobStatus(jobID, JOB_FINISHED, comment);
02017
02018 details = QString("%1%2: %3")
02019 .arg(program_info->title)
02020 .arg(subtitle)
02021 .arg(comment);
02022 }
02023
02024 MythEvent me("RECORDING_LIST_CHANGE");
02025 gContext->dispatch(me);
02026
02027 program_info->SetTranscoded(TRANSCODING_COMPLETE);
02028 }
02029 else
02030 {
02031 program_info->SetTranscoded(TRANSCODING_NOT_TRANSCODED);
02032
02033 QString comment =
02034 QString("exit status %1, job status was \"%2\"")
02035 .arg(result)
02036 .arg(StatusText(status));
02037
02038 ChangeJobStatus(jobID, JOB_ERRORED, comment);
02039
02040 details = QString("%1%2: %3 (%4)")
02041 .arg(program_info->title)
02042 .arg(subtitle)
02043 .arg(transcoderName)
02044 .arg(comment);
02045 }
02046
02047 msg = QString("Transcode %1").arg(StatusText(GetJobStatus(jobID)));
02048 gContext->LogEntry("transcode", LP_NOTICE, msg, details);
02049 VERBOSE(VB_GENERAL, LOC + msg + ": " + details);
02050 }
02051 }
02052
02053 if (retrylimit == 0)
02054 {
02055 VERBOSE(VB_JOBQUEUE, LOC_ERR + "Retry limit exceeded for transcoder, "
02056 "setting job status to errored.");
02057 ChangeJobStatus(jobID, JOB_ERRORED, "Retry limit exceeded");
02058 }
02059
02060 controlFlagsLock.lock();
02061 runningJobIDs.erase(key);
02062 runningJobTypes.erase(key);
02063 runningJobDescs.erase(key);
02064 runningJobCommands.erase(key);
02065 controlFlagsLock.unlock();
02066 }
02067
02068 void *JobQueue::FlagCommercialsThread(void *param)
02069 {
02070 JobQueue *theFlagger = (JobQueue *)param;
02071 theFlagger->DoFlagCommercialsThread();
02072
02073 return NULL;
02074 }
02075
02076 void JobQueue::DoFlagCommercialsThread(void)
02077 {
02078 if (!m_pginfo)
02079 return;
02080
02081 ProgramInfo *program_info = new ProgramInfo(*m_pginfo);
02082 int controlFlagging = JOB_RUN;
02083 QString subtitle = program_info->subtitle.isEmpty() ? "" :
02084 QString(" \"%1\"").arg(program_info->subtitle);
02085 QString logDesc = QString("%1%2 recorded from channel %3 at %4")
02086 .arg(program_info->title.local8Bit())
02087 .arg(subtitle.local8Bit())
02088 .arg(program_info->chanid)
02089 .arg(program_info->recstartts.toString());
02090
02091 QString key = GetJobQueueKey(program_info);
02092 int jobID = runningJobIDs[key];
02093
02094 childThreadStarted = true;
02095
02096
02097 if (!MSqlQuery::testDBConnection())
02098 {
02099 QString msg = QString("Commercial Flagging failed. Could not open "
02100 "new database connection for %1. "
02101 "Program can not be flagged.")
02102 .arg(logDesc);
02103 VERBOSE(VB_IMPORTANT, LOC_ERR + msg);
02104
02105 ChangeJobStatus(jobID, JOB_ERRORED,
02106 "Could not open new database connection for "
02107 "commercial flagger.");
02108
02109 delete program_info;
02110 return;
02111 }
02112
02113 controlFlagsLock.lock();
02114 jobControlFlags[key] = &controlFlagging;
02115 controlFlagsLock.unlock();
02116
02117 QString msg = "Commercial Flagging Starting";
02118 VERBOSE(VB_GENERAL, LOC + QString("%1 for %2").arg(msg).arg(logDesc));
02119 gContext->LogEntry("commflag", LP_NOTICE, msg, logDesc);
02120
02121 int breaksFound = 0;
02122 QString path;
02123 QString command;
02124
02125 if (runningJobCommands[key] == "mythcommflag")
02126 {
02127 path = gContext->GetInstallPrefix() + "/bin/mythcommflag";
02128 command = QString("%1 -j %2 -V %3")
02129 .arg(path).arg(jobID).arg(print_verbose_messages);
02130 }
02131 else
02132 {
02133 command = runningJobCommands[key];
02134
02135 QStringList tokens = QStringList::split(" ", command);
02136 path = tokens[0];
02137 }
02138
02139 VERBOSE(VB_JOBQUEUE, LOC + QString("Running command: '%1'").arg(command));
02140
02141 breaksFound = myth_system(command);
02142 int priority = LP_NOTICE;
02143 QString comment = "";
02144
02145 controlFlagsLock.lock();
02146
02147 if ((breaksFound == MYTHSYSTEM__EXIT__EXECL_ERROR) ||
02148 (breaksFound == MYTHSYSTEM__EXIT__CMD_NOT_FOUND))
02149 {
02150 comment = "unable to find mythcommflag";
02151 ChangeJobStatus(jobID, JOB_ERRORED, comment);
02152 priority = LP_WARNING;
02153 }
02154 else if ((*(jobControlFlags[key]) == JOB_STOP))
02155 {
02156 comment = "aborted by user";
02157 ChangeJobStatus(jobID, JOB_ABORTED, comment);
02158 priority = LP_WARNING;
02159 }
02160 else if (breaksFound == COMMFLAG_EXIT_NO_PROGRAM_DATA)
02161 {
02162 comment = "unable to open file or init decoder";
02163 ChangeJobStatus(jobID, JOB_ERRORED, comment);
02164 priority = LP_WARNING;
02165 }
02166 else if (breaksFound >= COMMFLAG_EXIT_START)
02167 {
02168 comment = QString("failed with exit status %1").arg(breaksFound);
02169 ChangeJobStatus(jobID, JOB_ERRORED, comment);
02170 priority = LP_WARNING;
02171 }
02172 else
02173 {
02174 comment = QString("%1 commercial break(s)").arg(breaksFound);
02175 ChangeJobStatus(jobID, JOB_FINISHED, comment);
02176
02177 MythEvent me("RECORDING_LIST_CHANGE");
02178 gContext->dispatch(me);
02179
02180 program_info->pathname = program_info->GetPlaybackURL();
02181 (new PreviewGenerator(program_info, true))->Run();
02182 }
02183
02184 msg = QString("Commercial Flagging %1")
02185 .arg(StatusText(GetJobStatus(jobID)));
02186
02187 if (comment != "")
02188 logDesc += QString(" (%1)").arg(comment);
02189
02190 gContext->LogEntry("commflag", priority, msg, logDesc);
02191
02192 if (priority <= LP_WARNING)
02193 VERBOSE(VB_IMPORTANT, LOC_ERR + msg + ": " + logDesc);
02194
02195 jobControlFlags.erase(key);
02196 runningJobIDs.erase(key);
02197 runningJobTypes.erase(key);
02198 runningJobDescs.erase(key);
02199 runningJobCommands.erase(key);
02200 controlFlagsLock.unlock();
02201
02202 delete program_info;
02203 }
02204
02205 void *JobQueue::UserJobThread(void *param)
02206 {
02207 JobQueue *theUserJob = (JobQueue *)param;
02208 theUserJob->DoUserJobThread();
02209
02210 return NULL;
02211 }
02212
02213 void JobQueue::DoUserJobThread(void)
02214 {
02215 if (!m_pginfo)
02216 return;
02217
02218 ProgramInfo *program_info = new ProgramInfo(*m_pginfo);
02219 QString key = GetJobQueueKey(program_info);
02220 int jobID = runningJobIDs[key];
02221 QString jobDesc = runningJobDescs[key];
02222
02223 childThreadStarted = true;
02224
02225 ChangeJobStatus(jobID, JOB_RUNNING);
02226
02227 QString msg = QString("Started \"%1\" for \"%2\" recorded "
02228 "from channel %3 at %4")
02229 .arg(jobDesc)
02230 .arg(program_info->title.local8Bit())
02231 .arg(program_info->chanid)
02232 .arg(program_info->recstartts.toString());
02233 VERBOSE(VB_GENERAL, LOC + msg);
02234 gContext->LogEntry("jobqueue", LP_NOTICE,
02235 QString("Job \"%1\" Started").arg(jobDesc), msg);
02236
02237 switch (jobQueueCPU)
02238 {
02239 case 0: nice(17);
02240 break;
02241 case 1: nice(10);
02242 break;
02243 case 2:
02244 default: break;
02245 }
02246
02247 VERBOSE(VB_JOBQUEUE, LOC + QString("Running command: '%1'")
02248 .arg(runningJobCommands[key]));
02249
02250 int result = myth_system(runningJobCommands[key]);
02251
02252 if ((result == MYTHSYSTEM__EXIT__EXECL_ERROR) ||
02253 (result == MYTHSYSTEM__EXIT__CMD_NOT_FOUND))
02254 {
02255 msg = QString("User Job '%1' failed, unable to find "
02256 "executable, check your PATH and backend logs.")
02257 .arg(runningJobCommands[key]);
02258 VERBOSE(VB_IMPORTANT, LOC_ERR + msg);
02259 VERBOSE(VB_IMPORTANT, LOC + QString("Current PATH: '%1'")
02260 .arg(getenv("PATH")));
02261
02262 gContext->LogEntry("jobqueue", LP_WARNING,
02263 "User Job Errored", msg);
02264
02265 ChangeJobStatus(jobID, JOB_ERRORED,
02266 "ERROR: Unable to find executable, check backend logs.");
02267 }
02268 else
02269 {
02270 msg = QString("Finished \"%1\" for \"%2\" recorded from "
02271 "channel %3 at %4.")
02272 .arg(jobDesc)
02273 .arg(program_info->title.local8Bit())
02274 .arg(program_info->chanid)
02275 .arg(program_info->recstartts.toString());
02276 VERBOSE(VB_GENERAL, LOC + msg);
02277
02278 gContext->LogEntry("jobqueue", LP_NOTICE,
02279 QString("Job \"%1\" Finished").arg(jobDesc), msg);
02280
02281 ChangeJobStatus(jobID, JOB_FINISHED, "Successfully Completed.");
02282
02283 MythEvent me("RECORDING_LIST_CHANGE");
02284 gContext->dispatch(me);
02285 }
02286
02287 controlFlagsLock.lock();
02288 runningJobIDs.erase(key);
02289 runningJobTypes.erase(key);
02290 runningJobDescs.erase(key);
02291 runningJobCommands.erase(key);
02292 controlFlagsLock.unlock();
02293 }
02294
02295 int JobQueue::UserJobTypeToIndex(int jobType)
02296 {
02297 if (jobType & JOB_USERJOB)
02298 {
02299 int x = ((jobType & JOB_USERJOB)>> 8);
02300 int bits = 1;
02301 while ((x != 0) && ((x & 0x01) == 0))
02302 {
02303 bits++;
02304 x = x >> 1;
02305 }
02306 if ( bits > 4 )
02307 return JOB_NONE;
02308
02309 return bits;
02310 }
02311 return JOB_NONE;
02312 }
02313
02314