#!/usr/bin/env python3
"""
Build a marketing campaign table from SQL dump files.

Parses phpMyAdmin MariaDB SQL exports, loads into SQLite,
runs aggregation queries, and exports to CSV.
"""

import csv
import re
import sqlite3
import sys
from datetime import datetime, timezone

# ---------------------------------------------------------------------------
# Email quality heuristics
# ---------------------------------------------------------------------------

DISPOSABLE_DOMAINS = {
    "mailinator.com", "guerrillamail.com", "guerrillamail.net", "tempmail.com",
    "throwaway.email", "sharklasers.com", "guerrillamailblock.com", "grr.la",
    "dispostable.com", "yopmail.com", "trashmail.com", "trashmail.net",
    "fakeinbox.com", "tempail.com", "tempr.email", "temp-mail.org",
    "temp-mail.io", "getnada.com", "mohmal.com", "maildrop.cc",
    "discard.email", "mailnesia.com", "jetable.com", "trash-mail.com",
    "10minutemail.com", "minutemail.com", "getairmail.com", "filzmail.com",
    "inboxalias.com",
}

FAKE_LOCAL_PARTS = {
    "test", "fake", "noreply", "no", "none", "null", "admin", "root",
    "example", "sample", "demo", "temp", "tmp", "abc", "xyz", "asd",
    "asdf", "asdasd", "qwerty", "qwer", "zxcv", "foobar", "foo", "bar",
    "user", "noone", "nobody", "na", "n/a", "nope",
}

# Keyboard walks that mark a local part as suspect when they make up most of it.
KEYBOARD_SEQUENCES = ("qwerty", "asdfgh", "zxcvbn", "qazwsx", "123456", "abcdef")

VOWELS = frozenset("aeiou")


def is_email_suspect(email):
    """
    Return 1 if the email looks fake/disposable, 0 if it looks legitimate.

    The result is an int (not a bool) so callers can sum it and write it
    straight into the CSV. Anything that is not a non-empty string is suspect.
    """
    if not email or not isinstance(email, str):
        return 1

    email = email.strip().lower()

    # Must have exactly one @
    if email.count("@") != 1:
        return 1

    local, domain = email.split("@")

    # Empty local or domain, or a domain without a dot
    if not local or not domain or "." not in domain:
        return 1

    # Very short local part (1-2 chars)
    if len(local) <= 2:
        return 1

    if domain in DISPOSABLE_DOMAINS or local in FAKE_LOCAL_PARTS:
        return 1

    # All same character repeated (aaa@, bbb@, etc.)
    if len(set(local)) == 1:
        return 1

    # Keyboard mashing: one character makes up more than 70% of the local part
    if len(local) >= 4:
        top_count = max(local.count(ch) for ch in set(local))
        if top_count / len(local) > 0.7:
            return 1

    # Common keyboard sequences with fewer than 4 characters of padding
    for seq in KEYBOARD_SEQUENCES:
        if seq in local and len(local) < len(seq) + 4:
            return 1

    # All numeric local part
    if local.isdigit():
        return 1

    # Excessive numbers (local part is more than 80% digits)
    digit_ratio = sum(1 for ch in local if ch.isdigit()) / len(local)
    if digit_ratio > 0.8 and len(local) > 4:
        return 1

    # No vowels at all in a local part of 5+ chars (likely gibberish)
    if len(local) >= 5 and not any(ch in VOWELS for ch in local):
        return 1

    return 0


# ---------------------------------------------------------------------------
# SQL INSERT parser
# ---------------------------------------------------------------------------

# Header of a multi-row INSERT; parsing of the value tuples starts at its end.
_INSERT_HEADER_RE = re.compile(
    r"INSERT\s+INTO\s+`[^`]+`\s*\([^)]+\)\s*VALUES\s*",
    re.IGNORECASE,
)

_WHITESPACE = " \t\n\r"
_TOKEN_END = ",) \t\n\r"

# Backslash escapes that stand for a control character inside a quoted SQL
# string. Any other escaped character (quote, backslash, ...) stands for itself.
_MYSQL_ESCAPES = {
    "0": "\0",
    "b": "\b",
    "n": "\n",
    "r": "\r",
    "t": "\t",
    "Z": "\x1a",
}


def _skip_whitespace(sql_text, pos):
    """Return the index of the first non-whitespace character at or after pos."""
    length = len(sql_text)
    while pos < length and sql_text[pos] in _WHITESPACE:
        pos += 1
    return pos


