如何自定义 Flink State 使用的 Serializer

自定义 Flink State 的序列化方式是一种高阶的使用技巧,在很多复杂场景下,通过自定义 Serializer 可以在兼容性、性能等方面获得一定的收益。

State Serializer 作用

通过下面两行代码,我们创建了一个 Integer 类型的 ValueState:

// 创建 Integer 类型的 ValueState
ValueStateDescriptor<Integer> valueState = new ValueStateDescriptor<>("value", Types.INT);
ValueState<Integer> state = getRuntimeContext().getState(valueState);

我们传入的 Types.INT 让 Flink 知道了这个 State 的类型是 Integer 类型,并为这个 Integer 类型创建了对应的 IntSerializer。IntSerializer 有两个作用:

  • snapshot 过程中将 State 的值,也就是 Integer 对象,序列化成二进制数据,写入持久化存储中
  • restore 过程中将持久化存储中的二进制数据,反序列化成 Integer 对象

那么,State Serializer 其实是定义了 State 的二进制格式,通过自定义的方式,我们可以解决一些日常开发中常见的问题,比如:

  1. 复杂对象的序列化开销过大,比如 Kryo 需要写入 className 等冗余信息
  2. State 对象增减字段造成状态无法恢复

基本概念

一个完整的 State Serializer 由 Serializer 和 SerializerSnapshot 组成。其中:

  • Serializer 定义指定类型的序列化和反序列化方法
  • SerializerSnapshot 定义 Serializer 元信息的序列化方式(是快照中的一部分)
  • TypeSerializerSchemaCompatibility 表示作业重启后用户定义的 TypeSerializer 和快照中 TypeSerializer 兼容性

在官方文档 Custom Serialization for Managed State 中对自定义 State Serializer 有不少的介绍,大家可以通过这篇文章了解更细节的相关概念。

实现一个 Serializer

我们以 DecimalDataSerializer(用于序列化 DecimalData 类型数据)为例,来研究 Serializer 具体的实现。

我们先看 DecimalData 的数据结构:

public final class DecimalData implements Comparable<DecimalData> {
    final int precision;   // 精度(字段长度)
    final int scale;  // 范围(小数的位数)
    final long longVal;  // 精度较小时,选择用 long 值表示
    BigDecimal decimalVal;   // 精度较大时,选择用 java.math.BigDecimal 表示

    // 判断是否超出最大精度(即是否可以使用 long 值表示)
    public boolean isCompact() {
        return precision <= MAX_COMPACT_PRECISION;
    }

    // 转换成 long 值
    public long toUnscaledLong() {
        if (isCompact()) {
            return longVal;
        } else {
            return toBigDecimal().unscaledValue().longValueExact();
        }
    }    

    // 转换成 bytes
    public byte[] toUnscaledBytes() {
        return toBigDecimal().unscaledValue().toByteArray();
    }    
}

对于这个数据结构,我们来看一下他对应的 Serializer:

public final class DecimalDataSerializer extends TypeSerializer<DecimalData> {
    private final int precision;
    private final int scale;

    // 构造函数,初始化 precision 和 scale
    public DecimalDataSerializer(int precision, int scale) {
        this.precision = precision;
        this.scale = scale;
    }

    @Override
    public void serialize(DecimalData record, DataOutputView target) throws IOException {
        if (DecimalData.isCompact(precision)) {
            // 当前精度小,使用 long 值表示,写出 long 值
            assert record.isCompact();
            target.writeLong(record.toUnscaledLong());
        } else {
            // 当前精度大,使用 BigDecimal 表示
            byte[] bytes = record.toUnscaledBytes();
            target.writeInt(bytes.length);
            target.write(bytes);
        }
    }

    @Override
    public DecimalData deserialize(DataInputView source) throws IOException {
        if (DecimalData.isCompact(precision)) {
            // 当前精度小,读取 long 值,初始化 DecimalData
            long longVal = source.readLong();
            return DecimalData.fromUnscaledLong(longVal, precision, scale);
        } else {
            // 当前精度大,读取 bytes,初始化 DecimalData
            int length = source.readInt();
            byte[] bytes = new byte[length];
            source.readFully(bytes);
            return DecimalData.fromUnscaledBytes(bytes, precision, scale);
        }
    }
}

上面的代码中展示了核心的两个方法:

  • serialize:将 DecimalData 对象转化为二进制数据写入到 DataOutputView
  • deserialize:将 DataInputView 中的二进制数据按序读取,得到 DecimalData 对象

至于这两个方法在什么时候需要调用,是读者无需关心的,因为不管是写入到 Rpc 传输还是写入到具体的 Local/Remote 文件系统,都会将对应的 OutputStream 和 InputStream 封装到 DataOutputView 和 DataInputView 中。

支持多版本兼容的例子

通过上面的例子可以发现,作业经过修改后,如果 Serializer 的 precision 和 scale 发生了变化,那么由于 DecimalData.isCompact(precision) 的结果与之前不同,所以已经序列化的数据可能无法被反序列化回来。假如我们需要设计一个可以兼容一些改动的 Serializer,需要改动哪些地方?

序列化中加入版本信息

如果我们期望可以灵活改动 precision 和 scale,那么以上面的 serialize 和 deserialize 方法为例:

    int currentVersion = 2;

    @Override
    public void serialize(DecimalData record, DataOutputView target) throws IOException {
        target.writeInt(currentVersion); // 新增版本信息
        // ... 和之前一致
    }

    @Override
    public DecimalData deserialize(DataInputView source) throws IOException {
        int version = source.readInt();
        if (version == currentVersion) {
            return deserializeCurrentVersion(source);
        } else {
            return deserializeVersion1(source);
        }
    }    

    // 解析当前版本的数据结构
    private DecimalData deserializeCurrentVersion(DataInputView source) {
        // 使用当前版本的 precision 和 scale 进行解析
    }

    // 解析 Version 1 版本的数据结构
    private DecimalData deserializeVersion1(DataInputView source) {
        // 使用 Version 1 中的 precision 和 scale 进行解析并转化成当前结构的数据
    }    

调整兼容性判断

在修改了 DecimalData 的 precision 和 scale 之后,我们会发现快照中的 DecimalDataSerializer 和新创建的 DecimalDataSerializer 的属性已经对不上了,在 Flink 的实现中,会对 precision 和 scale 进行判断,如果不一致就抛出 TypeSerializerSchemaCompatibility.incompatible() 的异常。而目前我们加入了版本信息,则可以直接返回 TypeSerializerSchemaCompatibility.compatibleAsIs()

兼容性判断分为四种,分别是:

  • COMPATIBLE_AS_IS:表示兼容,并且今后使用用户新定义的 Serializer
  • COMPATIBLE_AFTER_MIGRATION:表示兼容,但是需要重新刷一遍数据,使用快照中的 Serializer 反序列化数据,并使用新的 Serializer 序列化数据
  • COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:表示兼容,用户需要返回一个新的 Serializer 作为今后使用的 Serializer
  • INCOMPATIBLE:表示不兼容,作业抛异常退出

引用

  1. DecimalDataSerializer.java
  2. Custom Serialization for Managed State

State Serializer 在平台型应用中使用的会比较多,尤其是写 DataStream 框架的同学,因为用户往往在 State 中存储的是业务数据的状态,里面的字段的增减是非常普遍的操作。对于期望自定义 State Serializer 的同学,参考一下很多 Flink 内部实现的 Serializer,比如上面提到的 IntSerializer 和 DecimalDataSerializer。