#!/usr/bin/env python3
"""
Employee Activity Report Generator
Generates comprehensive HTML reports showing employee work patterns over multiple months
"""

import argparse
import csv
import json
import re
import sys
from collections import defaultdict
from datetime import date, datetime, timedelta
from html import escape
from itertools import groupby
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Configuration
RAW_DATA_DIR = Path("data/raw")
OUTPUT_DIR = Path("reports")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

DATE_FORMAT = '%Y-%m-%d'
DEFAULT_LOOKBACK_DAYS = 90   # used when no month count is supplied
DAYS_PER_MONTH = 30          # a "month" is approximated as 30 days
EXPECTED_HOURS_PER_DAY = 8   # basis for the weekly "% of expected hours"
SHORT_DAY_HOURS = 6          # fewer active hours than this => "Short day"
LONG_DAY_HOURS = 9           # more active hours than this => "Long day"
WEEKEND_DAYS = ('Saturday', 'Sunday')


def get_date_range(months: int = 3, start_date: Optional[str] = None,
                   end_date: Optional[str] = None) -> Tuple[date, date]:
    """Calculate the date range for the report.

    Args:
        months: Look-back length in 30-day months, used when ``start_date``
            is not given. A falsy value falls back to 90 days.
        start_date: Optional explicit start, ``YYYY-MM-DD``.
        end_date: Optional explicit end, ``YYYY-MM-DD``; defaults to today.

    Returns:
        ``(start, end)`` as ``date`` objects.

    Raises:
        ValueError: If a supplied date string is not ``YYYY-MM-DD``.
    """
    if months:
        lookback = timedelta(days=months * DAYS_PER_MONTH)
    else:
        lookback = timedelta(days=DEFAULT_LOOKBACK_DAYS)

    # Each bound is honoured on its own, so passing only one of them is not
    # silently ignored in favour of the month count.
    if end_date:
        end = datetime.strptime(end_date, DATE_FORMAT).date()
    else:
        end = datetime.now().date()

    if start_date:
        start = datetime.strptime(start_date, DATE_FORMAT).date()
    else:
        start = end - lookback

    return start, end


def _extract_time(timestamp: str) -> str:
    """Return the ``HH:MM`` portion of a timestamp string, or ``''`` if absent/invalid."""
    if 'T' in timestamp:
        time_part = timestamp.split('T')[1]
    elif ' ' in timestamp:
        time_part = timestamp.split(' ')[1]
    else:
        time_part = timestamp

    # Strip a UTC offset (+hh:mm / -hh:mm), fractional seconds and a trailing 'Z'.
    time_clean = time_part.split('+')[0].split('-')[0].split('.')[0].split('Z')[0]
    parts = time_clean.split(':')
    if len(parts) < 2:
        return ''

    hours, minutes = parts[0], parts[1]
    # Downstream code calls int() on these fragments, so reject anything non-numeric.
    if not (hours.isdecimal() and minutes.isdecimal()):
        return ''
    if int(hours) > 23 or int(minutes) > 59:
        return ''
    return f"{int(hours):02d}:{int(minutes):02d}"


def load_user_data(user_name: str, start_date: date, end_date: date) -> List[Dict]:
    """Load all CSV rows for a specific user within a date range.

    Files are taken from ``RAW_DATA_DIR``; the text after the last ``_`` in
    the file stem must be a ``YYYY-MM-DD`` date, otherwise the file is skipped.

    Args:
        user_name: Compared against each row's ``user_name`` and ``user_id``.
        start_date: First file date to include (inclusive).
        end_date: Last file date to include (inclusive).

    Returns:
        The matching rows, each extended with ``date`` (ISO string),
        ``file_date`` (``date``) and, when the row's ``timestamp`` yields one,
        ``time`` (``HH:MM``).
    """
    all_data: List[Dict] = []

    for csv_file in sorted(RAW_DATA_DIR.glob("*.csv")):
        # Extract date from filename
        filename = csv_file.stem
        if '_' not in filename:
            continue
        try:
            file_date = datetime.strptime(filename.split('_')[-1], DATE_FORMAT).date()
        except ValueError:
            continue  # not a dated data file

        # Skip if outside date range
        if file_date < start_date or file_date > end_date:
            continue

        # Normalised ISO form: later code slices and sorts this string, which
        # only works with zero-padded components.
        date_str = file_date.isoformat()

        try:
            with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
                for row in csv.DictReader(f):
                    if row.get('user_name') != user_name and row.get('user_id') != user_name:
                        continue
                    row['date'] = date_str
                    row['file_date'] = file_date

                    # Extract time from timestamp
                    timestamp = row.get('timestamp')
                    if timestamp:
                        time_str = _extract_time(timestamp)
                        if time_str:
                            row['time'] = time_str
                    all_data.append(row)
        except (OSError, UnicodeDecodeError, csv.Error) as exc:
            # Best effort: one unreadable file must not abort the whole report.
            print(f"Skipping unreadable file {csv_file}: {exc}", file=sys.stderr)
            continue

    return all_data


