bridges: Remove reliance on stasis caching

* The bridging core no longer uses the stasis cache for bridge
  snapshots.  The latest bridge snapshot is now stored on the
  ast_bridge structure itself.

* The following APIs are no longer available since the stasis cache
  is no longer used:
    ast_bridge_topic_cached()
    ast_bridge_topic_all_cached()

* A topic pool is now used for individual bridge topics.

* The ast_bridge_cache() function was removed since there's no
  longer a separate container of snapshots.

* A new function "ast_bridges()" was created to retrieve the
  container of all bridges.  Users formerly calling
  ast_bridge_cache() can use the new function to iterate over
  bridges and retrieve the latest snapshot directly from the
  bridge.

* The ast_bridge_snapshot_get_latest() function was renamed to
  ast_bridge_get_snapshot_by_uniqueid().

* A new function "ast_bridge_get_snapshot()" was created to retrieve
  the bridge snapshot directly from the bridge structure.

* The ast_bridge_topic_all() function now returns a normal topic
  not a cached one so you can't use stasis cache functions on it
  either.

* The ast_bridge_snapshot_type() stasis message now has the
  ast_bridge_snapshot_update structure as it's data.  It contains
  the last snapshot and the new one.

* cdr, cel, manager and ari have been updated to use the new
  arrangement.

Change-Id: I7049b80efa88676ce5c4666f818fa18ad1985369
This commit is contained in:
George Joseph
2018-09-19 13:34:41 -06:00
parent 4ca709768d
commit 3667c5e1d2
15 changed files with 397 additions and 340 deletions

View File

