223 lines
6.2 KiB
Python
223 lines
6.2 KiB
Python
from __future__ import absolute_import
|
|
|
|
import contextlib
|
|
import re
|
|
import warnings
|
|
|
|
from sqlalchemy import exc as sa_exc
|
|
from sqlalchemy.engine import default
|
|
from sqlalchemy.util import decorator
|
|
|
|
from . import config
|
|
from . import mock
|
|
from .exclusions import db_spec
|
|
from .. import util
|
|
from ..util.compat import py3k
|
|
from ..util.compat import text_type
|
|
|
|
|
|
if not util.sqla_094:
|
|
|
|
def eq_(a, b, msg=None):
|
|
"""Assert a == b, with repr messaging on failure."""
|
|
assert a == b, msg or "%r != %r" % (a, b)
|
|
|
|
def ne_(a, b, msg=None):
|
|
"""Assert a != b, with repr messaging on failure."""
|
|
assert a != b, msg or "%r == %r" % (a, b)
|
|
|
|
def is_(a, b, msg=None):
|
|
"""Assert a is b, with repr messaging on failure."""
|
|
assert a is b, msg or "%r is not %r" % (a, b)
|
|
|
|
def is_not_(a, b, msg=None):
|
|
"""Assert a is not b, with repr messaging on failure."""
|
|
assert a is not b, msg or "%r is %r" % (a, b)
|
|
|
|
def assert_raises(except_cls, callable_, *args, **kw):
|
|
try:
|
|
callable_(*args, **kw)
|
|
success = False
|
|
except except_cls:
|
|
success = True
|
|
|
|
# assert outside the block so it works for AssertionError too !
|
|
assert success, "Callable did not raise an exception"
|
|
|
|
def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
|
|
try:
|
|
callable_(*args, **kwargs)
|
|
assert False, "Callable did not raise an exception"
|
|
except except_cls as e:
|
|
assert re.search(msg, text_type(e), re.UNICODE), "%r !~ %s" % (
|
|
msg,
|
|
e,
|
|
)
|
|
print(text_type(e).encode("utf-8"))
|
|
|
|
|
|
else:
|
|
from sqlalchemy.testing.assertions import assert_raises # noqa
|
|
from sqlalchemy.testing.assertions import assert_raises_message # noqa
|
|
from sqlalchemy.testing.assertions import eq_ # noqa
|
|
from sqlalchemy.testing.assertions import is_ # noqa
|
|
from sqlalchemy.testing.assertions import is_not_ # noqa
|
|
from sqlalchemy.testing.assertions import ne_ # noqa
|
|
|
|
|
|
def eq_ignore_whitespace(a, b, msg=None):
|
|
a = re.sub(r"^\s+?|\n", "", a)
|
|
a = re.sub(r" {2,}", " ", a)
|
|
b = re.sub(r"^\s+?|\n", "", b)
|
|
b = re.sub(r" {2,}", " ", b)
|
|
|
|
# convert for unicode string rendering,
|
|
# using special escape character "!U"
|
|
if py3k:
|
|
b = re.sub(r"!U", "", b)
|
|
else:
|
|
b = re.sub(r"!U", "u", b)
|
|
|
|
assert a == b, msg or "%r != %r" % (a, b)
|
|
|
|
|
|
def assert_compiled(element, assert_string, dialect=None):
|
|
dialect = _get_dialect(dialect)
|
|
eq_(
|
|
text_type(element.compile(dialect=dialect))
|
|
.replace("\n", "")
|
|
.replace("\t", ""),
|
|
assert_string.replace("\n", "").replace("\t", ""),
|
|
)
|
|
|
|
|
|
_dialect_mods = {}
|
|
|
|
|
|
def _get_dialect(name):
|
|
if name is None or name == "default":
|
|
return default.DefaultDialect()
|
|
else:
|
|
try:
|
|
dialect_mod = _dialect_mods[name]
|
|
except KeyError:
|
|
dialect_mod = getattr(
|
|
__import__("sqlalchemy.dialects.%s" % name).dialects, name
|
|
)
|
|
_dialect_mods[name] = dialect_mod
|
|
d = dialect_mod.dialect()
|
|
if name == "postgresql":
|
|
d.implicit_returning = True
|
|
elif name == "mssql":
|
|
d.legacy_schema_aliasing = False
|
|
return d
|
|
|
|
|
|
def expect_warnings(*messages, **kw):
|
|
"""Context manager which expects one or more warnings.
|
|
|
|
With no arguments, squelches all SAWarnings emitted via
|
|
sqlalchemy.util.warn and sqlalchemy.util.warn_limited. Otherwise
|
|
pass string expressions that will match selected warnings via regex;
|
|
all non-matching warnings are sent through.
|
|
|
|
The expect version **asserts** that the warnings were in fact seen.
|
|
|
|
Note that the test suite sets SAWarning warnings to raise exceptions.
|
|
|
|
"""
|
|
return _expect_warnings(sa_exc.SAWarning, messages, **kw)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def expect_warnings_on(db, *messages, **kw):
|
|
"""Context manager which expects one or more warnings on specific
|
|
dialects.
|
|
|
|
The expect version **asserts** that the warnings were in fact seen.
|
|
|
|
"""
|
|
spec = db_spec(db)
|
|
|
|
if isinstance(db, util.string_types) and not spec(config._current):
|
|
yield
|
|
else:
|
|
with expect_warnings(*messages, **kw):
|
|
yield
|
|
|
|
|
|
def emits_warning(*messages):
|
|
"""Decorator form of expect_warnings().
|
|
|
|
Note that emits_warning does **not** assert that the warnings
|
|
were in fact seen.
|
|
|
|
"""
|
|
|
|
@decorator
|
|
def decorate(fn, *args, **kw):
|
|
with expect_warnings(assert_=False, *messages):
|
|
return fn(*args, **kw)
|
|
|
|
return decorate
|
|
|
|
|
|
def emits_warning_on(db, *messages):
|
|
"""Mark a test as emitting a warning on a specific dialect.
|
|
|
|
With no arguments, squelches all SAWarning failures. Or pass one or more
|
|
strings; these will be matched to the root of the warning description by
|
|
warnings.filterwarnings().
|
|
|
|
Note that emits_warning_on does **not** assert that the warnings
|
|
were in fact seen.
|
|
|
|
"""
|
|
|
|
@decorator
|
|
def decorate(fn, *args, **kw):
|
|
with expect_warnings_on(db, *messages):
|
|
return fn(*args, **kw)
|
|
|
|
return decorate
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def _expect_warnings(exc_cls, messages, regex=True, assert_=True):
|
|
|
|
if regex:
|
|
filters = [re.compile(msg, re.I) for msg in messages]
|
|
else:
|
|
filters = messages
|
|
|
|
seen = set(filters)
|
|
|
|
real_warn = warnings.warn
|
|
|
|
def our_warn(msg, exception=None, *arg, **kw):
|
|
if exception and not issubclass(exception, exc_cls):
|
|
return real_warn(msg, exception, *arg, **kw)
|
|
|
|
if not filters:
|
|
return
|
|
|
|
for filter_ in filters:
|
|
if (regex and filter_.match(msg)) or (
|
|
not regex and filter_ == msg
|
|
):
|
|
seen.discard(filter_)
|
|
break
|
|
else:
|
|
if exception is None:
|
|
real_warn(msg, *arg, **kw)
|
|
else:
|
|
real_warn(msg, exception, *arg, **kw)
|
|
|
|
with mock.patch("warnings.warn", our_warn):
|
|
yield
|
|
|
|
if assert_:
|
|
assert not seen, "Warnings were not seen: %s" % ", ".join(
|
|
"%r" % (s.pattern if regex else s) for s in seen
|
|
)
|