#!/usr/bin/env python3
"""Collector that records recent high-value market sales.

Reads sales from the ``pk_market_*`` tables, keeps those whose item is in the
tradeable-item set, tags each with a coarse percentile label and inserts the
ones not yet present into ``market_notable_trades``.
"""
import sys
import os

# Put the PARENT of this script's directory on sys.path (note: despite its
# name, ``current_dir`` is one level above the script's own directory).
# Presumably this is the project root that holds the ``config`` package
# imported below -- verify against the repository layout.
current_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, current_dir)

from datetime import datetime, timedelta
import time
from config.database import get_db_connection, get_tradeable_items, current_timestamp
from config.logging_config import get_collector_logger

# Use rotating logger for this collector
logger = get_collector_logger('notable_trades')

# Defaults reproduce the values that were previously hard-coded.
DEFAULT_LOOKBACK_SECONDS = 3600
DEFAULT_MIN_TOTAL_VALUE = 1_000_000
DEFAULT_MAX_TRADES = 100

# (label stored in the ``percentile`` column, fraction of the DESC-sorted
# list at which the cut-off value is read), ordered from most to least
# exclusive so the first match wins.
_PERCENTILE_TIERS = ((99.0, 0.01), (95.0, 0.05), (90.0, 0.10))

_CANDIDATE_TRADES_QUERY = """
    SELECT
        s.sale_id,
        COALESCE(act.item_id, inact.item_id) as raw_item_id,
        s.amount,
        COALESCE(act.price_each, inact.price_each) as price_each,
        s.amount * COALESCE(act.price_each, inact.price_each) as total_value,
        s.time as trade_time,
        buyer.username as buyer_name,
        COALESCE(act_seller.username, inact_seller.username) as seller_name
    FROM pk_market_sales s
    LEFT JOIN pk_market_active act ON s.auction_id = act.auctionID
    LEFT JOIN pk_market_inactive inact ON s.auction_id = inact.auction_id
    LEFT JOIN pk_players buyer ON s.player_id = buyer.id
    LEFT JOIN pk_players act_seller ON act.player_id = act_seller.id
    LEFT JOIN pk_players inact_seller ON inact.player_id = inact_seller.id
    WHERE s.time >= %s
      AND COALESCE(act.item_id, inact.item_id) IS NOT NULL
      AND COALESCE(act.deadman, inact.deadman, 0) = 0
      AND s.amount * COALESCE(act.price_each, inact.price_each) >= %s
    ORDER BY total_value DESC
    LIMIT %s
"""

_EXISTS_QUERY = "SELECT 1 FROM market_notable_trades WHERE sale_id = %s"

_INSERT_QUERY = """
    INSERT INTO market_notable_trades
        (sale_id, item_id, amount, price_each, total_value,
         buyer_name, seller_name, trade_time, percentile)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""


def _fetch_candidate_trades(cursor, since_ts, min_total_value, max_trades):
    """Return sales at or after ``since_ts`` worth at least ``min_total_value``.

    Rows come back ordered by ``total_value`` descending and capped at
    ``max_trades``; later percentile indexing relies on that order.
    """
    cursor.execute(_CANDIDATE_TRADES_QUERY, (since_ts, min_total_value, max_trades))
    return cursor.fetchall()


def _filter_tradeable(candidates, tradeable_items):
    """Keep rows whose item is tradeable and copy the id to ``row['item_id']``.

    Input order is preserved. Rows are mutated in place (the ``item_id`` key
    is added), exactly as the insert step expects.
    """
    kept = []
    for trade in candidates:
        # No normalization needed - market converts noted to regular automatically
        item_id = trade['raw_item_id']
        if item_id is None or item_id not in tradeable_items:
            continue
        trade['item_id'] = item_id
        kept.append(trade)
    return kept


def _percentile_thresholds(desc_values):
    """Return the cut-off value per tier for a non-empty DESC-sorted list.

    For each tier the value at index ``int(n * fraction)`` is used, so the
    thresholds are relative to this batch of trades only.
    """
    n = len(desc_values)
    return [desc_values[int(n * fraction)] for _, fraction in _PERCENTILE_TIERS]


def _percentile_label(total_value, thresholds):
    """Return 99.0, 95.0 or 90.0 for the first tier reached, else ``None``."""
    for (label, _), cutoff in zip(_PERCENTILE_TIERS, thresholds):
        if total_value >= cutoff:
            return label
    return None


def _insert_new_trades(cursor, trades):
    """Insert every trade not already stored and return how many were added.

    The percentile label is optional enrichment, not a filter: all given
    trades are recorded, labelled or not.
    """
    thresholds = _percentile_thresholds([t['total_value'] for t in trades])
    inserted = 0
    for trade in trades:
        # One existence check per row keeps re-runs over an overlapping
        # window from duplicating rows.
        # NOTE(review): a unique index on sale_id would make this round trip
        # unnecessary -- confirm the schema before changing it.
        cursor.execute(_EXISTS_QUERY, (trade['sale_id'],))
        if cursor.fetchone():
            continue
        cursor.execute(_INSERT_QUERY, (
            trade['sale_id'],
            trade['item_id'],
            trade['amount'],
            trade['price_each'],
            trade['total_value'],
            trade['buyer_name'],
            trade['seller_name'],
            trade['trade_time'],
            _percentile_label(trade['total_value'], thresholds),
        ))
        inserted += 1
    return inserted


def collect_notable_trades(
    *,
    lookback_seconds: int = DEFAULT_LOOKBACK_SECONDS,
    min_total_value: int = DEFAULT_MIN_TOTAL_VALUE,
    max_trades: int = DEFAULT_MAX_TRADES,
) -> None:
    """Identify and store high-value trades.

    Args:
        lookback_seconds: Only sales whose ``time`` is within this many
            seconds before ``current_timestamp()`` are considered.
        min_total_value: Minimum ``amount * price_each`` for a sale to count.
        max_trades: Maximum number of rows fetched (highest value first).

    Raises:
        Exception: Any error raised while querying or inserting is logged
            with its traceback and then re-raised unchanged.
    """
    logger.info("Collecting notable trades")

    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()

            # Get tradeable items
            tradeable_items = get_tradeable_items(cursor)

            since_ts = current_timestamp() - lookback_seconds
            candidates = _fetch_candidate_trades(
                cursor, since_ts, min_total_value, max_trades
            )
            trades = _filter_tradeable(candidates, tradeable_items)

            # Initialised up front so an empty run still reports a count.
            inserted_count = 0
            if trades:
                inserted_count = _insert_new_trades(cursor, trades)
                conn.commit()

            logger.info(f"Recorded {inserted_count} notable trades")

    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception(f"Error collecting notable trades: {e}")
        raise


if __name__ == "__main__":
    collect_notable_trades()