#!/usr/bin/env python3
"""
Team data handler for managing individual and team-wide summaries
Fixed version with proper file pattern matching
"""

import csv
import os
import fcntl
from datetime import datetime, date, timedelta
from pathlib import Path
from collections import defaultdict

from config_multi_user import (
    get_raw_data_file, get_summary_file, TIMEZONE, setup_logging,
    RAW_DATA_DIR, SUMMARIES_DIR, USE_READABLE_NAMES
)


class TeamDataHandler:
    """Handler for team-wide data operations"""

    def __init__(self):
        """Initialize the data handler"""
        self.logger = setup_logging('team_data_handler')
        # Output locations for the aggregated summaries.
        self.team_summary_file = SUMMARIES_DIR / "team_summary.csv"
        self.team_weekly_file = SUMMARIES_DIR / "team_weekly.csv"
        self.team_monthly_file = SUMMARIES_DIR / "team_monthly.csv"

    def save_user_presence(self, presence_data):
        """
        Save presence data to individual user CSV file with readable names

        The row is appended under an exclusive ``flock`` so concurrent
        writers do not interleave. Keys of ``presence_data`` that are not
        CSV columns are ignored.

        Args:
            presence_data: Dictionary with presence information including
                user_id. If it carries a truthy ``sanitized_name`` that
                name is passed to ``get_raw_data_file`` for the filename.

        Raises:
            Exception: Any error while opening/locking/writing is logged
                and re-raised to the caller.
        """
        user_id = presence_data['user_id']

        # Use sanitized name if available for the filename
        user_name = presence_data.get('sanitized_name')
        if user_name:
            # Use the readable name for the file
            csv_file = get_raw_data_file(user_id=user_id, user_name=user_name)
        else:
            # Fallback to user_id if no name available
            csv_file = get_raw_data_file(user_id=user_id)

        # Include department and team if available
        fieldnames = ['timestamp', 'user_id', 'user_name', 'presence',
                      'auto_away', 'manual_away', 'last_activity',
                      'department', 'team']

        try:
            with open(csv_file, 'a', newline='') as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                try:
                    # Decide about the header only once the lock is held;
                    # checking before locking lets two writers both see a
                    # missing/empty file and both emit a header.
                    write_header = os.fstat(f.fileno()).st_size == 0

                    # extrasaction='ignore' means extra fields in
                    # presence_data are ignored
                    writer = csv.DictWriter(f, fieldnames=fieldnames,
                                            extrasaction='ignore')

                    if write_header:
                        writer.writeheader()
                        self.logger.info(f"Created new raw data file for {presence_data.get('user_name', user_id)}: {csv_file}")

                    writer.writerow(presence_data)
                    # Push the buffered row to the OS while the lock is
                    # still held; otherwise the real write happens at
                    # close(), after the lock has been released.
                    f.flush()
                finally:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
        except Exception as e:
            self.logger.error(f"Failed to save presence data for {user_id}: {e}")
            raise

    def read_user_daily_data(self, user_id, user_name=None, target_date=None):
        """
        Read presence data for a specific user and date

        Args:
            user_id: Identifier passed to ``get_raw_data_file``.
            user_name: Optional name passed to ``get_raw_data_file``
                (presumably the sanitized, filename-safe name — confirm
                against callers).
            target_date: Date to read; defaults to today in ``TIMEZONE``.

        Returns:
            list: One dict per CSV row, or an empty list when the file is
            missing or cannot be read (read errors are logged, not raised).
        """
        if target_date is None:
            target_date = datetime.now(TIMEZONE).date()

        # Try to get the file with the readable name first
        if user_name:
            csv_file = get_raw_data_file(target_date, user_id=user_id, user_name=user_name)
        else:
            csv_file = get_raw_data_file(target_date, user_id=user_id)

        if not csv_file.exists():
            self.logger.debug(f"No data file found for user {user_id} on {target_date}: {csv_file}")
            return []

        try:
            # newline='' lets the csv module handle line endings itself,
            # as its documentation requires for reader input.
            with open(csv_file, 'r', newline='') as f:
                data = list(csv.DictReader(f))
            self.logger.debug(f"Read {len(data)} rows for user {user_id} from {csv_file}")
            return data
        except Exception as e:
            self.logger.error(f"Failed to read data for user {user_id}: {e}")
            return []

    def calculate_user_summary(self, user_data):
        """
        Calculate summary statistics for a single user's data

        Every row whose ``presence`` is ``'active'`` adds one to the bucket
        of its timestamp's hour (this treats one row as one minute —
        assumes the collector samples once per minute; confirm). Rows with
        a missing or unparseable timestamp are skipped with a warning so
        that a single damaged line cannot abort a whole team summary.

        Args:
            user_data: Iterable of row dicts with ``presence`` and
                ISO-format ``timestamp`` values; may be empty or None.

        Returns:
            dict: ``total_minutes``, ``first_seen`` / ``last_seen``
            (``'HH:MM'`` strings or None) and ``hourly_breakdown``
            (hour 0-23 -> count).
        """
        hourly_minutes = {hour: 0 for hour in range(24)}

        if not user_data:
            return {
                'total_minutes': 0,
                'first_seen': None,
                'last_seen': None,
                'hourly_breakdown': hourly_minutes
            }

        active_times = []
        for row in user_data:
            if row.get('presence') != 'active':
                continue
            try:
                timestamp = datetime.fromisoformat(row['timestamp'])
            except (KeyError, TypeError, ValueError) as e:
                self.logger.warning(f"Skipping row with invalid timestamp {row.get('timestamp')!r}: {e}")
                continue
            hourly_minutes[timestamp.hour] += 1
            active_times.append(timestamp)

        first_seen = min(active_times).strftime('%H:%M') if active_times else None
        last_seen = max(active_times).strftime('%H:%M') if active_times else None

        return {
            'total_minutes': sum(hourly_minutes.values()),
            'first_seen': first_seen,
            'last_seen': last_seen,
            'hourly_breakdown': hourly_minutes
        }

    def get_all_tracked_users(self, target_date=None):
        """
        Get list of all users who have data files for the given date

        Scans ``RAW_DATA_DIR`` for ``*_<YYYY-MM-DD>.csv`` and takes the
        user identity from the first data row of each file. Files that
        cannot be read, or that have no data row with a ``user_id``
        column, are skipped.

        Args:
            target_date: Date to look up; defaults to today in ``TIMEZONE``.

        Returns:
            list: Dicts with ``user_id``, ``user_name`` and
            ``sanitized_name`` (the filename without the date suffix and
            without a leading ``presence_``), one per distinct user_id.
        """
        if target_date is None:
            target_date = datetime.now(TIMEZONE).date()

        date_str = target_date.strftime('%Y-%m-%d')
        date_suffix = f"_{date_str}"
        name_prefix = "presence_"

        # Look for all CSV files ending with the target date
        pattern = f"*{date_suffix}.csv"
        # Sorted so that, when several files carry the same user_id, the
        # one that wins does not depend on directory iteration order.
        user_files = sorted(RAW_DATA_DIR.glob(pattern))

        self.logger.info(f"Found {len(user_files)} user files for {date_str} with pattern: {pattern}")

        users = []
        user_ids_seen = set()

        for file_path in user_files:
            # Try to extract user info from the file
            try:
                with open(file_path, 'r', newline='') as f:
                    first_row = next(csv.DictReader(f), None)
            except Exception as e:
                self.logger.warning(f"Could not read file {file_path}: {e}")
                continue

            if not first_row or 'user_id' not in first_row:
                continue

            user_id = first_row['user_id']
            if user_id in user_ids_seen:
                continue
            user_ids_seen.add(user_id)

            # Extract sanitized name from filename. Strip only the
            # trailing date and the leading prefix: str.replace() would
            # also eat matching text inside the name itself.
            name_part = file_path.stem
            if name_part.endswith(date_suffix):
                name_part = name_part[:-len(date_suffix)]
            if name_part.startswith(name_prefix):
                name_part = name_part[len(name_prefix):]

            users.append({
                'user_id': user_id,
                'user_name': first_row.get('user_name', 'Unknown'),
                'sanitized_name': name_part
            })
            self.logger.debug(f"Found user: {user_id} - {first_row.get('user_name')} from file {file_path.name}")

        self.logger.info(f"Extracted {len(users)} unique users from files")
        return users

    def generate_daily_team_summary(self, target_date=None):
        """
        Generate team-wide daily summary combining all users

        Args:
            target_date: Date to summarize; defaults to yesterday in
                ``TIMEZONE``.

        Returns:
            dict: Team summary with per-user and aggregate statistics,
            or None when no user files exist for the date.
        """
        if target_date is None:
            target_date = (datetime.now(TIMEZONE) - timedelta(days=1)).date()

        self.logger.info(f"Generating team summary for {target_date}")

        # Get all tracked users
        users = self.get_all_tracked_users(target_date)

        if not users:
            self.logger.warning(f"No users found for {target_date}")
            return None

        team_totals = {
            'total_users': len(users),
            'active_users': 0,
            'total_hours': 0,
            'average_hours': 0,
            'earliest_start': None,
            'latest_end': None
        }
        team_summary = {
            'date': target_date.strftime('%Y-%m-%d'),
            'users': {},
            'team_totals': team_totals,
            'hourly_team_presence': {h: 0 for h in range(24)}
        }

        # 'HH:MM' strings are zero-padded, so plain string comparison
        # orders them chronologically.
        earliest_start = None
        latest_end = None

        # Process each user
        for user_info in users:
            user_id = user_info['user_id']
            user_name = user_info['user_name']
            sanitized_name = user_info.get('sanitized_name')

            # Read user's data with the sanitized name
            user_data = self.read_user_daily_data(user_id, sanitized_name, target_date)

            if not user_data:
                # Not skipped: the user still gets a zeroed entry below.
                self.logger.warning(f"No data found for user {user_id} ({user_name}) on {target_date}")

            # Calculate user summary
            user_summary = self.calculate_user_summary(user_data)
            total_hours = user_summary['total_minutes'] / 60

            # Store user summary
            team_summary['users'][user_id] = {
                'name': user_name,
                'total_hours': round(total_hours, 2),
                'total_minutes': user_summary['total_minutes'],
                'first_seen': user_summary['first_seen'],
                'last_seen': user_summary['last_seen']
            }

            # Update team totals
            if user_summary['total_minutes'] > 0:
                team_totals['active_users'] += 1
                team_totals['total_hours'] += total_hours

                # Track earliest/latest
                first_seen = user_summary['first_seen']
                if first_seen and (not earliest_start or first_seen < earliest_start):
                    earliest_start = first_seen

                last_seen = user_summary['last_seen']
                if last_seen and (not latest_end or last_seen > latest_end):
                    latest_end = last_seen

            # Add to hourly team presence: counts users seen in each hour,
            # not minutes.
            for hour, minutes in user_summary['hourly_breakdown'].items():
                if minutes > 0:
                    team_summary['hourly_team_presence'][hour] += 1

        # Calculate averages (over active users only)
        if team_totals['active_users'] > 0:
            team_totals['average_hours'] = round(
                team_totals['total_hours'] / team_totals['active_users'], 2
            )

        team_totals['earliest_start'] = earliest_start
        team_totals['latest_end'] = latest_end

        self.logger.info(f"Summary complete: {team_summary['team_totals']['active_users']} active users out of {team_summary['team_totals']['total_users']}")

        return team_summary

    def save_team_summary_to_csv(self, team_summary):
        """
        Save team summary to CSV file

        Rewrites ``self.team_summary_file``: rows already stored for the
        summary's date are dropped, the new per-user rows plus one
        ``TEAM_TOTAL`` row are added, and everything is sorted by date and
        user. Failures are logged and not raised.

        Args:
            team_summary: Dict as returned by
                ``generate_daily_team_summary``; a falsy value is a no-op.
        """
        if not team_summary:
            return

        date_str = team_summary['date']
        headers = ['Date', 'User', 'Name', 'Hours', 'First Seen', 'Last Seen']

        try:
            # Read existing data
            existing_data = []
            if self.team_summary_file.exists():
                with open(self.team_summary_file, 'r', newline='') as f:
                    reader = csv.DictReader(f)
                    # Keep all data except for the date we're updating
                    existing_data = [row for row in reader if row.get('Date') != date_str]

            # Add new data
            for user_id, user_data in team_summary['users'].items():
                existing_data.append({
                    'Date': date_str,
                    'User': user_id,
                    'Name': user_data['name'],
                    'Hours': f"{user_data['total_hours']:.2f}",
                    'First Seen': user_data['first_seen'] or '-',
                    'Last Seen': user_data['last_seen'] or '-'
                })

            # Add team totals row
            totals = team_summary['team_totals']
            existing_data.append({
                'Date': date_str,
                'User': 'TEAM_TOTAL',
                'Name': f"{totals['active_users']} active users",
                'Hours': f"{totals['total_hours']:.2f}",
                'First Seen': totals['earliest_start'] or '-',
                'Last Seen': totals['latest_end'] or '-'
            })

            # Sort by date and user
            existing_data.sort(key=lambda x: (x['Date'], x['User']))

            # Write back
            with open(self.team_summary_file, 'w', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=headers)
                writer.writeheader()
                writer.writerows(existing_data)

            self.logger.info(f"Saved team summary for {date_str}")

        except Exception as e:
            self.logger.error(f"Failed to save team summary: {e}")

    def generate_weekly_summary(self, week_start_date=None):
        """
        Generate weekly team summary

        Builds the daily team summary for each of the seven days starting
        at ``week_start_date`` and aggregates them.

        Args:
            week_start_date: First day of the week; defaults to the Monday
                of the previous week in ``TIMEZONE``.

        Returns:
            dict: ``week_start``, ``week_end``, per-user totals under
            ``users``, ``daily_active_users`` keyed by weekday name, and
            ``team_totals``.
        """
        if week_start_date is None:
            # Default to last week's Monday
            today = datetime.now(TIMEZONE).date()
            days_since_monday = today.weekday()
            week_start_date = today - timedelta(days=days_since_monday + 7)

        week_data = {
            'week_start': week_start_date.strftime('%Y-%m-%d'),
            'week_end': (week_start_date + timedelta(days=6)).strftime('%Y-%m-%d'),
            'users': defaultdict(lambda: {
                'name': '',
                'total_hours': 0,
                'days_active': 0,
                'average_hours_per_day': 0
            }),
            'daily_active_users': {},
            'team_totals': {
                'total_hours': 0,
                'average_daily_active': 0,
                'most_active_day': None,
                'least_active_day': None
            }
        }

        # Process each day of the week
        for day_offset in range(7):
            current_date = week_start_date + timedelta(days=day_offset)
            day_summary = self.generate_daily_team_summary(current_date)

            if not day_summary:
                # Days without any user files are left out entirely.
                continue

            day_name = current_date.strftime('%A')
            week_data['daily_active_users'][day_name] = day_summary['team_totals']['active_users']

            # Aggregate user data
            for user_id, user_data in day_summary['users'].items():
                if user_data['total_hours'] > 0:
                    entry = week_data['users'][user_id]
                    entry['name'] = user_data['name']
                    entry['total_hours'] += user_data['total_hours']
                    entry['days_active'] += 1

        # Calculate averages
        for user_data in week_data['users'].values():
            if user_data['days_active'] > 0:
                user_data['average_hours_per_day'] = round(
                    user_data['total_hours'] / user_data['days_active'], 2
                )
            week_data['team_totals']['total_hours'] += user_data['total_hours']

        # Find most/least active days
        daily_active = week_data['daily_active_users']
        if daily_active:
            max_day = max(daily_active.items(), key=lambda x: x[1])
            min_day = min(daily_active.items(), key=lambda x: x[1])
            week_data['team_totals']['most_active_day'] = f"{max_day[0]} ({max_day[1]} users)"
            week_data['team_totals']['least_active_day'] = f"{min_day[0]} ({min_day[1]} users)"
            week_data['team_totals']['average_daily_active'] = round(
                sum(daily_active.values()) / len(daily_active), 1
            )

        return week_data

    def generate_monthly_summary(self, year=None, month=None):
        """
        Generate monthly team summary

        Args:
            year: Calendar year of the month to summarize.
            month: Month number (1-12).
                If either ``year`` or ``month`` is None, both are replaced
                by the previous calendar month in ``TIMEZONE``.

        Returns:
            dict: ``month`` (``'YYYY-MM'``), per-user totals under
            ``users`` (including ``attendance_rate`` in percent) and
            ``team_totals``.
        """
        if year is None or month is None:
            # Default to last month
            today = datetime.now(TIMEZONE).date()
            first_of_month = today.replace(day=1)
            last_month = first_of_month - timedelta(days=1)
            year = last_month.year
            month = last_month.month

        # Get first and last day of month
        month_start = date(year, month, 1)
        if month == 12:
            month_end = date(year + 1, 1, 1) - timedelta(days=1)
        else:
            month_end = date(year, month + 1, 1) - timedelta(days=1)

        month_data = {
            'month': f"{year}-{month:02d}",
            'users': defaultdict(lambda: {
                'name': '',
                'total_hours': 0,
                'days_active': 0,
                'average_hours_per_day': 0,
                'attendance_rate': 0
            }),
            'team_totals': {
                'total_hours': 0,
                'average_daily_active': 0,
                'total_work_days': 0
            }
        }

        # Count work days (excluding weekends); activity is aggregated for
        # every calendar day, weekends included.
        current = month_start
        work_days = 0
        daily_totals = []

        while current <= month_end:
            if current.weekday() < 5:  # Monday = 0, Friday = 4
                work_days += 1

            day_summary = self.generate_daily_team_summary(current)
            if day_summary:
                daily_totals.append(day_summary['team_totals']['active_users'])

                # Aggregate user data
                for user_id, user_data in day_summary['users'].items():
                    if user_data['total_hours'] > 0:
                        entry = month_data['users'][user_id]
                        entry['name'] = user_data['name']
                        entry['total_hours'] += user_data['total_hours']
                        entry['days_active'] += 1

            current += timedelta(days=1)

        month_data['team_totals']['total_work_days'] = work_days

        # Calculate statistics
        for user_data in month_data['users'].values():
            if user_data['days_active'] > 0:
                user_data['average_hours_per_day'] = round(
                    user_data['total_hours'] / user_data['days_active'], 2
                )
                # days_active also counts weekend days while work_days does
                # not, so the raw ratio can pass 100%; cap it.
                user_data['attendance_rate'] = min(
                    100.0,
                    round((user_data['days_active'] / work_days) * 100, 1)
                )
            month_data['team_totals']['total_hours'] += user_data['total_hours']

        if daily_totals:
            month_data['team_totals']['average_daily_active'] = round(
                sum(daily_totals) / len(daily_totals), 1
            )

        return month_data


# Singleton instance
_handler_instance = None


def get_team_handler():
    """Get or create a singleton team data handler instance"""
    global _handler_instance
    if _handler_instance is None:
        _handler_instance = TeamDataHandler()
    return _handler_instance