option to keep folding despite SOF interspersed packets
class UsbPacketBuffer(object):
"""Buffer of decoded UsbPackets that haven't been written to disk yet."""
- def __init__(self, pcap, max_fold=64):
+ def __init__(self, pcap, max_fold=64, fold_over_sof=False):
self.buffer = []
self.max_fold = max_fold
self.written = 0
self.seen = 0
self.pcap = pcap
self.last_timestamp = 0
+ self.fold_over_sof = fold_over_sof
def add(self, pkt):
self.buffer.append(pkt)
self.write(flush=True)
def fold(self, data, combo, pkt_count, buffer):
- count = 0
if combo == UsbPacketCombo.UNKNOWN:
return 0
- # check units of pkt_count for the same combo
- for i in range(0, len(buffer), pkt_count):
+
+ # scan for the same packet combo, counting how many packets in
+ # a row can be skipped
+ i = 0
+ while i < len(buffer):
next_combo, next_count, next_data = UsbPacketCombo.find(buffer[i:i+pkt_count])
- if next_combo != combo or next_count != pkt_count or next_data != data:
- break
- count += 1
- return count*pkt_count
+ if self.fold_over_sof:
+ # ignore SOF packets when folding
+ if next_combo != UsbPacketCombo.SOF and \
+ (next_combo != combo or next_count != pkt_count or next_data != data):
+ break
+ else:
+ # stop folding on SOFs
+ if next_combo != combo or next_count != pkt_count or next_data != data:
+ break
+ i += next_count
+ return i
def write(self, flush=False):
# only consume packets down to the maximum folding level,
events.append(event)
return events
-def csv_to_pcap(csvfile, pcapfile, unfolded=False):
+def csv_to_pcap(csvfile, pcapfile, unfolded=False, fold_over_sof=False, fold_max=1024):
# parse saleae CSV file into event objects
events = events_from_csv(csvfile)
pcap.write_info_header()
# initialize a buffer for processing packet combinations
- buffer = UsbPacketBuffer(pcap, max_fold=0 if unfolded else 128)
+ buffer = UsbPacketBuffer(pcap, max_fold=0 if unfolded else fold_max, fold_over_sof=fold_over_sof)
packet = None
for event in events: