FS-9078 added hepv2 and hepv3 support

This commit is contained in:
Alexandr Dubovikov 2016-04-18 16:48:50 +02:00
parent d3184ef000
commit 981b528c48
5 changed files with 476 additions and 29 deletions

View File

@ -5,6 +5,15 @@
<!-- <param name="auto-restart" value="false"/> -->
<param name="debug-presence" value="0"/>
<!-- <param name="capture-server" value="udp:homer.domain.com:5060"/> -->
<!--
the new format for HEPv2/v3 and capture ID
protocol:host:port;hep=2;capture_id=200;
-->
<!-- <param name="capture-server" value="udp:homer.domain.com:5060;hep=3;capture_id=100"/> -->
</global_settings>
<!--

View File

@ -1 +1 @@
Wed Sep 9 12:25:38 CDT 2015
Mon Apr 18 10:41:03 CEST 2016

View File

@ -130,6 +130,13 @@ struct hep_iphdr{
struct in_addr hp_dst; /* source and dest address */
};
/* HEPv2 */
struct hep_timehdr{
uint32_t tv_sec; /* seconds */
uint32_t tv_usec; /* useconds */
uint16_t captid; /* Capture ID node */
};
#if SU_HAVE_IN6
struct hep_ip6hdr {
struct in6_addr hp6_src; /* source address */
@ -137,6 +144,86 @@ struct hep_ip6hdr {
};
#endif
/* HEPv3 types */
struct hep_chunk {
uint16_t vendor_id;
uint16_t type_id;
uint16_t length;
} __attribute__((packed));
typedef struct hep_chunk hep_chunk_t;
struct hep_chunk_uint8 {
hep_chunk_t chunk;
uint8_t data;
} __attribute__((packed));
typedef struct hep_chunk_uint8 hep_chunk_uint8_t;
struct hep_chunk_uint16 {
hep_chunk_t chunk;
uint16_t data;
} __attribute__((packed));
typedef struct hep_chunk_uint16 hep_chunk_uint16_t;
struct hep_chunk_uint32 {
hep_chunk_t chunk;
uint32_t data;
} __attribute__((packed));
typedef struct hep_chunk_uint32 hep_chunk_uint32_t;
struct hep_chunk_str {
hep_chunk_t chunk;
char *data;
} __attribute__((packed));
typedef struct hep_chunk_str hep_chunk_str_t;
struct hep_chunk_ip4 {
hep_chunk_t chunk;
struct in_addr data;
} __attribute__((packed));
typedef struct hep_chunk_ip4 hep_chunk_ip4_t;
struct hep_chunk_ip6 {
hep_chunk_t chunk;
struct in6_addr data;
} __attribute__((packed));
typedef struct hep_chunk_ip6 hep_chunk_ip6_t;
struct hep_chunk_payload {
hep_chunk_t chunk;
char *data;
} __attribute__((packed));
typedef struct hep_chunk_payload hep_chunk_payload_t;
struct hep_ctrl {
char id[4];
uint16_t length;
} __attribute__((packed));
typedef struct hep_ctrl hep_ctrl_t;
struct hep_generic {
hep_ctrl_t header;
hep_chunk_uint8_t ip_family;
hep_chunk_uint8_t ip_proto;
hep_chunk_uint16_t src_port;
hep_chunk_uint16_t dst_port;
hep_chunk_uint32_t time_sec;
hep_chunk_uint32_t time_usec;
hep_chunk_uint8_t proto_t;
hep_chunk_uint32_t capt_id;
} __attribute__((packed));
typedef struct hep_generic hep_generic_t;
/** Maximum size when streaming. */
#define MSG_SSIZE_MAX (USIZE_MAX)

View File

@ -306,6 +306,8 @@ struct tport_master {
su_socket_t mr_capt_sock;
char *mr_capt_name; /**< Servername for capturing received/sent data */
tport_primary_t *mr_primaries; /**< List of primary contacts */
unsigned mr_prot_ver; /* hep version */
unsigned mr_agent_id; /* agent version */
tport_params_t mr_params[1];
@ -486,6 +488,12 @@ void tport_dump_iovec(tport_t const *self, msg_t *msg,
void tport_capt_msg(tport_t const *self, msg_t *msg, size_t n,
su_iovec_t const iov[], size_t iovused, char const *what);
int tport_capt_msg_hepv2(tport_t const *self, msg_t *msg, size_t n,
su_iovec_t const iov[], size_t iovused, char const *what, char **buffer);
int tport_capt_msg_hepv3(tport_t const *self, msg_t *msg, size_t n,
su_iovec_t const iov[], size_t iovused, char const *what, char **buffer);
int tport_tcp_ping(tport_t *self, su_time_t now);
int tport_tcp_pong(tport_t *self);

View File

@ -143,7 +143,9 @@ int tport_open_log(tport_master_t *mr, tagi_t *tags)
char *captname, *p, *host_s;
char port[10];
su_addrinfo_t *ai = NULL, hints[1] = {{ 0 }};
unsigned len =0;
unsigned len =0, iport = 0;
if (mr->mr_capt_name && mr->mr_capt_sock && strcmp(capt, mr->mr_capt_name) == 0)
return n;
@ -174,15 +176,58 @@ int tport_open_log(tport_master_t *mr, tagi_t *tags)
*p = '\0';
p++;
if (atoi(p) <1024 || atoi(p)>65536)
iport = atoi(p);
if (iport <1024 || iport >65536)
{
su_log("invalid port number; must be in [1024,65536]\n");
return n;
}
strncpy(port, p, sizeof(port));
snprintf(port, sizeof(port), "%d", iport);
/* default values for capture protocol and agent id */
mr->mr_prot_ver = 3;
mr->mr_agent_id = 200;
/* get all params */
while(p)
{
/* check ; in the URL */
if( (p = strchr(p+1, ';')) == 0 ) {
break;
}
*p = '\0';
p++;
SU_DEBUG_7(("events HEP RRR DATA [%s]\n", p));
if(strncmp(p, "hep=",4) == 0) {
p+=4;
mr->mr_prot_ver = atoi(p);
/* hepv3 come later */
if (mr->mr_prot_ver < 1 || mr->mr_prot_ver > 3)
{
su_log("invalid hep version number; must be in [1-3]\n");
mr->mr_prot_ver = 3;
return n;
}
}
else if(strncmp(p, "capture_id=", 11) == 0) {
p+=11;
if((mr->mr_agent_id = atoi(p)) == 0)
{
mr->mr_agent_id = 200;
su_log("invalid capture id number; must be uint32 \n");
return n;
}
}
else {
su_log("unsupported capture param\n");
return n;
}
}
/* check if we have [] */
if (host_s[0] == '[') {
@ -211,7 +256,6 @@ int tport_open_log(tport_master_t *mr, tagi_t *tags)
hints->ai_socktype = SOCK_DGRAM;
hints->ai_protocol = IPPROTO_UDP;
if (su_getaddrinfo(host_s, port, hints, &ai)) {
su_perror("capture: su_getaddrinfo()");
return n;
@ -357,8 +401,63 @@ void tport_capt_msg(tport_t const *self, msg_t *msg, size_t n,
{
int buflen = 0, error;
char* buffer = NULL;
tport_master_t *mr;
assert(self);
mr = self->tp_master;
/* If we don't have socket, go out */
if (!mr->mr_capt_sock) {
su_log("error: capture socket is not open\n");
return;
}
switch(mr->mr_prot_ver)
{
case 3:
buflen = tport_capt_msg_hepv3(self, msg, n, iov, iovused, what, &buffer);
break;
case 2:
case 1:
buflen = tport_capt_msg_hepv2(self, msg, n, iov, iovused, what, &buffer);
break;
default:
su_log("error: unsupported hep version\n");
break;
}
if(buflen > 0) {
/* check if we have error i.e. capture server is down */
if ((error = su_soerror(mr->mr_capt_sock))) {
su_perror("error: tport_logging: capture socket error");
goto done;
}
su_send(mr->mr_capt_sock, buffer, buflen, 0);
}
done:
/* Now we release it */
if(buffer) free(buffer);
return;
}
/** Capture the data from the iovec */
int tport_capt_msg_hepv2 (tport_t const *self, msg_t *msg, size_t n,
su_iovec_t const iov[], size_t iovused, char const *what, char **buffer)
{
int buflen = 0;
su_sockaddr_t const *su, *su_self;
struct hep_hdr hep_header;
struct hep_timehdr hep_time = {0};
su_time_t now;
#if __sun__
struct hep_iphdr hep_ipheader = {{{{0}}}};
#else
@ -367,8 +466,7 @@ void tport_capt_msg(tport_t const *self, msg_t *msg, size_t n,
#if SU_HAVE_IN6
struct hep_ip6hdr hep_ip6header = {{{{0}}}};
#endif
int eth_frame_len = 8000;
char* buffer;
int eth_frame_len = 16000;
size_t i, dst = 0;
tport_master_t *mr;
@ -382,14 +480,14 @@ void tport_capt_msg(tport_t const *self, msg_t *msg, size_t n,
/* If we don't have socket, go out */
if (!mr->mr_capt_sock) {
su_log("error: capture socket is not open\n");
return;
return 0;
}
/*buffer for ethernet frame*/
buffer = (void*)malloc(eth_frame_len);
*buffer = (void*)malloc(eth_frame_len);
/* VOIP Header */
hep_header.hp_v = 1;
hep_header.hp_v = mr->mr_prot_ver;
hep_header.hp_f = su->su_family;
/* Header Length */
hep_header.hp_l = sizeof(struct hep_hdr);
@ -423,19 +521,22 @@ void tport_capt_msg(tport_t const *self, msg_t *msg, size_t n,
hep_header.hp_dport = dst ? su->su_port : su_self->su_port;
hep_header.hp_sport = dst ? su_self->su_port : su->su_port;
if (hep_header.hp_v == 2){
hep_header.hp_l += sizeof(struct hep_timehdr);
}
/* Copy hepheader */
memset(buffer, '\0', eth_frame_len);
memcpy(buffer, &hep_header, sizeof(struct hep_hdr));
memset(*buffer, '\0', eth_frame_len);
memcpy(*buffer, &hep_header, sizeof(struct hep_hdr));
buflen = sizeof(struct hep_hdr);
if(su->su_family == AF_INET) {
memcpy(buffer + buflen, &hep_ipheader, sizeof(struct hep_iphdr));
memcpy(*buffer + buflen, &hep_ipheader, sizeof(struct hep_iphdr));
buflen += sizeof(struct hep_iphdr);
}
#if SU_HAVE_IN6
else if(su->su_family == AF_INET6) {
memcpy(buffer+buflen, &hep_ip6header, sizeof(struct hep_ip6hdr));
memcpy(*buffer+buflen, &hep_ip6header, sizeof(struct hep_ip6hdr));
buflen += sizeof(struct hep_ip6hdr);
}
#endif
@ -444,6 +545,23 @@ void tport_capt_msg(tport_t const *self, msg_t *msg, size_t n,
goto done;
}
/* copy time header */
if (hep_header.hp_v == 2) {
/* now */
now = su_now();
/* should check for ifdef HAVE_LOCALTIME_R instead -_- */
#if defined(HAVE_GETTIMEOFDAY) || defined(HAVE_CLOCK_MONOTONIC)
hep_time.tv_sec = (now.tv_sec - SU_TIME_EPOCH); /* see su_time0.c 'now' is not really 'now', so we decrease it by SU_TIME_EPOCH */
#else
hep_time.tv_sec = now.tv_sec;
#endif
hep_time.tv_usec = now.tv_usec;
hep_time.captid = mr->mr_agent_id;
memcpy((void*)*buffer+buflen, &hep_time, sizeof(struct hep_timehdr));
buflen += sizeof(struct hep_timehdr);
}
for (i = 0; i < iovused && n > 0; i++) {
size_t len = iov[i].mv_len;
if (len > n)
@ -452,23 +570,248 @@ void tport_capt_msg(tport_t const *self, msg_t *msg, size_t n,
if((buflen + len) > eth_frame_len)
break;
memcpy(buffer + buflen , (void*)iov[i].mv_base, len);
memcpy(*buffer + buflen , (void*)iov[i].mv_base, len);
buflen +=len;
n -= len;
}
/* check if we have error i.e. capture server is down */
if ((error = su_soerror(mr->mr_capt_sock))) {
su_perror("error: tport_logging: capture socket error");
goto done;
}
su_send(mr->mr_capt_sock, buffer, buflen, 0);
return buflen;
done:
/* Now we release it */
if(buffer) free(buffer);
return;
if(*buffer) {
free(*buffer);
*buffer = NULL;
}
return 0;
}
/** Capture the data from the iovec */
int tport_capt_msg_hepv3 (tport_t const *self, msg_t *msg, size_t n,
su_iovec_t const iov[], size_t iovused, char const *what, char **buffer)
{
su_sockaddr_t const *su, *su_self;
struct hep_generic *hg=NULL;
unsigned int buflen=0, iplen=0,tlen=0, payload_len = 0;
su_time_t now;
hep_chunk_ip4_t src_ip4, dst_ip4;
hep_chunk_t payload_chunk;
int orig_n = 0;
#if SU_HAVE_IN6
hep_chunk_ip6_t src_ip6, dst_ip6;
#endif
int eth_frame_len = 16000;
size_t i, dst = 0;
tport_master_t *mr;
assert(self); assert(msg);
su = msg_addr(msg);
su_self = self->tp_pri->pri_primary->tp_addr;
mr = self->tp_master;
/* If we don't have socket, go out */
if (!mr->mr_capt_sock) {
su_log("error: capture socket is not open\n");
return 0;
}
/*buffer for ethernet frame*/
hg = malloc(sizeof(struct hep_generic));
memset(hg, 0, sizeof(struct hep_generic));
/* header set */
memcpy(hg->header.id, "\x48\x45\x50\x33", 4);
/* IP proto */
hg->ip_family.chunk.vendor_id = htons(0x0000);
hg->ip_family.chunk.type_id = htons(0x0001);
hg->ip_family.data = su->su_family;
hg->ip_family.chunk.length = htons(sizeof(hg->ip_family));
/* PROTOCOL */
if(strcmp(self->tp_name->tpn_proto, "tcp") == 0) hg->ip_proto.data = IPPROTO_TCP;
else if(strcmp(self->tp_name->tpn_proto, "tls") == 0) hg->ip_proto.data = IPPROTO_IDP; /* FAKE*/
else if(strcmp(self->tp_name->tpn_proto, "sctp") == 0) hg->ip_proto.data = IPPROTO_SCTP;
else if(strcmp(self->tp_name->tpn_proto, "ws") == 0) hg->ip_proto.data = IPPROTO_TCP;
else if(strcmp(self->tp_name->tpn_proto, "wss") == 0) hg->ip_proto.data = IPPROTO_TCP;
else hg->ip_proto.data = IPPROTO_UDP; /* DEFAULT UDP */
/* Proto ID */
hg->ip_proto.chunk.vendor_id = htons(0x0000);
hg->ip_proto.chunk.type_id = htons(0x0002);
hg->ip_proto.chunk.length = htons(sizeof(hg->ip_proto));
/* Check destination */
if(strncmp("sent", what, 4) == 0) dst = 1;
/* copy destination and source IPs*/
if(su->su_family == AF_INET) {
/* SRC IP */
src_ip4.chunk.vendor_id = htons(0x0000);
src_ip4.chunk.type_id = htons(0x0003);
memcpy(dst ? &dst_ip4.data : &src_ip4.data, &su->su_sin.sin_addr.s_addr, sizeof(su->su_sin.sin_addr.s_addr));
src_ip4.chunk.length = htons(sizeof(src_ip4));
/* DST IP */
dst_ip4.chunk.vendor_id = htons(0x0000);
dst_ip4.chunk.type_id = htons(0x0004);
memcpy(dst ? &src_ip4.data : &dst_ip4.data, &su_self->su_sin.sin_addr.s_addr, sizeof(su_self->su_sin.sin_addr.s_addr));
dst_ip4.chunk.length = htons(sizeof(dst_ip4));
iplen = sizeof(dst_ip4) + sizeof(src_ip4);
}
#if SU_HAVE_IN6
else if(su->su_family == AF_INET6) {
/* SRC IPv6 */
src_ip6.chunk.vendor_id = htons(0x0000);
src_ip6.chunk.type_id = htons(0x0005);
memcpy(dst ? &dst_ip6.data : &src_ip6.data, &su->su_sin.sin_addr.s_addr, sizeof(su->su_sin.sin_addr.s_addr));
src_ip6.chunk.length = htons(sizeof(src_ip6));
/* DST IPv6 */
dst_ip6.chunk.vendor_id = htons(0x0000);
dst_ip6.chunk.type_id = htons(0x0006);
memcpy(dst ? &src_ip6.data : &dst_ip6.data, &su_self->su_sin.sin_addr.s_addr, sizeof(su_self->su_sin.sin_addr.s_addr));
dst_ip6.chunk.length = htons(sizeof(dst_ip6));
iplen = sizeof(dst_ip6) + sizeof(src_ip6);
}
#endif
else {
su_perror("error: tport_logging hepv3: capture: unsupported protocol family");
goto done;
}
/* SRC PORT */
hg->src_port.chunk.vendor_id = htons(0x0000);
hg->src_port.chunk.type_id = htons(0x0007);
hg->src_port.data = dst ? htons(su->su_port) : htons(su_self->su_port);
hg->src_port.chunk.length = htons(sizeof(hg->src_port));
/* DST PORT */
hg->dst_port.chunk.vendor_id = htons(0x0000);
hg->dst_port.chunk.type_id = htons(0x0008);
hg->dst_port.data = dst ? htons(su_self->su_port) : htons(su->su_port);
hg->dst_port.chunk.length = htons(sizeof(hg->dst_port));
/* TIMESTAMP SEC */
hg->time_sec.chunk.vendor_id = htons(0x0000);
hg->time_sec.chunk.type_id = htons(0x0009);
hg->time_sec.chunk.length = htons(sizeof(hg->time_sec));
now = su_now();
/* should check for ifdef HAVE_LOCALTIME_R instead -_- */
#if defined(HAVE_GETTIMEOFDAY) || defined(HAVE_CLOCK_MONOTONIC)
hg->time_sec.data = htonl(now.tv_sec - SU_TIME_EPOCH); /* see su_time0.c 'now' is not really 'now', so we decrease it by SU_TIME_EPOCH */
#else
hg->time_sec.data = htonl(now.tv_sec);
#endif
/* TIMESTAMP USEC */
hg->time_usec.chunk.vendor_id = htons(0x0000);
hg->time_usec.chunk.type_id = htons(0x000a);
hg->time_usec.data = htonl(now.tv_usec);
hg->time_usec.chunk.length = htons(sizeof(hg->time_usec));
/* Protocol TYPE */
hg->proto_t.chunk.vendor_id = htons(0x0000);
hg->proto_t.chunk.type_id = htons(0x000b);
hg->proto_t.data = 0x001; //SIP
hg->proto_t.chunk.length = htons(sizeof(hg->proto_t));
/* Capture ID */
hg->capt_id.chunk.vendor_id = htons(0x0000);
hg->capt_id.chunk.type_id = htons(0x000c);
hg->capt_id.data = htons(mr->mr_agent_id);
hg->capt_id.chunk.length = htons(sizeof(hg->capt_id));
/* Payload caclulation */
orig_n = n;
for (i = 0; i < iovused && n > 0; i++) {
size_t len = iov[i].mv_len;
if (len > n) len = n;
if((payload_len + len) > eth_frame_len) break;
payload_len +=len;
n -= len;
}
/* restore n */
n = orig_n;
/* Payload */
payload_chunk.vendor_id = htons(0x0000);
payload_chunk.type_id = htons(0x000f);
payload_chunk.length = htons(sizeof(payload_chunk) + payload_len);
tlen = sizeof(struct hep_generic) + payload_len + iplen + sizeof(hep_chunk_t);
/* total */
hg->header.length = htons(tlen);
*buffer = (void*)malloc(tlen);
if (*buffer==NULL){
su_perror("error: tport_logging hepv3: no memory for buffer");
goto done;
}
memcpy((void*) *buffer, hg, sizeof(struct hep_generic));
buflen = sizeof(struct hep_generic);
/* IPv4 */
if(su->su_family == AF_INET) {
/* SRC IP */
memcpy((void*) *buffer+buflen, &src_ip4, sizeof(struct hep_chunk_ip4));
buflen += sizeof(struct hep_chunk_ip4);
memcpy((void*) *buffer+buflen, &dst_ip4, sizeof(struct hep_chunk_ip4));
buflen += sizeof(struct hep_chunk_ip4);
}
#if SU_HAVE_IN6
/* IPv6 */
else if(su->su_family == AF_INET6) {
/* SRC IPv6 */
memcpy((void*) *buffer+buflen, &src_ip4, sizeof(struct hep_chunk_ip6));
buflen += sizeof(struct hep_chunk_ip6);
memcpy((void*) *buffer+buflen, &dst_ip6, sizeof(struct hep_chunk_ip6));
buflen += sizeof(struct hep_chunk_ip6);
}
#endif
/* PAYLOAD CHUNK */
memcpy((void*) *buffer+buflen, &payload_chunk, sizeof(struct hep_chunk));
buflen += sizeof(struct hep_chunk);
/* PAYLOAD */
for (i = 0; i < iovused && n > 0; i++) {
size_t len = iov[i].mv_len;
if (len > n) len = n;
/* if the packet too big for us */
if((buflen + len) > eth_frame_len)
break;
memcpy(*buffer + buflen , (void*)iov[i].mv_base, len);
buflen +=len;
n -= len;
}
return buflen;
done:
/* Now we release it */
if(hg) free(hg);
return 0;
}