import json import os from uuid import uuid4 from runner import collect_coverage # from https://sqlite.org/lang_keywords.html SQLITE_KEYWORDS = [ "ABORT", "ACTION", "ADD", "AFTER", "ALL", "ALTER", "ALWAYS", "ANALYZE", "AND", "AS", "ASC", "ATTACH", "AUTOINCREMENT", "BEFORE", "BEGIN", "BETWEEN", "BY", "CASCADE", "CASE", "CAST", "CHECK", "COLLATE", "COLUMN", "COMMIT", "CONFLICT", "CONSTRAINT", "CREATE", "CROSS", "CURRENT", "CURRENT_DATE", "CURRENT_TIME", "CURRENT_TIMESTAMP", "DATABASE", "DEFAULT", "DEFERRABLE", "DEFERRED", "DELETE", "DESC", "DETACH", "DISTINCT", "DO", "DROP", "EACH", "ELSE", "END", "ESCAPE", "EXCEPT", "EXCLUDE", "EXCLUSIVE", "EXISTS", "EXPLAIN", "FAIL", "FILTER", "FIRST", "FOLLOWING", "FOR", "FOREIGN", "FROM", "FULL", "GENERATED", "GLOB", "GROUP", "GROUPS", "HAVING", "IF", "IGNORE", "IMMEDIATE", "IN", "INDEX", "INDEXED", "INITIALLY", "INNER", "INSERT", "INSTEAD", "INTERSECT", "INTO", "IS", "ISNULL", "JOIN", "KEY", "LAST", "LEFT", "LIKE", "LIMIT", "MATCH", "MATERIALIZED", "NATURAL", "NO", "NOT", "NOTHING", "NOTNULL", "NULL", "NULLS", "OF", "OFFSET", "ON", "OR", "ORDER", "OTHERS", "OUTER", "OVER", "PARTITION", "PLAN", "PRAGMA", "PRECEDING", "PRIMARY", "QUERY", "RAISE", "RANGE", "RECURSIVE", "REFERENCES", "REGEXP", "REINDEX", "RELEASE", "RENAME", "REPLACE", "RESTRICT", "RETURNING", "RIGHT", "ROLLBACK", "ROW", "ROWS", "SAVEPOINT", "SELECT", "SET", "TABLE", "TEMP", "TEMPORARY", "THEN", "TIES", "TO", "TRANSACTION", "TRIGGER", "UNBOUNDED", "UNION", "UNIQUE", "UPDATE", "USING", "VACUUM", "VALUES", "VIEW", "VIRTUAL", "WHEN", "WHERE", "WINDOW", "WITH", "WITHOUT" ] queries: list[str] = [] def save_bug(sql: str, kind: str, b_out: str, b_err: str, r_out: str = '', flag: str = ''): """ Creates and stores the bug report, with all the relevant metadata. """ os.makedirs('bugs', exist_ok=True) path = f'bugs/bug_{uuid4()}_{kind}.md' with open(path, 'w') as f: f.write(create_bug_report(sql, b_out or b_err, r_out, flag)) def create_stats_report(valid_queries, invalid_queries, generated_queries: list[str], qpm_gen: float = 0.0, qpm_full: float = 0.0, wall_seconds: float = 0.0): collect_coverage("/fuzzer/coverage.json") with open("/fuzzer/coverage.json", "r") as f: data = json.load(f) total_lines = 0 covered_lines = 0 total_branches = 0 covered_branches = 0 func_cov_dict = {} for file in data.get("files") or []: for line in file.get("lines") or []: total_lines += 1 if line.get("count", 0) > 0: covered_lines += 1 for branch in line.get("branches") or []: total_branches += 1 if branch.get("count") or 0 > 0: covered_branches += 1 # https://gcovr.com/en/stable/output/json.html#json-output for function_obj in file.get("functions") or []: executed = function_obj["execution_count"] > 0 func_cov_dict[f"{file['file']}-{function_obj['name']}"] = executed print() print( '----------------------------------------------------------------------------------------' ) print() print(f"- Valid queries: {valid_queries}") print(f"- Invalid queries: {invalid_queries}") print(f"- Line coverage: {round(covered_lines / total_lines * 100, 2)}%") print( f"- Branch coverage: {round(covered_branches / total_branches * 100, 2)}%") print( f"- Function coverage: {round(sum(1 for value in func_cov_dict.values() if value) / len(func_cov_dict) * 100, 2)}%" ) # Part 1a. count_dict = {k: 0 for k in SQLITE_KEYWORDS} for query in generated_queries: for keyword in SQLITE_KEYWORDS: if keyword in query: count_dict[keyword] += 1 sorted_count_dict = dict( sorted(count_dict.items(), key=lambda item: item[1], reverse=True)[:30]) print() print( '----------------------------------------------------------------------------------------' ) print() print('Part 1a. Number of unique queries that each keyword appears in:') print() for k in sorted_count_dict: print(f'{k}: {sorted_count_dict[k]}') # Part 1b. count_dict_b = {k: 0 for k in SQLITE_KEYWORDS} for query in generated_queries: keyword_set = set(''.join( char for char in query if char.isalnum() or char == '_' or char == ' ').split()) for keyword in keyword_set: if keyword in SQLITE_KEYWORDS: count_dict_b[keyword] += query.count(keyword) count_dict_b = { k: round(v / len(generated_queries), 3) for k, v in count_dict_b.items() } sorted_count_dict_b = dict( sorted(count_dict_b.items(), key=lambda item: item[1], reverse=True)) print() print( '----------------------------------------------------------------------------------------' ) print() print('Part 1b. Mean number of occurrences of each keyword per query:') print() for k, v in sorted_count_dict_b.items(): print(f'{k}: {v}') print() def create_bug_report(sql, actual, expected, flag): """ ## Summary ## Minimized query ```sql Code of the bug-triggering SQL query ``` ## Actual output ```sql // add output here ``` ## Expectation ```sql // add output here ``` """ report = "## Summary\n**No review yet**\n\n" report += f"## Minimized query\n\n```sql\n{sql}\n```\n\n" report += f"## Actual output\n\n```sql\n{actual}\n```\n\n" report += f"## Expectation\n\n```sql\n{expected}\n```\n\n" report += f"## Flag\n\n```\n{flag}\n```\n\n" return report