112 lines
2.5 KiB
Python
112 lines
2.5 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import datetime
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
|
|
# Build directory of the instrumented sqlite3 tree; collect_coverage() hands it
# to gcovr as both ``--root`` and the working directory.
GCOV_SRC_DIR: str = "/home/test/sqlite3-src/build"

# gcov counter files that reset_coverage() deletes when they are present.
GCDA_FILES: list[str] = [
    "/home/test/sqlite3-src/build/sqlite3-shell.gcda",
    "/home/test/sqlite3-src/build/sqlite3-sqlite3.gcda",
    "/home/test/sqlite3-src/build/lemon.gcda",
    "/home/test/sqlite3-src/build/mkkeywordhash.gcda",
]

# Time limit (seconds) given to each child process by run_sql() and
# run_sql_parallel().
_TIMEOUT: int = 5
|
|
|
|
|
|
def _build_args(binary: str, flag: str) -> list[str]:
    """Return the argv list used to launch *binary*.

    Args:
        binary: Path or name of the executable; always the first element.
        flag: Extra command-line options as a single shell-style string.
            It is tokenised with ``shlex.split`` so quoted arguments stay
            together. A falsy value (empty string, ``None``) adds nothing.

    Returns:
        A new list ``[binary, *tokens]``.
    """
    argv = [binary]
    if flag:
        argv.extend(shlex.split(flag))
    return argv
|
|
|
|
|
|
def run_sql(binary: str, sql: str, flag: str):
    """Feed *sql* to a single run of *binary* and collect what it produced.

    Args:
        binary: Executable to launch.
        sql: Text written to the child's standard input.
        flag: Extra command-line options, tokenised by ``_build_args``.

    Returns:
        A ``(stdout, stderr, returncode)`` tuple; both text streams have
        leading and trailing whitespace stripped.

    Raises:
        subprocess.TimeoutExpired: The child did not finish within
            ``_TIMEOUT`` seconds.
    """
    argv = _build_args(binary, flag)
    completed = subprocess.run(
        argv,
        input=sql,
        # Explicit pipes on both streams: same effect as capture_output=True.
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        # latin-1 maps every byte to a character, so decoding child output
        # never fails; errors='replace' covers input that cannot be encoded.
        encoding='latin-1',
        errors='replace',
        timeout=_TIMEOUT,
    )
    stdout_text = completed.stdout.strip()
    stderr_text = completed.stderr.strip()
    return stdout_text, stderr_text, completed.returncode
|
|
|
|
|
|
def _kill_and_reap(procs):
    """Kill every process in *procs*, then wait on each so none stays a zombie."""
    # Signal all children first so a slow exit of one does not delay the others.
    for proc in procs:
        proc.kill()
    for proc in procs:
        proc.wait()


def run_sql_parallel(binary_a: str, binary_b: str, sql: str, flag: str):
    """Run the same *sql* through two binaries and return both results.

    Both children are spawned before any input is sent. The SQL is then fed
    to the first child and, once that one has finished, to the second; each
    ``communicate`` call has its own ``_TIMEOUT`` budget.

    Args:
        binary_a: First executable to launch.
        binary_b: Second executable to launch.
        sql: Text written to the standard input of both children.
        flag: Extra command-line options, tokenised by ``_build_args`` and
            applied to both binaries.

    Returns:
        ``(a_stdout, a_stderr, a_returncode, b_stdout, b_stderr,
        b_returncode)`` with all four text values whitespace-stripped.

    Raises:
        subprocess.TimeoutExpired: Either child exceeded ``_TIMEOUT`` seconds.
        OSError: A binary could not be started.

    On any failure, every child that was already started is killed and
    reaped before the exception propagates.
    """
    # Build both argv lists up front so a malformed *flag* fails before any
    # process has been spawned.
    args_a = _build_args(binary_a, flag)
    args_b = _build_args(binary_b, flag)

    popen_kwargs = {
        "stdin": subprocess.PIPE,
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "text": True,
        "encoding": 'latin-1',
        "errors": 'replace',
    }

    started = []
    try:
        pa = subprocess.Popen(args_a, **popen_kwargs)
        started.append(pa)
        # If this second spawn fails, the handler below still cleans up `pa`.
        pb = subprocess.Popen(args_b, **popen_kwargs)
        started.append(pb)

        a_out, a_err = pa.communicate(input=sql, timeout=_TIMEOUT)
        b_out, b_err = pb.communicate(input=sql, timeout=_TIMEOUT)
    except BaseException:
        # BaseException (not just Exception) so Ctrl-C also tears the
        # children down; the original exception is always re-raised.
        # Killing a child that has already exited is a harmless no-op.
        _kill_and_reap(started)
        raise

    return (a_out.strip(), a_err.strip(), pa.returncode, b_out.strip(),
            b_err.strip(), pb.returncode)
|
|
|
|
|
|
def reset_coverage(gcda_files=None):
    """Delete gcov counter files so the next run starts from zero coverage.

    Args:
        gcda_files: Optional iterable of file paths to delete. When omitted
            (or ``None``) the module-level ``GCDA_FILES`` list is used, which
            is the behaviour existing callers get.

    Paths that do not exist are skipped silently. Any other removal failure
    (for example a permission error) propagates to the caller. An INFO line
    is printed once the loop completes.
    """
    targets = GCDA_FILES if gcda_files is None else gcda_files
    for gcda in targets:
        # Attempt the delete directly instead of checking exists() first: the
        # file may vanish between the check and the remove, which would crash.
        try:
            os.remove(gcda)
        except (FileNotFoundError, NotADirectoryError):
            # Nothing at this path (or a parent component is not a
            # directory) - there is no counter file to reset.
            continue

    print(f'{datetime.datetime.now()}: [INFO] All existing .gcda files have been deleted.')
|
|
|
|
|
|
def collect_coverage(output_path: str):
    """Invoke ``gcovr`` on the build tree, directing its report to *output_path*.

    gcovr is run from ``GCOV_SRC_DIR`` with that directory as ``--root``,
    with ``--json-pretty`` output and with the two ``--gcov-ignore-*``
    options shown below.

    Args:
        output_path: Value passed to gcovr's ``--output`` option.

    Returns:
        None. A non-zero gcovr exit status is reported by printing an ERROR
        line that includes gcovr's stderr; no exception is raised for it.
    """
    gcovr_cmd = [
        "gcovr",
        "--root", GCOV_SRC_DIR,
        "--json-pretty",
        "--output", output_path,
        "--gcov-ignore-errors=source_not_found",
        "--gcov-ignore-parse-errors",
    ]
    result = subprocess.run(
        gcovr_cmd,
        # Explicit pipes on both streams: same effect as capture_output=True.
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        cwd=GCOV_SRC_DIR,
    )

    if result.returncode == 0:
        return

    print(f'{datetime.datetime.now()}: [ERROR] gcovr failed_\n{result.stderr}')
|