~sirn/fanboi2

0b820134660eca2f47e03ab76754c91f49520a08 — Kridsada Thanabulpong 5 years ago 73c3061
Add versioned implementation based on SQLAlchemy's History Meta.
M fanboi2/models/__init__.py => fanboi2/models/__init__.py +6 -5
@@ 1,21 1,22 @@
from ._base import DBSession, Base, JsonType
from ._identity import Identity
from ._redis_proxy import RedisProxy
from ._versioned import make_versioned
from .board import Board
from .topic import Topic
from .post import Post


redis_conn = RedisProxy()
identity = Identity(redis=redis_conn)


_MODELS = {
    'board': Board,
    'topic': Topic,
    'post': Post,
}


def serialize_model(type_):
    return _MODELS.get(type_)


redis_conn = RedisProxy()
identity = Identity(redis=redis_conn)
make_versioned(DBSession)

M fanboi2/models/_base.py => fanboi2/models/_base.py +4 -0
@@ 7,6 7,7 @@ from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.sqltypes import DateTime, Integer, Text
from sqlalchemy.sql.type_api import TypeDecorator
from zope.sqlalchemy import ZopeTransactionExtension
from ._versioned import make_versioned_class


RE_FIRST_CAP = re.compile('(.)([A-Z][a-z]+)')


@@ 46,5 47,8 @@ class BaseModel(object):
            setattr(self, key, value)


Versioned = make_versioned_class()


DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
Base = declarative_base(cls=BaseModel)

A fanboi2/models/_versioned.py => fanboi2/models/_versioned.py +380 -0
@@ 0,0 1,380 @@
from collections import OrderedDict
from sqlalchemy import event
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import attributes, object_mapper, mapper
from sqlalchemy.orm.exc import UnmappedColumnError
from sqlalchemy.orm.properties import RelationshipProperty
from sqlalchemy.sql import func
from sqlalchemy.sql.schema import Column, Table, ForeignKeyConstraint
from sqlalchemy.sql.sqltypes import Integer, String, DateTime


_versioned_mappers = []


def _is_fk_column(column, table):
    """Returns :type:`True` if column is referencing :attr:`table`.

    :param column: SQLAlchemy column.
    :param table: SQLAlchemy table.

    :type column: sqlalchemy.sql.schema.Column
    :type table: sqlalchemy.sql.schema.Table
    :rtype: bool
    """
    for fk in column.foreign_keys:
        if fk.references(table):
            return True
    return False


def _is_versioning_column(column):
    """Returns :type:`True` if column belongs to versioning table.

    :param column: SQLAlchemy column to check.

    :type column: sqlalchemy.sql.schema.Column
    :rtype: bool
    """
    return "version_meta" in column.info


def _copy_history_column(column):
    """Create a history copy of a SQLAlchemy column. The copied column will
    share the same SQL data type with the original column, but without defaults
    and its unique constraint removed.

    :param column: SQLAlchemy column to copy.

    :type column: sqlalchemy.sql.schema.Column
    :rtype: sqlalchemy.sql.schema.Column
    """
    new_column = column.copy()
    new_column.unique = False
    new_column.default = new_column.server_default = None
    column.info['history_copy'] = new_column
    return new_column


