Oracle
 sql >> Database >  >> RDS >> Oracle

C'è un modo per utilizzare FORALL per inserire dati da un array?

Mi interessa molto cosa sarebbe più veloce, quindi ho testato alcuni possibili modi per confrontarli:

  • semplice executemany senza trucchi.
  • lo stesso con APPEND_VALUES suggerimento all'interno della dichiarazione.
  • union all approccio che hai provato in un'altra domanda. Questo dovrebbe essere più lento di quanto sopra poiché genera un molto grande istruzione (che potenzialmente può richiedere più rete dei dati stessi). Quindi dovrebbe essere analizzato sul lato DB che consumerà anche molto tempo e trascurerà tutti i vantaggi (senza parlare del limite di dimensione potenziale). Quindi ho executemany l'ho redatto per testare con blocchi non per creare una singola istruzione per 100.000 record. Non ho usato la concatenazione di valori all'interno dell'istruzione, perché volevo tenerla al sicuro.
  • insert all . Gli stessi inconvenienti, ma nessun sindacato. Confrontalo con il union versione.
  • serializzare i dati in JSON ed eseguire la deserializzazione lato DB con json_table . Potenzialmente buone prestazioni con una singola istruzione breve e un singolo trasferimento di dati con un sovraccarico ridotto di JSON.
  • Il tuo FORALL nella procedura del wrapper PL/SQL. Dovrebbe essere uguale a executemany poiché fa lo stesso, ma sul lato del database. Spese di trasformazione dei dati nella raccolta.
  • Lo stesso FORALL , ma con un approccio colonnare per passare i dati:passa semplici elenchi di valori di colonna anziché di tipo complesso. Dovrebbe essere molto più veloce di FORALL con raccolta poiché non è necessario serializzare i dati nel tipo di raccolta.

Ho utilizzato Oracle Autonomous Database in Oracle Cloud con account gratuito. Ciascun metodo è stato eseguito per 10 volte in loop con lo stesso set di dati di input di 100.000 record, la tabella è stata ricreata prima di ogni test. Questo è il risultato che ho. I tempi di preparazione ed esecuzione qui sono rispettivamente la trasformazione dei dati alla chiamata del DB lato client.

>>> t = PerfTest(100000)
>>> t.run("exec_many", 10)
Method:  exec_many.
    Duration, avg: 2.3083874 s
    Preparation time, avg: 0.0 s
    Execution time, avg: 2.3083874 s
>>> t.run("exec_many_append", 10)
Method: exec_many_append.
    Duration, avg: 2.6031369 s
    Preparation time, avg: 0.0 s
    Execution time, avg: 2.6031369 s
>>> t.run("union_all", 10, 10000)
Method:  union_all.
    Duration, avg: 27.9444233 s
    Preparation time, avg: 0.0408773 s
    Execution time, avg: 27.8457551 s
>>> t.run("insert_all", 10, 10000)
Method: insert_all.
    Duration, avg: 70.6442494 s
    Preparation time, avg: 0.0289269 s
    Execution time, avg: 70.5541995 s
>>> t.run("json_table", 10)
Method: json_table.
    Duration, avg: 10.4648237 s
    Preparation time, avg: 9.7907693 s
    Execution time, avg: 0.621006 s
>>> t.run("forall", 10)
Method:     forall.
    Duration, avg: 5.5622837 s
    Preparation time, avg: 1.8972456000000002 s
    Execution time, avg: 3.6650380999999994 s
>>> t.run("forall_columnar", 10)
Method: forall_columnar.
    Duration, avg: 2.6702698000000002 s
    Preparation time, avg: 0.055710800000000005 s
    Execution time, avg: 2.6105702 s
>>> 

Il modo più veloce è solo executemany , non così tanta sorpresa. Interessante qui è che APPEND_VALUES non migliora la query e ottiene in media più tempo, quindi sono necessarie ulteriori indagini.

Informazioni su FORALL :come previsto, l'array individuale per ogni colonna richiede meno tempo in quanto non è necessaria la preparazione dei dati. È più o meno paragonabile a executemany , ma penso che l'overhead PL/SQL abbia un ruolo qui.

