Avro deserialization problems with the Flink Kafka source

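I am consuming from a Kafka cluster whose topics are Avro-encoded. Every topic has a different schema, and the schemas are managed by a Schema Registry.
I need to deserialize both the key and the message value, so I looked at the existing: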

<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>

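and the ConfluentRegistryAvroDeserializationSchema it contains.
It turns out that both forSpecific (which returns a specific class) and forGeneric (which returns GenericRecord) require a reader schema, and the registry is only used to look up the writer schema. So although the result type is GenericRecord, every record is decoded against the same reader schema and has the same layout.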
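For context on why the registry can supply the writer schema per record: messages produced by the Confluent serializers start with a magic byte 0 followed by a 4-byte big-endian schema id, and the Avro payload comes after that. A minimal sketch of reading that header, using only the JDK (the class and method names here are hypothetical, for illustration only):

```java
import java.nio.ByteBuffer;

public class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0;

    /** Returns the schema id stored in bytes 1..4 (big-endian) of a Confluent-framed message. */
    public static int schemaId(byte[] message) {
        if (message == null || message.length < 5) {
            throw new IllegalArgumentException("message too short for Confluent framing");
        }
        ByteBuffer buf = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        // magic byte, schema id 42, then two arbitrary payload bytes
        byte[] framed = {0, 0, 0, 0, 42, 0x02, 0x06};
        System.out.println(schemaId(framed)); // prints 42
    }
}
```

The id only identifies the schema the producer wrote with. Which schema the consumer reads into is a separate choice, and that is the part forSpecific and forGeneric ask for up front.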

I did not quite understand why it is designed this way. Following the question "Is it possible to deserialize Avro message(consuming message from Kafka) without giving Reader schema in ConfluentRegistryAvroDeserializationSchema", I implemented this:

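import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaGenericAvroDeserializationSchema
    implements KafkaDeserializationSchema<Tuple2<GenericRecord, GenericRecord>> {

  private final String registryUrl;
  // KafkaAvroDeserializer is not serializable, so it is created lazily on the task side.
  private transient KafkaAvroDeserializer inner;

  public KafkaGenericAvroDeserializationSchema(String registryUrl) {
    this.registryUrl = registryUrl;
  }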

  private void checkInitialized() {
    if (inner == null) {
      Map<String, Object> props = new HashMap<>();
      props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
      props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
      SchemaRegistryClient client =
          new CachedSchemaRegistryClient(
              registryUrl, AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
      inner = new KafkaAvroDeserializer(client, props);
    }
  }

  @Override
  public boolean isEndOfStream(Tuple2<GenericRecord, GenericRecord> nextElement) {
    return false;
  }

  @Override
  public Tuple2<GenericRecord, GenericRecord> deserialize(ConsumerRecord<byte[], byte[]> record) {
    checkInitialized();
    final String topic = record.topic();
    GenericRecord key = (GenericRecord) inner.deserialize(topic, record.key());
    GenericRecord value = (GenericRecord) inner.deserialize(topic, record.value());
    return Tuple2.of(key, value);
  }

  @Override
  public TypeInformation<Tuple2<GenericRecord, GenericRecord>> getProducedType() {
    return new TupleTypeInfo<>(TypeExtractor.getForClass(GenericRecord.class),
                               TypeExtractor.getForClass(GenericRecord.class));
  }
}
...

main:

KafkaSource.<Tuple2<GenericRecord, GenericRecord>>builder()
                           ...
                           .setDeserializer(
                                   KafkaRecordDeserializationSchema.of(new KafkaGenericAvroDeserializationSchema(
                                           SCHAME_REGISTRY
                                   )))

Running it fails with:

Caused by: java.lang.UnsupportedOperationException
	at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	... 37 more

Hmm. Kryo cannot serialize GenericRecord, and even after registering a custom serializer for Collections$UnmodifiableCollection, other errors show up.
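The top frame of the trace can be reproduced without Flink or Kryo. As the trace shows, Kryo's CollectionSerializer.read rebuilds a collection by calling add on it, and an unmodifiable wrapper rejects that call. A minimal sketch with only the JDK (the class name is hypothetical):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

public class UnmodifiableAddDemo {
    /** Returns true if add() on the given collection is rejected. */
    public static boolean rejectsAdd(Collection<String> c) {
        try {
            c.add("x");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Collection<String> plain = new ArrayList<>();
        Collection<String> wrapped = Collections.unmodifiableCollection(new ArrayList<>());
        System.out.println(rejectsAdd(plain));   // prints false
        System.out.println(rejectsAdd(wrapped)); // prints true
    }
}
```

This only accounts for the first exception. It does not make GenericRecord Kryo-friendly in general, which matches the observation that fixing this one class just surfaces the next error.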

Further reading turned up this:

Since Avro’s Schema class is not serializable, it can not be sent around as is. You can work around this by converting it to a String and parsing it back when needed. If you only do this once on initialization, there is practically no difference to sending it directly.
A GenericRecord carries its Schema, which drives the get/set calls, and that class cannot be serialized by Kryo.

This also explains why ConfluentRegistryAvroDeserializationSchema always requires a schema: with a single known schema there is a workaround:

public static class AvroGenericSource implements SourceFunction<GenericRecord>, ResultTypeQueryable<GenericRecord> {
  private final GenericRecordAvroTypeInfo producedType;

  public AvroGenericSource(Schema schema) {
    this.producedType = new GenericRecordAvroTypeInfo(schema);
  }
  
  @Override
  public TypeInformation<GenericRecord> getProducedType() {
    return producedType;
  }
}

DataStream<GenericRecord> sourceStream =
    env.addSource(new AvroGenericSource(schema))
        .returns(new GenericRecordAvroTypeInfo(schema));

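In other words, in this scenario the return type is GenericRecord but it is not really generic: only one schema is in use, so Flink can serialize the records with its Avro serializer instead of falling back to Kryo.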

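Since GenericRecord cannot be passed along directly, the options are to call toString() and parse the result as JSON in a downstream operator, or to pull out the needed data inside the deserialization method and build something that can be serialized.
In my case the field names needed from each topic are known in advance, so after getting the GenericRecord in the deserializer I just get the fields out and assemble a POJO.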
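A minimal sketch of that last step, under some assumptions: OrderEvent, its fields, and the field names "order_id" and "amount" are all hypothetical. To keep the sketch free of Avro and Flink dependencies, field lookup is passed in as a Function<String, Object>. Inside the real deserialize method one would presumably pass value::get, since GenericRecord exposes get(String).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class OrderEventMapper {

    /** Hypothetical POJO: public class, public no-arg constructor, public fields. */
    public static class OrderEvent {
        public String topic;
        public String orderId;
        public long amount;

        public OrderEvent() {}
    }

    /** Copies the known fields out of a record-like lookup into a plain POJO. */
    public static OrderEvent toPojo(String topic, Function<String, Object> field) {
        OrderEvent e = new OrderEvent();
        e.topic = topic;
        // Avro's generic reader usually returns strings as Utf8 (a CharSequence),
        // so convert explicitly instead of casting to String.
        Object id = field.apply("order_id");
        e.orderId = id == null ? null : id.toString();
        Object amount = field.apply("amount");
        e.amount = amount == null ? 0L : ((Number) amount).longValue();
        return e;
    }

    public static void main(String[] args) {
        Map<String, Object> fake = new HashMap<>();
        fake.put("order_id", new StringBuilder("A-1")); // stands in for a non-String CharSequence
        fake.put("amount", 250);
        OrderEvent e = toPojo("orders", fake::get);
        System.out.println(e.orderId + " " + e.amount); // prints A-1 250
    }
}
```

Because the emitted type holds only plain Java fields, no Schema object has to cross operator boundaries.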