Thu Apr 28 2011 16:56:49

Asterisk developer's documentation


sched.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 1999 - 2008, Digium, Inc.
00005  *
00006  * Mark Spencer <markster@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*! \file
00020  *
00021  * \brief Scheduler Routines (from cheops-NG)
00022  *
00023  * \author Mark Spencer <markster@digium.com>
00024  */
00025 
00026 #include "asterisk.h"
00027 
00028 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 281574 $")
00029 
/*
 * DEBUG(a): scheduler trace hook.
 *
 * Without DEBUG_SCHEDULER the macro expands to nothing, so its argument is
 * never evaluated.  With DEBUG_SCHEDULER the argument is handed to DEBUG_M
 * (defined outside this file), but only while option_debug is non-zero.
 */
#ifndef DEBUG_SCHEDULER
#define DEBUG(a)
#else
#define DEBUG(a) do { \
	if (option_debug) \
		DEBUG_M(a) \
	} while (0)
#endif
00038 
00039 #include <sys/time.h>
00040 
00041 #include "asterisk/sched.h"
00042 #include "asterisk/channel.h"
00043 #include "asterisk/lock.h"
00044 #include "asterisk/utils.h"
00045 #include "asterisk/linkedlists.h"
00046 #include "asterisk/dlinkedlists.h"
00047 #include "asterisk/hashtab.h"
00048 #include "asterisk/heap.h"
00049 
00050 struct sched {
00051    AST_LIST_ENTRY(sched) list;
00052    int id;                       /*!< ID number of event */
00053    struct timeval when;          /*!< Absolute time event should take place */
00054    int resched;                  /*!< When to reschedule */
00055    int variable;                 /*!< Use return value from callback to reschedule */
00056    const void *data;             /*!< Data */
00057    ast_sched_cb callback;        /*!< Callback */
00058    ssize_t __heap_index;
00059 };
00060 
00061 struct sched_context {
00062    ast_mutex_t lock;
00063    unsigned int eventcnt;                  /*!< Number of events processed */
00064    unsigned int schedcnt;                  /*!< Number of outstanding schedule events */
00065    unsigned int highwater;             /*!< highest count so far */
00066    struct ast_hashtab *schedq_ht;             /*!< hash table for fast searching */
00067    struct ast_heap *sched_heap;
00068 
00069 #ifdef SCHED_MAX_CACHE
00070    AST_LIST_HEAD_NOLOCK(, sched) schedc;   /*!< Cache of unused schedule structures and how many */
00071    unsigned int schedccnt;
00072 #endif
00073 };
00074 
00075 struct ast_sched_thread {
00076    pthread_t thread;
00077    ast_mutex_t lock;
00078    ast_cond_t cond;
00079    struct sched_context *context;
00080    unsigned int stop:1;
00081 };
00082 
00083 static void *sched_run(void *data)
00084 {
00085    struct ast_sched_thread *st = data;
00086 
00087    while (!st->stop) {
00088       int ms;
00089       struct timespec ts = {
00090          .tv_sec = 0,   
00091       };
00092 
00093       ast_mutex_lock(&st->lock);
00094 
00095       if (st->stop) {
00096          ast_mutex_unlock(&st->lock);
00097          return NULL;
00098       }
00099 
00100       ms = ast_sched_wait(st->context);
00101 
00102       if (ms == -1) {
00103          ast_cond_wait(&st->cond, &st->lock);
00104       } else { 
00105          struct timeval tv;
00106          tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
00107          ts.tv_sec = tv.tv_sec;
00108          ts.tv_nsec = tv.tv_usec * 1000;
00109          ast_cond_timedwait(&st->cond, &st->lock, &ts);
00110       }
00111 
00112       ast_mutex_unlock(&st->lock);
00113 
00114       if (st->stop) {
00115          return NULL;
00116       }
00117 
00118       ast_sched_runq(st->context);
00119    }
00120 
00121    return NULL;
00122 }
00123 
00124 void ast_sched_thread_poke(struct ast_sched_thread *st)
00125 {
00126    ast_mutex_lock(&st->lock);
00127    ast_cond_signal(&st->cond);
00128    ast_mutex_unlock(&st->lock);
00129 }
00130 
00131 struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st)
00132 {
00133    return st->context;
00134 }
00135 
00136 struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st)
00137 {
00138    if (st->thread != AST_PTHREADT_NULL) {
00139       ast_mutex_lock(&st->lock);
00140       st->stop = 1;
00141       ast_cond_signal(&st->cond);
00142       ast_mutex_unlock(&st->lock);
00143       pthread_join(st->thread, NULL);
00144       st->thread = AST_PTHREADT_NULL;
00145    }
00146 
00147    ast_mutex_destroy(&st->lock);
00148    ast_cond_destroy(&st->cond);
00149 
00150    if (st->context) {
00151       sched_context_destroy(st->context);
00152       st->context = NULL;
00153    }
00154 
00155    ast_free(st);
00156 
00157    return NULL;
00158 }
00159 
00160 struct ast_sched_thread *ast_sched_thread_create(void)
00161 {
00162    struct ast_sched_thread *st;
00163 
00164    if (!(st = ast_calloc(1, sizeof(*st)))) {
00165       return NULL;
00166    }
00167 
00168    ast_mutex_init(&st->lock);
00169    ast_cond_init(&st->cond, NULL);
00170 
00171    st->thread = AST_PTHREADT_NULL;
00172 
00173    if (!(st->context = sched_context_create())) {
00174       ast_log(LOG_ERROR, "Failed to create scheduler\n");
00175       ast_sched_thread_destroy(st);
00176       return NULL;
00177    }
00178    
00179    if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) {
00180       ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
00181       ast_sched_thread_destroy(st);
00182       return NULL;
00183    }
00184 
00185    return st;
00186 }
00187 
00188 int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
00189       const void *data, int variable)
00190 {
00191    int res;
00192 
00193    ast_mutex_lock(&st->lock);
00194    res = ast_sched_add_variable(st->context, when, cb, data, variable);
00195    if (res != -1) {
00196       ast_cond_signal(&st->cond);
00197    }
00198    ast_mutex_unlock(&st->lock);
00199 
00200    return res;
00201 }
00202 
00203 int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
00204       const void *data)
00205 {
00206    int res;
00207 
00208    ast_mutex_lock(&st->lock);
00209    res = ast_sched_add(st->context, when, cb, data);
00210    if (res != -1) {
00211       ast_cond_signal(&st->cond);
00212    }
00213    ast_mutex_unlock(&st->lock);
00214 
00215    return res;
00216 }
00217 
00218 /* hash routines for sched */
00219 
00220 static int sched_cmp(const void *a, const void *b)
00221 {
00222    const struct sched *as = a;
00223    const struct sched *bs = b;
00224    return as->id != bs->id; /* return 0 on a match like strcmp would */
00225 }
00226 
00227 static unsigned int sched_hash(const void *obj)
00228 {
00229    const struct sched *s = obj;
00230    unsigned int h = s->id;
00231    return h;
00232 }
00233 
00234 static int sched_time_cmp(void *a, void *b)
00235 {
00236    return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
00237 }
00238 
00239 struct sched_context *sched_context_create(void)
00240 {
00241    struct sched_context *tmp;
00242 
00243    if (!(tmp = ast_calloc(1, sizeof(*tmp))))
00244       return NULL;
00245 
00246    ast_mutex_init(&tmp->lock);
00247    tmp->eventcnt = 1;
00248 
00249    tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
00250 
00251    if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
00252          offsetof(struct sched, __heap_index)))) {
00253       sched_context_destroy(tmp);
00254       return NULL;
00255    }
00256 
00257    return tmp;
00258 }
00259 
00260 void sched_context_destroy(struct sched_context *con)
00261 {
00262    struct sched *s;
00263 
00264    ast_mutex_lock(&con->lock);
00265 
00266 #ifdef SCHED_MAX_CACHE
00267    /* Eliminate the cache */
00268    while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
00269       ast_free(s);
00270 #endif
00271 
00272    if (con->sched_heap) {
00273       while ((s = ast_heap_pop(con->sched_heap))) {
00274          ast_free(s);
00275       }
00276       ast_heap_destroy(con->sched_heap);
00277       con->sched_heap = NULL;
00278    }
00279 
00280    ast_hashtab_destroy(con->schedq_ht, NULL);
00281    con->schedq_ht = NULL;
00282    
00283    /* And the context */
00284    ast_mutex_unlock(&con->lock);
00285    ast_mutex_destroy(&con->lock);
00286    ast_free(con);
00287 }
00288 
00289 static struct sched *sched_alloc(struct sched_context *con)
00290 {
00291    struct sched *tmp;
00292 
00293    /*
00294     * We keep a small cache of schedule entries
00295     * to minimize the number of necessary malloc()'s
00296     */
00297 #ifdef SCHED_MAX_CACHE
00298    if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
00299       con->schedccnt--;
00300    else
00301 #endif
00302       tmp = ast_calloc(1, sizeof(*tmp));
00303 
00304    return tmp;
00305 }
00306 
/*! \brief
 * Give back a schedule entry that is no longer in use.
 *
 * When the cache is compiled in (SCHED_MAX_CACHE) and holds fewer than
 * SCHED_MAX_CACHE entries, the entry is kept for reuse; otherwise it is
 * freed.
 *
 * \param con context whose cache may receive the entry
 * \param tmp entry to release
 */
