# JSON DataStreamTransformer.java

```
// DataStreamTransformer.java
...
        public static Map<String, Object> json(
            Map<String, String> schema,
            ObjectNode record) throws Exception {
        Map<String, Object> row = new HashMap<>();

        Map<String, Object> value = new ObjectMapper()
            .readValue(
                record.get("value").toString(),
                HashMap.class
            );

        Map<String, Object> payload =(Map<String, Object>) value.get("payload");
        Map<String, Object> after =(Map<String, Object>) payload.get("after");
        
        for (Map.Entry<String, Object> entry : after.entrySet()) {
            String key = entry.getKey();
            Object val = entry.getValue();

            if (val != null) {
                if (schema.containsKey(key)) {
                    row.put(key, val.toString());
                }
            }
        }

        row.put("kafka_topic", record.get("metadata").get("topic").toString());
        row.put("kafka_partition", record.get("metadata").get("partition").toString());
        row.put("kafka_offset", record.get("metadata").get("offset").toString());
        row.put("kafka_timestamp", record.get("metadata").get("timestamp").toString());
        row.put("kafka_op", payload.get("op").toString());

        return row;
    }
...
```