def _history_mapper(model_mapper):
    """Configure SQLAlchemy mapper and enable history support.

    :param model_mapper: SQLAlchemy mapper object for a model.

    :type model_mapper: sqlalchemy.orm.mapper.Mapper
    :rtype: sqlalchemy.orm.mapper.Mapper
    """
    model_class = model_mapper.class_
    model_table = model_mapper.local_table

    for prop in model_mapper.iterate_properties:
        getattr(model_class, prop.key).impl.active_history = True

    super_mapper = model_mapper.inherits
    super_history_mapper = getattr(model_class, '__history_mapper__', None)
    super_fks = []

    properties = OrderedDict()
    polymorphic_on = None

    if not super_mapper or model_table is not super_mapper.local_table:
        version_meta = {"version_meta": True}
        new_columns = []

        for column in model_table.c:
            if _is_versioning_column(column):
                continue

            try:
                original_prop = model_mapper.get_property_by_column(column)
            except UnmappedColumnError:
                continue

            new_column = _copy_history_column(column)
            new_columns.append(new_column)

            if super_mapper and \
               _is_fk_column(column, super_mapper.local_table):
                super_fks.append(
                    (new_column.key,
                     list(super_history_mapper.local_table.primary_key)[0]))

            if column is model_mapper.polymorphic_on:
                polymorphic_on = new_column

            if len(original_prop.columns) > 1 or \
               original_prop.columns[0].key != original_prop.key:
                properties[original_prop.key] = tuple(
                    c.info['history_copy'] for c in original_prop.columns)

        if super_mapper:
            super_fks.append((
                'version',
                super_history_mapper.local_table.c.version))

        new_columns.append(
            Column('version',
                   Integer,
                   primary_key=True,
                   autoincrement=False,
                   info=version_meta))

        new_columns.append(
            Column('change_type',
                   String,
                   info=version_meta))

        new_columns.append(
            Column('changed_at',
                   DateTime(timezone=True),
                   default=func.now(),
                   info=version_meta))

        if super_fks:
            new_columns.append(ForeignKeyConstraint(*zip(*super_fks)))

        new_table = Table(
            model_table.name + '_history',
            model_table.metadata,
            *new_columns,
            schema=model_table.schema)
    else:
        for column in model_mapper.c:
            if column.key not in super_history_mapper.local_table.c:
                new_column = _copy_history_column(column)
                super_history_mapper.local_table.append_column(new_column)
        new_table = None

    if super_history_mapper:
        bases = (super_history_mapper.class_,)
        if new_table is not None:
            properties['change_type'] = (
                (new_table.c.change_type,) + tuple(
                    super_history_mapper.attrs.change_type.columns))
            properties['changed_at'] = (
                (new_table.c.changed_at,) + tuple(
                    super_history_mapper.attrs.changed_at.columns))
    else:
        bases = model_mapper.base_mapper.class_.__bases__

    model_class.__history_mapper__ = mapper(
        type.__new__(type, "%sHistory" % (model_class.__name__), bases, {}),
        new_table,
        inherits=super_history_mapper,
        polymorphic_on=polymorphic_on,
        polymorphic_identity=model_mapper.polymorphic_identity,
        properties=properties)

    if not super_history_mapper:
        model_table.append_column(
            Column('version',
                   Integer,
                   default=1,
                   nullable=False))
        model_mapper.add_property('version', model_table.c.version)


def make_versioned(session, retrieve=None):
    """Enable the versioned mapper. If ``retrieve`` is given, the function
    will be used for retrieving a list of mapper objects (see also
    :func:`make_versioned_class`).

    :param session: SQLAlchemy session object.
    :param retrieve: Function for retrieve a list of mapper objects.

    :type session: sqlalchemy.orm.session.Session
    :type retrieve: function | None

    :rtype: sqlalchemy.orm.session.Session
    """
    _make_history_event(session)

    if retrieve is None:
        retrieve = lambda: _versioned_mappers

    for model_mapper in retrieve():
        _history_mapper(model_mapper)


def make_versioned_class(register=None):
    """Factory for creating a Versioned mixin. If ``register`` is given, the
    function will be used for registering the mapper object (see also
    :func:`make_versioned`).

    :param register: Function for registering a mapper object.
    :type register: function | None

    :rtype: class
    """
    if register is None:
        register = lambda m: _versioned_mappers.append(m)

    class _Versioned(object):
        """Mixin for enabling versioning for a model."""

        @declared_attr
        def __mapper_cls__(cls):
            def _map(cls, *args, **kwargs):
                model_mapper = mapper(cls, *args, **kwargs)
                register(model_mapper)
                return model_mapper
            return _map

    return _Versioned


def _is_versioned_object(obj):
    """Returns `True` if object is version-enabled.

    :param obj: SQLAlchemy model object.

    :type obj: sqlalchemy.ext.declarative.api.Base
    :rtype: bool
    """
    return hasattr(obj, '__history_mapper__')


def _create_version(obj, session, type_=None, force=False):
    """Create a new version for the given :attr:`obj`.

    :param obj: SQLAlchemy model object.
    :param session: SQLAlchemy session object.
    :param type_: Type of a change.
    :param force: Flag to always create version.

    :type obj: sqlalchemy.ext.declarative.api.Base
    :type session: sqlalchemy.orm.scoping.scoped_session
    :type type_: string
    :type force: bool
    """
    obj_mapper = object_mapper(obj)
    history_mapper = obj.__history_mapper__
    history_class = history_mapper.class_

    obj_state = attributes.instance_state(obj)
    obj_changed = False
    attr = {}

    for obj_mapper_, history_mapper_ in zip(
            obj_mapper.iterate_to_root(),
            history_mapper.iterate_to_root()):

        if history_mapper_.single:
            continue

        for history_column in history_mapper_.local_table.c:
            if _is_versioning_column(history_column):
                continue

            obj_column = obj_mapper_.local_table.c[history_column.key]

            try:
                prop = obj_mapper.get_property_by_column(obj_column)
            except UnmappedColumnError:
                continue

            # Force deferred columns to load.
            if prop.key not in obj_state.dict:
                getattr(obj, prop.key)

            added_, unchanged_, deleted_ = attributes.get_history(obj, prop.key)

            if deleted_:
                attr[prop.key] = deleted_[0]
                obj_changed = True
            elif unchanged_:
                attr[prop.key] = unchanged_[0]
            elif added_:
                obj_changed = True

    if not obj_changed:
        for prop in obj_mapper.iterate_properties:
            if isinstance(prop, RelationshipProperty) and \
               attributes.get_history(
                   obj,
                   prop.key,
                   passive=attributes.PASSIVE_NO_INITIALIZE).has_changes():
                for p in prop.local_columns:
                    if p.foreign_keys:
                        obj_changed = True
                        break
                if obj_changed is True:
                    break

    if not obj_changed and not force:
        return

    attr['version'] = obj.version
    attr['change_type'] = type_
    history = history_class()
    for key, value in attr.items():
        setattr(history, key, value)

    session.add(history)
    obj.version += 1


