#!/usr/bin/env python3
"""
Rebuild all historical summaries from raw data
Processes all available data and generates comprehensive CSV summaries
"""

import csv
import json
import sys
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path

from config_multi_user import (
    RAW_DATA_DIR,
    SUMMARIES_DIR,
    TIMEZONE,
    setup_logging
)
from data_handler_team import get_team_handler
from user_mapping import get_user_mapper

# Date format used both in raw data file names and in summary['date'].
DATE_FORMAT = '%Y-%m-%d'


def _mean(values):
    """Return the arithmetic mean of ``values``, or 0 when it is empty."""
    return sum(values) / len(values) if values else 0


class HistoricalSummaryBuilder:
    """Build comprehensive summaries from all historical data"""

    def __init__(self):
        self.logger = setup_logging('historical_summary')
        self.team_handler = get_team_handler()
        self.mapper = get_user_mapper()

        # Output files
        self.master_daily_csv = SUMMARIES_DIR / "master_daily_summary.csv"
        self.master_weekly_csv = SUMMARIES_DIR / "master_weekly_summary.csv"
        self.master_monthly_csv = SUMMARIES_DIR / "master_monthly_summary.csv"
        self.user_stats_csv = SUMMARIES_DIR / "user_statistics.csv"
        self.summary_json = SUMMARIES_DIR / "summary_data.json"

    def _write_csv(self, path, fieldnames, records, label):
        """Write ``records`` (a list of dicts) to ``path`` as CSV.

        The parent directory is created when missing, and the file is
        written as UTF-8 so that names containing non-ASCII characters do
        not depend on the platform's default encoding. ``label`` is only
        used in the confirmation message printed afterwards.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(records)
        print(f"✅ Saved {label} to: {path}")

    def get_all_dates_with_data(self):
        """Find all dates that have data files

        Scans RAW_DATA_DIR for ``*.csv`` files and parses the text after the
        last underscore of each file stem as ``YYYY-MM-DD``. Files without an
        underscore, or whose trailing part is not a valid date, are skipped.

        Returns:
            Sorted list of unique ``datetime.date`` objects.
        """
        dates = set()

        for csv_file in RAW_DATA_DIR.glob("*.csv"):
            # Extract date from filename (assumes format: *_YYYY-MM-DD.csv)
            filename = csv_file.stem
            if '_' not in filename:
                continue
            date_part = filename.split('_')[-1]
            try:
                dates.add(datetime.strptime(date_part, DATE_FORMAT).date())
            except ValueError:
                # Not a dated data file; ignore it.
                continue

        return sorted(dates)

    def rebuild_all_daily_summaries(self):
        """Process all historical data and create daily summaries

        For every date found by ``get_all_dates_with_data`` the team handler
        is asked for a daily summary. Each non-empty summary contributes one
        CSV row per user plus one ``TEAM_TOTAL`` row, all written to
        ``self.master_daily_csv``.

        Returns:
            List of the non-empty summaries (in date order), or ``None``
            when no data files exist.
        """
        dates = self.get_all_dates_with_data()

        if not dates:
            self.logger.error("No data files found!")
            return None

        print(f"\nFound data for {len(dates)} days")
        print(f"Date range: {dates[0]} to {dates[-1]}")

        all_daily_summaries = []
        daily_records = []

        # Process each date
        for i, date in enumerate(dates, 1):
            print(f"Processing {date} ({i}/{len(dates)})...", end='\r')

            # Generate summary for this date
            summary = self.team_handler.generate_daily_team_summary(date)
            if not summary:
                continue

            all_daily_summaries.append(summary)
            date_str = date.strftime(DATE_FORMAT)
            day_name = date.strftime('%A')

            # Create CSV records for each user
            for user_id, user_data in summary['users'].items():
                daily_records.append({
                    'Date': date_str,
                    'Day': day_name,
                    'User_ID': user_id,
                    'Name': user_data['name'],
                    'Hours': user_data['total_hours'],
                    'Minutes': user_data['total_minutes'],
                    'First_Seen': user_data['first_seen'] or '',
                    'Last_Seen': user_data['last_seen'] or '',
                    'Status': 'Active' if user_data['total_minutes'] > 0 else 'Absent'
                })

            # Add team total record
            totals = summary['team_totals']
            daily_records.append({
                'Date': date_str,
                'Day': day_name,
                'User_ID': 'TEAM_TOTAL',
                'Name': f"Team Total ({totals['active_users']}/{totals['total_users']} active)",
                'Hours': round(totals['total_hours'], 2),
                'Minutes': 0,
                'First_Seen': totals['earliest_start'] or '',
                'Last_Seen': totals['latest_end'] or '',
                'Status': 'Summary'
            })

        print(f"\nProcessed {len(all_daily_summaries)} days of data")

        # Save to master CSV
        if daily_records:
            fieldnames = ['Date', 'Day', 'User_ID', 'Name', 'Hours', 'Minutes',
                          'First_Seen', 'Last_Seen', 'Status']
            self._write_csv(self.master_daily_csv, fieldnames, daily_records,
                            "daily summaries")

        return all_daily_summaries

    def generate_weekly_summaries(self, daily_summaries):
        """Generate weekly summaries from daily data

        Daily summaries are grouped by the Monday of their week. Only users
        with ``total_hours > 0`` on a day are aggregated for that day. The
        result is written to ``self.master_weekly_csv``.

        Args:
            daily_summaries: Iterable of summary dicts; this method reads
                ``date``, ``users`` and ``team_totals`` from each.

        Returns:
            The ``defaultdict`` of per-week aggregates, keyed by the week's
            Monday as ``YYYY-MM-DD``.
        """
        weekly_data = defaultdict(lambda: {
            'dates': [],
            'users': defaultdict(lambda: {
                'name': '',
                'total_hours': 0,
                'days_active': 0,
                'daily_hours': []
            }),
            'team_totals': {
                'total_hours': 0,
                'total_days': 0,
                'active_users_per_day': []
            }
        })

        # Group daily summaries by week
        for summary in daily_summaries:
            date = datetime.strptime(summary['date'], DATE_FORMAT).date()
            # Get Monday of the week
            week_start = date - timedelta(days=date.weekday())
            week = weekly_data[week_start.strftime(DATE_FORMAT)]

            week['dates'].append(date)

            # Aggregate user data (entries are created for active users only)
            for user_id, user_data in summary['users'].items():
                if user_data['total_hours'] > 0:
                    entry = week['users'][user_id]
                    entry['name'] = user_data['name']
                    entry['total_hours'] += user_data['total_hours']
                    entry['days_active'] += 1
                    entry['daily_hours'].append(user_data['total_hours'])

            # Track team totals
            team = week['team_totals']
            team['total_hours'] += summary['team_totals']['total_hours']
            team['total_days'] += 1
            team['active_users_per_day'].append(summary['team_totals']['active_users'])

        # Create CSV records
        weekly_records = []
        for week_start, week_data in sorted(weekly_data.items()):
            week_end = (
                datetime.strptime(week_start, DATE_FORMAT).date() + timedelta(days=6)
            ).strftime(DATE_FORMAT)
            days_total = len(week_data['dates'])

            # User records
            for user_id, user_data in week_data['users'].items():
                days_active = user_data['days_active']
                avg_hours = user_data['total_hours'] / days_active if days_active > 0 else 0
                weekly_records.append({
                    'Week_Start': week_start,
                    'Week_End': week_end,
                    'User_ID': user_id,
                    'Name': user_data['name'],
                    'Total_Hours': round(user_data['total_hours'], 2),
                    'Days_Active': days_active,
                    'Days_Total': days_total,
                    'Avg_Hours_Per_Active_Day': round(avg_hours, 2),
                    'Attendance_Rate': round((days_active / days_total) * 100, 1)
                })

            # Team total
            team = week_data['team_totals']
            avg_active = _mean(team['active_users_per_day'])
            weekly_records.append({
                'Week_Start': week_start,
                'Week_End': week_end,
                'User_ID': 'TEAM_TOTAL',
                'Name': 'Team Total',
                'Total_Hours': round(team['total_hours'], 2),
                'Days_Active': team['total_days'],
                'Days_Total': days_total,
                # NOTE: for the team row this column carries the average
                # number of active users per day, not hours.
                'Avg_Hours_Per_Active_Day': round(avg_active, 1),
                'Attendance_Rate': 100.0
            })

        # Save to CSV
        if weekly_records:
            fieldnames = ['Week_Start', 'Week_End', 'User_ID', 'Name', 'Total_Hours',
                          'Days_Active', 'Days_Total', 'Avg_Hours_Per_Active_Day',
                          'Attendance_Rate']
            self._write_csv(self.master_weekly_csv, fieldnames, weekly_records,
                            "weekly summaries")

        return weekly_data

    def generate_monthly_summaries(self, daily_summaries):
        """Generate monthly summaries from daily data

        Daily summaries are grouped by ``YYYY-MM``. ``work_days`` counts
        Monday-Friday dates only, while hours and active-day counts are
        accumulated for every day. Users without any active day in a month
        get no row. The result is written to ``self.master_monthly_csv``.

        Args:
            daily_summaries: Iterable of summary dicts; this method reads
                ``date``, ``users`` and ``team_totals`` from each.

        Returns:
            The ``defaultdict`` of per-month aggregates, keyed by ``YYYY-MM``.
        """
        monthly_data = defaultdict(lambda: {
            'dates': [],
            'users': defaultdict(lambda: {
                'name': '',
                'total_hours': 0,
                'days_active': 0,
                'work_days': 0
            }),
            'team_totals': {
                'total_hours': 0,
                'work_days': 0,
                'active_users_per_day': []
            }
        })

        # Group daily summaries by month
        for summary in daily_summaries:
            date = datetime.strptime(summary['date'], DATE_FORMAT).date()
            month = monthly_data[date.strftime('%Y-%m')]
            team = month['team_totals']

            # Only count weekdays
            if date.weekday() < 5:  # Monday = 0, Friday = 4
                month['dates'].append(date)
                team['work_days'] += 1

            # Aggregate user data
            for user_id, user_data in summary['users'].items():
                entry = month['users'][user_id]
                entry['name'] = user_data['name']
                entry['work_days'] = team['work_days']

                if user_data['total_hours'] > 0:
                    entry['total_hours'] += user_data['total_hours']
                    entry['days_active'] += 1

            # Track team totals
            team['total_hours'] += summary['team_totals']['total_hours']
            team['active_users_per_day'].append(summary['team_totals']['active_users'])

        # Create CSV records
        monthly_records = []
        for month_key, month_data in sorted(monthly_data.items()):
            team = month_data['team_totals']
            work_days = team['work_days']

            # User records
            for user_id, user_data in month_data['users'].items():
                days_active = user_data['days_active']
                if days_active <= 0:
                    continue
                avg_hours = user_data['total_hours'] / days_active
                # A month whose only data falls on weekends has no work
                # days; report 0 rather than dividing by zero.
                attendance = (days_active / work_days) * 100 if work_days else 0.0
                monthly_records.append({
                    'Month': month_key,
                    'User_ID': user_id,
                    'Name': user_data['name'],
                    'Total_Hours': round(user_data['total_hours'], 2),
                    'Days_Active': days_active,
                    'Work_Days': work_days,
                    'Avg_Hours_Per_Active_Day': round(avg_hours, 2),
                    'Attendance_Rate': round(attendance, 1)
                })

            # Team total
            avg_active = _mean(team['active_users_per_day'])
            monthly_records.append({
                'Month': month_key,
                'User_ID': 'TEAM_TOTAL',
                'Name': 'Team Total',
                'Total_Hours': round(team['total_hours'], 2),
                'Days_Active': work_days,
                'Work_Days': work_days,
                # NOTE: for the team row this column carries the average
                # number of active users per day, not hours.
                'Avg_Hours_Per_Active_Day': round(avg_active, 1),
                'Attendance_Rate': 100.0
            })

        # Save to CSV
        if monthly_records:
            fieldnames = ['Month', 'User_ID', 'Name', 'Total_Hours', 'Days_Active',
                          'Work_Days', 'Avg_Hours_Per_Active_Day', 'Attendance_Rate']
            self._write_csv(self.master_monthly_csv, fieldnames, monthly_records,
                            "monthly summaries")

        return monthly_data

    def generate_user_statistics(self, daily_summaries):
        """Generate overall user statistics

        Accumulates per-user totals over all daily summaries and writes one
        row per user that was active on at least one day to
        ``self.user_stats_csv``, sorted by total hours (highest first).

        Args:
            daily_summaries: Iterable of summary dicts; this method reads
                ``date`` and ``users`` from each.

        Returns:
            The list of row dicts that was written (possibly empty).
        """
        user_stats = defaultdict(lambda: {
            'name': '',
            'total_hours': 0,
            'total_days': 0,
            'days_active': 0,
            'earliest_start_ever': None,
            'latest_end_ever': None,
            'avg_start_time': [],
            'avg_end_time': [],
            'most_active_day': defaultdict(int),
            'monthly_hours': defaultdict(float)
        })

        # Process all daily summaries
        for summary in daily_summaries:
            date = datetime.strptime(summary['date'], DATE_FORMAT).date()
            month_key = date.strftime('%Y-%m')
            day_name = date.strftime('%A')

            for user_id, user_data in summary['users'].items():
                stats = user_stats[user_id]
                stats['name'] = user_data['name']
                stats['total_days'] += 1

                if user_data['total_hours'] <= 0:
                    continue

                stats['days_active'] += 1
                stats['total_hours'] += user_data['total_hours']
                stats['monthly_hours'][month_key] += user_data['total_hours']
                stats['most_active_day'][day_name] += 1

                # Track start/end times (compared as strings)
                first_seen = user_data['first_seen']
                if first_seen:
                    stats['avg_start_time'].append(first_seen)
                    earliest = stats['earliest_start_ever']
                    if not earliest or first_seen < earliest:
                        stats['earliest_start_ever'] = first_seen

                last_seen = user_data['last_seen']
                if last_seen:
                    stats['avg_end_time'].append(last_seen)
                    latest = stats['latest_end_ever']
                    if not latest or last_seen > latest:
                        stats['latest_end_ever'] = last_seen

        # Calculate averages and create records
        stats_records = []
        for user_id, stats in user_stats.items():
            if stats['days_active'] <= 0:
                continue

            # Calculate average times ('' when no times were recorded)
            avg_start = self._calculate_average_time(stats['avg_start_time'])
            avg_end = self._calculate_average_time(stats['avg_end_time'])

            # Find most active day of week
            if stats['most_active_day']:
                most_active_day = max(stats['most_active_day'].items(),
                                      key=lambda x: x[1])[0]
            else:
                most_active_day = ''

            # Calculate monthly average
            monthly_avg = _mean(stats['monthly_hours'].values())

            stats_records.append({
                'User_ID': user_id,
                'Name': stats['name'],
                'Total_Hours': round(stats['total_hours'], 2),
                'Days_Active': stats['days_active'],
                'Days_Total': stats['total_days'],
                'Attendance_Rate': round((stats['days_active'] / stats['total_days']) * 100, 1),
                'Avg_Hours_Per_Active_Day': round(stats['total_hours'] / stats['days_active'], 2),
                'Avg_Start_Time': avg_start,
                'Avg_End_Time': avg_end,
                'Earliest_Start': stats['earliest_start_ever'] or '',
                'Latest_End': stats['latest_end_ever'] or '',
                'Most_Active_Day': most_active_day,
                'Monthly_Avg_Hours': round(monthly_avg, 2)
            })

        # Sort by total hours
        stats_records.sort(key=lambda x: x['Total_Hours'], reverse=True)

        # Save to CSV
        if stats_records:
            fieldnames = ['User_ID', 'Name', 'Total_Hours', 'Days_Active', 'Days_Total',
                          'Attendance_Rate', 'Avg_Hours_Per_Active_Day', 'Avg_Start_Time',
                          'Avg_End_Time', 'Earliest_Start', 'Latest_End', 'Most_Active_Day',
                          'Monthly_Avg_Hours']
            self._write_csv(self.user_stats_csv, fieldnames, stats_records,
                            "user statistics")

        return stats_records

    def _calculate_average_time(self, times) -> str:
        """Calculate average time from list of HH:MM strings

        Only the hour and minute fields are used, so ``HH:MM:SS`` values are
        accepted as well (seconds are ignored). The average is truncated to
        whole minutes.

        Returns:
            The average as ``HH:MM``, or ``''`` when ``times`` is empty.

        Raises:
            ValueError: If an entry lacks a minute field or is not numeric.
        """
        if not times:
            return ''

        total_minutes = 0
        for time_str in times:
            hours_part, minutes_part = time_str.split(':')[:2]
            total_minutes += int(hours_part) * 60 + int(minutes_part)

        hours, minutes = divmod(total_minutes // len(times), 60)
        return f"{hours:02d}:{minutes:02d}"

    def save_json_summary(self, daily_summaries, weekly_data, monthly_data, user_stats):
        """Save all summary data as JSON for web visualization

        Args:
            daily_summaries: List of daily summary dicts in date order.
            weekly_data: Accepted for interface symmetry; not used here.
            monthly_data: Mapping as returned by ``generate_monthly_summaries``.
            user_stats: List of rows as returned by
                ``generate_user_statistics`` (already sorted by total hours).

        Returns:
            The dict that was serialized to ``self.summary_json``.
        """
        summary_data = {
            'generated_at': datetime.now(TIMEZONE).isoformat(),
            'date_range': {
                'start': daily_summaries[0]['date'] if daily_summaries else '',
                'end': daily_summaries[-1]['date'] if daily_summaries else '',
                'total_days': len(daily_summaries)
            },
            'recent_days': [],
            'user_rankings': [],
            'monthly_trends': {},
            'team_stats': {}
        }

        # Add recent 30 days
        for summary in daily_summaries[-30:]:
            totals = summary['team_totals']
            summary_data['recent_days'].append({
                'date': summary['date'],
                'active_users': totals['active_users'],
                'total_users': totals['total_users'],
                'total_hours': round(totals['total_hours'], 2),
                'earliest_start': totals['earliest_start'],
                'latest_end': totals['latest_end']
            })

        # Add user rankings (top 20)
        for stat in user_stats[:20]:
            summary_data['user_rankings'].append({
                'name': stat['Name'],
                'total_hours': stat['Total_Hours'],
                'attendance_rate': stat['Attendance_Rate'],
                'avg_hours': stat['Avg_Hours_Per_Active_Day']
            })

        # Add monthly trends
        for month, data in sorted(monthly_data.items()):
            team = data['team_totals']
            summary_data['monthly_trends'][month] = {
                'total_hours': round(team['total_hours'], 2),
                'work_days': team['work_days'],
                'avg_active_users': round(_mean(team['active_users_per_day']), 1)
            }

        # Calculate team statistics
        if daily_summaries:
            total_hours = sum(s['team_totals']['total_hours'] for s in daily_summaries)
            active_days = sum(1 for s in daily_summaries
                              if s['team_totals']['active_users'] > 0)

            summary_data['team_stats'] = {
                'total_hours_all_time': round(total_hours, 2),
                'avg_daily_hours': round(total_hours / len(daily_summaries), 2),
                'total_tracked_users': len(user_stats),
                'days_with_activity': active_days
            }

        # Save to JSON (explicit UTF-8; create the output directory if needed)
        self.summary_json.parent.mkdir(parents=True, exist_ok=True)
        with open(self.summary_json, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2)

        print(f"✅ Saved JSON data to: {self.summary_json}")

        return summary_data

    def run(self) -> bool:
        """Run the complete rebuild process

        Returns:
            ``True`` when all summaries were generated, ``False`` when there
            was no daily data to process.
        """
        print("\n" + "="*60)
        print("REBUILDING ALL HISTORICAL SUMMARIES")
        print("="*60)

        # Step 1: Process all daily summaries
        print("\n📊 Step 1: Processing daily summaries...")
        daily_summaries = self.rebuild_all_daily_summaries()

        if not daily_summaries:
            print("❌ No data to process!")
            return False

        # Step 2: Generate weekly summaries
        print("\n📊 Step 2: Generating weekly summaries...")
        weekly_data = self.generate_weekly_summaries(daily_summaries)

        # Step 3: Generate monthly summaries
        print("\n📊 Step 3: Generating monthly summaries...")
        monthly_data = self.generate_monthly_summaries(daily_summaries)

        # Step 4: Generate user statistics
        print("\n📊 Step 4: Calculating user statistics...")
        user_stats = self.generate_user_statistics(daily_summaries)

        # Step 5: Save JSON for visualization
        print("\n📊 Step 5: Saving JSON data for visualization...")
        self.save_json_summary(daily_summaries, weekly_data, monthly_data, user_stats)

        print("\n" + "="*60)
        print("✅ SUMMARY REBUILD COMPLETE!")
        print("="*60)
        print("\nGenerated files:")
        print(f" 1. {self.master_daily_csv}")
        print(f" 2. {self.master_weekly_csv}")
        print(f" 3. {self.master_monthly_csv}")
        print(f" 4. {self.user_stats_csv}")
        print(f" 5. {self.summary_json}")
        print("\nNext step: Open the HTML dashboard to visualize the data")

        return True


def main() -> int:
    """Main function: run the rebuild and return a process exit code."""
    builder = HistoricalSummaryBuilder()
    success = builder.run()
    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main())