# Source code for heat.testing.runner

# vim: tabstop=4 shiftwidth=4 softtabstop=4

#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.

# Colorizer Code is borrowed from Twisted:
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
#
#    Permission is hereby granted, free of charge, to any person obtaining
#    a copy of this software and associated documentation files (the
#    "Software"), to deal in the Software without restriction, including
#    without limitation the rights to use, copy, modify, merge, publish,
#    distribute, sublicense, and/or sell copies of the Software, and to
#    permit persons to whom the Software is furnished to do so, subject to
#    the following conditions:
#
#    The above copyright notice and this permission notice shall be
#    included in all copies or substantial portions of the Software.
#
#    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#    EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
#    MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
#    NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
#    LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
#    OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
#    WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Unittest runner for Heat.

To run all tests:
    python heat/testing/runner.py

To run a single test module:
    python heat/testing/runner.py test_resources

To run a single test (one command, wrapped here for line length):
    python heat/testing/runner.py \
        test_resources:ResourceTestCase.test_resource_from_template

"""

import gettext
import heapq
import os
import unittest
import sys
import time

import eventlet
from nose import config
from nose import core
from nose import result

# Install the ``_()`` translation function into builtins for the 'heat'
# message domain.  ``unicode=1`` is the Python 2 spelling that makes ``_()``
# return unicode strings.
gettext.install('heat', unicode=1)
# Put the directory two levels above this file at the front of sys.path.
# NOTE(review): presumably this is the source tree root, so that the ``heat``
# package is importable when the file is run as a script -- confirm.
reldir = os.path.join(os.path.dirname(__file__), '..', '..')
absdir = os.path.abspath(reldir)
sys.path.insert(0, absdir)


class _AnsiColorizer(object):
    """
    Colorizer that emits ANSI escape sequences.

    A colorizer is an object that loosely wraps around a stream, allowing
    callers to write text to the stream in a particular color.

    Colorizer classes must implement C{supported()} and C{write(text, color)}.
    """
    # Color label -> ANSI SGR foreground color code.
    _colors = dict(black=30, red=31, green=32, yellow=33,
                   blue=34, magenta=35, cyan=36, white=37)

    def __init__(self, stream):
        """
        @param stream: File-like object that C{write()} sends its output to.
        """
        self.stream = stream

    @classmethod
    def supported(cls, stream=None):
        """
        A class method that returns True if the current platform supports
        coloring terminal output using this method. Returns False otherwise.

        @param stream: Stream to probe.  When omitted (or None) the value of
            C{sys.stdout} at the time of the call is used, so a caller that
            temporarily rebinds C{sys.stdout} before probing is honoured.
        """
        # Resolve the default at call time: a ``stream=sys.stdout`` default
        # would be frozen to whatever stdout was when the class was defined.
        if stream is None:
            stream = sys.stdout
        isatty = getattr(stream, 'isatty', None)
        if isatty is None or not isatty():
            return False  # auto color only on TTYs
        try:
            import curses
        except ImportError:
            return False
        try:
            try:
                return curses.tigetnum("colors") > 2
            except curses.error:
                # The terminal has not been initialised yet; set it up and
                # ask again.
                curses.setupterm()
                return curses.tigetnum("colors") > 2
        except Exception:
            # guess false in case of error
            return False

    def write(self, text, color):
        """
        Write the given text to the stream in the given color.

        @param text: Text to be written to the stream.

        @param color: A string label for a color. e.g. 'red', 'white'.

        @raise KeyError: If C{color} is not one of the known labels.
        """
        color = self._colors[color]
        self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))


class _Win32Colorizer(object):
    """
    Colorizer that drives the Windows console through C{win32console}.

    See _AnsiColorizer docstring.
    """
    def __init__(self, stream):
        """
        @param stream: File-like object that C{write()} sends its text to.
        """
        import win32console as win
        red = win.FOREGROUND_RED
        green = win.FOREGROUND_GREEN
        blue = win.FOREGROUND_BLUE
        bold = win.FOREGROUND_INTENSITY
        self.stream = stream
        self.screenBuffer = win.GetStdHandle(win.STD_OUT_HANDLE)
        # Color label -> console text attribute bit mask.
        self._colors = dict(
            normal=red | green | blue,
            red=red | bold,
            green=green | bold,
            blue=blue | bold,
            yellow=red | green | bold,
            magenta=red | blue | bold,
            cyan=green | blue | bold,
            white=red | green | blue | bold,
        )

    @classmethod
    def supported(cls, stream=sys.stdout):
        """
        Return True if C{win32console} can be imported and the standard
        output handle accepts a text attribute; return False otherwise.

        @param stream: Accepted for parity with the other colorizers; it is
            not consulted here.
        """
        try:
            import win32console as console
            handle = console.GetStdHandle(console.STD_OUT_HANDLE)
        except ImportError:
            return False
        import pywintypes
        try:
            handle.SetConsoleTextAttribute(
                console.FOREGROUND_RED |
                console.FOREGROUND_GREEN |
                console.FOREGROUND_BLUE)
        except pywintypes.error:
            return False
        return True

    def write(self, text, color):
        """
        Write C{text} in C{color}, then switch back to the 'normal' color.

        @param text: Text to be written to the stream.

        @param color: A string label for a color. e.g. 'red', 'white'.
        """
        palette = self._colors
        console = self.screenBuffer
        console.SetConsoleTextAttribute(palette[color])
        self.stream.write(text)
        console.SetConsoleTextAttribute(palette['normal'])


class _NullColorizer(object):
    """
    Fallback colorizer that writes text unchanged, ignoring the color.

    See _AnsiColorizer docstring.
    """
    def __init__(self, stream):
        """
        @param stream: File-like object that C{write()} sends its text to.
        """
        self.stream = stream

    @classmethod
    def supported(cls, stream=sys.stdout):
        """
        Always return True: plain writes need no terminal capability.
        """
        return True

    def write(self, text, color):
        """
        Write C{text} to the stream; C{color} is accepted and ignored.
        """
        sink = self.stream
        sink.write(text)


def get_elapsed_time_color(elapsed_time):
    """Return the color label used to report a test's elapsed time.

    :param elapsed_time: duration in seconds
    :returns: 'red' above 1.0s, 'yellow' above 0.25s, otherwise 'green'
    """
    if elapsed_time > 1.0:
        return 'red'
    elif elapsed_time > 0.25:
        return 'yellow'
    else:
        return 'green'


class HeatTestResult(result.TextTestResult):
    """Text test result that colors outcomes and records the slowest tests."""

    def __init__(self, *args, **kw):
        """Set up timing state and pick a colorizer for the output stream.

        :param show_elapsed: keyword-only; when true, elapsed times are
            written after successful tests.  Defaults to True when omitted.

        All other arguments are passed to ``result.TextTestResult``.
        """
        self.show_elapsed = kw.pop('show_elapsed', True)
        result.TextTestResult.__init__(self, *args, **kw)
        self.num_slow_tests = 5
        self.slow_tests = []  # this is a fixed-sized heap
        self._last_case = None
        self.colorizer = None
        # NOTE(vish): reset stdout for the terminal check
        stdout = sys.stdout
        sys.stdout = sys.__stdout__
        try:
            for colorizer in [_Win32Colorizer, _AnsiColorizer,
                              _NullColorizer]:
                if colorizer.supported():
                    self.colorizer = colorizer(self.stream)
                    break
        finally:
            # Always put the caller's stdout back, even if a colorizer
            # probe or constructor raises.
            sys.stdout = stdout

        # NOTE(lorinh): Initialize start_time in case a sqlalchemy-migrate
        # error results in it failing to be initialized later. Otherwise,
        # _handleElapsedTime will fail, causing the wrong error message to
        # be outputted.
        self.start_time = time.time()

    def getDescription(self, test):
        """Describe a test by its ``str()`` form."""
        return str(test)

    def _handleElapsedTime(self, test):
        """Compute the time since ``start_time`` and track the slowest tests.

        Sets ``self.elapsed_time`` and keeps at most ``num_slow_tests``
        ``(elapsed_time, test)`` pairs in ``self.slow_tests``.
        """
        self.elapsed_time = time.time() - self.start_time
        item = (self.elapsed_time, test)
        # Record only the n-slowest tests using heap
        if len(self.slow_tests) >= self.num_slow_tests:
            heapq.heappushpop(self.slow_tests, item)
        else:
            heapq.heappush(self.slow_tests, item)

    def _writeElapsedTime(self, test):
        """Write the last elapsed time, colored by how slow it was."""
        color = get_elapsed_time_color(self.elapsed_time)
        self.colorizer.write(" %.2f" % self.elapsed_time, color)

    def _writeResult(self, test, long_result, color, short_result, success):
        """Write one test outcome in verbose (showAll) or dots mode.

        :param long_result: text written, in ``color``, in verbose mode
        :param short_result: single character written in dots mode
        :param success: the elapsed time is only written when this is true
            (and ``show_elapsed`` is set)
        """
        if self.showAll:
            self.colorizer.write(long_result, color)
            if self.show_elapsed and success:
                self._writeElapsedTime(test)
            self.stream.writeln()
        elif self.dots:
            self.stream.write(short_result)
        self.stream.flush()

    # NOTE(vish): copied from unittest with edit to add color
    def addSuccess(self, test):
        """Record a success and write a green 'OK' / '.'."""
        unittest.TestResult.addSuccess(self, test)
        self._handleElapsedTime(test)
        self._writeResult(test, 'OK', 'green', '.', True)

    # NOTE(vish): copied from unittest with edit to add color
    def addFailure(self, test, err):
        """Record a failure and write a red 'FAIL' / 'F'."""
        unittest.TestResult.addFailure(self, test, err)
        self._handleElapsedTime(test)
        self._writeResult(test, 'FAIL', 'red', 'F', False)

    # NOTE(vish): copied from nose with edit to add color
    def addError(self, test, err):
        """Overrides normal addError to add support for
        errorClasses. If the exception is a registered class, the
        error will be added to the list for that class, not errors.
        """
        self._handleElapsedTime(test)
        stream = getattr(self, 'stream', None)
        ec, ev, tb = err
        try:
            exc_info = self._exc_info_to_string(err, test)
        except TypeError:
            # 2.3 compat
            exc_info = self._exc_info_to_string(err)
        for cls, (storage, label, isfail) in self.errorClasses.items():
            if result.isclass(ec) and issubclass(ec, cls):
                if isfail:
                    test.passed = False
                storage.append((test, exc_info))
                # Might get patched into a streamless result
                if stream is not None:
                    if self.showAll:
                        message = [label]
                        detail = result._exception_detail(err[1])
                        if detail:
                            message.append(detail)
                        stream.writeln(": ".join(message))
                    elif self.dots:
                        stream.write(label[:1])
                return
        self.errors.append((test, exc_info))
        test.passed = False
        if stream is not None:
            self._writeResult(test, 'ERROR', 'red', 'E', False)

    def startTest(self, test):
        """Start timing a test and, in verbose mode, write its heading.

        The test case class name is written once per run of consecutive
        tests from the same class, followed by the padded method name.
        """
        unittest.TestResult.startTest(self, test)
        self.start_time = time.time()
        # NOTE(review): ``test`` is presumably nose's wrapper around the
        # real TestCase, exposed as ``test.test`` -- confirm.
        current_case = test.test.__class__.__name__

        if self.showAll:
            if current_case != self._last_case:
                self.stream.writeln(current_case)
                self._last_case = current_case

            self.stream.write(
                ' %s' % str(test.test._testMethodName).ljust(60))
            self.stream.flush()


class HeatTestRunner(core.TextTestRunner):
    """Test runner that builds HeatTestResult and reports the slowest tests."""

    def __init__(self, *args, **kwargs):
        """
        :param show_elapsed: keyword-only; enables per-test timings and the
            slow-test summary.  Defaults to True when omitted.

        All other arguments are passed to ``core.TextTestRunner``.
        """
        self.show_elapsed = kwargs.pop('show_elapsed', True)
        core.TextTestRunner.__init__(self, *args, **kwargs)

    def _makeResult(self):
        """Build the result object used to collect and print outcomes."""
        return HeatTestResult(self.stream,
                              self.descriptions,
                              self.verbosity,
                              self.config,
                              show_elapsed=self.show_elapsed)

    def _writeSlowTests(self, result_):
        """Write the recorded tests that were not 'green', slowest first."""
        # Pare out 'fast' tests
        slow_tests = [item for item in result_.slow_tests
                      if get_elapsed_time_color(item[0]) != 'green']
        if slow_tests:
            slow_total_time = sum(item[0] for item in slow_tests)
            self.stream.writeln("Slowest %i tests took %.2f secs:"
                                % (len(slow_tests), slow_total_time))
            # Sort on the elapsed time only, so that equal timings never
            # fall back to comparing the test objects themselves.
            ordered = sorted(slow_tests, key=lambda item: item[0],
                             reverse=True)
            for elapsed_time, test in ordered:
                time_str = "%.2f" % elapsed_time
                self.stream.writeln(" %s %s" % (time_str.ljust(10), test))

    def run(self, test):
        """Run the tests, then write the slow-test summary if enabled."""
        result_ = core.TextTestRunner.run(self, test)
        if self.show_elapsed:
            self._writeSlowTests(result_)
        return result_


def run():
    """Run the Heat tests through nose and exit with the outcome.

    The process exits with status 0 when ``core.run`` returns a true value
    and 1 otherwise.
    """
    # This is a fix to allow the --hide-elapsed flag while accepting
    # arbitrary nosetest flags as well
    argv = [x for x in sys.argv if x != '--hide-elapsed']
    hide_elapsed = argv != sys.argv

    # If any argument looks like a test name but doesn't have "heat.tests" in
    # front of it, automatically add that so we don't have to type as much
    for i, arg in enumerate(argv):
        if arg.startswith('test_'):
            argv[i] = 'heat.tests.%s' % arg

    testdir = os.path.abspath(os.path.join("heat", "tests"))
    c = config.Config(stream=sys.stdout,
                      env=os.environ,
                      verbosity=3,
                      workingDir=testdir,
                      plugins=core.DefaultPluginManager())

    runner = HeatTestRunner(stream=c.stream,
                            verbosity=c.verbosity,
                            config=c,
                            show_elapsed=not hide_elapsed)
    sys.exit(not core.run(config=c, testRunner=runner, argv=argv))


if __name__ == '__main__':
    eventlet.monkey_patch()
    run()