Files
ast-project/part1/mutator_equivalent_rewrites.py
T
2026-06-24 13:47:14 +02:00

445 lines
12 KiB
Python

import random
import re
import mutator_extra_statements
from sqlite_static_helper import *
# Lexer for SQL text.  Alternatives are tried in order, so comments and every
# quoted form win over the generic identifier/operator rules; the final
# catch-all guarantees that the tokens always concatenate back to the input.
_TOKEN_RE = re.compile(
    r"""
    (?P<line_comment>--[^\n]*) |
    (?P<block_comment>/\*.*?\*/) |
    (?P<string>'(?:[^']|'')*') |
    (?P<dquoted>"(?:[^"]|"")*") |
    (?P<bracket>\[[^\]]*\]) |
    (?P<backtick>`(?:[^`]|``)*`) |
    (?P<blob>[xX]'[0-9a-fA-F]*') |
    (?P<number>\b\d+(?:\.\d+)?(?:[eE][+-]?\d+)?\b) |
    (?P<ident>[A-Za-z_][A-Za-z0-9_]*) |
    (?P<op>[<>!=]=|<>|\|\||::|->>?|[+\-*/%<>=&|^~,.;()@]) |
    (?P<ws>\s+) |
    (?P<other>.)
    """,
    re.VERBOSE | re.DOTALL,
)

# Token kinds whose text must never be rewritten by a mutator.
_UNSAFE_KINDS = {
    "line_comment", "block_comment", "string", "dquoted", "bracket", "backtick",
    "blob"
}


def _tokenize(s: str):
    """Split ``s`` into ``(kind, text)`` pairs using ``_TOKEN_RE``."""
    return [(m.lastgroup, m.group()) for m in _TOKEN_RE.finditer(s)]


def _sub_in_safe(s: str, pattern: re.Pattern, repl, max_subs: int = 1) -> str:
    """Substitute ``pattern`` in ``s`` while leaving comments/quoted text alone.

    The pattern is searched against the whole string, so it may span several
    tokens and its anchors and look-arounds see the real context.  A match is
    only accepted when it does not overlap any token whose kind is listed in
    ``_UNSAFE_KINDS``.  (Applying the pattern to one token at a time, as was
    done before, made every multi-token pattern impossible to match.)

    Args:
        s: SQL text to rewrite.
        pattern: compiled regular expression to look for.
        repl: replacement template string or ``callable(match) -> str``.
        max_subs: maximum number of substitutions; ``<= 0`` returns ``s``.

    Returns:
        The rewritten text, or ``s`` unchanged when nothing safe matched.
    """
    if max_subs <= 0:
        return s
    protected = [
        m.span() for m in _TOKEN_RE.finditer(s) if m.lastgroup in _UNSAFE_KINDS
    ]
    pieces: list[str] = []
    copied = 0  # text before this offset has already been emitted
    pos = 0  # offset at which the next search starts
    done = 0
    while done < max_subs and pos <= len(s):
        m = pattern.search(s, pos)
        if m is None:
            break
        start, end = m.span()
        clash = next(
            ((lo, hi) for lo, hi in protected if lo < end and start < hi),
            None,
        )
        if clash is not None:
            lo, hi = clash
            # A match that begins inside protected text can be skipped
            # wholesale; one that merely runs into it is retried one
            # character further on.
            pos = hi if lo <= start else start + 1
            continue
        pieces.append(s[copied:start])
        pieces.append(repl(m) if callable(repl) else m.expand(repl))
        copied = end
        pos = end if end > start else end + 1  # always make progress
        done += 1
    pieces.append(s[copied:])
    return ''.join(pieces)


# `col BETWEEN lo AND hi` with numeric bounds.  The guards keep the rewrite
# away from `NOT BETWEEN`, keep a qualified name (`t.x`) in one piece, and
# refuse bounds that continue as a larger literal or arithmetic expression.
_BETWEEN_RE = re.compile(
    r'(?<![\w.])(?!NOT\b)((?:[A-Za-z_]\w*\.)*[A-Za-z_]\w*)\s+BETWEEN\s+'
    r'(-?\d+(?:\.\d+)?)\s+AND\s+(-?\d+(?:\.\d+)?)'
    r'(?![\w.])(?!\s*[-+*/%|&<>])',
    re.I,
)


