// KafkaConsumer.java
...
public class KafkaConsumer {
...
public KafkaConsumer(Map<String, String> config)
{
...
// Choose the Kafka deserialization schema from the configured message format.
// Unrecognized formats deliberately fall back to the JSON schema.
switch (format) {
    case "avro":
        // Avro records are decoded against the Confluent schema registry,
        // producing (key, value) string pairs.
        kafkaConsumer = new FlinkKafkaConsumer<Tuple2<String, String>>(
                config.get("kafka_topic"),
                new CustomKafkaDeserializationSchema(config.get("kafka_schema_registry_url")),
                properties);
        break;
    case "json":
    default:
        // "json" and the default branch were identical; grouped here so the
        // fallback cannot drift out of sync with the explicit JSON case.
        // `true` requests inclusion of record metadata in the ObjectNode.
        kafkaConsumer = new FlinkKafkaConsumer<ObjectNode>(
                config.get("kafka_topic"),
                new CustomJSONKeyValueDeserializationSchema(true),
                properties);
        break;
}
// Tie Kafka offset commits to Flink checkpoints so committed offsets
// reflect state that has actually been snapshotted.
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
// Decide where consumption starts, per the "kafka_fetch_mode" setting.
switch (config.get("kafka_fetch_mode")) {
    case "custom":
        // Start from an explicit (partition, offset) pair supplied in config.
        Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
        specificStartOffsets.put(
                new KafkaTopicPartition(
                        config.get("kafka_topic"),
                        Integer.parseInt(config.get("kafka_start_partition"))
                ),
                Long.parseLong(config.get("kafka_start_offset"))
        );
        kafkaConsumer.setStartFromSpecificOffsets(specificStartOffsets);
        break;
    case "earliest":
        kafkaConsumer.setStartFromEarliest();
        break;
    case "latest":
        kafkaConsumer.setStartFromLatest();
        // BUG FIX: this break was missing, so "latest" fell through into the
        // "timestamp" case and parsed "kafka_start_timestamp" — throwing if the
        // key was absent and silently overriding the start position otherwise.
        break;
    case "timestamp":
        // Start from the first record whose timestamp is >= the configured epoch millis.
        kafkaConsumer.setStartFromTimestamp(
                Long.parseLong(config.get("kafka_start_timestamp"))
        );
        break;
    default:
        // Unknown/absent fetch mode: default to the latest offsets.
        kafkaConsumer.setStartFromLatest();
        break;
}
}
/**
 * Returns the configured Flink Kafka consumer built in the constructor.
 *
 * NOTE(review): the return type is the raw {@code FlinkKafkaConsumer}; the
 * element type varies by format (ObjectNode for JSON, Tuple2 for Avro), which
 * presumably is why no type parameter is declared — confirm callers cast safely.
 *
 * @return the underlying FlinkKafkaConsumer instance
 */
public FlinkKafkaConsumer getKafkaConsumer() {
return kafkaConsumer;
}
}