PostgreSQL
 sql >> Database >  >> RDS >> PostgreSQL

Django ORM perde connessioni quando si utilizza ThreadPoolExecutor

La mia ipotesi è che il ThreadPoolExecutor non è ciò che sta creando la connessione DB, ma i lavori con thread sono quelli che mantengono la connessione. Ho già avuto a che fare con questo.

Ho finito per creare questo wrapper, per assicurarmi che i thread vengano chiusi manualmente ogni volta che i lavori vengono eseguiti in un ThreadPoolExecutor. Questo dovrebbe essere utile per garantire che le connessioni non siano trapelate, finora non ho riscontrato perdite durante l'utilizzo di questo codice.

from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from django.db import connection

class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
    """
    When a function is passed into the ThreadPoolExecutor via either submit() or map(), 
    this will wrap the function, and make sure that close_django_db_connection() is called 
    inside the thread when it's finished so Django doesn't leak DB connections.

    Since map() calls submit(), only submit() needs to be overwritten.
    """
    def close_django_db_connection(self):
        connection.close()

    def generate_thread_closing_wrapper(self, fn):
        @wraps(fn)
        def new_func(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                self.close_django_db_connection()
        return new_func

    def submit(*args, **kwargs):
        """
        I took the args filtering/unpacking logic from 
   
        https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py 
        
        so I can properly get the function object the same way it was done there.
        """
        if len(args) >= 2:
            self, fn, *args = args
            fn = self.generate_thread_closing_wrapper(fn=fn)
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                        "needs an argument")
        elif 'fn' in kwargs:
            fn = self.generate_thread_closing_wrapper(fn=kwargs.pop('fn'))
            self, *args = args
    
        return super(self.__class__, self).submit(fn, *args, **kwargs)

Quindi puoi semplicemente usare questo:

    with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
        results = list(executor.map(func, args_list))

...e sii fiducioso che le connessioni si chiuderanno.