Source code for ovirtlago.testlib

# Copyright 2014 Red Hat, Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
# Refer to the README and COPYING files for full details of the license
import datetime
import functools
import logging
import os
import time
import nose.plugins
from nose.plugins.skip import SkipTest

from lago import (utils, log_utils, cmd as lago_cmd)

import ovirtlago

LOGGER = logging.getLogger(__name__)
LONG_TIMEOUT = 10 * 60

_test_prefix = None

[docs]def get_test_prefix(): global _test_prefix if _test_prefix is None: cur_workdir_path = os.environ.get('LAGO_WORKDIR_PATH', os.curdir) workdir = ovirtlago.prefix.OvirtWorkdir(path=cur_workdir_path) _test_prefix = workdir.get_prefix('current') return _test_prefix
[docs]def get_prefixed_name(entity_name): suite = os.environ.get('SUITE') return ( 'lago-' + os.path.basename(suite).replace('.', '-') + '-' + entity_name )
[docs]def with_ovirt_prefix(func): @functools.wraps(func) def wrapper(*args, **kwargs): return func(get_test_prefix(), *args, **kwargs) return wrapper
[docs]def with_ovirt_api(func): @functools.wraps(func) @with_ovirt_prefix def wrapper(prefix, *args, **kwargs): return func(prefix.virt_env.engine_vm().get_api(), *args, **kwargs) return wrapper
[docs]def with_ovirt_api4(func): @functools.wraps(func) @with_ovirt_prefix def wrapper(prefix, *args, **kwargs): return func( prefix.virt_env.engine_vm().get_api(api_ver=4), *args, **kwargs ) return wrapper
def _vms_capable(vms, caps): caps = set(caps) def get_vm_caps(vm): set(vm.metadata.get('ovirt-capabilities', [])) existing_caps = set() for vm in vms: existing_caps = existing_caps.union(get_vm_caps(vm) or []) return caps.issubset(existing_caps)
[docs]def engine_capability(caps): def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): prefix = get_test_prefix() if not _vms_capable([prefix.virt_env.engine_vm()], caps): raise SkipTest() return func() return wrapper return decorator
[docs]def host_capability(caps): def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): prefix = get_test_prefix() if not _vms_capable(prefix.virt_env.host_vms(), caps): raise SkipTest() return func() return wrapper return decorator
[docs]def test_sequence_gen(test_list): for test in test_list: def wrapped_test(): test() setattr(wrapped_test, 'description', test.__name__) yield wrapped_test
[docs]class LogCollectorPlugin(nose.plugins.Plugin): name = 'log-collector-plugin' def __init__(self, prefix): nose.plugins.Plugin.__init__(self) self._prefix = prefix
[docs] def options(self, parser, env=None): env = env if env is not None else os.environ super(LogCollectorPlugin, self).options(parser, env)
[docs] def configure(self, options, conf): super(LogCollectorPlugin, self).configure(options, conf)
[docs] def addError(self, test, err): self._addFault(test, err)
[docs] def addFailure(self, test, err): self._addFault(test, err)
def _addFault(self, test, err): suffix ="%Y%m%d%H%M%S") test_name = '%s-%s' % (, suffix) lago_cmd.do_collect( prefix=self._prefix, output=self._prefix.paths.test_logs(test_name), no_skip=False )
[docs]class TaskLogNosePlugin(nose.plugins.Plugin): name = "tasklog-plugin" # the score parameter is a workaround to catch skipped tests # see: score = 10000 def __init__(self, *args, **kwargs): self.logger = logging.getLogger('nose') self.skipped = {} super(TaskLogNosePlugin, self).__init__(*args, **kwargs)
[docs] def options(self, parser, env): return super(TaskLogNosePlugin, self).options(parser, env)
[docs] def configure(self, options, conf): res = super(TaskLogNosePlugin, self).configure(options, conf) self.logger.handlers = logging.root.handlers return res
[docs] def startTest(self, test): log_utils.start_log_task( test.shortDescription() or str(test), logger=self.logger )
[docs] def stopTest(self, test): desc = test.shortDescription() or str(test) if desc in self.skipped: exp_msg = '' try: exp_msg = self.skipped[desc][1] except KeyError: pass'SKIPPED: %s', exp_msg) log_utils.end_log_task(desc, logger=self.logger)
[docs] def addError(self, test, err): desc = test.shortDescription() or str(test) if issubclass(err[0], self.skipped[desc] = err
def _instance_of_any(obj, cls_list): return any(True for cls in cls_list if isinstance(obj, cls))
[docs]def assert_equals_within( func, value, timeout, allowed_exceptions=None, initial_wait=10 ): allowed_exceptions = allowed_exceptions or [] with utils.EggTimer(timeout) as timer: while not timer.elapsed(): try: res = func() if res == value: return except Exception as exc: if _instance_of_any(exc, allowed_exceptions): time.sleep(3) continue LOGGER.exception("Unhandled exception in %s", func) raise if initial_wait == 0: time.sleep(3) else: time.sleep(initial_wait) initial_wait = 0 try: raise AssertionError( '%s != %s after %s seconds' % (res, value, timeout) ) # if func repeatedly raises any of the allowed exceptions, res remains # unbound throughout the function, resulting in an UnboundLocalError. except UnboundLocalError: raise AssertionError( '%s failed to evaluate after %s seconds' % (func.__name__, timeout) )
[docs]def assert_equals_within_short(func, value, allowed_exceptions=None): allowed_exceptions = allowed_exceptions or [] assert_equals_within( func, value, SHORT_TIMEOUT, allowed_exceptions=allowed_exceptions )
[docs]def assert_equals_within_long(func, value, allowed_exceptions=None): allowed_exceptions = allowed_exceptions or [] assert_equals_within( func, value, LONG_TIMEOUT, allowed_exceptions=allowed_exceptions )
[docs]def assert_true_within(func, timeout, allowed_exceptions=None): assert_equals_within(func, True, timeout, allowed_exceptions)
[docs]def assert_true_within_short(func, allowed_exceptions=None): assert_equals_within_short(func, True, allowed_exceptions)
[docs]def assert_true_within_long(func, allowed_exceptions=None): assert_equals_within_long(func, True, allowed_exceptions)