215 lines
7.2 KiB
Python
215 lines
7.2 KiB
Python
|
|
'''
|
|
Wrapper for Python's unittest.TestCase that sets up BZip2 testing environment.
|
|
'''
|
|
|
|
__copyright__ = "Copyright (C) 2022 Micah Snyder"
|
|
|
|
from math import ceil
|
|
import os
|
|
from pathlib import Path
|
|
import platform
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
import unittest
|
|
from typing import Tuple, Union, NamedTuple
|
|
|
|
# Use older Python 3.5 syntax.
|
|
CmdResult = NamedTuple('CmdResult', [('ec', int), ('out', bytes), ('err', bytes)])
|
|
|
|
class TestCase(unittest.TestCase):
|
|
|
|
version = ""
|
|
|
|
path_source = None
|
|
path_build = None
|
|
path_tmp = None
|
|
|
|
bzip2 = None
|
|
|
|
valgrind = "" # Not 'None' because we'll use this variable even if valgrind not found.
|
|
valgrind_args = []
|
|
|
|
original_working_directory = ""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
'''
|
|
Prepare test environment:
|
|
- Create a temporary testing directory.
|
|
- Get paths needed for tests from environment variables.
|
|
'''
|
|
cls.operating_system = platform.platform().split("-")[0].lower()
|
|
|
|
# The bzip2 program uses the BZIP and BZIP2 environment variables as
|
|
# additional input. We must purge them to prevent OS environment
|
|
# variables from affecting the test suite.
|
|
os.environ.pop('BZIP', None)
|
|
os.environ.pop('BZIP2', None)
|
|
|
|
# Version may be used for testing bzip2 --version output, etc.
|
|
cls.version = os.getenv("VERSION")
|
|
if cls.version == None:
|
|
raise Exception("VERSION environment variable not defined! Aborting...")
|
|
|
|
# Get test paths from environment variables.
|
|
cls.path_source = Path(os.getenv("PATH_SOURCE"))
|
|
cls.path_build = Path(os.getenv("PATH_BUILD"))
|
|
cls.bzip2 = Path(os.getenv("PATH_BZIP2")) if os.getenv("PATH_BZIP2") != None else None
|
|
|
|
# Generate temp directory
|
|
cls.path_tmp = Path(tempfile.mkdtemp(prefix=(cls.__name__ + "-"), dir=os.getenv("TMP")))
|
|
|
|
# Enable valgrind testing if VALGRIND variable set to path of Valgrind executable.
|
|
if os.getenv('VALGRIND') != None:
|
|
valgrind = Path(os.getenv("VALGRIND"))
|
|
|
|
if valgrind.is_file():
|
|
cls.valgrind = valgrind
|
|
|
|
logfile = cls.path_tmp / 'valgrind.log'
|
|
cls.valgrind_args = [
|
|
'-v',
|
|
'--trace-children=yes',
|
|
'--track-fds=yes',
|
|
'--leak-check=full',
|
|
'--gen-suppressions=all',
|
|
'--show-leak-kinds=definite',
|
|
'--errors-for-leak-kinds=definite',
|
|
f'--log-file={logfile}',
|
|
'--error-exitcode=123',
|
|
]
|
|
|
|
# Perform all tests with cwd set to the cls.path_tmp, created above.
|
|
cls.original_working_directory = os.getcwd()
|
|
os.chdir(cls.path_tmp)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
'''
|
|
Clean up after ourselves,
|
|
Delete the generated tmp directory.
|
|
'''
|
|
print("")
|
|
|
|
# Restore current working directory before deleting cls.path_tmp.
|
|
os.chdir(cls.original_working_directory)
|
|
|
|
if None == os.getenv("KEEPTEMP"):
|
|
try:
|
|
shutil.rmtree(cls.path_tmp)
|
|
print("Removed tmp directory: {}".format(cls.path_tmp))
|
|
except Exception:
|
|
print("No tmp directory to clean up.")
|
|
|
|
def setUp(self):
|
|
print('\n')
|
|
|
|
def tearDown(self):
|
|
print('\n')
|
|
|
|
def execute(self, cmd: list, try_valgrind: bool = True) -> CmdResult:
|
|
'''
|
|
Execute a subprocess.Popen list of commands.
|
|
Return a tuple of
|
|
'''
|
|
# Use valgrind if we have it.
|
|
if try_valgrind and self.valgrind != '':
|
|
cmd = [str(self.valgrind),] + self.valgrind_args + cmd
|
|
|
|
print(f"Running: {' '.join(cmd)}\n")
|
|
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
out, err = p.communicate()
|
|
|
|
# Check the valgrind log for errors.
|
|
if try_valgrind and self.valgrind != '':
|
|
self.verify_valgrind_log()
|
|
|
|
return CmdResult(p.returncode, out, err)
|
|
|
|
def verify_valgrind_log(self, log_file: Union[Path, None]=None):
|
|
'''
|
|
Check if valgrind log file contains errors.
|
|
If valgrind not enabled this is basically a nop.
|
|
'''
|
|
if self.valgrind == "":
|
|
return
|
|
|
|
if log_file == None:
|
|
log_file = self.path_tmp / 'valgrind.log'
|
|
|
|
if not log_file.exists():
|
|
raise AssertionError('{} not found. Valgrind failed to run?'.format(log_file))
|
|
|
|
errors = False
|
|
print('Verifying {}...'.format(log_file))
|
|
try:
|
|
with log_file.open('r') as the_log:
|
|
assert 'ERROR SUMMARY: 0 errors' not in the_log
|
|
except AssertionError:
|
|
print("*" * 80)
|
|
print('Valgrind test failed!'.center(80, ' '))
|
|
print('Please submit a bug report with this log to https://gitlab.com/bzip2/bzip2/issues'.center(69, ' '))
|
|
print(str(log_file).center(80, ' '))
|
|
print("*" * 80)
|
|
errors = True
|
|
finally:
|
|
with log_file.open('r') as log:
|
|
found_summary = False
|
|
for line in log.readlines():
|
|
if 'ERROR SUMMARY' in line:
|
|
found_summary = True
|
|
if (found_summary or errors) and len(line) < 500:
|
|
print(line.rstrip('\n'))
|
|
if errors:
|
|
raise AssertionError('Valgrind test FAILED!')
|
|
|
|
@staticmethod
|
|
def hex_compare(actual: bytes, expected: bytes, size: int = 16):
|
|
'''
|
|
Return string with hex comparison of two buffers
|
|
'''
|
|
a_lines = ceil(float(len(actual)) / float(size))
|
|
e_lines = ceil(float(len(expected)) / float(size))
|
|
lines = max(a_lines, e_lines)
|
|
|
|
comparison = ' ' + \
|
|
f'output ({len(actual)}):'.ljust(size*2 + 3) + \
|
|
f'expected ({len(expected)}):\n'
|
|
|
|
def render_slice(to_print, to_compare):
|
|
line = ''
|
|
for byte in range(0, size):
|
|
if byte == size / 2:
|
|
line += ' '
|
|
|
|
if byte < len(to_print):
|
|
if byte >= len(to_compare) or to_print[byte] != to_compare[byte]:
|
|
line += '\x1b[1;33m{:02x}\x1b[0m'.format(to_print[byte]) # bold yellow
|
|
else:
|
|
line += '{:02x}'.format(to_print[byte]) # plain
|
|
else:
|
|
line += ' '
|
|
|
|
return line
|
|
|
|
prev_is_dots = False
|
|
for line in range(0, lines):
|
|
a_line = actual[line * size : line * size + size]
|
|
e_line = expected[line * size : line * size + size]
|
|
|
|
if a_line == e_line:
|
|
if prev_is_dots == False:
|
|
comparison += " ...\n"
|
|
prev_is_dots = True
|
|
else:
|
|
text_line = '{:8d}: {} {}'.format(
|
|
line * size,
|
|
render_slice(a_line, e_line),
|
|
render_slice(e_line, a_line)
|
|
)
|
|
comparison += text_line + '\n'
|
|
prev_is_dots = False
|
|
|
|
return comparison + '\n'
|