# CustomJSONKeyValueDeserializationSchema.java

```
// CustomJSONKeyValueDeserializationSchema.java
...
public class CustomJSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> {
    ...
    public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        ...
        if (record.value() != null) {
            node.set("value", (JsonNode)this.mapper.readValue((byte[])record.value(), JsonNode.class));
        }

        if (this.includeMetadata) {
            node.putObject("metadata")
                    .put("offset", record.offset())
                    .put("topic", record.topic())
                    .put("partition", record.partition())
                    .put("timestamp", record.timestamp());
        }

        return node;
    }
    ...
}
...
```
