自定义 Flink State 的序列化方式是一种高阶的使用技巧,在很多复杂场景下,通过自定义 Serializer 可以在兼容性、性能等方面获得一定的收益。
State Serializer 作用
通过下面两行代码,我们创建了一个 Integer 类型的 ValueState:
// 创建 Integer 类型的 ValueState
ValueStateDescriptor<Integer> valueState = new ValueStateDescriptor<>("value", Types.INT);
ValueState<Integer> state = getRuntimeContext().getState(valueState);
我们传入的 Types.INT
让 Flink 知道了这个 State 的类型是 Integer 类型,并为这个 Integer 类型创建了对应的 IntSerializer。IntSerializer 有两个作用:
- snapshot 过程中将 State 的值,也就是 Integer 对象,序列化成二进制数据,写入持久化存储中
- restore 过程中将持久化存储中的二进制数据,反序列化成 Integer 对象
那么,State Serializer 其实是定义了 State 的二进制格式,通过自定义的方式,我们可以解决一些日常开发中常见的问题,比如:
- 复杂对象的序列化开销过大,比如 Kryo 需要写入 className 等冗余信息
- State 对象增减字段造成状态无法恢复
基本概念
一个完整的 State Serializer 由 Serializer 和 SerializerSnapshot 组成。其中:
- Serializer 定义指定类型的序列化和反序列化方法
- SerializerSnapshot 定义 Serializer 元信息的序列化方式(是快照中的一部分)
- TypeSerializerSchemaCompatibility 表示作业重启后用户定义的 TypeSerializer 和快照中 TypeSerializer 兼容性
在官方文档 Custom Serialization for Managed State 中对自定义 State Serializer 有不少的介绍,大家可以通过这篇文章了解更细节的相关概念。
实现一个 Serializer
我们以 DecimalDataSerializer(用于序列化 DecimalData 类型数据)为例,来研究 Serializer 具体的实现。
我们先看 DecimalData 的数据结构:
public final class DecimalData implements Comparable<DecimalData> {
final int precision; // 精度(字段长度)
final int scale; // 范围(小数的位数)
final long longVal; // 精度较小时,选择用 long 值表示
BigDecimal decimalVal; // 精度较大时,选择用 java.math.BigDecimal 表示
// 判断是否超出最大精度(即是否可以使用 long 值表示)
public boolean isCompact() {
return precision <= MAX_COMPACT_PRECISION;
}
// 转换成 long 值
public long toUnscaledLong() {
if (isCompact()) {
return longVal;
} else {
return toBigDecimal().unscaledValue().longValueExact();
}
}
// 转换成 bytes
public byte[] toUnscaledBytes() {
return toBigDecimal().unscaledValue().toByteArray();
}
}
对于这个数据结构,我们来看一下他对应的 Serializer:
public final class DecimalDataSerializer extends TypeSerializer<DecimalData> {
private final int precision;
private final int scale;
// 构造函数,初始化 precision 和 scale
public DecimalDataSerializer(int precision, int scale) {
this.precision = precision;
this.scale = scale;
}
@Override
public void serialize(DecimalData record, DataOutputView target) throws IOException {
if (DecimalData.isCompact(precision)) {
// 当前精度小,使用 long 值表示,写出 long 值
assert record.isCompact();
target.writeLong(record.toUnscaledLong());
} else {
// 当前精度大,使用 BigDecimal 表示
byte[] bytes = record.toUnscaledBytes();
target.writeInt(bytes.length);
target.write(bytes);
}
}
@Override
public DecimalData deserialize(DataInputView source) throws IOException {
if (DecimalData.isCompact(precision)) {
// 当前精度小,读取 long 值,初始化 DecimalData
long longVal = source.readLong();
return DecimalData.fromUnscaledLong(longVal, precision, scale);
} else {
// 当前精度大,读取 bytes,初始化 DecimalData
int length = source.readInt();
byte[] bytes = new byte[length];
source.readFully(bytes);
return DecimalData.fromUnscaledBytes(bytes, precision, scale);
}
}
}
上面的代码中展示了核心的两个方法:
- serialize:将 DecimalData 对象转化为二进制数据写入到 DataOutputView
- deserialize:将 DataInputView 中的二进制数据按序读取,得到 DecimalData 对象
至于这两个方法在什么时候需要调用,是读者无需关心的,因为不管是写入到 Rpc 传输还是写入到具体的 Local/Remote 文件系统,都会将对应的 OutputStream 和 InputStream 封装到 DataOutputView 和 DataInputView 中。
支持多版本兼容的例子
通过上面的例子可以发现,作业经过修改后,如果 Serializer 的 precision 和 scale 发生了变化,那么由于 DecimalData.isCompact(precision)
的结果与之前不同,所以已经序列化的数据可能无法被反序列化回来。假如我们需要设计一个可以兼容一些改动的 Serializer,需要改动哪些地方?
序列化中加入版本信息
如果我们期望可以灵活改动 precision 和 scale,那么以上面的 serialize 和 deserialize 方法为例:
int currentVersion = 2;
@Override
public void serialize(DecimalData record, DataOutputView target) throws IOException {
target.writeInt(currentVersion); // 新增版本信息
// ... 和之前一致
}
@Override
public DecimalData deserialize(DataInputView source) throws IOException {
int version = source.readInt();
if (version == currentVersion) {
return deserializeCurrentVersion(source);
} else {
return deserializeVersion1(source);
}
}
// 解析当前版本的数据结构
private DecimalData deserializeCurrentVersion(DataInputView source) {
// 使用当前版本的 precision 和 scale 进行解析
}
// 解析 Version 1 版本的数据结构
private DecimalData deserializeVersion1(DataInputView source) {
// 使用 Version 1 中的 precision 和 scale 进行解析并转化成当前结构的数据
}
调整兼容性判断
在修改了 DecimalData 的 precision 和 scale 之后,我们会发现快照中的 DecimalDataSerializer 和新创建的 DecimalDataSerializer 的属性已经对不上了,在 Flink 的实现中,会对 precision 和 scale 进行判断,如果不一致就抛出 TypeSerializerSchemaCompatibility.incompatible()
的异常。而目前我们加入了版本信息,则可以直接返回 TypeSerializerSchemaCompatibility.compatibleAsIs()
。
兼容性判断分为四种,分别是:
- COMPATIBLE_AS_IS:表示兼容,并且今后使用用户新定义的 Serializer
- COMPATIBLE_AFTER_MIGRATION:表示兼容,但是需要重新刷一遍数据,使用快照中的 Serializer 反序列化数据,并使用新的 Serializer 序列化数据
- COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:表示兼容,用户需要返回一个新的 Serializer 作为今后使用的 Serializer
- INCOMPATIBLE:表示不兼容,作业抛异常退出
引用
State Serializer 在平台型应用中使用的会比较多,尤其是写 DataStream 框架的同学,因为用户往往在 State 中存储的是业务数据的状态,里面的字段的增减是非常普遍的操作。对于期望自定义 State Serializer 的同学,参考一下很多 Flink 内部实现的 Serializer,比如上面提到的 IntSerializer 和 DecimalDataSerializer。