summary history branches tags files
saleae_usb_pcap.py
#!/usr/bin/env python3
#
# saleae_usb_pcap -- convert CSV export of Saleae Logic USB analyzer to PcapNG for Wireshark
#
# author: Trevor Bentley <trevor@trevorbentley.com>
# date: 2025-10-22
# license: 0BSD
#
# Only works with USB 2.0 Full Speed devices.
#
#
# Exporting CSV from Saleae:
#
# 1) capture signal on USB D+ and D- lines
# 2) enable "USB LS and FS" analyzer in Full Speed mode with "Bytes" decode level
# 3) click "..."->"Export Table" in the Data Table section
# 4) choose All columns, All data, CSV format, and enable "Use ISO8601 timestamps"
#
#
# Process with:
#
# $ ./saleae_usb_pcap.py exported_from_saleae.csv converted.pcap
#
#
# Open with:
#
# $ wireshark converted.pcap
#

import argparse
import csv
import os
import struct
import sys
import textwrap

from datetime import datetime
from enum import Enum

# PcapNG block type of an Enhanced Packet Block (one captured packet).
BLOCKTYPE_EPB = 0x00000006

# Link-layer types (see https://www.tcpdump.org/linktypes.html), one per
# Interface Description Block written by PcapWriter.
LINKTYPE_USB_2_0_FULL_SPEED = 294
LINKTYPE_WIRESHARK_UPPER_PDU = 252

# Tag inside a Wireshark "upper PDU" record that names the dissector to use.
TAG_PDU_DISSECTOR_NAME = 0x000C

# Interface option carrying the timestamp resolution, and the value used for
# it here: 6, i.e. units of 10^-6 seconds (microseconds).
TAG_IF_TSRESOL = 0x0009
RESOL_USEC = 6

