00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "OnDemandServerMediaSubsession.hh"
00023 #include "RTCP.hh"
00024 #include "BasicUDPSink.hh"
00025 #include <GroupsockHelper.hh>
00026
00027 OnDemandServerMediaSubsession
00028 ::OnDemandServerMediaSubsession(UsageEnvironment& env,
00029 Boolean reuseFirstSource)
00030 : ServerMediaSubsession(env),
00031 fReuseFirstSource(reuseFirstSource), fLastStreamToken(NULL),
00032 fSDPLines(NULL) {
00033 fDestinationsHashTable = HashTable::create(ONE_WORD_HASH_KEYS);
00034 gethostname(fCNAME, sizeof fCNAME);
00035 fCNAME[sizeof fCNAME-1] = '\0';
00036 }
00037
00038 class Destinations {
00039 public:
00040 Destinations(struct in_addr const& destAddr,
00041 Port const& rtpDestPort,
00042 Port const& rtcpDestPort)
00043 : isTCP(False), addr(destAddr), rtpPort(rtpDestPort), rtcpPort(rtcpDestPort) {
00044 }
00045 Destinations(int tcpSockNum, unsigned char rtpChanId, unsigned char rtcpChanId)
00046 : isTCP(True), rtpPort(0) , rtcpPort(0) ,
00047 tcpSocketNum(tcpSockNum), rtpChannelId(rtpChanId), rtcpChannelId(rtcpChanId) {
00048 }
00049
00050 public:
00051 Boolean isTCP;
00052 struct in_addr addr;
00053 Port rtpPort;
00054 Port rtcpPort;
00055 int tcpSocketNum;
00056 unsigned char rtpChannelId, rtcpChannelId;
00057 };
00058
00059 OnDemandServerMediaSubsession::~OnDemandServerMediaSubsession() {
00060 delete[] fSDPLines;
00061
00062
00063 while (1) {
00064 Destinations* destinations
00065 = (Destinations*)(fDestinationsHashTable->RemoveNext());
00066 if (destinations == NULL) break;
00067 delete destinations;
00068 }
00069 delete fDestinationsHashTable;
00070 }
00071
00072 char const*
00073 OnDemandServerMediaSubsession::sdpLines() {
00074 if (fSDPLines == NULL) {
00075
00076
00077
00078
00079 unsigned estBitrate;
00080 FramedSource* inputSource = createNewStreamSource(0, estBitrate);
00081 if (inputSource == NULL) return NULL;
00082
00083 struct in_addr dummyAddr;
00084 dummyAddr.s_addr = 0;
00085 Groupsock dummyGroupsock(envir(), dummyAddr, 0, 0);
00086 unsigned char rtpPayloadType = 96 + trackNumber()-1;
00087 RTPSink* dummyRTPSink
00088 = createNewRTPSink(&dummyGroupsock, rtpPayloadType, inputSource);
00089
00090 setSDPLinesFromRTPSink(dummyRTPSink, inputSource);
00091 Medium::close(dummyRTPSink);
00092 Medium::close(inputSource);
00093 }
00094
00095 return fSDPLines;
00096 }
00097
00098
00099 class StreamState {
00100 public:
00101 StreamState(Port const& serverRTPPort, Port const& serverRTCPPort,
00102 RTPSink* rtpSink, BasicUDPSink* udpSink,
00103 float streamDuration, unsigned totalBW, char* CNAME,
00104 FramedSource* mediaSource,
00105 Groupsock* rtpGS, Groupsock* rtcpGS);
00106 virtual ~StreamState();
00107
00108 void startPlaying(Destinations* destinations,
00109 TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData);
00110 void pause();
00111 void endPlaying(Destinations* destinations);
00112 void reclaim();
00113
00114 unsigned& referenceCount() { return fReferenceCount; }
00115
00116 Port const& serverRTPPort() const { return fServerRTPPort; }
00117 Port const& serverRTCPPort() const { return fServerRTCPPort; }
00118
00119 RTPSink const* rtpSink() const { return fRTPSink; }
00120
00121 float streamDuration() const { return fStreamDuration; }
00122
00123 FramedSource* mediaSource() const { return fMediaSource; }
00124
00125 private:
00126 Boolean fAreCurrentlyPlaying;
00127 unsigned fReferenceCount;
00128
00129 Port fServerRTPPort, fServerRTCPPort;
00130
00131 RTPSink* fRTPSink;
00132 BasicUDPSink* fUDPSink;
00133
00134 float fStreamDuration;
00135 unsigned fTotalBW; char* fCNAME; RTCPInstance* fRTCPInstance;
00136
00137 FramedSource* fMediaSource;
00138
00139 Groupsock* fRTPgs; Groupsock* fRTCPgs;
00140 };
00141
00142 void OnDemandServerMediaSubsession
00143 ::getStreamParameters(unsigned clientSessionId,
00144 netAddressBits clientAddress,
00145 Port const& clientRTPPort,
00146 Port const& clientRTCPPort,
00147 int tcpSocketNum,
00148 unsigned char rtpChannelId,
00149 unsigned char rtcpChannelId,
00150 netAddressBits& destinationAddress,
00151 u_int8_t& ,
00152 Boolean& isMulticast,
00153 Port& serverRTPPort,
00154 Port& serverRTCPPort,
00155 void*& streamToken) {
00156 if (destinationAddress == 0) destinationAddress = clientAddress;
00157 struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
00158 isMulticast = False;
00159
00160 if (fLastStreamToken != NULL && fReuseFirstSource) {
00161
00162
00163 serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
00164 serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
00165 ++((StreamState*)fLastStreamToken)->referenceCount();
00166 streamToken = fLastStreamToken;
00167 } else {
00168
00169 unsigned streamBitrate;
00170 FramedSource* mediaSource
00171 = createNewStreamSource(clientSessionId, streamBitrate);
00172
00173
00174
00175 struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00176 Groupsock* rtpGroupsock;
00177 Groupsock* rtpGroupsock_old = NULL;
00178 portNumBits serverRTPPortNum = 0;
00179 while (1) {
00180 rtpGroupsock = new Groupsock(envir(), dummyAddr, 0, 255);
00181 if (!getSourcePort(envir(), rtpGroupsock->socketNum(), serverRTPPort)) break;
00182 serverRTPPortNum = ntohs(serverRTPPort.num());
00183
00184
00185 if ((serverRTPPortNum&1) == 0) break;
00186
00187
00188
00189 delete rtpGroupsock_old;
00190 rtpGroupsock_old = rtpGroupsock;
00191 }
00192 delete rtpGroupsock_old;
00193
00194
00195 RTPSink* rtpSink;
00196 BasicUDPSink* udpSink;
00197 Groupsock* rtcpGroupsock;
00198 if (clientRTCPPort.num() == 0) {
00199
00200 rtpSink = NULL;
00201 udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
00202 rtcpGroupsock = NULL;
00203 } else {
00204
00205 unsigned char rtpPayloadType = 96 + trackNumber()-1;
00206 rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
00207 udpSink = NULL;
00208
00209
00210 rtcpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPortNum+1, 255);
00211 getSourcePort(envir(), rtcpGroupsock->socketNum(), serverRTCPPort);
00212 }
00213
00214
00215
00216 if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
00217 if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();
00218
00219
00220 streamToken = fLastStreamToken
00221 = new StreamState(serverRTPPort, serverRTCPPort, rtpSink, udpSink,
00222 duration(), streamBitrate, fCNAME, mediaSource,
00223 rtpGroupsock, rtcpGroupsock);
00224 }
00225
00226
00227 Destinations* destinations;
00228 if (tcpSocketNum < 0) {
00229 destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
00230 } else {
00231 destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
00232 }
00233 fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
00234 }
00235
00236 void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
00237 void* streamToken,
00238 TaskFunc* rtcpRRHandler,
00239 void* rtcpRRHandlerClientData,
00240 unsigned short& rtpSeqNum,
00241 unsigned& rtpTimestamp) {
00242 StreamState* streamState = (StreamState*)streamToken;
00243 Destinations* destinations
00244 = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00245 if (streamState != NULL) {
00246 streamState->startPlaying(destinations,
00247 rtcpRRHandler, rtcpRRHandlerClientData);
00248 if (streamState->rtpSink() != NULL) {
00249 rtpSeqNum = streamState->rtpSink()->currentSeqNo();
00250 rtpTimestamp = streamState->rtpSink()->currentTimestamp();
00251 }
00252 }
00253 }
00254
00255 void OnDemandServerMediaSubsession::pauseStream(unsigned ,
00256 void* streamToken) {
00257
00258
00259 if (fReuseFirstSource) return;
00260
00261 StreamState* streamState = (StreamState*)streamToken;
00262 if (streamState != NULL) streamState->pause();
00263 }
00264
00265 void OnDemandServerMediaSubsession::seekStream(unsigned ,
00266 void* streamToken, float seekNPT) {
00267
00268
00269 if (fReuseFirstSource) return;
00270
00271 StreamState* streamState = (StreamState*)streamToken;
00272 if (streamState != NULL && streamState->mediaSource() != NULL) {
00273 seekStreamSource(streamState->mediaSource(), seekNPT);
00274 }
00275 }
00276
00277 void OnDemandServerMediaSubsession::setStreamScale(unsigned ,
00278 void* streamToken, float scale) {
00279
00280
00281 if (fReuseFirstSource) return;
00282
00283 StreamState* streamState = (StreamState*)streamToken;
00284 if (streamState != NULL && streamState->mediaSource() != NULL) {
00285 setStreamSourceScale(streamState->mediaSource(), scale);
00286 }
00287 }
00288
00289 void OnDemandServerMediaSubsession::deleteStream(unsigned clientSessionId,
00290 void*& streamToken) {
00291
00292 Destinations* destinations
00293 = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00294 if (destinations != NULL) {
00295 fDestinationsHashTable->Remove((char const*)clientSessionId);
00296 }
00297
00298
00299 StreamState* streamState = (StreamState*)streamToken;
00300 if (streamState != NULL) streamState->endPlaying(destinations);
00301
00302
00303 if (streamState != NULL && streamState->referenceCount() > 0) {
00304 --streamState->referenceCount();
00305 if (streamState->referenceCount() == 0) {
00306 delete streamState;
00307 if (fLastStreamToken == streamToken) fLastStreamToken = NULL;
00308 streamToken = NULL;
00309 }
00310 }
00311
00312
00313 delete destinations;
00314 }
00315
00316 char const* OnDemandServerMediaSubsession
00317 ::getAuxSDPLine(RTPSink* rtpSink, FramedSource* ) {
00318
00319 return rtpSink == NULL ? NULL : rtpSink->auxSDPLine();
00320 }
00321
00322 void OnDemandServerMediaSubsession::seekStreamSource(FramedSource* ,
00323 float ) {
00324
00325 }
00326
00327 void OnDemandServerMediaSubsession
00328 ::setStreamSourceScale(FramedSource* , float ) {
00329
00330 }
00331
00332 void OnDemandServerMediaSubsession
00333 ::setSDPLinesFromRTPSink(RTPSink* rtpSink, FramedSource* inputSource) {
00334 if (rtpSink == NULL) return;
00335
00336 char const* mediaType = rtpSink->sdpMediaType();
00337 unsigned char rtpPayloadType = rtpSink->rtpPayloadType();
00338 struct in_addr serverAddrForSDP; serverAddrForSDP.s_addr = fServerAddressForSDP;
00339 char* const ipAddressStr = strDup(our_inet_ntoa(serverAddrForSDP));
00340 char* rtpmapLine = rtpSink->rtpmapLine();
00341 char const* rangeLine = rangeSDPLine();
00342 char const* auxSDPLine = getAuxSDPLine(rtpSink, inputSource);
00343 if (auxSDPLine == NULL) auxSDPLine = "";
00344
00345 char const* const sdpFmt =
00346 "m=%s %u RTP/AVP %d\r\n"
00347 "c=IN IP4 %s\r\n"
00348 "%s"
00349 "%s"
00350 "%s"
00351 "a=control:%s\r\n";
00352 unsigned sdpFmtSize = strlen(sdpFmt)
00353 + strlen(mediaType) + 5 + 3
00354 + strlen(ipAddressStr)
00355 + strlen(rtpmapLine)
00356 + strlen(rangeLine)
00357 + strlen(auxSDPLine)
00358 + strlen(trackId());
00359 char* sdpLines = new char[sdpFmtSize];
00360 sprintf(sdpLines, sdpFmt,
00361 mediaType,
00362 fPortNumForSDP,
00363 rtpPayloadType,
00364 ipAddressStr,
00365 rtpmapLine,
00366 rangeLine,
00367 auxSDPLine,
00368 trackId());
00369 delete[] (char*)rangeLine; delete[] rtpmapLine; delete[] ipAddressStr;
00370
00371 fSDPLines = strDup(sdpLines);
00372 delete[] sdpLines;
00373 }
00374
00375
00377
00378 static void afterPlayingStreamState(void* clientData) {
00379 StreamState* streamState = (StreamState*)clientData;
00380 if (streamState->streamDuration() == 0.0) {
00381
00382
00383
00384
00385 streamState->reclaim();
00386 }
00387
00388
00389
00390 }
00391
00392 StreamState::StreamState(Port const& serverRTPPort, Port const& serverRTCPPort,
00393 RTPSink* rtpSink, BasicUDPSink* udpSink,
00394 float streamDuration, unsigned totalBW, char* CNAME,
00395 FramedSource* mediaSource,
00396 Groupsock* rtpGS, Groupsock* rtcpGS)
00397 : fAreCurrentlyPlaying(False), fReferenceCount(1),
00398 fServerRTPPort(serverRTPPort), fServerRTCPPort(serverRTCPPort),
00399 fRTPSink(rtpSink), fUDPSink(udpSink), fStreamDuration(streamDuration),
00400 fTotalBW(totalBW), fCNAME(CNAME), fRTCPInstance(NULL) ,
00401 fMediaSource(mediaSource), fRTPgs(rtpGS), fRTCPgs(rtcpGS) {
00402 }
00403
00404 StreamState::~StreamState() {
00405 reclaim();
00406 }
00407
00408 void StreamState
00409 ::startPlaying(Destinations* dests,
00410 TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData) {
00411 if (dests == NULL) return;
00412 if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
00413 if (fRTPSink != NULL) {
00414 fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00415 fAreCurrentlyPlaying = True;
00416 } else if (fUDPSink != NULL) {
00417 fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00418 fAreCurrentlyPlaying = True;
00419 }
00420 }
00421
00422 if (fRTCPInstance == NULL && fRTPSink != NULL) {
00423
00424 fRTCPInstance
00425 = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
00426 fTotalBW, (unsigned char*)fCNAME,
00427 fRTPSink, NULL );
00428
00429 }
00430 if (fRTCPInstance != NULL)
00431 fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
00432 rtcpRRHandler, rtcpRRHandlerClientData);
00433
00434 if (dests->isTCP) {
00435
00436 if (fRTPSink != NULL) {
00437 fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00438 }
00439 if (fRTCPInstance != NULL) {
00440 fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00441 }
00442 } else {
00443
00444
00445 if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
00446 if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
00447 }
00448 }
00449
00450 void StreamState::pause() {
00451 if (fRTPSink != NULL) fRTPSink->stopPlaying();
00452 if (fUDPSink != NULL) fUDPSink->stopPlaying();
00453 fAreCurrentlyPlaying = False;
00454 }
00455
00456 void StreamState::endPlaying(Destinations* dests) {
00457 if (dests->isTCP) {
00458 if (fRTPSink != NULL) {
00459 fRTPSink->removeStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00460 }
00461 if (fRTCPInstance != NULL) {
00462 fRTCPInstance->removeStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00463 }
00464 } else {
00465
00466 if (fRTPgs != NULL) fRTPgs->removeDestination(dests->addr, dests->rtpPort);
00467 if (fRTCPgs != NULL) fRTCPgs->removeDestination(dests->addr, dests->rtcpPort);
00468 if (fRTCPInstance != NULL) {
00469 fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
00470 NULL, NULL);
00471 }
00472 }
00473 }
00474
00475 void StreamState::reclaim() {
00476
00477 Medium::close(fRTCPInstance) ; fRTCPInstance = NULL;
00478 Medium::close(fRTPSink); fRTPSink = NULL;
00479 Medium::close(fUDPSink); fUDPSink = NULL;
00480
00481 Medium::close(fMediaSource); fMediaSource = NULL;
00482
00483 delete fRTPgs; fRTPgs = NULL;
00484 delete fRTCPgs; fRTCPgs = NULL;
00485
00486 fReferenceCount = 0;
00487 }