PostgreSQL
 sql >> Database >  >> RDS >> PostgreSQL

Come utilizzare bindparam() in un'espressione compilata personalizzata?

Capito. I valori effettivi di bindparams sono conservati in un oggetto chiamato SQLCompiler che è generalmente specifico del dialetto.Qui(link a GitHub) è dove i bindparams vengono archiviati in un SQLCompiler istanza durante il processo di compilazione della query

Quindi la versione finale del mio frammento di codice è simile a questa:

class values(FromClause):
    named_with_column = True

    def __init__(self, columns, *args, **kw):
        self._column_args = columns
        self.list = args
        self.alias_name = self.name = kw.pop('alias_name', None)

    def _populate_column_collection(self):
        # self._columns.update((col.name, col) for col in self._column_args)
        for c in self._column_args:
            c._make_proxy(self, c.name)


@compiles(values)
def compile_values(clause, compiler, asfrom=False, **kw):
    def decide(value, column):
        add_type_hint = False
        if isinstance(value, array) and not value.clauses:  # for empty array literals
            add_type_hint = True

        if isinstance(value, ClauseElement):
            intermediate = compiler.process(value)
            if add_type_hint:
                intermediate += '::' + str(column.type)
            return intermediate

        elif value is None:
            return compiler.render_literal_value(
                value,
                NULLTYPE
            ) + '::' + str(column.type)
        else:
            return compiler.process(
                bindparam(
                    None,
                    value=compiler.render_literal_value(
                        value,
                        column.type
                    ).strip("'")
                )
            ) + '::' + str(column.type)

    columns = clause.columns
    v = "VALUES %s" % ", ".join(
        "(%s)" % ", ".join(
            decide(elem, column)
            for elem, column in zip(tup, columns))
        for tup in clause.list
    )
    if asfrom:
        if clause.alias_name:
            v = "(%s) AS %s (%s)" % (v, clause.alias_name, (", ".join(c.name for c in clause.columns)))
        else:
            v = "(%s)" % v
    return v