Mi interessa molto cosa sarebbe più veloce, quindi ho testato alcuni possibili modi per confrontarli:
- semplice
executemany
senza trucchi. - lo stesso con
APPEND_VALUES
suggerimento all'interno della dichiarazione. union all
approccio che hai provato in un'altra domanda. Questo dovrebbe essere più lento di quanto sopra poiché genera un molto grande istruzione (che potenzialmente può richiedere più rete dei dati stessi). Quindi dovrebbe essere analizzato sul lato DB che consumerà anche molto tempo e trascurerà tutti i vantaggi (senza parlare del limite di dimensione potenziale). Quindi hoexecutemany
l'ho redatto per testare con blocchi non per creare una singola istruzione per 100.000 record. Non ho usato la concatenazione di valori all'interno dell'istruzione, perché volevo tenerla al sicuro.insert all
. Gli stessi inconvenienti, ma nessun sindacato. Confrontalo con ilunion
versione.- serializzare i dati in JSON ed eseguire la deserializzazione lato DB con
json_table
. Potenzialmente buone prestazioni con una singola istruzione breve e un singolo trasferimento di dati con un sovraccarico ridotto di JSON. - Il tuo
FORALL
nella procedura del wrapper PL/SQL. Dovrebbe essere uguale aexecutemany
poiché fa lo stesso, ma sul lato del database. Spese di trasformazione dei dati nella raccolta. - Lo stesso
FORALL
, ma con un approccio colonnare per passare i dati:passa semplici elenchi di valori di colonna anziché di tipo complesso. Dovrebbe essere molto più veloce diFORALL
con raccolta poiché non è necessario serializzare i dati nel tipo di raccolta.
Ho utilizzato Oracle Autonomous Database in Oracle Cloud con account gratuito. Ciascun metodo è stato eseguito per 10 volte in loop con lo stesso set di dati di input di 100.000 record, la tabella è stata ricreata prima di ogni test. Questo è il risultato che ho. I tempi di preparazione ed esecuzione qui sono rispettivamente la trasformazione dei dati alla chiamata del DB lato client.
>>> t = PerfTest(100000)
>>> t.run("exec_many", 10)
Method: exec_many.
Duration, avg: 2.3083874 s
Preparation time, avg: 0.0 s
Execution time, avg: 2.3083874 s
>>> t.run("exec_many_append", 10)
Method: exec_many_append.
Duration, avg: 2.6031369 s
Preparation time, avg: 0.0 s
Execution time, avg: 2.6031369 s
>>> t.run("union_all", 10, 10000)
Method: union_all.
Duration, avg: 27.9444233 s
Preparation time, avg: 0.0408773 s
Execution time, avg: 27.8457551 s
>>> t.run("insert_all", 10, 10000)
Method: insert_all.
Duration, avg: 70.6442494 s
Preparation time, avg: 0.0289269 s
Execution time, avg: 70.5541995 s
>>> t.run("json_table", 10)
Method: json_table.
Duration, avg: 10.4648237 s
Preparation time, avg: 9.7907693 s
Execution time, avg: 0.621006 s
>>> t.run("forall", 10)
Method: forall.
Duration, avg: 5.5622837 s
Preparation time, avg: 1.8972456000000002 s
Execution time, avg: 3.6650380999999994 s
>>> t.run("forall_columnar", 10)
Method: forall_columnar.
Duration, avg: 2.6702698000000002 s
Preparation time, avg: 0.055710800000000005 s
Execution time, avg: 2.6105702 s
>>>
Il modo più veloce è solo executemany
, non così tanta sorpresa. Interessante qui è che APPEND_VALUES
non migliora la query e ottiene in media più tempo, quindi sono necessarie ulteriori indagini.
Informazioni su FORALL
:come previsto, l'array individuale per ogni colonna richiede meno tempo in quanto non è necessaria la preparazione dei dati. È più o meno paragonabile a executemany
, ma penso che l'overhead PL/SQL abbia un ruolo qui.
Un'altra parte interessante per me è JSON:la maggior parte del tempo è stata dedicata alla scrittura di LOB nel database e alla serializzazione, ma la query stessa è stata molto veloce. Forse l'operazione di scrittura può essere migliorata in qualche modo con chuncsize o in qualche altro modo per passare i dati LOB nell'istruzione select, ma dal mio codice è tutt'altro che un approccio molto semplice e diretto con executemany
.
Ci sono anche possibili approcci senza Python che dovrebbero essere più veloci come strumenti nativi per dati esterni, ma non li ho testati:
Di seguito è riportato il codice che ho utilizzato per il test.
import cx_Oracle as db
import os, random, json
import datetime as dt
class PerfTest:
def __init__(self, size):
self._con = db.connect(
os.environ["ora_cloud_usr"],
os.environ["ora_cloud_pwd"],
"test_low",
encoding="UTF-8"
)
self._cur = self._con.cursor()
self.inp = [(i, "Test {i}".format(i=i), random.random()) for i in range(size)]
def __del__(self):
if self._con:
self._con.rollback()
self._con.close()
#Create objets
def setup(self):
try:
self._cur.execute("drop table rand")
#print("table dropped")
except:
pass
self._cur.execute("""create table rand(
id int,
str varchar2(100),
val number
)""")
self._cur.execute("""create or replace package pkg_test as
type ts_test is record (
id rand.id%type,
str rand.str%type,
val rand.val%type
);
type tt_test is table of ts_test index by pls_integer;
type tt_ids is table of rand.id%type index by pls_integer;
type tt_strs is table of rand.str%type index by pls_integer;
type tt_vals is table of rand.val%type index by pls_integer;
procedure write_data(p_data in tt_test);
procedure write_data_columnar(
p_ids in tt_ids,
p_strs in tt_strs,
p_vals in tt_vals
);
end;""")
self._cur.execute("""create or replace package body pkg_test as
procedure write_data(p_data in tt_test)
as
begin
forall i in indices of p_data
insert into rand(id, str, val)
values (p_data(i).id, p_data(i).str, p_data(i).val)
;
commit;
end;
procedure write_data_columnar(
p_ids in tt_ids,
p_strs in tt_strs,
p_vals in tt_vals
) as
begin
forall i in indices of p_ids
insert into rand(id, str, val)
values (p_ids(i), p_strs(i), p_vals(i))
;
commit;
end;
end;
""")
def build_union(self, size):
return """insert into rand(id, str, val)
select id, str, val from rand where 1 = 0 union all
""" + """ union all """.join(
["select :{}, :{}, :{} from dual".format(i*3+1, i*3+2, i*3+3)
for i in range(size)]
)
def build_insert_all(self, size):
return """
""".join(
["into rand(id, str, val) values (:{}, :{}, :{})".format(i*3+1, i*3+2, i*3+3)
for i in range(size)]
)
#Test case with executemany
def exec_many(self):
start = dt.datetime.now()
self._cur.executemany("insert into rand(id, str, val) values (:1, :2, :3)", self.inp)
self._con.commit()
return (dt.timedelta(0), dt.datetime.now() - start)
#The same as above but with prepared statement (no parsing)
def exec_many_append(self):
start = dt.datetime.now()
self._cur.executemany("insert /*+APPEND_VALUES*/ into rand(id, str, val) values (:1, :2, :3)", self.inp)
self._con.commit()
return (dt.timedelta(0), dt.datetime.now() - start)
#Union All approach (chunked). Should have large parse time
def union_all(self, size):
##Chunked list of big tuples
start_prepare = dt.datetime.now()
new_inp = [
tuple([item for t in r for item in t])
for r in list(zip(*[iter(self.inp)]*size))
]
new_stmt = self.build_union(size)
dur_prepare = dt.datetime.now() - start_prepare
#Execute unions
start_exec = dt.datetime.now()
self._cur.executemany(new_stmt, new_inp)
dur_exec = dt.datetime.now() - start_exec
##In case the size is not a divisor
remainder = len(self.inp) % size
if remainder > 0 :
start_prepare = dt.datetime.now()
new_stmt = self.build_union(remainder)
new_inp = tuple([
item for t in self.inp[-remainder:] for item in t
])
dur_prepare += dt.datetime.now() - start_prepare
start_exec = dt.datetime.now()
self._cur.execute(new_stmt, new_inp)
dur_exec += dt.datetime.now() - start_exec
self._con.commit()
return (dur_prepare, dur_exec)
#The same as union all, but with no need to union something
def insert_all(self, size):
##Chunked list of big tuples
start_prepare = dt.datetime.now()
new_inp = [
tuple([item for t in r for item in t])
for r in list(zip(*[iter(self.inp)]*size))
]
new_stmt = """insert all
{}
select * from dual"""
dur_prepare = dt.datetime.now() - start_prepare
#Execute
start_exec = dt.datetime.now()
self._cur.executemany(
new_stmt.format(self.build_insert_all(size)),
new_inp
)
dur_exec = dt.datetime.now() - start_exec
##In case the size is not a divisor
remainder = len(self.inp) % size
if remainder > 0 :
start_prepare = dt.datetime.now()
new_inp = tuple([
item for t in self.inp[-remainder:] for item in t
])
dur_prepare += dt.datetime.now() - start_prepare
start_exec = dt.datetime.now()
self._cur.execute(
new_stmt.format(self.build_insert_all(remainder)),
new_inp
)
dur_exec += dt.datetime.now() - start_exec
self._con.commit()
return (dur_prepare, dur_exec)
#Serialize at server side and do deserialization at DB side
def json_table(self):
start_prepare = dt.datetime.now()
new_inp = json.dumps([
{ "id":t[0], "str":t[1], "val":t[2]} for t in self.inp
])
lob_var = self._con.createlob(db.DB_TYPE_CLOB)
lob_var.write(new_inp)
start_exec = dt.datetime.now()
self._cur.execute("""
insert into rand(id, str, val)
select id, str, val
from json_table(
to_clob(:json), '$[*]'
columns
id int,
str varchar2(100),
val number
)
""", json=lob_var)
dur_exec = dt.datetime.now() - start_exec
self._con.commit()
return (start_exec - start_prepare, dur_exec)
#PL/SQL with FORALL
def forall(self):
start_prepare = dt.datetime.now()
collection_type = self._con.gettype("PKG_TEST.TT_TEST")
record_type = self._con.gettype("PKG_TEST.TS_TEST")
def recBuilder(x):
rec = record_type.newobject()
rec.ID = x[0]
rec.STR = x[1]
rec.VAL = x[2]
return rec
inp_collection = collection_type.newobject([
recBuilder(i) for i in self.inp
])
start_exec = dt.datetime.now()
self._cur.callproc("pkg_test.write_data", [inp_collection])
dur_exec = dt.datetime.now() - start_exec
return (start_exec - start_prepare, dur_exec)
#PL/SQL with FORALL and plain collections
def forall_columnar(self):
start_prepare = dt.datetime.now()
ids, strs, vals = map(list, zip(*self.inp))
start_exec = dt.datetime.now()
self._cur.callproc("pkg_test.write_data_columnar", [ids, strs, vals])
dur_exec = dt.datetime.now() - start_exec
return (start_exec - start_prepare, dur_exec)
#Run test
def run(self, method, iterations, *args):
#Cleanup schema
self.setup()
start = dt.datetime.now()
runtime = []
for i in range(iterations):
single_run = getattr(self, method)(*args)
runtime.append(single_run)
dur = dt.datetime.now() - start
dur_prep_total = sum([i.total_seconds() for i, _ in runtime])
dur_exec_total = sum([i.total_seconds() for _, i in runtime])
print("""Method: {meth}.
Duration, avg: {run_dur} s
Preparation time, avg: {prep} s
Execution time, avg: {ex} s""".format(
inp_s=len(self.inp),
meth=method,
run_dur=dur.total_seconds() / iterations,
prep=dur_prep_total / iterations,
ex=dur_exec_total / iterations
))