# BigQuerySink.java

```java
// BigQuerySink.java
public class BigQuerySink extends RichSinkFunction<Map<String, Object>> {
    ...
    @Override
    public void invoke(Map<String, Object> row, Context ctx) throws Exception {
        TableId tableId = TableId.of(datasetId, tableName);

        InsertAllResponse response = BigQueryInstance
                .getInstance(this.serviceAccountFilename)
                .bigQuery.insertAll(
                InsertAllRequest
                        .newBuilder(tableId)
                        .addRow(row)
                        .build()
        );

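        if (response.hasErrors()) {
            // getInsertErrors() maps each failed row's index in the request to its errors
            for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
                LOG.error(String.format("insert failed for row index %d: %s", entry.getKey(), entry.getValue()));
            }

            // TODO: persist the rejected row to S3 or GCS before failing
            LOG.info("SHOULD LOG TO S3 or GCS");

            // Include the target table in the message so the failure can be traced
            throw new Exception(String.format("BigQuery insertAll reported errors for %s.%s", datasetId, tableName));
        }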
    }
}
```
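
The error branch above logs each failed row separately and then throws. A minimal sketch of one way to fold the per-row errors into a single message, for example to attach to the thrown exception, is shown below. `InsertErrorSummary` and `summarize` are hypothetical names and are not part of `BigQuerySink` or the BigQuery client library. The error values are plain strings here so the example runs without the client on the classpath; in the sink they would be derived from the `BigQueryError` entries.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only.
public class InsertErrorSummary {

    /**
     * Joins per-row errors into one string, ordered by row index.
     * Keys are row indexes within the insert request; values are error descriptions.
     */
    public static String summarize(Map<Long, List<String>> errorsByRow) {
        StringBuilder sb = new StringBuilder();
        // Copy into a TreeMap so the output order does not depend on the input map type
        for (Map.Entry<Long, List<String>> e : new TreeMap<>(errorsByRow).entrySet()) {
            if (sb.length() > 0) {
                sb.append("; ");
            }
            sb.append("row ").append(e.getKey()).append(": ")
              .append(String.join(", ", e.getValue()));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<Long, List<String>> errors = new TreeMap<>();
        errors.put(1L, Arrays.asList("invalid"));
        errors.put(0L, Arrays.asList("stopped", "timeout"));
        // prints "row 0: stopped, timeout; row 1: invalid"
        System.out.println(summarize(errors));
    }
}
```

A message built this way could replace the empty exception in `invoke`, so that whoever handles the failure sees which rows were rejected without searching the logs.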
