Mysql
 sql >> Database >  >> RDS >> Mysql

Inserimento collettivo JdbcIO di Google Dataflow (Apache beam) nel database mysql

EDIT 27-01-2018:

Si scopre che questo problema è correlato a DirectRunner. Se esegui la stessa pipeline utilizzando DataflowRunner, dovresti ottenere batch che in realtà contengono fino a 1.000 record. DirectRunner crea sempre pacchetti di dimensione 1 dopo un'operazione di raggruppamento.

Risposta originale:

Ho riscontrato lo stesso problema durante la scrittura su database cloud utilizzando JdbcIO di Apache Beam. Il problema è che mentre JdbcIO supporta la scrittura fino a 1.000 record in un batch, in realtà non l'ho mai visto scrivere più di 1 riga alla volta (devo ammettere:questo utilizzava sempre DirectRunner in un ambiente di sviluppo).

Ho quindi aggiunto una funzionalità a JdbcIO in cui puoi controllare tu stesso la dimensione dei batch raggruppando i tuoi dati e scrivendo ogni gruppo come un batch. Di seguito è riportato un esempio di come utilizzare questa funzione basato sull'esempio originale di WordCount di Apache Beam.

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String> create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write the words to the database
    .apply(JdbcIO.<String> writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

La differenza con il normale metodo di scrittura di JdbcIO è il nuovo metodo writeIterable() che accetta un PCollection<Iterable<RowT>> come input invece di PCollection<RowT> . Ogni Iterable viene scritto come un batch nel database.

La versione di JdbcIO con questa aggiunta può essere trovata qui:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java

L'intero progetto di esempio contenente l'esempio sopra può essere trovato qui:https://github.com/ esempio di olavloite/spanner-beam

(C'è anche una richiesta pull in sospeso su Apache Beam per includerla nel progetto)