stasis: Segment channel snapshot to reduce creation cost.

When a channel snapshot was created it used to be done
from scratch, copying all data (many strings). This incurs
a cost when doing so.

This change segments the channel snapshot into different
components which can be reused if unchanged from the
previous snapshot creation, reducing the cost. In normal
cases this results in some pointers being copied with
reference count being bumped, some integers being set,
and a string or two copied. The other benefit is that it
is now possible to determine if a channel snapshot update
is redundant and thus stop it before a message is published
to stasis.

The specific segments in the channel snapshot were split up
based on whether they are changed together, how often they
are changed, and their general grouping. In practice only
1 (or 0) of the segments actually get changed in normal
operation.

Invalidation is done by setting a flag on the channel when
the segment source is changed, forcing creation of a new
segment when the channel snapshot is created.

ASTERISK-28119

Change-Id: I5d7ef3df963a88ac47bc187d73c5225c315f8423
This commit is contained in:
Joshua Colp
2018-11-07 13:18:34 -04:00
parent d0ccbb3377
commit 50ac85cb40
32 changed files with 878 additions and 470 deletions

View File

@@ -5890,13 +5890,13 @@ static void queue_agent_cb(void *userdata, struct stasis_subscription *sub,
agent_blob = stasis_message_data(msg);
if (ast_channel_agent_login_type() == stasis_message_type(msg)) {
ast_queue_log("NONE", agent_blob->snapshot->uniqueid,
ast_queue_log("NONE", agent_blob->snapshot->base->uniqueid,
ast_json_string_get(ast_json_object_get(agent_blob->blob, "agent")),
"AGENTLOGIN", "%s", agent_blob->snapshot->name);
"AGENTLOGIN", "%s", agent_blob->snapshot->base->name);
} else if (ast_channel_agent_logoff_type() == stasis_message_type(msg)) {
ast_queue_log("NONE", agent_blob->snapshot->uniqueid,
ast_queue_log("NONE", agent_blob->snapshot->base->uniqueid,
ast_json_string_get(ast_json_object_get(agent_blob->blob, "agent")),
"AGENTLOGOFF", "%s|%ld", agent_blob->snapshot->name,
"AGENTLOGOFF", "%s|%ld", agent_blob->snapshot->base->name,
(long) ast_json_integer_get(ast_json_object_get(agent_blob->blob, "logintime")));
}
}
@@ -6088,8 +6088,8 @@ static void log_attended_transfer(struct queue_stasis_data *queue_data, struct a
ast_str_set(&transfer_str, 0, "APP|%s", atxfer_msg->dest.app);
break;
case AST_ATTENDED_TRANSFER_DEST_LINK:
ast_str_set(&transfer_str, 0, "LINK|%s|%s", atxfer_msg->dest.links[0]->name,
atxfer_msg->dest.links[1]->name);
ast_str_set(&transfer_str, 0, "LINK|%s|%s", atxfer_msg->dest.links[0]->base->name,
atxfer_msg->dest.links[1]->base->name);
break;
case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
case AST_ATTENDED_TRANSFER_DEST_FAIL:
@@ -6098,7 +6098,7 @@ static void log_attended_transfer(struct queue_stasis_data *queue_data, struct a
return;
}
ast_queue_log(queue_data->queue->name, caller->uniqueid, queue_data->member->membername, "ATTENDEDTRANSFER", "%s|%ld|%ld|%d",
ast_queue_log(queue_data->queue->name, caller->base->uniqueid, queue_data->member->membername, "ATTENDEDTRANSFER", "%s|%ld|%ld|%d",
ast_str_buffer(transfer_str),
(long) (queue_data->starttime - queue_data->holdstart),
(long) (time(NULL) - queue_data->starttime), queue_data->caller_pos);
@@ -6131,11 +6131,11 @@ static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
return;
}
if (!strcmp(enter_blob->channel->uniqueid, queue_data->caller_uniqueid)) {
if (!strcmp(enter_blob->channel->base->uniqueid, queue_data->caller_uniqueid)) {
ast_string_field_set(queue_data, bridge_uniqueid,
enter_blob->bridge->uniqueid);
ast_debug(3, "Detected entry of caller channel %s into bridge %s\n",
enter_blob->channel->name, queue_data->bridge_uniqueid);
enter_blob->channel->base->name, queue_data->bridge_uniqueid);
}
}
@@ -6186,7 +6186,7 @@ static void handle_blind_transfer(void *userdata, struct stasis_subscription *su
context = transfer_msg->context;
ast_debug(3, "Detected blind transfer in queue %s\n", queue_data->queue->name);
ast_queue_log(queue_data->queue->name, caller_snapshot->uniqueid, queue_data->member->membername,
ast_queue_log(queue_data->queue->name, caller_snapshot->base->uniqueid, queue_data->member->membername,
"BLINDTRANSFER", "%s|%s|%ld|%ld|%d",
exten, context,
(long) (queue_data->starttime - queue_data->holdstart),
@@ -6302,9 +6302,9 @@ static void handle_local_optimization_begin(void *userdata, struct stasis_subscr
return;
}
if (!strcmp(local_one->uniqueid, queue_data->member_uniqueid)) {
if (!strcmp(local_one->base->uniqueid, queue_data->member_uniqueid)) {
optimization = &queue_data->member_optimize;
} else if (!strcmp(local_two->uniqueid, queue_data->caller_uniqueid)) {
} else if (!strcmp(local_two->base->uniqueid, queue_data->caller_uniqueid)) {
optimization = &queue_data->caller_optimize;
} else {
return;
@@ -6313,9 +6313,9 @@ static void handle_local_optimization_begin(void *userdata, struct stasis_subscr
/* We only allow move-swap optimizations, so there had BETTER be a source */
ast_assert(source != NULL);
optimization->source_chan_uniqueid = ast_strdup(source->uniqueid);
optimization->source_chan_uniqueid = ast_strdup(source->base->uniqueid);
if (!optimization->source_chan_uniqueid) {
ast_log(LOG_ERROR, "Unable to track local channel optimization for channel %s. Expect further errors\n", local_one->name);
ast_log(LOG_ERROR, "Unable to track local channel optimization for channel %s. Expect further errors\n", local_one->base->name);
return;
}
id = ast_json_integer_get(ast_json_object_get(ast_multi_channel_blob_get_json(optimization_blob), "id"));
@@ -6354,10 +6354,10 @@ static void handle_local_optimization_end(void *userdata, struct stasis_subscrip
return;
}
if (!strcmp(local_one->uniqueid, queue_data->member_uniqueid)) {
if (!strcmp(local_one->base->uniqueid, queue_data->member_uniqueid)) {
optimization = &queue_data->member_optimize;
is_caller = 0;
} else if (!strcmp(local_two->uniqueid, queue_data->caller_uniqueid)) {
} else if (!strcmp(local_two->base->uniqueid, queue_data->caller_uniqueid)) {
optimization = &queue_data->caller_optimize;
is_caller = 1;
} else {
@@ -6420,16 +6420,16 @@ static void handle_hangup(void *userdata, struct stasis_subscription *sub,
return;
}
if (!strcmp(channel_blob->snapshot->uniqueid, queue_data->caller_uniqueid)) {
if (!strcmp(channel_blob->snapshot->base->uniqueid, queue_data->caller_uniqueid)) {
reason = CALLER;
} else if (!strcmp(channel_blob->snapshot->uniqueid, queue_data->member_uniqueid)) {
} else if (!strcmp(channel_blob->snapshot->base->uniqueid, queue_data->member_uniqueid)) {
reason = AGENT;
} else {
ao2_unlock(queue_data);
return;
}
chan = ast_channel_get_by_name(channel_blob->snapshot->name);
chan = ast_channel_get_by_name(channel_blob->snapshot->base->name);
if (chan && (ast_channel_has_role(chan, AST_TRANSFERER_ROLE_NAME) ||
!ast_strlen_zero(pbx_builtin_getvar_helper(chan, "ATTENDEDTRANSFER")) ||
!ast_strlen_zero(pbx_builtin_getvar_helper(chan, "BLINDTRANSFER")))) {
@@ -6446,9 +6446,9 @@ static void handle_hangup(void *userdata, struct stasis_subscription *sub,
ao2_unlock(queue_data);
ast_debug(3, "Detected hangup of queue %s channel %s\n", reason == CALLER ? "caller" : "member",
channel_blob->snapshot->name);
channel_blob->snapshot->base->name);
ast_queue_log(queue_data->queue->name, caller_snapshot->uniqueid, queue_data->member->membername,
ast_queue_log(queue_data->queue->name, caller_snapshot->base->uniqueid, queue_data->member->membername,
reason == CALLER ? "COMPLETECALLER" : "COMPLETEAGENT", "%ld|%ld|%d",
(long) (queue_data->starttime - queue_data->holdstart),
(long) (time(NULL) - queue_data->starttime), queue_data->caller_pos);