HBase
 sql >> Database >  >> NoSQL >> HBase

Serializzazione affidabile dei messaggi in Apache Kafka utilizzando Apache Avro, parte 1

In Apache Kafka, le applicazioni Java chiamate producer scrivono messaggi strutturati in un cluster Kafka (composto da broker). Allo stesso modo, le applicazioni Java chiamate consumer leggono questi messaggi dallo stesso cluster. In alcune organizzazioni ci sono diversi gruppi incaricati di scrivere e gestire i produttori ei consumatori. In questi casi, uno dei principali punti deboli può essere il coordinamento del formato del messaggio concordato tra produttori e consumatori.

Questo esempio mostra come utilizzare Apache Avro per serializzare i record prodotti in Apache Kafka consentendo l'evoluzione degli schemi e l'aggiornamento non sincrono delle applicazioni producer e consumer.

Serializzazione e deserializzazione

Un record Kafka (precedentemente chiamato messaggio) è costituito da una chiave, un valore e intestazioni. Kafka non è a conoscenza della struttura dei dati nella chiave e nel valore dei record. Li gestisce come array di byte. Ma i sistemi che leggono i record da Kafka si preoccupano dei dati in quei record. Quindi è necessario produrre dati in un formato leggibile. Il formato dei dati che usi dovrebbe

  • Sii compatto
  • Sii veloce per codificare e decodificare
  • Consenti evoluzione
  • Consenti ai sistemi a monte (quelli che scrivono su un cluster Kafka) e ai sistemi a valle (quelli che leggono dallo stesso cluster Kafka) di eseguire l'aggiornamento a schemi più recenti in momenti diversi

JSON, ad esempio, è autoesplicativo ma non è un formato di dati compatto ed è lento da analizzare. Avro è un framework di serializzazione veloce che crea un output relativamente compatto. Ma per leggere i record Avro, è necessario lo schema con cui i dati sono stati serializzati.

Un'opzione è archiviare e trasferire lo schema con il record stesso. Questo va bene in un file in cui memorizzi lo schema una volta e lo usi per un numero elevato di record. La memorizzazione dello schema in ogni singolo record Kafka, tuttavia, aggiunge un sovraccarico significativo in termini di spazio di archiviazione e utilizzo della rete. Un'altra opzione consiste nell'avere un insieme concordato di mappature degli identificatori-schema e fare riferimento agli schemi in base ai loro identificatori nel record.

Da Object a Kafka Record e ritorno

Le applicazioni del produttore non devono convertire i dati direttamente in array di byte. KafkaProducer è una classe generica che richiede all'utente di specificare i tipi di chiavi e valori. Quindi, i produttori accettano istanze di ProducerRecord che hanno lo stesso tipo di parametri. La conversione dall'oggetto all'array di byte viene eseguita da un serializzatore. Kafka fornisce alcuni serializzatori primitivi:ad esempio, IntegerSerializer , ByteArraySerializer , StringSerializer . Sul lato consumer, Deserializer simili convertono array di byte in un oggetto che l'applicazione può gestire.

Quindi ha senso collegarsi a livello di serializzatore e deserializzatore e consentire agli sviluppatori di applicazioni di produttori e consumatori di utilizzare la comoda interfaccia fornita da Kafka. Sebbene le ultime versioni di Kafka consentano ExtendedSerializers e ExtendedDeserializers per accedere alle intestazioni, abbiamo deciso di includere l'identificatore dello schema nella chiave e nel valore dei record Kafka invece di aggiungere le intestazioni dei record.

Avro Essentials

Avro è un framework di serializzazione dei dati (e chiamata di procedura remota). Utilizza un documento JSON chiamato schema per descrivere le strutture di dati. La maggior parte dell'utilizzo di Avro avviene tramite GenericRecord o sottoclassi di SpecificRecord. Le classi Java generate dagli schemi Avro sono sottoclassi di quest'ultimo, mentre le prime possono essere utilizzate senza una previa conoscenza della struttura dati con cui si lavora.

Quando due schemi soddisfano un insieme di regole di compatibilità, i dati scritti con uno schema (chiamato schema del writer) possono essere letti come se fossero stati scritti con l'altro (chiamato schema del lettore). Gli schemi hanno una forma canonica che ha tutti i dettagli irrilevanti per la serializzazione, come i commenti, eliminati per facilitare il controllo dell'equivalenza.

VersionedSchema e SchemaProvider

Come accennato in precedenza, abbiamo bisogno di una mappatura uno-a-uno tra gli schemi e i loro identificatori. A volte è più facile fare riferimento agli schemi per nome. Quando viene creato uno schema compatibile, può essere considerato una versione successiva dello schema. Quindi possiamo fare riferimento a schemi con un nome, una coppia di versioni. Chiamiamo insieme lo schema, il suo identificatore, il nome e la versione VersionedSchema . Questo oggetto potrebbe contenere metadati aggiuntivi richiesti dall'applicazione.

public class VersionedSchema {
  private final int id;
  private final String name;
  private final int version;
  private final Schema schema;

  public VersionedSchema(int id, String name, int version, Schema schema) {
    this.id = id;
    this.name = name;
    this.version = version;
    this.schema = schema;
  }

  public String getName() {
    return name;
  }

  public int getVersion() {
    return version;
  }

  public Schema getSchema() {
    return schema;
  }
    
  public int getId() {
    return id;
  }
}

SchemaProvider gli oggetti possono cercare le istanze di VersionedSchema .

public interface SchemaProvider extends AutoCloseable {
  public VersionedSchema get(int id);
  public VersionedSchema get(String schemaName, int schemaVersion);
  public VersionedSchema getMetadata(Schema schema);
}