def _parse_quoted(sql_text, pos):
    """Parse a single-quoted string body starting just after the opening quote.

    Returns (decoded_string, index just past the closing quote).
    """
    length = len(sql_text)
    parts = []
    while pos < length:
        ch = sql_text[pos]
        if ch == "\\" and pos + 1 < length:
            # Decode the escape instead of keeping the bare letter, so that
            # an escaped "n" becomes a newline rather than the letter "n".
            escaped = sql_text[pos + 1]
            parts.append(_MYSQL_ESCAPES.get(escaped, escaped))
            pos += 2
        elif ch == "'":
            if pos + 1 < length and sql_text[pos + 1] == "'":
                # Doubled quote is an escaped literal quote.
                parts.append("'")
                pos += 2
            else:
                pos += 1
                break
        else:
            parts.append(ch)
            pos += 1
    return "".join(parts), pos


def _coerce_token(token):
    """Convert an unquoted token to int/float, or return it unchanged."""
    try:
        return float(token) if "." in token else int(token)
    except ValueError:
        return token


def _parse_tuple(sql_text, pos):
    """Parse one value tuple starting just after its opening parenthesis.

    Returns (list_of_values, index just past the closing parenthesis).
    """
    length = len(sql_text)
    row = []
    while pos < length:
        pos = _skip_whitespace(sql_text, pos)
        if pos >= length:
            break

        ch = sql_text[pos]
        if ch == ")":
            pos += 1
            break
        if ch == ",":
            pos += 1
            continue

        if sql_text[pos:pos + 4].upper() == "NULL":
            row.append(None)
            pos += 4
            continue

        if ch == "'":
            value, pos = _parse_quoted(sql_text, pos + 1)
            row.append(value)
            continue

        # Unquoted number or identifier
        start = pos
        while pos < length and sql_text[pos] not in _TOKEN_END:
            pos += 1
        row.append(_coerce_token(sql_text[start:pos]))
    return row, pos


def parse_sql_values(sql_text):
    """
    Yield one tuple of Python values per row of every multi-row INSERT.

    Handles:
      - backslash escapes (decoded to the character they stand for) and
        doubled single quotes
      - NULL values (yielded as None)
      - numeric values (int / float / negative)
      - strings containing commas, parens, newlines

    Unquoted tokens that are not numbers are yielded as strings.
    """
    length = len(sql_text)
    for header in _INSERT_HEADER_RE.finditer(sql_text):
        pos = header.end()
        while pos < length:
            pos = _skip_whitespace(sql_text, pos)
            if pos >= length or sql_text[pos] != "(":
                break

            row, pos = _parse_tuple(sql_text, pos + 1)
            yield tuple(row)

            # After the closing ')' a ',' continues the statement; ';' or
            # anything else ends it.
            pos = _skip_whitespace(sql_text, pos)
            if pos < length and sql_text[pos] == ",":
                pos += 1
            else:
                break


