445 lines
12 KiB
Python
445 lines
12 KiB
Python
import random
|
|
import re
|
|
|
|
import mutator_extra_statements
|
|
from sqlite_static_helper import *
|
|
|
|
_TOKEN_RE = re.compile(
|
|
r"""
|
|
(?P<line_comment>--[^\n]*) |
|
|
(?P<block_comment>/\*.*?\*/) |
|
|
(?P<string>'(?:[^']|'')*') |
|
|
(?P<dquoted>"(?:[^"]|"")*") |
|
|
(?P<bracket>\[[^\]]*\]) |
|
|
(?P<backtick>`(?:[^`]|``)*`) |
|
|
(?P<blob>[xX]'[0-9a-fA-F]*') |
|
|
(?P<number>\b\d+(?:\.\d+)?(?:[eE][+-]?\d+)?\b) |
|
|
(?P<ident>[A-Za-z_][A-Za-z0-9_]*) |
|
|
(?P<op>[<>!=]=|<>|\|\||::|->>?|[+\-*/%<>=&|^~,.;()@]) |
|
|
(?P<ws>\s+) |
|
|
(?P<other>.)
|
|
""",
|
|
re.VERBOSE | re.DOTALL,
|
|
)
|
|
|
|
_UNSAFE_KINDS = {
|
|
"line_comment", "block_comment", "string", "dquoted", "bracket", "backtick",
|
|
"blob"
|
|
}
|
|
|
|
|
|
def _tokenize(s: str):
|
|
return [(m.lastgroup, m.group()) for m in _TOKEN_RE.finditer(s)]
|
|
|
|
|
|
def _sub_in_safe(s: str, pattern: re.Pattern, repl, max_subs: int = 1) -> str:
|
|
if max_subs <= 0:
|
|
return s
|
|
parts: list[str] = []
|
|
done = 0
|
|
for kind, text in _tokenize(s):
|
|
if done >= max_subs or kind in _UNSAFE_KINDS:
|
|
parts.append(text)
|
|
continue
|
|
new_text, n = pattern.subn(repl, text, count=max_subs - done)
|
|
parts.append(new_text)
|
|
done += n
|
|
return ''.join(parts)
|
|
|
|
|
|
_BETWEEN_RE = re.compile(
|
|
r'\b([A-Za-z_]\w*)\s+BETWEEN\s+(-?\d+(?:\.\d+)?)\s+AND\s+(-?\d+(?:\.\d+)?)',
|
|
re.I,
|
|
)
|
|
|
|
|
|
def mut_between_to_and(s: str) -> str:
|
|
"""`x BETWEEN a AND b` -> `(x >= a AND x <= b)`."""
|
|
|
|
def repl(m: re.Match) -> str:
|
|
col, lo, hi = m.group(1), m.group(2), m.group(3)
|
|
return f'({col} >= {lo} AND {col} <= {hi})'
|
|
|
|
return _sub_in_safe(s, _BETWEEN_RE, repl, 1)
|
|
|
|
|
|
# `( <column> >= <number> AND <same column> <= <number> )`; the column may be
# qualified and is matched case-insensitively on its second occurrence.
_RANGE_AND_RE = re.compile(
    r'\(\s*((?:[A-Za-z_]\w*\.)*[A-Za-z_]\w*)\s*>=\s*(-?\d+(?:\.\d+)?)\s+AND\s+'
    r'\1\s*<=\s*(-?\d+(?:\.\d+)?)\s*\)',
    re.I,
)


def mut_and_to_between(s: str) -> str:
    """Rewrite the first `(x >= a AND x <= b)` as `(x BETWEEN a AND b)`.

    The surrounding parentheses are kept.  The pattern consumes them, and
    dropping them would glue the result onto a preceding function name
    (`abs(x >= 1 AND x <= 2)`) or change how neighbouring operators bind.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when nothing matches.
    """

    def repl(m: re.Match) -> str:
        return f'({m.group(1)} BETWEEN {m.group(2)} AND {m.group(3)})'

    return _sub_in_safe(s, _RANGE_AND_RE, repl, 1)