# PcapNG format and header examples looted from:
#
# * https://pcapng.com/
# * https://github.com/ataradov/usb-sniffer/blob/main/software/capture.c
# * https://www.tcpdump.org/linktypes.html
# * https://github.com/greatscottgadgets/packetry/blob/main/src/file.rs
#
class PcapWriter(object):
    """Minimal PcapNG writer for USB packets plus out-of-band text notes.

    Opening the writer creates (or truncates) the output file and
    immediately writes a Section Header Block followed by two Interface
    Description Blocks:

      * interface 0: raw USB 2.0 Full Speed packets (see write_packet)
      * interface 1: Wireshark "upper PDU" records carrying syslog text
        (see write_syslog)

    All block fields are packed in native byte order ("="); the
    byte-order magic in the section header records which order was used.

    The writer owns the file handle.  Call close() when done, or use the
    writer as a context manager.
    """
    def __init__(self, pcapfile):
        """Open `pcapfile` for binary writing and emit the mandatory headers."""
        self.pcapfile = open(pcapfile, "wb")
        try:
            self.write_file_header()
            self.write_usb_header()
            self.write_info_header()
        except Exception:
            # don't leak the handle if the headers could not be written
            self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def close(self):
        """Close the underlying output file."""
        self.pcapfile.close()

    def pad(self, data):
        """Return `data` zero-padded up to the next multiple of 4 bytes."""
        return data + b'\x00' * ((4 - (len(data) % 4)) % 4)

    def insert_and_append_length(self, data, offset, length):
        """Overwrite the 4 bytes at `offset` with `length` and append it again.

        Blocks carry their total length both near the start and at the
        very end, so the same value is written in both places.
        """
        data = data[0:offset] + struct.pack("=I", length) + data[offset+4:]
        return data + struct.pack("=I", length)

    def option(self, tag, text):
        """Return one padded option: 16-bit tag, 16-bit length, UTF-8 text.

        option(0x00, "") yields the 4-byte end-of-options marker.
        """
        text = text.encode("utf-8")
        return self.pad(struct.pack("=HH", tag, len(text)) + text)

    def syslog_pdu(self, text):
        """Return an upper-PDU payload that hands `text` to the syslog dissector."""
        pdu_name = "syslog".encode("utf-8")
        # The tag header inside the PDU is packed big-endian, unlike the
        # surrounding PcapNG blocks which use native byte order.
        hdr = struct.pack(">HH", TAG_PDU_DISSECTOR_NAME, len(pdu_name)) + pdu_name
        return hdr + self.option(0x00, "") + text.encode("utf-8")

    def write_file_header(self):
        """Write the Section Header Block that starts the file."""
        # block type, length placeholder, byte-order magic, version 1.0,
        # section length of all-ones (two 32-bit halves)
        hdr = struct.pack("=IIIHHII", 0x0a0d0d0a, 0, 0x1a2b3c4d, 1, 0, 0xffffffff, 0xffffffff)
        # NOTE(review): in a section header, option code 2 is shb_hardware;
        # shb_userappl (4) may be the better fit for this string -- confirm.
        hdr += self.option(0x02, "saleae CSV 2 PCAP converter")
        hdr += self.option(0x00, "")
        hdr = self.insert_and_append_length(hdr, 4, len(hdr) + 4)
        self.pcapfile.write(hdr)

    def _write_interface_header(self, linktype, name, description):
        """Write one Interface Description Block with microsecond timestamps."""
        # block type 1, length placeholder, link type, reserved, snap length
        hdr = struct.pack("=IIHHI", 1, 0, linktype, 0, 0x0000ffff)
        hdr += self.option(0x02, name)
        hdr += self.option(0x03, description)
        # if_tsresol is a single value byte followed by three pad bytes.
        # Packing it as a 32-bit integer would only produce that layout
        # on little-endian hosts.
        hdr += struct.pack("=HHB3x", TAG_IF_TSRESOL, 1, RESOL_USEC)
        hdr += self.option(0x00, "")
        hdr = self.insert_and_append_length(hdr, 4, len(hdr) + 4)
        self.pcapfile.write(hdr)

    def write_usb_header(self):
        """Write the interface block for USB packets (becomes interface 0)."""
        self._write_interface_header(LINKTYPE_USB_2_0_FULL_SPEED, "usb", "CSV 2 PCAP Interface")

    def write_info_header(self):
        """Write the interface block for syslog notes (becomes interface 1)."""
        self._write_interface_header(LINKTYPE_WIRESHARK_UPPER_PDU, "info", "Out-Of-Band USB info")

    def write_packet(self, timestamp, data):
        """Write `data` (bytes) as one packet on interface 0.

        `timestamp` is an integer in microseconds; it is stored as two
        32-bit halves, high word first.
        """
        length = len(data)
        # block type, length placeholder, interface id, timestamp high/low,
        # captured length, original length
        pkt = struct.pack("=IIIIIII", BLOCKTYPE_EPB, 0, 0, timestamp >> 32, timestamp & 0xffffffff, length, length) + data
        pkt = self.pad(pkt)
        pkt += self.option(0x00, "")
        pkt = self.insert_and_append_length(pkt, 4, len(pkt) + 4)
        self.pcapfile.write(pkt)

    def write_syslog(self, timestamp, text):
        """Write `text` as a syslog note on interface 1 at `timestamp` (microseconds)."""
        pdu = self.syslog_pdu(text)
        length = len(pdu)
        pkt = struct.pack("=IIIIIII", BLOCKTYPE_EPB, 0, 1, timestamp >> 32, timestamp & 0xffffffff, length, length)
        pkt = self.pad(pkt + pdu)
        pkt = self.insert_and_append_length(pkt, 4, len(pkt) + 4)
        self.pcapfile.write(pkt)


class UsbEventType(Enum):
    """Event kinds assigned to a UsbEvent, plus raw byte values seen on the bus.

    The first group is what UsbEvent.kind is set to from a CSV row.  The
    second group holds byte values that decoded packet data is compared
    against via `.value`.
    """
    # Row classifications; these numbers are internal, not bus bytes
    NONE = 0x00
    RESET = 0x01
    BYTE = 0x02
    EOP = 0x03
    # Byte values: the SYNC field and packet identifier (PID) bytes.
    # ERROR is also used as the kind of "Error packet" rows.
    ERROR = 0x3C
    IN = 0x69
    SYNC = 0x80
    SOF = 0xA5
    NAK = 0x5A
    ACK = 0xD2


