#!/usr/bin/env python3
"""
Build a ranked Tier 3 contact list based on total playtime.

Joins pk_player_cache (total_played) → pk_players (owner) → users (email),
sums playtime across all characters per user, excludes Tier 1 and Tier 2
contacts, and exports the top N contacts to fill the 10k target.
"""

import csv
import sqlite3
import sys
from contextlib import closing

sys.path.insert(0, "/Users/tomassimkus/VS/marketing_tools")
from build_marketing_table import parse_sql_values, is_email_suspect  # noqa: E402

BASE = "/Users/tomassimkus/VS/marketing_tools"
TARGET_TOTAL = 10_000

# Full column list of the pk_players dump. Only its length is used: rows whose
# width differs are skipped while loading.
_PK_PLAYERS_COLUMNS = [
    "user", "id", "username", "group_id", "owner", "sub_expires",
    "platinum_expires", "combat", "skill_total", "x", "y", "fatigue",
    "combatstyle", "block_chat", "block_private", "block_global",
    "block_trade", "block_duel", "cameraauto", "onemouse", "soundoff",
    "chat_filter", "showroof", "autoscreenshot", "combatwindow",
    "haircolour", "topcolour", "trousercolour", "skincolour", "headsprite",
    "bodysprite", "male", "skulled", "pass", "last_password_change",
    "pass_bcrypt", "creation_date", "creation_ip", "login_date", "login_ip",
    "banned", "offences", "muted", "kills", "deaths", "iron_man", "online",
    "world", "quest_points", "progressbar", "eventcd", "vote", "deadman",
    "highscoreopt", "exp_total", "transfer_date", "transfer_ip", "veteran",
    "pk_mode", "deadman_died", "return_x", "return_y", "bank_size", "petID",
]

# Full column list of the users dump. Used for the width check; id, username
# and email sit at positions 0, 2 and 6.
_USERS_COLUMNS = [
    "id", "group_id", "username", "password", "last_password_change",
    "pass_bcrypt", "email", "email_verified", "email_verification",
    "two_factor_enabled", "title", "realname", "url", "facebook", "twitter",
    "msn", "google", "location", "country", "signature", "disp_topics",
    "disp_posts", "email_setting", "notify_with_post", "notify_pm_full",
    "auto_notify", "show_smilies", "show_img", "show_img_sig",
    "show_avatars", "show_sig", "timezone", "dst", "time_format",
    "date_format", "language", "style", "backstage_color", "num_posts",
    "num_pms", "last_post", "last_search", "last_email_sent",
    "last_report_sent", "registered", "registration_ip", "last_visit",
    "admin_note", "activate_string", "activate_key", "use_pm", "notify_pm",
    "tokens", "sub_expires", "first_run", "accent", "set_avatar",
    "receive_newsletters",
]

# Sums the 'total_played' cache value over every character owned by a user
# account (account id 1 is excluded), highest total first.
_PLAYTIME_QUERY = """
    SELECT
        u.id AS user_id,
        u.username AS username,
        LOWER(TRIM(u.email)) AS email,
        SUM(CAST(c.value AS INTEGER)) AS total_played_ms,
        ROUND(SUM(CAST(c.value AS INTEGER)) / 3600000.0, 1) AS total_played_hours,
        COUNT(DISTINCT p.id) AS characters_with_playtime
    FROM pk_player_cache c
    INNER JOIN pk_players p ON p.user = c.user
    INNER JOIN users u ON u.id = p.owner
    WHERE c.key = 'total_played'
      AND u.id != 1
    GROUP BY u.id
    ORDER BY total_played_ms DESC
"""

# (label, inclusive lower bound, exclusive upper bound) in hours.
_HOUR_BUCKETS = (
    ("500+ hours", 500, float("inf")),
    ("200-500 hours", 200, 500),
    ("100-200 hours", 100, 200),
    ("50-100 hours", 50, 100),
    ("20-50 hours", 20, 50),
    ("10-20 hours", 10, 20),
    ("1-10 hours", 1, 10),
    ("< 1 hour", float("-inf"), 1),
)