def mut_between_to_and(s: str) -> str:
    """`x BETWEEN a AND b` -> `(x >= a AND x <= b)`."""
    def repl(m: re.Match) -> str:
        col, lo, hi = m.group(1), m.group(2), m.group(3)
        return f'({col} >= {lo} AND {col} <= {hi})'
    return _sub_in_safe(s, _BETWEEN_RE, repl, 1)
# `(col >= lo AND col <= hi)` with numeric bounds; the back-reference makes
# sure both comparisons use the same (optionally qualified) column.
_RANGE_AND_RE = re.compile(
    r'\(\s*((?:[A-Za-z_]\w*\.)*[A-Za-z_]\w*)\s*>=\s*(-?\d+(?:\.\d+)?)\s+AND\s+'
    r'\1\s*<=\s*(-?\d+(?:\.\d+)?)\s*\)',
    re.I,
)


def mut_and_to_between(s: str) -> str:
    """`(x >= a AND x <= b)` -> `(x BETWEEN a AND b)`.

    The surrounding parentheses are kept: they are part of the matched text,
    and dropping them would glue the result to whatever precedes the group
    (`abs(x >= 1 AND x <= 2)` would become `absx BETWEEN 1 AND 2`).
    """
    def repl(m: re.Match) -> str:
        return f'({m.group(1)} BETWEEN {m.group(2)} AND {m.group(3)})'
    return _sub_in_safe(s, _RANGE_AND_RE, repl, 1)
# `col IN (item, ...)` without nested parentheses.  The guards stop `NOT`
# from being captured as the column of `x NOT IN (...)` and keep a qualified
# name such as `t.x` in one piece.
_IN_LIST_RE = re.compile(
    r'(?<![\w.])(?!NOT\b)((?:[A-Za-z_]\w*\.)*[A-Za-z_]\w*)'
    r'\s+IN\s*\(\s*([^()]*?)\s*\)',
    re.I,
)


def mut_in_to_or(s: str) -> str:
    """`x IN (a, b, c)` -> `(x = a OR x = b OR x = c)`

    Lists that contain a `SELECT`, are empty, or have more than 16 items are
    left untouched.
    """
    def repl(m: re.Match) -> str:
        col, body = m.group(1), m.group(2)
        if re.search(r'\bSELECT\b', body, re.I):
            return m.group(0)
        items = [p.strip() for p in body.split(',') if p.strip()]
        if not items or len(items) > 16:
            return m.group(0)
        return '(' + ' OR '.join(f'{col} = {v}' for v in items) + ')'
    return _sub_in_safe(s, _IN_LIST_RE, repl, 1)
def mut_arithmetic_identity(s: str) -> str:
    """`<n>` -> `(<n> + 0)` / `(<n> * 1)` / `(<n> - 0)` / `(0 + <n>)`.

    Only a complete, unsigned numeric literal is rewritten.  A preceding `-`
    is never absorbed (it may be a binary minus: `x-5` must not become
    `x(-5 + 0)`), and literals that continue with an exponent or belong to an
    identifier are skipped.  The `+ 0.0` form is offered only for literals
    that already contain a decimal point, because adding `0.0` to an integer
    yields a REAL value in SQLite.
    """
    pat = re.compile(r'(?<![\w.])(\d+(?:\.\d+)?)(?![\w.])')
    int_forms = ['({v} + 0)', '({v} * 1)', '({v} - 0)', '(0 + {v})']
    real_forms = int_forms + ['({v} + 0.0)']

    def repl(m: re.Match) -> str:
        literal = m.group(1)
        forms = real_forms if '.' in literal else int_forms
        return random.choice(forms).format(v=literal)
    return _sub_in_safe(s, pat, repl, 1)
