Avro deserialization issues with the Flink Kafka source

I'm consuming from a Kafka cluster whose topics are Avro-encoded; each topic has a different schema, and the schemas are managed in a Schema Registry.
Now I need to deserialize both the key and the message. I looked at the existing dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
</dependency>

and the ConfluentRegistryAvroDeserializationSchema it provides.
It turns out that both forSpecific (returning a generated specific class) and forGeneric (returning GenericRecord) require a reader schema; the registry is only used for the writer schema. So even though forGeneric returns GenericRecord, every record is deserialized against the same reader schema, and all records end up with an identical structure.
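For reference, this is roughly what the stock API looks like in use; the topic name, registry URL, bootstrap servers, and the schema itself are placeholders of mine:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

// The reader schema must be fixed when the job is built; the registry is
// only consulted for each message's writer schema.
Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
                + "[{\"name\":\"name\",\"type\":\"string\"}]}");

KafkaSource<GenericRecord> source =
        KafkaSource.<GenericRecord>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("user-topic")
                .setValueOnlyDeserializer(
                        ConfluentRegistryAvroDeserializationSchema.forGeneric(
                                readerSchema, "http://localhost:8081"))
                .build();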

I didn't quite understand this design at first. Following the Stack Overflow question "Is it possible to deserialize Avro message(consuming message from Kafka) without giving Reader schema in ConfluentRegistryAvroDeserializationSchema", I implemented the following:

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaGenericAvroDeserializationSchema
        implements KafkaDeserializationSchema<Tuple2<GenericRecord, GenericRecord>> {

    private final String registryUrl;
    // KafkaAvroDeserializer is not serializable, so create it lazily on the TaskManager.
    private transient KafkaAvroDeserializer inner;

    public KafkaGenericAvroDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            // Return GenericRecord rather than generated SpecificRecord classes.
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl,
                            AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }

    @Override
    public boolean isEndOfStream(Tuple2<GenericRecord, GenericRecord> nextElement) {
        return false;
    }

    @Override
    public Tuple2<GenericRecord, GenericRecord> deserialize(ConsumerRecord<byte[], byte[]> record) {
        checkInitialized();
        final String topic = record.topic();
        // The writer schema is fetched from the registry via the ID embedded in each
        // message, so records from different topics can carry different schemas.
        GenericRecord key = (GenericRecord) inner.deserialize(topic, record.key());
        GenericRecord value = (GenericRecord) inner.deserialize(topic, record.value());
        return Tuple2.of(key, value);
    }

    @Override
    public TypeInformation<Tuple2<GenericRecord, GenericRecord>> getProducedType() {
        return new TupleTypeInfo<>(
                TypeExtractor.getForClass(GenericRecord.class),
                TypeExtractor.getForClass(GenericRecord.class));
    }
}
...

main:

KafkaSource.<Tuple2<GenericRecord, GenericRecord>>builder()
    ...
    .setDeserializer(
        KafkaRecordDeserializationSchema.of(
            new KafkaGenericAvroDeserializationSchema(SCHEMA_REGISTRY)))

Running it fails with:

Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 37 more

Hmm, Kryo cannot serialize GenericRecord. Even after registering a custom serializer for Collections$UnmodifiableCollection, other errors pop up.
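For the record, the registration attempt looked roughly like this. UnmodifiableCollectionsSerializer comes from the third-party de.javakaffee:kryo-serializers package (an assumption about the setup), and it merely moves the failure to the next unserializable internal type:

import java.util.ArrayList;
import java.util.Collections;

import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kryo cannot construct java.util.Collections$UnmodifiableCollection on its own,
// so register a dedicated serializer for that private wrapper class.
Class<?> unmodifiableCollection =
        Collections.unmodifiableCollection(new ArrayList<>()).getClass();
env.getConfig().addDefaultKryoSerializer(
        unmodifiableCollection, UnmodifiableCollectionsSerializer.class);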

Digging further, I found this note:

Since Avro’s Schema class is not serializable, it can not be sent around as is. You can work around this by converting it to a String and parsing it back when needed. If you only do this once on initialization, there is practically no difference to sending it directly.
GenericRecord carries a Schema that drives its get()/set() calls, which is why the record as a whole cannot be serialized by Kryo.
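A minimal sketch of the String round-trip the note describes (the wrapper class and its names are mine, not a library type):

import java.io.Serializable;

import org.apache.avro.Schema;

// Avro's Schema is not Serializable; ship its String form and re-parse it lazily.
public class SerializableSchema implements Serializable {

    private final String schemaString;
    private transient Schema schema;

    public SerializableSchema(Schema schema) {
        this.schemaString = schema.toString();
    }

    public Schema get() {
        if (schema == null) {
            // Parsed once per task after the object arrives; negligible overhead.
            schema = new Schema.Parser().parse(schemaString);
        }
        return schema;
    }
}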

This also suggests why all the ConfluentRegistryAvroDeserializationSchema variants require a schema up front: with a known, fixed schema there is a workaround:

public static class AvroGenericSource
        implements SourceFunction<GenericRecord>, ResultTypeQueryable<GenericRecord> {

    private final GenericRecordAvroTypeInfo producedType;

    public AvroGenericSource(Schema schema) {
        this.producedType = new GenericRecordAvroTypeInfo(schema);
    }

    // Declaring GenericRecordAvroTypeInfo makes Flink serialize the records
    // with Avro instead of Kryo.
    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return producedType;
    }

    @Override
    public void run(SourceContext<GenericRecord> ctx) { /* emit records here */ }

    @Override
    public void cancel() {}
}

DataStream<GenericRecord> sourceStream =
        env.addSource(new AvroGenericSource(schema))
                .returns(new GenericRecordAvroTypeInfo(schema));

The point is that in this scenario the source returns GenericRecord but is not actually generic: there is exactly one schema in play, so Flink can use GenericRecordAvroTypeInfo and serialize the records with Avro rather than falling back to Kryo.
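That doesn't apply to my case, since every topic has its own schema. But if the key and value schemas were fixed, the custom KafkaDeserializationSchema above could declare Avro-aware type info the same way instead of relying on Kryo; a sketch, assuming keySchema and valueSchema are known:

@Override
public TypeInformation<Tuple2<GenericRecord, GenericRecord>> getProducedType() {
    // One fixed schema per tuple field; Flink then uses Avro serialization, not Kryo.
    return new TupleTypeInfo<>(
            new GenericRecordAvroTypeInfo(keySchema),
            new GenericRecordAvroTypeInfo(valueSchema));
}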

Since GenericRecord cannot be used directly here, one option is to call toString() on it and parse the result as JSON in a downstream operator; another is to pull out the needed data inside the deserializer and assemble something serializable.
In my scenario the field names needed from each topic are known in advance, so after getting the GenericRecord in the deserializer I simply get() the fields and assemble a POJO.
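A sketch of that final approach, assuming a topic whose relevant fields are userId and eventTime (the POJO and field names are illustrative):

// Follows Flink's POJO rules (public fields, default constructor), so no Kryo fallback.
public class UserEvent {
    public String userId;
    public long eventTime;
}

@Override
public UserEvent deserialize(ConsumerRecord<byte[], byte[]> record) {
    checkInitialized();
    GenericRecord value = (GenericRecord) inner.deserialize(record.topic(), record.value());
    UserEvent event = new UserEvent();
    // Avro hands back strings as org.apache.avro.util.Utf8, so convert explicitly.
    event.userId = value.get("userId").toString();
    event.eventTime = (Long) value.get("eventTime");
    return event;
}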