00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "asterisk.h"
00026
00027 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 185947 $")
00028
00029 #include <signal.h>
00030 #include <sys/time.h>
00031
00032 #include "asterisk/_private.h"
00033 #include "asterisk/module.h"
00034 #include "asterisk/time.h"
00035 #include "asterisk/astobj2.h"
00036 #include "asterisk/cli.h"
00037 #include "asterisk/taskprocessor.h"
00038
00039
00040
00041
00042
00043
00044
00045 struct tps_task {
00046
00047 int (*execute)(void *datap);
00048
00049 void *datap;
00050
00051 AST_LIST_ENTRY(tps_task) list;
00052 };
00053
00054
/*! \brief Per-taskprocessor counters shown by the "core show taskprocessors" CLI command. */
struct tps_taskprocessor_stats {
	/*! \brief Largest queue depth recorded by the processing thread */
	unsigned long max_qsize;
	/*! \brief Number of tasks the processing thread has executed */
	unsigned long _tasks_processed_count;
};
00061
00062
00063 struct ast_taskprocessor {
00064
00065 char *name;
00066
00067 ast_cond_t poll_cond;
00068
00069 pthread_t poll_thread;
00070
00071 ast_mutex_t taskprocessor_lock;
00072
00073 unsigned char poll_thread_run;
00074
00075 struct tps_taskprocessor_stats *stats;
00076
00077 long tps_queue_size;
00078
00079 AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
00080
00081 AST_LIST_ENTRY(ast_taskprocessor) list;
00082 };
00083 #define TPS_MAX_BUCKETS 7
00084
00085 static struct ao2_container *tps_singletons;
00086
00087
00088 static ast_cond_t cli_ping_cond;
00089
00090
00091 AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
00092
00093
/*! \brief Hash callback for the singleton container (hashes the processor name) */
static int tps_hash_cb(const void *obj, const int flags);

/*! \brief Compare callback for the singleton container (case-insensitive name match) */
static int tps_cmp_cb(void *obj, void *arg, int flags);

/*! \brief Body of each taskprocessor's processing thread */
static void *tps_processing_function(void *data);

/*! \brief ao2 destructor for a taskprocessor */
static void tps_taskprocessor_destroy(void *tps);

/*! \brief Task queued by the CLI ping command; signals \c cli_ping_cond */
static int tps_ping_handler(void *datap);

/*! \brief Remove and return the task at the head of a processor's queue */
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);

/*! \brief Report the current queue size of a processor */
static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);

