如何将CSV文件导入到没有任何列名或架构的BigQuery表中?

我目前正在编写一个Java实用程序,用于将几个CSV文件从GCS导入BigQuery。我可以通过 轻松实现此目的,但我想使用数据流作业来执行此操作。因此,我正在使用Dataflow的Pipeline和ParDo转换器(返回TableRow以将其应用于BigQueryIO),并且我已经为转换创建了StringToRowConverter()。这里实际问题开始了 - 我被迫为目标表指定架构,尽管我不想在不存在的情况下创建新表 - 只是尝试加载数据。因此,我不想手动设置 TableRow 的列名,因为我有大约 600 列。bq load

public class StringToRowConverter extends DoFn<String, TableRow> {

private static Logger logger = LoggerFactory.getLogger(StringToRowConverter.class);

public void processElement(ProcessContext c) {
    TableRow row = new TableRow();          
    row.set("DO NOT KNOW THE COLUMN NAME", c.element());
    c.output(row);
}
}

此外,假设该表已存在于BigQuery数据集中,我不需要创建它,并且CSV文件也以正确的顺序包含列。

如果此方案没有解决方法,并且数据加载需要列名,则可以将其放在 CSV 文件的第一行中。

任何帮助将不胜感激。


答案 1

若要避免创建表,应在管道配置期间使用 BigQueryIO.Write 的BigQueryIO.Write.CreateDisposition.CREATE_NEVER。资料来源:https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write

您不需要预先知道 BigQuery 表架构,您可以动态发现它。例如,您可以使用 BigQuery API (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get) 来查询表架构,并将其作为 StringToRowConverter 类的参数传递。另一个选项,假设第一行是标题,是跳过第一行,并使用它来正确映射文件的其余部分。

下面的代码实现了第二种方法,并将输出配置为追加到现有的 BigQuery 表中。

public class DFJob {

    public static class StringToRowConverter extends DoFn<String, TableRow> {

        private String[] columnNames;

        private boolean isFirstRow = true;

        public void processElement(ProcessContext c) {
            TableRow row = new TableRow();

            String[] parts = c.element().split(",");

            if (isFirstRow) {
                columnNames = Arrays.copyOf(parts, parts.length);
                isFirstRow = false;
            } else {
                for (int i = 0; i < parts.length; i++) {
                    row.set(columnNames[i], parts[i]);
                }
                c.output(row);
            }
        }
    }

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.create()
                .as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);

        Pipeline p = Pipeline.create(options);

        p.apply(TextIO.Read.from("gs://dataflow-samples/myfile.csv"))
                .apply(ParDo.of(new StringToRowConverter()))
                .apply(BigQueryIO.Write.to("myTable")
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        PipelineResult result = p.run();
    }
}

答案 2