PostgreSQL
 sql >> Database >  >> RDS >> PostgreSQL

Problemi di connessione con SQLAlchemy e processi multipli

Citando "Come si utilizzano motori/connessioni/sessioni con il multiprocessing Python o os.fork()?" con maggiore enfasi:

L'oggetto SQLAlchemy Engine fa riferimento a un pool di connessioni di connessioni al database esistenti. Pertanto, quando questo oggetto viene replicato in un processo figlio, l'obiettivo è garantire che nessuna connessione al database venga trasferita .

e

Tuttavia, nel caso in cui una sessione o una connessione attiva per la transazione venga condivisa, non esiste una soluzione automatica per questo; un'applicazione deve garantire che un nuovo processo figlio avvii solo nuovi oggetti Connection e transazioni, nonché oggetti Session ORM.

Il problema deriva dal processo figlio biforcuto che eredita la session globale live , che sta trattenendo una Connection . Quando target chiama init , sovrascrive i riferimenti globali al engine e session , diminuendo così i loro refcount a 0 nel bambino, costringendolo a finalizzare. Se, ad esempio, in un modo o nell'altro crei un altro riferimento alla sessione ereditata nel bambino, impedisci che venga ripulita, ma non farlo. Dopo main si è unito e torna a lavorare come al solito sta cercando di utilizzare la connessione ora potenzialmente finalizzata o comunque non sincronizzata. Non sono sicuro del motivo per cui questo provoca un errore solo dopo un certo numero di iterazioni.

L'unico modo per gestire questa situazione usando i globali come fai tu è

  1. Chiudi tutte le sessioni
  2. Chiama engine.dispose()

prima di biforcare. Ciò eviterà che le connessioni perdano al bambino. Ad esempio:

def main():
    global session
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        dummy_id = dummy.id
        # Return the Connection to the pool
        session.close()
        # Dispose of it!
        engine.dispose()
        # ...or call your cleanup() function, which does the same
        p = multiprocessing.Process(target=target, args=(dummy_id,))
        p.start()
        p.join()
        # Start a new session
        session = Session()
        dummy = session.query(Dummy).get(dummy_id)
        assert dummy.value == 2
    finally:
        cleanup()

Il tuo secondo esempio non attiva la finalizzazione nel bambino, quindi sembra funzionare, anche se potrebbe essere rotto come il primo, poiché sta ancora ereditando una copia della sessione e la sua connessione definita localmente in main .