Thu Apr 28 2011 16:56:46

Asterisk developer's documentation


evt.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2007, Digium, Inc.
00005  *
00006  * Russell Bryant <russell@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*!
00020  * \file
00021  * \author Russell Bryant <russell@digium.com>
00022  *
00023  * \brief Usage of the SAForum AIS (Application Interface Specification)
00024  *
00025  * \arg http://www.openais.org/
00026  *
00027  * This file contains the code specific to the use of the EVT
00028  * (Event) Service.
00029  */
00030 
00031 #include "asterisk.h"
00032 
00033 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 271869 $");
00034 
00035 #include <stdlib.h>
00036 #include <stdio.h>
00037 #include <string.h>
00038 #include <unistd.h>
00039 #include <errno.h>
00040 
00041 #include "ais.h"
00042 
00043 #include "asterisk/module.h"
00044 #include "asterisk/utils.h"
00045 #include "asterisk/cli.h"
00046 #include "asterisk/logger.h"
00047 #include "asterisk/event.h"
00048 #include "asterisk/config.h"
00049 #include "asterisk/linkedlists.h"
00050 #include "asterisk/devicestate.h"
00051 
00052 #ifndef AST_MODULE
00053 /* XXX HACK */
00054 #define AST_MODULE "res_ais"
00055 #endif
00056 
00057 SaEvtHandleT evt_handle;
00058 static SaAisErrorT evt_init_res;
00059 
00060 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00061    SaAisErrorT error);
00062 void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id,
00063    const SaEvtEventHandleT event_handle, const SaSizeT event_datalen);
00064 
00065 static const SaEvtCallbacksT evt_callbacks = {
00066    .saEvtChannelOpenCallback  = evt_channel_open_cb,
00067    .saEvtEventDeliverCallback = evt_event_deliver_cb,
00068 };
00069 
00070 static const struct {
00071    const char *str;
00072    enum ast_event_type type;
00073 } supported_event_types[] = {
00074    { "mwi", AST_EVENT_MWI },
00075    { "device_state", AST_EVENT_DEVICE_STATE_CHANGE },
00076 };
00077 
/*! Counter that hands out a unique ID to each egress subscription;
 *  it is only ever advanced with an atomic fetch-and-add. */
static int unique_id;
00080 
00081 struct subscribe_event {
00082    AST_LIST_ENTRY(subscribe_event) entry;
00083    /*! This is a unique identifier to identify this subscription in the event
00084     *  channel through the different API calls, subscribe, unsubscribe, and
00085     *  the event deliver callback. */
00086    SaEvtSubscriptionIdT id;
00087    enum ast_event_type type;
00088 };
00089 
00090 struct publish_event {
00091    AST_LIST_ENTRY(publish_event) entry;
00092    /*! We subscribe to events internally so that we can publish them
00093     *  on this event channel. */
00094    struct ast_event_sub *sub;
00095    enum ast_event_type type;
00096 };
00097 
00098 struct event_channel {
00099    AST_RWLIST_ENTRY(event_channel) entry;
00100    AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events;
00101    AST_LIST_HEAD_NOLOCK(, publish_event) publish_events;
00102    SaEvtChannelHandleT handle;
00103    char name[1];
00104 };
00105 
/*! All event channels built from the configuration file, guarded by a read/write lock. */
static AST_RWLIST_HEAD_STATIC(event_channels, event_channel);
00107 
00108 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00109    SaAisErrorT error)
00110 {
00111 
00112 }
00113 
/*!
 * \brief Pass an event received from an event channel on to the Asterisk core.
 *
 * \param ast_event event handed to ast_event_queue_and_cache()
 */
