Merge pull request #290 in FS/freeswitch from ~SAFAROV/freeswitch2:mod_erlang_socket to master

* commit '97c25275a629f88700d28513a98f234d821ac9d7':
  FS-7628: mod_erlang_event - added ipv6 support
This commit is contained in:
Mike Jerris 2015-07-03 18:00:08 -05:00
commit a817701824
6 changed files with 132 additions and 173 deletions

View File

@ -1103,6 +1103,7 @@ SWITCH_DECLARE(switch_status_t) switch_socket_connect(switch_socket_t *sock, swi
SWITCH_DECLARE(uint16_t) switch_sockaddr_get_port(switch_sockaddr_t *sa); SWITCH_DECLARE(uint16_t) switch_sockaddr_get_port(switch_sockaddr_t *sa);
SWITCH_DECLARE(const char *) switch_get_addr(char *buf, switch_size_t len, switch_sockaddr_t *in); SWITCH_DECLARE(const char *) switch_get_addr(char *buf, switch_size_t len, switch_sockaddr_t *in);
SWITCH_DECLARE(switch_status_t) switch_getnameinfo(char **hostname, switch_sockaddr_t *sa, int32_t flags);
SWITCH_DECLARE(int32_t) switch_sockaddr_get_family(switch_sockaddr_t *sa); SWITCH_DECLARE(int32_t) switch_sockaddr_get_family(switch_sockaddr_t *sa);
SWITCH_DECLARE(switch_status_t) switch_sockaddr_ip_get(char **addr, switch_sockaddr_t *sa); SWITCH_DECLARE(switch_status_t) switch_sockaddr_ip_get(char **addr, switch_sockaddr_t *sa);
SWITCH_DECLARE(int) switch_sockaddr_equal(const switch_sockaddr_t *sa1, const switch_sockaddr_t *sa2); SWITCH_DECLARE(int) switch_sockaddr_equal(const switch_sockaddr_t *sa1, const switch_sockaddr_t *sa2);

View File

@ -64,6 +64,8 @@ void ei_link(listener_t *listener, erlang_pid * from, erlang_pid * to)
char msgbuf[2048]; char msgbuf[2048];
char *s; char *s;
int index = 0; int index = 0;
switch_socket_t *sock = NULL;
switch_os_sock_put(&sock, &listener->sockdes, listener->pool);
index = 5; /* max sizes: */ index = 5; /* max sizes: */
ei_encode_version(msgbuf, &index); /* 1 */ ei_encode_version(msgbuf, &index); /* 1 */
@ -79,13 +81,9 @@ void ei_link(listener_t *listener, erlang_pid * from, erlang_pid * to)
/* sum: 542 */ /* sum: 542 */
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
#ifdef WIN32 if (switch_socket_send(sock, msgbuf, (switch_size_t *) &index)) {
send(listener->sockfd, msgbuf, index, 0);
#else
if (write(listener->sockfd, msgbuf, index) == -1) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to link to process on %s\n", listener->peer_nodename); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to link to process on %s\n", listener->peer_nodename);
} }
#endif
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
} }
@ -329,30 +327,17 @@ int ei_decode_string_or_binary(char *buf, int *index, int maxlen, char *dst)
switch_status_t initialise_ei(struct ei_cnode_s *ec) switch_status_t initialise_ei(struct ei_cnode_s *ec)
{ {
int rv; char *thishostname = NULL;
struct sockaddr_in server_addr;
struct hostent *nodehost;
char thishostname[EI_MAXHOSTNAMELEN + 1] = "";
char thisnodename[MAXNODELEN + 1]; char thisnodename[MAXNODELEN + 1];
char thisalivename[MAXNODELEN + 1]; char thisalivename[MAXNODELEN + 1];
char *atsign; char *atsign;
/* zero out the struct before we use it */ if (zstr(listen_list.hostname) || !strncasecmp(prefs.ip, "0.0.0.0", 7) || !strncasecmp(prefs.ip, "::", 2)) {
memset(&server_addr, 0, sizeof(server_addr)); listen_list.hostname=(char *) switch_core_get_hostname();
}
/* convert the configured IP to network byte order, handing errors */ if (strlen(listen_list.hostname) > EI_MAXHOSTNAMELEN) {
rv = switch_inet_pton(AF_INET, prefs.ip, &server_addr.sin_addr.s_addr); *(listen_list.hostname+EI_MAXHOSTNAMELEN) = '\0';
if (rv == 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not parse invalid ip address: %s\n", prefs.ip);
return SWITCH_STATUS_FALSE;
} else if (rv == -1) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error when parsing ip address %s : %s\n", prefs.ip, strerror(errno));
return SWITCH_STATUS_FALSE;
} }
/* set the address family and port */
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(prefs.port);
/* copy the prefs.nodename into something we can modify */ /* copy the prefs.nodename into something we can modify */
strncpy(thisalivename, prefs.nodename, MAXNODELEN); strncpy(thisalivename, prefs.nodename, MAXNODELEN);
@ -363,39 +348,19 @@ switch_status_t initialise_ei(struct ei_cnode_s *ec)
/* truncate the alivename at the @ */ /* truncate the alivename at the @ */
*atsign = '\0'; *atsign = '\0';
} else { } else {
#ifdef WIN32
if ((nodehost = gethostbyaddr((const char *) &server_addr.sin_addr.s_addr, sizeof(server_addr.sin_addr.s_addr), AF_INET)))
#else
if ((nodehost = gethostbyaddr((const char *) &server_addr.sin_addr.s_addr, sizeof(server_addr.sin_addr.s_addr), AF_INET)))
#endif
memcpy(thishostname, nodehost->h_name, EI_MAXHOSTNAMELEN);
if (zstr_buf(thishostname) || !strncasecmp(prefs.ip, "0.0.0.0", 7)) {
gethostname(thishostname, EI_MAXHOSTNAMELEN);
}
if (prefs.shortname) { if (prefs.shortname) {
char *off; char *off;
if ((off = strchr(thishostname, '.'))) { if ((off = strchr(listen_list.hostname, '.'))) {
*off = '\0'; *off = '\0';
} }
} else {
if (!(_res.options & RES_INIT)) {
// init the resolver
res_init();
}
if (_res.dnsrch[0] && !zstr_buf(_res.dnsrch[0])) {
strncat(thishostname, ".", 1);
strncat(thishostname, _res.dnsrch[0], EI_MAXHOSTNAMELEN - strlen(thishostname));
}
} }
snprintf(thisnodename, MAXNODELEN + 1, "%s@%s", prefs.nodename, thishostname); snprintf(thisnodename, MAXNODELEN + 1, "%s@%s", prefs.nodename, listen_list.hostname);
} }
/* init the ei stuff */ /* init the ei stuff */
if (ei_connect_xinit(ec, thishostname, thisalivename, thisnodename, (Erl_IpAddr) (&server_addr.sin_addr.s_addr), prefs.cookie, 0) < 0) { if (ei_connect_xinit(ec, listen_list.hostname, thisalivename, thisnodename, (Erl_IpAddr) listen_list.addr, prefs.cookie, 0) < 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }

View File

@ -111,7 +111,7 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
_ei_x_encode_string(&ebuf, reply); _ei_x_encode_string(&ebuf, reply);
switch_mutex_lock(acs->listener->sock_mutex); switch_mutex_lock(acs->listener->sock_mutex);
ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index); ei_send(acs->listener->sockdes, &acs->pid, ebuf.buff, ebuf.index);
switch_mutex_unlock(acs->listener->sock_mutex); switch_mutex_unlock(acs->listener->sock_mutex);
#ifdef EI_DEBUG #ifdef EI_DEBUG
ei_x_print_msg(&ebuf, &acs->pid, 1); ei_x_print_msg(&ebuf, &acs->pid, 1);
@ -139,7 +139,7 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
switch_mutex_lock(acs->listener->sock_mutex); switch_mutex_lock(acs->listener->sock_mutex);
ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index); ei_send(acs->listener->sockdes, &acs->pid, rbuf.buff, rbuf.index);
switch_mutex_unlock(acs->listener->sock_mutex); switch_mutex_unlock(acs->listener->sock_mutex);
#ifdef EI_DEBUG #ifdef EI_DEBUG
ei_x_print_msg(&rbuf, &acs->pid, 1); ei_x_print_msg(&rbuf, &acs->pid, 1);
@ -1332,7 +1332,7 @@ static switch_status_t handle_net_kernel_msg(listener_t *listener, erlang_msg *
ei_x_encode_atom(rbuf, "yes"); ei_x_encode_atom(rbuf, "yes");
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &pid, rbuf->buff, rbuf->index); ei_send(listener->sockdes, &pid, rbuf->buff, rbuf->index);
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
#ifdef EI_DEBUG #ifdef EI_DEBUG
ei_x_print_msg(rbuf, &pid, 1); ei_x_print_msg(rbuf, &pid, 1);
@ -1398,7 +1398,7 @@ int handle_msg(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buf
return 0; return 0;
} else if (rbuf->index > 1) { } else if (rbuf->index > 1) {
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index); ei_send(listener->sockdes, &msg->from, rbuf->buff, rbuf->index);
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
#ifdef EI_DEBUG #ifdef EI_DEBUG
ei_x_print_msg(rbuf, &msg->from, 1); ei_x_print_msg(rbuf, &msg->from, 1);

