mirror of
				https://github.com/asterisk/asterisk.git
				synced 2025-10-29 07:24:55 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			262 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			262 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| # vim: set ts=8 sw=4 sts=4 et ai tw=79:
 | |
| '''
 | |
| Usage: ./spandspflow2pcap.py SPANDSP_LOG SENDFAX_PCAP
 | |
| 
 | |
| Takes a log from Asterisk with SpanDSP, extracts the "received" data
 | |
| and puts it in a pcap file. Use 'fax set debug on' and configure
 | |
| logger.conf to get fax logs.
 | |
| 
 | |
| Input data should look something like this::
 | |
| 
 | |
|     [2013-08-07 15:17:34] FAX[23479] res_fax.c: FLOW T.38 Rx     5: IFP c0 ...
 | |
| 
 | |
| Output data will look like a valid pcap file ;-)
 | |
| 
 | |
| This allows you to reconstruct received faxes into replayable pcaps.
 | |
| 
 | |
| Replaying is expected to be done by SIPp with sipp-sendfax.xml. The
 | |
| SIPp binary used for replaying must have image (fax) support. This means
 | |
| you'll need a version higher than 3.5.0 (unreleased when writing this),
 | |
| or the git master branch: https://github.com/SIPp/sipp
 | |
| 
 | |
| 
 | |
| Author: Walter Doekes, OSSO B.V. (2013,2015,2016,2019)
 | |
| License: Public Domain
 | |
| '''
 | |
| from base64 import b16decode
 | |
| from collections import namedtuple
 | |
| from datetime import datetime, timedelta
 | |
| from re import search
 | |
| from time import mktime
 | |
| from struct import pack
 | |
| import os
 | |
| import sys
 | |
| 
 | |
| 
 | |
# When True, FaxPcap._make_packet() raises SkipPacket for every IFP packet
# whose seqno % 3 == 2 (once a previous payload exists), so that packet is
# left out of the pcap -- presumably to simulate packet loss.
LOSSY = False
# When True, the recovery section announces 15 entries: 14 zero-length ones
# followed by the previous payload, instead of a single recovery entry.
EMPTY_RECOVERY = False


# One parsed "FLOW T.38 Rx ... IFP ..." log entry.
IFP = namedtuple('IFP', 'date seqno data')  # datetime, int, bytearray
 | |
| 
 | |
| 
 | |
def n2b(text):
    r"""
    Decode a hex dump into a bytearray.

    Spaces and newlines in ``text`` are ignored and the hex digits may be
    in either case, so "aa bb cc" becomes bytearray(b'\xaa\xbb\xcc').
    """
    hexdigits = text.replace(' ', '')
    hexdigits = hexdigits.replace('\n', '')
    # b16decode() only accepts upper case digits by default.
    raw = b16decode(hexdigits.upper())
    return bytearray(raw)
 | |
| 
 | |
| 
 | |
class SkipPacket(Exception):
    """Signals that the packet being built must not be written out."""
 | |
| 
 | |
| 
 | |