@@ -155,7 +155,8 @@ static struct ast_json *ast_bridge_merge_message_to_json(
struct stasis_message *msg,
const struct stasis_message_sanitizer *sanitize);
static struct stasis_cp_all *bridge_cache_all;
static struct stasis_topic *bridge_topic_all;
static struct stasis_topic_pool *bridge_topic_pool;
/*!
* @{ \brief Define bridge message types.
@@ -175,33 +176,9 @@ STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type,
.to_ami = attended_transfer_to_ami);
/*! @} */
struct stasis_cache *ast_bridge_cache(void)
{
return stasis_cp_all_cache(bridge_cache_all);
}
struct stasis_topic *ast_bridge_topic_all(void)
{
return stasis_cp_all_topic(bridge_cache_all);
}
struct stasis_topic *ast_bridge_topic_all_cached(void)
{
return stasis_cp_all_topic_cached(bridge_cache_all);
}
int bridge_topics_init(struct ast_bridge *bridge)
{
if (ast_strlen_zero(bridge->uniqueid)) {
ast_log(LOG_ERROR, "Bridge id initialization required\n");
return -1;
}
bridge->topics = stasis_cp_single_create(bridge_cache_all,
bridge->uniqueid);
if (!bridge->topics) {
return -1;
}
return 0;
return bridge_topic_all;
}
struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
@@ -210,16 +187,7 @@ struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
return ast_bridge_topic_all();
}
return stasis_cp_single_topic(bridge->topics);
}
struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge)
{
if (!bridge) {
return ast_bridge_topic_all_cached();
}
return stasis_cp_single_topic_cached(bridge->topics);
return bridge->topic;
}
/*! \brief Destructor for bridge snapshots */
@@ -292,24 +260,106 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_create(struct ast_bridge *bridge
return snapshot;
}
void ast_bridge_publish_state(struct ast_bridge *bridge)
static void bridge_snapshot_update_dtor(void *obj)
{
struct ast_bridge_snapshot *snapshot;
struct stasis_message *msg;
struct ast_bridge_snapshot_update *update = obj;
if (!ast_bridge_snapshot_type()) {
return;
ast_debug(3, "Update: %p Old: %s New: %s\n", update,
update->old_snapshot ? update->old_snapshot->uniqueid : "<none>",
update->new_snapshot ? update->new_snapshot->uniqueid : "<none>");
ao2_cleanup(update->old_snapshot);
ao2_cleanup(update->new_snapshot);
}
static struct ast_bridge_snapshot_update *bridge_snapshot_update_create(
struct ast_bridge_snapshot *old, struct ast_bridge_snapshot *new)
{
struct ast_bridge_snapshot_update *update;
update = ao2_alloc_options(sizeof(*update), bridge_snapshot_update_dtor,
AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!update) {
return NULL;
}
update->old_snapshot = ao2_bump(old);
update->new_snapshot = ao2_bump(new);
ast_debug(3, "Update: %p Old: %s New: %s\n", update,
update->old_snapshot ? update->old_snapshot->uniqueid : "<none>",
update->new_snapshot ? update->new_snapshot->uniqueid : "<none>");
return update;
}
int bridge_topics_init(struct ast_bridge *bridge)
{
if (ast_strlen_zero(bridge->uniqueid)) {
ast_log(LOG_ERROR, "Bridge id initialization required\n");
return -1;
}
bridge->topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid);
if (!bridge->topic) {
return -1;
}
return 0;
}
void bridge_topics_destroy(struct ast_bridge *bridge)
{
struct ast_bridge_snapshot_update *update;
struct stasis_message *msg;
ast_assert(bridge != NULL);
snapshot = ast_bridge_snapshot_create(bridge);
if (!snapshot) {
if (!bridge->current_snapshot) {
bridge->current_snapshot = ast_bridge_snapshot_create(bridge);
if (!bridge->current_snapshot) {
return;
}
}
update = bridge_snapshot_update_create(bridge->current_snapshot, NULL);
if (!update) {
return;
}
msg = stasis_message_create(ast_bridge_snapshot_type(), snapshot);
ao2_ref(snapshot, -1);
msg = stasis_message_create(ast_bridge_snapshot_type(), update);
ao2_ref(update, -1);
if (!msg) {
return;
}
stasis_publish(ast_bridge_topic(bridge), msg);
ao2_ref(msg, -1);
stasis_topic_pool_delete_topic(bridge_topic_pool, stasis_topic_name(ast_bridge_topic(bridge)));
}
void ast_bridge_publish_state(struct ast_bridge *bridge)
{
struct ast_bridge_snapshot *new_snapshot;
struct ast_bridge_snapshot_update *update;
struct stasis_message *msg;
ast_assert(bridge != NULL);
new_snapshot = ast_bridge_snapshot_create(bridge);
if (!new_snapshot) {
return;
}
update = bridge_snapshot_update_create(bridge->current_snapshot, new_snapshot);
/* There may not have been an old snapshot */
ao2_cleanup(bridge->current_snapshot);
bridge->current_snapshot = new_snapshot;
if (!update) {
return;
}
msg = stasis_message_create(ast_bridge_snapshot_type(), update);
ao2_ref(update, -1);
if (!msg) {
return;
}
@@ -321,11 +371,20 @@ void ast_bridge_publish_state(struct ast_bridge *bridge)
static void bridge_publish_state_from_blob(struct ast_bridge *bridge,
struct ast_bridge_blob *obj)
{
struct ast_bridge_snapshot_update *update;
struct stasis_message *msg;
ast_assert(obj != NULL);
msg = stasis_message_create(ast_bridge_snapshot_type(), obj->bridge);
update = bridge_snapshot_update_create(bridge->current_snapshot, obj->bridge);
ao2_cleanup(bridge->current_snapshot);
bridge->current_snapshot = ao2_bump(obj->bridge);
if (!update) {
return;
}
msg = stasis_message_create(ast_bridge_snapshot_type(), update);
ao2_ref(update, -1);
if (!msg) {
return;
}
@@ -1250,35 +1309,37 @@ void ast_bridge_publish_attended_transfer(struct ast_attended_transfer_message *
ao2_ref(msg, -1);
}
struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid)
struct ast_bridge_snapshot *ast_bridge_get_snapshot_by_uniqueid(const char *uniqueid)
{
struct stasis_message *message;
struct ast_bridge *bridge;
struct ast_bridge_snapshot *snapshot;
ast_assert(!ast_strlen_zero(uniqueid));
message = stasis_cache_get(ast_bridge_cache(),
ast_bridge_snapshot_type(),
uniqueid);
if (!message) {
bridge = ast_bridge_find_by_id(uniqueid);
if (!bridge) {
return NULL;
}
snapshot = ao2_bump(stasis_message_data(message));
ao2_ref(message, -1);
ast_bridge_lock(bridge);
snapshot = ao2_bump(bridge->current_snapshot);
ast_bridge_unlock(bridge);
ao2_ref(bridge, -1);
return snapshot;
}
/*! \brief snapshot ID getter for caching topic */
static const char *bridge_snapshot_get_id(struct stasis_message *msg)
struct ast_bridge_snapshot *ast_bridge_get_snapshot(struct ast_bridge *bridge)
{
struct ast_bridge_snapshot *snapshot;
if (stasis_message_type(msg) != ast_bridge_snapshot_type()) {
if (!bridge) {
return NULL;
}
snapshot = stasis_message_data(msg);
return snapshot->uniqueid;
ast_bridge_lock(bridge);
snapshot = ao2_bump(bridge->current_snapshot);
ast_bridge_unlock(bridge);
return snapshot;
}
static void stasis_bridging_cleanup(void)
@@ -1290,8 +1351,10 @@ static void stasis_bridging_cleanup(void)
STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type);
ao2_cleanup(bridge_cache_all);
bridge_cache_all = NULL;
ao2_cleanup(bridge_topic_pool);
bridge_topic_pool = NULL;
ao2_cleanup(bridge_topic_all);
bridge_topic_all = NULL;
}
int ast_stasis_bridging_init(void)
@@ -1300,10 +1363,12 @@ int ast_stasis_bridging_init(void)
ast_register_cleanup(stasis_bridging_cleanup);
bridge_cache_all = stasis_cp_all_create("ast_bridge_topic_all",
bridge_snapshot_get_id);
if (!bridge_cache_all) {
bridge_topic_all = stasis_topic_create("ast_bridge_topic_all");
if (!bridge_topic_all) {
return -1;
}
bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all);
if (!bridge_topic_pool) {
return -1;
}