L'implementazione di questa interfaccia è illustrata in "Implementing a Schema Store" in un futuro post sul blog.

Serializzare dati generici

Quando si serializza un record, dobbiamo prima capire quale schema usare. Ogni record ha un getSchema metodo. Ma scoprire l'identificatore dallo schema potrebbe richiedere molto tempo. In genere è più efficiente impostare lo schema al momento dell'inizializzazione. Questo può essere fatto direttamente per identificatore o per nome e versione. Inoltre, quando si producono su più argomenti, potremmo voler impostare schemi diversi per argomenti diversi e scoprire lo schema dal nome dell'argomento fornito come parametro al metodo serialize(T, String) . Questa logica è omessa nei nostri esempi per motivi di brevità e semplicità.

private VersionedSchema getSchema(T data, String topic) {
  return schemaProvider.getMetadata( data.getSchema());
}

Con lo schema in mano, dobbiamo memorizzarlo nel nostro messaggio. La serializzazione dell'ID come parte del messaggio ci offre una soluzione compatta, poiché tutta la magia avviene nel serializzatore/deserializzatore. Consente inoltre un'integrazione molto semplice con altri framework e librerie che già supportano Kafka e consente all'utente di utilizzare il proprio serializzatore (come Spark).

Usando questo approccio, scriviamo prima l'identificatore dello schema sui primi quattro byte.

private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {
    try (DataOutputStream os = new DataOutputStream(stream)) {
    os.writeInt(id);
  }
}

Quindi possiamo creare un DatumWriter e serializzare l'oggetto.

private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
  DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema);
  datumWriter.write(data, encoder);
  encoder.flush();
}

Mettendo tutto insieme, abbiamo implementato un serializzatore di dati generico.

public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> {

  private SchemaProvider schemaProvider;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    schemaProvider = SchemaUtils.getSchemaProvider(configs);
  }

  @Override
  public byte[] serialize(String topic, T data) {
    try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
      VersionedSchema schema = getSchema(data, topic);
   
      writeSchemaId(stream, schema.getId());
      writeSerializedAvro(stream, data, schema.getSchema());
      return stream.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Could not serialize data", e);
    }
  }

  private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...}

  private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...}

  private VersionedSchema getSchema(T data, String topic) {...}

  @Override
  public void close() {
    try {
      schemaProvider.close();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

Deserializzare dati generici

La deserializzazione può funzionare con un singolo schema (con cui sono stati scritti i dati dello schema), ma è possibile specificare uno schema del lettore diverso. Lo schema del lettore deve essere compatibile con lo schema con cui i dati sono stati serializzati, ma non deve essere equivalente. Per questo motivo, abbiamo introdotto i nomi degli schemi. Ora possiamo specificare che vogliamo leggere i dati con una versione specifica di uno schema. Al momento dell'inizializzazione leggiamo le versioni dello schema desiderate per nome dello schema e memorizziamo i metadati in readerSchemasByName per un rapido accesso. Ora possiamo leggere ogni record scritto con una versione compatibile dello schema come se fosse stato scritto con la versione specificata.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  this.schemaProvider = SchemaUtils.getSchemaProvider(configs);
  this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider);
}

Quando un record deve essere deserializzato, leggiamo prima l'identificatore dello schema del writer. Ciò consente di cercare lo schema del lettore per nome. Con entrambi gli schemi disponibili possiamo creare un GeneralDatumReader e leggi il record.

@Override
public GenericData.Record deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {

    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);

    VersionedSchema readerSchema =
        readerSchemasByName.get(writerSchema.getName());
    GenericData.Record avroRecord = readAvroRecord(stream,
        writerSchema.getSchema(), readerSchema.getSchema());
    return avroRecord;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private int readSchemaId(InputStream stream ) throws IOException {
  try(DataInputStream is = new DataInputStream(stream)) {
    return is.readInt();
  }
}

private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema,
      readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  GenericData.Record record = new GenericData.Record(readerSchema);
  datumReader.read(record, decoder);
  return record;
}

Gestire record specifici

Il più delle volte c'è una classe che vogliamo usare per i nostri record. Questa classe viene quindi generalmente generata da uno schema Avro. Apache Avro fornisce strumenti per generare codice Java da schemi. Uno di questi strumenti è il plug-in Avro Maven. Le classi generate hanno lo schema da cui sono state generate disponibile in fase di esecuzione. Ciò rende la serializzazione e la deserializzazione più semplici ed efficaci. Per la serializzazione possiamo usare la classe per scoprire l'identificatore dello schema da usare.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString();
  try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) {
    Class<?> recordClass = Class.forName(className);
    Schema writerSchema = new
        SpecificData(recordClass.getClassLoader()).getSchema(recordClass);
    this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

Quindi non abbiamo bisogno della logica per determinare lo schema da argomento e dati. Usiamo lo schema disponibile nella classe record per scrivere i record.

Allo stesso modo, per la deserializzazione, lo schema del lettore può essere trovato dalla classe stessa. La logica di deserializzazione diventa più semplice, perché lo schema del lettore è fisso al momento della configurazione e non è necessario ricercarlo in base al nome dello schema.

@Override
public T deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);
    return readAvroRecord(stream, writerSchema.getSchema(), readerSchema);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  return datumReader.read(null, decoder);
}

Lettura aggiuntiva

Per ulteriori informazioni sulla compatibilità degli schemi, consulta la specifica Avro per la risoluzione degli schemi.

Per ulteriori informazioni sui moduli canonici, consulta la specifica Avro per l'analisi di moduli canonici per schemi.

La prossima volta...

La parte 2 mostrerà un'implementazione di un sistema per archiviare le definizioni dello schema Avro.