DVS事件相机Rosbag数据预处理算法(eroam)

1. 问题陈述

DVS事件相机输出的异步事件流通常以不定长EventArray消息的形式存储在rosbag中,原始数据存在两类核心问题:第一,热像素(坏像素)会产生大量无意义的噪声事件,干扰下游算法的特征提取、运动估计等流程;第二,原始EventArray的打包时长不固定,且用户通常仅需要指定时间段内的事件数据,直接使用原始数据需要额外做时间对齐、噪声过滤操作。本算法针对离线DVS rosbag数据的预处理需求,实现坏像素剔除、自定义时间段裁剪、固定时长事件分块三个核心功能,输出标准化的事件数据供下游算法使用。

2. Intuition

直接遍历原始EventArray消息、按原始消息边界输出的naive预处理方案存在三个核心缺陷:第一,若用线性查找判断坏像素,当坏像素数量较多时查询效率大幅下降;第二,无全局时间对齐逻辑,输出的事件段时长波动大,不利于下游算法的批处理;第三,无法灵活裁剪指定时间段的事件,容易引入冗余数据。本算法的核心思路是采用全局时间滑动窗口划分事件段,用有序集合存储坏像素实现对数复杂度的查询,仅保留指定时间范围内的有效事件,输出固定时长的标准化事件块。 该逻辑可类比视频剪辑中的固定时长分段导出:先剪掉不需要的开头结尾片段,去除噪点帧,再按固定时长切割为多个片段,每个片段的长度完全一致,方便后续批量处理。

3. 形式化定义

我们首先对算法的输入、输出和约束做数学化定义: 设输入的事件集合为 \(\mathcal{E} = {e_i}_{i=1}^N\),其中每个事件 \(e_i = (x_i, y_i, t_i, p_i)\)\(x_i \in [0, W-1], y_i \in [0, H-1]\) 为像素坐标,\(W,H\) 为事件相机分辨率,\(t_i\) 为事件时间戳,\(p_i \in {0,1}\) 为事件极性。 坏像素集合为 \(\mathcal{B} = {(x_b, y_b)}_{b=1}^M\),为预先标定的热像素坐标。 算法配置参数包括:起始时间偏移 \(t\_{offset} \geq 0\),处理时长上限 \(t\_{limit}\)\(t\_{limit}=-1\) 表示处理至bag末尾),分块时长 \(T\_{seg} > 0\)

首先计算全局处理时间范围:

\[ t\_{start} = t\_{bag_start} + t\_{offset} \]
其中 \(t\_{bag_start}\) 为原始bag中第一个事件的时间戳。若 \(t\_{limit} > 0\),则处理截止时间为:
\[ t\_{end} = t\_{start} + t\_{limit} \]
否则 \(t\_{end}\) 为原始bag中最后一个事件的时间戳。

输出的分块事件集合为 \(\mathcal{S} = {S_k}_{k=0}^{K-1}\),其中每个分块 \(S_k\) 对应时间窗口 \([t_{start} + k \cdot T_{seg}, t_{start} + (k+1) \cdot T_{seg})\),分块内的事件满足三个约束:

  1. 时间约束:\(t_i \in [t_{start}, t_{end})\)
  2. 噪声约束:\((x_i, y_i) \notin \mathcal{B}\)
  3. 分块归属:\(k = \lfloor \frac{t_i - t\_{start}}{T\_{seg}} \rfloor\)

所有空分块(即 \(S_k = \emptyset\))不会被写入输出rosbag,减少冗余存储。

4. 算法

算法的执行逻辑可分为参数解析、事件过滤、分块写入三个阶段,伪代码如下:

Input: input_bag_path, output_bag_path, event_topic, segment_duration, start_time_offset=0.0, duration_limit=-1.0, bad_pixels_list
Output: 预处理后的rosbag文件
1: bad_pixel_set ← convert bad_pixels_list to ordered set
2: open input_bag in read mode, open output_bag in write mode
3: initialize is_first_event = True, processing_started = False, current_events = empty list
4: for each message in input_bag view do
5:   if message.topic == event_topic then
6:       event_array ← instantiate message to EventArray type
7:       if event_array is null then continue
8:       if is_first_event is True then
9:           bag_start_time ← event_array.events[0].ts
10:          actual_start_time ← bag_start_time + start_time_offset
11:          if duration_limit > 0 then
12:              actual_end_time ← actual_start_time + duration_limit
13:          end if
14:          is_first_event ← False
15:      end if
16:      for each event in event_array.events do
17:          if event.ts < actual_start_time then continue
18:          if duration_limit > 0 and event.ts >= actual_end_time then goto FINISH
19:          if processing_started is False then
20:              processing_started ← True
21:              segment_start_time ← actual_start_time
22:          end if
23:          if (event.x, event.y) in bad_pixel_set then continue
24:          segment_elapsed = event.ts - segment_start_time
25:          if segment_elapsed < segment_duration then
26:              add event to current_events
27:          else
28:              if current_events is not empty then
29:                  new_event_array.header = event_array.header
30:                  new_event_array.header.stamp = segment_start_time
31:                  new_event_array.events = current_events
32:                  write new_event_array to output_bag
33:              end if
34:              clear current_events
35:              segment_start_time += segment_duration
36:              while event.ts - segment_start_time >= segment_duration do
37:                  segment_start_time += segment_duration
38:              end while
39:              add event to current_events
40:          end if
41:      end for
42:  else
43:      // 可选处理其他topic消息,默认注释
44:  end if
45: end for
46: FINISH:
47: if current_events is not empty then
48:   new_event_array.header.stamp = segment_start_time
49:   new_event_array.events = current_events
50:   write new_event_array to output_bag
51: end if
52: close input_bag and output_bag