|
|
|
|
|
|
_IN_LIST_RE = re.compile(
|
|
r'\b([A-Za-z_]\w*)\s+IN\s*\(\s*([^()]*?)\s*\)',
|
|
re.I,
|
|
)
|
|
|
|
|
|
def mut_in_to_or(s: str) -> str:
|
|
"""`x IN (a, b, c)` -> `(x = a OR x = b OR x = c)`"""
|
|
|
|
def repl(m: re.Match) -> str:
|
|
col, body = m.group(1), m.group(2)
|
|
if re.search(r'\bSELECT\b', body, re.I):
|
|
return m.group(0)
|
|
items = [p.strip() for p in body.split(',') if p.strip()]
|
|
if not items or len(items) > 16:
|
|
return m.group(0)
|
|
return '(' + ' OR '.join(f'{col} = {v}' for v in items) + ')'
|
|
|
|
return _sub_in_safe(s, _IN_LIST_RE, repl, 1)
|
|
|
|
|
|
def mut_arithmetic_identity(s: str) -> str:
    """Wrap the first numeric literal in a no-op arithmetic expression.

    `<n>` becomes one of `(<n> + 0)`, `(<n> * 1)`, `(<n> - 0)`,
    `(<n> + 0.0)` or `(0 + <n>)`, chosen at random.

    A leading minus is deliberately not captured.  With a `\\b` in front of
    it, a minus can only match when a word character precedes it, i.e. when
    it is a binary operator (`x-5`), and capturing it would produce
    `x(-5 + 0)`.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when it has no plain number.
    """
    # The look-arounds keep the match from biting into identifiers (`t1`),
    # decimals or exponents (`1e5`).
    pat = re.compile(r'(?<![\w.])(\d+(?:\.\d+)?)(?![\w.])')
    forms = ['({v} + 0)', '({v} * 1)', '({v} - 0)', '({v} + 0.0)', '(0 + {v})']

    def repl(m: re.Match) -> str:
        return random.choice(forms).format(v=m.group(1))

    return _sub_in_safe(s, pat, repl, 1)
|
|
|
|
|
|
def mut_double_negate(s: str) -> str:
    """Rewrite the first numeric literal `<n>` as `(-(-(<n>)))`.

    The outer parentheses matter.  A replacement that starts with `-` would
    form `--` when the literal follows a minus sign (`x - 5`, `-5`), and
    `--` starts a line comment in SQL.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when it has no plain number.
    """
    # No optional sign here: a minus in front of the literal is left in place
    # as the operator it is.
    pat = re.compile(r'(?<![\w.])(\d+(?:\.\d+)?)(?![\w.])')
    return _sub_in_safe(s, pat, lambda m: f'(-(-({m.group(1)})))', 1)
|
|
|
|
|
|
def mut_string_concat_identity(s: str) -> str:
    """Concatenate the first string literal with an empty string.

    `'abc'` becomes either `('abc' || '')` or `'' || ('abc')`, chosen at
    random.  Every other token is copied through unchanged.
    """
    pieces: list[str] = []
    pending = True
    for kind, text in _tokenize(s):
        if pending and kind == 'string' and len(text) >= 2:
            text = random.choice([f"({text} || '')", f"'' || ({text})"])
            pending = False
        pieces.append(text)
    return ''.join(pieces)
