diff --git a/apps/app_meetme.c b/apps/app_meetme.c index e1fedb47bf..cd6a4f72c6 100644 --- a/apps/app_meetme.c +++ b/apps/app_meetme.c @@ -1139,7 +1139,7 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(meetme_talking_type); STASIS_MESSAGE_TYPE_DEFN_LOCAL(meetme_talk_request_type); static void meetme_stasis_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message); + struct stasis_message *message); static void meetme_stasis_cleanup(void) { @@ -1226,7 +1226,7 @@ static int meetme_stasis_init(void) } static void meetme_stasis_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_channel_blob *channel_blob = stasis_message_data(message); struct stasis_message_type *message_type; diff --git a/apps/app_queue.c b/apps/app_queue.c index 50c7f526a4..e72997fa82 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -1832,7 +1832,7 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_dump_type); STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_ringnoanswer_type); static void queue_channel_manager_event(void *data, - struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_subscription *sub, struct stasis_message *message) { const char *type = data; @@ -1858,7 +1858,7 @@ static void queue_channel_manager_event(void *data, } static void queue_multi_channel_manager_event(void *data, - struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_subscription *sub, struct stasis_message *message) { const char *type = data; @@ -1902,7 +1902,7 @@ static void queue_multi_channel_manager_event(void *data, } static void queue_member_manager_event(void *data, - struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_subscription *sub, struct stasis_message *message) { const char *type = data; @@ -2140,7 +2140,7 @@ static int is_member_available(struct call_queue *q, struct member *mem) } /*! \brief set a member's status based on device state of that member's interface*/ -static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg) { struct ao2_iterator miter, qiter; struct ast_device_state_message *dev_state; @@ -5185,7 +5185,7 @@ static void send_agent_complete(const char *queuename, struct ast_channel_snapsh } static void queue_agent_cb(void *userdata, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *msg) + struct stasis_message *msg) { struct ast_channel_blob *agent_blob; @@ -5401,7 +5401,7 @@ static void log_attended_transfer(struct queue_stasis_data *queue_data, struct a * \param msg The stasis message for the bridge enter event */ static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *msg) + struct stasis_message *msg) { struct queue_stasis_data *queue_data = userdata; struct ast_bridge_blob *enter_blob = stasis_message_data(msg); @@ -5434,7 +5434,7 @@ static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub, * \param msg The stasis message for the blind transfer event */ static void handle_blind_transfer(void *userdata, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *msg) + struct stasis_message *msg) { struct queue_stasis_data *queue_data = userdata; struct ast_bridge_blob *blind_blob = stasis_message_data(msg); @@ -5503,7 +5503,7 @@ static void handle_blind_transfer(void *userdata, struct stasis_subscription *su * \param msg The stasis message for the attended transfer event. */ static void handle_attended_transfer(void *userdata, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *msg) + struct stasis_message *msg) { struct queue_stasis_data *queue_data = userdata; struct ast_attended_transfer_message *atxfer_msg = stasis_message_data(msg); @@ -5558,7 +5558,7 @@ static void handle_attended_transfer(void *userdata, struct stasis_subscription * subroutines for further processing. */ static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *msg) + struct stasis_message *msg) { if (stasis_subscription_final_message(sub, msg)) { ao2_cleanup(userdata); @@ -5578,7 +5578,7 @@ static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub, * \param msg The stasis message for the local optimization begin event */ static void handle_local_optimization_begin(void *userdata, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *msg) + struct stasis_message *msg) { struct queue_stasis_data *queue_data = userdata; struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg); @@ -5630,7 +5630,7 @@ static void handle_local_optimization_begin(void *userdata, struct stasis_subscr * \param msg The stasis message for the local optimization end event */ static void handle_local_optimization_end(void *userdata, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *msg) + struct stasis_message *msg) { struct queue_stasis_data *queue_data = userdata; struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg); @@ -5695,7 +5695,7 @@ static void handle_local_optimization_end(void *userdata, struct stasis_subscrip * \param msg The stasis message for the hangup event. */ static void handle_hangup(void *userdata, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *msg) + struct stasis_message *msg) { struct queue_stasis_data *queue_data = userdata; struct ast_channel_blob *channel_blob = stasis_message_data(msg); @@ -5756,7 +5756,7 @@ static void handle_hangup(void *userdata, struct stasis_subscription *sub, * subroutines for further processing. */ static void queue_channel_cb(void *userdata, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *msg) + struct stasis_message *msg) { if (stasis_subscription_final_message(sub, msg)) { ao2_cleanup(userdata); diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c index c32b781844..1ab164511a 100644 --- a/apps/app_voicemail.c +++ b/apps/app_voicemail.c @@ -12606,7 +12606,7 @@ static void mwi_sub_event_cb(struct stasis_subscription_change *change) } } -static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg) { struct stasis_subscription_change *change; /* Only looking for subscription change notices here */ @@ -12629,7 +12629,7 @@ static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct static int dump_cache(void *obj, void *arg, int flags) { struct stasis_message *msg = obj; - mwi_event_cb(NULL, NULL, NULL, msg); + mwi_event_cb(NULL, NULL, msg); return 0; } diff --git a/apps/confbridge/confbridge_manager.c b/apps/confbridge/confbridge_manager.c index 57bf64b7c8..1b8eab24b8 100644 --- a/apps/confbridge/confbridge_manager.c +++ b/apps/confbridge/confbridge_manager.c @@ -224,63 +224,54 @@ static void confbridge_publish_manager_event( } static void confbridge_start_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { confbridge_publish_manager_event(message, "ConfbridgeStart", NULL); } static void confbridge_end_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { confbridge_publish_manager_event(message, "ConfbridgeEnd", NULL); } static void confbridge_leave_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { confbridge_publish_manager_event(message, "ConfbridgeLeave", NULL); } static void confbridge_join_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { confbridge_publish_manager_event(message, "ConfbridgeJoin", NULL); } static void confbridge_start_record_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { confbridge_publish_manager_event(message, "ConfbridgeRecord", NULL); } static void confbridge_stop_record_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { confbridge_publish_manager_event(message, "ConfbridgeStopRecord", NULL); } static void confbridge_mute_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { confbridge_publish_manager_event(message, "ConfbridgeMute", NULL); } static void confbridge_unmute_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { confbridge_publish_manager_event(message, "ConfbridgeUnmute", NULL); } static void confbridge_talking_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { RAII_VAR(struct ast_str *, extra_text, NULL, ast_free); diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c index 5fc9683675..f7cfa8cd76 100644 --- a/channels/chan_dahdi.c +++ b/channels/chan_dahdi.c @@ -553,7 +553,7 @@ static int restart_monitor(void); static int dahdi_sendtext(struct ast_channel *c, const char *text); -static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg) { /* This module does not handle MWI in an event-based manner. However, it * subscribes to MWI for each mailbox that is configured so that the core diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index fb68131b60..b880f5d948 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -1270,8 +1270,8 @@ static void build_rand_pad(unsigned char *buf, ssize_t len); static int get_unused_callno(enum callno_type type, int validated, callno_entry *entry); static int replace_callno(const void *obj); static void sched_delay_remove(struct sockaddr_in *sin, callno_entry entry); -static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message); -static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message); +static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message); +static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message); static struct ast_channel_tech iax2_tech = { .type = "IAX2", @@ -1331,7 +1331,7 @@ static void iax2_lock_owner(int callno) } } -static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg) { /* The MWI subscriptions exist just so the core knows we care about those * mailboxes. However, we just grab the events out of the cache when it @@ -1378,7 +1378,7 @@ static int network_change_sched_cb(const void *data) } static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { /* This callback is only concerned with network change messages from the system topic. */ if (stasis_message_type(message) != ast_network_change_type()) { @@ -1392,7 +1392,7 @@ static void network_change_stasis_cb(void *data, struct stasis_subscription *sub } static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { if (stasis_message_type(message) != ast_named_acl_change_type()) { return; diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c index b28fce3559..8fcdebfad7 100644 --- a/channels/chan_mgcp.c +++ b/channels/chan_mgcp.c @@ -486,7 +486,7 @@ static struct ast_channel_tech mgcp_tech = { .func_channel_read = acf_channel_read, }; -static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg) { /* This module does not handle MWI in an event-based manner. However, it * subscribes to MWI for each mailbox that is configured so that the core diff --git a/channels/chan_sip.c b/channels/chan_sip.c index 9cbfd715d8..2a821dd73f 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -1324,9 +1324,9 @@ static int sip_poke_noanswer(const void *data); static int sip_poke_peer(struct sip_peer *peer, int force); static void sip_poke_all_peers(void); static void sip_peer_hold(struct sip_pvt *p, int hold); -static void mwi_event_cb(void *, struct stasis_subscription *, struct stasis_topic *, struct stasis_message *); -static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message); -static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message); +static void mwi_event_cb(void *, struct stasis_subscription *, struct stasis_message *); +static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message); +static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message); static void sip_keepalive_all_peers(void); /*--- Applications, functions, CLI and manager command helpers */ @@ -16825,7 +16825,7 @@ static void sip_peer_hold(struct sip_pvt *p, int hold) } /*! \brief Receive MWI events that we have subscribed to */ -static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg) { struct sip_peer *peer = userdata; if (stasis_subscription_final_message(sub, msg)) { @@ -16872,7 +16872,7 @@ static int network_change_sched_cb(const void *data) return 0; } -static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) { /* This callback is only concerned with network change messages from the system topic. */ if (stasis_message_type(message) != ast_network_change_type()) { @@ -28940,7 +28940,7 @@ static int restart_monitor(void) } static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { if (stasis_message_type(message) != ast_named_acl_change_type()) { return; diff --git a/channels/chan_skinny.c b/channels/chan_skinny.c index b620796162..7cf592a261 100644 --- a/channels/chan_skinny.c +++ b/channels/chan_skinny.c @@ -1639,7 +1639,7 @@ static int skinny_indicate(struct ast_channel *ast, int ind, const void *data, s static int skinny_fixup(struct ast_channel *oldchan, struct ast_channel *newchan); static int skinny_senddigit_begin(struct ast_channel *ast, char digit); static int skinny_senddigit_end(struct ast_channel *ast, char digit, unsigned int duration); -static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg); +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg); static int skinny_dialer_cb(const void *data); static int skinny_reload(void); @@ -2300,7 +2300,7 @@ static int skinny_register(struct skinny_req *req, struct skinnysession *s) set_callforwards(l, NULL, SKINNY_CFWD_ALL|SKINNY_CFWD_BUSY|SKINNY_CFWD_NOANSWER); register_exten(l); /* initialize MWI on line and device */ - mwi_event_cb(l, NULL, NULL, NULL); + mwi_event_cb(l, NULL, NULL); AST_LIST_TRAVERSE(&l->sublines, subline, list) { ast_extension_state_add(subline->context, subline->exten, skinny_extensionstate_cb, subline->container); } @@ -3529,7 +3529,7 @@ static void update_connectedline(struct skinny_subchannel *sub, const void *data send_callinfo(sub); } -static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg) { struct skinny_line *l = userdata; struct skinny_device *d = l->device; diff --git a/channels/sig_pri.c b/channels/sig_pri.c index a6d134e1b3..9f40077237 100644 --- a/channels/sig_pri.c +++ b/channels/sig_pri.c @@ -8892,7 +8892,7 @@ static void sig_pri_send_mwi_indication(struct sig_pri_span *pri, const char *vm * * \return Nothing */ -static void sig_pri_mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void sig_pri_mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg) { struct sig_pri_span *pri = userdata; const char *mbox_context; diff --git a/funcs/func_presencestate.c b/funcs/func_presencestate.c index 75cef8a549..49f8e78a92 100644 --- a/funcs/func_presencestate.c +++ b/funcs/func_presencestate.c @@ -649,7 +649,7 @@ struct test_cb_data { sem_t sem; }; -static void test_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void test_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg) { struct test_cb_data *cb_data = userdata; if (stasis_message_type(msg) != ast_presence_state_message_type()) { diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 70bb973faf..529aa12bbd 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -347,18 +347,6 @@ const char *stasis_topic_name(const struct stasis_topic *topic); */ void stasis_publish(struct stasis_topic *topic, struct stasis_message *message); -/*! - * \brief Publish a message from a specified topic to all the subscribers of a - * possibly different topic. - * \param topic Topic to publish message to. - * \param topic Original topic message was from. - * \param message Message - * \since 12 - */ -void stasis_forward_message(struct stasis_topic *topic, - struct stasis_topic *publisher_topic, - struct stasis_message *message); - /*! * \brief Wait for all pending messages on a given topic to be processed. * \param topic Topic to await pending messages on. @@ -381,11 +369,10 @@ struct stasis_subscription; /*! * \brief Callback function type for Stasis subscriptions. * \param data Data field provided with subscription. - * \param topic Topic to which the message was published. * \param message Published message. * \since 12 */ -typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message); +typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message); /*! * \brief Create a subscription. @@ -583,8 +570,6 @@ struct stasis_message_type *stasis_cache_update_type(void); * \since 12 */ struct stasis_cache_update { - /*! \brief Topic that published \c new_snapshot */ - struct stasis_topic *topic; /*! \brief Convenience reference to snapshot type */ struct stasis_message_type *type; /*! \brief Old value from the cache */ diff --git a/include/asterisk/stasis_internal.h b/include/asterisk/stasis_internal.h index 67ab88ff04..01e5812422 100644 --- a/include/asterisk/stasis_internal.h +++ b/include/asterisk/stasis_internal.h @@ -62,7 +62,7 @@ struct stasis_message; */ struct stasis_subscription *internal_stasis_subscribe( struct stasis_topic *topic, - void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message), + void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message), void *data, int needs_mailbox); diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index 5cb574854f..ab523290c8 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -175,6 +175,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o */ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener); +/*! + * \brief Sets the local data associated with a taskprocessor. + * + * \since 12.0.0 + * + * See ast_taskprocessor_push_local(). + * + * \param tps Task processor. + * \param local_data Local data to associate with \a tps. + */ +void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data); + /*! * \brief Unreference the specified taskprocessor and its reference count will decrement. * @@ -197,6 +209,32 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps); */ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap); +/*! \brief Local data parameter */ +struct ast_taskprocessor_local { + /*! Local data, associated with the taskprocessor. */ + void *local_data; + /*! Data pointer passed with this task. */ + void *data; +}; + +/*! + * \brief Push a task into the specified taskprocessor queue and signal the + * taskprocessor thread. + * + * The callback receives a \ref ast_taskprocessor_local struct, which contains + * both the provided \a datap pointer, and any local data set on the + * taskprocessor with ast_taskprocessor_set_local(). + * + * \param tps The taskprocessor structure + * \param task_exe The task handling function to push into the taskprocessor queue + * \param datap The data to be used by the task handling function + * \retval 0 success + * \retval -1 failure + * \since 12.0.0 + */ +int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, + int (*task_exe)(struct ast_taskprocessor_local *local), void *datap); + /*! * \brief Pop a task off the taskprocessor and execute it. * diff --git a/main/ccss.c b/main/ccss.c index 061c45a7ca..3068c6ffac 100644 --- a/main/ccss.c +++ b/main/ccss.c @@ -1397,7 +1397,7 @@ static void generic_monitor_instance_list_destructor(void *obj) ast_free((char *)generic_list->device_name); } -static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg); +static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg); static struct generic_monitor_instance_list *create_new_generic_list(struct ast_cc_monitor *monitor) { struct generic_monitor_instance_list *generic_list = ao2_t_alloc(sizeof(*generic_list), @@ -1471,7 +1471,7 @@ static int generic_monitor_devstate_tp_cb(void *data) return 0; } -static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg) { /* Wow, it's cool that we've picked up on a state change, but we really want * the actual work to be done in the core's taskprocessor execution thread @@ -2750,7 +2750,7 @@ static int cc_generic_agent_stop_ringing(struct ast_cc_agent *agent) return 0; } -static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg) { struct ast_cc_agent *agent = userdata; enum ast_device_state new_state; diff --git a/main/cdr.c b/main/cdr.c index f7af298651..ea0f9c0e4b 100644 --- a/main/cdr.c +++ b/main/cdr.c @@ -1839,7 +1839,7 @@ static int finalized_state_process_party_a(struct cdr_object *cdr, struct ast_ch * \param topic The topic this message was published for * \param message The message */ -static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_message *message) { RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup); RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup); @@ -2020,7 +2020,7 @@ static int check_new_cdr_needed(struct ast_channel_snapshot *old_snapshot, * \param topic The topic this message was published for * \param message The message */ -static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_message *message) { RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup); RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup); @@ -2150,7 +2150,7 @@ static int filter_bridge_messages(struct ast_bridge_snapshot *bridge) * \param message The message - hopefully a bridge one! */ static void handle_bridge_leave_message(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_bridge_blob *update = stasis_message_data(message); struct ast_bridge_snapshot *bridge = update->bridge; @@ -2450,7 +2450,7 @@ static void handle_standard_bridge_enter_message(struct cdr_object *cdr, * \param message The message - hopefully a bridge one! */ static void handle_bridge_enter_message(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_bridge_blob *update = stasis_message_data(message); struct ast_bridge_snapshot *bridge = update->bridge; @@ -2494,7 +2494,7 @@ static void handle_bridge_enter_message(void *data, struct stasis_subscription * * \param message The message about who got parked * */ static void handle_parked_call_message(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_parked_call_payload *payload = stasis_message_data(message); struct ast_channel_snapshot *channel = payload->parkee; diff --git a/main/cel.c b/main/cel.c index 36daf2066d..0d78b5cceb 100644 --- a/main/cel.c +++ b/main/cel.c @@ -1019,7 +1019,6 @@ static int cel_filter_channel_snapshot(struct ast_channel_snapshot *snapshot) } static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct stasis_cache_update *update = stasis_message_data(message); @@ -1082,7 +1081,6 @@ static struct ast_str *cel_generate_peer_str( static void cel_bridge_enter_cb( void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_bridge_blob *blob = stasis_message_data(message); @@ -1110,7 +1108,6 @@ static void cel_bridge_enter_cb( static void cel_bridge_leave_cb( void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_bridge_blob *blob = stasis_message_data(message); @@ -1138,7 +1135,6 @@ static void cel_bridge_leave_cb( static void cel_parking_cb( void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_parked_call_payload *parked_payload = stasis_message_data(message); @@ -1183,7 +1179,6 @@ static void save_dialstatus(struct ast_multi_channel_blob *blob) } static void cel_dial_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_multi_channel_blob *blob = stasis_message_data(message); @@ -1218,7 +1213,6 @@ static void cel_dial_cb(void *data, struct stasis_subscription *sub, static void cel_generic_cb( void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_channel_blob *obj = stasis_message_data(message); @@ -1241,7 +1235,6 @@ static void cel_generic_cb( static void cel_blind_transfer_cb( void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_bridge_blob *obj = stasis_message_data(message); @@ -1289,7 +1282,6 @@ static void cel_blind_transfer_cb( static void cel_attended_transfer_cb( void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_attended_transfer_message *xfer = stasis_message_data(message); @@ -1342,7 +1334,6 @@ static void cel_attended_transfer_cb( static void cel_pickup_cb( void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_multi_channel_blob *obj = stasis_message_data(message); @@ -1364,7 +1355,6 @@ static void cel_pickup_cb( static void cel_local_cb( void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_multi_channel_blob *obj = stasis_message_data(message); diff --git a/main/devicestate.c b/main/devicestate.c index bcf07ff0b2..158d1f817e 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -610,7 +610,7 @@ static int aggregate_state_changed(char *device, enum ast_device_state new_aggre return 1; } -static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg) { enum ast_device_state aggregate_state; char *device; diff --git a/main/endpoints.c b/main/endpoints.c index b33e33f1a8..bdcf401ba0 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -152,7 +152,7 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint, /*! \brief Handler for channel snapshot cache clears */ static void endpoint_cache_clear(void *data, - struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_subscription *sub, struct stasis_message *message) { struct ast_endpoint *endpoint = data; @@ -174,7 +174,7 @@ static void endpoint_cache_clear(void *data, } static void endpoint_default(void *data, - struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_subscription *sub, struct stasis_message *message) { struct stasis_endpoint *endpoint = data; diff --git a/main/manager.c b/main/manager.c index 75e20c21d5..69def4b1fb 100644 --- a/main/manager.c +++ b/main/manager.c @@ -1151,7 +1151,7 @@ static const struct { {{ "restart", "gracefully", NULL }}, }; -static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message); +static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message); static void acl_change_stasis_subscribe(void) { @@ -1427,7 +1427,6 @@ struct ast_str *ast_manager_str_from_json_object(struct ast_json *blob, key_excl } static void manager_default_msg_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup); @@ -1444,7 +1443,6 @@ static void manager_default_msg_cb(void *data, struct stasis_subscription *sub, } static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_json_payload *payload = stasis_message_data(message); @@ -7640,7 +7638,6 @@ static void load_channelvars(struct ast_variable *var) #ifdef TEST_FRAMEWORK static void test_suite_event_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_test_suite_message_payload *payload; @@ -8344,7 +8341,7 @@ static int __init_manager(int reload, int by_external_config) } static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { if (stasis_message_type(message) != ast_named_acl_change_type()) { return; diff --git a/main/manager_bridges.c b/main/manager_bridges.c index 38f9af4771..fad676b567 100644 --- a/main/manager_bridges.c +++ b/main/manager_bridges.c @@ -180,7 +180,6 @@ bridge_snapshot_monitor bridge_monitors[] = { }; static void bridge_snapshot_update(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { RAII_VAR(struct ast_str *, bridge_event_string, NULL, ast_free); @@ -221,7 +220,6 @@ static void bridge_snapshot_update(void *data, struct stasis_subscription *sub, } static void bridge_merge_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_bridge_merge_message *merge_msg = stasis_message_data(message); @@ -254,7 +252,6 @@ static void bridge_merge_cb(void *data, struct stasis_subscription *sub, } static void channel_enter_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { static const char *swap_name = "SwapUniqueid: "; @@ -283,7 +280,6 @@ static void channel_enter_cb(void *data, struct stasis_subscription *sub, } static void channel_leave_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_bridge_blob *blob = stasis_message_data(message); diff --git a/main/manager_channels.c b/main/manager_channels.c index d39687ffd0..0bebb216c0 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -565,7 +565,6 @@ channel_snapshot_monitor channel_monitors[] = { }; static void channel_snapshot_update(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); @@ -616,7 +615,7 @@ static int userevent_exclusion_cb(const char *key) } static void channel_user_event_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_channel_blob *obj = stasis_message_data(message); RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); @@ -667,7 +666,7 @@ static void publish_basic_channel_event(const char *event, int class, struct ast } static void channel_hangup_request_cb(void *data, - struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_subscription *sub, struct stasis_message *message) { struct ast_channel_blob *obj = stasis_message_data(message); @@ -707,7 +706,7 @@ static void channel_hangup_request_cb(void *data, } static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free); struct ast_channel_snapshot *spyer; @@ -730,7 +729,7 @@ static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub, } static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free); RAII_VAR(struct ast_str *, spyee_channel_string, NULL, ast_free); @@ -765,7 +764,7 @@ static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub } static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_channel_blob *obj = stasis_message_data(message); RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); @@ -806,7 +805,7 @@ static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub, } static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_channel_blob *obj = stasis_message_data(message); RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); @@ -853,7 +852,7 @@ static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub, } static void channel_hangup_handler_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); struct ast_channel_blob *payload = stasis_message_data(message); @@ -884,7 +883,7 @@ static void channel_hangup_handler_cb(void *data, struct stasis_subscription *su } static void channel_fax_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); RAII_VAR(struct ast_str *, event_buffer, ast_str_create(256), ast_free); @@ -957,7 +956,7 @@ static void channel_fax_cb(void *data, struct stasis_subscription *sub, } static void channel_moh_start_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_channel_blob *payload = stasis_message_data(message); struct ast_json *blob = payload->blob; @@ -977,7 +976,7 @@ static void channel_moh_start_cb(void *data, struct stasis_subscription *sub, } static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_channel_blob *payload = stasis_message_data(message); @@ -985,7 +984,7 @@ static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub, } static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_channel_blob *payload = stasis_message_data(message); @@ -993,7 +992,7 @@ static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub } static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_channel_blob *payload = stasis_message_data(message); @@ -1004,7 +1003,7 @@ static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub, * \brief Callback processing messages for channel dialing */ static void channel_dial_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_multi_channel_blob *obj = stasis_message_data(message); const char *dialstatus; @@ -1051,7 +1050,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub, } static void channel_hold_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_channel_blob *obj = stasis_message_data(message); const char *musicclass; @@ -1083,7 +1082,7 @@ static void channel_hold_cb(void *data, struct stasis_subscription *sub, } static void channel_unhold_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_channel_blob *obj = stasis_message_data(message); RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); diff --git a/main/manager_endpoints.c b/main/manager_endpoints.c index 634283728a..b5f5b31c28 100644 --- a/main/manager_endpoints.c +++ b/main/manager_endpoints.c @@ -46,14 +46,9 @@ static void manager_endpoints_shutdown(void) } static void endpoint_state_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { - /* XXX This looks wrong. Nothing should post or forward to a caching - * topic directly. Maybe ast_endpoint_topic() would be correct? I'd have - * to dig to make sure I don't break anything, though. - */ - stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached(), message); + stasis_publish(ast_manager_get_topic(), message); } int manager_endpoints_init(void) diff --git a/main/manager_mwi.c b/main/manager_mwi.c index 9847bd4a73..849c315e13 100644 --- a/main/manager_mwi.c +++ b/main/manager_mwi.c @@ -54,7 +54,6 @@ static int exclude_event_cb(const char *key) /*! \brief Generic MWI event callback used for one-off events from voicemail modules */ static void mwi_app_event_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_mwi_blob *payload = stasis_message_data(message); @@ -86,7 +85,6 @@ static void mwi_app_event_cb(void *data, struct stasis_subscription *sub, } static void mwi_update_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct ast_mwi_state *mwi_state; diff --git a/main/pbx.c b/main/pbx.c index 09f3d95ec7..2a415401f1 100644 --- a/main/pbx.c +++ b/main/pbx.c @@ -5111,7 +5111,7 @@ static void get_device_state_causing_channels(struct ao2_container *c) ao2_iterator_destroy(&iter); } -static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg) { struct ast_device_state_message *dev_state; struct ast_hint *hint; @@ -11369,7 +11369,7 @@ static int pbx_builtin_sayphonetic(struct ast_channel *chan, const char *data) return res; } -static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg) { struct ast_presence_state_message *presence_state = stasis_message_data(msg); struct ast_hint *hint; diff --git a/main/sounds_index.c b/main/sounds_index.c index 9f70ef6cc0..2fcd23908d 100644 --- a/main/sounds_index.c +++ b/main/sounds_index.c @@ -281,7 +281,7 @@ static void sounds_cleanup(void) } static void format_update_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { ast_sounds_reindex(); } diff --git a/main/stasis.c b/main/stasis.c index 807ba43441..42c9017699 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -249,7 +249,6 @@ static void subscription_dtor(void *obj) * \param message Message to send. */ static void subscription_invoke(struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { /* Notify that the final message has been received */ @@ -260,7 +259,7 @@ static void subscription_invoke(struct stasis_subscription *sub, } /* Since sub is mostly immutable, no need to lock sub */ - sub->callback(sub->data, sub, topic, message); + sub->callback(sub->data, sub, message); /* Notify that the final message has been processed */ if (stasis_subscription_final_message(sub, message)) { @@ -301,6 +300,9 @@ struct stasis_subscription *internal_stasis_subscribe( if (!sub->mailbox) { return NULL; } + ast_taskprocessor_set_local(sub->mailbox, sub); + /* Taskprocessor has a reference */ + ao2_ref(sub, +1); } ao2_ref(topic, +1); @@ -327,6 +329,13 @@ struct stasis_subscription *stasis_subscribe( return internal_stasis_subscribe(topic, callback, data, 1); } +static int sub_cleanup(void *data) +{ + struct stasis_subscription *sub = data; + ao2_cleanup(sub); + return 0; +} + struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) { /* The subscription may be the last ref to this topic. Hold @@ -349,6 +358,11 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) /* Now let everyone know about the unsubscribe */ send_subscription_unsubscribe(topic, sub); + /* When all that's done, remove the ref the mailbox has on the sub */ + if (sub->mailbox) { + ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub); + } + /* Unsubscribing unrefs the subscription */ ao2_cleanup(sub); return NULL; @@ -475,93 +489,39 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s return ast_vector_remove_elem_unordered(topic->subscribers, sub); } -/*! - * \internal - * \brief Information needed to dispatch a message to a subscription - */ -struct dispatch { - /*! Topic message was published to */ - struct stasis_topic *topic; - /*! The message itself */ - struct stasis_message *message; - /*! Subscription receiving the message */ - struct stasis_subscription *sub; -}; - -static void dispatch_dtor(struct dispatch *dispatch) -{ - ao2_cleanup(dispatch->topic); - ao2_cleanup(dispatch->message); - ao2_cleanup(dispatch->sub); - - ast_free(dispatch); -} - -static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub) -{ - struct dispatch *dispatch; - - ast_assert(topic != NULL); - ast_assert(message != NULL); - ast_assert(sub != NULL); - - dispatch = ast_malloc(sizeof(*dispatch)); - if (!dispatch) { - return NULL; - } - - dispatch->topic = topic; - ao2_ref(topic, +1); - - dispatch->message = message; - ao2_ref(message, +1); - - dispatch->sub = sub; - ao2_ref(sub, +1); - - return dispatch; -} - /*! * \brief Dispatch a message to a subscriber * \param data \ref dispatch object * \return 0 */ -static int dispatch_exec(void *data) +static int dispatch_exec(struct ast_taskprocessor_local *local) { - struct dispatch *dispatch = data; + struct stasis_subscription *sub = local->local_data; + struct stasis_message *message = local->data; - subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message); - dispatch_dtor(dispatch); + subscription_invoke(sub, message); + ao2_cleanup(message); return 0; } static void dispatch_message(struct stasis_subscription *sub, - struct stasis_topic *publisher_topic, struct stasis_message *message) + struct stasis_message *message) { if (sub->mailbox) { - struct dispatch *dispatch; - - dispatch = dispatch_create(publisher_topic, message, sub); - if (!dispatch) { + ao2_bump(message); + if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) { + /* Push failed; ugh. */ ast_log(LOG_DEBUG, "Dropping dispatch\n"); - return; - } - - if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) { - /* Push failed; just delete the dispatch. - */ - ast_log(LOG_DEBUG, "Dropping dispatch\n"); - dispatch_dtor(dispatch); + ao2_cleanup(message); } } else { /* Dispatch directly */ - subscription_invoke(sub, publisher_topic, message); + subscription_invoke(sub, message); } } -void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message) +void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message) { size_t i; /* The topic may be unref'ed by the subscription invocation. @@ -571,23 +531,18 @@ void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *pu SCOPED_AO2LOCK(lock, topic); ast_assert(topic != NULL); - ast_assert(publisher_topic != NULL); ast_assert(message != NULL); for (i = 0; i < ast_vector_size(topic->subscribers); ++i) { - struct stasis_subscription *sub = ast_vector_get(topic->subscribers, i); + struct stasis_subscription *sub = + ast_vector_get(topic->subscribers, i); ast_assert(sub != NULL); - dispatch_message(sub, publisher_topic, message); + dispatch_message(sub, message); } } -void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) -{ - stasis_forward_message(topic, topic, message); -} - /*! * \brief Forwarding information * @@ -748,7 +703,7 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic, stasis_publish(topic, msg); /* Now we have to dispatch to the subscription itself */ - dispatch_message(sub, topic, msg); + dispatch_message(sub, msg); } struct topic_pool_entry { diff --git a/main/stasis_cache.c b/main/stasis_cache.c index d4375520da..279210d5b6 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -339,8 +339,6 @@ struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_messa static void stasis_cache_update_dtor(void *obj) { struct stasis_cache_update *update = obj; - ao2_cleanup(update->topic); - update->topic = NULL; ao2_cleanup(update->old_snapshot); update->old_snapshot = NULL; ao2_cleanup(update->new_snapshot); @@ -349,12 +347,11 @@ static void stasis_cache_update_dtor(void *obj) update->type = NULL; } -static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot) +static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot) { RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - ast_assert(topic != NULL); ast_assert(old_snapshot != NULL || new_snapshot != NULL); update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor, @@ -363,8 +360,6 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s return NULL; } - ao2_ref(topic, +1); - update->topic = topic; if (old_snapshot) { ao2_ref(old_snapshot, +1); update->old_snapshot = old_snapshot; @@ -390,7 +385,7 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s } static void caching_topic_exec(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup); struct stasis_caching_topic *caching_topic = data; @@ -418,7 +413,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, if (clear_id) { old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL); if (old_snapshot) { - update = update_create(topic, old_snapshot, NULL); + update = update_create(old_snapshot, NULL); stasis_publish(caching_topic->topic, update); return; } @@ -440,7 +435,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message); - update = update_create(topic, old_snapshot, message); + update = update_create(old_snapshot, message); if (update == NULL) { return; } diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 864cf42c47..8c82decfef 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -184,14 +184,13 @@ static int find_route( static void router_dispatch(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct stasis_message_router *router = data; struct stasis_message_route route; if (find_route(router, message, &route) == 0) { - route.callback(route.data, sub, topic, message); + route.callback(route.data, sub, message); } if (stasis_subscription_final_message(sub, message)) { diff --git a/main/stasis_wait.c b/main/stasis_wait.c index e94c686e1b..32b59718c1 100644 --- a/main/stasis_wait.c +++ b/main/stasis_wait.c @@ -55,7 +55,7 @@ static void caching_guarantee_dtor(void *obj) } static void guarantee_handler(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { /* Wait for our particular message */ if (data == message) { diff --git a/main/taskprocessor.c b/main/taskprocessor.c index ebb6200c37..189219d668 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -48,11 +48,15 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") */ struct tps_task { /*! \brief The execute() task callback function pointer */ - int (*execute)(void *datap); + union { + int (*execute)(void *datap); + int (*execute_local)(struct ast_taskprocessor_local *local); + } callback; /*! \brief The data pointer for the task execute() function */ void *datap; /*! \brief AST_LIST_ENTRY overhead */ AST_LIST_ENTRY(tps_task) list; + unsigned int wants_local:1; }; /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */ @@ -69,6 +73,7 @@ struct ast_taskprocessor { const char *name; /*! \brief Taskprocessor statistics */ struct tps_taskprocessor_stats *stats; + void *local_data; /*! \brief Taskprocessor current queue size */ long tps_queue_size; /*! \brief Taskprocessor queue */ @@ -282,10 +287,41 @@ int ast_tps_init(void) static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap) { struct tps_task *t; - if ((t = ast_calloc(1, sizeof(*t)))) { - t->execute = task_exe; - t->datap = datap; + if (!task_exe) { + ast_log(LOG_ERROR, "task_exe is NULL!\n"); + return NULL; } + + t = ast_calloc(1, sizeof(*t)); + if (!t) { + ast_log(LOG_ERROR, "failed to allocate task!\n"); + return NULL; + } + + t->callback.execute = task_exe; + t->datap = datap; + + return t; +} + +static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap) +{ + struct tps_task *t; + if (!task_exe) { + ast_log(LOG_ERROR, "task_exe is NULL!\n"); + return NULL; + } + + t = ast_calloc(1, sizeof(*t)); + if (!t) { + ast_log(LOG_ERROR, "failed to allocate task!\n"); + return NULL; + } + + t->callback.execute_local = task_exe; + t->datap = datap; + t->wants_local = 1; + return t; } @@ -643,6 +679,13 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam return __allocate_taskprocessor(name, listener); } +void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, + void *local_data) +{ + SCOPED_AO2LOCK(lock, tps); + tps->local_data = local_data; +} + /* decrement the taskprocessor reference count and unlink from the container if necessary */ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps) { @@ -664,20 +707,21 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps) } /* push the task into the taskprocessor queue */ -int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap) +static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t) { - struct tps_task *t; int previous_size; int was_empty; - if (!tps || !task_exe) { - ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor"); + if (!tps) { + ast_log(LOG_ERROR, "tps is NULL!\n"); return -1; } - if (!(t = tps_task_alloc(task_exe, datap))) { - ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name); + + if (!t) { + ast_log(LOG_ERROR, "t is NULL!\n"); return -1; } + ao2_lock(tps); AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list); previous_size = tps->tps_queue_size++; @@ -688,8 +732,19 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void * return 0; } +int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap) +{ + return taskprocessor_push(tps, tps_task_alloc(task_exe, datap)); +} + +int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap) +{ + return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap)); +} + int ast_taskprocessor_execute(struct ast_taskprocessor *tps) { + struct ast_taskprocessor_local local; struct tps_task *t; int size; @@ -701,9 +756,18 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) } tps->executing = 1; + + if (t->wants_local) { + local.local_data = tps->local_data; + local.data = t->datap; + } ao2_unlock(tps); - t->execute(t->datap); + if (t->wants_local) { + t->callback.execute_local(&local); + } else { + t->callback.execute(t->datap); + } tps_task_free(t); ao2_lock(tps); diff --git a/res/parking/parking_applications.c b/res/parking/parking_applications.c index fc74ac2be3..7bd8922662 100644 --- a/res/parking/parking_applications.c +++ b/res/parking/parking_applications.c @@ -739,7 +739,7 @@ announce_cleanup: cap_slin = ast_format_cap_destroy(cap_slin); } -static void park_announce_update_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +static void park_announce_update_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) { struct park_announce_subscription_data *pa_data = data; char *dial_string = pa_data->dial_string; diff --git a/res/parking/parking_bridge_features.c b/res/parking/parking_bridge_features.c index 0e5e05d8f1..6c1d4d65e1 100644 --- a/res/parking/parking_bridge_features.c +++ b/res/parking/parking_bridge_features.c @@ -125,7 +125,7 @@ static void parker_parked_call_message_response(struct ast_parked_call_payload * } } -static void parker_update_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +static void parker_update_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) { if (stasis_subscription_final_message(sub, message)) { ast_free(data); diff --git a/res/parking/parking_manager.c b/res/parking/parking_manager.c index dac1910909..0c577018a0 100644 --- a/res/parking/parking_manager.c +++ b/res/parking/parking_manager.c @@ -545,7 +545,7 @@ static void parked_call_message_response(struct ast_parked_call_payload *parked_ ); } -static void parking_event_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +static void parking_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) { if (stasis_message_type(message) == ast_parked_call_type()) { struct ast_parked_call_payload *parked_call_message = stasis_message_data(message); diff --git a/res/res_agi.c b/res/res_agi.c index 25251f37e0..84dcbebe9c 100644 --- a/res/res_agi.c +++ b/res/res_agi.c @@ -1040,7 +1040,7 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_exec_type); STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_end_type); static void agi_channel_manager_event(void *data, - struct stasis_subscription *sub, struct stasis_topic *topic, + struct stasis_subscription *sub, struct stasis_message *message) { const char *type = data; diff --git a/res/res_chan_stats.c b/res/res_chan_stats.c index a43c564b1e..9d1e8c02e7 100644 --- a/res/res_chan_stats.c +++ b/res/res_chan_stats.c @@ -57,7 +57,7 @@ static struct stasis_message_router *router; * \param message The message itself. */ static void statsmaker(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { RAII_VAR(struct ast_str *, metric, NULL, ast_free); @@ -89,7 +89,7 @@ static void statsmaker(void *data, struct stasis_subscription *sub, * \param message The message itself. */ static void updates(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { /* Since this came from a message router, we know the type of the * message. We can cast the data without checking its type. @@ -139,7 +139,7 @@ static void updates(void *data, struct stasis_subscription *sub, * \param message The message itself. */ static void default_route(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { if (stasis_subscription_final_message(sub, message)) { /* Much like with the regular subscription, you may need to diff --git a/res/res_jabber.c b/res/res_jabber.c index b534e803b7..c1cee9b7ae 100644 --- a/res/res_jabber.c +++ b/res/res_jabber.c @@ -371,8 +371,8 @@ static void aji_pubsub_purge_nodes(struct aji_client *client, const char* collection_name); static void aji_publish_mwi(struct aji_client *client, const char *mailbox, const char *context, const char *oldmsgs, const char *newmsgs); -static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg); -static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg); +static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg); +static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg); static iks* aji_build_publish_skeleton(struct aji_client *client, const char *node, const char *event_type, unsigned int cachable); /* No transports in this version */ @@ -3235,7 +3235,7 @@ int ast_aji_disconnect(struct aji_client *client) * \param data void pointer to ast_client structure * \return void */ -static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg) { const char *mailbox; const char *context; @@ -3269,7 +3269,7 @@ static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasi * \param data void pointer to ast_client structure * \return void */ -static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg) { struct aji_client *client = data; struct ast_device_state_message *dev_state; @@ -3291,7 +3291,7 @@ static int cached_devstate_cb(void *obj, void *arg, int flags) { struct stasis_message *msg = obj; struct aji_client *client = arg; - aji_devstate_cb(client, device_state_sub, NULL, msg); + aji_devstate_cb(client, device_state_sub, msg); return 0; } diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index e2b9b630dd..17d648b747 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -118,7 +118,7 @@ struct mwi_subscription { }; static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *msg); + struct stasis_message *msg); static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char *mailbox, struct mwi_subscription *mwi_sub) { @@ -603,7 +603,7 @@ static int serialized_cleanup(void *userdata) } static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *msg) + struct stasis_message *msg) { struct mwi_subscription *mwi_sub = userdata; diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c index cca5a7c396..91da22fde6 100644 --- a/res/res_pjsip_refer.c +++ b/res/res_pjsip_refer.c @@ -143,7 +143,7 @@ static int refer_progress_notify(void *data) } static void refer_progress_bridge(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct refer_progress *progress = data; struct ast_bridge_blob *enter_blob; diff --git a/res/res_security_log.c b/res/res_security_log.c index d06f9f7549..e56f7f76fc 100644 --- a/res/res_security_log.c +++ b/res/res_security_log.c @@ -117,7 +117,7 @@ static void security_event_stasis_cb(struct ast_json *json) } static void security_stasis_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct ast_json_payload *payload = stasis_message_data(message); diff --git a/res/res_stasis_test.c b/res/res_stasis_test.c index 58df8c1451..099e1af78f 100644 --- a/res/res_stasis_test.c +++ b/res/res_stasis_test.c @@ -120,7 +120,7 @@ struct stasis_message_sink *stasis_message_sink_create(void) * the initial lazy binding will still work as expected. */ static void message_sink_cb(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct stasis_message_sink *sink = data; diff --git a/res/res_xmpp.c b/res/res_xmpp.c index 1d8f62824e..bd66e70cc7 100644 --- a/res/res_xmpp.c +++ b/res/res_xmpp.c @@ -1318,7 +1318,7 @@ static void xmpp_pubsub_publish_device_state(struct ast_xmpp_client *client, con * \param data void pointer to ast_client structure * \return void */ -static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg) { struct ast_xmpp_client *client = data; const char *mailbox, *context; @@ -1351,7 +1351,7 @@ static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, stru * \param data void pointer to ast_client structure * \return void */ -static void xmpp_pubsub_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) +static void xmpp_pubsub_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg) { struct ast_xmpp_client *client = data; struct ast_device_state_message *dev_state; @@ -1566,7 +1566,7 @@ static int cached_devstate_cb(void *obj, void *arg, int flags) { struct stasis_message *msg = obj; struct ast_xmpp_client *client = arg; - xmpp_pubsub_devstate_cb(client, client->device_state_sub, NULL, msg); + xmpp_pubsub_devstate_cb(client, client->device_state_sub, msg); return 0; } diff --git a/res/stasis/app.c b/res/stasis/app.c index 2c84f0c3de..bc1268fb7e 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -220,7 +220,7 @@ static void app_dtor(void *obj) } static void sub_default_handler(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct app *app = data; RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); @@ -363,7 +363,6 @@ static channel_snapshot_monitor channel_monitors[] = { static void sub_channel_update_handler(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct app *app = data; @@ -411,7 +410,6 @@ static struct ast_json *simple_bridge_event( static void sub_bridge_update_handler(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); @@ -447,7 +445,7 @@ static void sub_bridge_update_handler(void *data, } static void bridge_merge_handler(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { struct app *app = data; struct ast_bridge_merge_message *merge; @@ -476,7 +474,7 @@ static void bridge_merge_handler(void *data, struct stasis_subscription *sub, } /* Forward the message to the app */ - stasis_forward_message(app->topic, topic, message); + stasis_publish(app->topic, message); } struct app *app_create(const char *name, stasis_app_cb handler, void *data) diff --git a/tests/test_devicestate.c b/tests/test_devicestate.c index ff5d681f4c..5a3d255d14 100644 --- a/tests/test_devicestate.c +++ b/tests/test_devicestate.c @@ -309,7 +309,7 @@ static struct consumer *consumer_create(void) { return consumer; } -static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message) { struct consumer *consumer = data; RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup); @@ -342,7 +342,7 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st } } -static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_message *message) { struct consumer *consumer = data; diff --git a/tests/test_stasis.c b/tests/test_stasis.c index ac6154d88d..1e911e092c 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -183,7 +183,7 @@ static struct consumer *consumer_create(int ignore_subscriptions) { return consumer; } -static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message) { struct consumer *consumer = data; RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup); @@ -711,7 +711,6 @@ AST_TEST_DEFINE(cache) /* Check for new snapshot messages */ ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0])); actual_update = stasis_message_data(consumer->messages_rxed[0]); - ast_test_validate(test, topic == actual_update->topic); ast_test_validate(test, NULL == actual_update->old_snapshot); ast_test_validate(test, test_message1_1 == actual_update->new_snapshot); ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1")); @@ -720,7 +719,6 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1])); actual_update = stasis_message_data(consumer->messages_rxed[1]); - ast_test_validate(test, topic == actual_update->topic); ast_test_validate(test, NULL == actual_update->old_snapshot); ast_test_validate(test, test_message2_1 == actual_update->new_snapshot); ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2")); @@ -736,7 +734,6 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, 3 == actual_len); actual_update = stasis_message_data(consumer->messages_rxed[2]); - ast_test_validate(test, topic == actual_update->topic); ast_test_validate(test, test_message2_1 == actual_update->old_snapshot); ast_test_validate(test, test_message2_2 == actual_update->new_snapshot); ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2")); @@ -752,7 +749,6 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, 4 == actual_len); actual_update = stasis_message_data(consumer->messages_rxed[3]); - ast_test_validate(test, topic == actual_update->topic); ast_test_validate(test, test_message1_1 == actual_update->old_snapshot); ast_test_validate(test, NULL == actual_update->new_snapshot); ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1")); @@ -1226,7 +1222,7 @@ AST_TEST_DEFINE(to_ami) } static void noop(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) + struct stasis_message *message) { /* no-op */ } diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c index 70400a9ec7..be48f92488 100644 --- a/tests/test_taskprocessor.c +++ b/tests/test_taskprocessor.c @@ -48,6 +48,31 @@ struct task_data { int task_complete; }; +static void task_data_dtor(void *obj) +{ + struct task_data *task_data = obj; + + ast_mutex_destroy(&task_data->lock); + ast_cond_destroy(&task_data->cond); +} + +/*! \brief Create a task_data object */ +static struct task_data *task_data_create(void) +{ + struct task_data *task_data = + ao2_alloc(sizeof(*task_data), task_data_dtor); + + if (!task_data) { + return NULL; + } + + ast_cond_init(&task_data->cond, NULL); + ast_mutex_init(&task_data->lock); + task_data->task_complete = 0; + + return task_data; +} + /*! * \brief Queued task for baseline test. * @@ -64,6 +89,30 @@ static int task(void *data) return 0; } +/*! + * \brief Wait for a task to execute. + */ +static int task_wait(struct task_data *task_data) +{ + struct timeval start = ast_tvnow(); + struct timespec end; + SCOPED_MUTEX(lock, &task_data->lock); + + end.tv_sec = start.tv_sec + 30; + end.tv_nsec = start.tv_usec * 1000; + + while (!task_data->task_complete) { + int res; + res = ast_cond_timedwait(&task_data->cond, &task_data->lock, + &end); + if (res == ETIMEDOUT) { + return -1; + } + } + + return 0; +} + /*! * \brief Baseline test for default taskprocessor * @@ -73,12 +122,9 @@ static int task(void *data) */ AST_TEST_DEFINE(default_taskprocessor) { - struct ast_taskprocessor *tps; - struct task_data task_data; - struct timeval start; - struct timespec ts; - enum ast_test_result_state res = AST_TEST_PASS; - int timedwait_res; + RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference); + RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup); + int res; switch (cmd) { case TEST_INIT: @@ -99,36 +145,21 @@ AST_TEST_DEFINE(default_taskprocessor) return AST_TEST_FAIL; } - start = ast_tvnow(); - - ts.tv_sec = start.tv_sec + 30; - ts.tv_nsec = start.tv_usec * 1000; - - ast_cond_init(&task_data.cond, NULL); - ast_mutex_init(&task_data.lock); - task_data.task_complete = 0; - - ast_taskprocessor_push(tps, task, &task_data); - ast_mutex_lock(&task_data.lock); - while (!task_data.task_complete) { - timedwait_res = ast_cond_timedwait(&task_data.cond, &task_data.lock, &ts); - if (timedwait_res == ETIMEDOUT) { - break; - } + task_data = task_data_create(); + if (!task_data) { + ast_test_status_update(test, "Unable to create task_data\n"); + return AST_TEST_FAIL; } - ast_mutex_unlock(&task_data.lock); - if (!task_data.task_complete) { + ast_taskprocessor_push(tps, task, task_data); + + res = task_wait(task_data); + if (res != 0) { ast_test_status_update(test, "Queued task did not execute!\n"); - res = AST_TEST_FAIL; - goto test_end; + return AST_TEST_FAIL; } -test_end: - tps = ast_taskprocessor_unreference(tps); - ast_mutex_destroy(&task_data.lock); - ast_cond_destroy(&task_data.cond); - return res; + return AST_TEST_PASS; } #define NUM_TASKS 20000 @@ -631,12 +662,78 @@ AST_TEST_DEFINE(taskprocessor_shutdown) return AST_TEST_PASS; } +static int local_task_exe(struct ast_taskprocessor_local *local) +{ + int *local_data = local->local_data; + struct task_data *task_data = local->data; + + *local_data = 1; + task(task_data); + + return 0; +} + +AST_TEST_DEFINE(taskprocessor_push_local) +{ + RAII_VAR(struct ast_taskprocessor *, tps, NULL, + ast_taskprocessor_unreference); + struct task_data *task_data; + int local_data; + int res; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = "/main/taskprocessor/"; + info->summary = "Test of pushing local data"; + info->description = + "Ensures that local data is passed along."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + + tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT); + if (!tps) { + ast_test_status_update(test, "Unable to create test taskprocessor\n"); + return AST_TEST_FAIL; + } + + + task_data = task_data_create(); + if (!task_data) { + ast_test_status_update(test, "Unable to create task_data\n"); + return AST_TEST_FAIL; + } + + local_data = 0; + ast_taskprocessor_set_local(tps, &local_data); + + ast_taskprocessor_push_local(tps, local_task_exe, task_data); + + res = task_wait(task_data); + if (res != 0) { + ast_test_status_update(test, "Queued task did not execute!\n"); + return AST_TEST_FAIL; + } + + if (local_data != 1) { + ast_test_status_update(test, + "Queued task did not set local_data!\n"); + return AST_TEST_FAIL; + } + + return AST_TEST_PASS; +} + static int unload_module(void) { ast_test_unregister(default_taskprocessor); ast_test_unregister(default_taskprocessor_load); ast_test_unregister(taskprocessor_listener); ast_test_unregister(taskprocessor_shutdown); + ast_test_unregister(taskprocessor_push_local); return 0; } @@ -646,6 +743,7 @@ static int load_module(void) ast_test_register(default_taskprocessor_load); ast_test_register(taskprocessor_listener); ast_test_register(taskprocessor_shutdown); + ast_test_register(taskprocessor_push_local); return AST_MODULE_LOAD_SUCCESS; }