def _create_history_dirty(session, objs):
    """Create a new version for dirty objects.

    :param session: SQLAlchemy sesion object.
    :param objs: Dirty objects usually obtained by ``session.dirty``
    :type session: sqlalchemy.orm.session.Session
    :type objs: list[sqlalchemy.ext.declarative.api.Base]
    """
    for obj in objs:
        if _is_versioned_object(obj):
            _create_version(obj, session, type_='update')


def _create_history_deleted(session, objs):
    """Create a new version for deleted objects. This method will also create
    a new version for relation that do not have ``cascade`` parameter set in
    which SQLAlchemy will default to set its foreign keys to ``NULL``.

    :param session: SQLAlchemy sesion object.
    :param objs: Dirty objects usually obtained by ``session.deleted``
    :type session: sqlalchemy.orm.session.Session
    :type objs: list[sqlalchemy.ext.declarative.api.Base]
    """
    for obj in objs:
        if _is_versioned_object(obj):
            _create_version(obj, session, type_='delete', force=True)

        # Also handle NULL fks from SQLAlchemy Cascades. See also:
        # 6e1f34 lib/sqlalchemy/orm/dependency.py#L423-L424
        obj_mapper = object_mapper(obj)
        related = None

        def _create_cascade_version(target_obj):
            if not target_obj in objs:
                _create_version(target_obj,
                                session,
                                type_='update.cascade',
                                force=True)

        for prop in obj_mapper.iterate_properties:
            if isinstance(prop, RelationshipProperty) and \
               not prop.cascade.delete and \
               not prop.passive_deletes == 'all' and \
               _is_versioned_object(prop.mapper.class_):
                related = getattr(obj, prop.key)
                try:
                    for target_obj in related:
                        _create_cascade_version(target_obj)
                except TypeError:
                    _create_cascade_version(related)


def _make_history_event(session):
    """Registers an event for recording versioned object.

    :param session: SQLAlchemy session object.
    :type session: sqlalchemy.orm.session.Session
    :rtype: sqlalchemy.orm.session.Session
    """
    @event.listens_for(session, 'before_flush')
    def update_history(session_, context, instances):
        _create_history_dirty(session_, session_.dirty)
        _create_history_deleted(session_, session_.deleted)
    return session

M fanboi2/models/topic.py => fanboi2/models/topic.py +1 -1
@@ 95,7 95,7 @@ class Topic(Base):

@event.listens_for(DBSession, 'before_flush')
def update_topic_status(session, context, instances):
    for obj in filter(lambda obj: isinstance(obj, Post), session.new):
    for obj in filter(lambda m: isinstance(m, Post), session.new):
        topic = obj.topic
        if topic.post_count is not None and \
                topic.status == 'open' and \

M fanboi2/tests/__init__.py => fanboi2/tests/__init__.py +2 -1
@@ 100,8 100,9 @@ class ModelMixin(_ModelInstanceSetup, unittest.TestCase):

    def tearDown(self):
        super(ModelMixin, self).tearDown()
        redis_conn._redis = None
        transaction.abort()
        Base.metadata.drop_all()
        redis_conn._redis = None


class RegistryMixin(unittest.TestCase):

M fanboi2/tests/test_models.py => fanboi2/tests/test_models.py +830 -8
@@ 1,3 1,4 @@
import transaction
import unittest
from fanboi2.models import DBSession
from fanboi2.tests import DummyRedis, DATABASE_URI, ModelMixin