def _quote_identifier(name):
    """Return name as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"' + str(name).replace('"', '""') + '"'


def load_table(conn, table_name, columns, sql_file, column_types=None):
    """
    Create a SQLite table and bulk-insert rows parsed from a SQL dump file.

    conn:         open sqlite3 connection; the inserts are committed.
    table_name:   table to create (if missing) and fill.
    columns:      column names, in the order values appear in the dump.
    sql_file:     path of the dump file, read as UTF-8 with undecodable
                  bytes replaced.
    column_types: optional dict mapping column name -> SQLite type hint;
                  columns not listed are declared TEXT.

    Rows whose value count differs from len(columns) are skipped and counted.
    Returns the number of rows inserted.
    """
    column_types = column_types or {}
    col_defs = ", ".join(
        f"{_quote_identifier(col)} {column_types.get(col, 'TEXT')}"
        for col in columns
    )
    table_ident = _quote_identifier(table_name)
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table_ident} ({col_defs})")

    placeholders = ", ".join(["?"] * len(columns))
    insert_sql = f"INSERT INTO {table_ident} VALUES ({placeholders})"

    print(f" Reading {sql_file} ...")
    with open(sql_file, "r", encoding="utf-8", errors="replace") as f:
        sql_text = f.read()

    # Validate column count, then insert everything in one batch.
    expected = len(columns)
    valid_rows = []
    bad = 0
    for row in parse_sql_values(sql_text):
        if len(row) == expected:
            valid_rows.append(row)
        else:
            bad += 1

    conn.executemany(insert_sql, valid_rows)
    conn.commit()

    good = len(valid_rows)
    print(f" -> {good:,} rows loaded ({bad} skipped due to column mismatch)")
    return good


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

DEFAULT_BASE_DIR = "/Users/tomassimkus/VS/marketing_tools"

# Timestamp that "days since last login" is measured against.
# NOTE(review): this is a fixed date, not the current date -- presumably the
# date the dumps were taken; confirm before reusing with newer dumps.
REFERENCE_DATE = datetime(2026, 4, 2, tzinfo=timezone.utc)

USERS_COLUMNS = [
    "id", "group_id", "username", "password", "last_password_change",
    "pass_bcrypt", "email", "email_verified", "email_verification",
    "two_factor_enabled", "totp_secret", "totp_enabled", "two_factor_type",
    "title", "realname", "url", "facebook", "twitter", "msn", "google",
    "location", "country", "signature", "disp_topics", "disp_posts",
    "email_setting", "notify_with_post", "notify_pm_full", "auto_notify",
    "show_smilies", "show_img", "show_img_sig", "show_avatars", "show_sig",
    "timezone", "dst", "time_format", "date_format", "language", "style",
    "backstage_color", "num_posts", "num_pms", "last_post", "last_search",
    "last_email_sent", "last_report_sent", "registered", "registration_ip",
    "last_visit", "admin_note", "activate_string", "activate_key", "use_pm",
    "notify_pm", "tokens", "sub_expires", "first_run", "accent",
    "set_avatar", "receive_newsletters",
]

PK_PLAYERS_COLUMNS = [
    "user", "id", "username", "group_id", "owner", "sub_expires",
    "platinum_expires", "combat", "skill_total", "x", "y", "fatigue",
    "combatstyle", "block_chat", "block_private", "block_global",
    "block_trade", "block_duel", "cameraauto", "onemouse", "soundoff",
    "chat_filter", "showroof", "autoscreenshot", "combatwindow",
    "haircolour", "topcolour", "trousercolour", "skincolour", "headsprite",
    "bodysprite", "male", "skulled", "pass", "last_password_change",
    "pass_bcrypt", "creation_date", "creation_ip", "login_date", "login_ip",
    "banned", "offences", "muted", "kills", "deaths", "iron_man", "online",
    "world", "quest_points", "progressbar", "eventcd", "vote", "deadman",
    "highscoreopt", "exp_total", "transfer_date", "transfer_ip", "veteran",
    "pk_mode", "deadman_died", "return_x", "return_y", "bank_size", "petID",
]

STRIPE_ORDERS_COLUMNS = [
    "order_id", "txn_id", "forum_id", "e_mail", "ip_address", "tokens",
    "paid", "currency", "payment_status", "timestamp",
]

PAYPAL_COLUMNS = [
    "order_id", "txn_id", "payer_email", "paid", "tokens_purchased",
    "user", "time",
]

# (table name, columns, dump file name) in load order.
TABLE_SOURCES = [
    ("users", USERS_COLUMNS, "users_new.sql"),
    ("pk_players", PK_PLAYERS_COLUMNS, "pk_players.sql"),
    ("stripe_orders", STRIPE_ORDERS_COLUMNS, "stripe_orders.sql"),
    ("pk_paypal_transactions", PAYPAL_COLUMNS, "pk_paypal_transactions.sql"),
]

# One output row per user that owns at least one pk_players row (INNER JOIN),
# excluding user id 1.
# NOTE(review): stripe "paid" is summed as INTEGER and added to the PayPal
# REAL total without conversion -- confirm both columns use the same unit.
# NOTE(review): stripe_txns counts every stripe_orders row, while
# stripe_spent only sums rows with payment_status = 'succeeded'.
MARKETING_QUERY = """
SELECT
    u.id AS user_id,
    u.username AS username,
    u.email AS email,
    u.email_verified AS email_verified,
    u.receive_newsletters AS receive_newsletters,
    u.country AS country,
    CASE WHEN CAST(u.registered AS INTEGER) > 0
         THEN datetime(CAST(u.registered AS INTEGER), 'unixepoch')
         ELSE NULL END AS registered,
    CASE WHEN CAST(u.last_visit AS INTEGER) > 0
         THEN datetime(CAST(u.last_visit AS INTEGER), 'unixepoch')
         ELSE NULL END AS last_forum_visit,
    CASE WHEN p.last_game_login_ts > 0
         THEN datetime(p.last_game_login_ts, 'unixepoch')
         ELSE NULL END AS last_game_login,
    CASE WHEN p.last_game_login_ts > 0
         THEN (:today_ts - p.last_game_login_ts) / 86400
         ELSE NULL END AS days_since_last_login,
    p.total_characters AS total_characters,
    p.highest_combat AS highest_combat,
    p.highest_skill_total AS highest_skill_total,
    p.max_exp_total AS max_exp_total,
    p.total_kills AS total_kills,
    CASE WHEN COALESCE(s.stripe_spent, 0) + COALESCE(pp.paypal_spent, 0) > 0
         THEN 1 ELSE 0 END AS is_paying_customer,
    ROUND(COALESCE(s.stripe_spent, 0) + COALESCE(pp.paypal_spent, 0), 2) AS total_spent_usd,
    COALESCE(s.stripe_spent, 0) AS stripe_spent_usd,
    ROUND(COALESCE(pp.paypal_spent, 0), 2) AS paypal_spent_usd,
    COALESCE(s.stripe_txns, 0) + COALESCE(pp.paypal_txns, 0) AS total_transactions,
    CASE
        WHEN COALESCE(s.last_stripe_ts, 0) >= COALESCE(pp.last_paypal_ts, 0)
             AND COALESCE(s.last_stripe_ts, 0) > 0
            THEN datetime(s.last_stripe_ts, 'unixepoch')
        WHEN COALESCE(pp.last_paypal_ts, 0) > 0
            THEN datetime(pp.last_paypal_ts, 'unixepoch')
        ELSE NULL
    END AS last_purchase_date,
    p.has_active_characters AS has_active_characters,
    0 AS questionnaire_sent,
    0 AS questionnaire_replied
