singe/thirdparty/bzip2/tests/testcase.py
2023-10-23 19:38:18 -05:00

215 lines
7.2 KiB
Python

'''
Wrapper for Python's unittest.TestCase that sets up BZip2 testing environment.
'''
__copyright__ = "Copyright (C) 2022 Micah Snyder"
from math import ceil
import os
from pathlib import Path
import platform
import shutil
import subprocess
import tempfile
import unittest
from typing import Tuple, Union, NamedTuple
# Outcome of a finished command: exit code plus the captured stdout and stderr
# bytes. Declared with the functional NamedTuple syntax.
CmdResult = NamedTuple('CmdResult', [('ec', int), ('out', bytes), ('err', bytes)])


class TestCase(unittest.TestCase):
    '''
    unittest.TestCase base class that prepares the BZip2 testing environment.

    setUpClass() reads its configuration from environment variables, creates a
    per-class temporary directory and makes it the working directory;
    tearDownClass() restores the working directory and removes the temporary
    directory again.
    '''

    version = ""
    operating_system = ""
    path_source = None
    path_build = None
    path_tmp = None
    bzip2 = None
    valgrind = ""  # Not 'None' because we'll use this variable even if valgrind not found.
    valgrind_args = []
    original_working_directory = ""

    @classmethod
    def setUpClass(cls):
        '''
        Prepare test environment:
        - Create a temporary testing directory.
        - Get paths needed for tests from environment variables.

        Environment variables read:
        - VERSION, PATH_SOURCE, PATH_BUILD: required.
        - PATH_BZIP2: optional path of the bzip2 executable.
        - TMP: optional parent directory for the temporary directory.
        - VALGRIND: optional path of the Valgrind executable.

        Raises Exception if a required variable is not defined.
        '''
        cls.operating_system = platform.platform().split("-")[0].lower()

        # The bzip2 program uses the BZIP and BZIP2 environment variables as
        # additional input. We must purge them to prevent OS environment
        # variables from affecting the test suite.
        os.environ.pop('BZIP', None)
        os.environ.pop('BZIP2', None)

        # Version may be used for testing bzip2 --version output, etc.
        cls.version = os.getenv("VERSION")
        if cls.version is None:
            raise Exception("VERSION environment variable not defined! Aborting...")

        # Get test paths from environment variables. Fail with a readable
        # message instead of the TypeError that Path(None) would produce.
        for required in ("PATH_SOURCE", "PATH_BUILD"):
            if os.getenv(required) is None:
                raise Exception(
                    "{} environment variable not defined! Aborting...".format(required))
        cls.path_source = Path(os.getenv("PATH_SOURCE"))
        cls.path_build = Path(os.getenv("PATH_BUILD"))

        path_bzip2 = os.getenv("PATH_BZIP2")
        cls.bzip2 = Path(path_bzip2) if path_bzip2 is not None else None

        # Generate temp directory
        cls.path_tmp = Path(tempfile.mkdtemp(prefix=(cls.__name__ + "-"), dir=os.getenv("TMP")))

        # Enable valgrind testing if VALGRIND variable set to path of Valgrind executable.
        valgrind_env = os.getenv('VALGRIND')
        if valgrind_env is not None:
            valgrind = Path(valgrind_env)
            if valgrind.is_file():
                cls.valgrind = valgrind
                logfile = cls.path_tmp / 'valgrind.log'
                cls.valgrind_args = [
                    '-v',
                    '--trace-children=yes',
                    '--track-fds=yes',
                    '--leak-check=full',
                    '--gen-suppressions=all',
                    '--show-leak-kinds=definite',
                    '--errors-for-leak-kinds=definite',
                    f'--log-file={logfile}',
                    '--error-exitcode=123',
                ]

        # Perform all tests with cwd set to the cls.path_tmp, created above.
        cls.original_working_directory = os.getcwd()
        os.chdir(cls.path_tmp)

    @classmethod
    def tearDownClass(cls):
        '''
        Clean up after ourselves,
        Delete the generated tmp directory (kept if KEEPTEMP is set).
        '''
        print("")

        # Restore current working directory before deleting cls.path_tmp.
        os.chdir(cls.original_working_directory)

        if os.getenv("KEEPTEMP") is None:
            # Cleanup is best-effort: a failure to remove the directory is
            # reported but must not fail the test class.
            try:
                shutil.rmtree(cls.path_tmp)
                print("Removed tmp directory: {}".format(cls.path_tmp))
            except Exception:
                print("No tmp directory to clean up.")

    def setUp(self):
        '''Print blank lines to separate the output of each test.'''
        print('\n')

    def tearDown(self):
        '''Print blank lines to separate the output of each test.'''
        print('\n')

    def execute(self, cmd: list, try_valgrind: bool = True) -> CmdResult:
        '''
        Execute a command given as a subprocess.Popen argument list.

        cmd:          program and arguments; items may be str or Path.
        try_valgrind: run the command under valgrind when valgrind is enabled,
                      then check the valgrind log for errors.

        Return a CmdResult tuple of (exit code, stdout bytes, stderr bytes).
        Raises AssertionError if the valgrind log check fails.
        '''
        use_valgrind = try_valgrind and self.valgrind != ''

        # Normalise every argument to str so that Path objects (e.g. the
        # bzip2 executable path) can be passed and printed.
        cmd = [str(arg) for arg in cmd]

        # Use valgrind if we have it.
        if use_valgrind:
            cmd = [str(self.valgrind)] + [str(arg) for arg in self.valgrind_args] + cmd

        print(f"Running: {' '.join(cmd)}\n")
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = p.communicate()

        # Check the valgrind log for errors.
        if use_valgrind:
            self.verify_valgrind_log()

        return CmdResult(p.returncode, out, err)

    def verify_valgrind_log(self, log_file: Union[Path, None] = None):
        '''
        Check if valgrind log file contains errors.
        If valgrind not enabled this is basically a nop.

        log_file: log to check; defaults to 'valgrind.log' in the tmp directory.

        The log passes when it contains the text 'ERROR SUMMARY: 0 errors'.
        Raises AssertionError if the log is missing or does not pass.
        '''
        if self.valgrind == "":
            return

        if log_file is None:
            log_file = self.path_tmp / 'valgrind.log'

        if not log_file.exists():
            raise AssertionError('{} not found. Valgrind failed to run?'.format(log_file))

        print('Verifying {}...'.format(log_file))

        # Read the log once; undecodable bytes must not abort the check.
        with log_file.open('r', errors='replace') as the_log:
            log_lines = the_log.readlines()

        # Search inside each line: the summary is only part of a log line, so
        # comparing whole lines against it would never match.
        errors = not any('ERROR SUMMARY: 0 errors' in line for line in log_lines)

        if errors:
            print("*" * 80)
            print('Valgrind test failed!'.center(80, ' '))
            print('Please submit a bug report with this log to https://gitlab.com/bzip2/bzip2/issues'.center(69, ' '))
            print(str(log_file).center(80, ' '))
            print("*" * 80)

        # On failure echo the whole log, otherwise only the summary onwards.
        # Very long lines are skipped to keep the output readable.
        found_summary = False
        for line in log_lines:
            if 'ERROR SUMMARY' in line:
                found_summary = True
            if (found_summary or errors) and len(line) < 500:
                print(line.rstrip('\n'))

        if errors:
            raise AssertionError('Valgrind test FAILED!')

    @staticmethod
    def hex_compare(actual: bytes, expected: bytes, size: int = 16):
        '''
        Return string with hex comparison of two buffers.

        actual, expected: the buffers to compare.
        size:             number of bytes rendered per row.

        Each differing row shows its byte offset followed by the actual and
        the expected bytes in hex; bytes that differ from (or are missing in)
        the other buffer are highlighted. Runs of identical rows are collapsed
        into a single ' ...' line.
        '''
        a_lines = ceil(len(actual) / size)
        e_lines = ceil(len(expected) / size)
        lines = max(a_lines, e_lines)

        # Row prefix is '{:8d}: ' (10 characters) and each rendered column is
        # size*2 hex digits plus one gap, followed by a two-space separator;
        # the header uses the same widths so the titles sit over their columns.
        pieces = [
            ' ' * 10
            + f'output ({len(actual)}):'.ljust(size * 2 + 3)
            + f'expected ({len(expected)}):\n'
        ]

        midpoint = size / 2

        def render_slice(to_print, to_compare):
            '''Render one row of to_print, highlighting differences from to_compare.'''
            cells = []
            for byte in range(0, size):
                if byte == midpoint:
                    cells.append(' ')
                if byte < len(to_print):
                    if byte >= len(to_compare) or to_print[byte] != to_compare[byte]:
                        cells.append('\x1b[1;33m{:02x}\x1b[0m'.format(to_print[byte]))  # bold yellow
                    else:
                        cells.append('{:02x}'.format(to_print[byte]))  # plain
                else:
                    # Two spaces stand in for the two hex digits of an absent
                    # byte so the following column stays aligned.
                    cells.append('  ')
            return ''.join(cells)

        prev_is_dots = False
        for line in range(0, lines):
            a_line = actual[line * size: line * size + size]
            e_line = expected[line * size: line * size + size]
            if a_line == e_line:
                if not prev_is_dots:
                    pieces.append(" ...\n")
                    prev_is_dots = True
            else:
                text_line = '{:8d}: {}  {}'.format(
                    line * size,
                    render_slice(a_line, e_line),
                    render_slice(e_line, a_line)
                )
                pieces.append(text_line + '\n')
                prev_is_dots = False

        return ''.join(pieces) + '\n'