From 231fc1f4eb4262223fcac5e3bd5dcd467be429bc Mon Sep 17 00:00:00 2001 From: Michael Jerris Date: Wed, 11 Feb 2009 17:16:44 +0000 Subject: [PATCH] Wed Jan 28 12:06:06 CST 2009 Pekka Pessi * sofia-sip/su_wait.h: added su_timer_deferrable(), su_task_wakeup(), su_root_set_max_defer(), su_root_get_max_defer() and su_task_deferrable(). Added implementation to different main-loop implementations in libsofia-sip-ua/su. Fixed su_task_is_running(). In libsofia-sip-ua/su, added tests for deferred timers. In libsofia-sip-ua-glib/su-glib/su_source.c, added su_source_wakeup() and su_source_is_running(). Using su_base_port_send() instead of su_source_send(). Using su_base_port_deferable() and su_base_port_max_defer(), too. git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@11852 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- libs/sofia-sip/.update | 2 +- .../libsofia-sip-ua/su/sofia-sip/su_wait.h | 8 +- .../libsofia-sip-ua/su/su_base_port.c | 78 ++++++++-- .../libsofia-sip-ua/su/su_devpoll_port.c | 6 +- .../libsofia-sip-ua/su/su_epoll_port.c | 6 +- .../libsofia-sip-ua/su/su_kqueue_port.c | 6 +- .../libsofia-sip-ua/su/su_osx_runloop.c | 4 + .../libsofia-sip-ua/su/su_poll_port.c | 6 +- libs/sofia-sip/libsofia-sip-ua/su/su_port.h | 69 ++++++++- .../libsofia-sip-ua/su/su_pthread_port.c | 3 + libs/sofia-sip/libsofia-sip-ua/su/su_root.c | 98 ++++++++++++- .../libsofia-sip-ua/su/su_select_port.c | 6 +- .../libsofia-sip-ua/su/su_socket_port.c | 20 +-- libs/sofia-sip/libsofia-sip-ua/su/su_timer.c | 61 ++++++-- .../libsofia-sip-ua/su/su_win32_port.c | 6 +- .../libsofia-sip-ua/su/torture_su_root.c | 137 +++++++++++++++++- 16 files changed, 452 insertions(+), 64 deletions(-) diff --git a/libs/sofia-sip/.update b/libs/sofia-sip/.update index 91ae56a049..fb51470fa6 100644 --- a/libs/sofia-sip/.update +++ b/libs/sofia-sip/.update @@ -1 +1 @@ -Wed Feb 11 11:15:57 CST 2009 +Wed Feb 11 11:16:35 CST 2009 diff --git a/libs/sofia-sip/libsofia-sip-ua/su/sofia-sip/su_wait.h b/libs/sofia-sip/libsofia-sip-ua/su/sofia-sip/su_wait.h index d8d8ae2be2..189481f8ef 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/sofia-sip/su_wait.h +++ b/libs/sofia-sip/libsofia-sip-ua/su/sofia-sip/su_wait.h @@ -484,6 +484,9 @@ SOFIAPUBFUN int su_root_release(su_root_t *root); SOFIAPUBFUN int su_root_obtain(su_root_t *root); SOFIAPUBFUN int su_root_has_thread(su_root_t *root); +SOFIAPUBFUN int su_root_set_max_defer(su_root_t *, su_duration_t max_defer); +SOFIAPUBFUN su_duration_t su_root_get_max_defer(su_root_t const *self); + /* Timers */ SOFIAPUBFUN su_timer_t *su_timer_create(su_task_r const, su_duration_t msec) __attribute__((__malloc__)); @@ -503,10 +506,10 @@ SOFIAPUBFUN su_root_t *su_timer_root(su_timer_t const *); SOFIAPUBFUN int su_timer_expire(su_timer_queue_t * const, su_duration_t *tout, su_time_t now); +SOFIAPUBFUN int su_timer_deferrable(su_timer_t *t, int value); /* Tasks */ -/** NULL task. */ SOFIAPUBVAR su_task_r const su_task_null; SOFIAPUBFUN _su_task_r su_task_init(su_task_r task); @@ -519,6 +522,9 @@ SOFIAPUBFUN int su_task_is_running(su_task_r const); SOFIAPUBFUN su_root_t *su_task_root(su_task_r const self); SOFIAPUBFUN su_timer_queue_t *su_task_timers(su_task_r const self); +SOFIAPUBFUN su_timer_queue_t *su_task_deferrable(su_task_r const task); + +SOFIAPUBFUN int su_task_wakeup(su_task_r const task); SOFIAPUBFUN int su_task_execute(su_task_r const task, int (*function)(void *), void *arg, diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_base_port.c b/libs/sofia-sip/libsofia-sip-ua/su/su_base_port.c index 4020400be0..0773ea6b48 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_base_port.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_base_port.c @@ -73,6 +73,8 @@ int su_base_port_init(su_port_t *self, su_port_vtable_t const *vtable) if (self) { self->sup_vtable = vtable; self->sup_tail = &self->sup_head; + self->sup_max_defer = 15 * 1000; + return su_port_obtain(self); } @@ -149,7 +151,6 @@ struct _GSource *su_base_port_gsource(su_port_t *self) /** @internal Send a message to the port. * - * @retval 1 if port thread needs to be woken * @retval 0 if there are other messages in queue, too * @retval -1 upon an error */ @@ -167,7 +168,10 @@ int su_base_port_send(su_port_t *self, su_msg_r rmsg) su_port_unlock(self, "su_port_send"); - return wakeup; + if (wakeup > 0) + su_port_wakeup(self); + + return 0; } else { su_msg_destroy(rmsg); @@ -317,12 +321,12 @@ int su_base_port_multishot(su_port_t *self, int multishot) */ void su_base_port_run(su_port_t *self) { - su_duration_t tout = 0; + su_duration_t tout = 0, tout2 = 0; assert(su_port_own_thread(self)); for (self->sup_running = 1; self->sup_running;) { - tout = 2000; + tout = self->sup_max_defer; if (self->sup_prepoll) self->sup_prepoll(self->sup_pp_magic, self->sup_pp_root); @@ -330,8 +334,11 @@ void su_base_port_run(su_port_t *self) if (self->sup_head) self->sup_vtable->su_port_getmsgs(self); - if (self->sup_timers) - su_timer_expire(&self->sup_timers, &tout, su_now()); + if (self->sup_timers || self->sup_deferrable) { + su_time_t now = su_now(); + su_timer_expire(&self->sup_timers, &tout, now); + su_timer_expire(&self->sup_deferrable, &tout2, now); + } if (!self->sup_running) break; @@ -349,13 +356,13 @@ void su_base_port_run_tune(su_port_t *self) { int i; int timers = 0, messages = 0, events = 0; - su_duration_t tout = 0, tout0; + su_duration_t tout = 0, tout2 = 0; su_time_t started = su_now(), woken = started, bedtime = woken; assert(su_port_own_thread(self)); for (self->sup_running = 1; self->sup_running;) { - tout = 2000; + tout = self->sup_max_defer; timers = 0, messages = 0; @@ -365,8 +372,12 @@ void su_base_port_run_tune(su_port_t *self) if (self->sup_head) messages = self->sup_vtable->su_port_getmsgs(self); - if (self->sup_timers) - timers = su_timer_expire(&self->sup_timers, &tout, su_now()); + if (self->sup_timers || self->sup_deferrable) { + su_time_t now = su_now(); + timers = + su_timer_expire(&self->sup_timers, &tout, now) + + su_timer_expire(&self->sup_deferrable, &tout2, now); + } if (!self->sup_running) break; @@ -404,6 +415,16 @@ void su_base_port_break(su_port_t *self) self->sup_running = 0; } +/** @internal + * Check if port is running. + * + * @param self pointer to port + */ +int su_base_port_is_running(su_port_t const *self) +{ + return self->sup_running != 0; +} + /** @internal Block until wait object is signaled or timeout. * * This function waits for wait objects and the timers associated with @@ -435,6 +456,9 @@ su_duration_t su_base_port_step(su_port_t *self, su_duration_t tout) if (self->sup_timers) su_timer_expire(&self->sup_timers, &tout, now); + if (self->sup_deferrable) + su_timer_expire(&self->sup_deferrable, &tout, now); + /* if there are messages do a quick wait */ if (self->sup_head) tout = 0; @@ -444,15 +468,27 @@ su_duration_t su_base_port_step(su_port_t *self, su_duration_t tout) else tout = SU_WAIT_FOREVER; - if (self->sup_head) + if (self->sup_head) { if (self->sup_vtable->su_port_getmsgs(self)) { /* Check for wait events that may have been generated by messages */ if (self->sup_vtable->su_port_wait_events(self, 0)) tout = 0; } + } - if (self->sup_timers) - su_timer_expire(&self->sup_timers, &tout, su_now()); + if (self->sup_timers || self->sup_deferrable) { + su_duration_t tout2 = SU_WAIT_FOREVER; + + now = su_now(); + su_timer_expire(&self->sup_timers, &tout, now); + su_timer_expire(&self->sup_deferrable, &tout2, now); + + if (tout == SU_WAIT_FOREVER && tout2 != SU_WAIT_FOREVER) { + if (tout2 < self->sup_max_defer) + tout2 = self->sup_max_defer; + tout = tout2; + } + } if (self->sup_head) tout = 0; @@ -501,6 +537,22 @@ su_timer_queue_t *su_base_port_timers(su_port_t *self) return &self->sup_timers; } +su_timer_queue_t *su_base_port_deferrable(su_port_t *self) +{ + return &self->sup_deferrable; +} + +int su_base_port_max_defer(su_port_t *self, + su_duration_t *return_duration, + su_duration_t *set_duration) +{ + if (set_duration && *set_duration > 0) + self->sup_max_defer = *set_duration; + if (return_duration) + *return_duration = self->sup_max_defer; + return 0; +} + /* ====================================================================== * Clones */ diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_devpoll_port.c b/libs/sofia-sip/libsofia-sip-ua/su/su_devpoll_port.c index 2d5f01a2d8..a56dfa6395 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_devpoll_port.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_devpoll_port.c @@ -127,7 +127,7 @@ su_port_vtable_t const su_devpoll_port_vtable[1] = su_base_port_incref, su_devpoll_port_decref, su_base_port_gsource, - su_socket_port_send, + su_base_port_send, su_devpoll_port_register, su_devpoll_port_unregister, su_devpoll_port_deregister, @@ -148,6 +148,10 @@ su_port_vtable_t const su_devpoll_port_vtable[1] = su_base_port_start_shared, su_pthread_port_wait, su_pthread_port_execute, + su_base_port_deferrable, + su_base_port_max_defer, + su_socket_port_wakeup, + su_base_port_is_running, }}; static char const *su_devpoll_port_name(su_port_t const *self) diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_epoll_port.c b/libs/sofia-sip/libsofia-sip-ua/su/su_epoll_port.c index 1cc6745e31..702a53b787 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_epoll_port.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_epoll_port.c @@ -122,7 +122,7 @@ su_port_vtable_t const su_epoll_port_vtable[1] = su_base_port_incref, su_epoll_port_decref, su_base_port_gsource, - su_socket_port_send, + su_base_port_send, su_epoll_port_register, su_epoll_port_unregister, su_epoll_port_deregister, @@ -143,6 +143,10 @@ su_port_vtable_t const su_epoll_port_vtable[1] = su_base_port_start_shared, su_pthread_port_wait, su_pthread_port_execute, + su_base_port_deferrable, + su_base_port_max_defer, + su_socket_port_wakeup, + su_base_port_is_running, }}; static char const *su_epoll_port_name(su_port_t const *self) diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_kqueue_port.c b/libs/sofia-sip/libsofia-sip-ua/su/su_kqueue_port.c index 5a4a6002ad..f9381288a4 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_kqueue_port.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_kqueue_port.c @@ -119,7 +119,7 @@ su_port_vtable_t const su_kqueue_port_vtable[1] = su_base_port_incref, su_kqueue_port_decref, su_base_port_gsource, - su_socket_port_send, + su_base_port_send, su_kqueue_port_register, su_kqueue_port_unregister, su_kqueue_port_deregister, @@ -140,6 +140,10 @@ su_port_vtable_t const su_kqueue_port_vtable[1] = su_base_port_start_shared, su_pthread_port_wait, su_pthread_port_execute, + su_base_port_deferrable, + su_base_port_max_defer, + su_socket_port_wakeup, + su_base_port_is_running, }}; static char const *su_kqueue_port_name(su_port_t const *self) diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_osx_runloop.c b/libs/sofia-sip/libsofia-sip-ua/su/su_osx_runloop.c index 45e8f73b68..a13e2e8b52 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_osx_runloop.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_osx_runloop.c @@ -204,6 +204,10 @@ su_port_vtable_t const su_osx_port_vtable[1] = su_base_port_start_shared, su_pthread_port_wait, su_pthread_port_execute, + su_base_port_deferrable, + su_base_port_max_defer, + su_socket_port_wakeup, + su_base_port_is_running, }}; /* XXX - mela static void su_osx_port_destroy(su_port_t *self); */ diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_poll_port.c b/libs/sofia-sip/libsofia-sip-ua/su/su_poll_port.c index 3936884123..4472726737 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_poll_port.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_poll_port.c @@ -120,7 +120,7 @@ su_port_vtable_t const su_poll_port_vtable[1] = su_base_port_incref, su_poll_port_decref, su_base_port_gsource, - su_socket_port_send, + su_base_port_send, su_poll_port_register, su_poll_port_unregister, su_poll_port_deregister, @@ -141,6 +141,10 @@ su_port_vtable_t const su_poll_port_vtable[1] = su_base_port_start_shared, su_pthread_port_wait, su_pthread_port_execute, + su_base_port_deferrable, + su_base_port_max_defer, + su_socket_port_wakeup, + su_base_port_is_running, }}; static char const *su_poll_port_name(su_port_t const *self) diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_port.h b/libs/sofia-sip/libsofia-sip-ua/su/su_port.h index 71d8630e95..092cfbdf65 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_port.h +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_port.h @@ -150,6 +150,14 @@ typedef struct su_port_vtable { int (*su_port_execute)(su_task_r const task, int (*function)(void *), void *arg, int *return_value); + + /* >= 1.12.11 */ + su_timer_queue_t *(*su_port_deferrable)(su_port_t *port); + int (*su_port_max_defer)(su_port_t *port, + su_duration_t *return_duration, + su_duration_t *set_duration); + int (*su_port_wakeup)(su_port_t *port); + int (*su_port_is_running)(su_port_t const *port); } su_port_vtable_t; SOFIAPUBFUN su_port_t *su_port_create(void) @@ -244,6 +252,12 @@ int su_port_send(su_port_t *self, su_msg_r rmsg) return base->sup_vtable->su_port_send(self, rmsg); } +su_inline +int su_port_wakeup(su_port_t *self) +{ + su_virtual_port_t *base = (su_virtual_port_t *)self; + return base->sup_vtable->su_port_wakeup(self); +} su_inline int su_port_register(su_port_t *self, @@ -372,7 +386,7 @@ int su_port_remove_prepoll(su_port_t *self, } su_inline -su_timer_t **su_port_timers(su_port_t *self) +su_timer_queue_t *su_port_timers(su_port_t *self) { su_virtual_port_t *base = (su_virtual_port_t *)self; return base->sup_vtable->su_port_timers(self); @@ -399,6 +413,41 @@ int su_port_getmsgs_from(su_port_t *self, su_port_t *cloneport) return base->sup_vtable->su_port_getmsgs_from(self, cloneport); } +/** Extension from >= 1.12.11 */ + +su_inline +su_timer_queue_t *su_port_deferrable(su_port_t *self) +{ + su_virtual_port_t *base = (su_virtual_port_t *)self; + + if (base == NULL) + return (void *)(errno = EFAULT), NULL; + + return base->sup_vtable->su_port_deferrable(self); +} + +su_inline +int su_port_max_defer(su_port_t *self, + su_duration_t *return_duration, + su_duration_t *set_duration) +{ + su_virtual_port_t *base = (su_virtual_port_t *)self; + + if (base == NULL) + return (errno = EFAULT), -1; + + return base->sup_vtable->su_port_max_defer(self, + return_duration, + set_duration); +} + +su_inline +int su_port_is_running(su_port_t const *self) +{ + su_virtual_port_t *base = (su_virtual_port_t *)self; + return base && base->sup_vtable->su_port_is_running(self); +} + SOFIAPUBFUN void su_port_wait(su_clone_r rclone); SOFIAPUBFUN int su_port_execute(su_task_r const task, @@ -427,7 +476,9 @@ typedef struct su_base_port_s { su_msg_t *sup_head, **sup_tail; /* Timer list */ - su_timer_queue_t sup_timers; + su_timer_queue_t sup_timers, sup_deferrable; + + su_duration_t sup_max_defer; /**< Maximum time to defer */ unsigned sup_running; /**< In su_root_run() loop? */ } su_base_port_t; @@ -468,7 +519,7 @@ SOFIAPUBFUN int su_base_port_add_prepoll(su_port_t *self, SOFIAPUBFUN int su_base_port_remove_prepoll(su_port_t *self, su_root_t *root); -SOFIAPUBFUN su_timer_t **su_base_port_timers(su_port_t *self); +SOFIAPUBFUN su_timer_queue_t *su_base_port_timers(su_port_t *self); SOFIAPUBFUN int su_base_port_multishot(su_port_t *self, int multishot); @@ -479,6 +530,14 @@ SOFIAPUBFUN int su_base_port_start_shared(su_root_t *parent, su_root_deinit_f deinit); SOFIAPUBFUN void su_base_port_wait(su_clone_r rclone); +SOFIAPUBFUN su_timer_queue_t *su_base_port_deferrable(su_port_t *self); + +SOFIAPUBFUN int su_base_port_max_defer(su_port_t *self, + su_duration_t *return_duration, + su_duration_t *set_duration); + +SOFIAPUBFUN int su_base_port_is_running(su_port_t const *self); + /* ---------------------------------------------------------------------- */ #if SU_HAVE_PTHREADS @@ -493,7 +552,7 @@ typedef struct su_pthread_port_s { pthread_t sup_tid; pthread_mutex_t sup_obtained[1]; -#if 0 +#if 0 /* Pausing and resuming are not used */ pthread_mutex_t sup_runlock[1]; pthread_cond_t sup_resume[1]; short sup_paused; /**< True if thread is paused */ @@ -535,7 +594,6 @@ SOFIAPUBFUN int su_pthread_port_execute(su_task_r const task, int (*function)(void *), void *arg, int *return_value); - #if 0 SOFIAPUBFUN int su_pthread_port_pause(su_port_t *self); SOFIAPUBFUN int su_pthread_port_resume(su_port_t *self); @@ -570,6 +628,7 @@ SOFIAPUBFUN int su_socket_port_init(su_socket_port_t *, su_port_vtable_t const *); SOFIAPUBFUN void su_socket_port_deinit(su_socket_port_t *self); SOFIAPUBFUN int su_socket_port_send(su_port_t *self, su_msg_r rmsg); +SOFIAPUBFUN int su_socket_port_wakeup(su_port_t *self); SOFIA_END_DECLS diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_pthread_port.c b/libs/sofia-sip/libsofia-sip-ua/su/su_pthread_port.c index defed9b9a1..a571b4dab6 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_pthread_port.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_pthread_port.c @@ -315,6 +315,9 @@ static void *su_pthread_port_clone_main(void *varg) task->sut_root->sur_magic = arg->magic; task->sut_root->sur_deinit = arg->deinit; + su_root_set_max_defer(task->sut_root, + su_root_get_max_defer(arg->parent)); + if (arg->init(task->sut_root, arg->magic) == 0) { su_pthread_port_return_to_parent(arg, 0), arg = NULL; diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_root.c b/libs/sofia-sip/libsofia-sip-ua/su/su_root.c index af5d638499..128dd8be06 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_root.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_root.c @@ -86,6 +86,8 @@ struct su_root_s; * - su_root_run() [Do not call from cloned task] * - su_root_break() [Do not call from cloned task] * - su_root_step() [Do not call from cloned task] + * - su_root_get_max_defer() + * - su_root_set_max_defer() * - su_root_task() * * New tasks can be created via su_clone_start() function. @@ -127,6 +129,7 @@ int su_timer_reset_all(su_timer_t **t0, su_task_r); * Tasks */ +/** NULL task. */ su_task_r const su_task_null = SU_TASK_R_INIT; #define SU_TASK_ZAP(t, f) \ @@ -259,13 +262,13 @@ int su_task_cmp(su_task_r const a, su_task_r const b) * * @retval true (nonzero) if task is not stopped, * @retval zero if it is null or stopped. + * + * @note A task sharing thread with another task is considered stopped when + * ever the the main task is stopped. */ int su_task_is_running(su_task_r const task) { - return - task && - task->sut_port && - task->sut_root; + return task && task->sut_root && su_port_is_running(task->sut_port); } /** @internal @@ -318,14 +321,43 @@ int su_task_detach(su_task_r self) * * @param task task handle * - * @return A timer list of the task. If there are no timers, it returns - * NULL. + * @return A timer list of the task. */ su_timer_queue_t *su_task_timers(su_task_r const task) { return task->sut_port ? su_port_timers(task->sut_port) : NULL; } +/**Return the queue for deferrable timers associated with given task. + * + * @param task task handle + * + * @return A timer list of the task. + * + * @NEW_1_12_11 + */ +su_timer_queue_t *su_task_deferrable(su_task_r const task) +{ + return task ? su_port_deferrable(task->sut_port) : NULL; +} + +/** Wakeup a task. + * + * Wake up a task. This function is mainly useful when using deferrable + * timers executed upon wakeup. + * + * @param task task handle + * + * @retval 0 if succesful + * @retval -1 upon an error + * + * @NEW_1_12_11 + */ +int su_task_wakeup(su_task_r const task) +{ + return task ? su_port_wakeup(task->sut_port) : -1; +} + /** Execute the @a function by @a task thread. * * @retval 0 if successful @@ -447,6 +479,10 @@ void su_root_destroy(su_root_t *self) unregistered = su_port_unregister_all(port, self); reset = su_timer_reset_all(su_task_timers(self->sur_task), self->sur_task); + if (su_task_deferrable(self->sur_task)) + reset += su_timer_reset_all(su_task_deferrable(self->sur_task), + self->sur_task); + if (unregistered || reset) SU_DEBUG_1(("su_root_destroy: " "%u registered waits, %u timers\n", @@ -601,6 +637,56 @@ int su_root_unregister(su_root_t *self, return su_port_unregister(self->sur_port, self, wait, callback, arg); } +/** Set maximum defer time. + * + * The deferrable timers can be deferred until the task is otherwise + * activated, however, they are deferred no longer than the maximum defer + * time. The maximum defer time determines also the maximum time during + * which task waits for events while running. The maximum defer time is 15 + * seconds by default. + * + * Cloned tasks inherit the maximum defer time. + * + * @param self pointer to root object + * @param max_defer maximum defer time in milliseconds + * + * @retval 0 when successful + * @retval -1 upon an error + * + * @sa su_timer_deferrable() + * + * @NEW_1_12_11 + */ +int su_root_set_max_defer(su_root_t *self, su_duration_t max_defer) +{ + if (!self) + return -1; + + return su_port_max_defer(self->sur_port, &max_defer, &max_defer); +} + +/** Get maximum defer time. + * + * The deferrable timers can be deferred until the task is otherwise + * activated, however, they are deferred no longer than the maximum defer + * time. The maximum defer time is 15 seconds by default. + * + * @param root pointer to root object + * + * @return Maximum defer time + * + * @NEW_1_12_7 + */ +su_duration_t su_root_get_max_defer(su_root_t const *self) +{ + su_duration_t max_defer = SU_WAIT_MAX; + + if (self != NULL) + su_port_max_defer(self->sur_port, &max_defer, NULL); + + return max_defer; +} + /** Remove a su_wait_t registration. * * The function su_root_deregister() deregisters a su_wait_t object. The diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_select_port.c b/libs/sofia-sip/libsofia-sip-ua/su/su_select_port.c index abb5cdc2c1..3079a32b35 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_select_port.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_select_port.c @@ -139,7 +139,7 @@ su_port_vtable_t const su_select_port_vtable[1] = su_base_port_incref, su_select_port_decref, su_base_port_gsource, - su_socket_port_send, + su_base_port_send, su_select_port_register, su_select_port_unregister, su_select_port_deregister, @@ -160,6 +160,10 @@ su_port_vtable_t const su_select_port_vtable[1] = su_base_port_start_shared, su_pthread_port_wait, su_pthread_port_execute, + su_base_port_deferrable, + su_base_port_max_defer, + su_socket_port_wakeup, + su_base_port_is_running, }}; static char const *su_select_port_name(su_port_t const *self) diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_socket_port.c b/libs/sofia-sip/libsofia-sip-ua/su/su_socket_port.c index 3de4a9a61f..5bee4ea5b6 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_socket_port.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_socket_port.c @@ -179,23 +179,17 @@ void su_socket_port_deinit(su_port_t *self) su_pthread_port_deinit(self); } -/** @internal Send a message to the port. */ -int su_socket_port_send(su_port_t *self, su_msg_r rmsg) +/** @internal Wake up the port. */ +int su_socket_port_wakeup(su_port_t *self) { - int wakeup = su_base_port_send(self, rmsg); + assert(self->sup_mbox[SU_MBOX_SEND] != INVALID_SOCKET); - if (wakeup < 0) - return -1; - - if (wakeup) { - assert(self->sup_mbox[SU_MBOX_SEND] != INVALID_SOCKET); - - if (send(self->sup_mbox[SU_MBOX_SEND], "X", 1, 0) == -1) { + if (!su_pthread_port_own_thread(self) && + send(self->sup_mbox[SU_MBOX_SEND], "X", 1, 0) == -1) { #if HAVE_SOCKETPAIR - if (su_errno() != EWOULDBLOCK) + if (su_errno() != EWOULDBLOCK) #endif - su_perror("su_msg_send: send()"); - } + su_perror("su_msg_send: send()"); } return 0; diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_timer.c b/libs/sofia-sip/libsofia-sip-ua/su/su_timer.c index 8923c69446..1d45b66065 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_timer.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_timer.c @@ -164,7 +164,9 @@ struct su_timer_s { su_timer_arg_t *sut_arg; /**< Pointer to argument data */ su_time_t sut_run; /**< When this timer was last waken up */ unsigned sut_woken; /**< Timer has waken up this many times */ - unsigned short sut_running; /**< Timer is running */ + + unsigned sut_running:2;/**< Timer is running */ + unsigned sut_deferrable:1;/**< Timer can be deferrable */ }; /** Timer running status */ @@ -246,9 +248,9 @@ su_timer_set0(su_timer_queue_t *timers, * @retval NULL upon an error */ static -su_timer_queue_t *su_timer_tree(su_timer_t const *t, - int use_sut_duration, - char const *caller) +su_timer_queue_t *su_timer_queue(su_timer_t const *t, + int use_sut_duration, + char const *caller) { su_timer_queue_t *timers; @@ -265,7 +267,10 @@ su_timer_queue_t *su_timer_tree(su_timer_t const *t, return NULL; } - timers = su_task_timers(t->sut_task); + if (t->sut_deferrable) + timers = su_task_deferrable(t->sut_task); + else + timers = su_task_timers(t->sut_task); if (timers == NULL) { SU_DEBUG_1(("%s(%p): %s\n", caller, (void *)t, "invalid timer")); @@ -285,7 +290,7 @@ su_timer_queue_t *su_timer_tree(su_timer_t const *t, * Allocate and initialize an instance of su_timer_t. * * @param task a task for root object with which the timer will be associated - * @param msec the default duration of the timer + * @param msec the default duration of the timer in milliseconds * * @return A pointer to allocated timer instance, NULL on error. */ @@ -307,6 +312,7 @@ su_timer_t *su_timer_create(su_task_r const task, su_duration_t msec) return retval; } + /** Destroy a timer. * * Deinitialize and free an instance of su_timer_t. @@ -322,6 +328,7 @@ void su_timer_destroy(su_timer_t *t) } } + /** Set the timer for the given @a interval. * * Sets (starts) the given timer to expire after the specified duration. @@ -338,7 +345,7 @@ int su_timer_set_interval(su_timer_t *t, su_timer_arg_t *arg, su_duration_t interval) { - su_timer_queue_t *timers = su_timer_tree(t, 0, "su_timer_set_interval"); + su_timer_queue_t *timers = su_timer_queue(t, 0, "su_timer_set_interval"); return su_timer_set0(timers, t, wakeup, arg, su_now(), interval); } @@ -359,7 +366,7 @@ int su_timer_set(su_timer_t *t, su_timer_f wakeup, su_timer_arg_t *arg) { - su_timer_queue_t *timers = su_timer_tree(t, 1, "su_timer_set"); + su_timer_queue_t *timers = su_timer_queue(t, 1, "su_timer_set"); return su_timer_set0(timers, t, wakeup, arg, su_now(), t->sut_duration); } @@ -380,7 +387,7 @@ int su_timer_set_at(su_timer_t *t, su_wakeup_arg_t *arg, su_time_t when) { - su_timer_queue_t *timers = su_timer_tree(t, 0, "su_timer_set_at"); + su_timer_queue_t *timers = su_timer_queue(t, 0, "su_timer_set_at"); return su_timer_set0(timers, t, wakeup, arg, when, 0); } @@ -407,7 +414,7 @@ int su_timer_run(su_timer_t *t, su_timer_f wakeup, su_timer_arg_t *arg) { - su_timer_queue_t *timers = su_timer_tree(t, 1, "su_timer_run"); + su_timer_queue_t *timers = su_timer_queue(t, 1, "su_timer_run"); su_time_t now; if (timers == NULL) @@ -440,7 +447,7 @@ int su_timer_set_for_ever(su_timer_t *t, su_timer_f wakeup, su_timer_arg_t *arg) { - su_timer_queue_t *timers = su_timer_tree(t, 1, "su_timer_set_for_ever"); + su_timer_queue_t *timers = su_timer_queue(t, 1, "su_timer_set_for_ever"); su_time_t now; if (timers == NULL) @@ -463,7 +470,7 @@ int su_timer_set_for_ever(su_timer_t *t, */ int su_timer_reset(su_timer_t *t) { - su_timer_queue_t *timers = su_timer_tree(t, 0, "su_timer_reset"); + su_timer_queue_t *timers = su_timer_queue(t, 0, "su_timer_reset"); if (timers == NULL) return -1; @@ -508,7 +515,7 @@ int su_timer_expire(su_timer_queue_t * const timers, if (SU_TIME_CMP(t->sut_when, now) > 0) { su_duration_t at = su_duration(t->sut_when, now); - if (at < *timeout) + if (at < *timeout || *timeout < 0) *timeout = at; break; @@ -622,3 +629,31 @@ su_root_t *su_timer_root(su_timer_t const *t) return t ? su_task_root(t->sut_task) : NULL; } + +/** Change timer as deferrable (or as undeferrable). + * + * A deferrable timer is executed after the given timeout, however, the task + * tries to avoid being woken up only because the timeout. Deferable timers + * have their own queue and timers there are ignored when calculating the + * timeout for epoll()/select()/whatever unless the timeout would exceed the + * maximum defer time. The maximum defer time is 15 seconds by default, but + * it can be modified by su_root_set_max_defer(). + * + * @param t pointer to the timer + * @param value make timer deferrable if true, undeferrable if false + * + * @return 0 if succesful, -1 upon an error + * + * @sa su_root_set_max_defer() + * + * @NEW_1_12_7 + */ +int su_timer_deferrable(su_timer_t *t, int value) +{ + if (t == NULL || su_task_deferrable(t->sut_task) == NULL) + return errno = EINVAL, -1; + + t->sut_deferrable = value != 0; + + return 0; +} diff --git a/libs/sofia-sip/libsofia-sip-ua/su/su_win32_port.c b/libs/sofia-sip/libsofia-sip-ua/su/su_win32_port.c index cc4a3852df..cf55be190d 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/su_win32_port.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/su_win32_port.c @@ -118,7 +118,7 @@ su_port_vtable_t const su_wsevent_port_vtable[1] = su_base_port_incref, su_wsevent_port_decref, su_base_port_gsource, - su_socket_port_send, + su_base_port_send, su_wsevent_port_register, su_wsevent_port_unregister, su_wsevent_port_deregister, @@ -139,6 +139,10 @@ su_port_vtable_t const su_wsevent_port_vtable[1] = su_base_port_start_shared, su_pthread_port_wait, su_pthread_port_execute, + su_base_port_deferrable, + su_base_port_max_defer, + su_socket_port_wakeup, + su_base_port_is_running, }}; static char const *su_wsevent_port_name(su_port_t const *self) diff --git a/libs/sofia-sip/libsofia-sip-ua/su/torture_su_root.c b/libs/sofia-sip/libsofia-sip-ua/su/torture_su_root.c index e23a8ea351..1d270f8502 100644 --- a/libs/sofia-sip/libsofia-sip-ua/su/torture_su_root.c +++ b/libs/sofia-sip/libsofia-sip-ua/su/torture_su_root.c @@ -61,6 +61,8 @@ typedef struct test_ep_s test_ep_t; #include #endif +#define ALARM_IN_SECONDS 120 + struct test_ep_s { test_ep_t *next, **prev, **list; int i; @@ -99,6 +101,8 @@ struct root_test_s { unsigned rt_executed:1; + unsigned rt_t1:1, rt_t2:1; + unsigned :0; test_ep_at rt_ep[5]; @@ -161,7 +165,6 @@ int test_api(root_test_t *rt) END(); } - #if SU_HAVE_PTHREADS #include @@ -589,6 +592,61 @@ void send_a_reporter_msg(root_test_t *rt, rt->rt_sent_reporter = 1; } +static void expire1(root_test_t *rt, su_timer_t *t, su_timer_arg_t *arg) +{ + (void)arg; + rt->rt_t1 = 1; +} + +static void expire2(root_test_t *rt, su_timer_t *t, su_timer_arg_t *arg) +{ + (void)arg; + rt->rt_t2 = 1; +} + +int timer_test(root_test_t rt[1]) +{ + BEGIN(); + + su_timer_t *t1, *t2; + su_duration_t defer; + + TEST_1(t1 = su_timer_create(su_root_task(rt->rt_root), 100)); + TEST_1(t2 = su_timer_create(su_root_task(rt->rt_root), 110)); + + rt->rt_t1 = rt->rt_t2 = 0; + + TEST_1(su_root_step(rt->rt_root, 0) == SU_WAIT_FOREVER); + + TEST_1(su_root_set_max_defer(rt->rt_root, 30000) != -1); + TEST(su_root_get_max_defer(rt->rt_root), 30000); + + if (su_timer_deferrable(t1, 1) == 0) { + /* + * If only a deferrable timer is set, su_root_step() should return + * about the maximum defer time, which now defaults to 15 seconds + */ + TEST(su_timer_set(t1, expire1, NULL), 0); + defer = su_root_step(rt->rt_root, 0); + TEST_1(defer > 100); + } + else { + TEST(su_timer_set(t1, expire1, NULL), 0); + } + + TEST(su_timer_set(t2, expire2, NULL), 0); + + while (su_root_step(rt->rt_root, 100) != SU_WAIT_FOREVER) + ; + + TEST_1(rt->rt_t1 && rt->rt_t2); + + su_timer_destroy(t1); + su_timer_destroy(t2); + + END(); +} + static int set_execute_bit_and_return_3(void *void_rt) { root_test_t *rt = void_rt; @@ -612,13 +670,34 @@ static void receive_simple_msg(root_test_t *rt, su_task_cmp(su_msg_to(msg), su_task_null) == 0; } -static int clone_test(root_test_t rt[1]) +static void expire1destroy(root_test_t *rt, su_timer_t *t, su_timer_arg_t *arg) +{ + (void)arg; + rt->rt_t1 = 1; + su_timer_destroy(t); +} + +static int set_deferrable_timer(void *void_rt) +{ + root_test_t *rt = void_rt; + su_timer_t *t1; + + TEST_1(t1 = su_timer_create(su_clone_task(rt->rt_clone), 100)); + TEST_1(su_timer_deferrable(t1, 1) == 0); + TEST(su_timer_set(t1, expire1destroy, NULL), 0); + + return 0; +} + +static int clone_test(root_test_t rt[1], int multithread) { BEGIN(); su_msg_r m = SU_MSG_R_INIT; int retval; + su_root_threading(rt->rt_root, multithread); + rt->rt_fail_init = 0; rt->rt_fail_deinit = 0; rt->rt_success_init = 0; @@ -635,6 +714,10 @@ static int clone_test(root_test_t rt[1]) TEST_1(rt->rt_fail_init); TEST_1(rt->rt_fail_deinit); + /* Defer longer than maximum allowed run time */ + TEST_1(su_root_set_max_defer(rt->rt_root, ALARM_IN_SECONDS * 1000) != -1); + TEST(su_root_get_max_defer(rt->rt_root), ALARM_IN_SECONDS * 1000); + TEST(su_clone_start(rt->rt_root, rt->rt_clone, rt, @@ -643,6 +726,7 @@ static int clone_test(root_test_t rt[1]) TEST_1(rt->rt_success_init); TEST_1(!rt->rt_success_deinit); + /* Test su_task_execute() */ retval = -1; rt->rt_executed = 0; TEST(su_task_execute(su_clone_task(rt->rt_clone), @@ -666,6 +750,28 @@ static int clone_test(root_test_t rt[1]) TEST(rt->rt_msg_received, 1); + if (multithread) { + TEST_1(su_task_is_running(su_clone_task(rt->rt_clone))); + } + else { + TEST_1(!su_task_is_running(su_clone_task(rt->rt_clone))); + } + + /* Test su_wakeup() */ + if (multithread) { + retval = -1; + rt->rt_t1 = 0; + TEST(su_task_execute(su_clone_task(rt->rt_clone), + set_deferrable_timer, rt, + &retval), 0); + TEST(retval, 0); + + while (rt->rt_t1 == 0) { + TEST(su_root_step(rt->rt_root, 100), SU_WAIT_FOREVER); + su_task_wakeup(su_clone_task(rt->rt_clone)); + } + } + /* Make sure 3-way handshake is done as expected */ TEST(su_msg_create(m, su_clone_task(rt->rt_clone), @@ -707,10 +813,21 @@ void usage(int exitcode) exit(exitcode); } +#if HAVE_ALARM +#include + +static RETSIGTYPE sig_alarm(int s) +{ + fprintf(stderr, "%s: FAIL! test timeout!\n", name); + exit(1); +} +#endif + int main(int argc, char *argv[]) { root_test_t *rt, rt0[1] = {{{ SU_HOME_INIT(rt0) }}}, rt1[1]; int retval = 0; + int no_alarm = 0; int i; struct { @@ -746,6 +863,8 @@ int main(int argc, char *argv[]) rt->rt_flags |= tst_verbatim; else if (strcmp(argv[i], "-a") == 0) rt->rt_flags |= tst_abort; + else if (strcmp(argv[i], "--no-alarm") == 0) + no_alarm = 1; #if SU_HAVE_IN6 else if (strcmp(argv[i], "-6") == 0) rt->rt_family = AF_INET6; @@ -754,6 +873,13 @@ int main(int argc, char *argv[]) usage(1); } +#if HAVE_ALARM + if (!no_alarm) { + signal(SIGALRM, sig_alarm); + alarm(ALARM_IN_SECONDS); + } +#endif + #if HAVE_OPEN_C rt->rt_flags |= tst_verbatim; #endif @@ -770,10 +896,9 @@ int main(int argc, char *argv[]) retval |= init_test(rt, prefer[i].name, prefer[i].create, prefer[i].start); retval |= register_test(rt); retval |= event_test(rt); - su_root_threading(rt->rt_root, 1); - retval |= clone_test(rt); - su_root_threading(rt->rt_root, 0); - retval |= clone_test(rt); + retval |= timer_test(rt); + retval |= clone_test(rt, 1); + retval |= clone_test(rt, 0); retval |= deinit_test(rt); } while (prefer[++i].create);