+#!/usr/bin/env python3
+#
+# saleae_usb_pcap -- convert CSV export of Saleae Logic USB analyzer to PcapNG for Wireshark
+#
+# author: Trevor Bentley <trevor@trevorbentley.com>
+# date: 2025-10-22
+# license: 0BSD
+#
+# Only works with USB 2.0 Full Speed devices.
+#
+#
+# Exporting CSV from Saleae:
+#
+# 1) capture signal on USB D+ and D- lines
+# 2) enable "USB LS and FS" analyzer in Full Speed mode with "Bytes" decode level
+# 3) click "..."->"Export Table" in the Data Table section
+# 4) choose All columns, All data, CSV format, and enable "Use ISO8601 timestamps"
+#
+#
+# Process with:
+#
+# $ ./saleae_usb_pcap.py exported_from_saleae.csv converted.pcap
+#
+#
+# Open with:
+#
+# $ wireshark converted.pcap
+#
+
+import argparse
+import csv
+import os
+import struct
+import sys
+
+from datetime import datetime
+from enum import Enum
+
+# PcapNG format and header examples looted from:
+#
+# * https://pcapng.com/
+# * https://github.com/ataradov/usb-sniffer/blob/main/software/capture.c
+# * https://www.tcpdump.org/linktypes.html
+# * https://github.com/greatscottgadgets/packetry/blob/main/src/file.rs
+#
+class PcapWriter(object):
+ def __init__(self, pcapfile):
+ self.pcapfile = open(pcapfile, "wb")
+
+ def __del__(self):
+ self.pcapfile.close()
+ del self.pcapfile
+
+ def option(self, tag, text):
+ text = text.encode("utf-8")
+ opt = struct.pack("=HH", tag, len(text)) + text
+ padding = (4 - (len(opt) % 4)) % 4
+ return opt + b'\x00'*padding
+
+ def write_option(self, tag, text):
+ self.pcapfile.write(self.option(tag, text))
+
+ def insert_length(self, data, offset, length):
+ return data[0:offset] + struct.pack("=I", length) + data[offset+4:]
+
+ def write_file_header(self):
+ hdr = struct.pack("=IIIHHII", 0x0a0d0d0a, 0, 0x1a2b3c4d, 1, 0, 0xffffffff, 0xffffffff)
+ hdr += self.option(0x02, "saleae CSV 2 PCAP converter")
+ hdr += self.option(0x00, "")
+
+ length = len(hdr) + 4
+ hdr = self.insert_length(hdr, 4, length)
+ hdr += struct.pack("=I", length)
+ self.pcapfile.write(hdr)
+
+ def write_usb_header(self):
+ hdr = struct.pack("=IIHHI", 1, 0, 294, 0, 0x0000ffff) # LINKTYPE_USB_2_0_FULL_SPEED
+ hdr += self.option(0x02, "usb")
+ hdr += self.option(0x03, "CSV 2 PCAP Interface")
+ hdr += struct.pack("=HHI", 9, 1, 6)
+ hdr += self.option(0x00, "")
+
+ length = len(hdr) + 4
+ hdr = self.insert_length(hdr, 4, length)
+ hdr += struct.pack("=I", length)
+ self.pcapfile.write(hdr)
+
+ def write_info_header(self):
+ hdr = struct.pack("=IIHHI", 1, 0, 252, 0, 0x0000ffff) # LINKTYPE_WIRESHARK_UPPER_PDU
+ hdr += self.option(0x02, "info")
+ hdr += self.option(0x03, "Out-Of-Band USB info")
+ hdr += struct.pack("=HHI", 9, 1, 6)
+ hdr += self.option(0x00, "")
+
+ length = len(hdr) + 4
+ hdr = self.insert_length(hdr, 4, length)
+ hdr += struct.pack("=I", length)
+ self.pcapfile.write(hdr)
+
+ def write_packet(self, timestamp, data):
+ length = len(data)
+ pkt = struct.pack("=IIIIIII", 6, 0, 0, timestamp >> 32, timestamp & 0xffffffff, length, length) + data
+ padding = (4 - (len(pkt) % 4)) % 4
+ pkt += b'\x00'*padding
+ pkt += self.option(0x00, "")
+
+ length = len(pkt) + 4
+ pkt = self.insert_length(pkt, 4, length)
+ pkt += struct.pack("=I", length)
+ self.pcapfile.write(pkt)
+
+
+class UsbEventType(Enum):
+ NONE = 0x00
+ RESET = 0x01
+ SYNC = 0x02
+ SOF = 0x03
+ EOP = 0x04
+ KEEPALIVE = 0x05
+ ERROR = 0x06
+ BYTE = 0x80
+
+ def __str__(self):
+ if self.value == UsbEventType.BYTE.value and getattr(self, "byte", None) is not None:
+ return f"UsbEventType.BYTE[{self.byte:02x}]"
+ else:
+ return super().__str__()
+
+
+class UsbEvent(object):
+ def __init__(self, timestamp, field):
+ self.timestamp_us = timestamp
+ self.kind = UsbEventType.NONE
+ self.byte = None
+ if field == "Reset":
+ self.kind = UsbEventType.RESET
+ elif field == "EOP":
+ self.kind = UsbEventType.EOP
+ elif field == "Error packet":
+ self.kind = UsbEventType.ERROR
+ elif field.startswith("Byte"):
+ self.kind = UsbEventType.BYTE
+ self.byte = int(field.split(" ")[1], 16)
+ self.kind.byte = self.byte
+
+ def __str__(self):
+ return str(self.kind)
+
+
+def events_from_csv(csvfile):
+ events = []
+ with open(csvfile, newline='') as csvfile:
+ reader = csv.reader(csvfile, delimiter=',', quotechar='"')
+ for idx,row in enumerate(reader):
+ if idx == 0:
+ continue # header
+ _title,_frametype,timestamp,_duration,data = row
+ timestamp_us = int(datetime.fromisoformat(timestamp).timestamp()*1000*1000)
+ event = UsbEvent(timestamp_us, data)
+ events.append(event)
+ return events
+
+def csv_to_pcap(csvfile, pcapfile):
+ events = events_from_csv(csvfile)
+
+ pcap = PcapWriter(pcapfile)
+ pcap.write_file_header()
+ pcap.write_usb_header()
+ pcap.write_info_header()
+
+ pkts = 0
+ start = None
+ data = []
+
+ for event in events:
+ if event.kind == UsbEventType.RESET or event.kind == UsbEventType.EOP or event.kind == UsbEventType.ERROR:
+ if start is not None:
+ pcap.write_packet(start, bytes(data))
+ pkts += 1
+ start = None
+ data = []
+ continue
+ elif event.kind == UsbEventType.BYTE:
+ if start is None:
+ start = event.timestamp_us
+ if event.byte != 0x80:
+ print("WARNING: packet start is not a sync byte")
+ continue
+ data.append(event.byte)
+
+ # in case a final packet was pending
+ if start is not None:
+ pcap.write_packet(start, bytes(data))
+ pkts += 1
+ start = None
+ data = []
+
+ print(f"Wrote {pkts} pcap packets")
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('csv')
+ parser.add_argument('pcap')
+ args = parser.parse_args()
+ csv_to_pcap(args.csv, args.pcap)
+
+
+if __name__ == "__main__":
+ main()