00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00044 #ifndef CCXX_RTP_IQUEUE_H_
00045 #define CCXX_RTP_IQUEUE_H_
00046
00047 #include <ccrtp/queuebase.h>
00048 #include <ccrtp/CryptoContext.h>
00049
00050 #include <list>
00051
00052 #ifdef CCXX_NAMESPACES
00053 namespace ost {
00054 #endif
00055
00070 class __EXPORT Members
00071 {
00072 public:
00073 inline void
00074 setMembersCount(uint32 n)
00075 { members = n; }
00076
00077 inline void
00078 increaseMembersCount()
00079 { members++; }
00080
00081 inline void
00082 decreaseMembersCount()
00083 { members--; }
00084
00085 inline uint32
00086 getMembersCount() const
00087 { return members; }
00088
00089 inline void
00090 setSendersCount(uint32 n)
00091 { activeSenders = n; }
00092
00093 inline void
00094 increaseSendersCount()
00095 { activeSenders++; }
00096
00097 inline void
00098 decreaseSendersCount()
00099 { activeSenders--; }
00100
00101 inline uint32
00102 getSendersCount() const
00103 { return activeSenders; }
00104
00105 protected:
00106 Members() :
00107 members(0),
00108 activeSenders(0)
00109 { }
00110
00111 inline virtual ~Members()
00112 { }
00113
00114 private:
00116 uint32 members;
00118 uint32 activeSenders;
00119 };
00120
00127 class __EXPORT SyncSourceHandler
00128 {
00129 public:
00136 inline void*
00137 getLink(const SyncSource& source) const
00138 { return source.getLink(); }
00139
00140 inline void
00141 setLink(SyncSource& source, void* link)
00142 { source.setLink(link); }
00143
00144 inline void
00145 setParticipant(SyncSource& source, Participant& p)
00146 { source.setParticipant(p); }
00147
00148 inline void
00149 setState(SyncSource& source, SyncSource::State ns)
00150 { source.setState(ns); }
00151
00152 inline void
00153 setSender(SyncSource& source, bool active)
00154 { source.setSender(active); }
00155
00156 inline void
00157 setDataTransportPort(SyncSource& source, tpport_t p)
00158 { source.setDataTransportPort(p); }
00159
00160 inline void
00161 setControlTransportPort(SyncSource& source, tpport_t p)
00162 { source.setControlTransportPort(p); }
00163
00164 inline void
00165 setNetworkAddress(SyncSource& source, InetAddress addr)
00166 { source.setNetworkAddress(addr); }
00167
00168 protected:
00169 SyncSourceHandler()
00170 { }
00171
00172 inline virtual ~SyncSourceHandler()
00173 { }
00174 };
00175
00182 class __EXPORT ParticipantHandler
00183 {
00184 public:
00185 inline void
00186 setSDESItem(Participant* part, SDESItemType item,
00187 const std::string& val)
00188 { part->setSDESItem(item,val); }
00189
00190 inline void
00191 setPRIVPrefix(Participant* part, const std::string val)
00192 { part->setPRIVPrefix(val); }
00193
00194 protected:
00195 ParticipantHandler()
00196 { }
00197
00198 inline virtual ~ParticipantHandler()
00199 { }
00200 };
00201
00208 class __EXPORT ApplicationHandler
00209 {
00210 public:
00211 inline void
00212 addParticipant(RTPApplication& app, Participant& part)
00213 { app.addParticipant(part); }
00214
00215 inline void
00216 removeParticipant(RTPApplication& app,
00217 RTPApplication::ParticipantLink* pl)
00218 { app.removeParticipant(pl); }
00219
00220 protected:
00221 ApplicationHandler()
00222 { }
00223
00224 inline virtual ~ApplicationHandler()
00225 { }
00226 };
00227
00235 class __EXPORT ConflictHandler
00236 {
00237 public:
00238 struct ConflictingTransportAddress
00239 {
00240 ConflictingTransportAddress(InetAddress na,
00241 tpport_t dtp, tpport_t ctp);
00242
00243 void setNext(ConflictingTransportAddress* nc)
00244 { next = nc; }
00245
00246 inline const InetAddress& getNetworkAddress( ) const
00247 { return networkAddress; }
00248
00249 inline tpport_t getDataTransportPort() const
00250 { return dataTransportPort; }
00251
00252 inline tpport_t getControlTransportPort() const
00253 { return controlTransportPort; }
00254
00255 InetAddress networkAddress;
00256 tpport_t dataTransportPort;
00257 tpport_t controlTransportPort;
00258 ConflictingTransportAddress* next;
00259
00260 timeval lastPacketTime;
00261 };
00262
00267 ConflictingTransportAddress* searchDataConflict(InetAddress na,
00268 tpport_t dtp);
00273 ConflictingTransportAddress* searchControlConflict(InetAddress na,
00274 tpport_t ctp);
00275
00276 void updateConflict(ConflictingTransportAddress& ca)
00277 { gettimeofday(&(ca.lastPacketTime),NULL); }
00278
00279 void addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp);
00280
00281 protected:
00282 ConflictHandler()
00283 { firstConflict = lastConflict = NULL; }
00284
00285 inline virtual ~ConflictHandler()
00286 { }
00287
00288 ConflictingTransportAddress* firstConflict, * lastConflict;
00289 };
00290
00301 class __EXPORT MembershipBookkeeping :
00302 public SyncSourceHandler,
00303 public ParticipantHandler,
00304 public ApplicationHandler,
00305 public ConflictHandler,
00306 private Members
00307 {
00308 public:
00309 inline size_t getDefaultMembersHashSize()
00310 { return defaultMembersHashSize; }
00311
00312 protected:
00313
00327 MembershipBookkeeping(uint32 initialSize = defaultMembersHashSize);
00328
00333 inline virtual
00334 ~MembershipBookkeeping()
00335 { endMembers(); }
00336
00337 struct SyncSourceLink;
00338
00339 inline SyncSourceLink* getLink(const SyncSource& source) const
00340 { return static_cast<SyncSourceLink*>(SyncSourceHandler::getLink(source)); }
00345 inline bool isMine(const SyncSource& source) const
00346 { return getLink(source)->getMembership() == this; }
00347
00354 struct IncomingRTPPktLink
00355 {
00356 IncomingRTPPktLink(IncomingRTPPkt* pkt, SyncSourceLink* sLink,
00357 struct timeval& recv_ts,
00358 uint32 shifted_ts,
00359 IncomingRTPPktLink* sp,
00360 IncomingRTPPktLink* sn,
00361 IncomingRTPPktLink* p,
00362 IncomingRTPPktLink* n) :
00363 packet(pkt),
00364 sourceLink(sLink),
00365 prev(p), next(n),
00366 srcPrev(sp), srcNext(sn),
00367 receptionTime(recv_ts),
00368 shiftedTimestamp(shifted_ts)
00369 { }
00370
00371 ~IncomingRTPPktLink()
00372 { }
00373
00374 inline SyncSourceLink* getSourceLink() const
00375 { return sourceLink; }
00376
00377 inline void setSourceLink(SyncSourceLink* src)
00378 { sourceLink = src; }
00379
00380 inline IncomingRTPPktLink* getNext() const
00381 { return next; }
00382
00383 inline void setNext(IncomingRTPPktLink* nl)
00384 { next = nl; }
00385
00386 inline IncomingRTPPktLink* getPrev() const
00387 { return prev; }
00388
00389 inline void setPrev(IncomingRTPPktLink* pl)
00390 { prev = pl; }
00391
00392 inline IncomingRTPPktLink* getSrcNext() const
00393 { return srcNext; }
00394
00395 inline void setSrcNext(IncomingRTPPktLink* sn)
00396 { srcNext = sn; }
00397
00398 inline IncomingRTPPktLink* getSrcPrev() const
00399 { return srcPrev; }
00400
00401 inline void setSrcPrev(IncomingRTPPktLink* sp)
00402 { srcPrev = sp; }
00403
00404 inline IncomingRTPPkt* getPacket() const
00405 { return packet; }
00406
00407 inline void setPacket(IncomingRTPPkt* pkt)
00408 { packet = pkt; }
00409
00417 inline void setRecvTime(const timeval &t)
00418 { receptionTime = t; }
00419
00423 inline timeval getRecvTime() const
00424 { return receptionTime; }
00425
00434 inline uint32 getTimestamp() const
00435 { return shiftedTimestamp; }
00436
00437 inline void setTimestamp(uint32 ts)
00438 { shiftedTimestamp = ts;}
00439
00440
00441 IncomingRTPPkt* packet;
00442
00443 SyncSourceLink* sourceLink;
00444
00445 IncomingRTPPktLink* prev, * next;
00446
00447 IncomingRTPPktLink* srcPrev, * srcNext;
00448
00449 struct timeval receptionTime;
00450
00451
00452
00453 uint32 shiftedTimestamp;
00454 };
00455
00472 struct SyncSourceLink
00473 {
00474
00475 static const uint32 SEQNUMMOD;
00476
00477 SyncSourceLink(MembershipBookkeeping* m,
00478 SyncSource* s,
00479 IncomingRTPPktLink* fp = NULL,
00480 IncomingRTPPktLink* lp = NULL,
00481 SyncSourceLink* ps = NULL,
00482 SyncSourceLink* ns = NULL,
00483 SyncSourceLink* ncollis = NULL) :
00484 membership(m), source(s), first(fp), last(lp),
00485 prev(ps), next(ns), nextCollis(ncollis),
00486 prevConflict(NULL)
00487 { m->setLink(*s,this);
00488 initStats();
00489 }
00490
00494 ~SyncSourceLink();
00495
00496 inline MembershipBookkeeping* getMembership()
00497 { return membership; }
00498
00503 inline SyncSource* getSource() { return source; }
00504
00509 inline IncomingRTPPktLink* getFirst()
00510 { return first; }
00511
00512 inline void setFirst(IncomingRTPPktLink* fp)
00513 { first = fp; }
00514
00519 inline IncomingRTPPktLink* getLast()
00520 { return last; }
00521
00522 inline void setLast(IncomingRTPPktLink* lp)
00523 { last = lp; }
00524
00528 inline SyncSourceLink* getPrev()
00529 { return prev; }
00530
00531 inline void setPrev(SyncSourceLink* ps)
00532 { prev = ps; }
00533
00537 inline SyncSourceLink* getNext()
00538 { return next; }
00539
00540 inline void setNext(SyncSourceLink *ns)
00541 { next = ns; }
00542
00549 inline SyncSourceLink* getNextCollis()
00550 { return nextCollis; }
00551
00552 inline void setNextCollis(SyncSourceLink* ns)
00553 { nextCollis = ns; }
00554
00555 inline ConflictingTransportAddress* getPrevConflict() const
00556 { return prevConflict; }
00557
00561 void setPrevConflict(InetAddress& addr, tpport_t dataPort,
00562 tpport_t controlPort);
00563
00564 unsigned char* getSenderInfo()
00565 { return senderInfo; }
00566
00567 void setSenderInfo(unsigned char* si);
00568
00569 unsigned char* getReceiverInfo()
00570 { return receiverInfo; }
00571
00572 void setReceiverInfo(unsigned char* ri);
00573
00574 inline timeval getLastPacketTime() const
00575 { return lastPacketTime; }
00576
00577 inline timeval getLastRTCPPacketTime() const
00578 { return lastRTCPPacketTime; }
00579
00580 inline timeval getLastRTCPSRTime() const
00581 { return lastRTCPSRTime; }
00582
00587 inline uint32 getObservedPacketCount() const
00588 { return obsPacketCount; }
00589
00590 inline void incObservedPacketCount()
00591 { obsPacketCount++; }
00592
00597 inline uint32 getObservedOctetCount() const
00598 { return obsOctetCount; }
00599
00600 inline void incObservedOctetCount(uint32 n)
00601 { obsOctetCount += n; }
00602
00606 uint16
00607 getMaxSeqNum() const
00608 { return maxSeqNum; }
00609
00614 void
00615 setMaxSeqNum(uint16 max)
00616 { maxSeqNum = max; }
00617
00618 inline uint32
00619 getExtendedMaxSeqNum() const
00620 { return extendedMaxSeqNum; }
00621
00622 inline void
00623 setExtendedMaxSeqNum(uint32 seq)
00624 { extendedMaxSeqNum = seq; }
00625
00626 inline uint32 getCumulativePacketLost() const
00627 { return cumulativePacketLost; }
00628
00629 inline void setCumulativePacketLost(uint32 pl)
00630 { cumulativePacketLost = pl; }
00631
00632 inline uint8 getFractionLost() const
00633 { return fractionLost; }
00634
00635 inline void setFractionLost(uint8 fl)
00636 { fractionLost = fl; }
00637
00638 inline uint32 getLastPacketTransitTime()
00639 { return lastPacketTransitTime; }
00640
00641 inline void setLastPacketTransitTime(uint32 time)
00642 { lastPacketTransitTime = time; }
00643
00644 inline float getJitter() const
00645 { return jitter; }
00646
00647 inline void setJitter(float j)
00648 { jitter = j; }
00649
00650 inline uint32 getInitialDataTimestamp() const
00651 { return initialDataTimestamp; }
00652
00653 inline void setInitialDataTimestamp(uint32 ts)
00654 { initialDataTimestamp = ts; }
00655
00656 inline timeval getInitialDataTime() const
00657 { return initialDataTime; }
00658
00659 inline void setInitialDataTime(timeval it)
00660 { initialDataTime = it; }
00661
00669 bool getGoodbye()
00670 {
00671 if(!flag)
00672 return false;
00673 flag = false;
00674 return true;
00675 }
00676
00683 bool getHello() {
00684 if(flag)
00685 return false;
00686 flag = true;
00687 return true;
00688 }
00689
00690 inline uint32 getBadSeqNum() const
00691 { return badSeqNum; }
00692
00693 inline void setBadSeqNum(uint32 seq)
00694 { badSeqNum = seq; }
00695
00696 uint8 getProbation() const
00697 { return probation; }
00698
00699 inline void setProbation(uint8 p)
00700 { probation = p; }
00701
00702 inline void decProbation()
00703 { --probation; }
00704
00705 bool isValid() const
00706 { return 0 == probation; }
00707
00708 inline uint16 getBaseSeqNum() const
00709 { return baseSeqNum; }
00710
00711 inline uint32 getSeqNumAccum() const
00712 { return seqNumAccum; }
00713
00714 inline void incSeqNumAccum()
00715 { seqNumAccum += SEQNUMMOD; }
00716
00720 inline void initSequence(uint16 seqnum)
00721 { maxSeqNum = seqNumAccum = seqnum; }
00722
00733 void recordInsertion(const IncomingRTPPktLink& pl);
00734
00735 void initStats();
00736
00741 void computeStats();
00742
00743 MembershipBookkeeping* membership;
00744
00745 SyncSource* source;
00746
00747 IncomingRTPPktLink* first, * last;
00748
00749
00750 SyncSourceLink* prev, * next;
00751
00752 SyncSourceLink* nextCollis;
00753 ConflictingTransportAddress* prevConflict;
00754 unsigned char* senderInfo;
00755 unsigned char* receiverInfo;
00756
00757
00758 timeval lastPacketTime;
00759
00760 timeval lastRTCPPacketTime;
00761
00762
00763 timeval lastRTCPSRTime;
00764
00765
00766
00767 uint32 obsPacketCount;
00768
00769 uint32 obsOctetCount;
00770
00771 uint16 maxSeqNum;
00772 uint32 extendedMaxSeqNum;
00773 uint32 cumulativePacketLost;
00774 uint8 fractionLost;
00775
00776 uint32 lastPacketTransitTime;
00777
00778 float jitter;
00779 uint32 initialDataTimestamp;
00780 timeval initialDataTime;
00781
00782
00783
00784 bool flag;
00785
00786
00787 uint32 badSeqNum;
00788 uint8 probation;
00789 uint16 baseSeqNum;
00790 uint32 expectedPrior;
00791 uint32 receivedPrior;
00792 uint32 seqNumAccum;
00793 };
00794
00799 bool
00800 isRegistered(uint32 ssrc);
00801
00810 SyncSourceLink*
00811 getSourceBySSRC(uint32 ssrc, bool& created);
00812
00823 bool
00824 BYESource(uint32 ssrc);
00825
00833 bool
00834 removeSource(uint32 ssrc);
00835
00836 inline SyncSourceLink* getFirst()
00837 { return first; }
00838
00839 inline SyncSourceLink* getLast()
00840 { return last; }
00841
00842 inline uint32
00843 getMembersCount()
00844 { return Members::getMembersCount(); }
00845
00846 inline void
00847 setMembersCount(uint32 n)
00848 { Members::setMembersCount(n); }
00849
00850 inline uint32
00851 getSendersCount()
00852 { return Members::getSendersCount(); }
00853
00854 static const size_t defaultMembersHashSize;
00855 static const uint32 SEQNUMMOD;
00856
00857 private:
00858 MembershipBookkeeping(const MembershipBookkeeping &o);
00859
00860 MembershipBookkeeping&
00861 operator=(const MembershipBookkeeping &o);
00862
00867 void
00868 endMembers();
00869
00870
00871 uint32 sourceBucketsNum;
00872 SyncSourceLink** sourceLinks;
00873
00874 SyncSourceLink* first, * last;
00875 };
00876
00883 class __EXPORT IncomingDataQueue: public IncomingDataQueueBase,
00884 protected MembershipBookkeeping
00885 {
00886 public:
00892 class SyncSourcesIterator
00893 {
00894 public:
00895 typedef std::forward_iterator_tag iterator_category;
00896 typedef SyncSource value_type;
00897 typedef ptrdiff_t difference_type;
00898 typedef const SyncSource* pointer;
00899 typedef const SyncSource& reference;
00900
00901 SyncSourcesIterator(SyncSourceLink* l = NULL) :
00902 link(l)
00903 { }
00904
00905 SyncSourcesIterator(const SyncSourcesIterator& si) :
00906 link(si.link)
00907 { }
00908
00909 reference operator*() const
00910 { return *(link->getSource()); }
00911
00912 pointer operator->() const
00913 { return link->getSource(); }
00914
00915 SyncSourcesIterator& operator++() {
00916 link = link->getNext();
00917 return *this;
00918 }
00919
00920 SyncSourcesIterator operator++(int) {
00921 SyncSourcesIterator result(*this);
00922 ++(*this);
00923 return result;
00924 }
00925
00926 friend bool operator==(const SyncSourcesIterator& l,
00927 const SyncSourcesIterator& r)
00928 { return l.link == r.link; }
00929
00930 friend bool operator!=(const SyncSourcesIterator& l,
00931 const SyncSourcesIterator& r)
00932 { return l.link != r.link; }
00933
00934 private:
00935 SyncSourceLink *link;
00936 };
00937
00938 SyncSourcesIterator begin()
00939 { return SyncSourcesIterator(MembershipBookkeeping::getFirst()); }
00940
00941 SyncSourcesIterator end()
00942 { return SyncSourcesIterator(NULL); }
00943
00953 const AppDataUnit*
00954 getData(uint32 stamp, const SyncSource* src = NULL);
00955
00956
00963 bool
00964 isWaiting(const SyncSource* src = NULL) const;
00965
00972 uint32
00973 getFirstTimestamp(const SyncSource* src = NULL) const;
00974
00997 void
00998 setMinValidPacketSequence(uint8 packets)
00999 { minValidPacketSequence = packets; }
01000
01001 uint8
01002 getDefaultMinValidPacketSequence() const
01003 { return defaultMinValidPacketSequence; }
01004
01009 uint8
01010 getMinValidPacketSequence() const
01011 { return minValidPacketSequence; }
01012
01013 void
01014 setMaxPacketMisorder(uint16 packets)
01015 { maxPacketMisorder = packets; }
01016
01017 uint16
01018 getDefaultMaxPacketMisorder() const
01019 { return defaultMaxPacketMisorder; }
01020
01021 uint16
01022 getMaxPacketMisorder() const
01023 { return maxPacketMisorder; }
01024
01030 void
01031 setMaxPacketDropout(uint16 packets)
01032 { maxPacketDropout = packets; }
01033
01034 uint16
01035 getDefaultMaxPacketDropout() const
01036 { return defaultMaxPacketDropout; }
01037
01038 uint16
01039 getMaxPacketDropout() const
01040 { return maxPacketDropout; }
01041
01042
01043
01044 inline static size_t
01045 getDefaultMembersSize()
01046 { return defaultMembersSize; }
01047
01056 void
01057 setInQueueCryptoContext(CryptoContext* cc);
01058
01069 void
01070 removeInQueueCryptoContext(CryptoContext* cc);
01071
01079 CryptoContext*
01080 getInQueueCryptoContext(uint32 ssrc);
01081
01082 protected:
01086 IncomingDataQueue(uint32 size);
01087
01088 virtual ~IncomingDataQueue()
01089 { }
01090
01103 bool checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink,
01104 bool is_new, InetAddress& na,
01105 tpport_t tp);
01106
01122 void setSourceExpirationPeriod(uint8 intervals)
01123 { sourceExpirationPeriod = intervals; }
01124
01131 virtual size_t
01132 takeInDataPacket();
01133
01134 void renewLocalSSRC();
01135
01145 IncomingDataQueue::IncomingRTPPktLink*
01146 getWaiting(uint32 timestamp, const SyncSource *src = NULL);
01147
01163 bool
01164 recordReception(SyncSourceLink& srcLink, const IncomingRTPPkt& pkt,
01165 const timeval recvtime);
01166
01173 void
01174 recordExtraction(const IncomingRTPPkt& pkt);
01175
01176 void purgeIncomingQueue();
01177
01184 inline virtual void
01185 onNewSyncSource(const SyncSource&)
01186 { }
01187
01188 protected:
01205 inline virtual bool
01206 onRTPPacketRecv(IncomingRTPPkt&)
01207 { return true; }
01208
01217 inline virtual void onExpireRecv(IncomingRTPPkt&)
01218 { return; }
01219
01233 inline virtual bool
01234 onSRTPPacketError(IncomingRTPPkt& pkt, int32 errorCode)
01235 { return false; }
01236
01237 inline virtual bool
01238 end2EndDelayed(IncomingRTPPktLink&)
01239 { return false; }
01240
01256 bool
01257 insertRecvPacket(IncomingRTPPktLink* packetLink);
01258
01270 virtual size_t
01271 recvData(unsigned char* buffer, size_t length,
01272 InetHostAddress& host, tpport_t& port) = 0;
01273
01274 virtual size_t
01275 getNextDataPacketSize() const = 0;
01276
01277 mutable ThreadLock recvLock;
01278
01279 IncomingRTPPktLink* recvFirst, * recvLast;
01280
01281 static const uint8 defaultMinValidPacketSequence;
01282 static const uint16 defaultMaxPacketMisorder;
01283 static const uint16 defaultMaxPacketDropout;
01284 uint8 minValidPacketSequence;
01285 uint16 maxPacketMisorder;
01286 uint16 maxPacketDropout;
01287 static const size_t defaultMembersSize;
01288 uint8 sourceExpirationPeriod;
01289 mutable Mutex cryptoMutex;
01290 std::list<CryptoContext *> cryptoContexts;
01291 };
01292
01294
01295 #ifdef CCXX_NAMESPACES
01296 }
01297 #endif
01298
01299 #endif //CCXX_RTP_IQUEUE_H_
01300