CustomKafkaDeserializationSchema.java
// CustomKafkaDeserializationSchema.java
...
public class CustomKafkaDeserializationSchema
implements CustomAvroDeserializationSchema<Tuple2<String, String>> {
...
/**
 * Decodes one Kafka record into a pair of (record coordinates, decoded value).
 *
 * <p>The first tuple field is the string {@code topic|partition|offset|timestamp}. The second
 * is the {@code toString()} form of the object returned by the Avro deserializer for
 * {@code message}.
 *
 * @param messageKey raw Kafka record key; not used by this implementation
 * @param message raw Kafka record value, handed to the Avro deserializer
 * @param topic topic the record was read from
 * @param partition partition the record was read from
 * @param offset offset of the record within its partition
 * @param timestamp timestamp attached to the record
 * @return tuple of the coordinate string and the decoded value rendered as a string
 * @throws NullPointerException if the Avro deserializer produces no object for {@code message}
 */
@Override
public Tuple2<String, String> deserialize(
        byte[] messageKey,
        byte[] message,
        String topic,
        int partition,
        long offset,
        long timestamp
) {
    // The deserializer is created on first use rather than at construction time.
    checkInitialized();

    String key = String.format(
            "%s|%d|%d|%d",
            topic, partition, offset, timestamp);

    // The first argument of deserialize(...) is the topic name, so pass the real topic
    // instead of the synthetic coordinate key built above.
    Object decoded = kafkaAvroDeserializer.deserialize(topic, message);
    if (decoded == null) {
        // Same exception type the bare toString() call used to raise, but now it says
        // which record was responsible.
        throw new NullPointerException(
                "Avro deserializer returned null for record " + key);
    }

    return new Tuple2<>(key, decoded.toString());
}
/**
 * Creates {@code kafkaAvroDeserializer} on first call; later calls return immediately.
 *
 * <p>The deserializer is built from a {@link CachedSchemaRegistryClient} pointed at
 * {@code registryUrl} and a configuration that carries the same URL and turns the
 * specific-Avro-reader option off.
 */
private void checkInitialized() {
    if (kafkaAvroDeserializer != null) {
        // Already built by an earlier call; nothing to do.
        return;
    }

    SchemaRegistryClient registryClient = new CachedSchemaRegistryClient(
            registryUrl,
            AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);

    Map<String, Object> config = new HashMap<>();
    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
    config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);

    kafkaAvroDeserializer = new KafkaAvroDeserializer(registryClient, config);
}
}Last updated