I'm integrating with a Kafka cluster whose topics are Avro-encoded. Every topic has a different schema, all managed through a Schema Registry. To deserialize the key and message, I first looked at what ships in:
```xml
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
```
namely the ConfluentRegistryAvroDeserializationSchema inside it. It turns out that both forSpecific, which returns a specific generated class, and forGeneric, which returns a GenericRecord, require a reader schema to be supplied; the registry is only consulted for the writer schema. In other words, even though forGeneric hands back GenericRecord, every record is deserialized against the same reader schema, so all records end up with an identical structure.
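For context, the stock API usage looks roughly like this; READER_SCHEMA_JSON and the registry URL are placeholders:

```java
// forGeneric(readerSchema, registryUrl) pins one reader schema for the whole
// stream; the registry is only used to fetch each record's writer schema.
Schema readerSchema = new Schema.Parser().parse(READER_SCHEMA_JSON);
ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer =
        ConfluentRegistryAvroDeserializationSchema.forGeneric(
                readerSchema, "http://schema-registry:8081");
```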
I didn't quite understand why it was designed this way, so, referring to the question "Is it possible to deserialize Avro message(consuming message from Kafka) without giving Reader schema in ConfluentRegistryAvroDeserializationSchema", I implemented my own:
```java
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaGenericAvroDeserializationSchema
        implements KafkaDeserializationSchema<Tuple2<GenericRecord, GenericRecord>> {

    private final String registryUrl;
    // KafkaAvroDeserializer is not serializable, so build it lazily per task.
    private transient KafkaAvroDeserializer inner;

    public KafkaGenericAvroDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client = new CachedSchemaRegistryClient(
                    registryUrl, AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }

    @Override
    public boolean isEndOfStream(Tuple2<GenericRecord, GenericRecord> nextElement) {
        return false;
    }

    @Override
    public Tuple2<GenericRecord, GenericRecord> deserialize(ConsumerRecord<byte[], byte[]> record) {
        checkInitialized();
        final String topic = record.topic();
        GenericRecord key = (GenericRecord) inner.deserialize(topic, record.key());
        GenericRecord value = (GenericRecord) inner.deserialize(topic, record.value());
        return Tuple2.of(key, value);
    }

    @Override
    public TypeInformation<Tuple2<GenericRecord, GenericRecord>> getProducedType() {
        return new TupleTypeInfo<>(
                TypeExtractor.getForClass(GenericRecord.class),
                TypeExtractor.getForClass(GenericRecord.class));
    }
}
// ...
```
And in main:

```java
KafkaSource.<Tuple2<GenericRecord, GenericRecord>>builder()
        ...
        .setDeserializer(
                KafkaRecordDeserializationSchema.of(
                        new KafkaGenericAvroDeserializationSchema(SCHEMA_REGISTRY)))
```
Running it fails with:

```
Caused by: java.lang.UnsupportedOperationException
	at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	... 37 more
```
Hmm, kryo can't serialize GenericRecord. Even after registering a custom serializer for Collections$UnmodifiableCollection, other errors pop up.
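For the record, that registration attempt looked roughly like this, assuming the UnmodifiableCollectionsSerializer from the de.javakaffee kryo-serializers library:

```java
// Register a kryo serializer for java.util.Collections$UnmodifiableCollection.
// Even with this in place, other non-serializable parts of Avro's Schema
// still fail further down.
env.getConfig().addDefaultKryoSerializer(
        Collections.unmodifiableCollection(new ArrayList<>()).getClass(),
        UnmodifiableCollectionsSerializer.class);
```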
Digging around some more, I found this in the Flink documentation: "Since Avro's Schema class is not serializable, it can not be sent around as is. You can work around this by converting it to a String and parsing it back when needed. If you only do this once on initialization, there is practically no difference to sending it directly." A GenericRecord carries its Schema to drive get/set calls, and that class cannot be serialized by kryo.
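A minimal sketch of that String workaround (the field and method names here are my own):

```java
// Ship the schema in its JSON string form (String is serializable) and
// re-parse it at most once per task on the other side.
private final String schemaString;   // set in the constructor
private transient Schema schema;     // rebuilt lazily on the task manager

private Schema readerSchema() {
    if (schema == null) {
        schema = new Schema.Parser().parse(schemaString);
    }
    return schema;
}
```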
This also explains why ConfluentRegistryAvroDeserializationSchema insists on a Schema up front: once the schema is fixed, there is a workaround:
```java
public static class AvroGenericSource
        implements SourceFunction<GenericRecord>, ResultTypeQueryable<GenericRecord> {

    private final GenericRecordAvroTypeInfo producedType;

    public AvroGenericSource(Schema schema) {
        this.producedType = new GenericRecordAvroTypeInfo(schema);
    }

    @Override
    public void run(SourceContext<GenericRecord> ctx) throws Exception {
        // emit records ...
    }

    @Override
    public void cancel() {
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return producedType;
    }
}

DataStream<GenericRecord> sourceStream =
        env.addSource(new AvroGenericSource(schema))
           .returns(new GenericRecordAvroTypeInfo(schema));
```
The point is that in this scenario, although the source returns GenericRecord, it isn't actually generic: there is exactly one schema in play, so Flink can serialize the records properly (through GenericRecordAvroTypeInfo instead of falling back to kryo).
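If my topics really did share a single schema, the same trick could be bolted onto the custom deserializer above; this is a hypothetical adaptation assuming key/value schema strings passed into the constructor:

```java
// With fixed key and value schemas, Flink serializes each GenericRecord with
// its Avro serializer instead of kryo. keySchemaString / valueSchemaString
// are hypothetical String fields (Schema itself is not serializable).
@Override
public TypeInformation<Tuple2<GenericRecord, GenericRecord>> getProducedType() {
    return new TupleTypeInfo<>(
            new GenericRecordAvroTypeInfo(new Schema.Parser().parse(keySchemaString)),
            new GenericRecordAvroTypeInfo(new Schema.Parser().parse(valueSchemaString)));
}
```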
Since GenericRecord can't be passed along directly, the easy options are to toString() it and parse the result as JSON in a downstream operator, or to pull out the data you need inside the deserializer and build something serializable from it. In my scenario the field names needed from each topic are known in advance, so after obtaining the GenericRecord during deserialization I simply get those fields and assemble a POJO.
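A minimal sketch of that last approach; the UserEvent POJO and its field names are invented for illustration:

```java
// Hypothetical POJO; plain public fields, so Flink handles it without kryo issues.
public class UserEvent {
    public String userId;
    public long eventTime;
}

@Override
public UserEvent deserialize(ConsumerRecord<byte[], byte[]> record) {
    checkInitialized();
    GenericRecord value =
            (GenericRecord) inner.deserialize(record.topic(), record.value());
    UserEvent event = new UserEvent();
    // The fields each topic needs are known up front; these two are examples.
    event.userId = String.valueOf(value.get("user_id"));  // avro Utf8 -> String
    event.eventTime = (Long) value.get("event_time");
    return event;
}
```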