00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #if defined(WIN32) || defined(_WIN32_WCE)
00022 #include "ortp-config-win32.h"
00023 #else
00024 #include "ortp-config.h"
00025 #endif
00026
00027 #include "ortp/ortp.h"
00028 #include "ortp/telephonyevents.h"
00029 #include "ortp/rtcp.h"
00030 #include "jitterctl.h"
00031 #include "scheduler.h"
00032 #include "utils.h"
00033 #include "rtpsession_priv.h"
00034
00035 extern mblk_t *rtcp_create_simple_bye_packet(uint32_t ssrc, const char *reason);
00036 extern int rtcp_sr_init(RtpSession *session, char *buf, int size);
00037 extern int rtcp_rr_init(RtpSession *session, char *buf, int size);
00038
00039
00040
00041
00042 static void payload_type_changed(RtpSession *session, PayloadType *pt){
00043 jitter_control_set_payload(&session->rtp.jittctl,pt);
00044 session->rtp.rtcp_report_snt_interval=RTCP_DEFAULT_REPORT_INTERVAL*pt->clock_rate;
00045 rtp_session_set_time_jump_limit(session,session->rtp.time_jump);
00046 if (pt->type==PAYLOAD_VIDEO){
00047 session->permissive=TRUE;
00048 ortp_message("Using permissive algorithm");
00049 }
00050 else session->permissive=FALSE;
00051 }
00052
00053 void wait_point_init(WaitPoint *wp){
00054 ortp_mutex_init(&wp->lock,NULL);
00055 ortp_cond_init(&wp->cond,NULL);
00056 wp->time=0;
00057 wp->wakeup=FALSE;
00058 }
00059 void wait_point_uninit(WaitPoint *wp){
00060 ortp_cond_destroy(&wp->cond);
00061 ortp_mutex_destroy(&wp->lock);
00062 }
00063
00064 #define wait_point_lock(wp) ortp_mutex_lock(&(wp)->lock)
00065 #define wait_point_unlock(wp) ortp_mutex_unlock(&(wp)->lock)
00066
00067 void wait_point_wakeup_at(WaitPoint *wp, uint32_t t, bool_t dosleep){
00068 wp->time=t;
00069 wp->wakeup=TRUE;
00070 if (dosleep) ortp_cond_wait(&wp->cond,&wp->lock);
00071 }
00072
00073
00074 bool_t wait_point_check(WaitPoint *wp, uint32_t t){
00075 bool_t ok=FALSE;
00076
00077 if (wp->wakeup){
00078 if (TIME_IS_NEWER_THAN(t,wp->time)){
00079 wp->wakeup=FALSE;
00080 ok=TRUE;
00081
00082 }
00083 }
00084 return ok;
00085 }
00086 #define wait_point_wakeup(wp) ortp_cond_signal(&(wp)->cond);
00087
00088 extern void rtp_parse(RtpSession *session, mblk_t *mp, uint32_t local_str_ts,
00089 struct sockaddr *addr, socklen_t addrlen);
00090
00091
00092 static uint32_t uint32_t_random(){
00093 return random();
00094 }
00095
00096
00097 #define RTP_SEQ_IS_GREATER(seq1,seq2)\
00098 ((uint16_t)((uint16_t)(seq1) - (uint16_t)(seq2))< (uint16_t)(1<<15))
00099
00100
00101 void rtp_putq(queue_t *q, mblk_t *mp)
00102 {
00103 mblk_t *tmp;
00104 rtp_header_t *rtp=(rtp_header_t*)mp->b_rptr,*tmprtp;
00105
00106
00107 ortp_debug("rtp_putq(): Enqueuing packet with ts=%i and seq=%i",rtp->timestamp,rtp->seq_number);
00108
00109 if (qempty(q)) {
00110 putq(q,mp);
00111 return;
00112 }
00113 tmp=qlast(q);
00114
00115
00116 while (!qend(q,tmp))
00117 {
00118 tmprtp=(rtp_header_t*)tmp->b_rptr;
00119 ortp_debug("rtp_putq(): Seeing packet with seq=%i",tmprtp->seq_number);
00120
00121 if (rtp->seq_number == tmprtp->seq_number)
00122 {
00123
00124 ortp_debug("rtp_putq: duplicated message.");
00125 freemsg(mp);
00126 return;
00127 }else if (RTP_SEQ_IS_GREATER(rtp->seq_number,tmprtp->seq_number)){
00128
00129 insq(q,tmp->b_next,mp);
00130 return;
00131 }
00132 tmp=tmp->b_prev;
00133 }
00134
00135
00136 insq(q,qfirst(q),mp);
00137
00138 }
00139
00140
00141
00142 mblk_t *rtp_getq(queue_t *q,uint32_t timestamp, int *rejected)
00143 {
00144 mblk_t *tmp,*ret=NULL,*old=NULL;
00145 rtp_header_t *tmprtp;
00146 uint32_t ts_found=0;
00147
00148 *rejected=0;
00149 ortp_debug("rtp_getq(): Timestamp %i wanted.",timestamp);
00150
00151 if (qempty(q))
00152 {
00153
00154 return NULL;
00155 }
00156
00157
00158 while ((tmp=qfirst(q))!=NULL)
00159 {
00160 tmprtp=(rtp_header_t*)tmp->b_rptr;
00161 ortp_debug("rtp_getq: Seeing packet with ts=%i",tmprtp->timestamp);
00162 if ( RTP_TIMESTAMP_IS_NEWER_THAN(timestamp,tmprtp->timestamp) )
00163 {
00164 if (ret!=NULL && tmprtp->timestamp==ts_found) {
00165
00166 break;
00167 }
00168 if (old!=NULL) {
00169 ortp_debug("rtp_getq: discarding too old packet with ts=%i",ts_found);
00170 (*rejected)++;
00171 freemsg(old);
00172 }
00173 ret=getq(q);
00174 ts_found=tmprtp->timestamp;
00175 ortp_debug("rtp_getq: Found packet with ts=%i",tmprtp->timestamp);
00176 old=ret;
00177 }
00178 else
00179 {
00180 break;
00181 }
00182 }
00183 return ret;
00184 }
00185
00186 mblk_t *rtp_getq_permissive(queue_t *q,uint32_t timestamp, int *rejected)
00187 {
00188 mblk_t *tmp,*ret=NULL;
00189 rtp_header_t *tmprtp;
00190
00191 *rejected=0;
00192 ortp_debug("rtp_getq_permissive(): Timestamp %i wanted.",timestamp);
00193
00194 if (qempty(q))
00195 {
00196
00197 return NULL;
00198 }
00199
00200
00201 tmp=qfirst(q);
00202 tmprtp=(rtp_header_t*)tmp->b_rptr;
00203 ortp_debug("rtp_getq_permissive: Seeing packet with ts=%i",tmprtp->timestamp);
00204 if ( RTP_TIMESTAMP_IS_NEWER_THAN(timestamp,tmprtp->timestamp) )
00205 {
00206 ret=getq(q);
00207 ortp_debug("rtp_getq_permissive: Found packet with ts=%i",tmprtp->timestamp);
00208 }
00209 return ret;
00210 }
00211
00212
00213 void
00214 rtp_session_init (RtpSession * session, int mode)
00215 {
00216 JBParameters jbp;
00217 memset (session, 0, sizeof (RtpSession));
00218 session->rtp.max_rq_size = 100;
00219 session->mode = (RtpSessionMode) mode;
00220 if ((mode == RTP_SESSION_RECVONLY) || (mode == RTP_SESSION_SENDRECV))
00221 {
00222 rtp_session_set_flag (session, RTP_SESSION_RECV_SYNC);
00223 rtp_session_set_flag (session, RTP_SESSION_RECV_NOT_STARTED);
00224
00225 }
00226 if ((mode == RTP_SESSION_SENDONLY) || (mode == RTP_SESSION_SENDRECV))
00227 {
00228 rtp_session_set_flag (session, RTP_SESSION_SEND_NOT_STARTED);
00229 session->snd.ssrc=uint32_t_random();
00230
00231 rtp_session_set_source_description(session,"unknown@unknown",NULL,NULL,
00232 NULL,NULL,"oRTP-" ORTP_VERSION,"This is free sofware (LGPL) !");
00233 }
00234 session->snd.telephone_events_pt=-1;
00235 session->rcv.telephone_events_pt=-1;
00236 rtp_session_set_profile (session, &av_profile);
00237 session->rtp.socket=-1;
00238 session->rtcp.socket=-1;
00239 session->dscp=RTP_DEFAULT_DSCP;
00240 session->multicast_ttl=RTP_DEFAULT_MULTICAST_TTL;
00241 session->multicast_loopback=RTP_DEFAULT_MULTICAST_LOOPBACK;
00242 qinit(&session->rtp.rq);
00243 qinit(&session->rtp.tev_rq);
00244 qinit(&session->contributing_sources);
00245 session->eventqs=NULL;
00246
00247 rtp_signal_table_init (&session->on_ssrc_changed, session,"ssrc_changed");
00248 rtp_signal_table_init (&session->on_payload_type_changed, session,"payload_type_changed");
00249 rtp_signal_table_init (&session->on_telephone_event, session,"telephone-event");
00250 rtp_signal_table_init (&session->on_telephone_event_packet, session,"telephone-event_packet");
00251 rtp_signal_table_init (&session->on_timestamp_jump,session,"timestamp_jump");
00252 rtp_signal_table_init (&session->on_network_error,session,"network_error");
00253 rtp_signal_table_init (&session->on_rtcp_bye,session,"rtcp_bye");
00254 wait_point_init(&session->snd.wp);
00255 wait_point_init(&session->rcv.wp);
00256
00257 rtp_session_set_send_payload_type(session,0);
00258
00259 rtp_session_set_recv_payload_type(session,-1);
00260
00261 jbp.min_size=RTP_DEFAULT_JITTER_TIME;
00262 jbp.nom_size=RTP_DEFAULT_JITTER_TIME;
00263 jbp.max_size=-1;
00264 jbp.adaptive=TRUE;
00265 rtp_session_enable_jitter_buffer(session,TRUE);
00266 rtp_session_set_jitter_buffer_params(session,&jbp);
00267 rtp_session_set_time_jump_limit(session,5000);
00268 rtp_session_enable_rtcp(session,TRUE);
00269 session->recv_buf_size = UDP_MAX_SIZE;
00270 session->symmetric_rtp = FALSE;
00271 session->permissive=FALSE;
00272 }
00273
00274
00284 RtpSession *
00285 rtp_session_new (int mode)
00286 {
00287 RtpSession *session;
00288 session = (RtpSession *) ortp_malloc (sizeof (RtpSession));
00289 rtp_session_init (session, mode);
00290 return session;
00291 }
00292
00306 void
00307 rtp_session_set_scheduling_mode (RtpSession * session, int yesno)
00308 {
00309 if (yesno)
00310 {
00311 RtpScheduler *sched;
00312 sched = ortp_get_scheduler ();
00313 if (sched != NULL)
00314 {
00315 rtp_session_set_flag (session, RTP_SESSION_SCHEDULED);
00316 session->sched = sched;
00317 rtp_scheduler_add_session (sched, session);
00318 }
00319 else
00320 ortp_warning
00321 ("rtp_session_set_scheduling_mode: Cannot use scheduled mode because the "
00322 "scheduler is not started. Use ortp_scheduler_init() before.");
00323 }
00324 else
00325 rtp_session_unset_flag (session, RTP_SESSION_SCHEDULED);
00326 }
00327
00328
00341 void
00342 rtp_session_set_blocking_mode (RtpSession * session, int yesno)
00343 {
00344 if (yesno){
00345 rtp_session_set_scheduling_mode(session,TRUE);
00346 rtp_session_set_flag (session, RTP_SESSION_BLOCKING_MODE);
00347 }else
00348 rtp_session_unset_flag (session, RTP_SESSION_BLOCKING_MODE);
00349 }
00350
00360 void
00361 rtp_session_set_profile (RtpSession * session, RtpProfile * profile)
00362 {
00363 session->snd.profile = profile;
00364 session->rcv.profile = profile;
00365 rtp_session_telephone_events_supported(session);
00366 }
00367
00376 void rtp_session_enable_rtcp(RtpSession *session, bool_t yesno){
00377 session->rtcp.enabled=yesno;
00378 }
00379
00389 void
00390 rtp_session_set_send_profile (RtpSession * session, RtpProfile * profile)
00391 {
00392 session->snd.profile = profile;
00393 rtp_session_send_telephone_events_supported(session);
00394 }
00395
00396
00397
00407 void
00408 rtp_session_set_recv_profile (RtpSession * session, RtpProfile * profile)
00409 {
00410 session->rcv.profile = profile;
00411 rtp_session_recv_telephone_events_supported(session);
00412 }
00413
00421 RtpProfile *rtp_session_get_profile(RtpSession *session){
00422 return session->snd.profile;
00423 }
00424
00425
00432 RtpProfile *rtp_session_get_send_profile(RtpSession *session){
00433 return session->snd.profile;
00434 }
00435
00442 RtpProfile *rtp_session_get_recv_profile(RtpSession *session){
00443 return session->rcv.profile;
00444 }
00445
00454 void rtp_session_set_recv_buf_size(RtpSession *session, int bufsize){
00455 session->recv_buf_size=bufsize;
00456 }
00457
00491 int
00492 rtp_session_signal_connect (RtpSession * session, const char *signal_name,
00493 RtpCallback cb, unsigned long user_data)
00494 {
00495 OList *elem;
00496 for (elem=session->signal_tables;elem!=NULL;elem=o_list_next(elem)){
00497 RtpSignalTable *s=(RtpSignalTable*) elem->data;
00498 if (strcmp(signal_name,s->signal_name)==0){
00499 return rtp_signal_table_add(s,cb,user_data);
00500 }
00501 }
00502 ortp_warning ("rtp_session_signal_connect: inexistant signal %s",signal_name);
00503 return -1;
00504 }
00505
00506
00515 int
00516 rtp_session_signal_disconnect_by_callback (RtpSession * session, const char *signal_name,
00517 RtpCallback cb)
00518 {
00519 OList *elem;
00520 for (elem=session->signal_tables;elem!=NULL;elem=o_list_next(elem)){
00521 RtpSignalTable *s=(RtpSignalTable*) elem->data;
00522 if (strcmp(signal_name,s->signal_name)==0){
00523 return rtp_signal_table_remove_by_callback(s,cb);
00524 }
00525 }
00526 ortp_warning ("rtp_session_signal_connect: inexistant signal %s",signal_name);
00527 return -1;
00528 }
00529
00530
00537 void rtp_session_set_seq_number(RtpSession *session, uint16_t seq){
00538 session->rtp.snd_seq=seq;
00539 }
00540
00541
00542 uint16_t rtp_session_get_seq_number(RtpSession *session){
00543 return session->rtp.snd_seq;
00544 }
00545
00546
00554 void
00555 rtp_session_set_ssrc (RtpSession * session, uint32_t ssrc)
00556 {
00557 session->snd.ssrc = ssrc;
00558 }
00559
00560
00561 void rtp_session_update_payload_type(RtpSession *session, int paytype){
00562
00563 PayloadType *pt=rtp_profile_get_payload(session->rcv.profile,paytype);
00564 session->hw_recv_pt=paytype;
00565 if (pt!=0){
00566 ortp_message ("payload type changed to %i(%s) !",
00567 paytype,pt->mime_type);
00568 payload_type_changed(session,pt);
00569 }else{
00570 ortp_warning("Receiving packet with unknown payload type %i.",paytype);
00571 }
00572 }
00585 int
00586 rtp_session_set_send_payload_type (RtpSession * session, int paytype)
00587 {
00588 session->snd.pt=paytype;
00589 return 0;
00590 }
00591
00597 int rtp_session_get_send_payload_type(const RtpSession *session){
00598 return session->snd.pt;
00599 }
00600
00612 int
00613 rtp_session_set_recv_payload_type (RtpSession * session, int paytype)
00614 {
00615 PayloadType *pt;
00616 session->rcv.pt=paytype;
00617 session->hw_recv_pt=paytype;
00618 pt=rtp_profile_get_payload(session->rcv.profile,paytype);
00619 if (pt!=NULL){
00620 payload_type_changed(session,pt);
00621 }
00622 return 0;
00623 }
00624
00630 int rtp_session_get_recv_payload_type(const RtpSession *session){
00631 return session->rcv.pt;
00632 }
00633
00643 int rtp_session_set_payload_type(RtpSession *session, int pt){
00644 if (rtp_session_set_send_payload_type(session,pt)<0) return -1;
00645 if (rtp_session_set_recv_payload_type(session,pt)<0) return -1;
00646 return 0;
00647 }
00648
00649
00650 static void rtp_header_init_from_session(rtp_header_t *rtp, RtpSession *session){
00651 rtp->version = 2;
00652 rtp->padbit = 0;
00653 rtp->extbit = 0;
00654 rtp->markbit= 0;
00655 rtp->cc = 0;
00656 rtp->paytype = session->snd.pt;
00657 rtp->ssrc = session->snd.ssrc;
00658 rtp->timestamp = 0;
00659
00660 rtp->seq_number=session->rtp.snd_seq;
00661 }
00662
00675 mblk_t * rtp_session_create_packet(RtpSession *session,int header_size, const uint8_t *payload, int payload_size)
00676 {
00677 mblk_t *mp;
00678 int msglen=header_size+payload_size;
00679 rtp_header_t *rtp;
00680
00681 mp=allocb(msglen,BPRI_MED);
00682 rtp=(rtp_header_t*)mp->b_rptr;
00683 rtp_header_init_from_session(rtp,session);
00684
00685 mp->b_wptr+=header_size;
00686 if (payload_size){
00687 memcpy(mp->b_wptr,payload,payload_size);
00688 mp->b_wptr+=payload_size;
00689 }
00690 return mp;
00691 }
00692
00709 mblk_t * rtp_session_create_packet_with_data(RtpSession *session, uint8_t *payload, int payload_size, void (*freefn)(void*))
00710 {
00711 mblk_t *mp,*mpayload;
00712 int header_size=RTP_FIXED_HEADER_SIZE;
00713 rtp_header_t *rtp;
00714
00715 mp=allocb(header_size,BPRI_MED);
00716 rtp=(rtp_header_t*)mp->b_rptr;
00717 rtp_header_init_from_session(rtp,session);
00718 mp->b_wptr+=header_size;
00719
00720 mpayload=esballoc(payload,payload_size,BPRI_MED,freefn);
00721 mpayload->b_wptr+=payload_size;
00722
00723 mp->b_cont=mpayload;
00724 return mp;
00725 }
00726
00727
00741 mblk_t * rtp_session_create_packet_in_place(RtpSession *session,uint8_t *buffer, int size, void (*freefn)(void*) )
00742 {
00743 mblk_t *mp;
00744 rtp_header_t *rtp;
00745
00746 mp=esballoc(buffer,size,BPRI_MED,freefn);
00747
00748 rtp=(rtp_header_t*)mp->b_rptr;
00749 rtp_header_init_from_session(rtp,session);
00750 return mp;
00751 }
00752
00764 int
00765 rtp_session_sendm_with_ts (RtpSession * session, mblk_t *mp, uint32_t timestamp)
00766 {
00767 rtp_header_t *rtp;
00768 uint32_t packet_time;
00769 int error = 0;
00770 int packsize;
00771 RtpScheduler *sched=session->sched;
00772 RtpStream *stream=&session->rtp;
00773
00774 if (session->flags & RTP_SESSION_SEND_NOT_STARTED)
00775 {
00776 session->rtp.snd_ts_offset = timestamp;
00777
00778 if ((session->flags & RTP_SESSION_RECV_NOT_STARTED)
00779 || session->mode == RTP_SESSION_SENDONLY)
00780 {
00781 gettimeofday(&session->last_recv_time, NULL);
00782 }
00783 if (session->flags & RTP_SESSION_SCHEDULED)
00784 {
00785 session->rtp.snd_time_offset = sched->time_;
00786 }
00787 rtp_session_unset_flag (session,RTP_SESSION_SEND_NOT_STARTED);
00788 }
00789
00790
00791
00792
00793 if (session->flags & RTP_SESSION_SCHEDULED)
00794 {
00795 packet_time =
00796 rtp_session_ts_to_time (session,
00797 timestamp -
00798 session->rtp.snd_ts_offset) +
00799 session->rtp.snd_time_offset;
00800
00801 wait_point_lock(&session->snd.wp);
00802 if (TIME_IS_STRICTLY_NEWER_THAN (packet_time, sched->time_))
00803 {
00804 wait_point_wakeup_at(&session->snd.wp,packet_time,(session->flags & RTP_SESSION_BLOCKING_MODE)!=0);
00805 session_set_clr(&sched->w_sessions,session);
00806 }
00807 else session_set_set(&sched->w_sessions,session);
00808 wait_point_unlock(&session->snd.wp);
00809 }
00810
00811
00812 rtp=(rtp_header_t*)mp->b_rptr;
00813
00814 packsize = msgdsize(mp) ;
00815
00816 rtp->timestamp=timestamp;
00817 if (session->snd.telephone_events_pt==rtp->paytype)
00818 {
00819 session->rtp.snd_seq++;
00820 rtp->seq_number = session->rtp.snd_seq;
00821 }
00822 else
00823 session->rtp.snd_seq=rtp->seq_number+1;
00824 session->rtp.snd_last_ts = timestamp;
00825
00826
00827 ortp_global_stats.sent += packsize;
00828 stream->stats.sent += packsize;
00829 ortp_global_stats.packet_sent++;
00830 stream->stats.packet_sent++;
00831
00832 error = rtp_session_rtp_send (session, mp);
00833
00834 rtp_session_rtcp_process_send(session);
00835
00836
00837 if (session->mode==RTP_SESSION_SENDONLY) rtp_session_rtcp_recv(session);
00838 return error;
00839 }
00840
00841
00854 int
00855 rtp_session_send_with_ts (RtpSession * session, const uint8_t * buffer, int len,
00856 uint32_t userts)
00857 {
00858 mblk_t *m;
00859 int err;
00860 #ifdef USE_SENDMSG
00861 m=rtp_session_create_packet_with_data(session,(uint8_t*)buffer,len,NULL);
00862 #else
00863 m = rtp_session_create_packet(session,RTP_FIXED_HEADER_SIZE,(uint8_t*)buffer,len);
00864 #endif
00865 err=rtp_session_sendm_with_ts(session,m,userts);
00866 return err;
00867 }
00868
00869
00870
00871 extern void rtcp_parse(RtpSession *session, mblk_t *mp);
00872
00873
00874
00875 static void payload_type_changed_notify(RtpSession *session, int paytype){
00876 session->rcv.pt = paytype;
00877 rtp_signal_table_emit (&session->on_payload_type_changed);
00878 }
00879
00880
00895 mblk_t *
00896 rtp_session_recvm_with_ts (RtpSession * session, uint32_t user_ts)
00897 {
00898 mblk_t *mp = NULL;
00899 rtp_header_t *rtp;
00900 uint32_t ts;
00901 uint32_t packet_time;
00902 RtpScheduler *sched=session->sched;
00903 RtpStream *stream=&session->rtp;
00904 int rejected=0;
00905 bool_t read_socket=TRUE;
00906
00907
00908
00909
00910 if (session->flags & RTP_SESSION_RECV_NOT_STARTED)
00911 {
00912 session->rtp.rcv_query_ts_offset = user_ts;
00913
00914 if ((session->flags & RTP_SESSION_SEND_NOT_STARTED)
00915 || session->mode == RTP_SESSION_RECVONLY){
00916 gettimeofday(&session->last_recv_time, NULL);
00917 }
00918 if (session->flags & RTP_SESSION_SCHEDULED)
00919 {
00920 session->rtp.rcv_time_offset = sched->time_;
00921
00922 }
00923 rtp_session_unset_flag (session,RTP_SESSION_RECV_NOT_STARTED);
00924 }else{
00925
00926
00927 if (user_ts==session->rtp.rcv_last_app_ts)
00928 read_socket=FALSE;
00929 }
00930 session->rtp.rcv_last_app_ts = user_ts;
00931 if (read_socket){
00932 rtp_session_rtp_recv (session, user_ts);
00933 rtp_session_rtcp_recv(session);
00934 }
00935
00936 mp=getq(&session->rtp.tev_rq);
00937 if (mp!=NULL){
00938 int msgsize=msgdsize(mp);
00939 ortp_global_stats.recv += msgsize;
00940 stream->stats.recv += msgsize;
00941 rtp_signal_table_emit2(&session->on_telephone_event_packet,(long)mp);
00942 rtp_session_check_telephone_events(session,mp);
00943 freemsg(mp);
00944 mp=NULL;
00945 }
00946
00947
00948
00949
00950
00951 if (session->flags & RTP_SESSION_RECV_SYNC)
00952 {
00953 queue_t *q = &session->rtp.rq;
00954 if (qempty(q))
00955 {
00956 ortp_debug ("Queue is empty.");
00957 goto end;
00958 }
00959 rtp = (rtp_header_t *) qfirst(q)->b_rptr;
00960 session->rtp.rcv_ts_offset = rtp->timestamp;
00961
00962
00963 session->rtp.hwrcv_diff_ts = rtp->timestamp - user_ts;
00964
00965 session->rtp.rcv_diff_ts=session->rtp.hwrcv_diff_ts - session->rtp.jittctl.jitt_comp_ts;
00966 session->rtp.rcv_last_ret_ts = user_ts;
00967 session->rcv.ssrc = rtp->ssrc;
00968
00969 rtp_session_unset_flag (session, RTP_SESSION_RECV_SYNC);
00970 }
00971
00972 ts = user_ts + session->rtp.rcv_diff_ts;
00973 if (session->rtp.jittctl.enabled==TRUE){
00974 if (session->permissive)
00975 mp = rtp_getq_permissive(&session->rtp.rq, ts,&rejected);
00976 else{
00977 mp = rtp_getq(&session->rtp.rq, ts,&rejected);
00978 }
00979 }else mp=getq(&session->rtp.rq);
00980
00981 stream->stats.outoftime+=rejected;
00982 ortp_global_stats.outoftime+=rejected;
00983
00984 goto end;
00985
00986 end:
00987 if (mp != NULL)
00988 {
00989 int msgsize = msgdsize (mp);
00990 uint32_t packet_ts;
00991 ortp_global_stats.recv += msgsize;
00992 stream->stats.recv += msgsize;
00993 rtp = (rtp_header_t *) mp->b_rptr;
00994 packet_ts=rtp->timestamp;
00995 ortp_debug("Returning mp with ts=%i", packet_ts);
00996
00997 if (session->rcv.pt != rtp->paytype)
00998 {
00999 payload_type_changed_notify(session, rtp->paytype);
01000 }
01001
01002
01003 if (session->rtp.jittctl.adaptive){
01004 uint32_t changed_ts;
01005
01006
01007 if (packet_ts!=session->rtp.rcv_last_ts)
01008 jitter_control_update_corrective_slide(&session->rtp.jittctl);
01009 changed_ts=packet_ts-session->rtp.jittctl.corrective_slide;
01010 rtp->timestamp=changed_ts;
01011
01012 }
01013 session->rtp.rcv_last_ts = packet_ts;
01014 if (!(session->flags & RTP_SESSION_FIRST_PACKET_DELIVERED)){
01015 rtp_session_set_flag(session,RTP_SESSION_FIRST_PACKET_DELIVERED);
01016 }
01017 }
01018 else
01019 {
01020 ortp_debug ("No mp for timestamp queried");
01021 stream->stats.unavaillable++;
01022 ortp_global_stats.unavaillable++;
01023 }
01024 rtp_session_rtcp_process_recv(session);
01025
01026 if (session->flags & RTP_SESSION_SCHEDULED)
01027 {
01028
01029
01030
01031
01032 packet_time =
01033 rtp_session_ts_to_time (session,
01034 user_ts -
01035 session->rtp.rcv_query_ts_offset) +
01036 session->rtp.rcv_time_offset;
01037 ortp_debug ("rtp_session_recvm_with_ts: packet_time=%i, time=%i",packet_time, sched->time_);
01038 wait_point_lock(&session->rcv.wp);
01039 if (TIME_IS_STRICTLY_NEWER_THAN (packet_time, sched->time_))
01040 {
01041 wait_point_wakeup_at(&session->rcv.wp,packet_time, (session->flags & RTP_SESSION_BLOCKING_MODE)!=0);
01042 session_set_clr(&sched->r_sessions,session);
01043 }
01044 else session_set_set(&sched->r_sessions,session);
01045 wait_point_unlock(&session->rcv.wp);
01046 }
01047 return mp;
01048 }
01049
01050
01051 static int msg_to_buf (mblk_t * mp, uint8_t *buffer, int len)
01052 {
01053 int rlen = len;
01054 mblk_t *m, *mprev;
01055 int mlen;
01056 m = mp->b_cont;
01057 mprev = mp;
01058 while (m != NULL)
01059 {
01060 mlen = (int) (m->b_wptr - m->b_rptr);
01061 if (mlen <= rlen)
01062 {
01063 mblk_t *consumed = m;
01064 memcpy (buffer, m->b_rptr, mlen);
01065
01066 mprev->b_cont = m->b_cont;
01067 m = m->b_cont;
01068 consumed->b_cont = NULL;
01069 freeb (consumed);
01070 buffer += mlen;
01071 rlen -= mlen;
01072 }
01073 else
01074 {
01075 memcpy (buffer, m->b_rptr, rlen);
01076 m->b_rptr += rlen;
01077 return len;
01078 }
01079 }
01080 return len - rlen;
01081 }
01082
01122 int rtp_session_recv_with_ts (RtpSession * session, uint8_t * buffer,
01123 int len, uint32_t ts, int * have_more)
01124 {
01125 mblk_t *mp;
01126 int rlen = len;
01127 int wlen, mlen;
01128 uint32_t ts_int = 0;
01129 PayloadType *payload;
01130 RtpStream *stream=&session->rtp;
01131
01132 *have_more = 0;
01133
01134 mp = rtp_session_recvm_with_ts (session, ts);
01135 payload =rtp_profile_get_payload (session->rcv.profile,
01136 session->rcv.pt);
01137 if (payload==NULL){
01138 ortp_warning("rtp_session_recv_with_ts: unable to recv an unsupported payload (%i)",session->rcv.pt);
01139 if (mp!=NULL) freemsg(mp);
01140 return -1;
01141 }
01142 if (!(session->flags & RTP_SESSION_RECV_SYNC))
01143 {
01144
01145 if (RTP_TIMESTAMP_IS_STRICTLY_NEWER_THAN
01146 (ts, session->rtp.rcv_last_ret_ts))
01147 {
01148
01149
01150
01151 *have_more = 1;
01152 }
01153 if (payload->type == PAYLOAD_AUDIO_CONTINUOUS)
01154 {
01155 ts_int = (len * payload->bits_per_sample) >> 3;
01156 session->rtp.rcv_last_ret_ts += ts_int;
01157
01158 }
01159 else
01160 ts_int = 0;
01161 }
01162 else return 0;
01163
01164
01165 while (1)
01166 {
01167
01168 if (mp != NULL)
01169 {
01170 mlen = msgdsize (mp->b_cont);
01171 wlen = msg_to_buf (mp, buffer, rlen);
01172 buffer += wlen;
01173 rlen -= wlen;
01174 ortp_debug("mlen=%i wlen=%i rlen=%i", mlen, wlen,
01175 rlen);
01176
01177 if (rlen > 0)
01178 {
01179
01180 freemsg (mp);
01181
01182
01183
01184 if (ts_int > 0)
01185 {
01186 ts = session->rtp.rcv_last_ret_ts;
01187 ortp_debug("Need more: will ask for %i.", ts);
01188 }
01189 else
01190 return len - rlen;
01191 }
01192 else if (mlen > wlen)
01193 {
01194 int unread =
01195 mlen - wlen + (int) (mp->b_wptr -
01196 mp->b_rptr);
01197
01198
01199 ortp_debug ("Re-enqueuing packet.");
01200 rtp_putq (&session->rtp.rq, mp);
01201
01202 ortp_global_stats.recv -= unread;
01203 stream->stats.recv -= unread;
01204 return len;
01205 }
01206 else
01207 {
01208
01209 freemsg (mp);
01210 return len;
01211 }
01212 }
01213 else
01214 {
01215
01216 if (payload->pattern_length != 0)
01217 {
01218 int i = 0, j = 0;
01219 while (i < rlen)
01220 {
01221 buffer[i] = payload->zero_pattern[j];
01222 i++;
01223 j++;
01224 if (j <= payload->pattern_length)
01225 j = 0;
01226 }
01227 return len;
01228 }
01229 *have_more = 0;
01230 return 0;
01231 }
01232 mp = rtp_session_recvm_with_ts (session, ts);
01233 payload = rtp_profile_get_payload (session->rcv.profile,
01234 session->rcv.pt);
01235 if (payload==NULL){
01236 ortp_warning("rtp_session_recv_with_ts: unable to recv an unsupported payload.");
01237 if (mp!=NULL) freemsg(mp);
01238 return -1;
01239 }
01240 }
01241 return -1;
01242 }
01255 uint32_t rtp_session_get_current_send_ts(RtpSession *session)
01256 {
01257 uint32_t userts;
01258 uint32_t session_time;
01259 RtpScheduler *sched=session->sched;
01260 PayloadType *payload;
01261 payload=rtp_profile_get_payload(session->snd.profile,session->snd.pt);
01262 return_val_if_fail(payload!=NULL, 0);
01263 if ( (session->flags & RTP_SESSION_SCHEDULED)==0 ){
01264 ortp_warning("can't guess current timestamp because session is not scheduled.");
01265 return 0;
01266 }
01267 session_time=sched->time_-session->rtp.snd_time_offset;
01268 userts= (uint32_t)( ( (double)(session_time) * (double) payload->clock_rate )/ 1000.0)
01269 + session->rtp.snd_ts_offset;
01270 return userts;
01271 }
01272
01281 uint32_t rtp_session_get_current_recv_ts(RtpSession *session){
01282 uint32_t userts;
01283 uint32_t session_time;
01284 RtpScheduler *sched=ortp_get_scheduler();
01285 PayloadType *payload;
01286 payload=rtp_profile_get_payload(session->rcv.profile,session->rcv.pt);
01287 return_val_if_fail(payload!=NULL, 0);
01288 if ( (session->flags & RTP_SESSION_SCHEDULED)==0 ){
01289 ortp_warning("can't guess current timestamp because session is not scheduled.");
01290 return 0;
01291 }
01292 session_time=sched->time_-session->rtp.rcv_time_offset;
01293 userts= (uint32_t)( ( (double)(session_time) * (double) payload->clock_rate )/ 1000.0)
01294 + session->rtp.rcv_ts_offset;
01295 return userts;
01296 }
01297
01308 void rtp_session_set_time_jump_limit(RtpSession *session, int milisecs){
01309 uint32_t ts;
01310 session->rtp.time_jump=milisecs;
01311 ts=rtp_session_time_to_ts(session,milisecs);
01312 if (ts==0) session->rtp.ts_jump=1<<31;
01313 else session->rtp.ts_jump=ts;
01314 }
01315
01319 void rtp_session_release_sockets(RtpSession *session){
01320 if (session->rtp.socket>=0) close_socket (session->rtp.socket);
01321 if (session->rtcp.socket>=0) close_socket (session->rtcp.socket);
01322 session->rtp.socket=-1;
01323 session->rtcp.socket=-1;
01324 session->rtp.tr = 0;
01325 session->rtcp.tr = 0;
01326
01327
01328
01329
01330
01331 }
01332
01333 ortp_socket_t rtp_session_get_rtp_socket(const RtpSession *session){
01334 return rtp_session_using_transport(session, rtp) ? (session->rtp.tr->t_getsocket)(session->rtp.tr) : session->rtp.socket;
01335 }
01336
01337 ortp_socket_t rtp_session_get_rtcp_socket(const RtpSession *session){
01338 return rtp_session_using_transport(session, rtcp) ? (session->rtcp.tr->t_getsocket)(session->rtcp.tr) : session->rtcp.socket;
01339 }
01340
01345 void rtp_session_register_event_queue(RtpSession *session, OrtpEvQueue *q){
01346 session->eventqs=o_list_append(session->eventqs,q);
01347 }
01348
01349 void rtp_session_unregister_event_queue(RtpSession *session, OrtpEvQueue *q){
01350 session->eventqs=o_list_remove(session->eventqs,q);
01351 }
01352
01353 void rtp_session_dispatch_event(RtpSession *session, OrtpEvent *ev){
01354 OList *it;
01355 int i;
01356 for(i=0,it=session->eventqs;it!=NULL;it=it->next,++i){
01357 ortp_ev_queue_put((OrtpEvQueue*)it->data,ortp_event_dup(ev));
01358 }
01359 ortp_event_destroy(ev);
01360 }
01361
01362
01363 void rtp_session_uninit (RtpSession * session)
01364 {
01365
01366 if (session->flags & RTP_SESSION_SCHEDULED)
01367 {
01368 rtp_scheduler_remove_session (session->sched,session);
01369 }
01370
01371 flushq(&session->rtp.rq, FLUSHALL);
01372 flushq(&session->rtp.tev_rq, FLUSHALL);
01373
01374 if (session->eventqs!=NULL) o_list_free(session->eventqs);
01375
01376 rtp_session_release_sockets(session);
01377
01378 wait_point_uninit(&session->snd.wp);
01379 wait_point_uninit(&session->rcv.wp);
01380 if (session->current_tev!=NULL) freemsg(session->current_tev);
01381 if (session->rtp.cached_mp!=NULL) freemsg(session->rtp.cached_mp);
01382 if (session->rtcp.cached_mp!=NULL) freemsg(session->rtcp.cached_mp);
01383 if (session->sd!=NULL) freemsg(session->sd);
01384
01385 session->signal_tables = o_list_free(session->signal_tables);
01386 }
01387
01394 void rtp_session_resync(RtpSession *session){
01395 flushq (&session->rtp.rq, FLUSHALL);
01396 rtp_session_set_flag(session, RTP_SESSION_RECV_SYNC);
01397 rtp_session_unset_flag(session,RTP_SESSION_FIRST_PACKET_DELIVERED);
01398 jitter_control_init(&session->rtp.jittctl,-1,NULL);
01399 }
01400
01407 void rtp_session_reset (RtpSession * session)
01408 {
01409 rtp_session_set_flag (session, RTP_SESSION_RECV_NOT_STARTED);
01410 rtp_session_set_flag (session, RTP_SESSION_SEND_NOT_STARTED);
01411
01412 session->rtp.snd_time_offset = 0;
01413 session->rtp.snd_ts_offset = 0;
01414 session->rtp.snd_rand_offset = 0;
01415 session->rtp.snd_last_ts = 0;
01416 session->rtp.rcv_time_offset = 0;
01417 session->rtp.rcv_ts_offset = 0;
01418 session->rtp.rcv_query_ts_offset = 0;
01419 session->rtp.rcv_diff_ts = 0;
01420 session->rtp.rcv_last_ts = 0;
01421 session->rtp.rcv_last_app_ts = 0;
01422 session->rtp.hwrcv_extseq = 0;
01423 session->rtp.hwrcv_since_last_SR=0;
01424 session->rtp.snd_seq = 0;
01425 rtp_session_clear_send_error_code(session);
01426 rtp_session_clear_recv_error_code(session);
01427 rtp_stats_reset(&session->rtp.stats);
01428 rtp_session_resync(session);
01429 }
01430
01434 const rtp_stats_t * rtp_session_get_stats(const RtpSession *session){
01435 return &session->rtp.stats;
01436 }
01437
01438 void rtp_session_reset_stats(RtpSession *session){
01439 memset(&session->rtp.stats,0,sizeof(rtp_stats_t));
01440 }
01441
01448 void rtp_session_set_data(RtpSession *session, void *data){
01449 session->user_data=data;
01450 }
01451
01456 void *rtp_session_get_data(const RtpSession *session){
01457 return session->user_data;
01458 }
01459
01469 void
01470 rtp_session_set_symmetric_rtp (RtpSession * session, bool_t yesno)
01471 {
01472 session->symmetric_rtp =yesno;
01473 }
01474
01488 void rtp_session_set_connected_mode(RtpSession *session, bool_t yesno){
01489 session->use_connect=yesno;
01490 }
01491
01492 static float compute_bw(struct timeval *orig, unsigned int bytes){
01493 struct timeval current;
01494 float bw;
01495 float time;
01496 if (bytes==0) return 0;
01497 gettimeofday(¤t,NULL);
01498 time=(float)(current.tv_sec - orig->tv_sec) +
01499 ((float)(current.tv_usec - orig->tv_usec)*1e-6);
01500 bw=((float)bytes)*8/(time+0.001);
01501
01502 return bw;
01503 }
01504
01505 float rtp_session_compute_recv_bandwidth(RtpSession *session){
01506 float bw;
01507 bw=compute_bw(&session->rtp.recv_bw_start,session->rtp.recv_bytes);
01508 session->rtp.recv_bytes=0;
01509 return bw;
01510 }
01511
01512 float rtp_session_compute_send_bandwidth(RtpSession *session){
01513 float bw;
01514 bw=compute_bw(&session->rtp.send_bw_start,session->rtp.sent_bytes);
01515 session->rtp.sent_bytes=0;
01516 return bw;
01517 }
01518
01519 int rtp_session_get_last_send_error_code(RtpSession *session){
01520 return session->rtp.send_errno;
01521 }
01522
01523 void rtp_session_clear_send_error_code(RtpSession *session){
01524 session->rtp.send_errno=0;
01525 }
01526
01527 int rtp_session_get_last_recv_error_code(RtpSession *session){
01528 return session->rtp.recv_errno;
01529 }
01530
01531 void rtp_session_clear_recv_error_code(RtpSession *session){
01532 session->rtp.send_errno=0;
01533 }
01534
01541 void rtp_session_destroy (RtpSession * session)
01542 {
01543 rtp_session_uninit (session);
01544 ortp_free (session);
01545 }
01546
01547 void rtp_session_make_time_distorsion(RtpSession *session, int milisec)
01548 {
01549 session->rtp.snd_time_offset+=milisec;
01550 }
01551
01552
01553
01554
01555 void rtp_add_csrc(mblk_t *mp, uint32_t csrc)
01556 {
01557 rtp_header_t *hdr=(rtp_header_t*)mp->b_rptr;
01558 hdr->csrc[hdr->cc]=csrc;
01559 hdr->cc++;
01560 }
01561
01562
01569 void
01570 rtp_session_get_last_recv_time(RtpSession *session, struct timeval *tv)
01571 {
01572 #ifdef PERF
01573 ortp_error("rtp_session_get_last_recv_time() feature disabled.");
01574 #else
01575 *tv = session->last_recv_time;
01576 #endif
01577 }
01578
01579
01580
01581 uint32_t rtp_session_time_to_ts(RtpSession *session, int millisecs){
01582 PayloadType *payload;
01583 payload =
01584 rtp_profile_get_payload (session->snd.profile,
01585 session->snd.pt);
01586 if (payload == NULL)
01587 {
01588 ortp_warning
01589 ("rtp_session_ts_to_t: use of unsupported payload type %d.", session->snd.pt);
01590 return 0;
01591 }
01592
01593 return (uint32_t) (payload->clock_rate*(double) (millisecs/1000.0f));
01594 }
01595
01596
01597 uint32_t rtp_session_ts_to_time (RtpSession * session, uint32_t timestamp)
01598 {
01599 PayloadType *payload;
01600 payload =
01601 rtp_profile_get_payload (session->snd.profile,
01602 session->snd.pt);
01603 if (payload == NULL)
01604 {
01605 ortp_warning
01606 ("rtp_session_ts_to_t: use of unsupported payload type %d.", session->snd.pt);
01607 return 0;
01608 }
01609
01610 return (uint32_t) (1000.0 *
01611 ((double) timestamp /
01612 (double) payload->clock_rate));
01613 }
01614
01615
01616
01617 void rtp_session_process (RtpSession * session, uint32_t time, RtpScheduler *sched)
01618 {
01619 wait_point_lock(&session->snd.wp);
01620 if (wait_point_check(&session->snd.wp,time)){
01621 session_set_set(&sched->w_sessions,session);
01622 wait_point_wakeup(&session->snd.wp);
01623 }
01624 wait_point_unlock(&session->snd.wp);
01625
01626 wait_point_lock(&session->rcv.wp);
01627 if (wait_point_check(&session->rcv.wp,time)){
01628 session_set_set(&sched->r_sessions,session);
01629 wait_point_wakeup(&session->rcv.wp);
01630 }
01631 wait_point_unlock(&session->rcv.wp);
01632 }
01633