|
|
|
|
|
|
def mut_double_not(s: str) -> str:
    """Rewrite the first `WHERE p` as `WHERE NOT NOT (p)`.

    The predicate runs from the WHERE keyword to the first of: a `;`, an
    unmatched `)`, or a clause keyword (GROUP, ORDER, LIMIT, HAVING,
    RETURNING, WINDOW, UNION, EXCEPT, INTERSECT) outside any parentheses.
    Quoted text and comments are skipped both when looking for WHERE and when
    looking for the end of the predicate.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when there is no WHERE, the
        predicate is empty, or its parentheses do not balance.
    """
    # Quoted forms and comments, matched first so they are consumed whole.
    skip = (
        r"'(?:[^']|'')*'"
        r'|"(?:[^"]|"")*"'
        r'|\[[^\]]*\]|`(?:[^`]|``)*`'
        r'|--[^\n]*|/\*.*?\*/'
    )
    locate = re.compile(skip + r'|\b(WHERE)\b', re.I | re.S)
    where = next((m for m in locate.finditer(s) if m.group(1)), None)
    if where is None:
        return s

    boundary = re.compile(
        skip + r'|[();]'
        r'|\b(?:GROUP|ORDER|LIMIT|HAVING|RETURNING|WINDOW|UNION|EXCEPT|INTERSECT)\b',
        re.I | re.S,
    )
    depth = 0
    stop = len(s)
    for tok in boundary.finditer(s, where.end()):
        text = tok.group()
        if text == '(':
            depth += 1
        elif text == ')':
            if depth == 0:
                # This parenthesis closes a subquery that contains the WHERE.
                stop = tok.start()
                break
            depth -= 1
        elif text == ';' or (depth == 0 and text[0].isalpha()):
            # Statement end, or a clause keyword at the WHERE's own level.
            stop = tok.start()
            break

    pred = s[where.end():stop].strip()
    if not pred or depth:
        return s
    return s[:where.start()] + f'WHERE NOT NOT ({pred}) ' + s[stop:]
|
|
|
|
|
|
def mut_de_morgan(s: str) -> str:
    """Rewrite the first `NOT (a AND b)` as `(NOT (a) OR NOT (b))`.

    The rewrite is skipped when either operand contains `OR` or `BETWEEN`.
    With `OR` present the textual split at `AND` no longer follows operator
    precedence, and the `AND` of `x BETWEEN 1 AND 2` is not a logical AND.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when nothing suitable matches.
    """
    pat = re.compile(r'\bNOT\s*\(\s*([^()]+?)\s+AND\s+([^()]+?)\s*\)', re.I)
    unsafe_operand = re.compile(r'\b(?:OR|BETWEEN)\b', re.I)

    def repl(m: re.Match) -> str:
        left, right = m.group(1), m.group(2)
        if unsafe_operand.search(left) or unsafe_operand.search(right):
            return m.group(0)
        return f'(NOT ({left}) OR NOT ({right}))'

    return _sub_in_safe(s, pat, repl, 1)
|
|
|
|
|
|
def mut_tautology_and(s: str) -> str:
    """Append an always-true term to a WHERE clause: `WHERE p` -> `WHERE p AND 1=1`.

    The appended term is picked at random from `1=1`, `'a'='a'`,
    `NULL IS NULL` and `TRUE`.  Text without a WHERE keyword is returned
    unchanged.
    """
    where_kw = re.compile(r'\bWHERE\b', re.I)
    if where_kw.search(s) is None:
        return s

    # The clause ends before the next listed clause keyword, a `;`, or the
    # end of the text.
    clause = re.compile(
        r'(\bWHERE\b\s+[^;]+?)(?=\bGROUP\b|\bORDER\b|\bLIMIT\b|\bHAVING\b|\bRETURNING\b|;|$)',
        re.I)
    always_true = random.choice(['1=1', "'a'='a'", 'NULL IS NULL', 'TRUE'])

    def extend(m: re.Match) -> str:
        return f'{m.group(1).rstrip()} AND {always_true} '

    return _sub_in_safe(s, clause, extend, 1)
|
|
|
|
|
|
def mut_tautology_or(s: str) -> str:
    """OR an always-false term onto a WHERE clause: `WHERE p` -> `WHERE (p) OR 1=0`.

    The predicate is parenthesised so the appended `OR` applies to all of it.
    The WHERE keyword itself stays outside the parentheses.  The appended term
    is picked at random from `1=0`, `'a'='b'`, `NULL IS NOT NULL` and `FALSE`.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when there is no WHERE clause.
    """
    if not re.search(r'\bWHERE\b', s, re.I):
        return s
    # Group 1 is the keyword, group 2 the predicate.
    pat = re.compile(
        r'\b(WHERE)\b\s+([^;]+?)(?=\bGROUP\b|\bORDER\b|\bLIMIT\b|\bHAVING\b|\bRETURNING\b|;|$)',
        re.I)
    contradiction = random.choice(['1=0', "'a'='b'", 'NULL IS NOT NULL', 'FALSE'])

    def repl(m: re.Match) -> str:
        pred = m.group(2).strip()
        if not pred:
            return m.group(0)
        return f'{m.group(1)} ({pred}) OR {contradiction} '

    return _sub_in_safe(s, pat, repl, 1)
