# AVRO DataStreamTransformer.java

```java
// DataStreamTransformer.java
...
    public static Map<String, Object> avro(
            Map<String, String> schema,
            Tuple2<String, String> record) throws Exception {
        Map<String, Object> row = new HashMap<>();

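        // record.f0 carries the Kafka metadata as "topic|partition|offset|timestamp";
        // see CustomKafkaDeserializationSchema deserialize().
        String[] chunk1 = record.f0.split("\\|");
        if (chunk1.length < 4) {
            throw new IllegalArgumentException("Malformed Kafka metadata key: " + record.f0);
        }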

        String kafkaTopic = chunk1[0];
        String kafkaPartition = chunk1[1];
        String kafkaOffset = chunk1[2];
        String kafkaTimestamp = chunk1[3];

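        // record.f1 carries the record payload as a JSON string.
        // Reading into the raw HashMap.class makes this an unchecked assignment.
        @SuppressWarnings("unchecked")
        Map<String, Object> chunk2 = new ObjectMapper().readValue(record.f1, HashMap.class);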

        for (Map.Entry<String, Object> entry : chunk2.entrySet()) {
            String key = entry.getKey();
            Object val = entry.getValue();

            // Keep only non-null fields that the schema declares, stored as strings.
            if (val != null && schema.containsKey(key)) {
                row.put(key, val.toString());
            }
        }

        row.put("kafka_topic", kafkaTopic);
        row.put("kafka_partition", kafkaPartition);
        row.put("kafka_offset", kafkaOffset);
        row.put("kafka_timestamp", kafkaTimestamp);

        return row;
    }
...
```
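
The filtering and metadata steps in `avro()` can be sketched without Flink or Jackson, which makes them easier to unit test. The following is a minimal sketch, not the project's implementation: the class `AvroRowSketch` and the methods `parseKafkaKey` and `toRow` are hypothetical names, the payload is assumed to be already parsed into a `Map`, and the metadata key is assumed to have exactly four `|`-separated fields in the order the method above reads them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of DataStreamTransformer.java.
final class AvroRowSketch {

    // Splits "topic|partition|offset|timestamp" and fails fast on a malformed key.
    static String[] parseKafkaKey(String key) {
        // A limit of -1 keeps trailing empty fields, so the length check is reliable.
        String[] parts = key.split("\\|", -1);
        if (parts.length != 4) {
            throw new IllegalArgumentException(
                    "Expected topic|partition|offset|timestamp, got: " + key);
        }
        return parts;
    }

    // Copies non-null payload fields that the schema declares (as strings),
    // then appends the four Kafka metadata columns.
    static Map<String, Object> toRow(
            Map<String, String> schema,
            String kafkaKey,
            Map<String, Object> payload) {
        String[] meta = parseKafkaKey(kafkaKey);
        Map<String, Object> row = new LinkedHashMap<>();

        for (Map.Entry<String, Object> entry : payload.entrySet()) {
            Object val = entry.getValue();
            if (val != null && schema.containsKey(entry.getKey())) {
                row.put(entry.getKey(), val.toString());
            }
        }

        row.put("kafka_topic", meta[0]);
        row.put("kafka_partition", meta[1]);
        row.put("kafka_offset", meta[2]);
        row.put("kafka_timestamp", meta[3]);
        return row;
    }
}
```

Calling `AvroRowSketch.toRow(schema, "orders|3|1024|1700000000000", payload)` with a schema that declares only `id` would keep `id` from the payload, drop every undeclared or null field, and add the four `kafka_*` columns. Like the original method, the sketch stores every payload value via `toString()`, so numeric and nested types are flattened to strings.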