@@ 77,24 78,30 @@ class TestJsonType(unittest.TestCase):
        from fanboi2.models import JsonType
        return JsonType

    def _makeOne(self):
        from sqlalchemy import MetaData, Table, Column, Integer, create_engine
    def _makeMetaData(self):
        from sqlalchemy import MetaData, create_engine
        engine = create_engine(DATABASE_URI)
        metadata = MetaData(bind=engine)
        return metadata

    def _makeOne(self, metadata):
        from sqlalchemy import Table, Column, Integer
        table = Table(
            'foo', metadata,
            Column('baz', Integer),
            Column('bar', self._getTargetClass()),
        )
        metadata.drop_all()
        metadata.create_all()
        return table

    def test_compile(self):
        self.assertEqual(str(self._getTargetClass()()), "TEXT")

    def test_field(self):
        table = self._makeOne()
        metadata = self._makeMetaData()
        table = self._makeOne(metadata)
        metadata.drop_all()
        metadata.create_all()

        table.insert().execute(baz=1, bar={"x": 1})
        table.insert().execute(baz=2, bar=None)
        table.insert().execute(baz=3)  # bar should have default {} type.


@@ 103,6 110,8 @@ class TestJsonType(unittest.TestCase):
            table.select().order_by(table.c.baz).execute().fetchall()
        )

        metadata.drop_all()


class TestSerializeModel(unittest.TestCase):



@@ 114,7 123,7 @@ class TestSerializeModel(unittest.TestCase):
        self.assertIsNone(serialize_model('foo'))


class BaseModelTest(ModelMixin, unittest.TestCase):
class TestBaseModel(ModelMixin, unittest.TestCase):

    def _getTargetClass(self):
        from sqlalchemy import Column, Integer


@@ 137,6 146,819 @@ class BaseModelTest(ModelMixin, unittest.TestCase):
        self.assertEqual(model_class.__tablename__, 'mock_model')


