书接上文,本篇继续分享ClickHouse源码中一个重要的流,
FilterBlockInputStream
的实现,重点在于分析Clickhouse是如何在执行引擎实现向量化的Filter操作符,而利用这个Filter操作符的,就可以实现where, having
的数据过滤。
话不多说,准备发车~~ 本文的源码分析基于ClickHouse v19.16.2.2的版本。
1.Selection的实现
Selection
是关系代数之中重要的一个的一个运算,通常也会用σ
符合来selection的实现。
而在SQL语句之中,实现Selection运算的便是:where
与having
。而本文就要从一个简单的SQL语句出发,带领大家一同梳理Clickhouse的源码,来探究它是如何实现选择运算的。
先看如下的查询SELECT * FROM test where a > 3 and b < 1;
这里扫描了test
表,并且需要筛选出了a
列大于3且b
列小于1的行。老规矩,咱们先尝试打开ClickHouse的Debug日志看一下具体的执行的pipeline。(ClickHouse 20.6之后的版本,终于支持了使用Explain语句来查看执行计划,真是千呼万唤始出来啊~~)
这里分为了4个流,而咱们需要关注的流就是Filter
流,它实现了从存储引擎的数据读取数据,并且执行函数运算,并最终实现数据过滤的逻辑。
所以Clickhouse的表达式计算并不单单只由ExpressionBlockInputStream
来完成的,而FilterBlockInputStream
同样也需要包含Expression
进行表达式的向量化的计算与过滤。
吐槽时间:私以为这样的实现并不优雅,如果在Filter
上层再套一层ExpressionBlockinputStream
结构上会更加清晰。不过这样的实现可能会导致额外的性能损耗,Clickhouse为了实现查询的高效执行可谓是『丧心病狂』, 后续分析聚合函数的实现时,我们会见到更为Dirty
的代码。
2. FilterBlockInputStream的源码剖析
- FilterBlockInputStream readImpl()的实现
直接上代码看一下FilterBlockInputStream
的数据读取方法吧,这部分代码比较多。我们拆解出来梳理
/// Determine position of filter column. header = input->getHeader(); expression->execute(header); filter_column = header.getPositionByName(filter_column_name); auto & column_elem = header.safeGetByPosition(filter_column); /// Isn't the filter already constant? if (column_elem.column) constant_filter_description = ConstantFilterDescription(*column_elem.column);
首先,构造FilterBlockInputStream
时会首先读取下一级流的Block Header
。通过Header
来分析是否有常量列满足always true
或always false
的逻辑,来设置ConstantFilterDescription
。比如存在全部是null
列的过滤列,无论进行什么表达式计算,结果都是false
。如果这样的话,就直接放回空的block
给上层流就ok了。
if (expression->checkColumnIsAlwaysFalse(filter_column_name)) return {};// Function: checkColumnIsAlwaysFalsefor (auto & action : actions) { if (action.type == action.APPLY_FUNCTION && action.function_base) { auto name = action.function_base->getName(); if ((name == "in" || name == "globalIn") && action.result_name == column_name && action.argument_names.size() > 1) { set_to_check = action.argument_names[1]; } } }
接下来解析FilterBlockInputStream
之中所有的表达式,查询是否有in
或globalin
的函数调用,并且其第二个参数set
为空,那么同样表示表达式alwaysFalse
也可以直接返回为空的Block。
比如说有如下查询:select * from test2 where a in (select a from test2 where a > 10)
而这个子查询select a from test2 where a > 10
返回的是空集的话,那么就会被直接过滤了,返回空的block。
接下来进入一个while
循环,不断从底层的流读取数据,并进行对应的表达式计算。这里我删去了一些冗余的代码:
while (1) { res = children.back()->read(); expression->execute(res); size_t columns = res.columns(); ColumnPtr column = res.safeGetByPosition(filter_column).column;
这里的实现很简单,就是不停从底层的流读取数据Block,通过表达式计算生成filter_column
列。这个列是一组bool
列,标识了对应的行是否还应该存在。
举个栗子,如果有如下查询select * from test where a > 10 and b < 2
。ClickHouse的表达式会生成如下执行流程如下(注意:ClickHouse遵从函数式编程的逻辑,任意函数调用都会生成新的一列):
1. add const column : 102. function call : a > 10 (生成一组新生成的bool列,列名为`a > 10`)3. remove const column : 104. add const column : 25. function call : b < 2 (生成一组新生成的bool列,列名为`b < 2`)6. remove const column : 2 7. call function : a > 10 and b < 2 (生成一组新生成的bool列,列名为`a > 10 and b < 2`)8. remove column : a > 109. remove column : b < 2
而最终新生成的这列就是我们后续需要用到过滤最终结果的filter_column
列了。
接下来就进入最核心的一部分代码了,遍历Block之中除了const column
与filter_column
列的所有列,进行实际的数据过滤。IColumn
接口中实现了一个接口为filter
,也就是说,每一个列类型都需要实现一个过滤方法,用一组bool数组来过滤列数据。
/** Removes elements that don't match the filter. * Is used in WHERE and HAVING operations. * If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column; * if 0, then don't makes reserve(), * otherwise (i.e. < 0), makes reserve() using size of source column. */ using Filter = PaddedPODArray<UInt8>; virtual Ptr filter(const Filter & filt, ssize_t result_size_hint) const = 0;
我们直接跳到子类的实现中来看一下:
template <typename T>ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_size_hint) const{ const UInt8 * filt_pos = filt.data(); const UInt8 * filt_end = filt_pos + size; const T * data_pos = data.data(); while (filt_pos < filt_end) { if (*filt_pos) res_data.push_back(*data_pos); ++filt_pos; ++data_pos; } return res;}
这之中最为核心的就是这个while
循环,遍历bool
数组,然后将合法数据塞进一个新的列之中,最终新的列替换旧的列,就完成了一列数据的过滤。之后对于剩余的列依次按照上述流程过一遍就完成了整个block
的过滤。这里也可以看到,这个while
循环也是一组很简单,没有control flow break
的一段代码,能够给予编译器向量化优化的空间很大。当然,ClickHouse还提供了一个手工调用向量化API的过滤版本代码:
#ifdef __SSE2__ /** A slightly more optimized version. * Based on the assumption that often pieces of consecutive values * completely pass or do not pass the filter. * Therefore, we will optimistically check the parts of `SIMD_BYTES` values. */ static constexpr size_t SIMD_BYTES = 16; const __m128i zero16 = _mm_setzero_si128(); const UInt8 * filt_end_sse = filt_pos + size / SIMD_BYTES * SIMD_BYTES; while (filt_pos < filt_end_sse) { int mask = _mm_movemask_epi8(_mm_cmpgt_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(filt_pos)), zero16)); if (0 == mask) { /// Nothing is inserted. } else if (0xFFFF == mask) { res_data.insert(data_pos, data_pos + SIMD_BYTES); } else { for (size_t i = 0; i < SIMD_BYTES; ++i) if (filt_pos[i]) res_data.push_back(data_pos[i]); } filt_pos += SIMD_BYTES; data_pos += SIMD_BYTES; }
No comments:
Post a Comment