|
|
|
|
|
|
def mut_is_null_to_isnull(s: str) -> str:
    """Swap a NULL test for its one-word spelling.

    A coin flip decides which form is looked for: `IS NULL` -> `ISNULL`, or
    `IS NOT NULL` -> `NOTNULL`.  Only the first occurrence of the chosen form
    is rewritten; if it is absent the text comes back unchanged.
    """
    if random.random() < 0.5:
        long_form, short_form = r'\bIS\s+NULL\b', 'ISNULL'
    else:
        long_form, short_form = r'\bIS\s+NOT\s+NULL\b', 'NOTNULL'
    return _sub_in_safe(s, re.compile(long_form, re.I), short_form, 1)
|
|
|
|
|
|
def mut_eq_to_is(s: str) -> str:
    """Rewrite the first `x = NULL` (or `x == NULL`) as `x IS NULL`.

    The `=` of `!=`, `<=`, `>=` and `<>` is not touched.  The replacement
    always carries its own leading space, so `x=NULL` becomes `x IS NULL`
    rather than `xIS NULL`.

    NOTE(review): an assignment such as `SET x = NULL` is matched as well.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when nothing matches.
    """
    # Whitespace before the operator is consumed so it can be normalised.
    pat = re.compile(r'(?<![<>!=])\s*==?(?!=)\s*NULL\b', re.I)
    return _sub_in_safe(s, pat, ' IS NULL', 1)
|
|
|
|
|
|
def mut_inject_null_check(s: str) -> str:
    """Turn `WHERE p` into `WHERE (p) AND <col> IS NOT NULL`.

    The existing predicate is parenthesised so the new condition constrains
    all of it.  Without that, `WHERE a OR b AND col IS NOT NULL` would only
    constrain `b`.

    The column comes from `mutator_extra_statements._pick_table_col`.
    NOTE(review): that helper is not visible here; it is assumed to return a
    `(table, column)` pair whose column is falsy when none is available.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when there is no WHERE clause or
        no column to test.
    """
    if not re.search(r'\bWHERE\b', s, re.I):
        return s
    _, col = mutator_extra_statements._pick_table_col(s)
    if not col:
        # Avoid emitting `... AND None IS NOT NULL`.
        return s
    # Group 1 is the keyword, group 2 the predicate.
    pat = re.compile(
        r'\b(WHERE)\b\s+([^;]+?)(?=\bGROUP\b|\bORDER\b|\bLIMIT\b|\bHAVING\b|\bRETURNING\b|;|$)',
        re.I)

    def repl(m: re.Match) -> str:
        pred = m.group(2).strip()
        if not pred:
            return m.group(0)
        return f'{m.group(1)} ({pred}) AND {col} IS NOT NULL '

    return _sub_in_safe(s, pat, repl, 1)
|
|
|
|
|
|
def mut_add_indexed_by(s: str) -> str:
    """Attach an index hint to a `FROM <table>` reference.

    A table is picked at random from the schema reported by
    `mutator_extra_statements.get_schema`.  Its first `FROM <table>` that is
    not already followed by `AS`, `NOT` or `INDEXED` gets either
    `NOT INDEXED` or `INDEXED BY sqlite_autoindex_<table>_1` appended.
    An empty schema leaves the text unchanged.
    """
    schema = mutator_extra_statements.get_schema(s)
    if not schema:
        return s

    table = random.choice(list(schema.keys()))
    hints = ['NOT INDEXED', f'INDEXED BY sqlite_autoindex_{table}_1']
    hint = random.choice(hints)
    from_table = re.compile(
        r'\bFROM\s+' + re.escape(table) + r'\b(?!\s+(?:AS|NOT|INDEXED))',
        re.I,
    )

    def tag(m: re.Match) -> str:
        return f'{m.group(0)} {hint}'

    return _sub_in_safe(s, from_table, tag, 1)
