使用Clickhouse直接写入Bitmap探索历程

一、早期的实现
对于小容量bitmap,直接可以使用Insert语句写入
insert [table] (bitmap) VALUS (bitmapBuild[toUInt64(1),bitmapBuild[toUInt64(2)])
这种方式可以直接是实现

但对于百万千万甚至过亿的Bitmap显然这个SQL过长是执行不了无法写入的
这时候用到了物化列

我们建表的时候指定一个字符串列
然后bitmap列可以通过字符串列物化得到

CREATE TABLE bitmaptable
(
    `end_time` DateTime,
    `roaring_str` String,
    `bitmap` AggregateFunction(groupBitmap,
 UInt64) MATERIALIZED base64Decode(roaring_str)
)

这种方式需要把Bitmap序列化后提交

public class ClickHouseUtil {
    public static ByteBuffer serialize(Roaring64NavigableMap rb64) throws IOException {
        // ck中rbm对小于32的基数进行了优化,使用smallset进行存放
        if (rb64.getLongCardinality() <= 32) {
            // the serialization structure of roaringbitmap in clickhouse: Byte(1), utils.VarInt(SerializedSizeInBytes), ByteArray(RoaringBitmap)
            // and long occupies 8 bytes
            ByteBuffer tmp = ByteBuffer.allocate(1 + 1 + 8 * rb64.getIntCardinality());
            ByteBuffer byteBuf =tmp.order()== ByteOrder.LITTLE_ENDIAN? tmp : tmp.slice().order(ByteOrder.LITTLE_ENDIAN);
            byteBuf.put((byte) 0);
            byteBuf.put((byte)rb64.getIntCardinality());
            Arrays.stream(rb64.toArray()).forEach(i -> byteBuf.putLong(i));
            return  byteBuf;
        } else {
            // Roaring64NavigableMap serialize with prefix of "signedLongs" and "highToBitmap.size()"
            // Refer to the implementation of the serialize method of Roaring64NavigableMap, remove the prefix bytes
            int rbmPrefixBytes = 1 + 8;
            int serializedSizeInBytes = (int) rb64.serializedSizeInBytes() - rbmPrefixBytes;
            int varIntLen = VarInt.varLongSize(serializedSizeInBytes);
            // the serialization structure of roaringbitmap in clickhouse: Byte(1), utils.VarInt(SerializedSizeInBytes), ByteArray(RoaringBitmap)
            ByteBuffer tmp = ByteBuffer.allocate(1 + varIntLen + serializedSizeInBytes);
            ByteBuffer byteBuf = tmp.order()== ByteOrder.LITTLE_ENDIAN? tmp : tmp.slice().order(ByteOrder.LITTLE_ENDIAN);
            byteBuf.put((byte) 1);
            VarInt.putVarInt(serializedSizeInBytes, byteBuf);
            ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
            rb64.serialize(new DataOutputStream(byteStream));
            byte[] bytes = byteStream.toByteArray();
            byteBuf.put(Arrays.copyOfRange(bytes,rbmPrefixBytes, (int)rb64.serializedSizeInBytes()));
            return byteBuf;
        }

    }
}

调用

String roaringStr = new String(Base64.getEncoder().encode(ClickHouseUtil.serialize(insertBitmap).array()));
statement.setString(paramsSize + 2, roaringStr);

二、JDBC的支持
后来jdbc支持了bitmap的写入

String sql = "INSERT INTO bitmap.crowd(crowd_code, crowd_version, offset_bitmap) VALUES(?, ?, ?)";
        try{
            pstmt = conn.prepareStatement(sql);
            pstmt.setString(1, crowdCode);
            pstmt.setString(2, version);

            Roaring64NavigableMap bitmap = new Roaring64NavigableMap();
            bitmap.add(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16);
            ClickHouseBitmap wrap = ClickHouseBitmap.wrap(bitmap, ClickHouseDataType.UInt64);
            pstmt.setObject(3, wrap);
            pstmt.execute();
        }catch (Exception e){
            e.printStackTrace();
        }

但经过DEBUG发现,最终JDBC还是通过转成bitmapBuild这种方式的SQL来实现插入
显然大容量的bitmap插入的时候直接就会报错
三、大容量bitmap如何写入
同事翻阅jdbc文档,发现有一种input方式可以流式写入
https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-jdbc

String sql = "INSERT INTO avatar.crowdtest SELECT code,version,offset_bitmap FROM input('code String,version String,offset_bitmap AggregateFunction(groupBitmap,UInt64)')";
        String uuid = ATool.uuid();
        try (PreparedStatement ps = dataSource.getConnection().prepareStatement(
                sql)) {
            ps.setString(1, code); // col1
            ps.setString(2, uuid); // col2, setTimestamp is slow and not recommended

//            final long[] ints = LongStream.range(1, 200_000_000).toArray();
//            final Roaring64NavigableMap bitmap1 = Roaring64NavigableMap.bitmapOf(ints);
//            Object obj = ClickHouseBitmap.wrap(ints);
            ps.setObject(3, ClickHouseBitmap.wrap(bitmap.getSourceBitmap(), ClickHouseDataType.UInt64)); // col3
            ps.addBatch(); // parameters will be write into buffered stream immediately in binary format
            ps.executeBatch(); // stream everything on-hand into ClickHouse
        }

本地调试后发现可以写入,但是上线后又一直写不进去
后来又发现

<dependency>
    <!-- please stop using ru.yandex.clickhouse as it's been deprecated -->
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2-patch11</version>
    <!-- use uber jar with all dependencies included, change classifier to http for smaller jar -->
    <classifier>all</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

官方例子里是这样引入的
照着例子改了一遍上去发现demo可以写入
但是实际生产表里依旧写不进去,而且不报错

最后把demo里用到的临时表和生产表做了对比,看看到底哪里有问题
1、demo里3个字段,而且写的时候顺序也一致
2、生产表6个字段,有一部分字段有默认值就没有设值,而且写的时候顺序有所调整

最后调整了每一个字段都按顺序写到INSERT的SQL语句里,成功实现写入。

官方文档的主要问题:
1、给的例子相当不明确,也没有详细的说明,全靠猜
2、报错提示看不出来具体问题出在哪里,也都靠推理和猜测

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容