#!/usr/bin/env python3
"""
Employee Activity Report Generator
Generates comprehensive HTML reports showing employee work patterns over multiple months
"""

import csv
import argparse
import json
from pathlib import Path
from datetime import date, datetime, timedelta
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Configuration
RAW_DATA_DIR = Path("data/raw")
OUTPUT_DIR = Path("reports")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


def get_date_range(months: int = 3, start_date: Optional[str] = None,
                   end_date: Optional[str] = None) -> Tuple[date, date]:
    """Calculate the date range for the report.

    Args:
        months: Look-back window, with each month approximated as 30 days.
            Used whenever no explicit ``start_date`` is given; a falsy value
            falls back to 90 days.
        start_date: First day of the range as ``YYYY-MM-DD``. Optional.
        end_date: Last day of the range as ``YYYY-MM-DD``. Defaults to today.

    Returns:
        ``(start, end)`` as ``datetime.date`` objects.

    Raises:
        ValueError: If a supplied date string is not in ``YYYY-MM-DD`` form.
    """
    lookback_days = months * 30 if months else 90

    # A bound supplied on its own is honoured instead of being silently
    # dropped; the missing bound is derived from today / the look-back window.
    if end_date:
        end = datetime.strptime(end_date, '%Y-%m-%d').date()
    else:
        end = datetime.now().date()

    if start_date:
        start = datetime.strptime(start_date, '%Y-%m-%d').date()
    else:
        start = end - timedelta(days=lookback_days)

    return start, end


def _extract_hhmm(timestamp: str) -> str:
    """Return the zero-padded ``HH:MM`` part of *timestamp*, or ``''`` if none is found."""
    if 'T' in timestamp:
        time_part = timestamp.split('T')[1]
    elif ' ' in timestamp:
        time_part = timestamp.split(' ')[1]
    else:
        time_part = timestamp

    # Cut at the first UTC-offset sign, fractional-seconds dot or trailing 'Z'.
    for separator in ('+', '-', '.', 'Z'):
        time_part = time_part.split(separator)[0]

    parts = time_part.split(':')
    if len(parts) < 2:
        return ''
    return f"{parts[0].zfill(2)}:{parts[1].zfill(2)}"


def load_user_data(user_name: str, start_date: date, end_date: date) -> List[Dict]:
    """Load all CSV rows for a specific user within a date range.

    Every ``*.csv`` file in ``RAW_DATA_DIR`` whose stem ends in
    ``_YYYY-MM-DD`` and whose date lies within ``[start_date, end_date]`` is
    read. A row is kept when its ``user_name`` or ``user_id`` column equals
    *user_name*.

    Each kept row gains ``date`` (the filename date string), ``file_date``
    (the same value as a ``date`` object) and, when the ``timestamp`` column
    contains a time, ``time`` as ``HH:MM``.

    Files whose name carries no parseable date, or which cannot be read or
    decoded, are skipped. Rows appended before a read error are kept.

    Args:
        user_name: User name or user id to match.
        start_date: Inclusive lower bound on the filename date.
        end_date: Inclusive upper bound on the filename date.

    Returns:
        The matching rows, in sorted-filename order.
    """
    all_data = []

    for csv_file in sorted(RAW_DATA_DIR.glob("*.csv")):
        filename = csv_file.stem
        if '_' not in filename:
            continue

        date_str = filename.split('_')[-1]
        try:
            file_date = datetime.strptime(date_str, '%Y-%m-%d').date()
        except ValueError:
            # Filename suffix is not a date; not one of our data files.
            continue

        if file_date < start_date or file_date > end_date:
            continue

        try:
            # newline='' lets the csv module handle embedded line breaks itself.
            with open(csv_file, 'r', newline='') as f:
                for row in csv.DictReader(f):
                    if row.get('user_name') != user_name and row.get('user_id') != user_name:
                        continue

                    row['date'] = date_str
                    row['file_date'] = file_date

                    hhmm = _extract_hhmm(row.get('timestamp') or '')
                    if hhmm:
                        row['time'] = hhmm

                    all_data.append(row)
        except (OSError, UnicodeDecodeError, csv.Error):
            # Best effort: one unreadable file must not abort the whole report.
            continue

    return all_data


