from datetime import datetime
from enum import Enum
+BLOCKTYPE_EPB = 0x06
+LINKTYPE_WIRESHARK_UPPER_PDU = 252
+LINKTYPE_USB_2_0_FULL_SPEED = 294
+TAG_PDU_DISSECTOR_NAME = 12
+TAG_IF_TSRESOL = 9
+RESOL_USEC = 6
+
# PcapNG format and header examples looted from:
#
# * https://pcapng.com/
class PcapWriter(object):
def __init__(self, pcapfile):
self.pcapfile = open(pcapfile, "wb")
+ self.write_file_header()
+ self.write_usb_header()
+ self.write_info_header()
+
+ def pad(self, data):
+ return data + b'\x00' * ((4 - (len(data) % 4)) % 4)
- def __del__(self):
- self.pcapfile.close()
- del self.pcapfile
+ def insert_and_append_length(self, data, offset, length):
+ data = data[0:offset] + struct.pack("=I", length) + data[offset+4:]
+ return data + struct.pack("=I", length)
def option(self, tag, text):
text = text.encode("utf-8")
- opt = struct.pack("=HH", tag, len(text)) + text
- padding = (4 - (len(opt) % 4)) % 4
- return opt + b'\x00'*padding
-
- def insert_length(self, data, offset, length):
- return data[0:offset] + struct.pack("=I", length) + data[offset+4:]
+ return self.pad(struct.pack("=HH", tag, len(text)) + text)
def syslog_pdu(self, text):
pdu_name = "syslog".encode("utf-8")
# this is inexplicably the only packet that has to be big-endian
- EXP_PDU_TAG_DISSECTOR_NAME = 12
- hdr = struct.pack(">HH", EXP_PDU_TAG_DISSECTOR_NAME, len(pdu_name)) + pdu_name
- hdr += self.option(0x00, "")
- hdr += text.encode("utf-8")
- return hdr
-
- def write_syslog(self, timestamp, text):
- pdu = self.syslog_pdu(text)
- length = len(pdu)
- pkt = struct.pack("=IIIIIII", 6, 0, 1, timestamp >> 32, timestamp & 0xffffffff, length, length)
- pkt += pdu
- padding = (4 - (len(pkt) % 4)) % 4
- pkt += b'\x00'*padding
-
- length = len(pkt) + 4
- pkt = self.insert_length(pkt, 4, length)
- pkt += struct.pack("=I", length)
- self.pcapfile.write(pkt)
+ hdr = struct.pack(">HH", TAG_PDU_DISSECTOR_NAME, len(pdu_name)) + pdu_name
+ return hdr + self.option(0x00, "") + text.encode("utf-8")
def write_file_header(self):
hdr = struct.pack("=IIIHHII", 0x0a0d0d0a, 0, 0x1a2b3c4d, 1, 0, 0xffffffff, 0xffffffff)
hdr += self.option(0x02, "saleae CSV 2 PCAP converter")
hdr += self.option(0x00, "")
-
- length = len(hdr) + 4
- hdr = self.insert_length(hdr, 4, length)
- hdr += struct.pack("=I", length)
+ hdr = self.insert_and_append_length(hdr, 4, len(hdr) + 4)
self.pcapfile.write(hdr)
def write_usb_header(self):
- hdr = struct.pack("=IIHHI", 1, 0, 294, 0, 0x0000ffff) # LINKTYPE_USB_2_0_FULL_SPEED
+ hdr = struct.pack("=IIHHI", 1, 0, LINKTYPE_USB_2_0_FULL_SPEED, 0, 0x0000ffff)
hdr += self.option(0x02, "usb")
hdr += self.option(0x03, "CSV 2 PCAP Interface")
- hdr += struct.pack("=HHI", 9, 1, 6)
+ hdr += struct.pack("=HHI", TAG_IF_TSRESOL, 1, RESOL_USEC)
hdr += self.option(0x00, "")
-
- length = len(hdr) + 4
- hdr = self.insert_length(hdr, 4, length)
- hdr += struct.pack("=I", length)
+ hdr = self.insert_and_append_length(hdr, 4, len(hdr) + 4)
self.pcapfile.write(hdr)
def write_info_header(self):
- hdr = struct.pack("=IIHHI", 1, 0, 252, 0, 0x0000ffff) # LINKTYPE_WIRESHARK_UPPER_PDU
+ hdr = struct.pack("=IIHHI", 1, 0, LINKTYPE_WIRESHARK_UPPER_PDU, 0, 0x0000ffff)
hdr += self.option(0x02, "info")
hdr += self.option(0x03, "Out-Of-Band USB info")
- hdr += struct.pack("=HHI", 9, 1, 6)
+ hdr += struct.pack("=HHI", TAG_IF_TSRESOL, 1, RESOL_USEC)
hdr += self.option(0x00, "")
-
- length = len(hdr) + 4
- hdr = self.insert_length(hdr, 4, length)
- hdr += struct.pack("=I", length)
+ hdr = self.insert_and_append_length(hdr, 4, len(hdr) + 4)
self.pcapfile.write(hdr)
def write_packet(self, timestamp, data):
length = len(data)
- pkt = struct.pack("=IIIIIII", 6, 0, 0, timestamp >> 32, timestamp & 0xffffffff, length, length) + data
- padding = (4 - (len(pkt) % 4)) % 4
- pkt += b'\x00'*padding
+ pkt = struct.pack("=IIIIIII", BLOCKTYPE_EPB, 0, 0, timestamp >> 32, timestamp & 0xffffffff, length, length) + data
+ pkt = self.pad(pkt)
pkt += self.option(0x00, "")
+ pkt = self.insert_and_append_length(pkt, 4, len(pkt) + 4)
+ self.pcapfile.write(pkt)
- length = len(pkt) + 4
- pkt = self.insert_length(pkt, 4, length)
- pkt += struct.pack("=I", length)
+ def write_syslog(self, timestamp, text):
+ pdu = self.syslog_pdu(text)
+ length = len(pdu)
+ pkt = struct.pack("=IIIIIII", BLOCKTYPE_EPB, 0, 1, timestamp >> 32, timestamp & 0xffffffff, length, length)
+ pkt = self.pad(pkt + pdu)
+ pkt = self.insert_and_append_length(pkt, 4, len(pkt) + 4)
self.pcapfile.write(pkt)
class UsbEventType(Enum):
"""The known types a UsbEvent can have"""
+ # States without byte representation on bus
NONE = 0x00
RESET = 0x01
- SYNC = 0x02
- SOF = 0x03
- EOP = 0x04
- KEEPALIVE = 0x05
- ERROR = 0x06
- BYTE = 0x80
-
- def __str__(self):
- if self.value == UsbEventType.BYTE.value and getattr(self, "byte", None) is not None:
- return f"UsbEventType.BYTE[{self.byte:02x}]"
- else:
- return super().__str__()
+ BYTE = 0x02
+ EOP = 0x03
+ # USB PIDs
+ ERROR = 0x3c
+ IN = 0x69
+ SYNC = 0x80
+ SOF = 0xa5
+ NAK = 0x5a
+ ACK = 0xd2
class UsbEvent(object):
self.byte = int(field.split(" ")[1], 16)
self.kind.byte = self.byte
- def __str__(self):
- return str(self.kind)
-
class UsbPacket(object):
"""A collection of events between a SYNC and EOP (or similar)"""
if event is not None:
self.start = event.timestamp_us
self.events.append(event)
- if event.byte is not None and event.byte != 0x80:
+ if event.byte is not None and event.byte != UsbEventType.SYNC.value:
print(f"WARNING: packet start is not a sync byte: {event.byte}")
self.data.append(event.byte)
matches.
"""
- if len(pkts) == 0:
- return UsbPacketCombo.UNKNOWN, 1, []
- if len(pkts) >= 1 and len(pkts[0].data) == 3 and \
- pkts[0].data[0] == 0xa5:
- return UsbPacketCombo.SOF, 1, [0xa5]
+ if len(pkts) >= 1 and len(pkts[0].data) == 3 and pkts[0].data[0] == UsbEventType.SOF.value:
+ return UsbPacketCombo.SOF, 1, [UsbEventType.SOF.value]
elif len(pkts) >= 2 and len(pkts[0].data) > 1 and len(pkts[1].data) == 1 and \
- pkts[0].data[0] == 0x69 and pkts[1].data[0] == 0x5a:
+ pkts[0].data[0] == UsbEventType.IN.value and pkts[1].data[0] == UsbEventType.NAK.value:
return UsbPacketCombo.IN_NAK, 2, pkts[0].data + pkts[1].data
return UsbPacketCombo.UNKNOWN, 1, []
events = []
with open(csvfile, newline='') as csvfile:
reader = csv.reader(csvfile, delimiter=',', quotechar='"')
+ next(reader) # consume CSV header
for idx,row in enumerate(reader):
- if idx == 0:
- continue # header
_title,_frametype,timestamp,_duration,data = row
timestamp_us = int(datetime.fromisoformat(timestamp).timestamp()*1000*1000)
- event = UsbEvent(timestamp_us, data)
- events.append(event)
+ events.append(UsbEvent(timestamp_us, data))
return events
def csv_to_pcap(csvfile, pcapfile, unfolded=False, fold_over_sof=False, fold_max=1024):
- # parse saleae CSV file into event objects
- events = events_from_csv(csvfile)
-
- # open pcap file and write mandatory headers
- pcap = PcapWriter(pcapfile)
- pcap.write_file_header()
- pcap.write_usb_header()
- pcap.write_info_header()
+ events = events_from_csv(csvfile) # parse saleae CSV file into event objects
+ pcap = PcapWriter(pcapfile) # open pcap file and write mandatory headers
+ packet = None # currently processing packet
# initialize a buffer for processing packet combinations
buffer = UsbPacketBuffer(pcap, max_fold=0 if unfolded else fold_max, fold_over_sof=fold_over_sof)
- packet = None
for event in events:
- # write any ready packets to pcap file
- buffer.write()
+ buffer.write() # write any ready packets to pcap file
# fill in and queue new packets based on events
if event.kind == UsbEventType.RESET or \
buffer.add(packet.complete())
if event.kind != UsbEventType.EOP:
# record resets and errors as syslog packets
- pcap.write_syslog(event.timestamp_us, str(event.kind))
+ pcap.write_syslog(event.timestamp_us, str(event.kind.name))
elif event.kind == UsbEventType.BYTE:
if packet is None or packet.is_complete():
# start a new packet
if packet is not None and not packet.is_complete():
buffer.add(packet.complete())
- # flush all packets to pcap file
- buffer.flush()
+ buffer.flush() # flush all packets to pcap file
pcap.write_syslog(buffer.last_timestamp, "end of capture")
print(f"Wrote {buffer.written} pcap packets ({buffer.seen} processed)")
args = parser.parse_args()
csv_to_pcap(args.csv_file, args.pcap_file, unfolded=args.unfolded, fold_over_sof=args.fold_over_sof, fold_max=args.fold_max)
-
if __name__ == "__main__":
main()