FROM users u
INNER JOIN (
    SELECT
        owner,
        MAX(CASE WHEN CAST(login_date AS INTEGER) > 0
                 THEN CAST(login_date AS INTEGER) ELSE 0 END) AS last_game_login_ts,
        COUNT(*) AS total_characters,
        MAX(CAST(combat AS INTEGER)) AS highest_combat,
        MAX(CAST(skill_total AS INTEGER)) AS highest_skill_total,
        MAX(CAST(exp_total AS INTEGER)) AS max_exp_total,
        SUM(CAST(kills AS INTEGER)) AS total_kills,
        MAX(CASE WHEN banned = '0' THEN 1 ELSE 0 END) AS has_active_characters
    FROM pk_players
    GROUP BY owner
) p ON p.owner = u.id
LEFT JOIN (
    SELECT
        forum_id,
        SUM(CASE WHEN payment_status = 'succeeded'
                 THEN CAST(paid AS INTEGER) ELSE 0 END) AS stripe_spent,
        COUNT(*) AS stripe_txns,
        MAX(CASE WHEN CAST("timestamp" AS INTEGER) > 0
                 THEN CAST("timestamp" AS INTEGER) ELSE 0 END) AS last_stripe_ts
    FROM stripe_orders
    GROUP BY forum_id
) s ON s.forum_id = u.id
LEFT JOIN (
    SELECT
        "user",
        SUM(CAST(paid AS REAL)) AS paypal_spent,
        COUNT(*) AS paypal_txns,
        MAX(CASE WHEN CAST("time" AS INTEGER) > 0
                 THEN CAST("time" AS INTEGER) ELSE 0 END) AS last_paypal_ts
    FROM pk_paypal_transactions
    GROUP BY "user"
) pp ON pp."user" = u.id
WHERE u.id != 1
ORDER BY u.email_verified DESC, p.last_game_login_ts DESC
"""


def summarize_rows(col_names, rows):
    """
    Compute the summary statistics printed at the end of the export.

    col_names: column names matching the positions in each row; must contain
               is_paying_customer, total_spent_usd, has_active_characters,
               email_verified, receive_newsletters and days_since_last_login.
    rows:      sequence of row tuples.

    Returns a dict of counts and averages. Every ratio falls back to 0 when
    its denominator is 0, so an empty result set does not raise.
    """
    # Resolve column positions once instead of once per row.
    paying_idx = col_names.index("is_paying_customer")
    spent_idx = col_names.index("total_spent_usd")
    active_idx = col_names.index("has_active_characters")
    verified_idx = col_names.index("email_verified")
    newsletter_idx = col_names.index("receive_newsletters")
    days_idx = col_names.index("days_since_last_login")

    total_users = len(rows)
    paying = sum(1 for r in rows if r[paying_idx] == 1)
    total_spent = sum(r[spent_idx] or 0 for r in rows)
    days_vals = [r[days_idx] for r in rows if r[days_idx] is not None]

    return {
        "total_users": total_users,
        "paying": paying,
        "paying_pct": 100 * paying / total_users if total_users else 0,
        "total_spent": total_spent,
        "avg_spend_all": total_spent / total_users if total_users else 0,
        "avg_spend_paying": total_spent / paying if paying else 0,
        # Flag columns may come back as text or int, so compare as strings.
        "active_chars": sum(1 for r in rows if str(r[active_idx]) == "1"),
        "verified": sum(1 for r in rows if str(r[verified_idx]) == "1"),
        "newsletter": sum(1 for r in rows if str(r[newsletter_idx]) == "1"),
        "avg_days": sum(days_vals) / len(days_vals) if days_vals else 0,
        "min_days": min(days_vals) if days_vals else 0,
        "max_days": max(days_vals) if days_vals else 0,
    }


def _print_summary(stats):
    """Print the summary block from a dict produced by summarize_rows."""
    print("=" * 60)
    print("MARKETING CAMPAIGN TABLE — SUMMARY")
    print("=" * 60)
    print(f" Total users exported: {stats['total_users']:,}")
    print(f" Paying customers: {stats['paying']:,} ({stats['paying_pct']:.1f}%)")
    print(f" Total revenue (USD): ${stats['total_spent']:,.2f}")
    print(f" Avg spend (all users): ${stats['avg_spend_all']:,.2f}")
    print(f" Avg spend (paying only): ${stats['avg_spend_paying']:,.2f}")
    print(f" Users with active characters: {stats['active_chars']:,}")
    print(f" Email verified: {stats['verified']:,}")
    print(f" Opted in to newsletters: {stats['newsletter']:,}")
    print(f" Days since last login (avg): {stats['avg_days']:,.0f}")
    print(f" Days since last login (min): {stats['min_days']:,}")
    print(f" Days since last login (max): {stats['max_days']:,}")
    print("=" * 60)


def main(base_dir=DEFAULT_BASE_DIR):
    """
    Load the four SQL dumps, build the marketing table and export it to CSV.

    base_dir: directory holding the dump files; marketing_campaign.csv is
              written to the same directory.
    """
    # No journal_mode pragma: the database is in-memory, where WAL cannot be
    # enabled.
    conn = sqlite3.connect(":memory:")
    try:
        print("Loading SQL dumps into SQLite...\n")
        for table_name, columns, file_name in TABLE_SOURCES:
            load_table(conn, table_name, columns, f"{base_dir}/{file_name}")

        # Create indexes for performance
        print("\nCreating indexes...")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_pk_owner ON pk_players(owner)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_stripe_forum ON stripe_orders(forum_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_paypal_user ON pk_paypal_transactions(\"user\")")
        conn.commit()

        # --- Build marketing table ---
        print("Running aggregation query...\n")
        today_ts = int(REFERENCE_DATE.timestamp())
        cur = conn.execute(MARKETING_QUERY, {"today_ts": today_ts})
        col_names = [desc[0] for desc in cur.description]
        rows = cur.fetchall()
    finally:
        # Release the connection even when loading or querying fails.
        conn.close()

    # --- Score email quality ---
    email_idx = col_names.index("email")
    col_names.append("email_suspect")
    rows = [row + (is_email_suspect(row[email_idx]),) for row in rows]
    suspect_count = sum(row[-1] for row in rows)
    print(f"Email quality check: {suspect_count:,} suspect emails flagged\n")

    # --- Export to CSV ---
    out_path = f"{base_dir}/marketing_campaign.csv"
    with open(out_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(col_names)
        writer.writerows(rows)
    print(f"Exported {len(rows):,} rows to {out_path}\n")

    # --- Summary ---
    _print_summary(summarize_rows(col_names, rows))


if __name__ == "__main__":
    # An optional first command-line argument overrides the base directory.
    main(*sys.argv[1:2])