def _widen_span(stats: Dict, first_key: str, last_key: str, time_val: str) -> None:
    """Extend the ``[first_key, last_key]`` time span in *stats* to include *time_val*."""
    # 'HH:MM' strings are zero-padded, so string comparison orders them by time.
    if not stats[first_key] or time_val < stats[first_key]:
        stats[first_key] = time_val
    if not stats[last_key] or time_val > stats[last_key]:
        stats[last_key] = time_val


def analyze_daily_activity(data: List[Dict]) -> List[Dict]:
    """Analyze rows and generate per-day statistics.

    Each row counts as one unit in ``total_minutes``; rows whose ``presence``
    is ``'active'`` also count in ``active_minutes``. Rows without a ``date``
    are ignored.

    The returned dicts carry only the aggregate fields built here (``date``,
    ``day_of_week``, the minute counters, the first/last active and seen
    times, ``active_hours`` and ``activity_rate``); other row-level columns
    are not propagated.

    Args:
        data: Rows carrying ``date`` and optionally ``file_date``,
            ``presence`` and ``time``.

    Returns:
        One stats dict per date, sorted by date string.
    """
    daily_stats: Dict[str, Dict] = {}

    for row in data:
        day_key = row.get('date')
        if not day_key:
            continue

        if day_key not in daily_stats:
            daily_stats[day_key] = {
                'date': '',
                'day_of_week': '',
                'total_minutes': 0,
                'active_minutes': 0,
                'first_active': None,
                'last_active': None,
                'first_seen': None,
                'last_seen': None,
            }
        stats = daily_stats[day_key]
        stats['date'] = day_key

        # Only parse the date string when the row carries no date object.
        file_date = row.get('file_date')
        if file_date is None:
            file_date = datetime.strptime(day_key, '%Y-%m-%d').date()
        stats['day_of_week'] = file_date.strftime('%A')

        stats['total_minutes'] += 1
        time_val = row.get('time', '')

        if row.get('presence') == 'active':
            stats['active_minutes'] += 1
            if time_val:
                _widen_span(stats, 'first_active', 'last_active', time_val)

        # Track all seen times (active or not)
        if time_val:
            _widen_span(stats, 'first_seen', 'last_seen', time_val)

    # Convert to list and add calculated fields
    result = []
    for _, stats in sorted(daily_stats.items()):
        stats['active_hours'] = round(stats['active_minutes'] / 60, 2)
        stats['activity_rate'] = round(
            (stats['active_minutes'] / stats['total_minutes'] * 100)
            if stats['total_minutes'] > 0 else 0,
            1,
        )
        result.append(stats)

    return result


def _summarize_week(week_num: int, days: List[Dict]) -> Dict:
    """Build the summary dict for one non-empty run of days in the same week."""
    total_hours = sum(d['active_hours'] for d in days)
    return {
        'week_num': week_num,
        'start_date': days[0]['date'],
        'end_date': days[-1]['date'],
        'days': days,
        'total_hours': total_hours,
        'avg_hours': total_hours / len(days),
        'working_days': len([d for d in days if d['active_hours'] > 0]),
    }


def group_by_week(daily_stats: List[Dict]) -> List[Dict]:
    """Group daily stats into weeks.

    Consecutive days that share the same ISO year *and* ISO week form one
    week, so equal week numbers from different years are kept apart. The
    input is expected in date order; only adjacent entries are grouped.

    Args:
        daily_stats: Day dicts with ``date`` (``YYYY-MM-DD``) and
            ``active_hours``.

    Returns:
        One summary dict per week, with ``week_num`` set to the ISO week
        number.
    """
    weeks = []
    current_week: List[Dict] = []
    current_key = None

    for day in daily_stats:
        iso = datetime.strptime(day['date'], '%Y-%m-%d').date().isocalendar()
        week_key = (iso[0], iso[1])  # (ISO year, ISO week)

        if current_week and week_key != current_key:
            weeks.append(_summarize_week(current_key[1], current_week))
            current_week = []

        current_key = week_key
        current_week.append(day)

    # Add last week
    if current_week:
        weeks.append(_summarize_week(current_key[1], current_week))

    return weeks