Un'altra parte interessante per me è JSON:la maggior parte del tempo è stata dedicata alla scrittura di LOB nel database e alla serializzazione, ma la query stessa è stata molto veloce. Forse l'operazione di scrittura può essere migliorata in qualche modo con chuncsize o in qualche altro modo per passare i dati LOB nell'istruzione select, ma dal mio codice è tutt'altro che un approccio molto semplice e diretto con executemany .

Ci sono anche possibili approcci senza Python che dovrebbero essere più veloci come strumenti nativi per dati esterni, ma non li ho testati:

Di seguito è riportato il codice che ho utilizzato per il test.

import cx_Oracle as db
import os, random, json
import datetime as dt


class PerfTest:
  
  def __init__(self, size):
    self._con = db.connect(
      os.environ["ora_cloud_usr"],
      os.environ["ora_cloud_pwd"],
      "test_low",
      encoding="UTF-8"
    )
    self._cur = self._con.cursor()
    self.inp = [(i, "Test {i}".format(i=i), random.random()) for i in range(size)]
  
  def __del__(self):
    if self._con:
      self._con.rollback()
      self._con.close()
 
#Create objets
  def setup(self):
    try:
      self._cur.execute("drop table rand")
      #print("table dropped")
    except:
      pass
  
    self._cur.execute("""create table rand(
      id int,
      str varchar2(100),
      val number
    )""")
    
    self._cur.execute("""create or replace package pkg_test as
  type ts_test is record (
    id rand.id%type,
    str rand.str%type,
    val rand.val%type
  );
  type tt_test is table of ts_test index by pls_integer;
  
  type tt_ids is table of rand.id%type index by pls_integer;
  type tt_strs is table of rand.str%type index by pls_integer;
  type tt_vals is table of rand.val%type index by pls_integer;
  
  procedure write_data(p_data in tt_test);
  procedure write_data_columnar(
    p_ids in tt_ids,
    p_strs in tt_strs,
    p_vals in tt_vals
  );

end;""")
    self._cur.execute("""create or replace package body pkg_test as
  procedure write_data(p_data in tt_test)
  as
  begin
    forall i in indices of p_data
      insert into rand(id, str, val)
      values (p_data(i).id, p_data(i).str, p_data(i).val)
    ;
    
    commit;

  end;
  
  procedure write_data_columnar(
    p_ids in tt_ids,
    p_strs in tt_strs,
    p_vals in tt_vals
  ) as
  begin
    forall i in indices of p_ids
      insert into rand(id, str, val)
      values (p_ids(i), p_strs(i), p_vals(i))
    ;
    
    commit;
    
  end;

end;
""")

 
  def build_union(self, size):
      return """insert into rand(id, str, val)
    select id, str, val from rand where 1 = 0 union all
    """ + """ union all """.join(
      ["select :{}, :{}, :{} from dual".format(i*3+1, i*3+2, i*3+3)
        for i in range(size)]
    )
 
 
  def build_insert_all(self, size):
      return """
      """.join(
      ["into rand(id, str, val) values (:{}, :{}, :{})".format(i*3+1, i*3+2, i*3+3)
        for i in range(size)]
    )


#Test case with executemany
  def exec_many(self):
    start = dt.datetime.now()
    self._cur.executemany("insert into rand(id, str, val) values (:1, :2, :3)", self.inp)
    self._con.commit()
    
    return (dt.timedelta(0), dt.datetime.now() - start)
 
 
#The same as above but with prepared statement (no parsing)
  def exec_many_append(self):
    start = dt.datetime.now()
    self._cur.executemany("insert /*+APPEND_VALUES*/ into rand(id, str, val) values (:1, :2, :3)", self.inp)
    self._con.commit()
    
    return (dt.timedelta(0), dt.datetime.now() - start)


#Union All approach (chunked). Should have large parse time
  def union_all(self, size):
##Chunked list of big tuples
    start_prepare = dt.datetime.now()
    new_inp = [
      tuple([item for t in r for item in t])
      for r in list(zip(*[iter(self.inp)]*size))
    ]
    new_stmt = self.build_union(size)
    
    dur_prepare = dt.datetime.now() - start_prepare
    
    #Execute unions
    start_exec = dt.datetime.now()
    self._cur.executemany(new_stmt, new_inp)
    dur_exec = dt.datetime.now() - start_exec

