DVS事件相机Rosbag数据预处理算法(eroam)
1. 问题陈述
DVS事件相机输出的异步事件流通常以不定长EventArray消息的形式存储在rosbag中,原始数据存在两类核心问题:第一,热像素(坏像素)会产生大量无意义的噪声事件,干扰下游算法的特征提取、运动估计等流程;第二,原始EventArray的打包时长不固定,且用户通常仅需要指定时间段内的事件数据,直接使用原始数据需要额外做时间对齐、噪声过滤操作。本算法针对离线DVS rosbag数据的预处理需求,实现坏像素剔除、自定义时间段裁剪、固定时长事件分块三个核心功能,输出标准化的事件数据供下游算法使用。
2. Intuition
直接遍历原始EventArray消息、按原始消息边界输出的naive预处理方案存在三个核心缺陷:第一,若用线性查找判断坏像素,当坏像素数量较多时查询效率大幅下降;第二,无全局时间对齐逻辑,输出的事件段时长波动大,不利于下游算法的批处理;第三,无法灵活裁剪指定时间段的事件,容易引入冗余数据。本算法的核心思路是采用全局时间滑动窗口划分事件段,用有序集合存储坏像素实现对数复杂度的查询,仅保留指定时间范围内的有效事件,输出固定时长的标准化事件块。 该逻辑可类比视频剪辑中的固定时长分段导出:先剪掉不需要的开头结尾片段,去除噪点帧,再按固定时长切割为多个片段,每个片段的长度完全一致,方便后续批量处理。
3. 形式化定义
我们首先对算法的输入、输出和约束做数学化定义: 设输入的事件集合为 \(\mathcal{E} = {e_i}_{i=1}^N\),其中每个事件 \(e_i = (x_i, y_i, t_i, p_i)\),\(x_i \in [0, W-1], y_i \in [0, H-1]\) 为像素坐标,\(W,H\) 为事件相机分辨率,\(t_i\) 为事件时间戳,\(p_i \in {0,1}\) 为事件极性。 坏像素集合为 \(\mathcal{B} = {(x_b, y_b)}_{b=1}^M\),为预先标定的热像素坐标。 算法配置参数包括:起始时间偏移 \(t\_{offset} \geq 0\),处理时长上限 \(t\_{limit}\)(\(t\_{limit}=-1\) 表示处理至bag末尾),分块时长 \(T\_{seg} > 0\)。
首先计算全局处理时间范围:
输出的分块事件集合为 \(\mathcal{S} = {S_k}_{k=0}^{K-1}\),其中每个分块 \(S_k\) 对应时间窗口 \([t_{start} + k \cdot T_{seg}, t_{start} + (k+1) \cdot T_{seg})\),分块内的事件满足三个约束:
- 时间约束:\(t_i \in [t_{start}, t_{end})\)
- 噪声约束:\((x_i, y_i) \notin \mathcal{B}\)
- 分块归属:\(k = \lfloor \frac{t_i - t\_{start}}{T\_{seg}} \rfloor\)
所有空分块(即 \(S_k = \emptyset\))不会被写入输出rosbag,减少冗余存储。
4. 算法
算法的执行逻辑可分为参数解析、事件过滤、分块写入三个阶段,伪代码如下:
Input: input_bag_path, output_bag_path, event_topic, segment_duration, start_time_offset=0.0, duration_limit=-1.0, bad_pixels_list
Output: 预处理后的rosbag文件
1: bad_pixel_set ← convert bad_pixels_list to ordered set
2: open input_bag in read mode, open output_bag in write mode
3: initialize is_first_event = True, processing_started = False, current_events = empty list
4: for each message in input_bag view do
5: if message.topic == event_topic then
6: event_array ← instantiate message to EventArray type
7: if event_array is null then continue
8: if is_first_event is True then
9: bag_start_time ← event_array.events[0].ts
10: actual_start_time ← bag_start_time + start_time_offset
11: if duration_limit > 0 then
12: actual_end_time ← actual_start_time + duration_limit
13: end if
14: is_first_event ← False
15: end if
16: for each event in event_array.events do
17: if event.ts < actual_start_time then continue
18: if duration_limit > 0 and event.ts >= actual_end_time then goto FINISH
19: if processing_started is False then
20: processing_started ← True
21: segment_start_time ← actual_start_time
22: end if
23: if (event.x, event.y) in bad_pixel_set then continue
24: segment_elapsed = event.ts - segment_start_time
25: if segment_elapsed < segment_duration then
26: add event to current_events
27: else
28: if current_events is not empty then
29: new_event_array.header = event_array.header
30: new_event_array.header.stamp = segment_start_time
31: new_event_array.events = current_events
32: write new_event_array to output_bag
33: end if
34: clear current_events
35: segment_start_time += segment_duration
36: while event.ts - segment_start_time >= segment_duration do
37: segment_start_time += segment_duration
38: end while
39: add event to current_events
40: end if
41: end for
42: else
43: // 可选处理其他topic消息,默认注释
44: end if
45: end for
46: FINISH:
47: if current_events is not empty then
48: new_event_array.header.stamp = segment_start_time
49: new_event_array.events = current_events
50: write new_event_array to output_bag
51: end if
52: close input_bag and output_bag
对应的执行流程图如下:
5. 复杂度分析
时间复杂度
设总事件数为 \(N\),坏像素数量为 \(M\):
- 预处理阶段将坏像素列表转换为有序集合的时间复杂度为 \(O(M \log M)\)
- 每个事件的处理包含一次坏像素集合查找,复杂度为 \(O(\log M)\),事件的插入、写出操作均为摊还 \(O(1)\)
- 最坏情况:所有事件均符合过滤条件,总时间复杂度为 \(O(M \log M + N \log M)\)。由于实际场景中坏像素数量通常不超过1000,\(\log M\) 可视为常数,实际运行效率接近线性 \(O(N)\)
- 最好情况:所有事件均不在指定时间范围内,总时间复杂度为 \(O(M \log M + K)\),其中 \(K\) 为bag内消息总数
空间复杂度
算法仅存储当前分块的事件列表和坏像素集合,无需加载全量事件到内存:
- 坏像素集合占用空间为 \(O(M)\)
- 当前分块事件列表的最大长度为单个分块内的事件数 \(S\_{max}\)
- 总空间复杂度为 \(O(M + S\_{max})\),与输入bag的总大小无关,适合处理GB级别的大型rosbag文件。
6. 实现注意事项
工程实现与理论定义存在几处针对性优化:
- 源码中使用
goto语句跳出多层嵌套循环,相比设置多层flag变量的方案,代码逻辑更简洁,运行效率更高,是C++中处理嵌套循环提前退出的标准工程实践。 - 起始时间偏移
start_time_offset是相对于bag中第一个事件的时间戳计算,而非rosbag的创建时间,避免bag创建时间与实际事件起始时间不一致导致的裁剪错误。 - 处理跨多个空时间段的事件时,会循环累加分块起始时间直到找到对应分块,不会写入空的EventArray消息,减少输出bag的冗余体积。
- 输出EventArray的
header.stamp被设置为对应分块的起始时间,而非原始消息的时间戳,方便下游算法直接按消息时间戳对齐事件段。 - 源码默认不处理事件topic之外的其他消息,用户可根据需求取消对应代码行的注释,将符合时间范围的其他消息(如IMU、图像、位姿)也写入输出bag。
7. 对比分析
| 对比方案 | 本算法 | rosbag filter | DV SDK rosbag导出工具 | rpg_dvs_ros dvs_renderer |
|---|---|---|---|---|
| 坏像素过滤 | 支持,可动态配置 | 不支持 | 不支持 | 仅支持硬编码坏像素 |
| 固定时长分块 | 支持,可配置分块长度 | 不支持 | 不支持 | 不支持 |
| 自定义时间段裁剪 | 支持偏移+时长限制 | 仅支持全局时间范围 | 不支持 | 不支持 |
| 适用场景 | 离线rosbag预处理 | 通用rosbag裁剪 | aedat转rosbag | 实时事件流可视化 |
| 内存占用 | 低,仅存当前分块 | 中,需缓存消息块 | 高,需加载全量aedat | 中,实时处理缓存 |
本算法专门针对离线事件相机rosbag的预处理场景设计,输出的事件段时长固定,可直接输入下游事件-based SLAM、动作识别算法,无需额外时间对齐操作。