def group_by_month(daily_stats: List[Dict]) -> List[Dict]:
    """Group daily stats into months.

    Args:
        daily_stats: Day dicts with ``date`` (``YYYY-MM-DD``) and
            ``active_hours``.

    Returns:
        One summary dict per ``YYYY-MM`` month, sorted by month, each with
        its days, the weeks those days fall into, and hour/day totals.
    """
    if not daily_stats:
        return []

    days_by_month = defaultdict(list)
    for day in daily_stats:
        days_by_month[day['date'][:7]].append(day)  # YYYY-MM

    result = []
    for month_key, days in sorted(days_by_month.items()):
        total_hours = sum(d['active_hours'] for d in days)
        result.append({
            'month': month_key,
            'month_name': datetime.strptime(month_key, '%Y-%m').strftime('%B %Y'),
            'days': days,
            # Weeks are built from this month's days only.
            'weeks': group_by_week(days),
            'total_hours': total_hours,
            'avg_hours_per_day': total_hours / len(days),
            'working_days': len([d for d in days if d['active_hours'] > 0]),
            'total_days': len(days),
        })

    return result


def calculate_time_patterns(daily_stats: List[Dict]) -> Dict:
    """Calculate typical time patterns.

    The hourly figures are a simplification: every hour from a day's first
    to its last active time counts as active for that day, and the result is
    the percentage of all days on which the hour was covered.

    Args:
        daily_stats: Day dicts with ``first_active`` and ``last_active``
            (``HH:MM`` strings or ``None``).

    Returns:
        Dict with ``typical_start`` / ``typical_end`` (upper median),
        ``earliest_ever``, ``latest_ever`` (``'N/A'`` when no times exist)
        and ``hourly_activity`` mapping each hour 0-23 to a percentage.
    """
    start_times = []
    end_times = []
    days_active_in_hour = defaultdict(int)

    for day in daily_stats:
        first_active = day['first_active']
        last_active = day['last_active']

        if first_active:
            start_times.append(first_active)
        if last_active:
            end_times.append(last_active)

        if not (first_active and last_active):
            continue

        try:
            start_hour = int(first_active.split(':')[0])
            end_hour = int(last_active.split(':')[0])
        except ValueError:
            # A malformed time must not abort the report; skip this day's hours.
            continue

        for hour in range(start_hour, end_hour + 1):
            days_active_in_hour[hour] += 1

    # Calculate averages
    day_count = len(daily_stats)
    hourly_percentages = {}
    for hour in range(24):
        if hour in days_active_in_hour:
            hourly_percentages[hour] = round(days_active_in_hour[hour] / day_count * 100, 1)
        else:
            hourly_percentages[hour] = 0

    # Find typical start/end times (median)
    start_times.sort()
    end_times.sort()

    return {
        'typical_start': start_times[len(start_times) // 2] if start_times else 'N/A',
        'typical_end': end_times[len(end_times) // 2] if end_times else 'N/A',
        'earliest_ever': start_times[0] if start_times else 'N/A',
        'latest_ever': end_times[-1] if end_times else 'N/A',
        'hourly_activity': hourly_percentages,
    }
sum(w['total_hours'] for w in all_weeks) / len(all_weeks) if all_weeks else 0 # Get user details (from first data point) user_id = daily_stats[0].get('user_id', '') if daily_stats else '' department = daily_stats[0].get('department', '') if daily_stats else '' team = daily_stats[0].get('team', '') if daily_stats else '' # Prepare data for charts months_chart_data = { 'labels': [m['month_name'] for m in months_data], 'data': [round(m['total_hours'], 1) for m in months_data] } # Generate HTML html = f"""
Generated on """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """