# KafkaStreamingJob.java

```java
// KafkaStreamingJob.java
...
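        // Both branches emit Map<String, Object>, so declare the element type
        // instead of using a raw DataStream.
        DataStream<Map<String, Object>> row;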

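        // Constant-first comparison avoids a NullPointerException if FORMAT is unset;
        // any value other than "avro" falls through to the JSON branch.
        if ("avro".equals(FORMAT)) {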
            row = stream.map(
                    new MapFunction<Tuple2<String, String>, Map<String, Object>>() {
                        @Override
                        public Map<String, Object> map(Tuple2<String, String> record) throws Exception {
                            return DataStreamTransformer.avro(schema, record);
                        }
                    }).name("[AVRO] MapFunction<Tuple2<String, String>, Map<String, Object>>");
        } else {
            row = stream.map(
                    new MapFunction<ObjectNode, Map<String, Object>>() {
                        @Override
                        public Map<String, Object> map(ObjectNode record) throws Exception {
                            return DataStreamTransformer.json(schema, record);
                        }
                    }).name("[JSON] MapFunction<ObjectNode, Map<String, Object>>");
        }
...
```
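
The branch above picks a mapper and an operator name from the configured format. As a minimal, Flink-free sketch of that dispatch, the selection could be factored into small helpers. Everything in this block (`FormatDispatchSketch`, `fromAvro`, `fromJson`, `mapperFor`, `operatorName`) is hypothetical and only stands in for `DataStreamTransformer`; it is not the job's actual code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, Flink-free sketch: none of these names come from the original job.
class FormatDispatchSketch {

    // Stand-in for DataStreamTransformer.avro(...): tags the record with its source format.
    static Map<String, Object> fromAvro(String payload) {
        Map<String, Object> row = new HashMap<>();
        row.put("format", "avro");
        row.put("payload", payload);
        return row;
    }

    // Stand-in for DataStreamTransformer.json(...).
    static Map<String, Object> fromJson(String payload) {
        Map<String, Object> row = new HashMap<>();
        row.put("format", "json");
        row.put("payload", payload);
        return row;
    }

    // Mirrors the if/else above: "avro" selects the Avro mapper,
    // anything else (including null) falls back to JSON.
    static Function<String, Map<String, Object>> mapperFor(String format) {
        if ("avro".equals(format)) {
            return FormatDispatchSketch::fromAvro;
        }
        return FormatDispatchSketch::fromJson;
    }

    // Builds an operator label in the same "[FORMAT] ..." style as the .name(...) calls.
    static String operatorName(String format) {
        String tag = "avro".equals(format) ? "AVRO" : "JSON";
        return "[" + tag + "] MapFunction";
    }

    public static void main(String[] args) {
        Function<String, Map<String, Object>> mapper = mapperFor("avro");
        System.out.println(operatorName("avro") + " -> " + mapper.apply("raw-bytes"));
    }
}
```

In the real job the two branches take different input types (`Tuple2<String, String>` and `ObjectNode`), so a shared helper like this would only apply once both sources are normalized to a common record type.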
