PostgreSQL
 sql >> Database >  >> RDS >> PostgreSQL

Coda lavori come tabella SQL con più consumatori (PostgreSQL)

Uso postgres anche per una coda FIFO. Inizialmente ho usato ACCESS EXCLUSIVE, che produce risultati corretti in alta concorrenza, ma ha lo sfortunato effetto di escludersi a vicenda con pg_dump, che acquisisce un blocco ACCESS SHARE durante la sua esecuzione. Questo fa sì che la mia funzione next() si blocchi per un tempo molto lungo (la durata di pg_dump). Questo non era accettabile dal momento che siamo un negozio 24 ore su 24, 7 giorni su 7 e ai clienti non piaceva il tempo morto in coda nel cuore della notte.

Ho pensato che ci dovesse essere un blocco meno restrittivo che sarebbe comunque sicuro per la concorrenza e non si blocca mentre pg_dump è in esecuzione. La mia ricerca mi ha portato a questo post SO.

Poi ho fatto delle ricerche.

Le seguenti modalità sono sufficienti per una funzione FIFO queue NEXT() che aggiornerà lo stato di un lavoro da accodato per correre senza alcun errore di concorrenza e inoltre non si blocca contro pg_dump:

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

Domanda:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

Il risultato è simile a:

UPDATE 1
 job_id
--------
     98
(1 row)

Ecco uno script di shell che verifica tutte le diverse modalità di blocco con una concorrenza elevata (30).

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

Il codice è anche qui se vuoi modificare:https://gist.github.com/1083936

Sto aggiornando la mia applicazione per utilizzare la modalità ESCLUSIVA poiché è la modalità più restrittiva che a) è corretta e b) non è in conflitto con pg_dump. Ho scelto il più restrittivo poiché sembra il meno rischioso in termini di modifica dell'app da ACCESS EXCLUSIVE senza essere un super esperto nel blocco postgres.

Mi sento abbastanza a mio agio con il mio banco di prova e con le idee generali alla base della risposta. Spero che condividerlo aiuti a risolvere questo problema per gli altri.