def mut_double_negate(s: str) -> str:
    """`<n>` -> `(-(-(<n>)))`

    The result is wrapped in parentheses so that its leading `-` can never
    join a preceding binary minus into `--`, which starts a SQL line comment
    (`a-5` would otherwise become `a--(-(5))`).  Only a complete, unsigned
    numeric literal is matched; a preceding sign is left in place.
    """
    pat = re.compile(r'(?<![\w.])(\d+(?:\.\d+)?)(?![\w.])')
    return _sub_in_safe(s, pat, lambda m: f'(-(-({m.group(1)})))', 1)
def mut_string_concat_identity(s: str) -> str:
    """`'abc'` -> `('abc' || '')` or `'' || ('abc')`.

    Only the first single-quoted string token is rewritten; everything else
    is emitted exactly as tokenized.
    """
    tokens = _tokenize(s)
    texts = [text for _, text in tokens]
    for idx, (kind, text) in enumerate(tokens):
        if kind == 'string' and len(text) >= 2:
            texts[idx] = random.choice(
                [f"({text} || '')", f"'' || ({text})"])
            break
    return ''.join(texts)
def mut_double_not(s: str) -> str:
    """`WHERE p` -> `WHERE NOT NOT (p)`

    The predicate runs from the first `WHERE` up to the next clause keyword
    (`GROUP`, `ORDER`, `LIMIT`, `HAVING`, `RETURNING`, `WINDOW`), a compound
    operator (`UNION`, `INTERSECT`, `EXCEPT`), a `;`, or the end of the text.
    If the captured text does not have balanced parentheses (for example the
    `WHERE` belongs to a subquery and the capture ran past its closing `)`),
    the statement is returned unchanged rather than producing broken SQL.
    """
    m = re.search(
        r'\bWHERE\b\s+([^;]+?)'
        r'(?=\bGROUP\b|\bORDER\b|\bLIMIT\b|\bHAVING\b|\bRETURNING\b'
        r'|\bUNION\b|\bINTERSECT\b|\bEXCEPT\b|\bWINDOW\b|;|$)',
        s, re.I)
    if not m:
        return s
    pred = m.group(1).strip()
    if not pred:
        return s
    # Wrapping is only safe when every parenthesis opened inside the
    # predicate is also closed inside it.
    depth = 0
    for ch in pred:
        if ch == '(':
            depth += 1
        elif ch == ')':
            depth -= 1
            if depth < 0:
                break
    if depth != 0:
        return s
    new = f'WHERE NOT NOT ({pred}) '
    return s[:m.start()] + new + s[m.end():]
def mut_de_morgan(s: str) -> str:
    """`NOT (a AND b)` -> `(NOT (a) OR NOT (b))`

    The group is left untouched when it contains `OR`, `BETWEEN` or `CASE`:
    splitting at the first `AND` would then cut through
    `x BETWEEN 1 AND 5`, or regroup `a OR b AND c` as `(a OR b) AND c`.
    """
    pat = re.compile(r'\bNOT\s*\(\s*([^()]+?)\s+AND\s+([^()]+?)\s*\)', re.I)
    unsafe = re.compile(r'\b(?:OR|BETWEEN|CASE)\b', re.I)

    def repl(m: re.Match) -> str:
        left, right = m.group(1), m.group(2)
        if unsafe.search(left) or unsafe.search(right):
            return m.group(0)
        return f'(NOT ({left}) OR NOT ({right}))'
    return _sub_in_safe(s, pat, repl, 1)
