#!/usr/bin/env python3
"""
Combine all raw CSV files into a single master file
Enhanced version with date range filtering options
"""

import csv
import os
import argparse
from pathlib import Path
from datetime import datetime, timedelta
from collections import defaultdict

# Configuration
RAW_DATA_DIR = Path("data/raw")
OUTPUT_DIR = Path("data/summaries")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Columns that always lead the master file, in this order.
ESSENTIAL_COLUMNS = [
    'date', 'day_of_week', 'time', 'timestamp', 'user_id', 'user_name',
    'presence', 'first_active', 'last_active',
]

SUMMARY_COLUMNS = [
    'date', 'user_id', 'user_name', 'total_minutes_tracked',
    'active_minutes', 'active_hours', 'first_active', 'last_active',
    'activity_rate', 'presence_changes',
]


def _extract_hhmm(timestamp):
    """Return the zero-padded ``HH:MM`` portion of *timestamp*, or None.

    Handles ISO timestamps (``2024-01-20T09:30:45.123+00:00``),
    space-separated ones (``2024-01-20 09:30:45``) and bare times
    (``09:30:45``). Never raises: anything without an ``H:M`` pair
    yields None.
    """
    if not timestamp or not isinstance(timestamp, str):
        return None

    if 'T' in timestamp:
        time_part = timestamp.split('T')[1]
    elif ' ' in timestamp:
        time_part = timestamp.split(' ')[1]
    else:
        time_part = timestamp

    # Strip timezone offset, fractional seconds and a trailing 'Z'.
    time_clean = (
        time_part.split('+')[0].split('-')[0].split('.')[0].split('Z')[0]
    )
    parts = time_clean.split(':')
    if len(parts) < 2:
        return None
    return f"{parts[0].zfill(2)}:{parts[1].zfill(2)}"


def _summary_filename(master_filename):
    """Derive the daily-summary filename that pairs with *master_filename*.

    ``master_data*.csv`` maps to ``daily_summary*.csv``. Any other
    (custom) name gets a ``daily_summary_`` prefix so the summary can
    never resolve to the same path as the master file and overwrite it.
    """
    master_path = Path(master_filename)
    name = master_path.name
    stem = name[:-4] if name.lower().endswith('.csv') else name
    if 'master_data' in stem:
        stem = stem.replace('master_data', 'daily_summary')
    else:
        stem = f"daily_summary_{stem}"
    return str(master_path.with_name(f"{stem}.csv"))


def _select_files(csv_files, start_date, end_date):
    """Return ``(path, date_str, file_date)`` for files inside the range.

    The date is taken from the text after the last underscore of the
    file stem and must be ``YYYY-MM-DD``; other files are skipped with
    a warning.
    """
    selected = []
    for csv_file in sorted(csv_files):
        stem = csv_file.stem
        if '_' not in stem:
            print(f"⚠️  Skipping {csv_file.name} - invalid filename format")
            continue

        date_str = stem.rsplit('_', 1)[-1]
        try:
            file_date = datetime.strptime(date_str, '%Y-%m-%d').date()
        except ValueError:
            print(f"⚠️  Skipping {csv_file.name} - cannot parse date")
            continue

        if start_date and file_date < start_date:
            continue
        if end_date and file_date > end_date:
            continue
        selected.append((csv_file, date_str, file_date))
    return selected