def analyze_daily_activity(data: List[Dict]) -> List[Dict]:
    """Analyze rows and generate per-day statistics.

    Every row is counted as one minute (``total_minutes``); rows whose
    ``presence`` is ``'active'`` also count towards ``active_minutes``.

    Args:
        data: Rows carrying at least ``date``; ``file_date``, ``presence``
            and ``time`` are used when present. Rows without ``date`` are ignored.

    Returns:
        One dict per date, sorted by date string, with keys ``date``,
        ``day_of_week``, ``total_minutes``, ``active_minutes``,
        ``first_active``, ``last_active``, ``first_seen``, ``last_seen``,
        ``active_hours`` and ``activity_rate`` (percent).
    """
    daily_stats: Dict[str, Dict] = {}

    for row in data:
        day_key = row.get('date')
        if not day_key:
            continue

        stats = daily_stats.get(day_key)
        if stats is None:
            # Only parse the date string when the row carries no file_date.
            day_date = row.get('file_date')
            if day_date is None:
                day_date = datetime.strptime(day_key, DATE_FORMAT).date()
            stats = daily_stats[day_key] = {
                'date': day_key,
                'day_of_week': day_date.strftime('%A'),
                'total_minutes': 0,
                'active_minutes': 0,
                'first_active': None,
                'last_active': None,
                'first_seen': None,
                'last_seen': None,
            }

        stats['total_minutes'] += 1
        time_val = row.get('time', '')
        is_active = row.get('presence') == 'active'
        if is_active:
            stats['active_minutes'] += 1

        if not time_val:
            continue

        # 'HH:MM' strings compare chronologically, so plain < / > is enough.
        if is_active:
            if not stats['first_active'] or time_val < stats['first_active']:
                stats['first_active'] = time_val
            if not stats['last_active'] or time_val > stats['last_active']:
                stats['last_active'] = time_val

        # Track all seen times (active or not)
        if not stats['first_seen'] or time_val < stats['first_seen']:
            stats['first_seen'] = time_val
        if not stats['last_seen'] or time_val > stats['last_seen']:
            stats['last_seen'] = time_val

    # Convert to list and add calculated fields
    result = []
    for day_key in sorted(daily_stats):
        stats = daily_stats[day_key]
        stats['active_hours'] = round(stats['active_minutes'] / 60, 2)
        stats['activity_rate'] = round(
            stats['active_minutes'] / stats['total_minutes'] * 100, 1
        )
        result.append(stats)

    return result


def _iso_week_key(day: Dict) -> Tuple[int, int]:
    """Return the ``(ISO year, ISO week)`` pair of a daily-stats entry."""
    iso = datetime.strptime(day['date'], DATE_FORMAT).date().isocalendar()
    return iso[0], iso[1]


def _summarize_week(week_num: int, days: List[Dict]) -> Dict:
    """Build the summary dict for one non-empty week of daily stats."""
    total_hours = sum(d['active_hours'] for d in days)
    return {
        'week_num': week_num,
        'start_date': days[0]['date'],
        'end_date': days[-1]['date'],
        'days': days,
        'total_hours': total_hours,
        'avg_hours': total_hours / len(days),
        'working_days': sum(1 for d in days if d['active_hours'] > 0),
    }