def mut_tautology_and(s: str) -> str:
    """`WHERE p` -> `WHERE p AND 1=1`

    One of several always-true expressions is appended to the first `WHERE`
    clause.  The clause ends at the next clause keyword, compound operator
    (`UNION`, `INTERSECT`, `EXCEPT`), `;`, or end of text; if the captured
    text has unbalanced parentheses (a `WHERE` inside a subquery) nothing is
    changed.
    """
    if not re.search(r'\bWHERE\b', s, re.I):
        return s
    pat = re.compile(
        r'(\bWHERE\b\s+[^;]+?)'
        r'(?=\bGROUP\b|\bORDER\b|\bLIMIT\b|\bHAVING\b|\bRETURNING\b'
        r'|\bUNION\b|\bINTERSECT\b|\bEXCEPT\b|\bWINDOW\b|;|$)',
        re.I)
    tauto = random.choice(['1=1', "'a'='a'", 'NULL IS NULL', 'TRUE'])

    def balanced(text: str) -> bool:
        depth = 0
        for ch in text:
            if ch == '(':
                depth += 1
            elif ch == ')':
                depth -= 1
                if depth < 0:
                    return False
        return depth == 0

    def repl(m: re.Match) -> str:
        clause = m.group(1).rstrip()
        if not balanced(clause):
            return m.group(0)
        return f'{clause} AND {tauto} '
    return _sub_in_safe(s, pat, repl, 1)
def mut_tautology_or(s: str) -> str:
    """`WHERE p` -> `WHERE (p) OR 1=0`

    One of several always-false expressions is OR-ed onto the parenthesised
    predicate of the first `WHERE` clause.  Only the predicate is wrapped;
    the `WHERE` keyword stays outside the parentheses (wrapping the keyword
    as well would yield `(WHERE p) OR ...`, which is not valid SQL).  If the
    captured predicate has unbalanced parentheses nothing is changed.
    """
    if not re.search(r'\bWHERE\b', s, re.I):
        return s
    pat = re.compile(
        r'(\bWHERE\b)\s+([^;]+?)'
        r'(?=\bGROUP\b|\bORDER\b|\bLIMIT\b|\bHAVING\b|\bRETURNING\b'
        r'|\bUNION\b|\bINTERSECT\b|\bEXCEPT\b|\bWINDOW\b|;|$)',
        re.I)
    contradiction = random.choice(['1=0', "'a'='b'", 'NULL IS NOT NULL', 'FALSE'])

    def balanced(text: str) -> bool:
        depth = 0
        for ch in text:
            if ch == '(':
                depth += 1
            elif ch == ')':
                depth -= 1
                if depth < 0:
                    return False
        return depth == 0

    def repl(m: re.Match) -> str:
        pred = m.group(2).strip()
        if not pred or not balanced(pred):
            return m.group(0)
        return f'{m.group(1)} ({pred}) OR {contradiction} '
    return _sub_in_safe(s, pat, repl, 1)
def mut_is_null_to_isnull(s: str) -> str:
    """`x IS NULL` -> `x ISNULL` or `x IS NOT NULL` -> `x NOTNULL`

    A coin flip decides which of the two spellings is looked for; at most
    one occurrence is rewritten.
    """
    rewrite_is_null = random.random() < 0.5
    if rewrite_is_null:
        needle = re.compile(r'\bIS\s+NULL\b', re.I)
        shorthand = 'ISNULL'
    else:
        needle = re.compile(r'\bIS\s+NOT\s+NULL\b', re.I)
        shorthand = 'NOTNULL'
    return _sub_in_safe(s, needle, shorthand, 1)
def mut_eq_to_is(s: str) -> str:
    """`x = NULL` -> `x IS NULL`

    Only a plain `=` or `==` comparison is rewritten.  The look-behind keeps
    the `=` of `!=`, `<=` and `>=` from being matched (which would leave
    `!IS NULL` etc. behind), and the whitespace before the operator is
    normalised so that `x=NULL` becomes `x IS NULL` rather than `xIS NULL`.

    NOTE(review): the pattern is not context-aware, so an assignment such as
    `SET x = NULL` is rewritten as well.
    """
    pat = re.compile(r'\s*(?<![<>!=])==?\s*NULL\b', re.I)
    return _sub_in_safe(s, pat, ' IS NULL', 1)
