Merge "stasis: Add statistics gathering in developer mode." into 16

This commit is contained in:
Friendly Automation
2018-12-12 13:12:04 -06:00
committed by Gerrit Code Review
8 changed files with 848 additions and 14 deletions

View File

@@ -604,8 +604,14 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st
* has been subscribed. This occurs immediately before accepted message
* types can be set and the callback must expect to receive it.
*/
#ifdef AST_DEVMODE
struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
#define stasis_subscribe(topic, callback, data) __stasis_subscribe(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
#else
struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data);
#endif
/*!
* \brief Create a subscription whose callbacks occur on a thread pool
@@ -633,8 +639,14 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
* has been subscribed. This occurs immediately before accepted message
* types can be set and the callback must expect to receive it.
*/
#ifdef AST_DEVMODE
struct stasis_subscription *__stasis_subscribe_pool(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
#define stasis_subscribe_pool(topic, callback, data) __stasis_subscribe_pool(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
#else
struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data);
#endif
/*!
* \brief Indicate to a subscription that we are interested in a message type.

View File

@@ -60,11 +60,23 @@
* \return \c NULL on error.
* \since 12
*/
#ifdef AST_DEVMODE
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
int needs_mailbox,
int use_thread_pool,
const char *file,
int lineno,
const char *func);
#else
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
int needs_mailbox,
int use_thread_pool);
#endif
#endif /* STASIS_INTERNAL_H_ */

View File