static void queue_event(struct ast_event *ast_event)
{
   /* The result of the queueing call is deliberately not inspected. */
   (void) ast_event_queue_and_cache(ast_event);
}
00118 
00119 void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id,
00120    const SaEvtEventHandleT event_handle, const SaSizeT event_datalen)
00121 {
00122    /* It is important to note that this works because we *know* that this
00123     * function will only be called by a single thread, the dispatch_thread.
00124     * If this module gets changed such that this is no longer the case, this
00125     * should get changed to a thread-local buffer, instead. */
00126    static unsigned char buf[4096];
00127    struct ast_event *event_dup, *event = (void *) buf;
00128    SaAisErrorT ais_res;
00129    SaSizeT len = sizeof(buf);
00130 
00131    if (event_datalen > len) {
00132       ast_log(LOG_ERROR, "Event received with size %u, which is too big\n"
00133          "for the allocated size %u. Change the code to increase the size.\n",
00134          (unsigned int) event_datalen, (unsigned int) len);
00135       return;
00136    }
00137 
00138    ais_res = saEvtEventDataGet(event_handle, event, &len);
00139    if (ais_res != SA_AIS_OK) {
00140       ast_log(LOG_ERROR, "Error retrieving event payload: %s\n",
00141          ais_err2str(ais_res));
00142       return;
00143    }
00144 
00145    if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
00146       /* Don't feed events back in that originated locally. */
00147       return;
00148    }
00149 
00150    if (!(event_dup = ast_malloc(len)))
00151       return;
00152 
00153    memcpy(event_dup, event, len);
00154 
00155    queue_event(event_dup);
00156 }
00157 
00158 static const char *type_to_filter_str(enum ast_event_type type)
00159 {
00160    const char *filter_str = NULL;
00161    int i;
00162 
00163    for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00164       if (supported_event_types[i].type == type) {
00165          filter_str = supported_event_types[i].str;
00166          break;
00167       }
00168    }
00169 
00170    return filter_str;
00171 }
00172 
00173 static void ast_event_cb(const struct ast_event *ast_event, void *data)
00174 {
00175    SaEvtEventHandleT event_handle;
00176    SaAisErrorT ais_res;
00177    struct event_channel *event_channel = data;
00178    SaClmClusterNodeT local_node;
00179    SaEvtEventPatternArrayT pattern_array;
00180    SaEvtEventPatternT pattern;
00181    SaSizeT len;
00182    const char *filter_str;
00183    SaEvtEventIdT event_id;
00184 
00185    ast_log(LOG_DEBUG, "Got an event to forward\n");
00186 
00187    if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) {
00188       /* If the event didn't originate from this server, don't send it back out. */
00189       ast_log(LOG_DEBUG, "Returning here\n");
00190       return;
00191    }
00192 
00193    ais_res = saEvtEventAllocate(event_channel->handle, &event_handle);
00194    if (ais_res != SA_AIS_OK) {
00195       ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(ais_res));
00196       ast_log(LOG_DEBUG, "Returning here\n");
00197       return;
00198    }
00199 
00200    ais_res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID,
00201       SA_TIME_ONE_SECOND, &local_node);
00202    if (ais_res != SA_AIS_OK) {
00203       ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(ais_res));
00204       goto return_event_free;
00205    }
00206 
00207    filter_str = type_to_filter_str(ast_event_get_type(ast_event));
00208    len = strlen(filter_str) + 1;
00209    pattern.pattern = (SaUint8T *) filter_str;
00210    pattern.patternSize = len;
00211    pattern.allocatedSize = len;
00212 
00213    pattern_array.allocatedNumber = 1;
00214    pattern_array.patternsNumber = 1;
00215    pattern_array.patterns = &pattern;
00216 
00217    /*!
00218     * /todo Make retention time configurable
00219     * /todo Make event priorities configurable
00220     */
00221    ais_res = saEvtEventAttributesSet(event_handle, &pattern_array,
00222       SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName);
00223    if (ais_res != SA_AIS_OK) {
00224       ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(ais_res));
00225       goto return_event_free;
00226    }
00227 
00228    ais_res = saEvtEventPublish(event_handle,
00229       ast_event, ast_event_get_size(ast_event), &event_id);
00230    if (ais_res != SA_AIS_OK) {
00231       ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
00232       goto return_event_free;
00233    }
00234 
00235 return_event_free:
00236    ais_res = saEvtEventFree(event_handle);
00237    if (ais_res != SA_AIS_OK) {
00238       ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(ais_res));
00239    }
00240    ast_log(LOG_DEBUG, "Returning here (event_free)\n");
00241 }
00242 
00243 static char *ais_evt_show_event_channels(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00244 {
00245    struct event_channel *event_channel;
00246 
00247    switch (cmd) {
00248    case CLI_INIT:
00249       e->command = "ais show evt event channels";
00250       e->usage =
00251          "Usage: ais show evt event channels\n"
00252          "       List configured event channels for the (EVT) Eventing service.\n";
00253       return NULL;
00254 
00255    case CLI_GENERATE:
00256       return NULL;   /* no completion */
00257    }
00258 
00259    if (a->argc != e->args)
00260       return CLI_SHOWUSAGE;
00261 
00262    ast_cli(a->fd, "\n"
00263                "=============================================================\n"
00264                "=== Event Channels ==========================================\n"
00265                "=============================================================\n"
00266                "===\n");
00267 
00268    AST_RWLIST_RDLOCK(&event_channels);
00269    AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00270       struct publish_event *publish_event;
00271       struct subscribe_event *subscribe_event;
00272 
00273       ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00274                      "=== Event Channel Name: %s\n", event_channel->name);
00275 
00276       AST_LIST_TRAVERSE(&event_channel->publish_events, publish_event, entry) {
00277          ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
00278             type_to_filter_str(publish_event->type));
00279       }
00280 
00281       AST_LIST_TRAVERSE(&event_channel->subscribe_events, subscribe_event, entry) {
00282          ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
00283             type_to_filter_str(subscribe_event->type));
00284       }
00285 
00286       ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00287                      "===\n");
00288    }
00289    AST_RWLIST_UNLOCK(&event_channels);
00290 
00291    ast_cli(a->fd, "=============================================================\n"
00292                   "\n");
00293 
00294    return CLI_SUCCESS;
00295 }
00296 
00297 static struct ast_cli_entry ais_cli[] = {
00298    AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
00299 };
00300 
00301 static void add_publish_event(struct event_channel *event_channel, const char *event_type)
00302 {
00303    int i;
00304    enum ast_event_type type = -1;
00305    struct publish_event *publish_event;
00306 
00307    for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00308       if (!strcasecmp(event_type, supported_event_types[i].str)) {
00309          type = supported_event_types[i].type;
00310          break;
00311       }
00312    }
00313 
00314    if (type == -1) {
00315       ast_log(LOG_WARNING, "publish_event option given with invalid value '%s'\n", event_type);
00316       return;
00317    }
00318 
00319    if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00320       return;
00321    }
00322 
00323    if (!(publish_event = ast_calloc(1, sizeof(*publish_event)))) {
00324       return;
00325    }
00326 
00327    publish_event->type = type;
00328    ast_log(LOG_DEBUG, "Subscribing to event type %d\n", type);
00329    publish_event->sub = ast_event_subscribe(type, ast_event_cb, event_channel,
00330       AST_EVENT_IE_END);
00331    ast_event_dump_cache(publish_event->sub);
00332 
00333    AST_LIST_INSERT_TAIL(&event_channel->publish_events, publish_event, entry);
00334 }
00335 
00336 static SaAisErrorT set_egress_subscription(struct event_channel *event_channel,
00337    struct subscribe_event *subscribe_event)
00338 {
00339    SaAisErrorT ais_res;
00340    SaEvtEventFilterArrayT filter_array;
00341    SaEvtEventFilterT filter;
00342    const char *filter_str = NULL;
00343    SaSizeT len;
00344 
00345    /* We know it's going to be valid.  It was checked earlier. */
00346    filter_str = type_to_filter_str(subscribe_event->type);
00347 
00348    filter.filterType = SA_EVT_EXACT_FILTER;
00349    len = strlen(filter_str) + 1;
00350    filter.filter.allocatedSize = len;
00351    filter.filter.patternSize = len;
00352    filter.filter.pattern = (SaUint8T *) filter_str;
00353 
00354    filter_array.filtersNumber = 1;
00355    filter_array.filters = &filter;
00356 
00357    ais_res = saEvtEventSubscribe(event_channel->handle, &filter_array,
00358       subscribe_event->id);
00359 
00360    return ais_res;
00361 }
00362 
00363 static void add_subscribe_event(struct event_channel *event_channel, const char *event_type)
00364 {
00365    int i;
00366    enum ast_event_type type = -1;
00367    struct subscribe_event *subscribe_event;
00368    SaAisErrorT ais_res;
00369 
00370    for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00371       if (!strcasecmp(event_type, supported_event_types[i].str)) {
00372          type = supported_event_types[i].type;
00373          break;
00374       }
00375    }
00376 
00377    if (type == -1) {
00378       ast_log(LOG_WARNING, "subscribe_event option given with invalid value '%s'\n", event_type);
00379       return;
00380    }
00381 
00382    if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00383       return;
00384    }
00385 
00386    if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event)))) {
00387       return;
00388    }
00389 
00390    subscribe_event->type = type;
00391    subscribe_event->id = ast_atomic_fetchadd_int(&unique_id, +1);
00392 
00393    ais_res = set_egress_subscription(event_channel, subscribe_event);
00394    if (ais_res != SA_AIS_OK) {
00395       ast_log(LOG_ERROR, "Error setting up egress subscription: %s\n",
00396          ais_err2str(ais_res));
00397       free(subscribe_event);
00398       return;
00399    }
00400 
00401    AST_LIST_INSERT_TAIL(&event_channel->subscribe_events, subscribe_event, entry);
00402 }
00403 
00404 static void build_event_channel(struct ast_config *cfg, const char *cat)
00405 {
00406    struct ast_variable *var;
00407    struct event_channel *event_channel;
00408    SaAisErrorT ais_res;
00409    SaNameT sa_name = { 0, };
00410 
00411    AST_RWLIST_WRLOCK(&event_channels);
00412    AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00413       if (!strcasecmp(event_channel->name, cat))
00414          break;
00415    }
00416    AST_RWLIST_UNLOCK(&event_channels);
00417    if (event_channel) {
00418       ast_log(LOG_WARNING, "Event channel '%s' was specified twice in "
00419          "configuration.  Second instance ignored.\n", cat);
00420       return;
00421    }
00422 
00423    if (!(event_channel = ast_calloc(1, sizeof(*event_channel) + strlen(cat))))
00424       return;
00425 
00426    strcpy(event_channel->name, cat);
00427    ast_copy_string((char *) sa_name.value, cat, sizeof(sa_name.value));
00428    sa_name.length = strlen((char *) sa_name.value);
00429    ais_res = saEvtChannelOpen(evt_handle, &sa_name,
00430       SA_EVT_CHANNEL_PUBLISHER | SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE,
00431       SA_TIME_MAX, &event_channel->handle);
00432    if (ais_res != SA_AIS_OK) {
00433       ast_log(LOG_ERROR, "Error opening event channel: %s\n", ais_err2str(ais_res));
00434       free(event_channel);
00435       return;
00436    }
00437 
00438    for (var = ast_variable_browse(cfg, cat); var; var = var->next) {
00439       if (!strcasecmp(var->name, "type")) {
00440          continue;
00441       } else if (!strcasecmp(var->name, "publish_event")) {
00442          add_publish_event(event_channel, var->value);
00443       } else if (!strcasecmp(var->name, "subscribe_event")) {
00444          add_subscribe_event(event_channel, var->value);
00445       } else {
00446          ast_log(LOG_WARNING, "Event channel '%s' contains invalid option '%s'\n",
00447             event_channel->name, var->name);
00448       }
00449    }
00450 
00451    AST_RWLIST_WRLOCK(&event_channels);
00452    AST_RWLIST_INSERT_TAIL(&event_channels, event_channel, entry);
00453    AST_RWLIST_UNLOCK(&event_channels);
00454 }
00455 
00456 static void load_config(void)
00457 {
00458    static const char filename[] = "ais.conf";
00459    struct ast_config *cfg;
00460    const char *cat = NULL;
00461    struct ast_flags config_flags = { 0 };
00462 
00463    if (!(cfg = ast_config_load(filename, config_flags)) || cfg == CONFIG_STATUS_FILEINVALID)
00464       return;
00465 
00466    while ((cat = ast_category_browse(cfg, cat))) {
00467       const char *type;
00468 
00469       if (!strcasecmp(cat, "general"))
00470          continue;
00471 
00472       if (!(type = ast_variable_retrieve(cfg, cat, "type"))) {
00473          ast_log(LOG_WARNING, "Invalid entry in %s defined with no type!\n",
00474             filename);
00475          continue;
00476       }
00477 
00478       if (!strcasecmp(type, "event_channel")) {
00479          build_event_channel(cfg, cat);
00480       } else {
00481          ast_log(LOG_WARNING, "Entry in %s defined with invalid type '%s'\n",
00482             filename, type);
00483       }
00484    }
00485 
00486    ast_config_destroy(cfg);
00487 }
00488 
00489 static void publish_event_destroy(struct publish_event *publish_event)
00490 {
00491    ast_event_unsubscribe(publish_event->sub);
00492 
00493    free(publish_event);
00494 }
00495 
00496 static void subscribe_event_destroy(const struct event_channel *event_channel,
00497    struct subscribe_event *subscribe_event)
00498 {
00499    SaAisErrorT ais_res;
00500 
00501    /* saEvtChannelClose() will actually do this automatically, but it just
00502     * feels cleaner to go ahead and do it manually ... */
00503    ais_res = saEvtEventUnsubscribe(event_channel->handle, subscribe_event->id);
00504    if (ais_res != SA_AIS_OK) {
00505       ast_log(LOG_ERROR, "Error unsubscribing: %s\n", ais_err2str(ais_res));
00506    }
00507 
00508    free(subscribe_event);
00509 }
00510 
00511 static void event_channel_destroy(struct event_channel *event_channel)
00512 {
00513    struct publish_event *publish_event;
00514    struct subscribe_event *subscribe_event;
00515    SaAisErrorT ais_res;
00516 
00517    while ((publish_event = AST_LIST_REMOVE_HEAD(&event_channel->publish_events, entry)))
00518       publish_event_destroy(publish_event);
00519    while ((subscribe_event = AST_LIST_REMOVE_HEAD(&event_channel->subscribe_events, entry)))
00520       subscribe_event_destroy(event_channel, subscribe_event);
00521 
00522    ais_res = saEvtChannelClose(event_channel->handle);
00523    if (ais_res != SA_AIS_OK) {
00524       ast_log(LOG_ERROR, "Error closing event channel '%s': %s\n",
00525          event_channel->name, ais_err2str(ais_res));
00526    }
00527 
00528    free(event_channel);
00529 }
00530 
00531 static void destroy_event_channels(void)
00532 {
00533    struct event_channel *event_channel;
00534 
00535    AST_RWLIST_WRLOCK(&event_channels);
00536    while ((event_channel = AST_RWLIST_REMOVE_HEAD(&event_channels, entry))) {
00537       event_channel_destroy(event_channel);
00538    }
00539    AST_RWLIST_UNLOCK(&event_channels);
00540 }
00541 
00542 int ast_ais_evt_load_module(void)
00543 {
00544    evt_init_res = saEvtInitialize(&evt_handle, &evt_callbacks, &ais_version);
00545    if (evt_init_res != SA_AIS_OK) {
00546       ast_log(LOG_ERROR, "Could not initialize eventing service: %s\n",
00547          ais_err2str(evt_init_res));
00548       return -1;
00549    }
00550 
00551    load_config();
00552 
00553    ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
00554 
00555    return 0;
00556 }
00557 
00558 int ast_ais_evt_unload_module(void)
00559 {
00560    SaAisErrorT ais_res;
00561 
00562    if (evt_init_res != SA_AIS_OK) {
00563       return 0;
00564    }
00565 
00566    destroy_event_channels();
00567 
00568    ais_res = saEvtFinalize(evt_handle);
00569    if (ais_res != SA_AIS_OK) {
00570       ast_log(LOG_ERROR, "Problem stopping eventing service: %s\n",
00571          ais_err2str(ais_res));
00572       return -1;
00573    }
00574 
00575    return 0;
00576 }