00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00043 #ifndef CCXX_RTP_IQUEUE_H_
00044 #define CCXX_RTP_IQUEUE_H_
00045
00046 #include <ccrtp/queuebase.h>
00047 #include <ccrtp/CryptoContext.h>
00048
00049 #include <list>
00050
00051 NAMESPACE_COMMONCPP
00052
00067 class __EXPORT Members
00068 {
00069 public:
00070 inline void
00071 setMembersCount(uint32 n)
00072 { members = n; }
00073
00074 inline void
00075 increaseMembersCount()
00076 { members++; }
00077
00078 inline void
00079 decreaseMembersCount()
00080 { members--; }
00081
00082 inline uint32
00083 getMembersCount() const
00084 { return members; }
00085
00086 inline void
00087 setSendersCount(uint32 n)
00088 { activeSenders = n; }
00089
00090 inline void
00091 increaseSendersCount()
00092 { activeSenders++; }
00093
00094 inline void
00095 decreaseSendersCount()
00096 { activeSenders--; }
00097
00098 inline uint32
00099 getSendersCount() const
00100 { return activeSenders; }
00101
00102 protected:
00103 Members() :
00104 members(0),
00105 activeSenders(0)
00106 { }
00107
00108 inline virtual ~Members()
00109 { }
00110
00111 private:
00113 uint32 members;
00115 uint32 activeSenders;
00116 };
00117
00124 class __EXPORT SyncSourceHandler
00125 {
00126 public:
00133 inline void*
00134 getLink(const SyncSource& source) const
00135 { return source.getLink(); }
00136
00137 inline void
00138 setLink(SyncSource& source, void* link)
00139 { source.setLink(link); }
00140
00141 inline void
00142 setParticipant(SyncSource& source, Participant& p)
00143 { source.setParticipant(p); }
00144
00145 inline void
00146 setState(SyncSource& source, SyncSource::State ns)
00147 { source.setState(ns); }
00148
00149 inline void
00150 setSender(SyncSource& source, bool active)
00151 { source.setSender(active); }
00152
00153 inline void
00154 setDataTransportPort(SyncSource& source, tpport_t p)
00155 { source.setDataTransportPort(p); }
00156
00157 inline void
00158 setControlTransportPort(SyncSource& source, tpport_t p)
00159 { source.setControlTransportPort(p); }
00160
00161 inline void
00162 setNetworkAddress(SyncSource& source, InetAddress addr)
00163 { source.setNetworkAddress(addr); }
00164
00165 protected:
00166 SyncSourceHandler()
00167 { }
00168
00169 inline virtual ~SyncSourceHandler()
00170 { }
00171 };
00172
00179 class __EXPORT ParticipantHandler
00180 {
00181 public:
00182 inline void
00183 setSDESItem(Participant* part, SDESItemType item,
00184 const std::string& val)
00185 { part->setSDESItem(item,val); }
00186
00187 inline void
00188 setPRIVPrefix(Participant* part, const std::string val)
00189 { part->setPRIVPrefix(val); }
00190
00191 protected:
00192 ParticipantHandler()
00193 { }
00194
00195 inline virtual ~ParticipantHandler()
00196 { }
00197 };
00198
00205 class __EXPORT ApplicationHandler
00206 {
00207 public:
00208 inline void
00209 addParticipant(RTPApplication& app, Participant& part)
00210 { app.addParticipant(part); }
00211
00212 inline void
00213 removeParticipant(RTPApplication& app,
00214 RTPApplication::ParticipantLink* pl)
00215 { app.removeParticipant(pl); }
00216
00217 protected:
00218 ApplicationHandler()
00219 { }
00220
00221 inline virtual ~ApplicationHandler()
00222 { }
00223 };
00224
00232 class __EXPORT ConflictHandler
00233 {
00234 public:
00235 struct ConflictingTransportAddress
00236 {
00237 ConflictingTransportAddress(InetAddress na,
00238 tpport_t dtp, tpport_t ctp);
00239
00240 void setNext(ConflictingTransportAddress* nc)
00241 { next = nc; }
00242
00243 inline const InetAddress& getNetworkAddress( ) const
00244 { return networkAddress; }
00245
00246 inline tpport_t getDataTransportPort() const
00247 { return dataTransportPort; }
00248
00249 inline tpport_t getControlTransportPort() const
00250 { return controlTransportPort; }
00251
00252 InetAddress networkAddress;
00253 tpport_t dataTransportPort;
00254 tpport_t controlTransportPort;
00255 ConflictingTransportAddress* next;
00256
00257 timeval lastPacketTime;
00258 };
00259
00264 ConflictingTransportAddress* searchDataConflict(InetAddress na,
00265 tpport_t dtp);
00270 ConflictingTransportAddress* searchControlConflict(InetAddress na,
00271 tpport_t ctp);
00272
00273 void updateConflict(ConflictingTransportAddress& ca)
00274 { SysTime::gettimeofday(&(ca.lastPacketTime),NULL); }
00275
00276 void addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp);
00277
00278 protected:
00279 ConflictHandler()
00280 { firstConflict = lastConflict = NULL; }
00281
00282 inline virtual ~ConflictHandler()
00283 { }
00284
00285 ConflictingTransportAddress* firstConflict, * lastConflict;
00286 };
00287
00298 class __EXPORT MembershipBookkeeping :
00299 public SyncSourceHandler,
00300 public ParticipantHandler,
00301 public ApplicationHandler,
00302 public ConflictHandler,
00303 private Members
00304 {
00305 public:
00306 inline size_t getDefaultMembersHashSize()
00307 { return defaultMembersHashSize; }
00308
00309 protected:
00310
00324 MembershipBookkeeping(uint32 initialSize = defaultMembersHashSize);
00325
00330 inline virtual
00331 ~MembershipBookkeeping()
00332 { endMembers(); }
00333
00334 struct SyncSourceLink;
00335
00336 inline SyncSourceLink* getLink(const SyncSource& source) const
00337 { return static_cast<SyncSourceLink*>(SyncSourceHandler::getLink(source)); }
00342 inline bool isMine(const SyncSource& source) const
00343 { return getLink(source)->getMembership() == this; }
00344
00351 struct IncomingRTPPktLink
00352 {
00353 IncomingRTPPktLink(IncomingRTPPkt* pkt, SyncSourceLink* sLink,
00354 struct timeval& recv_ts,
00355 uint32 shifted_ts,
00356 IncomingRTPPktLink* sp,
00357 IncomingRTPPktLink* sn,
00358 IncomingRTPPktLink* p,
00359 IncomingRTPPktLink* n) :
00360 packet(pkt),
00361 sourceLink(sLink),
00362 prev(p), next(n),
00363 srcPrev(sp), srcNext(sn),
00364 receptionTime(recv_ts),
00365 shiftedTimestamp(shifted_ts)
00366 { }
00367
00368 ~IncomingRTPPktLink()
00369 { }
00370
00371 inline SyncSourceLink* getSourceLink() const
00372 { return sourceLink; }
00373
00374 inline void setSourceLink(SyncSourceLink* src)
00375 { sourceLink = src; }
00376
00377 inline IncomingRTPPktLink* getNext() const
00378 { return next; }
00379
00380 inline void setNext(IncomingRTPPktLink* nl)
00381 { next = nl; }
00382
00383 inline IncomingRTPPktLink* getPrev() const
00384 { return prev; }
00385
00386 inline void setPrev(IncomingRTPPktLink* pl)
00387 { prev = pl; }
00388
00389 inline IncomingRTPPktLink* getSrcNext() const
00390 { return srcNext; }
00391
00392 inline void setSrcNext(IncomingRTPPktLink* sn)
00393 { srcNext = sn; }
00394
00395 inline IncomingRTPPktLink* getSrcPrev() const
00396 { return srcPrev; }
00397
00398 inline void setSrcPrev(IncomingRTPPktLink* sp)
00399 { srcPrev = sp; }
00400
00401 inline IncomingRTPPkt* getPacket() const
00402 { return packet; }
00403
00404 inline void setPacket(IncomingRTPPkt* pkt)
00405 { packet = pkt; }
00406
00414 inline void setRecvTime(const timeval &t)
00415 { receptionTime = t; }
00416
00420 inline timeval getRecvTime() const
00421 { return receptionTime; }
00422
00431 inline uint32 getTimestamp() const
00432 { return shiftedTimestamp; }
00433
00434 inline void setTimestamp(uint32 ts)
00435 { shiftedTimestamp = ts;}
00436
00437
00438 IncomingRTPPkt* packet;
00439
00440 SyncSourceLink* sourceLink;
00441
00442 IncomingRTPPktLink* prev, * next;
00443
00444 IncomingRTPPktLink* srcPrev, * srcNext;
00445
00446 struct timeval receptionTime;
00447
00448
00449
00450 uint32 shiftedTimestamp;
00451 };
00452
00469 struct SyncSourceLink
00470 {
00471
00472 static const uint32 SEQNUMMOD;
00473
00474 SyncSourceLink(MembershipBookkeeping* m,
00475 SyncSource* s,
00476 IncomingRTPPktLink* fp = NULL,
00477 IncomingRTPPktLink* lp = NULL,
00478 SyncSourceLink* ps = NULL,
00479 SyncSourceLink* ns = NULL,
00480 SyncSourceLink* ncollis = NULL) :
00481 membership(m), source(s), first(fp), last(lp),
00482 prev(ps), next(ns), nextCollis(ncollis),
00483 prevConflict(NULL)
00484 { m->setLink(*s,this);
00485 initStats();
00486 }
00487
00491 ~SyncSourceLink();
00492
00493 inline MembershipBookkeeping* getMembership()
00494 { return membership; }
00495
00500 inline SyncSource* getSource() { return source; }
00501
00506 inline IncomingRTPPktLink* getFirst()
00507 { return first; }
00508
00509 inline void setFirst(IncomingRTPPktLink* fp)
00510 { first = fp; }
00511
00516 inline IncomingRTPPktLink* getLast()
00517 { return last; }
00518
00519 inline void setLast(IncomingRTPPktLink* lp)
00520 { last = lp; }
00521
00525 inline SyncSourceLink* getPrev()
00526 { return prev; }
00527
00528 inline void setPrev(SyncSourceLink* ps)
00529 { prev = ps; }
00530
00534 inline SyncSourceLink* getNext()
00535 { return next; }
00536
00537 inline void setNext(SyncSourceLink *ns)
00538 { next = ns; }
00539
00546 inline SyncSourceLink* getNextCollis()
00547 { return nextCollis; }
00548
00549 inline void setNextCollis(SyncSourceLink* ns)
00550 { nextCollis = ns; }
00551
00552 inline ConflictingTransportAddress* getPrevConflict() const
00553 { return prevConflict; }
00554
00558 void setPrevConflict(InetAddress& addr, tpport_t dataPort,
00559 tpport_t controlPort);
00560
00561 unsigned char* getSenderInfo()
00562 { return senderInfo; }
00563
00564 void setSenderInfo(unsigned char* si);
00565
00566 unsigned char* getReceiverInfo()
00567 { return receiverInfo; }
00568
00569 void setReceiverInfo(unsigned char* ri);
00570
00571 inline timeval getLastPacketTime() const
00572 { return lastPacketTime; }
00573
00574 inline timeval getLastRTCPPacketTime() const
00575 { return lastRTCPPacketTime; }
00576
00577 inline timeval getLastRTCPSRTime() const
00578 { return lastRTCPSRTime; }
00579
00584 inline uint32 getObservedPacketCount() const
00585 { return obsPacketCount; }
00586
00587 inline void incObservedPacketCount()
00588 { obsPacketCount++; }
00589
00594 inline uint32 getObservedOctetCount() const
00595 { return obsOctetCount; }
00596
00597 inline void incObservedOctetCount(uint32 n)
00598 { obsOctetCount += n; }
00599
00603 uint16
00604 getMaxSeqNum() const
00605 { return maxSeqNum; }
00606
00611 void
00612 setMaxSeqNum(uint16 max)
00613 { maxSeqNum = max; }
00614
00615 inline uint32
00616 getExtendedMaxSeqNum() const
00617 { return extendedMaxSeqNum; }
00618
00619 inline void
00620 setExtendedMaxSeqNum(uint32 seq)
00621 { extendedMaxSeqNum = seq; }
00622
00623 inline uint32 getCumulativePacketLost() const
00624 { return cumulativePacketLost; }
00625
00626 inline void setCumulativePacketLost(uint32 pl)
00627 { cumulativePacketLost = pl; }
00628
00629 inline uint8 getFractionLost() const
00630 { return fractionLost; }
00631
00632 inline void setFractionLost(uint8 fl)
00633 { fractionLost = fl; }
00634
00635 inline uint32 getLastPacketTransitTime()
00636 { return lastPacketTransitTime; }
00637
00638 inline void setLastPacketTransitTime(uint32 time)
00639 { lastPacketTransitTime = time; }
00640
00641 inline float getJitter() const
00642 { return jitter; }
00643
00644 inline void setJitter(float j)
00645 { jitter = j; }
00646
00647 inline uint32 getInitialDataTimestamp() const
00648 { return initialDataTimestamp; }
00649
00650 inline void setInitialDataTimestamp(uint32 ts)
00651 { initialDataTimestamp = ts; }
00652
00653 inline timeval getInitialDataTime() const
00654 { return initialDataTime; }
00655
00656 inline void setInitialDataTime(timeval it)
00657 { initialDataTime = it; }
00658
00666 bool getGoodbye()
00667 {
00668 if(!flag)
00669 return false;
00670 flag = false;
00671 return true;
00672 }
00673
00680 bool getHello() {
00681 if(flag)
00682 return false;
00683 flag = true;
00684 return true;
00685 }
00686
00687 inline uint32 getBadSeqNum() const
00688 { return badSeqNum; }
00689
00690 inline void setBadSeqNum(uint32 seq)
00691 { badSeqNum = seq; }
00692
00693 uint8 getProbation() const
00694 { return probation; }
00695
00696 inline void setProbation(uint8 p)
00697 { probation = p; }
00698
00699 inline void decProbation()
00700 { --probation; }
00701
00702 bool isValid() const
00703 { return 0 == probation; }
00704
00705 inline uint16 getBaseSeqNum() const
00706 { return baseSeqNum; }
00707
00708 inline void setBaseSeqNum(uint16 seqnum)
00709 { baseSeqNum = seqnum; }
00710
00711 inline uint32 getSeqNumAccum() const
00712 { return seqNumAccum; }
00713
00714 inline void incSeqNumAccum()
00715 { seqNumAccum += SEQNUMMOD; }
00716
00720 inline void initSequence(uint16 seqnum)
00721 { maxSeqNum = seqNumAccum = seqnum; }
00722
00733 void recordInsertion(const IncomingRTPPktLink& pl);
00734
00735 void initStats();
00736
00741 void computeStats();
00742
00743 MembershipBookkeeping* membership;
00744
00745 SyncSource* source;
00746
00747 IncomingRTPPktLink* first, * last;
00748
00749
00750 SyncSourceLink* prev, * next;
00751
00752 SyncSourceLink* nextCollis;
00753 ConflictingTransportAddress* prevConflict;
00754 unsigned char* senderInfo;
00755 unsigned char* receiverInfo;
00756
00757
00758 timeval lastPacketTime;
00759
00760 timeval lastRTCPPacketTime;
00761
00762
00763 timeval lastRTCPSRTime;
00764
00765
00766
00767 uint32 obsPacketCount;
00768
00769 uint32 obsOctetCount;
00770
00771 uint16 maxSeqNum;
00772 uint32 extendedMaxSeqNum;
00773 uint32 cumulativePacketLost;
00774 uint8 fractionLost;
00775
00776 uint32 lastPacketTransitTime;
00777
00778 float jitter;
00779 uint32 initialDataTimestamp;
00780 timeval initialDataTime;
00781
00782
00783
00784 bool flag;
00785
00786
00787 uint32 badSeqNum;
00788 uint8 probation;
00789 uint16 baseSeqNum;
00790 uint32 expectedPrior;
00791 uint32 receivedPrior;
00792 uint32 seqNumAccum;
00793 };
00794
00799 bool
00800 isRegistered(uint32 ssrc);
00801
00810 SyncSourceLink*
00811 getSourceBySSRC(uint32 ssrc, bool& created);
00812
00823 bool
00824 BYESource(uint32 ssrc);
00825
00833 bool
00834 removeSource(uint32 ssrc);
00835
00836 inline SyncSourceLink* getFirst()
00837 { return first; }
00838
00839 inline SyncSourceLink* getLast()
00840 { return last; }
00841
00842 inline uint32
00843 getMembersCount()
00844 { return Members::getMembersCount(); }
00845
00846 inline void
00847 setMembersCount(uint32 n)
00848 { Members::setMembersCount(n); }
00849
00850 inline uint32
00851 getSendersCount()
00852 { return Members::getSendersCount(); }
00853
00854 static const size_t defaultMembersHashSize;
00855 static const uint32 SEQNUMMOD;
00856
00857 private:
00858 MembershipBookkeeping(const MembershipBookkeeping &o);
00859
00860 MembershipBookkeeping&
00861 operator=(const MembershipBookkeeping &o);
00862
00867 void
00868 endMembers();
00869
00870
00871 uint32 sourceBucketsNum;
00872 SyncSourceLink** sourceLinks;
00873
00874 SyncSourceLink* first, * last;
00875 };
00876
00883 class __EXPORT IncomingDataQueue: public IncomingDataQueueBase,
00884 protected MembershipBookkeeping
00885 {
00886 public:
00892 class SyncSourcesIterator
00893 {
00894 public:
00895 typedef std::forward_iterator_tag iterator_category;
00896 typedef SyncSource value_type;
00897 typedef std::ptrdiff_t difference_type;
00898 typedef const SyncSource* pointer;
00899 typedef const SyncSource& reference;
00900
00901 SyncSourcesIterator(SyncSourceLink* l = NULL) :
00902 link(l)
00903 { }
00904
00905 SyncSourcesIterator(const SyncSourcesIterator& si) :
00906 link(si.link)
00907 { }
00908
00909 reference operator*() const
00910 { return *(link->getSource()); }
00911
00912 pointer operator->() const
00913 { return link->getSource(); }
00914
00915 SyncSourcesIterator& operator++() {
00916 link = link->getNext();
00917 return *this;
00918 }
00919
00920 SyncSourcesIterator operator++(int) {
00921 SyncSourcesIterator result(*this);
00922 ++(*this);
00923 return result;
00924 }
00925
00926 friend bool operator==(const SyncSourcesIterator& l,
00927 const SyncSourcesIterator& r)
00928 { return l.link == r.link; }
00929
00930 friend bool operator!=(const SyncSourcesIterator& l,
00931 const SyncSourcesIterator& r)
00932 { return l.link != r.link; }
00933
00934 private:
00935 SyncSourceLink *link;
00936 };
00937
00938 SyncSourcesIterator begin()
00939 { return SyncSourcesIterator(MembershipBookkeeping::getFirst()); }
00940
00941 SyncSourcesIterator end()
00942 { return SyncSourcesIterator(NULL); }
00943
00953 const AppDataUnit*
00954 getData(uint32 stamp, const SyncSource* src = NULL);
00955
00956
00963 bool
00964 isWaiting(const SyncSource* src = NULL) const;
00965
00972 uint32
00973 getFirstTimestamp(const SyncSource* src = NULL) const;
00974
00997 void
00998 setMinValidPacketSequence(uint8 packets)
00999 { minValidPacketSequence = packets; }
01000
01001 uint8
01002 getDefaultMinValidPacketSequence() const
01003 { return defaultMinValidPacketSequence; }
01004
01009 uint8
01010 getMinValidPacketSequence() const
01011 { return minValidPacketSequence; }
01012
01013 void
01014 setMaxPacketMisorder(uint16 packets)
01015 { maxPacketMisorder = packets; }
01016
01017 uint16
01018 getDefaultMaxPacketMisorder() const
01019 { return defaultMaxPacketMisorder; }
01020
01021 uint16
01022 getMaxPacketMisorder() const
01023 { return maxPacketMisorder; }
01024
01030 void
01031 setMaxPacketDropout(uint16 packets)
01032 { maxPacketDropout = packets; }
01033
01034 uint16
01035 getDefaultMaxPacketDropout() const
01036 { return defaultMaxPacketDropout; }
01037
01038 uint16
01039 getMaxPacketDropout() const
01040 { return maxPacketDropout; }
01041
01042
01043
01044 inline static size_t
01045 getDefaultMembersSize()
01046 { return defaultMembersSize; }
01047
01056 void
01057 setInQueueCryptoContext(CryptoContext* cc);
01058
01069 void
01070 removeInQueueCryptoContext(CryptoContext* cc);
01071
01079 CryptoContext*
01080 getInQueueCryptoContext(uint32 ssrc);
01081
01082 protected:
01086 IncomingDataQueue(uint32 size);
01087
01088 virtual ~IncomingDataQueue()
01089 { }
01090
01103 bool checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink,
01104 bool is_new, InetAddress& na,
01105 tpport_t tp);
01106
01122 void setSourceExpirationPeriod(uint8 intervals)
01123 { sourceExpirationPeriod = intervals; }
01124
01131 virtual size_t
01132 takeInDataPacket();
01133
01134 void renewLocalSSRC();
01135
01145 IncomingDataQueue::IncomingRTPPktLink*
01146 getWaiting(uint32 timestamp, const SyncSource *src = NULL);
01147
01163 bool
01164 recordReception(SyncSourceLink& srcLink, const IncomingRTPPkt& pkt,
01165 const timeval recvtime);
01166
01173 void
01174 recordExtraction(const IncomingRTPPkt& pkt);
01175
01176 void purgeIncomingQueue();
01177
01184 inline virtual void
01185 onNewSyncSource(const SyncSource&)
01186 { }
01187
01188 protected:
01205 inline virtual bool
01206 onRTPPacketRecv(IncomingRTPPkt&)
01207 { return true; }
01208
01217 inline virtual void onExpireRecv(IncomingRTPPkt&)
01218 { return; }
01219
01233 inline virtual bool
01234 onSRTPPacketError(IncomingRTPPkt& pkt, int32 errorCode)
01235 { return false; }
01236
01237 inline virtual bool
01238 end2EndDelayed(IncomingRTPPktLink&)
01239 { return false; }
01240
01256 bool
01257 insertRecvPacket(IncomingRTPPktLink* packetLink);
01258
01270 virtual size_t
01271 recvData(unsigned char* buffer, size_t length,
01272 InetHostAddress& host, tpport_t& port) = 0;
01273
01274 virtual size_t
01275 getNextDataPacketSize() const = 0;
01276
01277 mutable ThreadLock recvLock;
01278
01279 IncomingRTPPktLink* recvFirst, * recvLast;
01280
01281 static const uint8 defaultMinValidPacketSequence;
01282 static const uint16 defaultMaxPacketMisorder;
01283 static const uint16 defaultMaxPacketDropout;
01284 uint8 minValidPacketSequence;
01285 uint16 maxPacketMisorder;
01286 uint16 maxPacketDropout;
01287 static const size_t defaultMembersSize;
01288 uint8 sourceExpirationPeriod;
01289 mutable Mutex cryptoMutex;
01290 std::list<CryptoContext *> cryptoContexts;
01291 };
01292
01294
01295 END_NAMESPACE
01296
01297 #endif //CCXX_RTP_IQUEUE_H_
01298