class UsbEvent(object):
    """The smallest identifiable unit on the bus, a byte or state change.

    Attributes:
      timestamp_us: the timestamp passed in by the caller
      kind: a UsbEventType; NONE when the field text is not recognized
      byte: the integer byte value for BYTE events, otherwise None
    """
    def __init__(self, timestamp, field):
        """Classify one decoded CSV data field.

        `field` is the analyzer's text for the row: "Reset", "EOP",
        "Error packet", or "Byte <hex>" (the second space-separated
        token is parsed as base 16).
        """
        self.timestamp_us = timestamp
        self.kind = UsbEventType.NONE
        self.byte = None
        if field == "Reset":
            self.kind = UsbEventType.RESET
        elif field == "EOP":
            self.kind = UsbEventType.EOP
        elif field == "Error packet":
            self.kind = UsbEventType.ERROR
        elif field.startswith("Byte"):
            self.kind = UsbEventType.BYTE
            # The value lives on this event only.  Enum members are shared
            # singletons, so nothing is stored on UsbEventType.BYTE itself.
            self.byte = int(field.split(" ")[1], 16)


class UsbPacket(object):
    """Events gathered for one packet, with the byte values they carried.

    `start` and `end` are event timestamps; `end` stays None until
    complete() is called.  `data` holds the byte values of the packet,
    excluding a leading SYNC byte.
    """
    def __init__(self, event):
        """Begin a packet, optionally seeded with its first event."""
        self.start = None
        self.end = None
        self.events = []
        self.data = []

        if event is None:
            return

        self.start = event.timestamp_us
        self.events.append(event)

        first_byte = event.byte
        if first_byte is None or first_byte == UsbEventType.SYNC.value:
            # a state change or the expected SYNC: nothing to store as data
            return

        # packet began mid-stream; keep the byte, but tell the user
        print(f"WARNING: packet start is not a sync byte: {event.byte}")
        self.data.append(first_byte)

    def add(self, event):
        """Append an event; if it carries a byte, append that to the data."""
        self.events.append(event)
        value = event.byte
        if value is None:
            return
        self.data.append(value)

    def complete(self):
        """Mark the packet finished at its last event's timestamp; return self."""
        final_event = self.events[-1]
        self.end = final_event.timestamp_us
        return self

    def is_complete(self):
        """Return True once complete() has recorded an end timestamp."""
        return not (self.end is None)


class UsbPacketCombo(Enum):
    """Recognized packet sequences whose repeats may be collapsed."""
    UNKNOWN = 0x00
    SOF = 0x01
    IN_NAK = 0x02

    @classmethod
    def find(cls, pkts):
        """Classify the packet combination at the front of `pkts`.

        Returns a tuple (combo_type, number_of_packets, unique_data).

        unique_data is the part of the packets' data that two combos
        must share to count as repeats of each other.  SOF packets
        differ in every byte after the first, so only the SOF byte is
        kept.  An IN followed by a NAK only repeats when every byte of
        both packets is the same, so all of their data is kept.

        Anything else is reported as (UNKNOWN, 1, []).
        """
        # a lone 3-byte packet starting with the SOF byte
        if len(pkts) > 0:
            head = pkts[0].data
            if len(head) == 3 and head[0] == UsbEventType.SOF.value:
                return cls.SOF, 1, [UsbEventType.SOF.value]

        # an IN packet immediately answered by a 1-byte NAK
        if len(pkts) >= 2:
            request = pkts[0].data
            reply = pkts[1].data
            if len(request) > 1 and len(reply) == 1:
                if request[0] == UsbEventType.IN.value and reply[0] == UsbEventType.NAK.value:
                    return cls.IN_NAK, 2, request + reply

        return cls.UNKNOWN, 1, []


