BigQuerySink.java
// BigQuerySink.java
public class BigQuerySink extends RichSinkFunction<Map<String, Object>> {
...
/**
 * Sends a single row to the BigQuery table identified by {@code datasetId} and
 * {@code tableName} using one {@code insertAll} request.
 *
 * <p>If the response reports insert errors, every error entry is logged and the
 * method throws, so the failure reaches the caller with enough context to
 * diagnose it instead of surfacing as an exception without a message.
 *
 * @param row content of the row to insert, passed as-is to {@code addRow}
 * @param ctx sink context supplied by the caller; not used by this method
 * @throws Exception a {@link java.io.IOException} naming the target table and the
 *     reported insert errors when the response has errors; any exception raised
 *     by the client call itself propagates unchanged
 */
@Override
public void invoke(Map<String, Object> row, Context ctx) throws Exception {
    TableId tableId = TableId.of(datasetId, tableName);
    InsertAllRequest request = InsertAllRequest
        .newBuilder(tableId)
        .addRow(row)
        .build();
    InsertAllResponse response = BigQueryInstance
        .getInstance(this.serviceAccountFilename)
        .bigQuery
        .insertAll(request);

    if (!response.hasErrors()) {
        return;
    }

    // Log each failed entry separately so individual errors are easy to find.
    Map<Long, List<BigQueryError>> insertErrors = response.getInsertErrors();
    for (Map.Entry<Long, List<BigQueryError>> entry : insertErrors.entrySet()) {
        LOG.error(String.format("error entry: %s", entry.toString()));
    }

    // TODO: persist the rejected row to S3 or GCS; for now this is only a placeholder log.
    LOG.info("SHOULD LOG TO S3 or GCS");

    // Fail with a descriptive message: a bare Exception gives no hint about which
    // table rejected the row or why. IOException is still an Exception, so the
    // declared contract is unchanged; it is fully qualified to avoid a new import.
    throw new java.io.IOException(
        String.format(
            "BigQuery insertAll into %s.%s failed with errors: %s",
            datasetId, tableName, insertErrors));
}
}