Credo che il TypeError
deriva da multiprocessing
è get
.
Ho rimosso tutto il codice DB dal tuo script. Dai un'occhiata a questo:
import multiprocessing
import sqlalchemy.exc
def do(kwargs):
i = kwargs['i']
print i
raise sqlalchemy.exc.ProgrammingError("", {}, None)
return i
pool = multiprocessing.Pool(processes=5) # start 4 worker processes
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
# Use get or wait?
# r.get()
r.wait()
pool.close()
pool.join()
print results
Usando r.wait
restituisce il risultato atteso, ma utilizzando r.get
genera TypeError
. Come descritto in documenti di python
, usa r.wait
dopo un map_async
.
Modifica :Devo modificare la mia risposta precedente. Ora credo che il TypeError
deriva da SQLAlchemy. Ho modificato il mio script per riprodurre l'errore.
Modifica 2 :Sembra che il problema sia multiprocessing.pool
non funziona bene se un lavoratore solleva un'eccezione il cui costruttore richiede un parametro (vedi anche qui
).
Ho modificato il mio script per evidenziarlo.
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
class GoodExc(Exception):
def __init__(self, a=None):
'''Optional param in the constructor.'''
self.a = a
def do(kwargs):
i = kwargs['i']
print i
raise BadExc('a')
# raise GoodExc('a')
return i
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Nel tuo caso, dato che il tuo codice solleva un'eccezione SQLAlchemy, l'unica soluzione a cui riesco a pensare è catturare tutte le eccezioni in do
funzione e rilancia una normale Exception
invece. Qualcosa del genere:
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
def do(kwargs):
try:
i = kwargs['i']
print i
raise BadExc('a')
return i
except Exception as e:
raise Exception(repr(e))
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Modifica 3 :quindi, sembra essere un bug con Python , ma le eccezioni appropriate in SQLAlchemy risolverebbero il problema:quindi, ho sollevato il problema con SQLAlchemy , anche.
Per risolvere il problema, penso alla soluzione alla fine di Modifica 2 farebbe (avvolgere i callback in try-except e re-raise).