##In case the size is not a divisor
    remainder = len(self.inp) % size
    if remainder > 0 :
      start_prepare = dt.datetime.now()
      new_stmt = self.build_union(remainder)
      new_inp = tuple([
        item for t in self.inp[-remainder:] for item in t
      ])
      dur_prepare += dt.datetime.now() - start_prepare
      
      start_exec = dt.datetime.now()
      self._cur.execute(new_stmt, new_inp)
      dur_exec += dt.datetime.now() - start_exec

    self._con.commit()
    
    return (dur_prepare, dur_exec)


#The same as union all, but with no need to union something
  def insert_all(self, size):
##Chunked list of big tuples
    start_prepare = dt.datetime.now()
    new_inp = [
      tuple([item for t in r for item in t])
      for r in list(zip(*[iter(self.inp)]*size))
    ]
    new_stmt = """insert all
    {}
    select * from dual"""
    dur_prepare = dt.datetime.now() - start_prepare
    
    #Execute
    start_exec = dt.datetime.now()
    self._cur.executemany(
      new_stmt.format(self.build_insert_all(size)),
      new_inp
    )
    dur_exec = dt.datetime.now() - start_exec

##In case the size is not a divisor
    remainder = len(self.inp) % size
    if remainder > 0 :
      start_prepare = dt.datetime.now()
      new_inp = tuple([
        item for t in self.inp[-remainder:] for item in t
      ])
      dur_prepare += dt.datetime.now() - start_prepare
      
      start_exec = dt.datetime.now()
      self._cur.execute(
        new_stmt.format(self.build_insert_all(remainder)),
        new_inp
      )
      dur_exec += dt.datetime.now() - start_exec

    self._con.commit()
    
    return (dur_prepare, dur_exec)

    
#Serialize at server side and do deserialization at DB side
  def json_table(self):
    start_prepare = dt.datetime.now()
    new_inp = json.dumps([
      { "id":t[0], "str":t[1], "val":t[2]} for t in self.inp
    ])
    
    lob_var = self._con.createlob(db.DB_TYPE_CLOB)
    lob_var.write(new_inp)
    
    start_exec = dt.datetime.now()
    self._cur.execute("""
    insert into rand(id, str, val)
    select id, str, val
    from json_table(
      to_clob(:json), '$[*]'
      columns
        id int,
        str varchar2(100),
        val number
    )
    """, json=lob_var)
    dur_exec = dt.datetime.now() - start_exec
    
    self._con.commit()
    
    return (start_exec - start_prepare, dur_exec)


#PL/SQL with FORALL
  def forall(self):
    start_prepare = dt.datetime.now()
    collection_type = self._con.gettype("PKG_TEST.TT_TEST")
    record_type = self._con.gettype("PKG_TEST.TS_TEST")
    
    def recBuilder(x):
      rec = record_type.newobject()
      rec.ID = x[0]
      rec.STR = x[1]
      rec.VAL = x[2]
      
      return rec

    inp_collection = collection_type.newobject([
      recBuilder(i) for i in self.inp
    ])
    
    start_exec = dt.datetime.now()
    self._cur.callproc("pkg_test.write_data", [inp_collection])
    dur_exec = dt.datetime.now() - start_exec
    
    return (start_exec - start_prepare, dur_exec)


#PL/SQL with FORALL and plain collections
  def forall_columnar(self):
    start_prepare = dt.datetime.now()
    ids, strs, vals = map(list, zip(*self.inp))
    start_exec = dt.datetime.now()
    self._cur.callproc("pkg_test.write_data_columnar", [ids, strs, vals])
    dur_exec = dt.datetime.now() - start_exec
    
    return (start_exec - start_prepare, dur_exec)

  
#Run test
  def run(self, method, iterations, *args):
    #Cleanup schema
    self.setup()

    start = dt.datetime.now()
    runtime = []
    for i in range(iterations):
      single_run = getattr(self, method)(*args)
      runtime.append(single_run)
    
    dur = dt.datetime.now() - start
    dur_prep_total = sum([i.total_seconds() for i, _ in runtime])
    dur_exec_total = sum([i.total_seconds() for _, i in runtime])
    
    print("""Method: {meth}.
    Duration, avg: {run_dur} s
    Preparation time, avg: {prep} s
    Execution time, avg: {ex} s""".format(
      inp_s=len(self.inp),
      meth=method,
      run_dur=dur.total_seconds() / iterations,
      prep=dur_prep_total / iterations,
      ex=dur_exec_total / iterations
    ))