static void sched_release(struct sched_context *con, struct sched *tmp)
{
#ifdef SCHED_MAX_CACHE
	if (con->schedccnt < SCHED_MAX_CACHE) {
		AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
		con->schedccnt++;
		return;
	}
#endif

	ast_free(tmp);
}
00322 
00323 /*! \brief
00324  * Return the number of milliseconds 
00325  * until the next scheduled event
00326  */
00327 int ast_sched_wait(struct sched_context *con)
00328 {
00329    int ms;
00330    struct sched *s;
00331 
00332    DEBUG(ast_debug(1, "ast_sched_wait()\n"));
00333 
00334    ast_mutex_lock(&con->lock);
00335    if ((s = ast_heap_peek(con->sched_heap, 1))) {
00336       ms = ast_tvdiff_ms(s->when, ast_tvnow());
00337       if (ms < 0) {
00338          ms = 0;
00339       }
00340    } else {
00341       ms = -1;
00342    }
00343    ast_mutex_unlock(&con->lock);
00344 
00345    return ms;
00346 }
00347 
00348 
00349 /*! \brief
00350  * Take a sched structure and put it in the
00351  * queue, such that the soonest event is
00352  * first in the list. 
00353  */
00354 static void schedule(struct sched_context *con, struct sched *s)
00355 {
00356    ast_heap_push(con->sched_heap, s);
00357 
00358    if (!ast_hashtab_insert_safe(con->schedq_ht, s)) {
00359       ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n", s->id);
00360    }
00361 
00362    con->schedcnt++;
00363 
00364    if (con->schedcnt > con->highwater) {
00365       con->highwater = con->schedcnt;
00366    }
00367 }
00368 
00369 /*! \brief
00370  * given the last event *tv and the offset in milliseconds 'when',
00371  * computes the next value,
00372  */
00373 static int sched_settime(struct timeval *t, int when)
00374 {
00375    struct timeval now = ast_tvnow();
00376 
00377    /*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
00378    if (ast_tvzero(*t))  /* not supplied, default to now */
00379       *t = now;
00380    *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
00381    if (ast_tvcmp(*t, now) < 0) {
00382       *t = now;
00383    }
00384    return 0;
00385 }
00386 
00387 int ast_sched_replace_variable(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00388 {
00389    /* 0 means the schedule item is new; do not delete */
00390    if (old_id > 0) {
00391       AST_SCHED_DEL(con, old_id);
00392    }
00393    return ast_sched_add_variable(con, when, callback, data, variable);
00394 }
00395 
00396 /*! \brief
00397  * Schedule callback(data) to happen when ms into the future
00398  */
00399 int ast_sched_add_variable(struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00400 {
00401    struct sched *tmp;
00402    int res = -1;
00403 
00404    DEBUG(ast_debug(1, "ast_sched_add()\n"));
00405 
00406    ast_mutex_lock(&con->lock);
00407    if ((tmp = sched_alloc(con))) {
00408       tmp->id = con->eventcnt++;
00409       tmp->callback = callback;
00410       tmp->data = data;
00411       tmp->resched = when;
00412       tmp->variable = variable;
00413       tmp->when = ast_tv(0, 0);
00414       if (sched_settime(&tmp->when, when)) {
00415          sched_release(con, tmp);
00416       } else {
00417          schedule(con, tmp);
00418          res = tmp->id;
00419       }
00420    }
00421 #ifdef DUMP_SCHEDULER
00422    /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
00423    if (option_debug)
00424       ast_sched_dump(con);
00425 #endif
00426    ast_mutex_unlock(&con->lock);
00427 
00428    return res;
00429 }
00430 
00431 int ast_sched_replace(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data)
00432 {
00433    if (old_id > -1) {
00434       AST_SCHED_DEL(con, old_id);
00435    }
00436    return ast_sched_add(con, when, callback, data);
00437 }
00438 
00439 int ast_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data)
00440 {
00441    return ast_sched_add_variable(con, when, callback, data, 0);
00442 }
00443 
00444 const void *ast_sched_find_data(struct sched_context *con, int id)
00445 {
00446    struct sched tmp,*res;
00447    tmp.id = id;
00448    res = ast_hashtab_lookup(con->schedq_ht, &tmp);
00449    if (res)
00450       return res->data;
00451    return NULL;
00452 }
00453    
00454 /*! \brief
00455  * Delete the schedule entry with number
00456  * "id".  It's nearly impossible that there
00457  * would be two or more in the list with that
00458  * id.
00459  */
00460 #ifndef AST_DEVMODE
00461 int ast_sched_del(struct sched_context *con, int id)
00462 #else
00463 int _ast_sched_del(struct sched_context *con, int id, const char *file, int line, const char *function)
00464 #endif
00465 {
00466    struct sched *s, tmp = {
00467       .id = id,
00468    };
00469 
00470    DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
00471    
00472    ast_mutex_lock(&con->lock);
00473    s = ast_hashtab_lookup(con->schedq_ht, &tmp);
00474    if (s) {
00475       if (!ast_heap_remove(con->sched_heap, s)) {
00476          ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
00477       }
00478 
00479       if (!ast_hashtab_remove_this_object(con->schedq_ht, s)) {
00480          ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
00481       }
00482 
00483       con->schedcnt--;
00484 
00485       sched_release(con, s);
00486    }
00487    
00488 #ifdef DUMP_SCHEDULER
00489    /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
00490    if (option_debug)
00491       ast_sched_dump(con);
00492 #endif
00493    ast_mutex_unlock(&con->lock);
00494 
00495    if (!s) {
00496       ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
00497 #ifndef AST_DEVMODE
00498       ast_assert(s != NULL);
00499 #else
00500       _ast_assert(0, "s != NULL", file, line, function);
00501 #endif
00502       return -1;
00503    }
00504    
00505    return 0;
00506 }
00507 
00508 void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
00509 {
00510    int i, x;
00511    struct sched *cur;
00512    int countlist[cbnames->numassocs + 1];
00513    size_t heap_size;
00514    
00515    memset(countlist, 0, sizeof(countlist));
00516    ast_str_set(buf, 0, " Highwater = %d\n schedcnt = %d\n", con->highwater, con->schedcnt);
00517 
00518    ast_mutex_lock(&con->lock);
00519 
00520    heap_size = ast_heap_size(con->sched_heap);
00521    for (x = 1; x <= heap_size; x++) {
00522       cur = ast_heap_peek(con->sched_heap, x);
00523       /* match the callback to the cblist */
00524       for (i = 0; i < cbnames->numassocs; i++) {
00525          if (cur->callback == cbnames->cblist[i]) {
00526             break;
00527          }
00528       }
00529       if (i < cbnames->numassocs) {
00530          countlist[i]++;
00531       } else {
00532          countlist[cbnames->numassocs]++;
00533       }
00534    }
00535 
00536    ast_mutex_unlock(&con->lock);
00537 
00538    for (i = 0; i < cbnames->numassocs; i++) {
00539       ast_str_append(buf, 0, "    %s : %d\n", cbnames->list[i], countlist[i]);
00540    }
00541 
00542    ast_str_append(buf, 0, "   <unknown> : %d\n", countlist[cbnames->numassocs]);
00543 }
00544    
00545 /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
00546 void ast_sched_dump(struct sched_context *con)
00547 {
00548    struct sched *q;
00549    struct timeval when = ast_tvnow();
00550    int x;
00551    size_t heap_size;
00552 #ifdef SCHED_MAX_CACHE
00553    ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d Cache, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
00554 #else
00555    ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->highwater);
00556 #endif
00557 
00558    ast_debug(1, "=============================================================\n");
00559    ast_debug(1, "|ID    Callback          Data              Time  (sec:ms)   |\n");
00560    ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
00561    ast_mutex_lock(&con->lock);
00562    heap_size = ast_heap_size(con->sched_heap);
00563    for (x = 1; x <= heap_size; x++) {
00564       struct timeval delta;
00565       q = ast_heap_peek(con->sched_heap, x);
00566       delta = ast_tvsub(q->when, when);
00567       ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n", 
00568          q->id,
00569          q->callback,
00570          q->data,
00571          (long)delta.tv_sec,
00572          (long int)delta.tv_usec);
00573    }
00574    ast_mutex_unlock(&con->lock);
00575    ast_debug(1, "=============================================================\n");
00576 }
00577 
00578 /*! \brief
00579  * Launch all events which need to be run at this time.
00580  */
00581 int ast_sched_runq(struct sched_context *con)
00582 {
00583    struct sched *current;
00584    struct timeval when;
00585    int numevents;
00586    int res;
00587 
00588    DEBUG(ast_debug(1, "ast_sched_runq()\n"));
00589       
00590    ast_mutex_lock(&con->lock);
00591 
00592    when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
00593    for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
00594       /* schedule all events which are going to expire within 1ms.
00595        * We only care about millisecond accuracy anyway, so this will
00596        * help us get more than one event at one time if they are very
00597        * close together.
00598        */
00599       if (ast_tvcmp(current->when, when) != -1) {
00600          break;
00601       }
00602       
00603       current = ast_heap_pop(con->sched_heap);
00604 
00605       if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
00606          ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
00607       }
00608 
00609       con->schedcnt--;
00610 
00611       /*
00612        * At this point, the schedule queue is still intact.  We
00613        * have removed the first event and the rest is still there,
00614        * so it's permissible for the callback to add new events, but
00615        * trying to delete itself won't work because it isn't in
00616        * the schedule queue.  If that's what it wants to do, it 
00617        * should return 0.
00618        */
00619          
00620       ast_mutex_unlock(&con->lock);
00621       res = current->callback(current->data);
00622       ast_mutex_lock(&con->lock);
00623          
00624       if (res) {
00625          /*
00626           * If they return non-zero, we should schedule them to be
00627           * run again.
00628           */
00629          if (sched_settime(&current->when, current->variable? res : current->resched)) {
00630             sched_release(con, current);
00631          } else {
00632             schedule(con, current);
00633          }
00634       } else {
00635          /* No longer needed, so release it */
00636          sched_release(con, current);
00637       }
00638    }
00639 
00640    ast_mutex_unlock(&con->lock);
00641    
00642    return numevents;
00643 }
00644 
00645 long ast_sched_when(struct sched_context *con,int id)
00646 {
00647    struct sched *s, tmp;
00648    long secs = -1;
00649    DEBUG(ast_debug(1, "ast_sched_when()\n"));
00650 
00651    ast_mutex_lock(&con->lock);
00652    
00653    /* these next 2 lines replace a lookup loop */
00654    tmp.id = id;
00655    s = ast_hashtab_lookup(con->schedq_ht, &tmp);
00656    
00657    if (s) {
00658       struct timeval now = ast_tvnow();
00659       secs = s->when.tv_sec - now.tv_sec;
00660    }
00661    ast_mutex_unlock(&con->lock);
00662    
00663    return secs;
00664 }