mirror of
https://github.com/asterisk/asterisk.git
synced 2025-10-13 00:04:53 +00:00
Merge topic 'update_taskprocessor_commands' into 13
* changes: Sorcery: Create human friendly serializer names. Stasis: Create human friendly taskprocessor/serializer names. taskprocessor.c: New API for human friendly taskprocessor names. taskprocessor.c: Sort CLI "core show taskprocessors" output.
This commit is contained in:
@@ -56,6 +56,9 @@
|
||||
|
||||
struct ast_taskprocessor;
|
||||
|
||||
/*! \brief Suggested maximum taskprocessor name length (less null terminator). */
|
||||
#define AST_TASKPROCESSOR_MAX_NAME 45
|
||||
|
||||
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL 500
|
||||
|
||||
/*!
|
||||
@@ -258,6 +261,30 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps);
|
||||
*/
|
||||
int ast_taskprocessor_is_task(struct ast_taskprocessor *tps);
|
||||
|
||||
/*!
|
||||
* \brief Get the next sequence number to create a human friendly taskprocessor name.
|
||||
* \since 13.8.0
|
||||
*
|
||||
* \return Sequence number for use in creating human friendly taskprocessor names.
|
||||
*/
|
||||
unsigned int ast_taskprocessor_seq_num(void);
|
||||
|
||||
/*!
|
||||
* \brief Build a taskprocessor name with a sequence number on the end.
|
||||
* \since 13.8.0
|
||||
*
|
||||
* \param buf Where to put the built taskprocessor name.
|
||||
* \param size How large is buf including null terminator.
|
||||
* \param format printf format to create the non-sequenced part of the name.
|
||||
*
|
||||
* \note The user supplied part of the taskprocessor name is truncated
|
||||
* to allow the full sequence number to be appended within the supplied
|
||||
* buffer size.
|
||||
*
|
||||
* \return Nothing
|
||||
*/
|
||||
void __attribute__((format(printf, 3, 4))) ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...);
|
||||
|
||||
/*!
|
||||
* \brief Return the name of the taskprocessor singleton
|
||||
* \since 1.6.1
|
||||
|
@@ -820,7 +820,7 @@ static struct ast_sorcery_object_type *sorcery_object_type_alloc(const char *typ
|
||||
{
|
||||
#define INITIAL_WIZARD_VECTOR_SIZE 5
|
||||
struct ast_sorcery_object_type *object_type;
|
||||
char uuid[AST_UUID_STR_LEN];
|
||||
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
|
||||
|
||||
if (!(object_type = ao2_alloc(sizeof(*object_type), sorcery_object_type_destructor))) {
|
||||
return NULL;
|
||||
@@ -853,12 +853,10 @@ static struct ast_sorcery_object_type *sorcery_object_type_alloc(const char *typ
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (!ast_uuid_generate_str(uuid, sizeof(uuid))) {
|
||||
ao2_ref(object_type, -1);
|
||||
return NULL;
|
||||
}
|
||||
/* Create name with seq number appended. */
|
||||
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sorcery/%s", type);
|
||||
|
||||
if (!(object_type->serializer = ast_threadpool_serializer(uuid, threadpool))) {
|
||||
if (!(object_type->serializer = ast_threadpool_serializer(tps_name, threadpool))) {
|
||||
ao2_ref(object_type, -1);
|
||||
return NULL;
|
||||
}
|
||||
|
@@ -462,22 +462,29 @@ struct stasis_subscription *internal_stasis_subscribe(
|
||||
}
|
||||
|
||||
/* The ao2 lock is used for join_cond. */
|
||||
sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, topic->name);
|
||||
sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
|
||||
if (!sub) {
|
||||
return NULL;
|
||||
}
|
||||
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
|
||||
|
||||
if (needs_mailbox) {
|
||||
/* With a small number of subscribers, a thread-per-sub is
|
||||
* acceptable. For larger number of subscribers, a thread
|
||||
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
|
||||
|
||||
/* Create name with seq number appended. */
|
||||
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
|
||||
use_thread_pool ? 'p' : 'm',
|
||||
stasis_topic_name(topic));
|
||||
|
||||
/*
|
||||
* With a small number of subscribers, a thread-per-sub is
|
||||
* acceptable. For a large number of subscribers, a thread
|
||||
* pool should be used.
|
||||
*/
|
||||
if (use_thread_pool) {
|
||||
sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
|
||||
sub->mailbox = ast_threadpool_serializer(tps_name, pool);
|
||||
} else {
|
||||
sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
|
||||
TPS_REF_DEFAULT);
|
||||
sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
|
||||
}
|
||||
if (!sub->mailbox) {
|
||||
return NULL;
|
||||
|
@@ -413,6 +413,44 @@ static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args
|
||||
return CLI_SUCCESS;
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Taskprocessor ao2 container sort function.
|
||||
* \since 13.8.0
|
||||
*
|
||||
* \param obj_left pointer to the (user-defined part) of an object.
|
||||
* \param obj_right pointer to the (user-defined part) of an object.
|
||||
* \param flags flags from ao2_callback()
|
||||
* OBJ_SEARCH_OBJECT - if set, 'obj_right', is an object.
|
||||
* OBJ_SEARCH_KEY - if set, 'obj_right', is a search key item that is not an object.
|
||||
* OBJ_SEARCH_PARTIAL_KEY - if set, 'obj_right', is a partial search key item that is not an object.
|
||||
*
|
||||
* \retval <0 if obj_left < obj_right
|
||||
* \retval =0 if obj_left == obj_right
|
||||
* \retval >0 if obj_left > obj_right
|
||||
*/
|
||||
static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
|
||||
{
|
||||
const struct ast_taskprocessor *tps_left = obj_left;
|
||||
const struct ast_taskprocessor *tps_right = obj_right;
|
||||
const char *right_key = obj_right;
|
||||
int cmp;
|
||||
|
||||
switch (flags & OBJ_SEARCH_MASK) {
|
||||
default:
|
||||
case OBJ_SEARCH_OBJECT:
|
||||
right_key = tps_right->name;
|
||||
/* Fall through */
|
||||
case OBJ_SEARCH_KEY:
|
||||
cmp = strcasecmp(tps_left->name, right_key);
|
||||
break;
|
||||
case OBJ_SEARCH_PARTIAL_KEY:
|
||||
cmp = strncasecmp(tps_left->name, right_key, strlen(right_key));
|
||||
break;
|
||||
}
|
||||
return cmp;
|
||||
}
|
||||
|
||||
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
|
||||
{
|
||||
char name[256];
|
||||
@@ -420,7 +458,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
|
||||
unsigned long qsize;
|
||||
unsigned long maxqsize;
|
||||
unsigned long processed;
|
||||
struct ast_taskprocessor *p;
|
||||
struct ao2_container *sorted_tps;
|
||||
struct ast_taskprocessor *tps;
|
||||
struct ao2_iterator iter;
|
||||
#define FMT_HEADERS "%-45s %10s %10s %10s\n"
|
||||
#define FMT_FIELDS "%-45s %10lu %10lu %10lu\n"
|
||||
@@ -436,28 +475,38 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (a->argc != e->args)
|
||||
if (a->argc != e->args) {
|
||||
return CLI_SHOWUSAGE;
|
||||
}
|
||||
|
||||
sorted_tps = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, tps_sort_cb,
|
||||
NULL);
|
||||
if (!sorted_tps
|
||||
|| ao2_container_dup(sorted_tps, tps_singletons, 0)) {
|
||||
ao2_cleanup(sorted_tps);
|
||||
return CLI_FAILURE;
|
||||
}
|
||||
|
||||
ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth");
|
||||
tcount = 0;
|
||||
iter = ao2_iterator_init(tps_singletons, 0);
|
||||
while ((p = ao2_iterator_next(&iter))) {
|
||||
ast_copy_string(name, p->name, sizeof(name));
|
||||
qsize = p->tps_queue_size;
|
||||
if (p->stats) {
|
||||
maxqsize = p->stats->max_qsize;
|
||||
processed = p->stats->_tasks_processed_count;
|
||||
iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
|
||||
while ((tps = ao2_iterator_next(&iter))) {
|
||||
ast_copy_string(name, tps->name, sizeof(name));
|
||||
qsize = tps->tps_queue_size;
|
||||
if (tps->stats) {
|
||||
maxqsize = tps->stats->max_qsize;
|
||||
processed = tps->stats->_tasks_processed_count;
|
||||
} else {
|
||||
maxqsize = 0;
|
||||
processed = 0;
|
||||
}
|
||||
ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize);
|
||||
ast_taskprocessor_unreference(p);
|
||||
ast_taskprocessor_unreference(tps);
|
||||
++tcount;
|
||||
}
|
||||
ao2_iterator_destroy(&iter);
|
||||
ast_cli(a->fd, "\n%d taskprocessors\n\n", tcount);
|
||||
ao2_ref(sorted_tps, -1);
|
||||
return CLI_SUCCESS;
|
||||
}
|
||||
|
||||
@@ -830,3 +879,37 @@ int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
|
||||
ao2_unlock(tps);
|
||||
return is_task;
|
||||
}
|
||||
|
||||
unsigned int ast_taskprocessor_seq_num(void)
|
||||
{
|
||||
static int seq_num;
|
||||
|
||||
return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);
|
||||
}
|
||||
|
||||
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)
|
||||
{
|
||||
va_list ap;
|
||||
int user_size;
|
||||
#define SEQ_STR_SIZE (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */
|
||||
|
||||
ast_assert(buf != NULL);
|
||||
ast_assert(SEQ_STR_SIZE <= size);
|
||||
|
||||
va_start(ap, format);
|
||||
user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
|
||||
va_end(ap);
|
||||
if (user_size < 0) {
|
||||
/*
|
||||
* Wow! We got an output error to a memory buffer.
|
||||
* Assume no user part of name written.
|
||||
*/
|
||||
user_size = 0;
|
||||
} else if (size < user_size + SEQ_STR_SIZE) {
|
||||
/* Truncate user part of name to make sequence number fit. */
|
||||
user_size = size - SEQ_STR_SIZE;
|
||||
}
|
||||
|
||||
/* Append sequence number to end of user name. */
|
||||
snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
|
||||
}
|
||||
|
Reference in New Issue
Block a user