def create_master_file(start_date=None, end_date=None, output_filename=None):
    """
    Combine all raw CSV files into a single master file

    Every row gains ``date``, ``day_of_week``, ``time``, ``first_active``
    and ``last_active`` columns. The result is sorted by date, user and
    time, written to ``OUTPUT_DIR``, and a matching daily summary is
    produced via ``create_daily_summary``.

    Args:
        start_date: datetime.date - Include files from this date onwards (inclusive)
        end_date: datetime.date - Include files up to this date (inclusive)
        output_filename: str - Custom output filename (default: master_data.csv,
            or a name describing the date range when a filter is given)

    Returns:
        True when a master file was written, False when no input files
        matched or no rows could be read.
    """
    print("=" * 60)
    print("CREATING MASTER DATA FILE")
    if start_date or end_date:
        print(f"Date Range Filter: {start_date or 'beginning'} to {end_date or 'now'}")
    print("=" * 60)

    # Find all CSV files in raw directory
    csv_files = list(RAW_DATA_DIR.glob("*.csv"))
    if not csv_files:
        print(f"❌ No CSV files found in {RAW_DATA_DIR}")
        return False
    print(f"Found {len(csv_files)} CSV files total")

    filtered_files = _select_files(csv_files, start_date, end_date)
    if not filtered_files:
        print("❌ No CSV files found in specified date range")
        return False

    print(f"Processing {len(filtered_files)} CSV files after date filtering")
    first_date = min(entry[2] for entry in filtered_files)
    last_date = max(entry[2] for entry in filtered_files)
    print(f"Date range: {first_date} to {last_date}")

    all_data = []
    # (date_str, user_id) -> [first_active, last_active] as 'HH:MM' strings
    active_span = {}
    users_seen = set()
    dates_seen = set()

    # First pass: read all data and track first/last active times per
    # user per day.
    for csv_file, date_str, file_date in filtered_files:
        day_of_week = file_date.strftime('%A')
        row_count = 0
        ragged_rows = 0
        warned_timestamp = False
        try:
            # newline='' is what the csv module documents for reading.
            with open(csv_file, 'r', newline='') as f:
                for row in csv.DictReader(f):
                    # DictReader stores surplus cells under the key None;
                    # left in place it breaks column sorting later on.
                    if row.pop(None, None) is not None:
                        ragged_rows += 1

                    row['date'] = date_str
                    row['day_of_week'] = day_of_week

                    raw_timestamp = row.get('timestamp')
                    time_str = _extract_hhmm(raw_timestamp)
                    if time_str is not None:
                        row['time'] = time_str
                    else:
                        if raw_timestamp and not warned_timestamp:
                            # Only show the warning once per file
                            print(f"  ⚠️ Could not parse timestamp format in "
                                  f"{csv_file.name}: {str(raw_timestamp)[:30]}")
                            warned_timestamp = True
                        if not row.get('time'):
                            row['time'] = ''

                    user_id = row.get('user_id') or ''
                    # Only rows with a usable time may define the span;
                    # storing None here would break later comparisons.
                    if time_str and user_id and row.get('presence') == 'active':
                        span = active_span.setdefault(
                            (date_str, user_id), [time_str, time_str])
                        if time_str < span[0]:
                            span[0] = time_str
                        if time_str > span[1]:
                            span[1] = time_str

                    # Count each person once: prefer the id, fall back to
                    # the name when the file has no id column/value.
                    identity = user_id or row.get('user_name')
                    if identity:
                        users_seen.add(identity)
                    dates_seen.add(date_str)

                    all_data.append(row)
                    row_count += 1
            print(f"  ✅ {csv_file.name}: {row_count} rows")
            if ragged_rows:
                print(f"  ⚠️ {csv_file.name}: ignored extra cells in "
                      f"{ragged_rows} row(s) wider than the header")
        except (OSError, UnicodeError, csv.Error) as e:
            # Best effort: rows read before the failure are kept.
            print(f"  ❌ Error reading {csv_file.name}: {e}")
            continue

    if not all_data:
        print("❌ No data extracted from files")
        return False

    # Second pass: add first_active and last_active columns to each row
    print("\nAdding first_active and last_active columns...")
    for row in all_data:
        span = active_span.get((row.get('date'), row.get('user_id')))
        row['first_active'], row['last_active'] = span if span else ('', '')
    print("✅ Added first/last active times to all rows")

    print(f"\n📊 Total records: {len(all_data):,}")
    print(f"👥 Unique users: {len(users_seen)}")
    print(f"📅 Unique dates: {len(dates_seen)}")

    # Sort data by date, user, and time. Short rows carry None for
    # missing cells, so coerce to '' to keep the keys comparable.
    all_data.sort(key=lambda r: (r.get('date') or '',
                                 r.get('user_id') or '',
                                 r.get('time') or ''))

    # Determine output filename
    if output_filename is None:
        if start_date and end_date:
            output_filename = f"master_data_{start_date}_{end_date}.csv"
        elif start_date:
            output_filename = f"master_data_from_{start_date}.csv"
        elif end_date:
            output_filename = f"master_data_until_{end_date}.csv"
        else:
            output_filename = "master_data.csv"

    master_file = OUTPUT_DIR / output_filename

    # Essential columns first, then every other column seen, sorted.
    all_fields = set()
    for row in all_data:
        all_fields.update(row.keys())
    fieldnames = list(ESSENTIAL_COLUMNS)
    fieldnames.extend(
        field for field in sorted(all_fields) if field not in ESSENTIAL_COLUMNS
    )

    # DictWriter's default restval='' fills columns a row lacks, and
    # None values are written as empty cells.
    with open(master_file, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(all_data)

    size_bytes = master_file.stat().st_size
    print(f"\n✅ Master file created: {master_file}")
    print(f"   Size: {size_bytes / (1024 * 1024):.2f} MB ({size_bytes:,} bytes)")
    print(f"   Total columns: {len(fieldnames)}")

    # Create daily summary as well
    create_daily_summary(all_data, output_filename)

    return True


def create_daily_summary(all_data, master_filename):
    """Create a daily summary file from the master data

    Produces one record per (date, user_id) with row counts, the
    earliest/latest active ``HH:MM`` and an activity rate, and writes it
    to ``OUTPUT_DIR`` under a name derived from *master_filename*.

    Args:
        all_data: list of row dicts with at least ``date`` and
            ``user_id``; ``user_name``, ``presence`` and ``timestamp``
            are used when present. ``presence_changes`` follows the
            order of this list, so pass rows sorted by time.
        master_filename: name of the master file this summary pairs with.

    Returns:
        Path of the summary file, or None when there was nothing to write.

    NOTE(review): ``total_minutes``/``active_minutes`` are plain row
    counts; they equal minutes only if the raw data has one sample per
    minute - confirm against the collector. ``presence_changes`` is the
    number of distinct consecutive presence runs (first state included),
    not the number of transitions.
    """
    print("\n" + "=" * 60)
    print("CREATING DAILY SUMMARY")
    print("=" * 60)

    # Group data by date and user
    daily_stats = defaultdict(lambda: defaultdict(lambda: {
        'user_name': '',
        'total_minutes': 0,
        'active_minutes': 0,
        'first_seen': None,
        'last_seen': None,
        'presence_log': [],
    }))

    for row in all_data:
        date = row.get('date') or ''
        user_id = row.get('user_id') or ''
        if not date or not user_id:
            continue

        user_name = row.get('user_name') or ''
        presence = row.get('presence') or ''
        timestamp = row.get('timestamp') or ''

        stats = daily_stats[date][user_id]
        if user_name:
            # Do not let a row with a blank name erase a known one.
            stats['user_name'] = user_name
        stats['total_minutes'] += 1
        is_active = presence == 'active'
        if is_active:
            stats['active_minutes'] += 1

        if not timestamp:
            continue

        hour_min = _extract_hhmm(timestamp)
        if hour_min and is_active:
            # min/max rather than first/last encountered, so the result
            # does not depend on the caller's row order.
            if stats['first_seen'] is None or hour_min < stats['first_seen']:
                stats['first_seen'] = hour_min
            if stats['last_seen'] is None or hour_min > stats['last_seen']:
                stats['last_seen'] = hour_min

        # Track presence changes for pattern analysis
        log = stats['presence_log']
        if not log or log[-1] != presence:
            log.append(presence)

    summary_file = OUTPUT_DIR / _summary_filename(master_filename)

    summary_rows = []
    for date in sorted(daily_stats):
        for user_id, stats in daily_stats[date].items():
            total = stats['total_minutes']
            active = stats['active_minutes']
            summary_rows.append({
                'date': date,
                'user_id': user_id,
                'user_name': stats['user_name'],
                'total_minutes_tracked': total,
                'active_minutes': active,
                'active_hours': round(active / 60, 2),
                'first_active': stats['first_seen'] or '',
                'last_active': stats['last_seen'] or '',
                'activity_rate': round(active / total * 100 if total > 0 else 0, 1),
                'presence_changes': len(stats['presence_log']),
            })

    if not summary_rows:
        print("⚠️  No user-day records found - daily summary not written")
        return None

    # Write summary CSV
    with open(summary_file, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=SUMMARY_COLUMNS)
        writer.writeheader()
        writer.writerows(summary_rows)

    file_size_kb = summary_file.stat().st_size / 1024
    print(f"✅ Daily summary created: {summary_file}")
    print(f"   {len(summary_rows)} user-day records ({file_size_kb:.1f} KB)")
    return summary_file


def _parse_cli_date(parser, value, flag):
    """Parse a ``YYYY-MM-DD`` CLI value, exiting via argparse on bad input."""
    try:
        return datetime.strptime(value, '%Y-%m-%d').date()
    except ValueError:
        parser.error(f"{flag} must be in YYYY-MM-DD format, got {value!r}")


def main():
    """Main function with command-line argument parsing

    Resolves the requested date range (a preset flag wins over explicit
    --start-date/--end-date) and runs ``create_master_file``.

    Returns:
        Process exit code: 0 on success, 1 when no master file was made.
    """
    parser = argparse.ArgumentParser(
        description='Create master data file from raw CSV files with optional date filtering',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Create master file with all data
  python3 create_master_file.py

  # Create master file for last 30 days
  python3 create_master_file.py --last-30-days

  # Create master file for last month
  python3 create_master_file.py --last-month

  # Create master file for current month
  python3 create_master_file.py --this-month

  # Create master file for specific date range
  python3 create_master_file.py --start-date 2025-11-01 --end-date 2025-11-30

  # Create master file from specific date onwards
  python3 create_master_file.py --start-date 2025-11-01

  # Create master file with custom output name
  python3 create_master_file.py --last-month --output november_data.csv
        """
    )

    parser.add_argument('--start-date', type=str, help='Start date (YYYY-MM-DD, inclusive)')
    parser.add_argument('--end-date', type=str, help='End date (YYYY-MM-DD, inclusive)')
    parser.add_argument('--last-30-days', action='store_true', help='Last 30 days only')
    parser.add_argument('--last-month', action='store_true', help='Previous calendar month')
    parser.add_argument('--this-month', action='store_true', help='Current calendar month')
    parser.add_argument('--output', type=str, help='Custom output filename (default: auto-generated)')

    args = parser.parse_args()

    # Parse date arguments
    start_date = None
    end_date = None

    if args.last_30_days:
        end_date = datetime.now().date()
        start_date = end_date - timedelta(days=29)  # 29 days ago + today = 30 days total
        print(f"📅 Mode: Last 30 days ({start_date} to {end_date})")

    elif args.last_month:
        today = datetime.now().date()
        # The day before the 1st of this month is the last day of the
        # previous month; its day=1 is that month's first day.
        end_date = today.replace(day=1) - timedelta(days=1)
        start_date = end_date.replace(day=1)
        print(f"📅 Mode: Last month ({start_date.strftime('%B %Y')} - {start_date} to {end_date})")

    elif args.this_month:
        today = datetime.now().date()
        start_date = today.replace(day=1)
        end_date = today
        print(f"📅 Mode: This month ({start_date.strftime('%B %Y')} - {start_date} to {end_date})")

    elif args.start_date or args.end_date:
        if args.start_date:
            start_date = _parse_cli_date(parser, args.start_date, '--start-date')
        if args.end_date:
            end_date = _parse_cli_date(parser, args.end_date, '--end-date')
        if start_date and end_date and start_date > end_date:
            parser.error(f"--start-date ({start_date}) is after --end-date ({end_date})")
        print("📅 Mode: Custom date range")

    # Run the main function
    success = create_master_file(start_date, end_date, args.output)

    if success:
        print("\n" + "=" * 60)
        print("✅ SUCCESS!")
        print("=" * 60)
    else:
        print("\n❌ Failed to create master file")
        return 1

    return 0


if __name__ == "__main__":
    import sys
    sys.exit(main())