def mut_inject_null_check(s: str) -> str:
    """Append `AND <col> IS NOT NULL` to a `WHERE`

    The column comes from `mutator_extra_statements._pick_table_col`.  The
    clause ends at the next clause keyword, compound operator (`UNION`,
    `INTERSECT`, `EXCEPT`), `;`, or end of text; if the captured text has
    unbalanced parentheses (a `WHERE` inside a subquery) nothing is changed.
    """
    if not re.search(r'\bWHERE\b', s, re.I):
        return s
    _, col = mutator_extra_statements._pick_table_col(s)
    pat = re.compile(
        r'(\bWHERE\b\s+[^;]+?)'
        r'(?=\bGROUP\b|\bORDER\b|\bLIMIT\b|\bHAVING\b|\bRETURNING\b'
        r'|\bUNION\b|\bINTERSECT\b|\bEXCEPT\b|\bWINDOW\b|;|$)',
        re.I)

    def balanced(text: str) -> bool:
        depth = 0
        for ch in text:
            if ch == '(':
                depth += 1
            elif ch == ')':
                depth -= 1
                if depth < 0:
                    return False
        return depth == 0

    def repl(m: re.Match) -> str:
        clause = m.group(1).rstrip()
        if not balanced(clause):
            return m.group(0)
        return f'{clause} AND {col} IS NOT NULL '
    return _sub_in_safe(s, pat, repl, 1)
def mut_add_indexed_by(s: str) -> str:
    """Tag a table reference with `NOT INDEXED` or `INDEXED BY`

    A table is picked at random from the schema reported by
    `mutator_extra_statements.get_schema`; the hint is appended to the first
    `FROM <table>` that is not already followed by `AS`, `NOT` or `INDEXED`.
    """
    schema = mutator_extra_statements.get_schema(s)
    if not schema:
        return s
    table = random.choice(list(schema.keys()))
    hint_options = [
        'NOT INDEXED',
        f'INDEXED BY sqlite_autoindex_{table}_1',
    ]
    hint = random.choice(hint_options)
    from_table = re.compile(
        rf'\bFROM\s+{re.escape(table)}\b(?!\s+(?:AS|NOT|INDEXED))',
        re.I)

    def with_hint(match: re.Match) -> str:
        return f'{match.group(0)} {hint}'

    return _sub_in_safe(s, from_table, with_hint, 1)
def mut_qualify_with_main(s: str) -> str:
    """Qualify a bare table name with `main.` schema prefix.

    The table is picked at random from the schema; an occurrence that is
    already preceded by a dot or a word character is not touched.
    """
    schema = mutator_extra_statements.get_schema(s)
    if not schema:
        return s
    table = random.choice(list(schema.keys()))
    bare_name = re.compile(rf'(?<![.\w])\b{re.escape(table)}\b', re.I)

    def qualify(match: re.Match) -> str:
        return f'main.{match.group(0)}'

    return _sub_in_safe(s, bare_name, qualify, 1)
def mut_predicate_pushdown_blocker(s: str) -> str:
    """Prefix a column reference in `WHERE` with `+`

    Applies to the first `WHERE <identifier> <comparison>`; the text is
    returned unchanged when no such construct is found.
    """
    hit = re.search(
        r'\bWHERE\b\s+([A-Za-z_]\w*)\s*(=|<|>|<=|>=|<>|!=)', s, re.I)
    if hit is None:
        return s
    begin, finish = hit.span(1)
    return ''.join((s[:begin], '+', hit.group(1), s[finish:]))
def mut_subquery_in_from(s: str) -> str:
    """`FROM <table>` -> `FROM (SELECT * FROM <table>) AS t_<n>`

    The table is picked at random from the schema and the alias suffix is a
    random integer in 0..999.  A `FROM <table>` that is followed by a dot is
    skipped.
    """
    schema = mutator_extra_statements.get_schema(s)
    if not schema:
        return s
    table = random.choice(list(schema.keys()))
    from_table = re.compile(rf'\bFROM\s+{re.escape(table)}\b(?!\s*\.)', re.I)
    alias = f't_{random.randint(0, 999)}'
    wrapped = f'FROM (SELECT * FROM {table}) AS {alias}'

    def as_subquery(_match: re.Match) -> str:
        return wrapped

    return _sub_in_safe(s, from_table, as_subquery, 1)