对应的执行流程图如下:

flowchart LR A[开始] --> B[读取配置参数] B --> C[解析坏像素列表为有序集合] C --> D[打开输入/输出rosbag] D --> E[遍历bag内所有消息] E --> F{消息属于事件topic?} F -->|否| G[可选写入其他topic消息] G --> E F -->|是| H[解析为EventArray消息] H --> I{是首个事件消息?} I -->|是| J[计算全局起止处理时间] J --> K[遍历EventArray内单个事件] I -->|否| K K --> L{事件时间早于起始时间?} L -->|是| K L -->|否| M{事件时间晚于截止时间?} M -->|是| N[进入收尾流程] M -->|否| O{是首次处理有效事件?} O -->|是| P[初始化首个分块起始时间] P --> Q{坐标在坏像素集合中?} O -->|否| Q Q -->|是| K Q -->|否| R{事件属于当前分块?} R -->|是| S[加入当前分块事件列表] S --> K R -->|否| T[写入当前分块到输出bag] T --> U[重置分块, 跳过空时间段] U --> S N --> V{当前分块非空?} V -->|是| W[写入剩余分块到输出bag] V -->|否| X[关闭输入/输出bag] W --> X X --> Y[结束]

5. 复杂度分析

时间复杂度

设总事件数为 \(N\),坏像素数量为 \(M\)

  1. 预处理阶段将坏像素列表转换为有序集合的时间复杂度为 \(O(M \log M)\)
  2. 每个事件的处理包含一次坏像素集合查找,复杂度为 \(O(\log M)\),事件的插入、写出操作均为摊还 \(O(1)\)
  3. 最坏情况:所有事件均符合过滤条件,总时间复杂度为 \(O(M \log M + N \log M)\)。由于实际场景中坏像素数量通常不超过1000,\(\log M\) 可视为常数,实际运行效率接近线性 \(O(N)\)
  4. 最好情况:所有事件均不在指定时间范围内,总时间复杂度为 \(O(M \log M + K)\),其中 \(K\) 为bag内消息总数

空间复杂度

算法仅存储当前分块的事件列表和坏像素集合,无需加载全量事件到内存:

  1. 坏像素集合占用空间为 \(O(M)\)
  2. 当前分块事件列表的最大长度为单个分块内的事件数 \(S\_{max}\)
  3. 总空间复杂度为 \(O(M + S\_{max})\),与输入bag的总大小无关,适合处理GB级别的大型rosbag文件。

6. 实现注意事项

工程实现与理论定义存在几处针对性优化:

  1. 源码中使用goto语句跳出多层嵌套循环,相比设置多层flag变量的方案,代码逻辑更简洁,运行效率更高,是C++中处理嵌套循环提前退出的标准工程实践。
  2. 起始时间偏移start_time_offset是相对于bag中第一个事件的时间戳计算,而非rosbag的创建时间,避免bag创建时间与实际事件起始时间不一致导致的裁剪错误。
  3. 处理跨多个空时间段的事件时,会循环累加分块起始时间直到找到对应分块,不会写入空的EventArray消息,减少输出bag的冗余体积。
  4. 输出EventArray的header.stamp被设置为对应分块的起始时间,而非原始消息的时间戳,方便下游算法直接按消息时间戳对齐事件段。
  5. 源码默认不处理事件topic之外的其他消息,用户可根据需求取消对应代码行的注释,将符合时间范围的其他消息(如IMU、图像、位姿)也写入输出bag。

7. 对比分析

对比方案 本算法 rosbag filter DV SDK rosbag导出工具 rpg_dvs_ros dvs_renderer
坏像素过滤 支持,可动态配置 不支持 不支持 仅支持硬编码坏像素
固定时长分块 支持,可配置分块长度 不支持 不支持 不支持
自定义时间段裁剪 支持偏移+时长限制 仅支持全局时间范围 不支持 不支持
适用场景 离线rosbag预处理 通用rosbag裁剪 aedat转rosbag 实时事件流可视化
内存占用 低,仅存当前分块 中,需缓存消息块 高,需加载全量aedat 中,实时处理缓存

本算法专门针对离线事件相机rosbag的预处理场景设计,输出的事件段时长固定,可直接输入下游事件-based SLAM、动作识别算法,无需额外时间对齐操作。