|
|
|
|
|
|
def mut_qualify_with_main(s: str) -> str:
    """Prefix one bare table name with the `main.` schema qualifier.

    The table is picked at random from the schema reported by
    `mutator_extra_statements.get_schema`.  Its first occurrence that is not
    preceded by a dot or a word character is rewritten.  An empty schema
    leaves the text unchanged.
    """
    schema = mutator_extra_statements.get_schema(s)
    if not schema:
        return s

    table = random.choice(list(schema.keys()))
    bare_name = re.compile(r'(?<![.\w])\b' + re.escape(table) + r'\b', re.I)

    def qualify(m: re.Match) -> str:
        return f'main.{m.group(0)}'

    return _sub_in_safe(s, bare_name, qualify, 1)
|
|
|
|
|
|
def mut_predicate_pushdown_blocker(s: str) -> str:
    """Prefix the column that opens a WHERE comparison with a unary `+`.

    `WHERE a = 1` becomes `WHERE +a = 1`.  The column may be qualified
    (`t.a`) and must be followed by a comparison operator (`=`, `<`, `>`,
    `<=`, `>=`, `<>`, `!=`).  WHERE keywords inside string literals, quoted
    identifiers or comments are ignored.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when no such WHERE is found.
    """
    # Quoted forms and comments come first in the alternation so they are
    # consumed whole and their contents never reach the WHERE alternative.
    finder = re.compile(
        r"'(?:[^']|'')*'"
        r'|"(?:[^"]|"")*"'
        r'|\[[^\]]*\]|`(?:[^`]|``)*`'
        r'|--[^\n]*|/\*.*?\*/'
        r'|\bWHERE\b\s+(?P<col>(?:[A-Za-z_]\w*\.)*[A-Za-z_]\w*)\s*(?:[<>=]|!=)',
        re.I | re.S,
    )
    for m in finder.finditer(s):
        if m.group('col') is not None:
            at = m.start('col')
            return s[:at] + '+' + s[at:]
    return s
|
|
|
|
|
|
def mut_subquery_in_from(s: str) -> str:
    """Rewrite `FROM <table>` as `FROM (SELECT * FROM <table>) AS t_<n>`.

    The table is picked at random from the schema reported by
    `mutator_extra_statements.get_schema`.  A reference is left alone when:

    * it is followed by `.`, `AS`, `NOT` or `INDEXED`, because a second alias
      or a hint after the subquery would not parse;
    * it belongs to `DELETE FROM <table>`, which cannot target a subquery.

    NOTE(review): a bare alias without AS (`FROM t x`) is not detected.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when nothing suitable matches.
    """
    schema = mutator_extra_statements.get_schema(s)
    if not schema:
        return s
    table = random.choice(list(schema.keys()))
    pat = re.compile(
        rf'\bFROM\s+{re.escape(table)}\b(?!\s*\.)(?!\s+(?:AS|NOT|INDEXED)\b)',
        re.I,
    )
    alias = f't_{random.randint(0, 999)}'

    def repl(m: re.Match) -> str:
        if re.search(r'\bDELETE\s*$', m.string[:m.start()], re.I):
            return m.group(0)
        return f'FROM (SELECT * FROM {table}) AS {alias}'

    return _sub_in_safe(s, pat, repl, 1)