def mut_force_materialization(s: str) -> str:
    """`SELECT * FROM users WHERE age > 18 ORDER BY name;` -> `WITH _m AS MATERIALIZED (SELECT * FROM users) SELECT * FROM _m WHERE age > 18 ORDER BY name;`

    The CTE is inserted directly in front of the `SELECT` that the pattern
    matched, and exactly the matched table reference is replaced by `_m`.
    (Searching backwards for an earlier `SELECT` could only ever find one
    that belongs to a previous statement, and a plain `str.replace` could
    hit an occurrence of the name other than the one after `FROM`.)

    The statement is returned unchanged when:
      * the schema is empty or no `SELECT ... FROM <table>` is found;
      * the table is also used as a qualifier (`<table>.col`), since those
        references would no longer resolve once the table is renamed;
      * the matched prefix closes a parenthesis it did not open, i.e. the
        match started inside a parenthesised subquery or an existing CTE.
    """
    schema = mutator_extra_statements.get_schema(s)
    if not schema:
        return s
    table = random.choice(list(schema.keys()))
    pat = re.compile(rf'(\bSELECT\b[^;]*?\bFROM\s+){re.escape(table)}\b', re.I)
    m = pat.search(s)
    if not m:
        return s
    if re.search(rf'\b{re.escape(table)}\s*\.', s, re.I):
        return s
    depth = 0
    for ch in m.group(1):
        if ch == '(':
            depth += 1
        elif ch == ')':
            depth -= 1
            if depth < 0:
                return s
    cte = f'WITH _m AS MATERIALIZED (SELECT * FROM {table}) '
    return s[:m.start()] + cte + m.group(1) + '_m' + s[m.end():]
def mut_comma_join_to_explicit(s: str) -> str:
    """`FROM a, b` -> `FROM a CROSS JOIN b`

    Only the comma between the first two plain table names is rewritten; any
    `WHERE` condition is left where it is.
    """
    comma_join = re.compile(
        r'\bFROM\s+([A-Za-z_]\w*)\s*,\s*([A-Za-z_]\w*)', re.I)

    def explicit(match: re.Match) -> str:
        left, right = match.group(1), match.group(2)
        return f'FROM {left} CROSS JOIN {right}'

    return _sub_in_safe(s, comma_join, explicit, 1)
def mut_swap_on_to_using(s: str) -> str:
    """`JOIN t ON a.col = b.col` -> `JOIN t USING (col)` when columns match.

    An `AS alias` on the joined table is carried over to the result.  The
    rewrite is skipped when the `ON` condition continues past the equality
    (e.g. `AND ...`, arithmetic, `COLLATE`), or when both sides use the same
    qualifier, because `USING (col)` cannot express those conditions.

    NOTE(review): with `USING`, a `SELECT *` reports the shared column only
    once, so the output columns can differ from the `ON` form.
    """
    pat = re.compile(
        r'\bJOIN\s+([A-Za-z_]\w*)(\s+AS\s+\w+)?\s+ON\s+'
        r'(\w+)\.([A-Za-z_]\w*)\s*=\s*(\w+)\.\4\b'
        # The condition must end here: end of text, `;`, `)`, `,` or the
        # keyword that starts the next clause or join.
        r'(?=\s*(?:$|[;),]|(?:WHERE|GROUP|ORDER|LIMIT|HAVING|WINDOW|UNION'
        r'|INTERSECT|EXCEPT|JOIN|LEFT|RIGHT|FULL|INNER|CROSS|NATURAL)\b))',
        re.I,
    )

    def repl(m: re.Match) -> str:
        if m.group(3).lower() == m.group(5).lower():
            return m.group(0)
        alias = m.group(2) or ''
        return f'JOIN {m.group(1)}{alias} USING ({m.group(4)})'
    return _sub_in_safe(s, pat, repl, 1)