/* CLI command handlers */
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
00115
00116 static struct ast_cli_entry taskprocessor_clis[] = {
00117 AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
00118 AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
00119 };
00120
00121
00122 int ast_tps_init(void)
00123 {
00124 if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
00125 ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
00126 return -1;
00127 }
00128
00129 ast_cond_init(&cli_ping_cond, NULL);
00130
00131 ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
00132 return 0;
00133 }
00134
00135
00136 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
00137 {
00138 struct tps_task *t;
00139 if ((t = ast_calloc(1, sizeof(*t)))) {
00140 t->execute = task_exe;
00141 t->datap = datap;
00142 }
00143 return t;
00144 }
00145
00146
00147 static void *tps_task_free(struct tps_task *task)
00148 {
00149 if (task) {
00150 ast_free(task);
00151 }
00152 return NULL;
00153 }
00154
00155
00156 static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
00157 {
00158 int tklen;
00159 int wordnum = 0;
00160 char *name = NULL;
00161 struct ao2_iterator i;
00162
00163 if (a->pos != 3)
00164 return NULL;
00165
00166 tklen = strlen(a->word);
00167 i = ao2_iterator_init(tps_singletons, 0);
00168 while ((p = ao2_iterator_next(&i))) {
00169 if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
00170 name = ast_strdup(p->name);
00171 ao2_ref(p, -1);
00172 break;
00173 }
00174 ao2_ref(p, -1);
00175 }
00176 return name;
00177 }
00178
00179
00180 static int tps_ping_handler(void *datap)
00181 {
00182 ast_mutex_lock(&cli_ping_cond_lock);
00183 ast_cond_signal(&cli_ping_cond);
00184 ast_mutex_unlock(&cli_ping_cond_lock);
00185 return 0;
00186 }
00187
00188
00189 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00190 {
00191 struct timeval begin, end, delta;
00192 char *name;
00193 struct timeval when;
00194 struct timespec ts;
00195 struct ast_taskprocessor *tps = NULL;
00196
00197 switch (cmd) {
00198 case CLI_INIT:
00199 e->command = "core ping taskprocessor";
00200 e->usage =
00201 "Usage: core ping taskprocessor <taskprocessor>\n"
00202 " Displays the time required for a task to be processed\n";
00203 return NULL;
00204 case CLI_GENERATE:
00205 return tps_taskprocessor_tab_complete(tps, a);
00206 }
00207
00208 if (a->argc != 4)
00209 return CLI_SHOWUSAGE;
00210
00211 name = a->argv[3];
00212 if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
00213 ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
00214 return CLI_SUCCESS;
00215 }
00216 ast_cli(a->fd, "\npinging %s ...", name);
00217 when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
00218 ts.tv_sec = when.tv_sec;
00219 ts.tv_nsec = when.tv_usec * 1000;
00220 ast_mutex_lock(&cli_ping_cond_lock);
00221 if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
00222 ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
00223 ao2_ref(tps, -1);
00224 return CLI_FAILURE;
00225 }
00226 ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
00227 ast_mutex_unlock(&cli_ping_cond_lock);
00228 end = ast_tvnow();
00229 delta = ast_tvsub(end, begin);
00230 ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
00231 ao2_ref(tps, -1);
00232 return CLI_SUCCESS;
00233 }
00234
00235 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00236 {
00237 char name[256];
00238 int tcount;
00239 unsigned long qsize;
00240 unsigned long maxqsize;
00241 unsigned long processed;
00242 struct ast_taskprocessor *p;
00243 struct ao2_iterator i;
00244
00245 switch (cmd) {
00246 case CLI_INIT:
00247 e->command = "core show taskprocessors";
00248 e->usage =
00249 "Usage: core show taskprocessors\n"
00250 " Shows a list of instantiated task processors and their statistics\n";
00251 return NULL;
00252 case CLI_GENERATE:
00253 return NULL;
00254 }
00255
00256 if (a->argc != e->args)
00257 return CLI_SHOWUSAGE;
00258
00259 ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
00260 i = ao2_iterator_init(tps_singletons, 0);
00261 while ((p = ao2_iterator_next(&i))) {
00262 ast_copy_string(name, p->name, sizeof(name));
00263 qsize = p->tps_queue_size;
00264 maxqsize = p->stats->max_qsize;
00265 processed = p->stats->_tasks_processed_count;
00266 ast_cli(a->fd, "\n%24s %17ld %12ld %12ld", name, processed, qsize, maxqsize);
00267 ao2_ref(p, -1);
00268 }
00269 tcount = ao2_container_count(tps_singletons);
00270 ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
00271 return CLI_SUCCESS;
00272 }
00273
00274
00275 static void *tps_processing_function(void *data)
00276 {
00277 struct ast_taskprocessor *i = data;
00278 struct tps_task *t;
00279 int size;
00280
00281 if (!i) {
00282 ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
00283 return NULL;
00284 }
00285
00286 while (i->poll_thread_run) {
00287 ast_mutex_lock(&i->taskprocessor_lock);
00288 if (!i->poll_thread_run) {
00289 ast_mutex_unlock(&i->taskprocessor_lock);
00290 break;
00291 }
00292 if (!(size = tps_taskprocessor_depth(i))) {
00293 ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
00294 if (!i->poll_thread_run) {
00295 ast_mutex_unlock(&i->taskprocessor_lock);
00296 break;
00297 }
00298 }
00299 ast_mutex_unlock(&i->taskprocessor_lock);
00300
00301 if (!(t = tps_taskprocessor_pop(i))) {
00302 ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
00303 continue;
00304 }
00305 if (!t->execute) {
00306 ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
00307 tps_task_free(t);
00308 continue;
00309 }
00310 t->execute(t->datap);
00311
00312 ast_mutex_lock(&i->taskprocessor_lock);
00313 if (i->stats) {
00314 i->stats->_tasks_processed_count++;
00315 if (size > i->stats->max_qsize) {
00316 i->stats->max_qsize = size;
00317 }
00318 }
00319 ast_mutex_unlock(&i->taskprocessor_lock);
00320
00321 tps_task_free(t);
00322 }
00323 while ((t = tps_taskprocessor_pop(i))) {
00324 tps_task_free(t);
00325 }
00326 return NULL;
00327 }
00328
00329
00330 static int tps_hash_cb(const void *obj, const int flags)
00331 {
00332 const struct ast_taskprocessor *tps = obj;
00333
00334 return ast_str_case_hash(tps->name);
00335 }
00336
00337
00338 static int tps_cmp_cb(void *obj, void *arg, int flags)
00339 {
00340 struct ast_taskprocessor *lhs = obj, *rhs = arg;
00341
00342 return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0;
00343 }
00344
00345
00346 static void tps_taskprocessor_destroy(void *tps)
00347 {
00348 struct ast_taskprocessor *t = tps;
00349
00350 if (!tps) {
00351 ast_log(LOG_ERROR, "missing taskprocessor\n");
00352 return;
00353 }
00354 ast_log(LOG_DEBUG, "destroying taskprocessor '%s'\n", t->name);
00355
00356 ast_mutex_lock(&t->taskprocessor_lock);
00357 t->poll_thread_run = 0;
00358 ast_cond_signal(&t->poll_cond);
00359 ast_mutex_unlock(&t->taskprocessor_lock);
00360 pthread_join(t->poll_thread, NULL);
00361 t->poll_thread = AST_PTHREADT_NULL;
00362 ast_mutex_destroy(&t->taskprocessor_lock);
00363 ast_cond_destroy(&t->poll_cond);
00364
00365 if (t->stats) {
00366 ast_free(t->stats);
00367 t->stats = NULL;
00368 }
00369 ast_free(t->name);
00370 }
00371
00372
00373 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
00374 {
00375 struct tps_task *task;
00376
00377 if (!tps) {
00378 ast_log(LOG_ERROR, "missing taskprocessor\n");
00379 return NULL;
00380 }
00381 ast_mutex_lock(&tps->taskprocessor_lock);
00382 if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
00383 tps->tps_queue_size--;
00384 }
00385 ast_mutex_unlock(&tps->taskprocessor_lock);
00386 return task;
00387 }
00388
00389 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
00390 {
00391 return (tps) ? tps->tps_queue_size : -1;
00392 }
00393
00394
00395 const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
00396 {
00397 if (!tps) {
00398 ast_log(LOG_ERROR, "no taskprocessor specified!\n");
00399 return NULL;
00400 }
00401 return tps->name;
00402 }
00403
00404
00405
00406
00407 struct ast_taskprocessor *ast_taskprocessor_get(char *name, enum ast_tps_options create)
00408 {
00409 struct ast_taskprocessor *p, tmp_tps = {
00410 .name = name,
00411 };
00412
00413 if (ast_strlen_zero(name)) {
00414 ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
00415 return NULL;
00416 }
00417 ao2_lock(tps_singletons);
00418 p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
00419 if (p) {
00420 ao2_unlock(tps_singletons);
00421 return p;
00422 }
00423 if (create & TPS_REF_IF_EXISTS) {
00424
00425 ao2_unlock(tps_singletons);
00426 return NULL;
00427 }
00428
00429 if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
00430 ao2_unlock(tps_singletons);
00431 ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
00432 return NULL;
00433 }
00434
00435 ast_cond_init(&p->poll_cond, NULL);
00436 ast_mutex_init(&p->taskprocessor_lock);
00437
00438 if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
00439 ao2_unlock(tps_singletons);
00440 ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
00441 ao2_ref(p, -1);
00442 return NULL;
00443 }
00444 if (!(p->name = ast_strdup(name))) {
00445 ao2_unlock(tps_singletons);
00446 ao2_ref(p, -1);
00447 return NULL;
00448 }
00449 p->poll_thread_run = 1;
00450 p->poll_thread = AST_PTHREADT_NULL;
00451 if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) {
00452 ao2_unlock(tps_singletons);
00453 ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
00454 ao2_ref(p, -1);
00455 return NULL;
00456 }
00457 if (!(ao2_link(tps_singletons, p))) {
00458 ao2_unlock(tps_singletons);
00459 ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
00460 ao2_ref(p, -1);
00461 return NULL;
00462 }
00463 ao2_unlock(tps_singletons);
00464 return p;
00465 }
00466
00467
00468 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
00469 {
00470 if (tps) {
00471 ao2_lock(tps_singletons);
00472 ao2_unlink(tps_singletons, tps);
00473 if (ao2_ref(tps, -1) > 1) {
00474 ao2_link(tps_singletons, tps);
00475 }
00476 ao2_unlock(tps_singletons);
00477 }
00478 return NULL;
00479 }
00480
00481
00482 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
00483 {
00484 struct tps_task *t;
00485
00486 if (!tps || !task_exe) {
00487 ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
00488 return -1;
00489 }
00490 if (!(t = tps_task_alloc(task_exe, datap))) {
00491 ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
00492 return -1;
00493 }
00494 ast_mutex_lock(&tps->taskprocessor_lock);
00495 AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
00496 tps->tps_queue_size++;
00497 ast_cond_signal(&tps->poll_cond);
00498 ast_mutex_unlock(&tps->taskprocessor_lock);
00499 return 0;
00500 }
00501