class UsbPacketBuffer(object):
    """Buffer of decoded UsbPackets that haven't been written to disk yet.

    Packets are held back until more than `max_fold` of them are queued,
    so that runs of repeated packet combos can be detected and replaced
    by a single syslog note.

    Attributes:
      buffer: queued packets, oldest first
      written: number of USB packets actually written to the pcap file
      seen: number of packets consumed (written plus folded away)
      last_timestamp: timestamp of the last event of the most recently
        consumed packet
    """
    def __init__(self, pcap, max_fold=64, fold_over_sof=False):
        """Create an empty buffer that writes to `pcap`.

        `max_fold` is how many packets are retained before writing
        starts.  `fold_over_sof` lets a run of repeats continue across
        SOF packets that sit between the repeats.
        """
        self.buffer = []
        self.max_fold = max_fold
        self.written = 0
        self.seen = 0
        self.pcap = pcap
        self.last_timestamp = 0
        self.fold_over_sof = fold_over_sof

    def add(self, pkt):
        """Queue a completed packet."""
        self.buffer.append(pkt)

    def flush(self):
        """Write out everything that is queued."""
        self.write(flush=True)

    def fold(self, data, combo, pkt_count, buffer):
        """Count how many packets at the start of `buffer` can be dropped.

        `combo`, `pkt_count` and `data` describe the combo that was just
        written; `buffer` holds the packets that follow it.  Returns the
        number of leading packets that are repeats of that combo.  With
        fold_over_sof, SOF packets between two repeats are included in
        the count; SOF packets after the last repeat are not, so they
        are never dropped without a repeat following them.
        """
        if combo == UsbPacketCombo.UNKNOWN:
            return 0

        pos = 0       # scan position within buffer
        foldable = 0  # position just past the last matching repeat
        while pos < len(buffer):
            next_combo, next_count, next_data = UsbPacketCombo.find(buffer[pos:pos+pkt_count])
            if next_combo == combo and next_count == pkt_count and next_data == data:
                pos += next_count
                foldable = pos
            elif self.fold_over_sof and next_combo == UsbPacketCombo.SOF:
                # tentatively skip; only counts if another repeat follows
                pos += next_count
            else:
                break
        return foldable

    def write(self, flush=False):
        """Write queued packets to the pcap file, folding repeats.

        Without `flush`, packets are consumed only while more than
        `max_fold` are queued; with `flush`, the buffer is emptied.
        """
        # only consume packets down to the maximum folding level,
        # unless this is the end and we're flushing everything.
        # A negative max_fold is treated as 0 so the loop never runs
        # on an empty buffer.
        min_level = 0 if flush else max(self.max_fold, 0)
        while len(self.buffer) > min_level:
            # see if the buffer starts with a known, foldable packet combination
            combo, pkts, pkt_data = UsbPacketCombo.find(self.buffer)
            folded = self.fold(pkt_data, combo, pkts, self.buffer[pkts:])

            # write the packet combo (or individual packet, if unknown)
            for pkt in self.buffer[:pkts]:
                self.pcap.write_packet(pkt.start, bytes(pkt.data))
                self.written += 1
                self.seen += 1
                self.last_timestamp = pkt.events[-1].timestamp_us
            del self.buffer[:pkts]

            # if there are foldable duplicate packet combos, drop them
            # and replace with a syslog packet
            if folded:
                first_event = self.buffer[0].events[0]
                type_str = "packets" if pkts == 1 else "packet combos"
                # note: `folded` counts individual packets, including any
                # SOFs skipped between repeats
                self.pcap.write_syslog(first_event.timestamp_us, f"-- repeated {folded} {combo.name} {type_str} --")
                self.last_timestamp = self.buffer[folded-1].events[-1].timestamp_us
                del self.buffer[:folded]
                self.seen += folded


def events_from_csv(csvfile):
    """Parse a Saleae CSV export into a list of UsbEvent objects.

    The first row is treated as a header and skipped.  Every other
    non-empty row must have exactly five columns; the third is an
    ISO 8601 timestamp and the fifth is the decoded data text handed to
    UsbEvent.  An empty file yields an empty list.

    Raises ValueError for a row with the wrong number of columns or an
    unparseable timestamp.
    """
    events = []
    with open(csvfile, newline='') as handle:
        reader = csv.reader(handle, delimiter=',', quotechar='"')
        next(reader, None) # consume CSV header, if there is one
        for row in reader:
            if not row:
                continue # blank line
            if len(row) != 5:
                raise ValueError(f"{csvfile}: line {reader.line_num}: expected 5 columns, got {len(row)}")
            _title, _frametype, timestamp, _duration, data = row
            when = datetime.fromisoformat(timestamp)
            # Convert with integer arithmetic: multiplying the float
            # timestamp by 1e6 can land just below the true value and
            # truncate to one microsecond early.
            whole_seconds = int(when.replace(microsecond=0).timestamp())
            timestamp_us = whole_seconds * 1000000 + when.microsecond
            events.append(UsbEvent(timestamp_us, data))
    return events

