PostgreSQL
 sql >> Database >  >> RDS >> PostgreSQL

@Tailable(spring-data-reactive-mongodb) equivalente in spring-data-r2dbc

Ero sullo stesso problema, non ero sicuro di aver trovato una soluzione o meno, ma sono stato in grado di ottenere qualcosa di simile procedendo come segue. Per prima cosa, ho aggiunto il trigger al mio tavolo

CREATE TRIGGER trigger_name
    AFTER INSERT OR DELETE OR UPDATE 
    ON table_name
    FOR EACH ROW
    EXECUTE PROCEDURE trigger_function_name;

Ciò imposterà un trigger sulla tabella ogni volta che una riga viene aggiornata, eliminata o inserita. Quindi chiamerà la funzione trigger che ho impostato che assomigliava a questa:

CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS 
$BODY$
DECLARE
    payload JSON;
BEGIN
    payload = row_to_json(NEW);
    PERFORM pg_notify('notification_name', payload::text);
    RETURN NULL;
END;
$BODY$;

Questo mi consentirà di "ascoltare" uno qualsiasi di questi aggiornamenti dal mio progetto di avvio primaverile e invierà l'intera riga come carico utile. Successivamente, nel mio progetto di avvio primaverile ho configurato una connessione al mio db.

@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
                .host("host")
                .database("db")
                .port(port)
                .username("username")
                .password("password")
                .schema("schema")
                .connectTimeout(Duration.ofMinutes(2))
                .build());
    }
}

Con ciò lo autowire (iniezione di dipendenza) nel costruttore nella mia classe di servizio e lo lancio in una classe r2dbc PostgressqlConnection in questo modo:

this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();

Ora vogliamo "ascoltare" il nostro tavolo e ricevere una notifica quando eseguiamo alcuni aggiornamenti al nostro tavolo. Per fare ciò, impostiamo un metodo di inizializzazione che viene eseguito dopo l'iniezione delle dipendenze utilizzando l'annotazione @PostContruct

@PostConstruct
private void postConstruct() {
    postgresqlConnection.createStatement("LISTEN notification_name").execute()
            .flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}

Si noti che ascoltiamo qualunque nome mettiamo all'interno del metodo pg_notify. Inoltre vogliamo impostare un metodo per chiudere la connessione quando il bean sta per essere buttato via, in questo modo:

@PreDestroy
private void preDestroy() {
    postgresqlConnection.close().subscribe();
}

Ora creo semplicemente un metodo che restituisce un Flux di tutto ciò che è attualmente nella mia tabella e lo unisco anche alle mie notifiche, come ho detto prima che le notifiche arrivassero come json, quindi ho dovuto deserializzarlo e ho deciso di usarlo ObjectMapper. Quindi, assomiglierà a questo:

private Flux<YourClass> getUpdatedRows() {
    return postgresqlConnection.getNotifications().map(notification -> {
        try {
            //deserialize json
            return objectMapper.readValue(notification.getParameter(), YourClass.class);
        } catch (IOException e) {
            //handle exception
        }
    });
}

public Flux<YourClass> getDocuments() {
    return documentRepository.findAll().share().concatWith(getUpdatedRows());
}

Spero che questo aiuti.Ciao!