def group_by_week(daily_stats: List[Dict]) -> List[Dict]:
    """Group consecutive daily stats into ISO weeks.

    Days are grouped on ``(ISO year, ISO week)`` so that the same week number
    in two different years is never merged into a single week.

    Args:
        daily_stats: Daily entries in date order, each with ``date`` and
            ``active_hours``.

    Returns:
        One dict per week with ``week_num``, ``start_date``, ``end_date``,
        ``days``, ``total_hours``, ``avg_hours`` and ``working_days``.
    """
    return [
        _summarize_week(iso_week, list(days))
        for (_, iso_week), days in groupby(daily_stats, key=_iso_week_key)
    ]


def group_by_month(daily_stats: List[Dict]) -> List[Dict]:
    """Group daily stats into months, each with its own weekly breakdown.

    Args:
        daily_stats: Daily entries with ``date`` (``YYYY-MM-DD``) and
            ``active_hours``.

    Returns:
        One dict per ``YYYY-MM`` key, sorted by key, with ``month``,
        ``month_name``, ``days``, ``weeks``, ``total_hours``,
        ``avg_hours_per_day``, ``working_days`` and ``total_days``.
    """
    months: Dict[str, List[Dict]] = defaultdict(list)
    for day in daily_stats:
        months[day['date'][:7]].append(day)  # YYYY-MM

    result = []
    for month_key in sorted(months):
        days = months[month_key]
        total_hours = sum(d['active_hours'] for d in days)
        result.append({
            'month': month_key,
            'month_name': datetime.strptime(month_key, '%Y-%m').strftime('%B %Y'),
            'days': days,
            # Group this month's days into weeks
            'weeks': group_by_week(days),
            'total_hours': total_hours,
            'avg_hours_per_day': total_hours / len(days),
            'working_days': sum(1 for d in days if d['active_hours'] > 0),
            'total_days': len(days),
        })

    return result