@@ -55,8 +55,14 @@ struct stasis_message_router;
*
* \since 12
*/
#ifdef AST_DEVMODE
struct stasis_message_router *__stasis_message_router_create(
struct stasis_topic *topic, const char *file, int lineno, const char *func);
#define stasis_message_router_create(topic) __stasis_message_router_create(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__)
#else
struct stasis_message_router *stasis_message_router_create(
struct stasis_topic *topic);
#endif
/*!
* \brief Create a new message router object.
@@ -71,8 +77,14 @@ struct stasis_message_router *stasis_message_router_create(
*
* \since 12.8.0
*/
#ifdef AST_DEVMODE
struct stasis_message_router *__stasis_message_router_create_pool(
struct stasis_topic *topic, const char *file, int lineno, const char *func);
#define stasis_message_router_create_pool(topic) __stasis_message_router_create_pool(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__)
#else
struct stasis_message_router *stasis_message_router_create_pool(
struct stasis_topic *topic);
#endif
/*!
* \brief Unsubscribe the router from the upstream topic.

View File

@@ -4067,6 +4067,8 @@ static void asterisk_daemon(int isroot, const char *runuser, const char *rungrou
check_init(ast_tps_init(), "Task Processor Core");
check_init(ast_fd_init(), "File Descriptor Debugging");
check_init(ast_pbx_init(), "ast_pbx_init");
check_init(aco_init(), "Configuration Option Framework");
check_init(stasis_init(), "Stasis");
#ifdef TEST_FRAMEWORK
check_init(ast_test_init(), "Test Framework");
#endif
@@ -4079,9 +4081,7 @@ static void asterisk_daemon(int isroot, const char *runuser, const char *rungrou
check_init(ast_format_init(), "Formats");
check_init(ast_format_cache_init(), "Format Cache");
check_init(ast_codec_builtin_init(), "Built-in Codecs");
check_init(aco_init(), "Configuration Option Framework");
check_init(ast_bucket_init(), "Bucket API");
check_init(stasis_init(), "Stasis");
check_init(ast_stasis_system_init(), "Stasis system-level information");
check_init(ast_endpoint_stasis_init(), "Stasis Endpoint");

View File

@@ -27,6 +27,7 @@
LINKER_SYMBOL_PREFIXstrsep;
LINKER_SYMBOL_PREFIXsetenv;
LINKER_SYMBOL_PREFIXstasis_*;
LINKER_SYMBOL_PREFIX__stasis_*;
LINKER_SYMBOL_PREFIXunsetenv;
LINKER_SYMBOL_PREFIXstrcasestr;
LINKER_SYMBOL_PREFIXstrnlen;

View File

@@ -41,6 +41,9 @@
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/config_options.h"
#ifdef AST_DEVMODE
#include "asterisk/cli.h"
#endif
/*** DOCUMENTATION
<managerEvent language="en_US" name="UserEvent">
@@ -304,14 +307,67 @@ static struct ast_threadpool *pool;
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
#ifdef AST_DEVMODE
/*! The number of buckets to use for topic statistics */
#define TOPIC_STATISTICS_BUCKETS 57
/*! The number of buckets to use for subscription statistics */
#define SUBSCRIPTION_STATISTICS_BUCKETS 57
/*! Container which stores statistics for topics */
static struct ao2_container *topic_statistics;
/*! Container which stores statistics for subscriptions */
static struct ao2_container *subscription_statistics;
/*! \internal */
struct stasis_message_type_statistics {
/*! \brief The number of messages of this published */
int published;
/*! \brief The number of messages of this that did not reach a subscriber */
int unused;
/*! \brief The stasis message type */
struct stasis_message_type *message_type;
};
/*! Lock to protect the message types vector */
AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
/*! Vector containing message type information */
static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
/*! \internal */
struct stasis_topic_statistics {
/*! \brief The number of messages that were not dispatched to any subscriber */
int messages_not_dispatched;
/*! \brief The number of messages that were dispatched to at least 1 subscriber */
int messages_dispatched;
/*! \brief Highest time spent dispatching messages to subscribers */
int64_t highest_time_dispatched;
/*! \brief Lowest time spent dispatching messages to subscribers */
int64_t lowest_time_dispatched;
/*! \brief The number of subscribers to this topic */
int subscriber_count;
/*! \brief Name of the topic */
char name[0];
};
#endif
/*! \internal */
struct stasis_topic {
char *name;
/*! Variable length array of the subscribers */
AST_VECTOR(, struct stasis_subscription *) subscribers;
/*! Topics forwarding into this topic */
AST_VECTOR(, struct stasis_topic *) upstream_topics;
#ifdef AST_DEVMODE
struct stasis_topic_statistics *statistics;
#endif
/*! Name of the topic */
char name[0];
};
/* Forward declarations for the tightly-coupled subscription object */
@@ -337,28 +393,54 @@ static void topic_dtor(void *obj)
* unsubscribed before we get here. */
ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
ast_free(topic->name);
topic->name = NULL;
AST_VECTOR_FREE(&topic->subscribers);
AST_VECTOR_FREE(&topic->upstream_topics);
#ifdef AST_DEVMODE
if (topic->statistics) {
ao2_unlink(topic_statistics, topic->statistics);
ao2_ref(topic->statistics, -1);
}
#endif
}
#ifdef AST_DEVMODE
static struct stasis_topic_statistics *stasis_topic_statistics_create(const char *name)
{
struct stasis_topic_statistics *statistics;
statistics = ao2_alloc(sizeof(*statistics) + strlen(name) + 1, NULL);
if (!statistics) {
return NULL;
}
strcpy(statistics->name, name); /* SAFE */
ao2_link(topic_statistics, statistics);
return statistics;
}
#endif
struct stasis_topic *stasis_topic_create(const char *name)
{
struct stasis_topic *topic;
int res = 0;
topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
topic = ao2_t_alloc(sizeof(*topic) + strlen(name) + 1, topic_dtor, name);
if (!topic) {
return NULL;
}
topic->name = ast_strdup(name);
strcpy(topic->name, name); /* SAFE */
res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
#ifdef AST_DEVMODE
topic->statistics = stasis_topic_statistics_create(name);
if (!topic->name || !topic->statistics || res) {
#else
if (!topic->name || res) {
ao2_cleanup(topic);
#endif
ao2_ref(topic, -1);
return NULL;
}
@@ -375,6 +457,35 @@ size_t stasis_topic_subscribers(const struct stasis_topic *topic)
return AST_VECTOR_SIZE(&topic->subscribers);
}
#ifdef AST_DEVMODE
struct stasis_subscription_statistics {
/*! \brief The filename where the subscription originates */
const char *file;
/*! \brief The line number where the subscription originates */
int lineno;
/*! \brief The function where the subscription originates */
const char *func;
/*! \brief The number of messages that were filtered out */
int messages_dropped;
/*! \brief The number of messages that passed filtering */
int messages_passed;
/*! \brief Highest time spent invoking a message */
int64_t highest_time_invoked;
/*! \brief The message type that currently took the longest to process */
struct stasis_message_type *highest_time_message_type;
/*! \brief Lowest time spent invoking a message */
int64_t lowest_time_invoked;
/*! \brief Using a mailbox to queue messages */
int uses_mailbox;
/*! \brief Using stasis threadpool for handling messages */
int uses_threadpool;
/*! \brief Name of the topic we subscribed to */
char *topic;
/*! \brief Unique ID of the subscription */
char uniqueid[0];
};
#endif
/*! \internal */
struct stasis_subscription {
/*! Unique ID for this subscription */
@@ -403,6 +514,11 @@ struct stasis_subscription {
enum stasis_subscription_message_formatters accepted_formatters;
/*! The message filter currently in use */
enum stasis_subscription_message_filter filter;
#ifdef AST_DEVMODE
/*! Statistics information */
struct stasis_subscription_statistics *statistics;
#endif
};
static void subscription_dtor(void *obj)
@@ -423,6 +539,13 @@ static void subscription_dtor(void *obj)
ast_cond_destroy(&sub->join_cond);
AST_VECTOR_FREE(&sub->accepted_message_types);
#ifdef AST_DEVMODE
if (sub->statistics) {
ao2_unlink(subscription_statistics, sub->statistics);
ao2_ref(sub->statistics, -1);
}
#endif
}
/*!
@@ -436,6 +559,12 @@ static void subscription_invoke(struct stasis_subscription *sub,
{
unsigned int final = stasis_subscription_final_message(sub, message);
int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
#ifdef AST_DEVMODE
struct timeval start;
int elapsed;
start = ast_tvnow();
#endif
/* Notify that the final message has been received */
if (final) {
@@ -462,6 +591,19 @@ static void subscription_invoke(struct stasis_subscription *sub,
ast_cond_signal(&sub->join_cond);
ao2_unlock(sub);
}
#ifdef AST_DEVMODE
elapsed = ast_tvdiff_ms(ast_tvnow(), start);
if (elapsed > sub->statistics->highest_time_invoked) {
sub->statistics->highest_time_invoked = elapsed;
ao2_lock(sub->statistics);
sub->statistics->highest_time_message_type = stasis_message_type(message);
ao2_unlock(sub->statistics);
}
if (elapsed < sub->statistics->lowest_time_invoked) {
sub->statistics->lowest_time_invoked = elapsed;
}
#endif
}
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
@@ -471,12 +613,51 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st
{
}
#ifdef AST_DEVMODE
static struct stasis_subscription_statistics *stasis_subscription_statistics_create(const char *uniqueid,
const char *topic, int needs_mailbox, int use_thread_pool, const char *file, int lineno,
const char *func)
{
struct stasis_subscription_statistics *statistics;
size_t uniqueid_len = strlen(uniqueid) + 1;
statistics = ao2_alloc(sizeof(*statistics) + uniqueid_len + strlen(topic) + 1, NULL);
if (!statistics) {
return NULL;
}
statistics->file = file;
statistics->lineno = lineno;
statistics->func = func;
statistics->uses_mailbox = needs_mailbox;
statistics->uses_threadpool = use_thread_pool;
strcpy(statistics->uniqueid, uniqueid); /* SAFE */
statistics->topic = statistics->uniqueid + uniqueid_len;
strcpy(statistics->topic, topic); /* SAFE */
ao2_link(subscription_statistics, statistics);
return statistics;
}
#endif
#ifdef AST_DEVMODE
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
int needs_mailbox,
int use_thread_pool,
const char *file,
int lineno,
const char *func)
#else
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
int needs_mailbox,
int use_thread_pool)
#endif
{
struct stasis_subscription *sub;
@@ -491,6 +672,15 @@ struct stasis_subscription *internal_stasis_subscribe(
}
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
#ifdef AST_DEVMODE
sub->statistics = stasis_subscription_statistics_create(sub->uniqueid, topic->name, needs_mailbox,
use_thread_pool, file, lineno, func);
if (!sub->statistics) {
ao2_ref(sub, -1);
return NULL;
}
#endif
if (needs_mailbox) {
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
@@ -538,6 +728,18 @@ struct stasis_subscription *internal_stasis_subscribe(
return sub;
}
#ifdef AST_DEVMODE
struct stasis_subscription *__stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
const char *file,
int lineno,
const char *func)
{
return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
}
#else
struct stasis_subscription *stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
@@ -545,7 +747,20 @@ struct stasis_subscription *stasis_subscribe(
{
return internal_stasis_subscribe(topic, callback, data, 1, 0);
}
#endif
#ifdef AST_DEVMODE
struct stasis_subscription *__stasis_subscribe_pool(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
const char *file,
int lineno,
const char *func)
{
return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
}
#else
struct stasis_subscription *stasis_subscribe_pool(
struct stasis_topic *topic,
stasis_subscription_cb callback,
@@ -553,6 +768,7 @@ struct stasis_subscription *stasis_subscribe_pool(
{
return internal_stasis_subscribe(topic, callback, data, 1, 1);
}
#endif
static int sub_cleanup(void *data)
{
@@ -808,6 +1024,11 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
topic_add_subscription(
AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
}
#ifdef AST_DEVMODE
topic->statistics->subscriber_count += 1;
#endif
ao2_unlock(topic);
return 0;
@@ -825,6 +1046,13 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s
}
res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
AST_VECTOR_ELEM_CLEANUP_NOOP);
#ifdef AST_DEVMODE
if (!res) {
topic->statistics->subscriber_count -= 1;
}
#endif
ao2_unlock(topic);
return res;
@@ -885,8 +1113,10 @@ static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
* \param message The message to send
* \param synchronous If non-zero, synchronize on the subscriber receiving
* the message
* \retval 0 if message was not dispatched
* \retval 1 if message was dispatched
*/
static void dispatch_message(struct stasis_subscription *sub,
static unsigned int dispatch_message(struct stasis_subscription *sub,
struct stasis_message *message,
int synchronous)
{
@@ -938,14 +1168,22 @@ static void dispatch_message(struct stasis_subscription *sub,
break;
}
return;
#ifdef AST_DEVMODE
ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
#endif
return 0;
} while (0);
#ifdef AST_DEVMODE
ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
#endif
if (!sub->mailbox) {
/* Dispatch directly */
subscription_invoke(sub, message);
return;
return 1;
}
/* Bump the message for the taskprocessor push. This will get de-ref'd
@@ -957,6 +1195,7 @@ static void dispatch_message(struct stasis_subscription *sub,
/* Push failed; ugh. */
ast_log(LOG_ERROR, "Dropping async dispatch\n");
ao2_cleanup(message);
return 0;
}
} else {
struct sync_task_data std;
@@ -972,7 +1211,7 @@ static void dispatch_message(struct stasis_subscription *sub,
ao2_cleanup(message);
ast_mutex_destroy(&std.lock);
ast_cond_destroy(&std.cond);
return;
return 0;
}
ast_mutex_lock(&std.lock);
@@ -984,6 +1223,8 @@ static void dispatch_message(struct stasis_subscription *sub,
ast_mutex_destroy(&std.lock);
ast_cond_destroy(&std.cond);
}
return 1;
}
/*!
@@ -997,12 +1238,41 @@ static void publish_msg(struct stasis_topic *topic,
struct stasis_message *message, struct stasis_subscription *sync_sub)
{
size_t i;
unsigned int dispatched = 0;
#ifdef AST_DEVMODE
int message_type_id = stasis_message_type_id(stasis_message_type(message));
struct stasis_message_type_statistics *statistics;
struct timeval start;
int elapsed;
#endif
ast_assert(topic != NULL);
ast_assert(message != NULL);
#ifdef AST_DEVMODE
ast_mutex_lock(&message_type_statistics_lock);
if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
struct stasis_message_type_statistics new_statistics = {
.published = 0,
};
if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
ast_mutex_unlock(&message_type_statistics_lock);
return;
}
}
statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
statistics->message_type = stasis_message_type(message);
ast_mutex_unlock(&message_type_statistics_lock);
ast_atomic_fetchadd_int(&statistics->published, +1);
#endif
/* If there are no subscribers don't bother */
if (!stasis_topic_subscribers(topic)) {
#ifdef AST_DEVMODE
ast_atomic_fetchadd_int(&statistics->unused, +1);
ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
#endif
return;
}
@@ -1011,15 +1281,35 @@ static void publish_msg(struct stasis_topic *topic,
* Make sure we hold onto a reference while dispatching.
*/
ao2_ref(topic, +1);
#ifdef AST_DEVMODE
start = ast_tvnow();
#endif
ao2_lock(topic);
for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
ast_assert(sub != NULL);
dispatch_message(sub, message, (sub == sync_sub));
dispatched += dispatch_message(sub, message, (sub == sync_sub));
}
ao2_unlock(topic);
#ifdef AST_DEVMODE
elapsed = ast_tvdiff_ms(ast_tvnow(), start);
if (elapsed > topic->statistics->highest_time_dispatched) {
topic->statistics->highest_time_dispatched = elapsed;
}
if (elapsed < topic->statistics->lowest_time_dispatched) {
topic->statistics->lowest_time_dispatched = elapsed;
}
if (dispatched) {
ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
} else {
ast_atomic_fetchadd_int(&statistics->unused, +1);
ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
}
#endif
ao2_ref(topic, -1);
}
@@ -1805,9 +2095,458 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
/*! @} */
#ifdef AST_DEVMODE
/*!
* \internal
* \brief CLI command implementation for 'stasis statistics show subscriptions'
*/
static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
struct ao2_iterator iter;
struct stasis_subscription_statistics *statistics;
int count = 0;
int dropped = 0;
int passed = 0;
#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
#define FMT_FIELDS2 "%-64s %10d %10d\n"
switch (cmd) {
case CLI_INIT:
e->command = "stasis statistics show subscriptions";
e->usage =
"Usage: stasis statistics show subscriptions\n"
" Shows a list of subscriptions and their general statistics\n";
return NULL;
case CLI_GENERATE:
return NULL;
}
if (a->argc != e->args) {
return CLI_SHOWUSAGE;
}
ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
iter = ao2_iterator_init(subscription_statistics, 0);
while ((statistics = ao2_iterator_next(&iter))) {
ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
statistics->lowest_time_invoked, statistics->highest_time_invoked);
dropped += statistics->messages_dropped;
passed += statistics->messages_passed;
ao2_ref(statistics, -1);
++count;
}
ao2_iterator_destroy(&iter);
ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
ast_cli(a->fd, "\n%d subscriptions\n\n", count);
#undef FMT_HEADERS
#undef FMT_FIELDS
#undef FMT_FIELDS2
return CLI_SUCCESS;
}
/*!
* \internal
* \brief CLI tab completion for subscription statistics names
*/
static char *subscription_statistics_complete_name(const char *word, int state)
{
struct stasis_subscription_statistics *statistics;
struct ao2_iterator it_statistics;
int wordlen = strlen(word);
int which = 0;
char *result = NULL;
it_statistics = ao2_iterator_init(subscription_statistics, 0);
while ((statistics = ao2_iterator_next(&it_statistics))) {
if (!strncasecmp(word, statistics->uniqueid, wordlen)
&& ++which > state) {
result = ast_strdup(statistics->uniqueid);
}
ao2_ref(statistics, -1);
if (result) {
break;
}
}
ao2_iterator_destroy(&it_statistics);
return result;
}
/*!
* \internal
* \brief CLI command implementation for 'stasis statistics show subscription'
*/
static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
struct stasis_subscription_statistics *statistics;
switch (cmd) {
case CLI_INIT:
e->command = "stasis statistics show subscription";
e->usage =
"Usage: stasis statistics show subscription <uniqueid>\n"
" Show stasis subscription statistics.\n";
return NULL;
case CLI_GENERATE:
if (a->pos == 4) {
return subscription_statistics_complete_name(a->word, a->n);
} else {
return NULL;
}
}
if (a->argc != 5) {
return CLI_SHOWUSAGE;
}
statistics = ao2_find(subscription_statistics, a->argv[4], OBJ_SEARCH_KEY);
if (!statistics) {
ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
return CLI_FAILURE;
}
ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
ast_cli(a->fd, "Topic: %s\n", statistics->topic);
ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
ao2_lock(statistics);
if (statistics->highest_time_message_type) {
ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
}
ao2_unlock(statistics);
ao2_ref(statistics, -1);
return CLI_SUCCESS;
}
/*!
* \internal
* \brief CLI command implementation for 'stasis statistics show topics'
*/
static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
struct ao2_iterator iter;
struct stasis_topic_statistics *statistics;
int count = 0;
int not_dispatched = 0;
int dispatched = 0;
#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
#define FMT_FIELDS2 "%-64s %10d %10d\n"
switch (cmd) {
case CLI_INIT:
e->command = "stasis statistics show topics";
e->usage =
"Usage: stasis statistics show topics\n"
" Shows a list of topics and their general statistics\n";
return NULL;
case CLI_GENERATE:
return NULL;
}
if (a->argc != e->args) {
return CLI_SHOWUSAGE;
}
ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
iter = ao2_iterator_init(topic_statistics, 0);
while ((statistics = ao2_iterator_next(&iter))) {
ast_cli(a->fd, FMT_FIELDS, statistics->name, statistics->messages_not_dispatched, statistics->messages_dispatched,
statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
not_dispatched += statistics->messages_not_dispatched;
dispatched += statistics->messages_dispatched;
ao2_ref(statistics, -1);
++count;
}
ao2_iterator_destroy(&iter);
ast_cli(a->fd, FMT_FIELDS2, "Total", not_dispatched, dispatched);
ast_cli(a->fd, "\n%d topics\n\n", count);
#undef FMT_HEADERS
#undef FMT_FIELDS
#undef FMT_FIELDS2
return CLI_SUCCESS;
}
/*!
* \internal
* \brief CLI tab completion for topic statistics names
*/
static char *topic_statistics_complete_name(const char *word, int state)
{
struct stasis_topic_statistics *statistics;
struct ao2_iterator it_statistics;
int wordlen = strlen(word);
int which = 0;
char *result = NULL;
it_statistics = ao2_iterator_init(topic_statistics, 0);
while ((statistics = ao2_iterator_next(&it_statistics))) {
if (!strncasecmp(word, statistics->name, wordlen)
&& ++which > state) {
result = ast_strdup(statistics->name);
}
ao2_ref(statistics, -1);
if (result) {
break;
}
}
ao2_iterator_destroy(&it_statistics);
return result;
}
/*!
* \internal
* \brief CLI command implementation for 'stasis statistics show topic'
*/
static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
struct stasis_topic_statistics *statistics;
switch (cmd) {
case CLI_INIT:
e->command = "stasis statistics show topic";
e->usage =
"Usage: stasis statistics show topic <name>\n"
" Show stasis topic statistics.\n";
return NULL;
case CLI_GENERATE:
if (a->pos == 4) {
return topic_statistics_complete_name(a->word, a->n);
} else {
return NULL;
}
}
if (a->argc != 5) {
return CLI_SHOWUSAGE;
}
statistics = ao2_find(topic_statistics, a->argv[4], OBJ_SEARCH_KEY);
if (!statistics) {
ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
return CLI_FAILURE;
}
ast_cli(a->fd, "Topic: %s\n", statistics->name);
ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
ast_cli(a->fd, "Number of subscribers: %d\n", statistics->subscriber_count);
ao2_ref(statistics, -1);
return CLI_SUCCESS;
}
/*!
* \internal
* \brief CLI command implementation for 'stasis statistics show messages'
*/
static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
int i;
int count = 0;
int published = 0;
int unused = 0;
#define FMT_HEADERS "%-64s %10s %10s\n"
#define FMT_FIELDS "%-64s %10d %10d\n"
switch (cmd) {
case CLI_INIT:
e->command = "stasis statistics show messages";
e->usage =
"Usage: stasis statistics show messages\n"
" Shows a list of message types and their general statistics\n";
return NULL;
case CLI_GENERATE:
return NULL;
}
if (a->argc != e->args) {
return CLI_SHOWUSAGE;
}
ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
ast_mutex_lock(&message_type_statistics_lock);
for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
if (!statistics->message_type) {
continue;
}
ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
statistics->unused);
published += statistics->published;
unused += statistics->unused;
++count;
}
ast_mutex_unlock(&message_type_statistics_lock);
ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
ast_cli(a->fd, "\n%d seen message types\n\n", count);
#undef FMT_HEADERS
#undef FMT_FIELDS
return CLI_SUCCESS;
}
static struct ast_cli_entry cli_stasis_statistics[] = {
AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
};
static int subscription_statistics_hash(const void *obj, const int flags)
{
const struct stasis_subscription_statistics *object;
const char *key;
switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_KEY:
key = obj;
break;
case OBJ_SEARCH_OBJECT:
object = obj;
key = object->uniqueid;
break;
default:
/* Hash can only work on something with a full key. */
ast_assert(0);
return 0;
}
return ast_str_case_hash(key);
}
static int subscription_statistics_cmp(void *obj, void *arg, int flags)
{
const struct stasis_subscription_statistics *object_left = obj;
const struct stasis_subscription_statistics *object_right = arg;
const char *right_key = arg;
int cmp;
switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_OBJECT:
right_key = object_right->uniqueid;
/* Fall through */
case OBJ_SEARCH_KEY:
cmp = strcasecmp(object_left->uniqueid, right_key);
break;
case OBJ_SEARCH_PARTIAL_KEY:
/* Not supported by container */
ast_assert(0);
cmp = -1;
break;
default:
/*
* What arg points to is specific to this traversal callback
* and has no special meaning to astobj2.
*/
cmp = 0;
break;
}
if (cmp) {
return 0;
}
/*
* At this point the traversal callback is identical to a sorted
* container.
*/
return CMP_MATCH;
}
static int topic_statistics_hash(const void *obj, const int flags)
{
const struct stasis_topic_statistics *object;
const char *key;
switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_KEY:
key = obj;
break;
case OBJ_SEARCH_OBJECT:
object = obj;
key = object->name;
break;
default:
/* Hash can only work on something with a full key. */
ast_assert(0);
return 0;
}
return ast_str_case_hash(key);
}
static int topic_statistics_cmp(void *obj, void *arg, int flags)
{
const struct stasis_topic_statistics *object_left = obj;
const struct stasis_topic_statistics *object_right = arg;
const char *right_key = arg;
int cmp;
switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_OBJECT:
right_key = object_right->name;
/* Fall through */
case OBJ_SEARCH_KEY:
cmp = strcasecmp(object_left->name, right_key);
break;
case OBJ_SEARCH_PARTIAL_KEY:
/* Not supported by container */
ast_assert(0);
cmp = -1;
break;
default:
/*
* What arg points to is specific to this traversal callback
* and has no special meaning to astobj2.
*/
cmp = 0;
break;
}
if (cmp) {
return 0;
}
/*
* At this point the traversal callback is identical to a sorted
* container.
*/
return CMP_MATCH;
}
#endif
/*! \brief Cleanup function for graceful shutdowns */
static void stasis_cleanup(void)
{
#ifdef AST_DEVMODE
ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
AST_VECTOR_FREE(&message_type_statistics);
ao2_cleanup(subscription_statistics);
ao2_cleanup(topic_statistics);
#endif
ast_threadpool_shutdown(pool);
pool = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
@@ -1902,5 +2641,28 @@ int stasis_init(void)
return -1;
}
#ifdef AST_DEVMODE
/* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
* topic or subscripton.
*/
subscription_statistics = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
subscription_statistics_hash, 0, subscription_statistics_cmp);
if (!subscription_statistics) {
return -1;
}
topic_statistics = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
topic_statistics_hash, 0, topic_statistics_cmp);
if (!topic_statistics) {
return -1;
}
AST_VECTOR_INIT(&message_type_statistics, 0);
if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
return -1;
}
#endif
return 0;
}

