PostgreSQL
 sql >> Database >  >> RDS >> PostgreSQL

Hang in Python script usando SQLAlchemy e multiprocessing

Credo che il TypeError deriva da multiprocessing è get .

Ho rimosso tutto il codice DB dal tuo script. Dai un'occhiata a questo:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Usando r.wait restituisce il risultato atteso, ma utilizzando r.get genera TypeError . Come descritto in documenti di python , usa r.wait dopo un map_async .

Modifica :Devo modificare la mia risposta precedente. Ora credo che il TypeError deriva da SQLAlchemy. Ho modificato il mio script per riprodurre l'errore.

Modifica 2 :Sembra che il problema sia multiprocessing.pool non funziona bene se un lavoratore solleva un'eccezione il cui costruttore richiede un parametro (vedi anche qui ).

Ho modificato il mio script per evidenziarlo.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Nel tuo caso, dato che il tuo codice solleva un'eccezione SQLAlchemy, l'unica soluzione a cui riesco a pensare è catturare tutte le eccezioni in do funzione e rilancia una normale Exception invece. Qualcosa del genere:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Modifica 3 :quindi, sembra essere un bug con Python , ma le eccezioni appropriate in SQLAlchemy risolverebbero il problema:quindi, ho sollevato il problema con SQLAlchemy , anche.

Per risolvere il problema, penso alla soluzione alla fine di Modifica 2 farebbe (avvolgere i callback in try-except e re-raise).