class UsbEventType(Enum):
+ """The known types a UsbEvent can have"""
NONE = 0x00
RESET = 0x01
SYNC = 0x02
class UsbEvent(object):
+ """The smallest identifiable unit on the bus, a byte or state change."""
def __init__(self, timestamp, field):
self.timestamp_us = timestamp
self.kind = UsbEventType.NONE
return str(self.kind)
+class UsbPacket(object):
+ """A collection of events between a SYNC and EOP (or similar)"""
+ def __init__(self, event):
+ self.start = None
+ self.end = None
+ self.events = []
+ self.data = []
+
+ if event is not None:
+ self.start = event.timestamp_us
+ self.events.append(event)
+ if event.byte is not None and event.byte != 0x80:
+ print(f"WARNING: packet start is not a sync byte: {event.byte}")
+ self.data.append(event.byte)
+
+ def add(self, event):
+ self.events.append(event)
+ if event.byte is not None:
+ self.data.append(event.byte)
+
+ def complete(self):
+ self.end = self.events[-1].timestamp_us
+ return self
+
+ def is_complete(self):
+ return self.end is not None
+
+
+class UsbPacketCombo(Enum):
+ """A group of one or more packets that can be skipped if repeated."""
+ UNKNOWN = 0x00
+ SOF = 0x01
+ IN_NAK = 0x02
+
+ @classmethod
+ def find(cls, pkts):
+ """Determine if pkts starts with one of the known foldable combinations
+
+ Returns tuple: (combo_type, number_of_packets, unique_data)
+
+ unique_data is the subset of the combination of packet data
+ that is relevant to determining if packet combos are similar
+ enough to fold. For instance, every SOF packet has a
+ different frame number and CRC, so only the first byte
+ matters, while IN requests only match if all of their data
+ matches.
+
+ """
+ if len(pkts) == 0:
+ return UsbPacketCombo.UNKNOWN, 1, []
+ if len(pkts) >= 1 and len(pkts[0].data) == 3 and \
+ pkts[0].data[0] == 0xa5:
+ return UsbPacketCombo.SOF, 1, [0xa5]
+ elif len(pkts) >= 2 and len(pkts[0].data) > 1 and len(pkts[1].data) == 1 and \
+ pkts[0].data[0] == 0x69 and pkts[1].data[0] == 0x5a:
+ return UsbPacketCombo.IN_NAK, 2, pkts[0].data + pkts[1].data
+ return UsbPacketCombo.UNKNOWN, 1, []
+
+
+class UsbPacketBuffer(object):
+ """Buffer of decoded UsbPackets that haven't been written to disk yet."""
+ def __init__(self, pcap, max_fold=64):
+ self.buffer = []
+ self.max_fold = max_fold
+ self.written = 0
+ self.seen = 0
+ self.pcap = pcap
+ self.last_timestamp = 0
+
+ def add(self, pkt):
+ self.buffer.append(pkt)
+
+ def flush(self):
+ self.write(flush=True)
+
+ def fold(self, data, combo, pkt_count, buffer):
+ count = 0
+ if combo == UsbPacketCombo.UNKNOWN:
+ return 0
+ # check units of pkt_count for the same combo
+ for i in range(0, len(buffer), pkt_count):
+ next_combo, next_count, next_data = UsbPacketCombo.find(buffer[i:i+pkt_count])
+ if next_combo != combo or next_count != pkt_count or next_data != data:
+ break
+ count += 1
+ return count*pkt_count
+
+ def write(self, flush=False):
+ # only consume packets down to the maximum folding level,
+ # unless this is the end and we're flushing everything.
+ min_level = 0 if flush else self.max_fold
+ while len(self.buffer) > min_level:
+ # see if the buffer starts with a known, foldable packet combination
+ combo, pkts, pkt_data = UsbPacketCombo.find(self.buffer)
+ folded = self.fold(pkt_data, combo, pkts, self.buffer[pkts:])
+
+ # write the packet combo (or individual packet, if unknown)
+ for _ in range(pkts):
+ pkt = self.buffer[0]
+ self.pcap.write_packet(pkt.start, bytes(pkt.data))
+ self.written += 1
+ self.seen += 1
+ self.last_timestamp = pkt.events[-1].timestamp_us
+ self.buffer = self.buffer[1:]
+
+ # if there are foldable duplicate packet combos, drop them
+ # and replace with a syslog packet
+ if folded:
+ event = self.buffer[0].events[0]
+ type_str = "packets" if pkts == 1 else "packet combos"
+ self.pcap.write_syslog(event.timestamp_us, f"-- repeated {folded} {combo.name} {type_str} --")
+ self.last_timestamp = self.buffer[folded-1].events[-1].timestamp_us
+ self.buffer = self.buffer[folded:]
+ self.seen += folded
+
+
def events_from_csv(csvfile):
events = []
with open(csvfile, newline='') as csvfile:
events.append(event)
return events
-def csv_to_pcap(csvfile, pcapfile):
+def csv_to_pcap(csvfile, pcapfile, unfolded=False):
+ # parse saleae CSV file into event objects
events = events_from_csv(csvfile)
+ # open pcap file and write mandatory headers
pcap = PcapWriter(pcapfile)
pcap.write_file_header()
pcap.write_usb_header()
pcap.write_info_header()
- pkts = 0
- start = None
- data = []
+ # initialize a buffer for processing packet combinations
+ buffer = UsbPacketBuffer(pcap, max_fold=0 if unfolded else 128)
+ packet = None
for event in events:
- if event.kind == UsbEventType.RESET or event.kind == UsbEventType.EOP or event.kind == UsbEventType.ERROR:
- if start is not None:
- pcap.write_packet(start, bytes(data))
- pkts += 1
- start = None
- data = []
- continue
+ # write any ready packets to pcap file
+ buffer.write()
+
+ # fill in and queue new packets based on events
+ if event.kind == UsbEventType.RESET or \
+ event.kind == UsbEventType.EOP or \
+ event.kind == UsbEventType.ERROR:
+ if packet is not None:
+ # complete any active packet
+ buffer.add(packet.complete())
+ if event.kind != UsbEventType.EOP:
+ # record resets and errors as syslog packets
+ pcap.write_syslog(event.timestamp_us, str(event.kind))
elif event.kind == UsbEventType.BYTE:
- if start is None:
- start = event.timestamp_us
- if event.byte != 0x80:
- print("WARNING: packet start is not a sync byte")
- continue
- data.append(event.byte)
+ if packet is None or packet.is_complete():
+ # start a new packet
+ packet = UsbPacket(event)
+ else:
+ # continue an existing packet
+ packet.add(event)
+
+ # in case a final packet was incomplete, finish it
+ if packet is not None and not packet.is_complete():
+ buffer.add(packet.complete())
- # in case a final packet was pending
- if start is not None:
- pcap.write_packet(start, bytes(data))
- pkts += 1
- start = None
- data = []
+ # flush all packets to pcap file
+ buffer.flush()
- print(f"Wrote {pkts} pcap packets")
+ pcap.write_syslog(buffer.last_timestamp, "end of capture")
+ print(f"Wrote {buffer.written} pcap packets ({buffer.seen} processed)")
def main():
parser = argparse.ArgumentParser()
- parser.add_argument('csv')
- parser.add_argument('pcap')
+ parser.add_argument('csv', help="input CSV file exported from Saleae Logic")
+ parser.add_argument('pcap', help="output pcap file (will be overwritten)")
+ parser.add_argument('-u', '--unfolded', action='store_true', help="don't fold repeated events, output all packets")
args = parser.parse_args()
- csv_to_pcap(args.csv, args.pcap)
+ csv_to_pcap(args.csv, args.pcap, unfolded=args.unfolded)
if __name__ == "__main__":