_EXPORT_FIELDS = ["email", "first_name", "last_name", "unsubscribed"]


def load_emails(path):
    """Return the set of normalised (stripped, lower-cased) emails in a CSV.

    The file must have an ``email`` header column; a missing column raises
    ``KeyError`` on the first data row. Blank or missing email cells are
    skipped so they are not counted as contacts.
    """
    emails = set()
    # utf-8-sig transparently drops a BOM that would otherwise turn the header
    # into "\ufeffemail"; newline="" is what the csv module expects.
    with open(path, "r", newline="", encoding="utf-8-sig") as f:
        for record in csv.DictReader(f):
            # DictReader fills cells missing from short rows with None.
            email = (record["email"] or "").strip().lower()
            if email:
                emails.add(email)
    return emails


def _load_dump(conn, dump_name, insert_sql, expected_width, pick=None):
    """Insert the rows of one SQL dump and return how many were stored.

    Rows are produced by ``parse_sql_values`` (presumably one sequence of
    column values per dumped tuple -- confirm against build_marketing_table).
    Rows whose width differs from ``expected_width`` are silently skipped;
    ``pick`` optionally reduces a row to the values bound by ``insert_sql``.
    """
    with open(f"{BASE}/{dump_name}", "r", errors="replace") as f:
        sql = f.read()

    loaded = 0
    for row in parse_sql_values(sql):
        if len(row) != expected_width:
            continue
        conn.execute(insert_sql, row if pick is None else pick(row))
        loaded += 1
    conn.commit()
    return loaded


def _load_tables(conn):
    """Create the three working tables and fill them from the SQL dumps."""
    # --- Load pk_player_cache ---
    print("Loading pk_player_cache.sql ...")
    conn.execute("""
        CREATE TABLE pk_player_cache (
            id INTEGER,
            player_id INTEGER,
            user INTEGER,
            type INTEGER,
            key TEXT,
            value TEXT
        )
    """)
    count = _load_dump(
        conn,
        "pk_player_cache.sql",
        "INSERT INTO pk_player_cache VALUES (?,?,?,?,?,?)",
        6,
    )
    print(f" -> {count:,} cache rows loaded")

    # --- Load pk_players ---
    print("Loading pk_players.sql ...")
    conn.execute("""
        CREATE TABLE pk_players (
            user INTEGER,
            id INTEGER,
            username TEXT,
            group_id INTEGER,
            owner INTEGER,
            rest TEXT
        )
    """)
    # Only user, id, username, group_id and owner (the first five dump
    # columns) are stored.
    count = _load_dump(
        conn,
        "pk_players.sql",
        "INSERT INTO pk_players (user, id, username, group_id, owner) VALUES (?,?,?,?,?)",
        len(_PK_PLAYERS_COLUMNS),
        pick=lambda row: row[:5],
    )
    print(f" -> {count:,} player rows loaded")

    # --- Load users ---
    print("Loading users.sql ...")
    conn.execute("CREATE TABLE users (id INTEGER, username TEXT, email TEXT)")
    count = _load_dump(
        conn,
        "users.sql",
        "INSERT INTO users (id, username, email) VALUES (?,?,?)",
        len(_USERS_COLUMNS),
        pick=lambda row: (row[0], row[2], row[6]),
    )
    print(f" -> {count:,} user rows loaded")


def _create_indexes(conn):
    """Index the columns used by the playtime join and filter."""
    for statement in (
        "CREATE INDEX idx_cache_user ON pk_player_cache(user)",
        "CREATE INDEX idx_cache_key ON pk_player_cache(key)",
        "CREATE INDEX idx_pk_user ON pk_players(user)",
        "CREATE INDEX idx_pk_owner ON pk_players(owner)",
        "CREATE INDEX idx_users_id ON users(id)",
    ):
        conn.execute(statement)
    conn.commit()