class TestVersioned(unittest.TestCase):

    def _makeBase(self):
        from sqlalchemy.engine import create_engine
        from sqlalchemy.ext.declarative import declarative_base
        engine = create_engine(DATABASE_URI)
        Base = declarative_base()
        Base.metadata.bind = engine
        return Base

    def _makeSession(self, Base):
        from sqlalchemy.orm import scoped_session, sessionmaker
        from zope.sqlalchemy import ZopeTransactionExtension
        return scoped_session(
            sessionmaker(
                bind=Base.metadata.bind,
                extension=ZopeTransactionExtension()))

    def _getTargetClass(self, mappers):
        from fanboi2.models._versioned import make_versioned_class
        return make_versioned_class(lambda m: mappers.append(m))

    def _getSetupFunction(self, mappers):
        from fanboi2.models._versioned import make_versioned
        def _make_versioned(session):
            make_versioned(session, lambda: mappers)
        return _make_versioned

    def _makeTable(self, Base):
        Base.metadata.drop_all()
        Base.metadata.create_all()

    def _dropTable(self, Base):
        Base.metadata.drop_all()

    def test_versioned(self):
        from sqlalchemy.sql import func
        from sqlalchemy.sql.schema import Column
        from sqlalchemy.sql.sqltypes import Integer, String, DateTime

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'
            id = Column(Integer, primary_key=True)
            body = Column(String)
            created_at = Column(DateTime(timezone=True), default=func.now())

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        transaction.begin()

        thing = Thing(body='Hello, world')
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)

        thing.body = 'Hello, galaxy'
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 1)
        thing_v1 = Session.query(ThingHistory).filter_by(version=1).one()
        self.assertEqual(thing_v1.body, 'Hello, world')
        self.assertEqual(thing_v1.change_type, 'update')
        self.assertEqual(thing_v1.version, 1)
        self.assertIsNotNone(thing_v1.changed_at)
        self.assertIsNotNone(thing_v1.created_at)

        thing.body = 'Hello, universe'
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 3)
        self.assertEqual(Session.query(ThingHistory).count(), 2)
        thing_v2 = Session.query(ThingHistory).filter_by(version=2).one()
        self.assertEqual(thing_v2.body, 'Hello, galaxy')
        self.assertEqual(thing_v2.change_type, 'update')
        self.assertEqual(thing_v2.version, 2)
        self.assertIsNotNone(thing_v2.changed_at)
        self.assertIsNotNone(thing_v2.created_at)

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_column(self):
        from sqlalchemy.sql.schema import Column
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'
            id = Column(Integer, primary_key=True)
            ref = Column(String, unique=True, info={"version_meta": True})

        self._getSetupFunction(mappers)(Session)
        thing_history_table = Thing.__history_mapper__.local_table
        self.assertIsNone(getattr(thing_history_table.c, 'ref', None))

    def test_versioned_null(self):
        from sqlalchemy.sql.schema import Column
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'
            id = Column(Integer, primary_key=True)
            body = Column(String, nullable=True)

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        transaction.begin()

        thing = Thing()
        Session.add(thing)
        Session.flush()
        self.assertIsNone(thing.body)
        self.assertEqual(thing.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)

        thing.body = 'Hello, world'
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 1)
        thing_v1 = Session.query(ThingHistory).filter_by(version=1).one()
        self.assertIsNone(thing_v1.body)
        self.assertEqual(thing_v1.change_type, 'update')
        self.assertEqual(thing_v1.version, 1)
        self.assertIsNotNone(thing_v1.changed_at)

        thing.body = None
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 3)
        self.assertEqual(Session.query(ThingHistory).count(), 2)
        thing_v2 = Session.query(ThingHistory).filter_by(version=2).one()
        self.assertEqual('Hello, world', thing_v2.body)
        self.assertEqual(thing_v2.change_type, 'update')
        self.assertEqual(thing_v2.version, 2)
        self.assertIsNotNone(thing_v2.changed_at)

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_bool(self):
        from sqlalchemy.sql.schema import Column
        from sqlalchemy.sql.sqltypes import Integer, Boolean

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'
            id = Column(Integer, primary_key=True)
            boolean = Column(Boolean, default=False)

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        transaction.begin()

        thing = Thing()
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)

        thing.boolean = True
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 1)
        thing_v1 = Session.query(ThingHistory).filter_by(version=1).one()
        self.assertEqual(thing_v1.boolean, False)
        self.assertEqual(thing_v1.change_type, 'update')
        self.assertEqual(thing_v1.version, 1)
        self.assertIsNotNone(thing_v1.changed_at)

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_deferred(self):
        from sqlalchemy.orm import deferred
        from sqlalchemy.sql.schema import Column
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'
            id = Column(Integer, primary_key=True)
            name = Column(String, default=False)
            data = deferred(Column(String))

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        transaction.begin()

        thing = Thing(name='test', data='Hello, world')
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)

        transaction.commit()
        transaction.begin()

        thing = Session.query(Thing).first()
        thing.data = 'Hello, galaxy'
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 1)
        thing_v1 = Session.query(ThingHistory).filter_by(version=1).one()
        self.assertEqual(thing_v1.data, 'Hello, world')
        self.assertEqual(thing_v1.change_type, 'update')
        self.assertEqual(thing_v1.version, 1)
        self.assertIsNotNone(thing_v1.changed_at)

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_inheritance(self):
        from sqlalchemy.orm import column_property
        from sqlalchemy.sql.schema import Column, ForeignKey
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'

            id = Column(Integer, primary_key=True)
            name = Column(String)
            type = Column(String)

            __mapper_args__ = {
                'polymorphic_on': type,
                'polymorphic_identity': 'base',
            }

        class ThingSeparatePk(Thing):
            __tablename__ = 'thing_separate_pk'
            __mapper_args__ = {'polymorphic_identity': 'separate_pk'}

            id = column_property(Column(Integer, primary_key=True), Thing.id)
            thing_id = Column(Integer, ForeignKey('thing.id'))
            sub_data_1 = Column(String)

        class ThingSamePk(Thing):
            __tablename__ = 'thing_same_pk'
            __mapper_args__ = {'polymorphic_identity': 'same_pk'}

            id = Column(Integer, ForeignKey('thing.id'), primary_key=True)
            sub_data_2 = Column(String)

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        ThingSeparatePkHistory = ThingSeparatePk.__history_mapper__.class_
        ThingSamePkHistory = ThingSamePk.__history_mapper__.class_
        transaction.begin()

        thing = Thing(name='Foo')
        thing_sp = ThingSeparatePk(name='Bar', sub_data_1='Hello, world')
        thing_sm = ThingSamePk(name='Baz', sub_data_2='Hello, galaxy')
        Session.add_all([thing, thing_sp, thing_sm])
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(thing_sp.version, 1)
        self.assertEqual(thing_sm.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)
        self.assertEqual(Session.query(ThingSeparatePkHistory).count(), 0)
        self.assertEqual(Session.query(ThingSamePkHistory).count(), 0)

        thing.name = 'Hoge'
        thing_sp.sub_data_1 = 'Hello, universe'
        thing_sm.sub_data_2 = 'Hello, multiuniverse'
        Session.add_all([thing, thing_sp, thing_sm])
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(thing_sp.version, 2)
        self.assertEqual(thing_sm.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 3)
        self.assertEqual(Session.query(ThingSeparatePkHistory).count(), 1)
        self.assertEqual(Session.query(ThingSamePkHistory).count(), 1)

        thing_sp.sub_data_1 = 'Hello, parallel universe'
        Session.add(thing_sp)
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(thing_sp.version, 3)
        self.assertEqual(thing_sm.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 4)
        self.assertEqual(Session.query(ThingSeparatePkHistory).count(), 2)
        self.assertEqual(Session.query(ThingSamePkHistory).count(), 1)

        thing_sm.sub_data_2 = 'Hello, 42'
        Session.add(thing_sm)
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(thing_sp.version, 3)
        self.assertEqual(thing_sm.version, 3)
        self.assertEqual(Session.query(ThingHistory).count(), 5)
        self.assertEqual(Session.query(ThingSeparatePkHistory).count(), 2)
        self.assertEqual(Session.query(ThingSamePkHistory).count(), 2)

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_inheritance_multi(self):
        from sqlalchemy.orm import column_property
        from sqlalchemy.sql.schema import Column, ForeignKey
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'

            id = Column(Integer, primary_key=True)
            name = Column(String)
            type = Column(String)

            __mapper_args__ = {
                'polymorphic_on': type,
                'polymorphic_identity': 'base',
            }

        class ThingSub(Thing):
            __tablename__ = 'thing_sub'
            __mapper_args__ = {'polymorphic_identity': 'sub'}

            id = column_property(Column(Integer, primary_key=True), Thing.id)
            base_id = Column(Integer, ForeignKey('thing.id'))
            sub_data_1 = Column(String)

        class ThingSubSub(ThingSub):
            __tablename__ = 'thing_sub_sub'
            __mapper_args__ = {'polymorphic_identity': 'sub_sub'}

            id = Column(Integer, ForeignKey('thing_sub.id'), primary_key=True)
            sub_data_2 = Column(String)

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        ThingSubHistory = ThingSub.__history_mapper__.class_
        ThingSubSubHistory = ThingSubSub.__history_mapper__.class_
        transaction.begin()

        thing = Thing(name='Foo')
        thing_sub = ThingSub(name='Bar', sub_data_1='Hello, world')
        thing_sub_sub = ThingSubSub(name='Baz', sub_data_2='Hello, galaxy')
        Session.add_all([thing, thing_sub, thing_sub_sub])
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(thing_sub.version, 1)
        self.assertEqual(thing_sub_sub.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)
        self.assertEqual(Session.query(ThingSubHistory).count(), 0)
        self.assertEqual(Session.query(ThingSubSubHistory).count(), 0)

        thing.name = 'Hoge'
        thing_sub.sub_data_1 = 'Hello, universe'
        thing_sub_sub.sub_data_2 = 'Hello, multiuniverse'
        Session.add_all([thing, thing_sub, thing_sub_sub])
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(thing_sub.version, 2)
        self.assertEqual(thing_sub_sub.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 3)
        self.assertEqual(Session.query(ThingSubHistory).count(), 2)
        self.assertEqual(Session.query(ThingSubSubHistory).count(), 1)

        thing_sub.sub_data_1 = 'Hello, parallel universe'
        Session.add(thing_sub)
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(thing_sub.version, 3)
        self.assertEqual(thing_sub_sub.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 4)
        self.assertEqual(Session.query(ThingSubHistory).count(), 3)
        self.assertEqual(Session.query(ThingSubSubHistory).count(), 1)

        thing_sub_sub.sub_data_2 = 'Hello, 42'
        Session.add(thing_sub_sub)
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(thing_sub.version, 3)
        self.assertEqual(thing_sub_sub.version, 3)
        self.assertEqual(Session.query(ThingHistory).count(), 5)
        self.assertEqual(Session.query(ThingSubHistory).count(), 4)
        self.assertEqual(Session.query(ThingSubSubHistory).count(), 2)

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_inheritance_single(self):
        from sqlalchemy.sql.schema import Column
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'

            id = Column(Integer, primary_key=True)
            data = Column(String)
            type = Column(String)

            __mapper_args__ = {
                'polymorphic_on': type,
                'polymorphic_identity': 'base',
            }

        class ThingSub(Thing):
            __mapper_args__ = {'polymorphic_identity': 'sub'}
            sub_data = Column(String)

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        ThingSubHistory = ThingSub.__history_mapper__.class_
        transaction.begin()

        thing = Thing(data='Hello, world')
        thing_sub = ThingSub(data='Hello, galaxy', sub_data='Hello, universe')
        Session.add_all([thing, thing_sub])
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(thing_sub.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)
        self.assertEqual(Session.query(ThingSubHistory).count(), 0)

        thing.data = 'Hello, multiuniverse'
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(thing_sub.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 1)
        self.assertEqual(Session.query(ThingSubHistory).count(), 0)
        thing_v1 = Session.query(ThingHistory).filter_by(version=1).one()
        self.assertEqual(thing_v1.data, 'Hello, world')
        self.assertEqual(thing_v1.change_type, 'update')
        self.assertEqual(thing_v1.version, 1)

        thing_sub.sub_data = 'Hello, parallel universe'
        Session.add(thing_sub)
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(thing_sub.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 2)
        self.assertEqual(Session.query(ThingSubHistory).count(), 1)
        thing_sub_v1 = Session.query(ThingSubHistory).filter_by(version=1).one()
        self.assertEqual(thing_sub_v1.sub_data, 'Hello, universe')
        self.assertEqual(thing_sub_v1.change_type, 'update')
        self.assertEqual(thing_sub_v1.version, 1)

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_unique(self):
        from sqlalchemy.sql.schema import Column
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'

            id = Column(Integer, primary_key=True)
            name = Column(String, unique=True)
            data = Column(String)

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        transaction.begin()

        thing = Thing(name='Hello', data='Hello, world')
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)

        thing.data = 'Hello, galaxy'
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 1)

        thing.data = 'Hello, universe'
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 3)
        self.assertEqual(Session.query(ThingHistory).count(), 2)

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_relationship(self):
        from sqlalchemy.orm import relationship
        from sqlalchemy.sql.schema import Column, ForeignKey
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Relate(Base):
            __tablename__ = 'relate'
            id = Column(Integer, primary_key=True)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'
            id = Column(Integer, primary_key=True)
            relate_id = Column(Integer, ForeignKey('relate.id'))
            relate = relationship('Relate', backref='things')

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        transaction.begin()

        thing = Thing()
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)

        relate = Relate()
        thing.relate = relate
        Session.add_all([relate, thing])
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 1)
        thing_v1 = Session.query(ThingHistory).filter_by(version=1).one()
        self.assertIsNone(thing_v1.relate_id)
        self.assertEqual(thing_v1.change_type, 'update')
        self.assertEqual(thing_v1.version, 1)

        thing.relate = None
        Session.add(thing)
        Session.flush()
        self.assertIsNone(thing.relate_id)
        self.assertEqual(thing.version, 3)
        self.assertEqual(Session.query(ThingHistory).count(), 2)
        thing_v2 = Session.query(ThingHistory).filter_by(version=2).one()
        self.assertEqual(thing_v2.relate_id, relate.id)
        self.assertEqual(thing_v2.change_type, 'update')
        self.assertEqual(thing_v2.version, 2)

        Session.delete(relate)
        Session.flush()

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_relationship_cascade_null(self):
        from sqlalchemy.orm import relationship
        from sqlalchemy.sql.schema import Column, ForeignKey
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Relate(Base):
            __tablename__ = 'relate'
            id = Column(Integer, primary_key=True)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'
            id = Column(Integer, primary_key=True)
            relate_id = Column(Integer, ForeignKey('relate.id'))
            relate = relationship('Relate', backref='things')

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        transaction.begin()

        relate = Relate()
        thing = Thing(relate=relate)
        Session.add_all([relate, thing])
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)

        Session.delete(relate)
        Session.flush()
        self.assertIsNone(thing.relate_id)
        self.assertEqual(thing.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 1)
        thing_v1 = Session.query(ThingHistory).filter_by(version=1).one()
        self.assertEqual(thing_v1.relate_id, relate.id)
        self.assertEqual(thing_v1.change_type, 'update.cascade')
        self.assertEqual(thing_v1.version, 1)

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_relationship_cascade_all(self):
        from sqlalchemy import inspect
        from sqlalchemy.orm import relationship, backref
        from sqlalchemy.sql.schema import Column, ForeignKey
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Relate(Base):
            __tablename__ = 'relate'
            id = Column(Integer, primary_key=True)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'
            id = Column(Integer, primary_key=True)
            relate_id = Column(Integer, ForeignKey('relate.id'))
            relate = relationship(
                'Relate',
                backref=backref('things', cascade='all'))

        class Entity(self._getTargetClass(mappers), Base):
            __tablename__ = 'entity'
            id = Column(Integer, primary_key=True)
            thing_id = Column(Integer, ForeignKey('thing.id'))
            thing = relationship(
                'Thing',
                backref=backref('entities', cascade='all'))

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        EntityHistory = Entity.__history_mapper__.class_
        transaction.begin()

        relate = Relate()
        thing = Thing(relate=relate)
        entity = Entity(thing=thing)
        Session.add_all([relate, thing, entity])
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(entity.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)
        self.assertEqual(Session.query(EntityHistory).count(), 0)

        Session.delete(relate)
        Session.flush()
        self.assertTrue(inspect(thing).deleted)
        self.assertTrue(inspect(entity).deleted)
        self.assertEqual(Session.query(ThingHistory).count(), 1)
        self.assertEqual(Session.query(EntityHistory).count(), 1)
        thing_v1 = Session.query(ThingHistory).filter_by(version=1).one()
        self.assertEqual(thing_v1.relate_id, relate.id)
        self.assertEqual(thing_v1.change_type, 'delete')
        self.assertEqual(thing_v1.version, 1)
        entity_v1 = Session.query(EntityHistory).filter_by(version=1).one()
        self.assertEqual(entity_v1.thing_id, thing.id)
        self.assertEqual(entity_v1.change_type, 'delete')
        self.assertEqual(entity_v1.version, 1)

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_relationship_cascade_orphan(self):
        from sqlalchemy import inspect
        from sqlalchemy.orm import relationship, backref
        from sqlalchemy.sql.schema import Column, ForeignKey
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Relate(Base):
            __tablename__ = 'relate'
            id = Column(Integer, primary_key=True)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'
            id = Column(Integer, primary_key=True)
            relate_id = Column(Integer, ForeignKey('relate.id'))
            relate = relationship(
                'Relate',
                backref=backref('things', cascade='all,delete-orphan'))

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        transaction.begin()

        relate = Relate()
        thing = Thing(relate=relate)
        Session.add_all([relate, thing])
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)

        thing.relate = None
        Session.add(thing)
        Session.flush()
        self.assertTrue(inspect(thing).deleted)
        self.assertEqual(Session.query(ThingHistory).count(), 1)
        thing_v1 = Session.query(ThingHistory).filter_by(version=1).one()
        self.assertEqual(thing_v1.relate_id, relate.id)
        self.assertEqual(thing_v1.change_type, 'update')
        self.assertEqual(thing_v1.version, 1)

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_deleted(self):
        from sqlalchemy import inspect
        from sqlalchemy.sql.schema import Column
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'
            id = Column(Integer, primary_key=True)
            data = Column(String)

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        transaction.begin()

        thing = Thing(data='Hello, world')
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)

        Session.delete(thing)
        Session.flush()
        self.assertTrue(inspect(thing).deleted)
        self.assertEqual(Session.query(ThingHistory).count(), 1)
        thing_v1 = Session.query(ThingHistory).filter_by(version=1).one()
        self.assertEqual(thing_v1.data, 'Hello, world')
        self.assertEqual(thing_v1.change_type, 'delete')
        self.assertEqual(thing_v1.version, 1)

        transaction.abort()
        self._dropTable(Base)

    def test_versioned_named_column(self):
        from sqlalchemy import inspect
        from sqlalchemy.sql.schema import Column
        from sqlalchemy.sql.sqltypes import Integer, String

        mappers = []
        Base = self._makeBase()
        Session = self._makeSession(Base)

        class Thing(self._getTargetClass(mappers), Base):
            __tablename__ = 'thing'
            id = Column(Integer, primary_key=True)
            data_ = Column('data', String)

        self._getSetupFunction(mappers)(Session)
        self._makeTable(Base)
        ThingHistory = Thing.__history_mapper__.class_
        transaction.begin()

        thing = Thing(data_='Hello, world')
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 1)
        self.assertEqual(Session.query(ThingHistory).count(), 0)

        thing.data_ = 'Hello, galaxy'
        Session.add(thing)
        Session.flush()
        self.assertEqual(thing.version, 2)
        self.assertEqual(Session.query(ThingHistory).count(), 1)
        thing_v1 = Session.query(ThingHistory).filter_by(version=1).one()
        self.assertEqual(thing_v1.data_, 'Hello, world')
        self.assertEqual(thing_v1.change_type, 'update')
        self.assertEqual(thing_v1.version, 1)

        transaction.abort()
        self._dropTable(Base)


class BoardModelTest(ModelMixin, unittest.TestCase):

    def test_relations(self):


@@ 206,7 1028,7 @@ class BoardModelTest(ModelMixin, unittest.TestCase):
                         list(board.topics))


class TopicModelTest(ModelMixin, unittest.TestCase):
class TestTopicModel(ModelMixin, unittest.TestCase):

    def test_relations(self):
        board = self._makeBoard(title="Foobar", slug="foo")


@@ 423,7 1245,7 @@ class TopicModelTest(ModelMixin, unittest.TestCase):
            topic.scoped_posts("l5"))


class PostModelTest(ModelMixin, unittest.TestCase):
class TestPostModel(ModelMixin, unittest.TestCase):

    def test_relations(self):
        board = self._makeBoard(title="Foobar", slug="foo")

M setup.cfg => setup.cfg +0 -1
@@ 3,7 3,6 @@ match=^test
nocapture=1
cover-package=fanboi2
with-coverage=1
cover-erase=1

[coverage:report]
show_missing = true