add high and low priority event queues

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@674 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2006-02-26 00:12:17 +00:00
parent 5782ca928c
commit 7174e8ae8a
5 changed files with 113 additions and 79 deletions

View File

@ -85,6 +85,8 @@ struct switch_event_subclass {
struct switch_event { struct switch_event {
/*! the event id (descriptor) */ /*! the event id (descriptor) */
switch_event_t event_id; switch_event_t event_id;
/*! the priority of the event */
switch_priority_t priority;
/*! the owner of the event */ /*! the owner of the event */
char *owner; char *owner;
/*! the subclass of the event */ /*! the subclass of the event */
@ -139,6 +141,14 @@ SWITCH_DECLARE(switch_status) switch_event_shutdown(void);
*/ */
SWITCH_DECLARE(switch_status) switch_event_create_subclass(switch_event **event, switch_event_t event_id, char *subclass_name); SWITCH_DECLARE(switch_status) switch_event_create_subclass(switch_event **event, switch_event_t event_id, char *subclass_name);
/*!
\brief Set the priority of an event
\param event the event to set the priority on
\param priority the event priority
\return SWITCH_STATUS_SUCCESS
*/
SWITCH_DECLARE(switch_status) switch_event_set_priority(switch_event *event, switch_priority_t priority);
/*! /*!
\brief Retrieve a header value from an event \brief Retrieve a header value from an event
\param event the event to read the header from \param event the event to read the header from
@ -253,6 +263,13 @@ SWITCH_DECLARE(switch_status) switch_event_add_body(switch_event *event, char *f
*/ */
#define switch_event_create(event, id) switch_event_create_subclass(event, id, SWITCH_EVENT_SUBCLASS_ANY) #define switch_event_create(event, id) switch_event_create_subclass(event, id, SWITCH_EVENT_SUBCLASS_ANY)
/*!
\brief Deliver an event to all of the registered event listeners
\param event the event to send (will be nulled)
\note normaly use switch_event_fire for delivering events (only use this when you wish to deliver the event blocking on your thread)
*/
SWITCH_DECLARE(void) switch_event_deliver(switch_event **event);
/*! /*!
\brief Fire an event filling in most of the arguements with obvious values \brief Fire an event filling in most of the arguements with obvious values
\param event the event to send (will be nulled on success) \param event the event to send (will be nulled on success)

View File

@ -47,6 +47,22 @@ extern "C" {
#define SWITCH_TRUE 1 #define SWITCH_TRUE 1
#define SWITCH_FALSE 0 #define SWITCH_FALSE 0
/*!
\enum switch_priority_t
\brief Priority Indication
<pre>
SWITCH_PRIORITY_NORMAL - Normal Priority
SWITCH_PRIORITY_LOW - Low Priority
SWITCH_PRIORITY_HIGH - High Priority
</pre>
*/
typedef enum {
SWITCH_PRIORITY_NORMAL,
SWITCH_PRIORITY_LOW,
SWITCH_PRIORITY_HIGH,
} switch_priority_t;
/*! /*!
\enum switch_ivr_option_t \enum switch_ivr_option_t
\brief Possible options related to ivr functions \brief Possible options related to ivr functions

View File

@ -65,7 +65,12 @@ extern "C" {
!strcasecmp(expr, "true") ||\ !strcasecmp(expr, "true") ||\
atoi(expr))) ? SWITCH_TRUE : SWITCH_FALSE atoi(expr))) ? SWITCH_TRUE : SWITCH_FALSE
/*!
\brief Return a printable name of a switch_priority_t
\param priority the priority to get the name of
\return the printable form of the priority
*/
SWITCH_DECLARE(char *) switch_priority_name(switch_priority_t priority);
/*! /*!
\brief Return the RFC2833 character based on an event id \brief Return the RFC2833 character based on an event id

View File

@ -38,22 +38,12 @@ static switch_memory_pool *RUNTIME_POOL = NULL;
static switch_memory_pool *APOOL = NULL; static switch_memory_pool *APOOL = NULL;
static switch_memory_pool *BPOOL = NULL; static switch_memory_pool *BPOOL = NULL;
static switch_memory_pool *THRUNTIME_POOL = NULL; static switch_memory_pool *THRUNTIME_POOL = NULL;
static switch_queue_t *EVENT_QUEUE = NULL; static switch_queue_t *EVENT_QUEUE[3] = {0,0,0};
//#define MALLOC_EVENTS
#ifdef MALLOC_EVENTS
static int POOL_COUNT = 0;
#endif
static int POOL_COUNT_MAX = 100; static int POOL_COUNT_MAX = 100;
static switch_hash *CUSTOM_HASH = NULL; static switch_hash *CUSTOM_HASH = NULL;
static int THREAD_RUNNING = 0; static int THREAD_RUNNING = 0;
#ifdef MALLOC_EVENTS
#define ALLOC(size) malloc(size)
#define DUP(str) strdup(str)
#else
static void *locked_alloc(size_t len) static void *locked_alloc(size_t len)
{ {
void *mem; void *mem;
@ -82,7 +72,7 @@ static void *locked_dup(char *str)
#define ALLOC(size) locked_alloc(size) #define ALLOC(size) locked_alloc(size)
#define DUP(str) locked_dup(str) #define DUP(str) locked_dup(str)
#endif
/* make sure this is synced with the switch_event_t enum in switch_types.h /* make sure this is synced with the switch_event_t enum in switch_types.h
also never put any new ones before EVENT_ALL also never put any new ones before EVENT_ALL
@ -145,57 +135,61 @@ static int switch_events_match(switch_event *event, switch_event_node *node)
static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread *thread, void *obj) static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread *thread, void *obj)
{ {
switch_event_node *node;
switch_event *out_event = NULL; switch_event *out_event = NULL;
switch_event_t e; switch_queue_t *queue = NULL;
switch_queue_t *queues[3] = {0,0,0};
void *pop; void *pop;
int i, len[3] = {0,0,0};
assert(POOL_LOCK != NULL); assert(POOL_LOCK != NULL);
assert(RUNTIME_POOL != NULL); assert(RUNTIME_POOL != NULL);
THREAD_RUNNING = 1; THREAD_RUNNING = 1;
while (THREAD_RUNNING == 1 || switch_queue_size(EVENT_QUEUE)) {
queues[0] = EVENT_QUEUE[SWITCH_PRIORITY_HIGH];
queues[1] = EVENT_QUEUE[SWITCH_PRIORITY_NORMAL];
queues[2] = EVENT_QUEUE[SWITCH_PRIORITY_LOW];
#ifdef MALLOC_EVENTS while (THREAD_RUNNING == 1 ||
switch_mutex_lock(POOL_LOCK); (len[1] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_NORMAL])) ||
/* <LOCKED> ----------------------------------------------- */ (len[2] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_LOW])) ||
if (POOL_COUNT >= POOL_COUNT_MAX) { (len[0] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_HIGH]))
if (THRUNTIME_POOL == APOOL) { ) {
THRUNTIME_POOL = BPOOL;
} else { for(i = 0; i < 3; i++) {
THRUNTIME_POOL = APOOL; queue = queues[i];
while (queue && switch_queue_trypop(queue, &pop) == SWITCH_STATUS_SUCCESS) {
out_event = pop;
switch_event_deliver(&out_event);
} }
switch_pool_clear(THRUNTIME_POOL);
POOL_COUNT = 0;
} }
switch_mutex_unlock(POOL_LOCK);
/* </LOCKED> ----------------------------------------------- */
#endif
while (switch_queue_trypop(EVENT_QUEUE, &pop) == SWITCH_STATUS_SUCCESS) {
out_event = pop;
for (e = out_event->event_id;; e = SWITCH_EVENT_ALL) {
for (node = EVENT_NODES[e]; node; node = node->next) {
if (switch_events_match(out_event, node)) {
out_event->bind_user_data = node->user_data;
node->callback(out_event);
}
}
if (e == SWITCH_EVENT_ALL) {
break;
}
}
switch_event_destroy(&out_event);
}
switch_yield(1000); switch_yield(1000);
} }
THREAD_RUNNING = 0; THREAD_RUNNING = 0;
return NULL; return NULL;
} }
SWITCH_DECLARE(void) switch_event_deliver(switch_event **event)
{
switch_event_t e;
switch_event_node *node;
for (e = (*event)->event_id;; e = SWITCH_EVENT_ALL) {
for (node = EVENT_NODES[e]; node; node = node->next) {
if (switch_events_match(*event, node)) {
(*event)->bind_user_data = node->user_data;
node->callback(*event);
}
}
if (e == SWITCH_EVENT_ALL) {
break;
}
}
switch_event_destroy(event);
}
SWITCH_DECLARE(switch_status) switch_event_running(void) SWITCH_DECLARE(switch_status) switch_event_running(void)
{ {
return THREAD_RUNNING ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE; return THREAD_RUNNING ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE;
@ -265,7 +259,9 @@ SWITCH_DECLARE(switch_status) switch_event_init(switch_memory_pool *pool)
} }
THRUNTIME_POOL = APOOL; THRUNTIME_POOL = APOOL;
switch_queue_create(&EVENT_QUEUE, POOL_COUNT_MAX + 10, THRUNTIME_POOL); switch_queue_create(&EVENT_QUEUE[0], POOL_COUNT_MAX + 10, THRUNTIME_POOL);
switch_queue_create(&EVENT_QUEUE[1], POOL_COUNT_MAX + 10, THRUNTIME_POOL);
switch_queue_create(&EVENT_QUEUE[2], POOL_COUNT_MAX + 10, THRUNTIME_POOL);
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Activate Eventing Engine.\n"); switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Activate Eventing Engine.\n");
switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
@ -280,7 +276,8 @@ SWITCH_DECLARE(switch_status) switch_event_init(switch_memory_pool *pool)
} }
SWITCH_DECLARE(switch_status) switch_event_create_subclass(switch_event **event, switch_event_t event_id, SWITCH_DECLARE(switch_status) switch_event_create_subclass(switch_event **event,
switch_event_t event_id,
char *subclass_name) char *subclass_name)
{ {
@ -291,9 +288,6 @@ SWITCH_DECLARE(switch_status) switch_event_create_subclass(switch_event **event,
if ((*event = ALLOC(sizeof(switch_event))) == 0) { if ((*event = ALLOC(sizeof(switch_event))) == 0) {
return SWITCH_STATUS_MEMERR; return SWITCH_STATUS_MEMERR;
} }
#ifdef MALLOC_EVENTS
memset(*event, 0, sizeof(switch_event));
#endif
(*event)->event_id = event_id; (*event)->event_id = event_id;
@ -304,6 +298,13 @@ SWITCH_DECLARE(switch_status) switch_event_create_subclass(switch_event **event,
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
SWITCH_DECLARE(switch_status) switch_event_set_priority(switch_event *event, switch_priority_t priority)
{
event->priority = priority;
switch_event_add_header(event, SWITCH_STACK_TOP, "priority", switch_priority_name(priority));
return SWITCH_STATUS_SUCCESS;
}
SWITCH_DECLARE(char *) switch_event_get_header(switch_event *event, char *header_name) SWITCH_DECLARE(char *) switch_event_get_header(switch_event *event, char *header_name)
{ {
switch_event_header *hp; switch_event_header *hp;
@ -336,9 +337,6 @@ SWITCH_DECLARE(switch_status) switch_event_add_header(switch_event *event, switc
if ((header = ALLOC(sizeof(*header))) == 0) { if ((header = ALLOC(sizeof(*header))) == 0) {
return SWITCH_STATUS_MEMERR; return SWITCH_STATUS_MEMERR;
} }
#ifdef MALLOC_EVENTS
memset(header, 0, sizeof(*header));
#endif
header->name = DUP(header_name); header->name = DUP(header_name);
header->value = DUP(data); header->value = DUP(data);
@ -380,19 +378,6 @@ SWITCH_DECLARE(switch_status) switch_event_add_body(switch_event *event, char *f
SWITCH_DECLARE(void) switch_event_destroy(switch_event **event) SWITCH_DECLARE(void) switch_event_destroy(switch_event **event)
{ {
#ifdef MALLOC_EVENTS
switch_event_header *hp, *tofree;
for (hp = (*event)->headers; hp && hp->next;) {
tofree = hp;
hp = hp->next;
free(tofree->name);
free(tofree->value);
free(tofree);
}
free((*event));
#endif
*event = NULL; *event = NULL;
} }
@ -412,9 +397,6 @@ SWITCH_DECLARE(switch_status) switch_event_dup(switch_event **event, switch_even
if ((header = ALLOC(sizeof(*header))) == 0) { if ((header = ALLOC(sizeof(*header))) == 0) {
return SWITCH_STATUS_MEMERR; return SWITCH_STATUS_MEMERR;
} }
#ifdef MALLOC_EVENTS
memset(header, 0, sizeof(*header));
#endif
header->name = DUP(hp->name); header->name = DUP(hp->name);
header->value = DUP(hp->value); header->value = DUP(hp->value);
@ -520,13 +502,9 @@ SWITCH_DECLARE(switch_status) switch_event_fire_detailed(char *file, char *func,
(*event)->event_user_data = user_data; (*event)->event_user_data = user_data;
} }
switch_queue_push(EVENT_QUEUE, *event); switch_queue_push(EVENT_QUEUE[(*event)->priority], *event);
*event = NULL; *event = NULL;
#ifdef MALLOC_EVENTS
POOL_COUNT++;
#endif
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }

View File

@ -29,7 +29,25 @@
* switch_utils.c -- Compatability and Helper Code * switch_utils.c -- Compatability and Helper Code
* *
*/ */
#include <switch_utils.h> #include <switch.h>
SWITCH_DECLARE(char *) switch_priority_name(switch_priority_t priority)
{
switch(priority) { /*lol*/
case SWITCH_PRIORITY_NORMAL:
return "NORMAL";
break;
case SWITCH_PRIORITY_LOW:
return "LOW";
break;
case SWITCH_PRIORITY_HIGH:
return "HIGH";
break;
default:
return "INVALID";
break;
}
}
static char RFC2833_CHARS[] = "0123456789*#ABCDF"; static char RFC2833_CHARS[] = "0123456789*#ABCDF";