
Flink Kafka consumer with Avro schema: handling null

Date: 2020-03-31 22:52:42

Tags: pac   ==   else   produce   extractor   exception   long   catch   lin

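import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

// Deserializes Avro-encoded Kafka record values into instances of avroType.
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {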

    private static final long serialVersionUID = 1L;

    private final Class<T> avroType;

    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }


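    @Override
    public T deserialize(byte[] message) {
        // A Kafka record value can be null (for example, a tombstone).
        // Returning null lets the Flink Kafka consumer skip the record
        // instead of failing the job.
        if (message == null) {
            return null;
        }
        ensureInitialized();
        try {
            // Reuse the previous decoder instance to avoid reallocating buffers.
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }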

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }


    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }


    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}
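
The Flink Kafka connector documentation linked below describes returning null from deserialize as the way to have the consumer skip a record rather than fail the job. The following is a minimal sketch of that contract using only the JDK, so it can be run without Flink or Avro on the classpath. NullSkippingDemo, deserializeOrNull and consume are hypothetical names invented for illustration; they are not Flink classes or methods, and the loop only approximates what the connector does.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the consumer loop; not part of Flink.
class NullSkippingDemo {

    // A null payload is mapped to null instead of being handed to the decoder.
    static <T> T deserializeOrNull(byte[] message, Function<byte[], T> decode) {
        if (message == null) {
            return null;
        }
        return decode.apply(message);
    }

    // Emits only non-null results, the way a consumer that skips null records would.
    static <T> List<T> consume(List<byte[]> payloads, Function<byte[], T> decode) {
        List<T> out = new ArrayList<>();
        for (byte[] payload : payloads) {
            T value = deserializeOrNull(payload, decode);
            if (value != null) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed decoder for the demo: plain UTF-8 strings instead of Avro.
        Function<byte[], String> utf8 = bytes -> new String(bytes, StandardCharsets.UTF_8);
        List<byte[]> payloads = Arrays.asList(
                "a".getBytes(StandardCharsets.UTF_8),
                null, // simulated tombstone record
                "b".getBytes(StandardCharsets.UTF_8));
        System.out.println(consume(payloads, utf8)); // prints [a, b]
    }
}
```

The null check sits before the decoder is touched, so a tombstone never reaches the decode step and never triggers an exception.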




https://developer.aliyun.com/ask/131116?spm=a2c6h.13159736


https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html



Original article: https://www.cnblogs.com/connie313/p/12609114.html