|
|
|
|
|
|
def mut_force_materialization(s: str) -> str:
    """Route a table read through a materialized CTE.

    `SELECT * FROM users WHERE age > 18 ORDER BY name;` becomes
    `WITH _m AS MATERIALIZED (SELECT * FROM users) SELECT * FROM _m WHERE age > 18 ORDER BY name;`

    The table is picked at random from the schema reported by
    `mutator_extra_statements.get_schema`.  The CTE is inserted directly in
    front of the SELECT that was matched, and exactly the matched
    `FROM <table>` reference is redirected to `_m`.  Statements that already
    have a WITH before that SELECT are left alone, since a second WITH would
    not parse.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when nothing suitable matches.
    """
    schema = mutator_extra_statements.get_schema(s)
    if not schema:
        return s
    table = random.choice(list(schema.keys()))
    pat = re.compile(rf'(\bSELECT\b[^;]*?\bFROM\s+){re.escape(table)}\b', re.I)
    m = pat.search(s)
    if not m:
        return s

    # Only the current statement's text in front of the SELECT is inspected.
    stmt_start = s.rfind(';', 0, m.start()) + 1
    if re.search(r'\bWITH\b', s[stmt_start:m.start()], re.I):
        return s

    cte = f'WITH _m AS MATERIALIZED (SELECT * FROM {table}) '
    # Splice by match offsets so no other occurrence of the table's text
    # (for example inside a longer identifier) can be hit.
    return s[:m.start()] + cte + m.group(1) + '_m' + s[m.end():]
|
|
|
|
|
|
def mut_comma_join_to_explicit(s: str) -> str:
    """Replace a comma join with an explicit one: `FROM a, b` -> `FROM a CROSS JOIN b`.

    Only the first `FROM <name>, <name>` pair is rewritten.  Any WHERE clause
    is left exactly where it is.
    """
    comma_join = re.compile(r'\bFROM\s+([A-Za-z_]\w*)\s*,\s*([A-Za-z_]\w*)', re.I)

    def make_explicit(m: re.Match) -> str:
        left, right = m.groups()
        return f'FROM {left} CROSS JOIN {right}'

    return _sub_in_safe(s, comma_join, make_explicit, 1)
|
|
|
|
|
|
def mut_swap_on_to_using(s: str) -> str:
    """Rewrite `JOIN t ON a.col = b.col` as `JOIN t USING (col)`.

    The rewrite applies only when both sides name the same column under two
    different qualifiers and the ON clause does not continue with `AND` or
    `OR`.  An `AS <alias>` on the joined table is kept, so later references
    to the alias stay valid.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when nothing suitable matches.
    """
    pat = re.compile(
        r'\bJOIN\s+([A-Za-z_]\w*)\s+((?:AS\s+\w+\s+)?)ON\s+'
        r'(\w+)\.([A-Za-z_]\w*)\s*=\s*(\w+)\.\4\b(?!\s+(?:AND|OR)\b)',
        re.I,
    )

    def repl(m: re.Match) -> str:
        table, alias, left, col, right = m.groups()
        if left.lower() == right.lower():
            # `a.col = a.col` is a self-comparison, not a join condition.
            return m.group(0)
        # `alias` is either empty or `AS <name> ` with its trailing space.
        return f'JOIN {table} {alias}USING ({col})'

    return _sub_in_safe(s, pat, repl, 1)
|
|
|
|
|
|
def mut_inject_empty_block_comment(s: str) -> str:
    """Replace one whitespace run between tokens with ` /**/ `.

    Whitespace that directly follows a `--` line comment is never chosen.
    That run holds the newline that ends the comment, and replacing it would
    pull the following SQL into the comment.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when it has fewer than four
        tokens or no eligible whitespace.
    """
    tokens = _tokenize(s)
    if len(tokens) < 4:
        return s
    candidates = [
        i for i, (kind, _) in enumerate(tokens)
        if kind == 'ws' and (i == 0 or tokens[i - 1][0] != 'line_comment')
    ]
    if not candidates:
        return s
    i = random.choice(candidates)
    tokens[i] = ('ws', ' /**/ ')

    return ''.join(t for _, t in tokens)
|
|
|
|
def mut_inject_non_empty_block_comment(s: str) -> str:
    """Replace one whitespace run between tokens with ` /* <value> */ `.

    `<value>` is a random element of `VALUES`, converted with `str`.
    NOTE(review): `VALUES` comes from the star import and its contents are
    not visible here, so any `*/` in the value is broken up defensively to
    stop it from closing the comment early.

    Whitespace that directly follows a `--` line comment is never chosen.
    That run holds the newline that ends the comment, and replacing it would
    pull the following SQL into the comment.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when it has fewer than four
        tokens or no eligible whitespace.
    """
    tokens = _tokenize(s)
    if len(tokens) < 4:
        return s
    candidates = [
        i for i, (kind, _) in enumerate(tokens)
        if kind == 'ws' and (i == 0 or tokens[i - 1][0] != 'line_comment')
    ]
    if not candidates:
        return s
    i = random.choice(candidates)
    value = str(random.choice(VALUES)).replace('*/', '* /')
    tokens[i] = ('ws', f' /* {value} */ ')

    return ''.join(t for _, t in tokens)