def _rank_candidates(results, existing):
    """Turn playtime rows into de-duplicated Tier 3 contact dicts.

    ``results`` must already be ordered by playtime; that order is kept.
    Rows are dropped when the email is empty, already in ``existing``,
    already seen on a higher-ranked row, or flagged by ``is_email_suspect``.
    """
    candidates = []
    seen_emails = set()
    for _user_id, username, email, _total_ms, total_hours, char_count in results:
        # LOWER(TRIM(NULL)) is NULL, so accounts without an email arrive as
        # None; they can never be contacted.
        if not email:
            continue
        if email in existing or email in seen_emails:
            continue
        if is_email_suspect(email):
            continue
        seen_emails.add(email)
        candidates.append({
            "email": email,
            # None would break the fixed-width report; the CSV writer emits
            # "" for None anyway, so the export is unchanged.
            "first_name": username or "",
            "last_name": "",
            "unsubscribed": "false",
            # SUM() is NULL when every cached value is NULL; count as zero.
            "total_played_hours": total_hours or 0.0,
            "characters_with_playtime": char_count,
        })
    return candidates


def _print_report(candidates):
    """Print the top 25 candidates and the playtime distribution."""
    # --- Show top 25 ---
    print("\nTop 25 by playtime:")
    print(f"{'Hours':>10} {'Chars':>5} {'Username':<20} Email")
    print("-" * 75)
    for c in candidates[:25]:
        print(
            f"{c['total_played_hours']:>10.1f} "
            f"{c['characters_with_playtime']:>5} "
            f"{c['first_name']:<20} "
            f"{c['email']}"
        )

    # --- Playtime distribution ---
    print(f"\nPlaytime distribution of all {len(candidates):,} candidates:")
    for label, lower, upper in _HOUR_BUCKETS:
        count = sum(
            1 for c in candidates if lower <= c["total_played_hours"] < upper
        )
        print(f" {label:>15}: {count:,}")


def main():
    """Rank users by playtime and export the Tier 3 contact CSV."""
    # closing() guarantees the connection is released even if a dump is
    # missing or malformed.
    with closing(sqlite3.connect(":memory:")) as conn:
        _load_tables(conn)
        _create_indexes(conn)

        # --- Query: sum total_played per user account ---
        print("\nAggregating playtime per user account...")
        results = conn.execute(_PLAYTIME_QUERY).fetchall()
    print(f" -> {len(results):,} users with playtime data")

    # --- Load existing tier1 and tier2 emails ---
    tier1 = load_emails(f"{BASE}/resend_tier1_paying.csv")
    tier2 = load_emails(f"{BASE}/resend_tier2_verified.csv")
    existing = tier1 | tier2
    # Clamp at zero: a negative count would make the export slice below drop
    # only the last few candidates instead of exporting nothing.
    needed = max(0, TARGET_TOTAL - len(tier1) - len(tier2))
    print(f"\nTier 1: {len(tier1):,}")
    print(f"Tier 2: {len(tier2):,}")
    print(f"Current total: {len(tier1) + len(tier2):,}")
    print(f"Need {needed:,} more to reach {TARGET_TOTAL:,}")

    # --- Filter to tier3-only, non-suspect emails, ranked by playtime ---
    tier3_candidates = _rank_candidates(results, existing)
    print(f"Tier 3 candidates (by playtime, not in T1/T2): {len(tier3_candidates):,}")

    _print_report(tier3_candidates)

    # --- Export top N ---
    to_export = tier3_candidates[:needed]
    out_path = f"{BASE}/resend_tier3_active.csv"
    with open(out_path, "w", newline="", encoding="utf-8") as f:
        # extrasaction="ignore" drops the report-only keys (hours, characters).
        writer = csv.DictWriter(
            f, fieldnames=_EXPORT_FIELDS, extrasaction="ignore"
        )
        writer.writeheader()
        writer.writerows(to_export)

    print(f"\nExported {len(to_export):,} contacts to {out_path}")
    min_hours = to_export[-1]["total_played_hours"] if to_export else 0
    print(f"Minimum playtime in export: {min_hours} hours")
    print("\nFinal totals:")
    print(f" Tier 1: {len(tier1):,}")
    print(f" Tier 2: {len(tier2):,}")
    print(f" Tier 3: {len(to_export):,}")
    print(f" Total: {len(tier1) + len(tier2) + len(to_export):,}")


if __name__ == "__main__":
    main()