def csv_to_pcap(csvfile, pcapfile, unfolded=False, fold_over_sof=False, fold_max=1024):
    """Convert a Saleae USB CSV export into a PcapNG file.

    Parameters:
      csvfile: path of the CSV file to read
      pcapfile: path of the output file (created or overwritten)
      unfolded: when True, never hold packets back for folding
      fold_over_sof: passed through to UsbPacketBuffer
      fold_max: how many packets to buffer for fold detection

    Bytes are grouped into packets that end at an EOP, Reset or Error
    event.  Resets and errors are additionally recorded as syslog notes,
    and a final "end of capture" note is appended.
    """
    events = events_from_csv(csvfile) # parse saleae CSV file into event objects
    pcap = PcapWriter(pcapfile) # open pcap file and write mandatory headers
    try:
        packet = None # packet currently being assembled, if any

        # initialize a buffer for processing packet combinations
        buffer = UsbPacketBuffer(pcap, max_fold=0 if unfolded else fold_max, fold_over_sof=fold_over_sof)

        terminators = (UsbEventType.RESET, UsbEventType.EOP, UsbEventType.ERROR)

        for event in events:
            buffer.write() # write any ready packets to pcap file

            # fill in and queue new packets based on events
            if event.kind in terminators:
                if packet is not None:
                    # complete the active packet and forget it, so a second
                    # terminator in a row cannot queue the same packet again
                    buffer.add(packet.complete())
                    packet = None
                if event.kind != UsbEventType.EOP:
                    # record resets and errors as syslog packets; flush first
                    # so the note lands after the packets that preceded it
                    buffer.flush()
                    pcap.write_syslog(event.timestamp_us, str(event.kind.name))
            elif event.kind == UsbEventType.BYTE:
                if packet is None:
                    # start a new packet
                    packet = UsbPacket(event)
                else:
                    # continue an existing packet
                    packet.add(event)

        # in case a final packet was incomplete, finish it
        if packet is not None:
            buffer.add(packet.complete())

        buffer.flush() # flush all packets to pcap file

        pcap.write_syslog(buffer.last_timestamp, "end of capture")
    finally:
        # make sure everything reaches disk and the handle is released
        pcap.pcapfile.close()

    print(f"Wrote {buffer.written} pcap packets ({buffer.seen} processed)")


def _non_negative_int(text):
    """argparse type: parse `text` as an integer and reject negative values."""
    value = int(text)
    if value < 0:
        raise argparse.ArgumentTypeError(f"must be zero or greater, got {value}")
    return value


def main():
    """Command-line entry point: parse arguments and run the conversion."""
    parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
                                     description=textwrap.dedent("""
    Takes a CSV file exported from Saleae Logic's USB analyzer in
    'bytes' decoder mode and outputs a pcapng file compatible with
    Wireshark.

    ----

    By default, this script detects some duplicate packets or
    combinations of packets and "folds" them into a "repeated X times"
    syslog metadata packet to cut down on noise.  This behavior can be
    customized:

      - Specify '--unfolded' to disable folding entirely and output
        every individual packet identified.

      - Specify '--fold-max' to change the maximum number of packets
        that can be folded into one packet.

      - Specify '--fold-over-sof' to allow folding to continue even
        when SOF packets are present in between.  Otherwise very long
        folds will be interrupted periodically by SOF packets.

    ----
    """))
    parser.add_argument('csv_file', help="input CSV file exported from Saleae Logic")
    parser.add_argument('pcap_file', help="output pcap file (will be overwritten)")
    parser.add_argument('-u', '--unfolded', action='store_true', help="don't fold repeated events, output all packets")
    parser.add_argument('-s', '--fold-over-sof', action='store_true', help="don't stop folding for SOF packets")
    # a negative fold size is meaningless; reject it here with a usage error
    parser.add_argument('-m', '--fold-max', type=_non_negative_int, default=1024, help="maximum duplicate packets to fold into one (default: 1024)")
    args = parser.parse_args()
    csv_to_pcap(args.csv_file, args.pcap_file, unfolded=args.unfolded, fold_over_sof=args.fold_over_sof, fold_max=args.fold_max)

# Run the converter only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()