|
|
|
|
|
|
def mut_inject_line_comment(s: str) -> str:
    """Insert a `-- <comment>` line after the first `;`.

    `<comment>` is a random element of `VALUES`, converted with `str`.
    NOTE(review): `VALUES` comes from the star import and its contents are
    not visible here, so the text is flattened to one line defensively; a
    line break inside it would end the comment and leave the rest as SQL.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when it contains no `;`.
    """
    if ';' not in s:
        return s

    pat = re.compile(r';(\s*)')
    msg = ' '.join(str(random.choice(VALUES)).splitlines())

    return _sub_in_safe(s, pat, lambda m: f';\n-- {msg}\n', 1)
|
|
|
|
|
|
def mut_redundant_parens(s: str) -> str:
    """Wrap a numeric literal or a compared-against column in extra parens.

    Whichever comes first in the text is rewritten:

    * a whole numeric literal, `5` -> `((5))` or `1.5` -> `((1.5))`;
    * an identifier on the right-hand side of a comparison,
      `a = b` -> `a = ((b))`.

    Identifiers in other positions are not touched.  The first identifier of
    a statement is normally its keyword, and `((SELECT)) ...` is not SQL.
    On the right-hand side, function names, qualified names and a few
    expression keywords are skipped.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when nothing suitable matches.
    """
    pat = re.compile(
        r'(?<![\w.])(?P<num>\d+(?:\.\d+)?)(?![\w.])'
        r'|(?P<op>[<>=]\s*)'
        r'(?P<col>(?!(?:CASE|NOT|EXISTS|CAST|SELECT)\b)[A-Za-z_]\w*)\b(?!\s*[.(])',
        re.I,
    )

    def wrap(m: re.Match) -> str:
        if m.group('num') is not None:
            return f"(({m.group('num')}))"
        # Re-emit the operator and spacing that the pattern consumed.
        return f"{m.group('op')}(({m.group('col')}))"

    return _sub_in_safe(s, pat, wrap, 1)
|
|
|
|
|
|
def mut_quote_identifier(s: str) -> str:
    """Quote one occurrence of a table name.

    The table is picked at random from the schema reported by
    `mutator_extra_statements.get_schema`, and the quoting style at random
    from `"name"`, `` `name` `` and `[name]`.  The first occurrence (matched
    case-sensitively) that is not preceded by a dot or a word character is
    replaced.  An empty schema leaves the text unchanged.
    """
    schema = mutator_extra_statements.get_schema(s)
    if not schema:
        return s

    table = random.choice(list(schema.keys()))
    opener = random.choice(['"', '`', '['])
    closer = ']' if opener == '[' else opener
    quoted = f'{opener}{table}{closer}'
    bare_name = re.compile(r'(?<![.\w])' + re.escape(table) + r'\b')

    return _sub_in_safe(s, bare_name, lambda m: quoted, 1)
|
|
|
|
|
|
def mut_memeify(s: str) -> str:
    """Randomise the letter case of one keyword, e.g. `SELECT` -> `SeLeCt`.

    One keyword is picked at random from a fixed list.  Its first occurrence
    (matched case-insensitively) has every letter independently upper- or
    lower-cased.
    """
    keywords = [
        'SELECT', 'FROM', 'WHERE', 'ORDER', 'GROUP', 'JOIN', 'UNION', 'INSERT',
        'UPDATE', 'DELETE', 'AND', 'OR', 'NOT', 'LIMIT', 'OFFSET', 'HAVING', 'CASE',
        'WHEN', 'THEN', 'END'
    ]
    kw = random.choice(keywords)
    pat = re.compile(rf'\b{kw}\b', re.I)

    def jitter(m: re.Match) -> str:
        scrambled = []
        for ch in m.group(0):
            # One coin flip per character, in order.
            if random.random() < 0.5:
                scrambled.append(ch.upper())
            else:
                scrambled.append(ch.lower())
        return ''.join(scrambled)

    return _sub_in_safe(s, pat, jitter, 1)