def calculate_time_patterns(daily_stats: List[Dict]) -> Dict:
    """Calculate typical start/end times and a per-hour activity share.

    The hourly figure is a simplification: every hour between a day's first
    and last active time counts as active for that day.

    Args:
        daily_stats: Daily entries with ``first_active`` / ``last_active``
            (``HH:MM`` or ``None``).

    Returns:
        Dict with ``typical_start`` / ``typical_end`` (upper median),
        ``earliest_ever``, ``latest_ever`` (``'N/A'`` when there is no data)
        and ``hourly_activity`` mapping hour 0-23 to the percentage of days
        on which that hour fell inside the active span.
    """
    start_times = sorted(d['first_active'] for d in daily_stats if d['first_active'])
    end_times = sorted(d['last_active'] for d in daily_stats if d['last_active'])

    days_active_in_hour: Dict[int, int] = defaultdict(int)
    for day in daily_stats:
        if day['first_active'] and day['last_active']:
            start_hour = int(day['first_active'].split(':')[0])
            end_hour = int(day['last_active'].split(':')[0])
            for hour in range(start_hour, end_hour + 1):
                days_active_in_hour[hour] += 1

    hourly_percentages = {}
    for hour in range(24):
        count = days_active_in_hour.get(hour, 0)
        # count > 0 implies daily_stats is non-empty, so the division is safe.
        hourly_percentages[hour] = round(count / len(daily_stats) * 100, 1) if count else 0

    return {
        'typical_start': start_times[len(start_times) // 2] if start_times else 'N/A',
        'typical_end': end_times[len(end_times) // 2] if end_times else 'N/A',
        'earliest_ever': start_times[0] if start_times else 'N/A',
        'latest_ever': end_times[-1] if end_times else 'N/A',
        'hourly_activity': hourly_percentages,
    }


def _day_note(day: Dict) -> str:
    """Classify a day as Weekend / Absent / Short day / Long day / Normal."""
    hours = day['active_hours']
    if day['day_of_week'] in WEEKEND_DAYS:
        return 'Weekend'
    if hours == 0:
        return 'Absent'
    if hours < SHORT_DAY_HOURS:
        return 'Short day'
    if hours > LONG_DAY_HOURS:
        return 'Long day'
    return 'Normal'


def generate_html_report(user_name: str, daily_stats: List[Dict], months_data: List[Dict],
                         time_patterns: Dict, start_date: date, end_date: date,
                         raw_data: Optional[List[Dict]] = None) -> str:
    """Generate the report text for one user.

    Args:
        user_name: Display name; escaped before being written.
        daily_stats: Output of ``analyze_daily_activity``.
        months_data: Output of ``group_by_month``.
        time_patterns: Output of ``calculate_time_patterns``.
        start_date: Start of the reporting period.
        end_date: End of the reporting period.
        raw_data: Optional raw rows; the first row supplies ``user_id``,
            ``department`` and ``team`` for the header.

    Returns:
        The complete report as a single string.
    """
    # NOTE(review): the templates below contain text only - no tags, styles or
    # chart scripts - although headings mention charts and timelines. Confirm
    # whether markup was lost before relying on this output as HTML.

    # Calculate overall stats
    total_days = len(daily_stats)
    active_days = sum(1 for d in daily_stats if d['active_hours'] > 0)
    absent_days = total_days - active_days
    total_hours = sum(d['active_hours'] for d in daily_stats)
    avg_hours_per_day = total_hours / active_days if active_days > 0 else 0

    # Weekly average over whole calendar weeks. The per-month week lists split
    # a week that straddles a month boundary in two, which would understate it.
    all_weeks = group_by_week(daily_stats)
    avg_hours_per_week = (
        sum(w['total_hours'] for w in all_weeks) / len(all_weeks) if all_weeks else 0
    )

    # User details come from the first raw row; daily stats are aggregates and
    # do not carry these columns.
    if raw_data:
        details = raw_data[0]
    elif daily_stats:
        details = daily_stats[0]
    else:
        details = {}
    user_id = escape(str(details.get('user_id') or ''))
    department = escape(str(details.get('department') or ''))
    team = escape(str(details.get('team') or ''))
    safe_name = escape(user_name)  # CSV/CLI supplied, so never written raw

    department_part = f'Department: {department} | ' if department else ''
    team_part = f'Team: {team} | ' if team else ''

    summary_cards = [
        ('Total Days Tracked', f"{total_days}"),
        ('Active Days', f"{active_days}"),
        ('Absent Days', f"{absent_days}"),
        ('Total Hours', f"{total_hours:.1f}h"),
        ('Avg Hours/Day', f"{avg_hours_per_day:.1f}h"),
        ('Avg Hours/Week', f"{avg_hours_per_week:.1f}h"),
        ('Earliest Start', f"{time_patterns['earliest_ever']}"),
        ('Latest End', f"{time_patterns['latest_ever']}"),
    ]

    parts: List[str] = [
        f" Activity Report - {safe_name}\n"
        "\n"
        f"📊 Activity Report: {safe_name}\n"
        "\n"
        f"Period: {start_date.strftime('%B %d, %Y')} - {end_date.strftime('%B %d, %Y')}\n"
        f"{department_part} {team_part} User ID: {user_id}\n"
        "\n"
        f"📈 Overall Summary ({len(months_data)} Months)\n"
        "\n",
        "".join(f"{label}\n{value}\n" for label, value in summary_cards),
        "\n"
        "📅 Monthly Comparison\n"
        "\n"
        "📆 Monthly Breakdown\n"
        "\n",
    ]

    # Add each month section
    last_month_index = len(months_data) - 1
    for i, month in enumerate(months_data):
        is_expanded = i == last_month_index  # Expand most recent month
        parts.append(
            "\n"
            "\n"
            f"{month['month_name']}\n"
            "\n"
            f"{month['total_hours']:.1f}h total {'▼' if is_expanded else '▶'}\n"
            "Total Hours\n"
            f"{month['total_hours']:.1f}h\n"
            "Working Days\n"
            f"{month['working_days']}/{month['total_days']}\n"
            "Avg Hours/Day\n"
            f"{month['avg_hours_per_day']:.1f}h\n"
            "Number of Weeks\n"
            f"{len(month['weeks'])}\n"
            "\n"
            "Weekly Breakdown\n"
            "\n"
        )

        # Add each week in this month
        for j, week in enumerate(month['weeks']):
            week_start = datetime.strptime(week['start_date'], DATE_FORMAT).strftime('%b %d')
            week_end = datetime.strptime(week['end_date'], DATE_FORMAT).strftime('%b %d')
            expected_hours = week['working_days'] * EXPECTED_HOURS_PER_DAY
            percentage = (week['total_hours'] / expected_hours * 100) if expected_hours > 0 else 0

            parts.append(
                "\n"
                f"Week {j+1} ({week_start} - {week_end})\n"
                f"{week['total_hours']:.1f}h | {week['working_days']} days\n"
                f"Week Progress {percentage:.0f}% of expected hours\n"
                "\n"
                "Minute-by-Minute Activity Timeline\n"
                "\n"
                "Each dot = 1 minute online. Gaps show breaks, lunch, and offline time.\n"
                "\n"
                "Date Day Hours Start End Activity Rate Notes\n"
            )

            # Add each day in this week. The row is formatted per day so its
            # placeholders are actually substituted.
            for day in week['days']:
                day_date = datetime.strptime(day['date'], DATE_FORMAT)
                parts.append(
                    f"{day_date.strftime('%m/%d')} {day['day_of_week'][:3]} "
                    f"{day['active_hours']:.1f}h "
                    f"{day['first_active'] or '-'} {day['last_active'] or '-'} "
                    f"{day['activity_rate']:.0f}% {_day_note(day)}\n"
                )

            parts.append("\n")  # end of week

        parts.append("\n")  # end of month

    # Add time patterns section
    parts.append(
        "\n"
        "\n"
        "🕒 Time Patterns\n"
        "\n"
        "Typical Start Time\n"
        f"{time_patterns['typical_start']}\n"
        "Typical End Time\n"
        f"{time_patterns['typical_end']}\n"
        "\n"
        "Hourly Activity Distribution\n"
        "\n"
    )

    # Add hourly activity bars
    for hour in range(6, 20):  # Show 6 AM to 8 PM
        percentage = time_patterns['hourly_activity'].get(hour, 0)
        parts.append(
            "\n"
            f"{hour:02d}:00\n"
            f"{percentage:.0f}%\n"
        )

    parts.append("\n")
    return "".join(parts)


def generate_report(user_name: str, months: int = 3, start_date: Optional[str] = None,
                    end_date: Optional[str] = None) -> Optional[Path]:
    """Main function to generate a report for a user.

    Loads the user's rows, aggregates them, renders the report and writes it
    to ``OUTPUT_DIR``. Progress is printed to stdout.

    Args:
        user_name: User name (or user id) to report on.
        months: Look-back length passed to ``get_date_range``.
        start_date: Optional explicit start, ``YYYY-MM-DD``.
        end_date: Optional explicit end, ``YYYY-MM-DD``.

    Returns:
        Path of the written report, or ``None`` when no data was found.
    """
    print(f"\n{'='*60}")
    print(f"GENERATING REPORT FOR: {user_name}")
    print(f"{'='*60}\n")

    # Get date range
    start, end = get_date_range(months, start_date, end_date)
    print(f"Date range: {start} to {end}")

    # Load user data
    print(f"Loading data for {user_name}...")
    raw_data = load_user_data(user_name, start, end)
    if not raw_data:
        print(f"❌ No data found for {user_name} in the specified date range")
        return None
    print(f"✅ Loaded {len(raw_data):,} data points")

    # Analyze data
    print("Analyzing daily activity...")
    daily_stats = analyze_daily_activity(raw_data)
    print(f"✅ Analyzed {len(daily_stats)} days")

    # Group by month
    print("Grouping by month...")
    months_data = group_by_month(daily_stats)
    print(f"✅ Processed {len(months_data)} months")

    # Calculate patterns
    print("Calculating time patterns...")
    time_patterns = calculate_time_patterns(daily_stats)
    print("✅ Patterns calculated")

    # Generate HTML
    print("Generating HTML report...")
    html_content = generate_html_report(
        user_name, daily_stats, months_data, time_patterns, start, end, raw_data
    )

    # Save to file. Anything other than word characters and '-' becomes '_',
    # so a name containing a path separator cannot escape OUTPUT_DIR.
    safe_filename = re.sub(r'[^\w\-]', '_', user_name.lower())
    output_path = OUTPUT_DIR / f"{safe_filename}_{start}_{end}.html"
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(html_content)

    print(f"\n✅ Report generated: {output_path}")
    print(f" File size: {output_path.stat().st_size / 1024:.1f} KB")
    return output_path


def get_all_users(max_files: Optional[int] = 10) -> List[str]:
    """Get the list of user names found in the raw data files.

    Args:
        max_files: Number of files to sample, taken in sorted filename order
            so the sample is deterministic. ``None`` scans every file.

    Returns:
        Sorted list of distinct, non-empty ``user_name`` values.
    """
    users = set()
    csv_files = sorted(RAW_DATA_DIR.glob("*.csv"))
    if max_files is not None:
        csv_files = csv_files[:max_files]

    for csv_file in csv_files:
        try:
            with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
                for row in csv.DictReader(f):
                    if row.get('user_name'):
                        users.add(row['user_name'])
        except (OSError, UnicodeDecodeError, csv.Error) as exc:
            print(f"Skipping unreadable file {csv_file}: {exc}", file=sys.stderr)
            continue

    return sorted(users)


def main():
    """Parse command-line arguments and generate the requested report(s)."""
    parser = argparse.ArgumentParser(
        description='Generate employee activity reports',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Generate report for one employee (last 3 months)
  python3 generate_employee_report.py --user "Tomas Šimkus"

  # Last 6 months for one employee
  python3 generate_employee_report.py --user "Tomas Šimkus" --months 6

  # Specific date range
  python3 generate_employee_report.py --user "Tomas Šimkus" --start 2025-11-01 --end 2026-01-31

  # Generate for all employees
  python3 generate_employee_report.py --all --months 3
"""
    )
    parser.add_argument('--user', type=str, help='User name to generate report for')
    parser.add_argument('--all', action='store_true', help='Generate reports for all users')
    parser.add_argument('--months', type=int, default=3,
                        help='Number of months to include (default: 3)')
    parser.add_argument('--start', type=str, help='Start date (YYYY-MM-DD)')
    parser.add_argument('--end', type=str, help='End date (YYYY-MM-DD)')

    args = parser.parse_args()

    if not args.user and not args.all:
        parser.error("Either --user or --all must be specified")

    # Reject malformed dates up front with a usage error instead of a traceback.
    for option, value in (('--start', args.start), ('--end', args.end)):
        if value:
            try:
                datetime.strptime(value, DATE_FORMAT)
            except ValueError:
                parser.error(f"{option} must be in YYYY-MM-DD format (got {value!r})")

    if args.all:
        # Generate for all users
        users = get_all_users()
        print(f"\nFound {len(users)} users")
        print("Generating reports for all users...\n")

        generated = []
        for user in users:
            # One failing user must not stop the batch.
            try:
                output_path = generate_report(user, args.months, args.start, args.end)
            except Exception as e:
                print(f"❌ Error generating report for {user}: {e}")
                continue
            if output_path:
                generated.append(output_path)

        print(f"\n{'='*60}")
        print(f"✅ Generated {len(generated)} reports")
        print(f"{'='*60}")

        # Create index file
        create_index_file(generated)
    else:
        # Generate for single user
        generate_report(args.user, args.months, args.start, args.end)


def create_index_file(report_paths: List[Path]):
    """Create an index.html file in ``OUTPUT_DIR`` listing all reports.

    The user name and date range are recovered from each file name, which is
    expected to look like ``<name>_<start>_<end>.html``.

    Args:
        report_paths: Paths of the generated report files.
    """
    # NOTE(review): each entry ends with "View Report →" but no link target is
    # written - confirm whether the anchor markup was lost.
    parts: List[str] = [
        " Employee Reports - Index\n"
        "\n"
        "📊 Employee Activity Reports\n"
        "\n"
        "Generated on " + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + "\n"
        "\n"
    ]

    for report_path in sorted(report_paths):
        filename = report_path.name
        name_parts = report_path.stem.split('_')

        # Extract user name and dates
        if len(name_parts) >= 3:
            user_name = ' '.join(name_parts[:-2]).title()
            date_range = f"{name_parts[-2]} to {name_parts[-1]}"
        else:
            user_name = filename
            date_range = ""

        parts.append(
            "\n"
            f"{escape(user_name)}\n"
            f"{escape(date_range)}\n"
            "View Report →\n"
        )

    parts.append("\n")

    index_path = OUTPUT_DIR / 'index.html'
    with open(index_path, 'w', encoding='utf-8') as f:
        f.write("".join(parts))

    print(f"\n✅ Index created: {index_path}")


if __name__ == "__main__":
    main()