Avro deserialization problems with the Flink Kafka source
I'm consuming from a Kafka cluster whose topics are all Avro-encoded; every topic has a different schema, and the schemas are managed by a Schema Registry. I need to deserialize both the key and the message, so I looked at what already ships in:
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
namely ConfluentRegistryAvroDeserializationSchema. It turns out that both forSpecific, which returns a specific generated class, and forGeneric, which returns a GenericRecord, require a reader schema to be supplied; the registry is only consulted for the writer schema. In other words, even though forGeneric returns GenericRecords, they are all deserialized against the same single schema, so every record has an identical shape.
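At the wire level, no reader schema is actually needed: every Confluent-framed message starts with a magic byte 0x0 followed by the 4-byte big-endian schema id, which is how KafkaAvroDeserializer can look up the writer schema per record. A minimal sketch of reading that header (class and method names here are mine, not Confluent's):

```java
import java.nio.ByteBuffer;

// Sketch of the Confluent wire format header: [magic byte 0x0][4-byte
// big-endian schema id][Avro-encoded body]. Given this framing, the
// deserializer can fetch the writer schema from the registry for each
// record, which is why the "one fixed reader schema" requirement is a
// design choice rather than a wire-format necessity.
public class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;

    // Returns the schema id embedded in a Confluent-framed payload.
    static int schemaId(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload); // big-endian by default
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt(); // the Avro body follows this id
    }

    public static void main(String[] args) {
        // Fake payload: magic byte, schema id 42, then one Avro body byte.
        byte[] payload = {0x0, 0, 0, 0, 42, 0x02};
        System.out.println(schemaId(payload)); // prints 42
    }
}
```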
I don't quite understand why it was designed this way. Following the Stack Overflow question "Is it possible to deserialize Avro message (consuming message from Kafka) without giving Reader schema in ConfluentRegistryAvroDeserializationSchema", I implemented my own:
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.HashMap;
import java.util.Map;

public class KafkaGenericAvroDeserializationSchema
        implements KafkaDeserializationSchema<Tuple2<GenericRecord, GenericRecord>> {

    private final String registryUrl;
    // KafkaAvroDeserializer is not serializable, so create it lazily on the task side.
    private transient KafkaAvroDeserializer inner;

    public KafkaGenericAvroDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl, AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }

    @Override
    public boolean isEndOfStream(Tuple2<GenericRecord, GenericRecord> nextElement) {
        return false;
    }

    @Override
    public Tuple2<GenericRecord, GenericRecord> deserialize(ConsumerRecord<byte[], byte[]> record) {
        checkInitialized();
        final String topic = record.topic();
        GenericRecord key = (GenericRecord) inner.deserialize(topic, record.key());
        GenericRecord value = (GenericRecord) inner.deserialize(topic, record.value());
        return Tuple2.of(key, value);
    }

    @Override
    public TypeInformation<Tuple2<GenericRecord, GenericRecord>> getProducedType() {
        return new TupleTypeInfo<>(
                TypeExtractor.getForClass(GenericRecord.class),
                TypeExtractor.getForClass(GenericRecord.class));
    }
}
...

main:

KafkaSource.<Tuple2<GenericRecord, GenericRecord>>builder()
        ...
        .setDeserializer(
                KafkaRecordDeserializationSchema.of(
                        new KafkaGenericAvroDeserializationSchema(SCHEMA_REGISTRY)))
Running it fails with:
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 37 more
Hmm, so Kryo cannot serialize GenericRecord; even after registering a custom serializer for Collections$UnmodifiableCollection, other errors follow. Digging further, I found this note:

Since Avro's Schema class is not serializable, it can not be sent around as is. You can work around this by converting it to a String and parsing it back when needed. If you only do this once on initialization, there is practically no difference to sending it directly.

A GenericRecord carries its Schema to drive its get/set calls, and that class cannot be serialized by Kryo.
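The workaround described in that quote can be sketched without Avro on the classpath: keep the String form as the serialized field and rebuild the parsed object lazily. FakeSchema below is a stand-in for org.apache.avro.Schema, and the rebuild line corresponds to new Schema.Parser().parse(schemaString) in real Avro; SchemaHolder and its members are names I made up:

```java
import java.io.*;

// Stand-in for org.apache.avro.Schema, which is not Serializable.
class FakeSchema {
    final String definition;
    FakeSchema(String definition) { this.definition = definition; }
}

// The "ship the schema as a String" pattern: only the String crosses the
// wire; the parsed form is transient and rebuilt once on first use.
public class SchemaHolder implements Serializable {
    private final String schemaString;   // serializable representation
    private transient FakeSchema parsed; // never serialized

    public SchemaHolder(String schemaString) {
        this.schemaString = schemaString;
    }

    public FakeSchema schema() {
        if (parsed == null) {
            // Real Avro: parsed = new Schema.Parser().parse(schemaString);
            parsed = new FakeSchema(schemaString);
        }
        return parsed;
    }

    public static void main(String[] args) throws Exception {
        SchemaHolder original = new SchemaHolder("{\"type\":\"string\"}");
        // Round-trip through Java serialization, as Flink would ship the object.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(original);
        SchemaHolder copy = (SchemaHolder) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(copy.schema().definition); // prints {"type":"string"}
    }
}
```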
This also explains why the ConfluentRegistryAvroDeserializationSchema variants all require a Schema up front: knowing the schema makes the following workaround possible:
public static class AvroGenericSource
        implements SourceFunction<GenericRecord>, ResultTypeQueryable<GenericRecord> {

    // GenericRecordAvroTypeInfo is itself serializable: internally it ships
    // the schema in String form, sidestepping the non-serializable Schema.
    private final GenericRecordAvroTypeInfo producedType;

    public AvroGenericSource(Schema schema) {
        this.producedType = new GenericRecordAvroTypeInfo(schema);
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return producedType;
    }

    @Override
    public void run(SourceContext<GenericRecord> ctx) { /* emit records here */ }

    @Override
    public void cancel() {}
}

DataStream<GenericRecord> sourceStream =
        env.addSource(new AvroGenericSource(schema))
                .returns(new GenericRecordAvroTypeInfo(schema));
In other words, in that scenario the source does return GenericRecord, but it is not actually generic: there is exactly one schema in play, so Flink can serialize it through its Avro type info instead of falling back to Kryo.

Since I can't use that directly, the options are to call toString() on the record and parse it as JSON in a downstream operator, or to extract the needed data inside the deserializer and assemble it into something serializable. In my case the required field names per topic are known in advance, so after obtaining the GenericRecord in deserialize() I just get() the fields and assemble a POJO.
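That last step can be sketched with a Map standing in for the GenericRecord's get() calls, so it runs without Avro on the classpath. UserEvent and its field names are invented for illustration; note that real Avro returns Utf8 for string fields, so the toString() call is needed there as well:

```java
import java.util.Map;

// Sketch of the final approach: the field names per topic are known ahead
// of time, so inside deserialize() we pull them out of the record and build
// a plain serializable POJO that Flink's type system handles without Kryo
// trouble. A Map stands in for GenericRecord here.
public class PojoAssembly {
    // Plain POJO with public fields: Flink serializes this natively.
    public static class UserEvent {
        public String userId;
        public long timestamp;
    }

    static UserEvent fromRecord(Map<String, Object> record) {
        UserEvent e = new UserEvent();
        // Real code: record.get("userId").toString() on the GenericRecord
        // (Avro hands back Utf8, so toString() is required either way).
        e.userId = record.get("userId").toString();
        e.timestamp = (Long) record.get("timestamp");
        return e;
    }

    public static void main(String[] args) {
        UserEvent e = fromRecord(Map.of("userId", "u-1", "timestamp", 123L));
        System.out.println(e.userId + " " + e.timestamp); // prints: u-1 123
    }
}
```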