# KafkaConsumer.java

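The `KafkaConsumer` class wraps a Flink `FlinkKafkaConsumer`, choosing the deserialization schema and the start position from a configuration map: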

```
// KafkaConsumer.java
...
/**
 * Builds and exposes a {@code FlinkKafkaConsumer} configured from a string map.
 *
 * <p>Configuration keys read by the visible code:
 * <ul>
 *   <li>{@code kafka_topic} - topic to consume from</li>
 *   <li>{@code kafka_schema_registry_url} - passed to the deserialization schema used for
 *       the {@code "avro"} format</li>
 *   <li>{@code kafka_fetch_mode} - one of {@code custom}, {@code earliest}, {@code latest},
 *       {@code timestamp}; a missing or unrecognized value starts from the latest offset</li>
 *   <li>{@code kafka_start_partition}, {@code kafka_start_offset} - required for
 *       {@code custom} mode</li>
 *   <li>{@code kafka_start_timestamp} - required for {@code timestamp} mode</li>
 * </ul>
 */
public class KafkaConsumer {
    ...
    /**
     * Creates the underlying consumer and configures its start position.
     *
     * @param config consumer settings; see the class description for the keys that are read
     * @throws NumberFormatException if a numeric setting required by the selected fetch mode
     *         is missing or is not a valid number
     */
    public KafkaConsumer(Map<String, String> config)
    {
        ...
        // NOTE(review): `format` and `properties` are set up in code elided from this
        // snippet; `format` is presumably a non-null String - confirm.
        switch(format) {
            case "avro":
                kafkaConsumer = new FlinkKafkaConsumer<Tuple2<String, String>>(
                        config.get("kafka_topic"),
                        new CustomKafkaDeserializationSchema(config.get("kafka_schema_registry_url")),
                        properties);
                break;
            case "json":
            default:
                // Any format other than "avro" is consumed with the JSON key/value schema.
                kafkaConsumer = new FlinkKafkaConsumer<ObjectNode>(
                        config.get("kafka_topic"),
                        new CustomJSONKeyValueDeserializationSchema(true),
                        properties);
                break;
        }

        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);

        // Switching on a null String throws NullPointerException, so a missing fetch mode
        // is mapped to the same behavior as the default branch.
        String fetchMode = config.get("kafka_fetch_mode");
        switch (fetchMode == null ? "latest" : fetchMode) {
            case "custom":
                Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
                specificStartOffsets.put(
                    new KafkaTopicPartition(
                        config.get("kafka_topic"),
                        Integer.parseInt(config.get("kafka_start_partition"))
                    ),
                    Long.parseLong(config.get("kafka_start_offset"))
                );
                kafkaConsumer.setStartFromSpecificOffsets(specificStartOffsets);
                break;
            case "earliest":
                kafkaConsumer.setStartFromEarliest();
                break;
            case "timestamp":
                kafkaConsumer.setStartFromTimestamp(
                        Long.parseLong(config.get("kafka_start_timestamp"))
                );
                break;
            case "latest":
            default:
                // "latest" previously fell through into the "timestamp" branch because of a
                // missing break; it now shares the default branch and stops there.
                kafkaConsumer.setStartFromLatest();
                break;
        }
    }
    
    /**
     * Returns the consumer built by the constructor.
     *
     * @return the configured consumer; its element type depends on the format chosen at
     *         construction time, which is why the raw type is returned
     */
    public FlinkKafkaConsumer getKafkaConsumer() {
        return kafkaConsumer;
    }
}
```