View File

@@ -971,7 +971,11 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or
}
ast_free(new_name);
#ifdef AST_DEVMODE
caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
#else
caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
#endif
if (caching_topic->sub == NULL) {
ao2_ref(caching_topic, -1);

View File

@@ -204,8 +204,14 @@ static void router_dispatch(void *data,
}
}
#ifdef AST_DEVMODE
static struct stasis_message_router *stasis_message_router_create_internal(
struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno,
const char *func)
#else
static struct stasis_message_router *stasis_message_router_create_internal(
struct stasis_topic *topic, int use_thread_pool)
#endif
{
int res;
struct stasis_message_router *router;
@@ -224,11 +230,20 @@ static struct stasis_message_router *stasis_message_router_create_internal(
return NULL;
}
#ifdef AST_DEVMODE
if (use_thread_pool) {
router->subscription = __stasis_subscribe_pool(topic, router_dispatch, router, file, lineno, func);
} else {
router->subscription = __stasis_subscribe(topic, router_dispatch, router, file, lineno, func);
}
#else
if (use_thread_pool) {
router->subscription = stasis_subscribe_pool(topic, router_dispatch, router);
} else {
router->subscription = stasis_subscribe(topic, router_dispatch, router);
}
#endif
if (!router->subscription) {
ao2_ref(router, -1);
@@ -241,17 +256,33 @@ static struct stasis_message_router *stasis_message_router_create_internal(
return router;
}
#ifdef AST_DEVMODE
struct stasis_message_router *__stasis_message_router_create(
struct stasis_topic *topic, const char *file, int lineno, const char *func)
{
return stasis_message_router_create_internal(topic, 0, file, lineno, func);
}
#else
struct stasis_message_router *stasis_message_router_create(
struct stasis_topic *topic)
{
return stasis_message_router_create_internal(topic, 0);
}
#endif
#ifdef AST_DEVMODE
struct stasis_message_router *__stasis_message_router_create_pool(
struct stasis_topic *topic, const char *file, int lineno, const char *func)
{
return stasis_message_router_create_internal(topic, 1, file, lineno, func);
}
#else
struct stasis_message_router *stasis_message_router_create_pool(
struct stasis_topic *topic)
{
return stasis_message_router_create_internal(topic, 1);
}
#endif
void stasis_message_router_unsubscribe(struct stasis_message_router *router)
{