Ecco una risposta migliore, grazie a Mike Byer
from sqlalchemy import MetaData, Column, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import CreateTable
from sqlalchemy.ext.compiler import compiles
import textwrap
@compiles(CreateTable, "oracle")
def _add_suffixes(element, compiler, **kw):
text = compiler.visit_create_table(element, **kw)
if "oracle_partition" in element.element.info:
text += textwrap.dedent(
element.element.info["oracle_partition"]).strip()
return text
# use mock strategy just to illustrate this w/o my getting
# on an oracle box
def execute_sql(stmt):
print stmt.compile(dialect=engine.dialect)
engine = create_engine("oracle://", execute_sql, strategy="mock")
metadata = MetaData()
Base = declarative_base(metadata=metadata)
class Foo(Base):
__tablename__ = 'foo'
name = Column(String(10), primary_key=True)
__table_args__ = {
'info': {
'oracle_partition': """
PARTITION BY HASH(name)
( PARTITION p1 TABLESPACE tbs1
, PARTITION p2 TABLESPACE tbs2
, PARTITION p3 TABLESPACE tbs3
, PARTITION p4 TABLESPACE tbs4
)
"""
}
}
Foo.__table__.create(bind=engine)
Usando il classico:
m = MetaData()
t = Table(
'sales_hash', m,
Column('s_productid', NUMBER),
Column('s_saledate', DATE),
Column('s_custid', NUMBER),
Column('s_totalprice', NUMBER),
info={
"oracle_partition": """
PARTITION BY HASH(s_productid)
( PARTITION p1 TABLESPACE tbs1
, PARTITION p2 TABLESPACE tbs2
, PARTITION p3 TABLESPACE tbs3
, PARTITION p4 TABLESPACE tbs4
)
"""
}
)