PostgreSQL
 sql >> Database >  >> RDS >> PostgreSQL

SQLAlchemy:raggruppa per giorno su più tabelle

SQL funziona con e restituisce dati tabulari (o relazioni, se preferisci pensarla in questo modo, ma non tutte le tabelle SQL sono relazioni). Ciò che ciò implica è che una tabella nidificata come quella illustrata nella domanda non è una caratteristica così comune. Esistono modi per produrre qualcosa del genere in Postgresql, ad esempio utilizzando array di JSON o compositi, ma è del tutto possibile semplicemente recuperare i dati tabulari ed eseguire l'annidamento nell'applicazione. Python ha itertools.groupby() , che si adatta abbastanza bene al conto, dati i dati ordinati.

L'errore column "incoming.id" must appear in the GROUP BY clause... sta dicendo che i non aggregati nell'elenco di selezione, con clausola, ecc. devono apparire nel GROUP BY clausola o essere utilizzati in forma aggregata, per timore che abbiano possibilmente valori indeterminati . In altre parole il valore dovrebbe essere prelevato solo da qualche riga del gruppo, perché GROUP BY condensa le righe raggruppate in un'unica riga , e chiunque potrebbe indovinare da quale riga sono stati scelti. L'implementazione potrebbe consentirlo, come fa SQLite e MySQL, ma lo standard SQL lo vieta. L'eccezione alla regola è quando esiste una dipendenza funzionale ; il GROUP BY clausola determina i non aggregati. Pensa a un join tra le tabelle A e B raggruppati per A chiave primaria di. Indipendentemente dalla riga di un gruppo, il sistema sceglierà i valori per A 's colonne da, sarebbero le stesse poiché il raggruppamento è stato eseguito in base alla chiave primaria.

Per affrontare l'approccio generale previsto in 3 punti, un modo sarebbe selezionare un'unione di entrata e uscita, ordinata in base ai loro timestamp. Poiché non esiste una gerarchia ereditaria configurazione––poiché potrebbe non essercene nemmeno una, non ho familiarità con la contabilità––un ritorno all'utilizzo delle tuple dei risultati principali e semplici rende le cose più facili in questo caso:

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)

Quindi per formare la struttura annidata itertools.groupby() viene utilizzato:

date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]

Il risultato finale è un elenco di 2 tuple di data e un elenco di dizionari di voci in ordine crescente. Non proprio la soluzione ORM, ma fa il lavoro. Un esempio:

In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [57]: session.commit()

In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    ...:     where(Incoming.accountID == 1)
    ...: 
    ...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    ...:     where(Outgoing.accountID == 1)
    ...: 
    ...: all_entries = incoming.union(outgoing)
    ...: all_entries = all_entries.order_by(all_entries.c.timestamp)
    ...: all_entries = db_session.execute(all_entries)

In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
    ...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]: 
[(datetime.date(2019, 9, 1),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 5,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 2),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 3),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 2,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
    'type': 'outgoing'}])]

Come accennato, Postgresql può produrre praticamente lo stesso risultato dell'utilizzo di un array di JSON:

from sqlalchemy.dialects.postgresql import aggregate_order_by

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing).alias('all_entries')

day = func.date_trunc('day', all_entries.c.timestamp)

stmt = select([day,
               func.array_agg(aggregate_order_by(
                   func.row_to_json(literal_column('all_entries.*')),
                   all_entries.c.timestamp))]).\
    group_by(day).\
    order_by(day)

db_session.execute(stmt).fetchall()

Se infatti Incoming e Outgoing possono essere considerati figli di una base comune, ad esempio Entry , l'uso delle unioni può essere in qualche modo automatizzato con ereditarietà della tabella concreta :

from sqlalchemy.ext.declarative import AbstractConcreteBase

class Entry(AbstractConcreteBase, Base):
    pass

class Incoming(Entry):
    __tablename__ = 'incoming'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="incomings")

    __mapper_args__ = {
        'polymorphic_identity': 'incoming',
        'concrete': True
    }

class Outgoing(Entry):
    __tablename__ = 'outgoing'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="outgoings")

    __mapper_args__ = {
        'polymorphic_identity': 'outgoing',
        'concrete': True
    }

Purtroppo utilizzo di AbstractConcreteBase richiede una chiamata manuale a configure_mappers() quando tutte le classi necessarie sono state definite; in questo caso la prima possibilità è dopo aver definito User , perché Account dipende da esso attraverso le relazioni:

from sqlalchemy.orm import configure_mappers
configure_mappers()

Quindi per recuperare tutti i Incoming e Outgoing in una singola query ORM polimorfa usa Entry :

session.query(Entry).\
    filter(Entry.accountID == accountID).\
    order_by(Entry.timestamp).\
    all()

e continua a usare itertools.groupby() come sopra nell'elenco risultante di Incoming e Outgoing .