View File

@ -308,25 +308,17 @@ static void event_handler(switch_event_t *event)
} }
#ifdef WIN32 static void close_socket(switch_socket_t ** sock)
static void close_socket(SOCKET * sock)
#else
static void close_socket(int *sock)
#endif
{ {
switch_mutex_lock(listen_list.sock_mutex);
if (*sock) { if (*sock) {
#ifdef WIN32 switch_socket_shutdown(*sock, SWITCH_SHUTDOWN_READWRITE);
shutdown(*sock, SD_BOTH); switch_socket_close(*sock);
closesocket(*sock); *sock = NULL;
#else
shutdown(*sock, SHUT_RDWR);
close(*sock);
#endif
sock = NULL;
} }
switch_mutex_unlock(listen_list.sock_mutex);
} }
static void add_listener(listener_t *listener) static void add_listener(listener_t *listener)
{ {
/* add me to the listeners so I get events */ /* add me to the listeners so I get events */
@ -564,8 +556,8 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
on our condition before the action starts. */ on our condition before the action starts. */
switch_mutex_lock(ptr->listener->sock_mutex); switch_mutex_lock(ptr->listener->sock_mutex);
if (ptr->listener->sockfd) { if (ptr->listener->sockdes) {
ei_sendto(ptr->listener->ec, ptr->listener->sockfd, &ptr->process, &buf); ei_sendto(ptr->listener->ec, ptr->listener->sockdes, &ptr->process, &buf);
} }
switch_mutex_unlock(ptr->listener->sock_mutex); switch_mutex_unlock(ptr->listener->sock_mutex);
} }
@ -674,7 +666,7 @@ static switch_status_t notify_new_session(listener_t *listener, session_elem_t *
session_element->uuid_str); session_element->uuid_str);
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
result = ei_sendto(listener->ec, listener->sockfd, &session_element->process, &lbuf); result = ei_sendto(listener->ec, listener->sockdes, &session_element->process, &lbuf);
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
if (result) { if (result) {
@ -746,7 +738,7 @@ static switch_status_t check_attached_sessions(listener_t *listener, int *msgs_s
ei_encode_switch_event(&ebuf, pevent); ei_encode_switch_event(&ebuf, pevent);
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); ei_sendto(listener->ec, listener->sockdes, &sp->process, &ebuf);
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
(*msgs_sent)++; (*msgs_sent)++;
ei_x_free(&ebuf); ei_x_free(&ebuf);
@ -762,7 +754,7 @@ static switch_status_t check_attached_sessions(listener_t *listener, int *msgs_s
_ei_x_encode_string(&ebuf, sp->uuid_str); _ei_x_encode_string(&ebuf, sp->uuid_str);
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); ei_sendto(listener->ec, listener->sockdes, &sp->process, &ebuf);
(*msgs_sent)++; (*msgs_sent)++;
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
ei_x_free(&ebuf); ei_x_free(&ebuf);
@ -787,7 +779,7 @@ static switch_status_t check_attached_sessions(listener_t *listener, int *msgs_s
ei_encode_switch_event(&ebuf, pevent); ei_encode_switch_event(&ebuf, pevent);
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); ei_sendto(listener->ec, listener->sockdes, &sp->process, &ebuf);
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
(*msgs_sent)++; (*msgs_sent)++;
ei_x_free(&ebuf); ei_x_free(&ebuf);
@ -864,7 +856,7 @@ static int check_log_queue(listener_t *listener)
ei_x_encode_empty_list(&lbuf); ei_x_encode_empty_list(&lbuf);
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
ei_sendto(listener->ec, listener->sockfd, &listener->log_process, &lbuf); ei_sendto(listener->ec, listener->sockdes, &listener->log_process, &lbuf);
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
msgs_sent ++; msgs_sent ++;
@ -895,7 +887,7 @@ static int check_event_queue(listener_t *listener)
ei_encode_switch_event(&ebuf, pevent); ei_encode_switch_event(&ebuf, pevent);
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
ei_sendto(listener->ec, listener->sockfd, &listener->event_process, &ebuf); ei_sendto(listener->ec, listener->sockdes, &listener->event_process, &ebuf);
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
msgs_sent++; msgs_sent++;
@ -985,7 +977,7 @@ static void listener_main_loop(listener_t *listener)
/* do we need the mutex when reading? */ /* do we need the mutex when reading? */
/*switch_mutex_lock(listener->sock_mutex); */ /*switch_mutex_lock(listener->sock_mutex); */
status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 1); status = ei_xreceive_msg_tmo(listener->sockdes, &msg, &buf, 1);
/*switch_mutex_unlock(listener->sock_mutex); */ /*switch_mutex_unlock(listener->sock_mutex); */
switch (status) { switch (status) {
@ -1089,7 +1081,7 @@ static switch_bool_t check_inbound_acl(listener_t *listener)
ei_x_buff buf; ei_x_buff buf;
ei_x_new(&buf); ei_x_new(&buf);
status = ei_xreceive_msg(listener->sockfd, &msg, &buf); status = ei_xreceive_msg(listener->sockdes, &msg, &buf);
/* get data off the socket, just so we can get the pid on the other end */ /* get data off the socket, just so we can get the pid on the other end */
if (status == ERL_MSG) { if (status == ERL_MSG) {
/* if we got a message, return an ACL error. */ /* if we got a message, return an ACL error. */
@ -1101,7 +1093,7 @@ static switch_bool_t check_inbound_acl(listener_t *listener)
ei_x_encode_atom(&rbuf, "acldeny"); ei_x_encode_atom(&rbuf, "acldeny");
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &msg.from, rbuf.buff, rbuf.index); ei_send(listener->sockdes, &msg.from, rbuf.buff, rbuf.index);
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
#ifdef EI_DEBUG #ifdef EI_DEBUG
ei_x_print_msg(&rbuf, &msg.from, 1); ei_x_print_msg(&rbuf, &msg.from, 1);
@ -1274,6 +1266,8 @@ static int config(void)
prefs.max_event_bulk = atoi(val); prefs.max_event_bulk = atoi(val);
} else if (!strcasecmp(var, "max-log-bulk") && !zstr(val)) { } else if (!strcasecmp(var, "max-log-bulk") && !zstr(val)) {
prefs.max_log_bulk = atoi(val); prefs.max_log_bulk = atoi(val);
} else if (!strcasecmp(var, "stop-on-bind-error")) {
prefs.stop_on_bind_error = switch_true(val) ? 1 : 0;
} }
} }
} }
@ -1333,7 +1327,7 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool); switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool);
switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool); switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool);
listener->sockfd = clientfd; listener->sockdes = clientfd;
listener->pool = pool; listener->pool = pool;
listener->ec = switch_core_alloc(listener->pool, sizeof(ei_cnode)); listener->ec = switch_core_alloc(listener->pool, sizeof(ei_cnode));
memcpy(listener->ec, ec, sizeof(ei_cnode)); memcpy(listener->ec, ec, sizeof(ei_cnode));
@ -1385,14 +1379,15 @@ void destroy_listener(listener_t * listener)
const void *key; const void *key;
void *value; void *value;
switch_hash_index_t *iter; switch_hash_index_t *iter;
switch_socket_t *sock=NULL;
switch_os_sock_put(&sock, &listener->sockdes, listener->pool);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
switch_thread_rwlock_wrlock(listener->rwlock); switch_thread_rwlock_wrlock(listener->rwlock);
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
if (listener->sockfd) { close_socket(&sock);
close_socket(&listener->sockfd); listener->sockdes = -1;
}
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
switch_core_hash_destroy(&listener->event_hash); switch_core_hash_destroy(&listener->event_hash);
@ -1514,7 +1509,8 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
char hash[100]; char hash[100];
spawn_reply_t *p; spawn_reply_t *p;
erlang_ref ref; erlang_ref ref;
switch_os_socket_t sockdes;
switch_os_sock_get(&sockdes, listen_list.sock);
ei_init_ref(listener->ec, &ref); ei_init_ref(listener->ec, &ref);
ei_hash_ref(&ref, hash); ei_hash_ref(&ref, hash);
@ -1550,7 +1546,7 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
ei_x_encode_ref(&rbuf, &ref); ei_x_encode_ref(&rbuf, &ref);
ei_x_encode_pid(&rbuf, ei_self(listener->ec)); ei_x_encode_pid(&rbuf, ei_self(listener->ec));
/* should lock with mutex? */ /* should lock with mutex? */
ei_reg_send(listener->ec, listener->sockfd, module, rbuf.buff, rbuf.index); ei_reg_send(listener->ec, sockdes, module, rbuf.buff, rbuf.index);
#ifdef EI_DEBUG #ifdef EI_DEBUG
ei_x_print_reg_msg(&rbuf, module, 1); ei_x_print_reg_msg(&rbuf, module, 1);
#endif #endif
@ -1559,7 +1555,7 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "rpc call: %s:%s(Ref)\n", module, function); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "rpc call: %s:%s(Ref)\n", module, function);
/* should lock with mutex? */ /* should lock with mutex? */
switch_mutex_lock(listener->sock_mutex); switch_mutex_lock(listener->sock_mutex);
ei_pid_from_rpc(listener->ec, listener->sockfd, &ref, module, function); ei_pid_from_rpc(listener->ec, sockdes, &ref, module, function);
switch_mutex_unlock(listener->sock_mutex); switch_mutex_unlock(listener->sock_mutex);
/* /*
char *argv[1]; char *argv[1];
@ -1750,7 +1746,7 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function)
} }
if (listener) { if (listener) {
ei_reg_send(listener->ec, listener->sockfd, reg_name, buf.buff, buf.index); ei_reg_send(listener->ec, listener->sockdes, reg_name, buf.buff, buf.index);
switch_thread_rwlock_unlock(listener->rwlock); switch_thread_rwlock_unlock(listener->rwlock);
} }
@ -1929,7 +1925,6 @@ SWITCH_STANDARD_API(erlang_cmd)
return status; return status;
} }
SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
{ {
switch_application_interface_t *app_interface; switch_application_interface_t *app_interface;
@ -1943,9 +1938,11 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
switch_thread_rwlock_create(&globals.bindings_rwlock, pool); switch_thread_rwlock_create(&globals.bindings_rwlock, pool);
switch_mutex_init(&globals.fetch_reply_mutex, SWITCH_MUTEX_DEFAULT, pool); switch_mutex_init(&globals.fetch_reply_mutex, SWITCH_MUTEX_DEFAULT, pool);
switch_mutex_init(&globals.listener_count_mutex, SWITCH_MUTEX_UNNESTED, pool); switch_mutex_init(&globals.listener_count_mutex, SWITCH_MUTEX_UNNESTED, pool);
switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
switch_core_hash_init(&globals.fetch_reply_hash); switch_core_hash_init(&globals.fetch_reply_hash);
/* intialize the unique reference stuff */ /* intialize the unique reference stuff */
switch_mutex_init(&listen_list.sock_mutex, SWITCH_MUTEX_NESTED, pool);
switch_mutex_init(&globals.ref_mutex, SWITCH_MUTEX_NESTED, pool); switch_mutex_init(&globals.ref_mutex, SWITCH_MUTEX_NESTED, pool);
globals.reference0 = 0; globals.reference0 = 0;
globals.reference1 = 0; globals.reference1 = 0;
@ -1953,7 +1950,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) { if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind to all events!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind to all events!\n");
close_socket(&listen_list.sockfd); close_socket(&listen_list.sock);
return SWITCH_STATUS_GENERR; return SWITCH_STATUS_GENERR;
} }
@ -1963,7 +1960,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
if (switch_xml_bind_search_function_ret(erlang_fetch, SWITCH_XML_SECTION_MAX, NULL, &bindings.search_binding) != SWITCH_STATUS_SUCCESS) { if (switch_xml_bind_search_function_ret(erlang_fetch, SWITCH_XML_SECTION_MAX, NULL, &bindings.search_binding) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't set up xml search bindings!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't set up xml search bindings!\n");
close_socket(&listen_list.sockfd); close_socket(&listen_list.sock);
return SWITCH_STATUS_GENERR; return SWITCH_STATUS_GENERR;
} }
@ -1983,92 +1980,80 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
{ {
switch_memory_pool_t *pool = NULL, *listener_pool = NULL; switch_memory_pool_t *pool = NULL, *listener_pool = NULL;
int rv; switch_status_t rv;
switch_sockaddr_t *sa;
switch_os_socket_t sockdes;
listener_t *listener; listener_t *listener;
uint32_t x = 0; uint32_t x = 0;
struct ei_cnode_s ec; struct ei_cnode_s ec;
ErlConnect conn; ErlConnect conn;
struct sockaddr_in server_addr;
int on = 1;
int clientfd; int clientfd;
int epmdfd; switch_os_socket_t epmdfd;
#ifdef WIN32 switch_socket_t *epmd_sock = NULL;
/* borrowed from MSDN, stupid winsock */
WORD wVersionRequested;
WSADATA wsaData;
wVersionRequested = MAKEWORD(2, 2);
if (WSAStartup(wVersionRequested, &wsaData) != 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Winsock initialization failed, oh well\n");
return SWITCH_STATUS_TERM;
}
if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Your winsock version doesn't support the 2.2 specification, bailing\n");
return SWITCH_STATUS_TERM;
}
#endif
memset(&listen_list, 0, sizeof(listen_list));
config();
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n");
return SWITCH_STATUS_TERM; return SWITCH_STATUS_TERM;
} }
/* zero out the struct before we use it */ config();
memset(&server_addr, 0, sizeof(server_addr));
/* convert the configured IP to network byte order, handing errors */
rv = switch_inet_pton(AF_INET, prefs.ip, &server_addr.sin_addr.s_addr);
if (rv == 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not parse invalid ip address: %s\n", prefs.ip);
goto init_failed;
} else if (rv == -1) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error when parsing ip address %s : %s\n", prefs.ip, strerror(errno));
goto init_failed;
}
/* set the address family and port */
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(prefs.port);
/* do the socket setup ei is too lazy to do for us */ /* do the socket setup ei is too lazy to do for us */
for (;;) { while (!prefs.done) {
switch_sockaddr_t *sa_local;
if ((listen_list.sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { rv = switch_sockaddr_info_get(&sa, !strcmp(prefs.ip, "*") ? NULL : prefs.ip, SWITCH_UNSPEC, prefs.port, 0, pool);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate socket on %s:%u\n", prefs.ip, prefs.port); if (rv) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not parse invalid ip address: %s\n", prefs.ip);
goto fail;
}
rv = switch_socket_create(&listen_list.sock, switch_sockaddr_get_family(sa), SOCK_STREAM, SWITCH_PROTO_TCP, pool);
if (rv) {
goto sock_fail;
}
switch_os_sock_get(&sockdes, listen_list.sock);
rv = switch_socket_opt_set(listen_list.sock, SWITCH_SO_REUSEADDR, 1);
if (rv) {
goto sock_fail; goto sock_fail;
} }
#ifdef WIN32 #ifdef WIN32
if (setsockopt(listen_list.sockfd, SOL_SOCKET, SO_REUSEADDR, (const char *) &on, sizeof(on))) { /* Enable dual-stack listening on Windows (if the listening address is IPv6), it's default on Linux */
#else if (switch_sockaddr_get_family(sa) == AF_INET6) {
if (setsockopt(listen_list.sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { rv = switch_socket_opt_set(listen_list_apr.sock, IPV6_V6ONLY, 0);
if (rv) {
goto sock_fail;
}
}
#endif #endif
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to enable SO_REUSEADDR for socket on %s:%u : %s\n", prefs.ip, prefs.port, rv = switch_socket_bind(listen_list.sock, sa);
strerror(errno)); if (rv) {
goto sock_fail;
}
rv = switch_socket_listen(listen_list.sock, 5);
if (rv) {
goto sock_fail; goto sock_fail;
} }
if (bind(listen_list.sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) { switch_socket_addr_get(&sa_local, SWITCH_FALSE, listen_list.sock);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to bind to %s:%u\n", prefs.ip, prefs.port); switch_get_addr(listen_list.addr, sizeof(listen_list.addr), sa_local);
goto sock_fail; switch_getnameinfo(&listen_list.hostname, sa_local, 0);
if (switch_sockaddr_get_family(sa) == AF_INET6) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Socket up listening on [%s]:%u\n", listen_list.addr, prefs.port);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Socket up listening on %s:%u\n", listen_list.addr, prefs.port);
} }
if (listen(listen_list.sockfd, 5) < 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to listen on %s:%u\n", prefs.ip, prefs.port);
goto sock_fail;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Socket %d up listening on %s:%u\n", listen_list.sockfd, prefs.ip, prefs.port);
break; break;
sock_fail: sock_fail:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error! Could not listen on %s:%u\n", prefs.ip, prefs.port);
if (prefs.stop_on_bind_error) {
prefs.done = 1;
goto fail;
}
switch_yield(100000); switch_yield(100000);
} }
@ -2079,7 +2064,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
if (SWITCH_STATUS_SUCCESS != initialise_ei(&ec)) { if (SWITCH_STATUS_SUCCESS != initialise_ei(&ec)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n");
goto init_failed; goto end;
} }
/* return value is -1 for error, a descriptor pointing to epmd otherwise */ /* return value is -1 for error, a descriptor pointing to epmd otherwise */
@ -2088,12 +2073,12 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
if (system("epmd -daemon")) { if (system("epmd -daemon")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,
"Failed to start epmd manually! Is epmd in $PATH? If not, start it yourself or run an erl shell with -sname or -name\n"); "Failed to start epmd manually! Is epmd in $PATH? If not, start it yourself or run an erl shell with -sname or -name\n");
goto init_failed; goto end;
} }
switch_yield(100000); switch_yield(100000);
if ((epmdfd = ei_publish(&ec, prefs.port)) == -1) { if ((epmdfd = ei_publish(&ec, prefs.port)) == -1) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to publish port to epmd AGAIN\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to publish port to epmd AGAIN\n");
goto init_failed; goto end;
} }
} }
@ -2101,7 +2086,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
listen_list.ready = 1; listen_list.ready = 1;
for (;;) {
while (!prefs.done) {
/* zero out errno because ei_accept doesn't differentiate between a /* zero out errno because ei_accept doesn't differentiate between a
* failed authentication or a socket failure, or a client version * failed authentication or a socket failure, or a client version
* mismatch or a godzilla attack */ * mismatch or a godzilla attack */
@ -2110,7 +2096,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
#else #else
errno = 0; errno = 0;
#endif #endif
if ((clientfd = ei_accept_tmo(&ec, (int) listen_list.sockfd, &conn, 500)) == ERL_ERROR) { if ((clientfd = ei_accept_tmo(&ec, (int) sockdes, &conn, 500)) == ERL_ERROR) {
if (prefs.done) { if (prefs.done) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Shutting Down\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Shutting Down\n");
break; break;
@ -2140,47 +2126,50 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching listener, connection from node %s, ip %s\n", conn.nodename, switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching listener, connection from node %s, ip %s\n", conn.nodename,
listener->remote_ip); listener->remote_ip);
launch_listener_thread(listener); launch_listener_thread(listener);
} else } else {
/* if we fail to create a listener (memory error), then the module will exit */ /* if we fail to create a listener (memory error), then the module will exit */
break; break;
} }
}
end:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Exiting module mod_erlang_event\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Exiting module mod_erlang_event\n");
/* cleanup epmd registration */ /* cleanup epmd registration */
ei_unpublish(&ec); switch_os_sock_put(&epmd_sock, &epmdfd, pool);
close_socket(&epmdfd); close_socket(&epmd_sock);
epmdfd = -1;
close_socket(&listen_list.sock);
init_failed:
close_socket(&listen_list.sockfd);
if (pool) { if (pool) {
switch_core_destroy_memory_pool(&pool); switch_core_destroy_memory_pool(&pool);
} }
if (listener_pool) { if (listener_pool) {
switch_core_destroy_memory_pool(&listener_pool); switch_core_destroy_memory_pool(&listener_pool);
} }
for (x = 0; x < prefs.acl_count; x++) { for (x = 0; x < prefs.acl_count; x++) {
switch_safe_free(prefs.acl[x]); switch_safe_free(prefs.acl[x]);
} }
prefs.done = 2; fail:
return SWITCH_STATUS_TERM; return SWITCH_STATUS_TERM;
} }
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown) SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown)
{ {
listener_t *l; listener_t *l;
int sanity = 0; int sanity = 0;
switch_socket_t *sock = NULL;
if (prefs.done == 0) /* main thread might already have exited */
prefs.done = 1; prefs.done = 1;
switch_log_unbind_logger(socket_logger); switch_log_unbind_logger(socket_logger);
/*close_socket(&listen_list.sockfd); */ close_socket(&listen_list.sock);
while (prefs.threads || prefs.done == 1) { while (prefs.threads) {
switch_yield(10000); switch_yield(10000);
if (++sanity == 1000) { if (++sanity == 1000) {
break; break;
@ -2193,7 +2182,9 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown)
switch_thread_rwlock_wrlock(globals.listener_rwlock); switch_thread_rwlock_wrlock(globals.listener_rwlock);
for (l = listen_list.listeners; l; l = l->next) { for (l = listen_list.listeners; l; l = l->next) {
close_socket(&l->sockfd); switch_os_sock_put(&sock, &l->sockdes, l->pool);
close_socket(&sock);
l->sockdes = -1;
} }
#ifdef WIN32 #ifdef WIN32

View File

@ -112,11 +112,7 @@ typedef enum {
5 call sessions will be "attached" to the same listener. 5 call sessions will be "attached" to the same listener.
*/ */
struct listener { struct listener {
#ifdef WIN32 switch_os_socket_t sockdes;
SOCKET sockfd;
#else
int sockfd;
#endif
struct ei_cnode_s *ec; struct ei_cnode_s *ec;
struct erlang_process log_process; struct erlang_process log_process;
struct erlang_process event_process; struct erlang_process event_process;
@ -170,6 +166,7 @@ struct api_command_struct {
}; };
struct globals_struct { struct globals_struct {
switch_mutex_t *listener_mutex;
switch_thread_rwlock_t *listener_rwlock; switch_thread_rwlock_t *listener_rwlock;
switch_thread_rwlock_t *bindings_rwlock; switch_thread_rwlock_t *bindings_rwlock;
switch_event_node_t *node; switch_event_node_t *node;
@ -187,11 +184,10 @@ struct globals_struct {
typedef struct globals_struct globals_t; typedef struct globals_struct globals_t;
struct listen_list_struct { struct listen_list_struct {
#ifdef WIN32 char *hostname;
SOCKET sockfd; char addr[64];
#else switch_socket_t *sock;
int sockfd; switch_mutex_t *sock_mutex;
#endif
listener_t *listeners; listener_t *listeners;
uint8_t ready; uint8_t ready;
}; };
@ -220,6 +216,7 @@ struct prefs_struct {
int compat_rel; int compat_rel;
int max_event_bulk; int max_event_bulk;
int max_log_bulk; int max_log_bulk;
int stop_on_bind_error;
}; };
typedef struct prefs_struct prefs_t; typedef struct prefs_struct prefs_t;

View File

@ -913,6 +913,11 @@ SWITCH_DECLARE(int32_t) switch_sockaddr_get_family(switch_sockaddr_t *sa)
return sa->family; return sa->family;
} }
SWITCH_DECLARE(switch_status_t) switch_getnameinfo(char **hostname, switch_sockaddr_t *sa, int32_t flags)
{
return apr_getnameinfo(hostname, sa, flags);
}
SWITCH_DECLARE(switch_status_t) switch_socket_atmark(switch_socket_t *sock, int *atmark) SWITCH_DECLARE(switch_status_t) switch_socket_atmark(switch_socket_t *sock, int *atmark)
{ {
return apr_socket_atmark(sock, atmark); return apr_socket_atmark(sock, atmark);