class FaxPcap(object):
    """
    Writes IFP packets to a pcap stream, one UDP/IP frame per packet.

    ``outfile`` must be a binary file-like object with a ``write()``
    method. The pcap file header is written when the first packet is
    added.
    """

    # pcap file header: magic, version 2.4, zeroed timezone/sigfigs,
    # snaplen 0xffff and link type 0x71.
    PCAP_PREAMBLE = bytearray.fromhex(
        'd4 c3 b2 a1 02 00 04 00 '
        '00 00 00 00 00 00 00 00 '
        'ff ff 00 00 71 00 00 00')

    def __init__(self, outfile):
        self.outfile = outfile
        self.date = None        # timestamp of the last packet written
        self.seqno = None       # UDPTL seqno in use; None until first add()
        self.udpseqno = 128     # 16-bit counter stored in the IP header
        self.prev_data = None   # previous primary payload (for recovery)

    def add(self, ifp):
        """
        Add the IFP packet.

        ``ifp`` needs ``date`` (datetime), ``seqno`` (int) and ``data``
        (bytearray) attributes.

        T.38 basic format of UDPTL payload section with redundancy:

        UDPTL_SEQNO
        - 2 sequence number (big endian)
        UDPTL_PRIMARY_PAYLOAD (T30?)
        - 1 subpacket length (excluding this byte)
        - 1 type of message (e.g. 0xd0 for data(?))
        - 1 items in data field (e.g. 0x01)
        - 2 length of data (big endian)
        - N data
        RECOVERY (optional)
        - 2 count of previous seqno packets (big endian)
        - N UDPTL_PRIMARY_PAYLOAD of (seqno-1)
        - N UDPTL_PRIMARY_PAYLOAD of (seqno-2)
        - ...

        Raises AssertionError when a packet is dated before the second
        that the running timestamp has already reached.
        """
        # First packet?
        if self.seqno is None:
            # Add preamble.
            self._add_preamble()
            # Start a second late (optional).
            self._add_garbage(ifp.date)

            # Set sequence, and fill with missing leading zeroes. Setting
            # seqno first keeps the recursive calls out of this branch.
            self.seqno = 0
            for i in range(ifp.seqno):
                self.add(IFP(date=ifp.date, seqno=i, data=bytearray([0])))

        # Auto-increasing dates: packets that share a timestamp are
        # spaced 9ms apart.
        if self.date is None or ifp.date > self.date:
            self.date = ifp.date
        elif ifp.date < self.date.replace(microsecond=0):
            # Raised explicitly so the check survives "python -O".
            raise AssertionError(
                'More packets than expected in 1s? {!r}/{!r}'.format(
                    ifp.date, self.date))
        else:
            self.date += timedelta(microseconds=9000)

        # Add packet.
        self.seqno = ifp.seqno
        try:
            self.outfile.write(self._make_packet(ifp.data))
        except SkipPacket:
            # Deliberate: the packet was dropped on purpose (LOSSY mode).
            pass

    def _add_preamble(self):
        """Write the pcap file header."""
        # NOTE(review): the original author wondered whether this should
        # only happen when outfile is at position 0.
        self.outfile.write(self.PCAP_PREAMBLE)

    def _add_garbage(self, date):
        """Write one non-IFP packet with seqno 0xffff, dated ``date``."""
        if self.date is None or date > self.date:
            self.date = date

        self.seqno = 0xffff
        self.outfile.write(self._make_packet(
            bytearray(b'GARBAGE'), is_ifp=False))

    def _make_packet(self, ifp_data, is_ifp=True):
        """
        Return one pcap record (bytearray) that carries ``ifp_data``.

        With ``is_ifp`` the data is prefixed with its length byte,
        remembered as ``self.prev_data`` and followed by a recovery
        section holding the previous payload (if any). Without it the
        data is used verbatim.

        Uses ``self.date`` and ``self.seqno`` and advances
        ``self.udpseqno``. Raises SkipPacket when LOSSY drops the packet.
        """
        sum16 = b'\x43\x21'  # the OS fixes the checksums for us

        data = bytearray()
        if is_ifp:
            # bytearray.append() raises ValueError for payloads over 255
            # bytes: the length has to fit in this single byte.
            data.append(len(ifp_data))  # length
            data.extend(ifp_data)       # data
            self.prev_data, prev_data = data[:], self.prev_data
        else:
            data.extend(ifp_data)
            prev_data = None

        if prev_data:
            if LOSSY and (self.seqno % 3) == 2:
                self._bump_udpseqno()
                raise SkipPacket()

            if EMPTY_RECOVERY:
                # struct ast_frame f[16], we have room for a few
                # packets.
                packets = 14
                data.extend([0, packets + 1] + [0] * packets)
            else:
                # Add 1 previous packet, without the seqno.
                data.extend([0, 1])
            data.extend(prev_data)

        # Wrap it in UDP (ports 1 -> 2); the length covers the 8 byte
        # header, the 2 byte UDPTL seqno and the data.
        udp = (
            bytearray(b'\x00\x01\x00\x02') +
            pack('>H', len(data) + 10) +
            sum16 +
            pack('>H', self.seqno) +
            data)

        # Wrap it in IP (1.1.1.1 -> 2.2.2.2). The counter is masked
        # because the field is only 16 bits wide.
        ip = (
            bytearray(b'\x45\xb8') +
            pack('>H', len(udp) + 20) +
            pack('>H', self.udpseqno & 0xffff) +
            b'\x00\x00\xf9\x11' +
            sum16 +
            b'\x01\x01\x01\x01\x02\x02\x02\x02' +
            udp)

        # Wrap it in Ethernet
        ethernet = (
            bytearray(
                b'\x00\x00\x00\x01\x00\x06\x00\x30\x48\xb1\x1c\x34\x00\x00'
                b'\x08\x00') +
            ip)

        # Wrap it in a pcap packet: seconds, microseconds and the
        # captured/original length, all little endian.
        prelude = pack(
            '<IIII', int(mktime(self.date.timetuple())),
            self.date.microsecond, len(ethernet), len(ethernet))
        packet = bytearray(prelude) + ethernet

        # Increase values.
        self._bump_udpseqno()

        return packet

    def _bump_udpseqno(self):
        """Advance the IP header counter, wrapping at 16 bits."""
        self.udpseqno = (self.udpseqno + 1) & 0xffff
 | |