def mut_inject_empty_block_comment(s: str) -> str:
    """Insert an empty `/**/` between two top-level tokens.

    A randomly chosen whitespace token is replaced by ` /**/ `.  Whitespace
    that directly follows a `--` line comment is never chosen: it holds the
    newline that terminates the comment, and replacing it would pull the
    following SQL into the comment.
    """
    tokens = _tokenize(s)
    if len(tokens) < 4:
        return s
    candidates = [
        i for i, (kind, _) in enumerate(tokens)
        if kind == 'ws' and (i == 0 or tokens[i - 1][0] != 'line_comment')
    ]
    if not candidates:
        return s
    i = random.choice(candidates)
    tokens[i] = ('ws', ' /**/ ')
    return ''.join(t for _, t in tokens)
def mut_inject_non_empty_block_comment(s: str) -> str:
    """Insert a `/* <value> */` between two top-level tokens.

    A randomly chosen whitespace token is replaced by a block comment whose
    text is a random entry of `VALUES`.  Whitespace that directly follows a
    `--` line comment is never chosen (it holds the newline that ends that
    comment), and any `*/` inside the chosen value is broken up so that it
    cannot close the injected comment early.
    """
    tokens = _tokenize(s)
    if len(tokens) < 4:
        return s
    candidates = [
        i for i, (kind, _) in enumerate(tokens)
        if kind == 'ws' and (i == 0 or tokens[i - 1][0] != 'line_comment')
    ]
    if not candidates:
        return s
    i = random.choice(candidates)
    # VALUES is expected to come from the star import of
    # sqlite_static_helper -- TODO confirm.
    payload = str(random.choice(VALUES)).replace('*/', '* /')
    tokens[i] = ('ws', f' /* {payload} */ ')
    return ''.join(t for _, t in tokens)
def mut_inject_line_comment(s: str) -> str:
    """Insert `-- <comment>\\n` after the first statement terminator.

    The comment text is a random entry of `VALUES`.  Line breaks inside it
    are replaced by spaces, because a `--` comment ends at the first newline
    and the remainder of the value would otherwise be parsed as SQL.
    """
    if ';' not in s:
        return s
    pat = re.compile(r';(\s*)')
    # VALUES is expected to come from the star import of
    # sqlite_static_helper -- TODO confirm.
    msg = re.sub(r'[\r\n]+', ' ', str(random.choice(VALUES)))
    return _sub_in_safe(s, pat, lambda m: f';\n-- {msg}\n', 1)
def mut_redundant_parens(s: str) -> str:
    """Wrap a numeric or column reference in extra parens.

    NOTE(review): the pattern accepts any identifier-shaped word, and only
    the first match is rewritten, so for typical statements the word that
    gets wrapped is the leading keyword rather than a column -- confirm
    whether that is intended.
    """
    word_or_int = re.compile(r'\b([A-Za-z_]\w*|\d+)\b')

    def wrap(match: re.Match) -> str:
        return f'(({match.group(1)}))'

    return _sub_in_safe(s, word_or_int, wrap, 1)
def mut_quote_identifier(s: str) -> str:
    """Surround a table name with quotes

    The table is picked at random from the schema and the quoting style is
    picked at random from double quotes, backticks and square brackets.
    """
    schema = mutator_extra_statements.get_schema(s)
    if not schema:
        return s
    table = random.choice(list(schema.keys()))
    opener = random.choice(['"', '`', '['])
    # Only the bracket style uses a different closing character.
    closer = ']' if opener == '[' else opener
    quoted = f'{opener}{table}{closer}'
    bare_name = re.compile(rf'(?<![.\w]){re.escape(table)}\b')

    def quote(_match: re.Match) -> str:
        return quoted

    return _sub_in_safe(s, bare_name, quote, 1)
