# CustomKafkaDeserializationSchema.java

```
// CustomKafkaDeserializationSchema.java
...
public class CustomKafkaDeserializationSchema
        implements CustomAvroDeserializationSchema<Tuple2<String, String>> {
    ...
    @Override
    public Tuple2<String, String> deserialize(
            byte[] messageKey,
            byte[] message,
            String topic,
            int partition,
            long offset,
            long timestamp
    ) {
        checkInitialized();

        String key = String.format(
                "%s|%d|%d|%d",
                topic, partition, offset, timestamp);

        return new Tuple2<String, String>(
                key,
                kafkaAvroDeserializer.deserialize(key, message).toString()
        );
    }

    private void checkInitialized() {
        if (kafkaAvroDeserializer == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            kafkaAvroDeserializer = new KafkaAvroDeserializer(client, props);
        }
    }
}
```