| 
 | |
| 
 | |
class SpandspLog:
    """
    Iterable over the received T.38 IFP packets in a SpanDSP fax log.

    ``fp`` is any iterable of text lines, e.g. an open log file.
    """

    def __init__(self, fp):
        self._fp = fp

    def __iter__(self):
        r"""
        Looks for lines like:

            [2013-08-07 15:17:34] FAX[23479] res_fax.c: \
              FLOW T.38 Rx     5: IFP c0 01 80 00 00 ff

        And yields:

            IFP(date=..., seqno=..., data=...)

        Lines without both 'FLOW T.38 Rx' and 'IFP' are skipped. Raises
        AssertionError (mentioning the 1-based line number) when such a
        line lacks a timestamp, sequence number or hex payload, or when
        the sequence numbers are not consecutive.
        """
        prev_seqno = None

        for lineno, line in enumerate(self._fp, 1):
            if 'FLOW T.38 Rx' not in line or 'IFP' not in line:
                continue

            # The failures below are raised explicitly (instead of using
            # assert statements) so they survive "python -O".
            match = search(r'(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)', line)
            if not match:
                raise AssertionError(
                    'line {}: no timestamp in {!r}'.format(lineno, line))
            date = datetime(*[int(i) for i in match.groups()])

            match = search(r'Rx\s*(\d+):', line)
            if not match:
                raise AssertionError(
                    'line {}: no sequence number in {!r}'.format(
                        lineno, line))
            seqno = int(match.group(1))

            match = search(r': IFP ([0-9a-f ]+)', line)
            if not match:
                raise AssertionError(
                    'line {}: no IFP data in {!r}'.format(lineno, line))
            data = n2b(match.group(1))

            # Expected all sequence numbers. But you can safely disable
            # this check.
            if prev_seqno is not None and seqno != prev_seqno + 1:
                raise AssertionError(
                    'line {}: {}+1 != {}'.format(lineno, prev_seqno, seqno))
            prev_seqno = seqno

            yield IFP(date=date, seqno=seqno, data=data)
 | |
| 
 | |
| 
 | |
def main(logname, pcapname):
    """
    Convert the SpanDSP log file ``logname`` into the pcap file ``pcapname``.

    The pcap file is created with mode 0600 and must not exist yet;
    os.open() raises OSError if it does.
    """
    with open(logname, 'r') as infile:
        log = SpandspLog(infile)

        # Same as open(pcapname, 'xb') (exclusive binary write), but that
        # mode is py3 only.
        create_or_fail = os.O_CREAT | os.O_EXCL | os.O_WRONLY
        fd = os.open(pcapname, create_or_fail, 0o600)
        with os.fdopen(fd, 'wb') as outfile:
            pcap = FaxPcap(outfile)
            for data in log:
                pcap.add(data)
 | |
| 
 | |
| 
 | |
if __name__ == '__main__':
    # Exactly two arguments are expected: the log to read and the pcap
    # file to create.
    if len(sys.argv[1:]) == 2:
        main(sys.argv[1], sys.argv[2])
    else:
        sys.stderr.write('Usage: {} LOGFILE PCAP\n'.format(sys.argv[0]))
        sys.exit(1)
 |