EDIT 27-01-2018:
Si scopre che questo problema è correlato a DirectRunner. Se esegui la stessa pipeline utilizzando DataflowRunner, dovresti ottenere batch che in realtà contengono fino a 1.000 record. DirectRunner crea sempre pacchetti di dimensione 1 dopo un'operazione di raggruppamento.
Risposta originale:
Ho riscontrato lo stesso problema durante la scrittura su database cloud utilizzando JdbcIO di Apache Beam. Il problema è che mentre JdbcIO supporta la scrittura fino a 1.000 record in un batch, in realtà non l'ho mai visto scrivere più di 1 riga alla volta (devo ammettere:questo utilizzava sempre DirectRunner in un ambiente di sviluppo).
Ho quindi aggiunto una funzionalità a JdbcIO in cui puoi controllare tu stesso la dimensione dei batch raggruppando i tuoi dati e scrivendo ogni gruppo come un batch. Di seguito è riportato un esempio di come utilizzare questa funzione basato sull'esempio originale di WordCount di Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
La differenza con il normale metodo di scrittura di JdbcIO è il nuovo metodo writeIterable()
che accetta un PCollection<Iterable<RowT>>
come input invece di PCollection<RowT>
. Ogni Iterable viene scritto come un batch nel database.
La versione di JdbcIO con questa aggiunta può essere trovata qui:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java
L'intero progetto di esempio contenente l'esempio sopra può essere trovato qui:https://github.com/ esempio di olavloite/spanner-beam
(C'è anche una richiesta pull in sospeso su Apache Beam per includerla nel progetto)