|
|
|
|
|
|
def mut_swap_whitespace(s: str) -> str:
    """Replace one space character with another whitespace-like character.

    A whitespace token containing a space is chosen at random and its first
    space is replaced by a tab, a newline, a space, or a fourth option that
    is a no-break space (U+00A0) a quarter of the time and a plain space
    otherwise.  Text without such a token is returned unchanged.
    """
    tokens = _tokenize(s)
    spaced = [
        idx for idx, (kind, text) in enumerate(tokens)
        if kind == 'ws' and ' ' in text
    ]
    if not spaced:
        return s

    idx = random.choice(spaced)
    # The wildcard slot is decided before the final pick is made.
    wildcard = '\u00a0' if random.random() < 0.25 else ' '
    substitute = random.choice(['\t', '\n', ' ', wildcard])
    tokens[idx] = ('ws', tokens[idx][1].replace(' ', substitute, 1))

    return ''.join(text for _, text in tokens)
|
|
|
|
|
|
def mut_to_case_expr(s: str) -> str:
    """Rewrite the first `x = <int>` as `CASE WHEN x = <int> THEN 1 ELSE 0 END`.

    A comparison is left alone when:

    * the column is qualified (`t.x = 5`), since splicing CASE after the dot
      would not parse;
    * the number continues with a fraction or more word characters
      (`x = 5.5`), since the tail would be left dangling after END;
    * it directly follows `SET`, where it is an assignment.

    NOTE(review): later assignments in a multi-column SET list are not
    recognised as assignments.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when nothing suitable matches.
    """
    pat = re.compile(
        r'(?<![\w.])(?<!\bSET\s)([A-Za-z_]\w*)\s*=\s*(-?\d+)(?![\w.])'
    )
    return _sub_in_safe(
        s,
        pat,
        lambda m: f'CASE WHEN {m.group(1)} = {m.group(2)} THEN 1 ELSE 0 END',
        1,
    )
|
|
|
|
|
|
def mut_iif_to_case(s: str) -> str:
    """Expand the first `IIF(a, b, c)` into `CASE WHEN a THEN b ELSE c END`.

    Only calls whose three arguments contain no commas or parentheses are
    matched.
    """
    iif_call = re.compile(
        r'\bIIF\s*\(\s*([^,()]+),\s*([^,()]+),\s*([^,()]+)\s*\)',
        re.I)

    def expand(m: re.Match) -> str:
        cond, when_true, when_false = m.groups()
        return f'CASE WHEN {cond} THEN {when_true} ELSE {when_false} END'

    return _sub_in_safe(s, iif_call, expand, 1)
|
|
|
|
|
|
def mut_case_to_iif(s: str) -> str:
    """Collapse the first `CASE WHEN a THEN b ELSE c END` into `IIF(a, b, c)`.

    Only the single-branch form is converted.  If any captured part itself
    contains CASE, WHEN, THEN, ELSE or END, the expression has several
    branches or nests another CASE, and it is left untouched.

    Args:
        s: SQL text.

    Returns:
        The rewritten SQL, or *s* unchanged when nothing suitable matches.
    """
    pat = re.compile(
        r'\bCASE\s+WHEN\s+([^()]+?)\s+THEN\s+([^()]+?)\s+ELSE\s+([^()]+?)\s+END\b',
        re.I,
    )
    case_keyword = re.compile(r'\b(?:CASE|WHEN|THEN|ELSE|END)\b', re.I)

    def repl(m: re.Match) -> str:
        parts = m.groups()
        if any(case_keyword.search(part) for part in parts):
            return m.group(0)
        return f'IIF({parts[0]}, {parts[1]}, {parts[2]})'

    return _sub_in_safe(s, pat, repl, 1)
|