def mut_memeify(s: str) -> str:
    """Randomly mixed-case a keyword: `SELECT` -> `SeLeCt`

    One keyword is picked at random from a fixed list; its first occurrence
    (if any) gets every letter upper- or lower-cased by a coin flip.
    """
    keywords = [
        'SELECT', 'FROM', 'WHERE', 'ORDER', 'GROUP', 'JOIN', 'UNION', 'INSERT',
        'UPDATE', 'DELETE', 'AND', 'OR', 'NOT', 'LIMIT', 'OFFSET', 'HAVING', 'CASE',
        'WHEN', 'THEN', 'END'
    ]
    chosen = random.choice(keywords)
    keyword_re = re.compile(rf'\b{chosen}\b', re.I)

    def jitter(match: re.Match) -> str:
        letters = []
        for ch in match.group(0):
            if random.random() < 0.5:
                letters.append(ch.upper())
            else:
                letters.append(ch.lower())
        return ''.join(letters)

    return _sub_in_safe(s, keyword_re, jitter, 1)
def mut_swap_whitespace(s: str) -> str:
    """Swap one whitespace character with another

    A whitespace token containing a space is picked at random and its first
    space is replaced by a tab, a newline, a space or (occasionally) a
    no-break space.
    """
    tokens = _tokenize(s)
    spaced = [
        idx for idx, (kind, text) in enumerate(tokens)
        if kind == 'ws' and ' ' in text
    ]
    if not spaced:
        return s
    target = random.choice(spaced)
    # The fourth option is a no-break space one time in four, else a space.
    rare = '\u00a0' if random.random() < 0.25 else ' '
    substitute = random.choice(['\t', '\n', ' ', rare])
    original_ws = tokens[target][1]
    tokens[target] = ('ws', original_ws.replace(' ', substitute, 1))
    return ''.join(text for _, text in tokens)
def mut_to_case_expr(s: str) -> str:
    """`x = a` (in a `SELECT` list) -> `CASE WHEN x = a THEN 1 ELSE 0 END`

    A qualified column (`t.x`) is captured as a whole so the `CASE` is never
    inserted after the dot, and an integer that continues as a larger
    literal (`5.5`, `5e3`) is not matched.

    NOTE(review): when `x` is NULL the comparison yields NULL while the
    `CASE` yields 0, so the two forms differ wherever NULL and 0 are
    distinguished.
    """
    pat = re.compile(
        r'(?<![\w.])((?:[A-Za-z_]\w*\.)*[A-Za-z_]\w*)\s*=\s*(-?\d+)(?![\w.])')
    return _sub_in_safe(
        s,
        pat,
        lambda m: f'CASE WHEN {m.group(1)} = {m.group(2)} THEN 1 ELSE 0 END',
        1,
    )
def mut_iif_to_case(s: str) -> str:
    """`IIF(a, b, c)` -> `CASE WHEN a THEN b ELSE c END`.

    Only calls whose three arguments contain no commas or parentheses are
    rewritten.
    """
    iif_call = re.compile(
        r'\bIIF\s*\(\s*([^,()]+),\s*([^,()]+),\s*([^,()]+)\s*\)',
        re.I)

    def as_case(match: re.Match) -> str:
        cond, when_true, when_false = match.groups()
        return f'CASE WHEN {cond} THEN {when_true} ELSE {when_false} END'

    return _sub_in_safe(s, iif_call, as_case, 1)
def mut_case_to_iif(s: str) -> str:
    """`CASE WHEN a THEN b ELSE c END` -> `IIF(a, b, c)`.

    Only a single-branch `CASE` is rewritten.  If any captured part still
    contains a `CASE`/`WHEN`/`THEN`/`ELSE`/`END` keyword, the expression has
    several branches or spans more than one `CASE`, and folding it into one
    `IIF` would produce malformed SQL, so it is left as is.
    """
    pat = re.compile(
        r'\bCASE\s+WHEN\s+([^()]+?)\s+THEN\s+([^()]+?)\s+ELSE\s+([^()]+?)\s+END\b',
        re.I,
    )
    case_keyword = re.compile(r'\b(?:CASE|WHEN|THEN|ELSE|END)\b', re.I)

    def repl(m: re.Match) -> str:
        cond, when_true, when_false = m.group(1, 2, 3)
        if any(case_keyword.search(part)
               for part in (cond, when_true, when_false)):
            return m.group(0)
        return f'IIF({cond}, {when_true}, {when_false})'
    return _sub_in_safe(s, pat, repl, 1)