"""Consensus metrics for rater rating distributions.

Quantifies how much a list of ratings agrees (consensus) via several
independent measures: squared/absolute distance from an even distribution
vs. the best constant distribution, Shannon entropy, and standard
deviation.  Also supports incrementally evolving an even distribution
toward full consensus while tracking every metric.
"""

import numpy as np
from scipy.stats import entropy as scipy_entropy


def get_rater_ratings(num_raters, num_ratings):
    """Assign ratings to raters as evenly as possible.

    Ratings 1..num_ratings are spread over the raters; when the division is
    uneven, lower ratings each receive one extra rater until the remainder
    is exhausted (e.g. 11 raters / 3 ratings -> 4, 4, 3).

    Args:
        num_raters: Number of raters.
        num_ratings: Number of different ratings.

    Returns:
        Dict mapping rater ID (1-based) to its assigned rating.
    """
    base_count = num_raters // num_ratings
    remainder = num_raters % num_ratings
    rater_ratings = {}
    rater_id = 1
    for rating in range(1, num_ratings + 1):
        count = base_count
        if remainder > 0:  # lower ratings absorb the leftover raters first
            count += 1
            remainder -= 1
        for _ in range(count):
            rater_ratings[rater_id] = rating
            rater_id += 1
    return rater_ratings


def validate_real_ratings(real_ratings, num_raters, num_ratings):
    """Validate that the real ratings meet the required constraints.

    Constraints: the number of ratings equals num_raters, and every rating
    is between 1 and num_ratings (inclusive).

    Args:
        real_ratings: List of ratings provided by users.
        num_raters: Expected number of raters/ratings.
        num_ratings: Maximum allowed rating value.

    Returns:
        tuple: (is_valid, error_message); error_message is "" when valid.
    """
    if len(real_ratings) != num_raters:
        return False, f"Error: Expected {num_raters} ratings, but got {len(real_ratings)}."
    for i, rating in enumerate(real_ratings):
        if rating < 1:
            return False, f"Error: Rating {i+1} has value {rating}, which is less than the minimum allowed value of 1."
        if rating > num_ratings:
            return False, f"Error: Rating {i+1} has value {rating}, which exceeds the maximum allowed value of {num_ratings}."
    return True, ""


def calculate_squared_distance(even_ratings, real_ratings):
    """Sum of squared differences between even_ratings and real_ratings.

    Args:
        even_ratings: Dict mapping rater ID (1-based) to its assigned rating.
        real_ratings: List of actual ratings (0-based indexing).

    Returns:
        The sum of squared differences.
    """
    return sum(
        (even_rating - real_ratings[rater_id - 1]) ** 2
        for rater_id, even_rating in even_ratings.items()
    )


def calculate_constant_squared_distances(real_ratings, num_ratings):
    """Squared distance from real_ratings to each constant rating list.

    Args:
        real_ratings: List of actual ratings provided by users.
        num_ratings: Maximum rating value (constants 1..num_ratings tried).

    Returns:
        Dict mapping each constant rating to its squared distance.
    """
    return {
        rating: sum((rating - real) ** 2 for real in real_ratings)
        for rating in range(1, num_ratings + 1)
    }


def find_minimum_constant_squared_distance(real_ratings, num_ratings):
    """Find the constant rating list closest (squared) to real_ratings.

    Args:
        real_ratings: List of actual ratings provided by users.
        num_ratings: Maximum rating value (constants 1..num_ratings tried).

    Returns:
        tuple: (min_rating, min_distance) — the minimizing constant and its
        squared distance.
    """
    distances = calculate_constant_squared_distances(real_ratings, num_ratings)
    min_rating = min(distances, key=distances.get)
    return min_rating, distances[min_rating]


def calculate_entropy(real_ratings, num_ratings, base=2):
    """Entropy of the real_ratings distribution plus a consensus percentage.

    Consensus is 100% at zero entropy (full agreement) and 0% at maximum
    entropy (uniform distribution over all ratings).

    Args:
        real_ratings: List of actual ratings provided by users.
        num_ratings: Maximum possible rating value.
        base: Logarithmic base for the entropy (default 2, i.e. bits).

    Returns:
        tuple: (entropy_value, percent_consensus, distribution) where
        distribution is the float64 probability vector over ratings.
    """
    counts = [0] * num_ratings
    for rating in real_ratings:
        counts[rating - 1] += 1  # ratings are 1-based, counts 0-based
    total = sum(counts)
    distribution = np.array([count / total for count in counts], dtype=np.float64)
    entropy_value = scipy_entropy(distribution, base=base)
    # Maximum entropy corresponds to the uniform distribution; minimum is 0.
    uniform_dist = np.full(num_ratings, 1 / num_ratings, dtype=np.float64)
    max_entropy = scipy_entropy(uniform_dist, base=base)
    if max_entropy == 0:
        # Only one possible rating: agreement is trivially total.
        percent_consensus = 100
    else:
        percent_consensus = ((max_entropy - entropy_value) / max_entropy) * 100
    return entropy_value, percent_consensus, distribution


def calculate_standard_deviation(real_ratings):
    """Population standard deviation (ddof=0) of the ratings list.

    Args:
        real_ratings: List of actual ratings provided by users.

    Returns:
        The population standard deviation of the ratings.
    """
    return np.std(np.array(real_ratings), ddof=0)


def calculate_max_standard_deviation(num_raters, num_ratings):
    """Maximum possible standard deviation for the given parameters.

    Models the most polarized distribution: raters split as evenly as
    possible between the extremes 1 and num_ratings (the extra rater, if
    any, goes to the maximum).

    Args:
        num_raters: Number of raters.
        num_ratings: Maximum rating value.

    Returns:
        The maximum possible population standard deviation.
    """
    half_raters = num_raters // 2
    remainder = num_raters % 2
    polarized_ratings = [1] * half_raters + [num_ratings] * (half_raters + remainder)
    return calculate_standard_deviation(polarized_ratings)


def calculate_absolute_distance(even_ratings, real_ratings):
    """Mean absolute difference between even_ratings and real_ratings.

    Args:
        even_ratings: Dict mapping rater ID (1-based) to its assigned rating.
        real_ratings: List of actual ratings (0-based indexing).

    Returns:
        Sum of absolute differences divided by the number of raters.
    """
    absolute_distance_sum = sum(
        abs(even_rating - real_ratings[rater_id - 1])
        for rater_id, even_rating in even_ratings.items()
    )
    return absolute_distance_sum / len(real_ratings)


def calculate_constant_absolute_distances(real_ratings, num_ratings):
    """Normalized absolute distance to each possible constant rating list.

    Args:
        real_ratings: List of actual ratings provided by users.
        num_ratings: Maximum rating value (constants 1..num_ratings tried).

    Returns:
        Dict mapping each constant rating to its mean absolute distance.
    """
    n = len(real_ratings)
    return {
        rating: sum(abs(rating - real) for real in real_ratings) / n
        for rating in range(1, num_ratings + 1)
    }


def find_minimum_constant_absolute_distance(real_ratings, num_ratings):
    """Find the constant rating list closest (absolute) to real_ratings.

    Args:
        real_ratings: List of actual ratings provided by users.
        num_ratings: Maximum rating value (constants 1..num_ratings tried).

    Returns:
        tuple: (min_rating, min_distance) — the minimizing constant and its
        normalized absolute distance.
    """
    distances = calculate_constant_absolute_distances(real_ratings, num_ratings)
    min_rating = min(distances, key=distances.get)
    return min_rating, distances[min_rating]


def calculate_all_consensus_metrics(real_ratings, num_raters, num_ratings, verbose=False):
    """Calculate all consensus metrics for the given ratings.

    Args:
        real_ratings: List of actual ratings provided by users.
        num_raters: Number of raters.
        num_ratings: Maximum rating value.
        verbose: Whether to print detailed analysis information.

    Returns:
        dict: All calculated consensus metrics, or {"error": message} when
        validation fails.
    """
    results = {}

    is_valid, error_message = validate_real_ratings(real_ratings, num_raters, num_ratings)
    if not is_valid:
        if verbose:
            print(error_message)
        return {"error": error_message}

    even_ratings = get_rater_ratings(num_raters, num_ratings)
    if verbose:
        print("Rater assignments:")
        for rater, rating in even_ratings.items():
            print(f"Rater {rater}: Rating {rating}")

    # --- Squared-distance metrics -------------------------------------------
    squared_distance = calculate_squared_distance(even_ratings, real_ratings)
    if verbose:
        print(f"Sum of squared differences from even distribution: {squared_distance}")
    results["squared_distance"] = squared_distance

    constant_distances = calculate_constant_squared_distances(real_ratings, num_ratings)
    if verbose:
        print("\nSquared distances from constant ratings:")
        for rating, distance in constant_distances.items():
            print(f"All {rating}s: {distance}")
    results["constant_distances"] = constant_distances

    min_const_rating, min_const_distance = find_minimum_constant_squared_distance(real_ratings, num_ratings)
    if verbose:
        print(f"\nConstant rating with minimum squared distance: {min_const_rating} (distance: {min_const_distance})")
    results["min_const_rating"] = min_const_rating
    results["min_const_distance"] = min_const_distance

    total_distance = min_const_distance + squared_distance
    # Guard: both distances are 0 only when the ratings equal both the even
    # and the best constant distribution (e.g. num_ratings == 1) — treat as
    # full consensus instead of dividing by zero.
    consensus = squared_distance / total_distance if total_distance else 1.0
    nonconsensus = 1 - consensus
    if verbose:
        print(f"\nTotal distance: {total_distance}")
        print(f"Consensus based on squared distances: {consensus}")
        print(f"Non-consensus: {nonconsensus}")
    results["total_distance"] = total_distance
    results["consensus"] = consensus
    results["nonconsensus"] = nonconsensus

    # --- Entropy metrics ----------------------------------------------------
    entropy_value, percent_consensus, distribution = calculate_entropy(real_ratings, num_ratings)
    if verbose:
        print('distribution:', distribution)
        print("\nEntropy analysis:")
        print(f"Rating distribution: {[round(p, 2) for p in distribution]}")
        print(f"Entropy: {entropy_value:.2f}")
        print(f"Consensus percentage based on entropy: {percent_consensus:.2f}%")
    results["entropy_value"] = entropy_value
    results["percent_consensus"] = percent_consensus
    results["distribution"] = distribution

    # --- Standard-deviation metrics -----------------------------------------
    std_dev = calculate_standard_deviation(real_ratings)
    max_std_dev = calculate_max_standard_deviation(num_raters, num_ratings)
    # Guard: max_std_dev is 0 when num_ratings == 1 — full consensus.
    consensus_std_dev = (max_std_dev - std_dev) / max_std_dev if max_std_dev else 1.0
    if verbose:
        print(f"\nStandard Deviation of ratings: {std_dev:.2f}")
        print(f"\nMaximum possible Standard Deviation: {max_std_dev:.2f}")
        print(f"\nConsensus based on standard deviation: {consensus_std_dev:.2f}")
    results["std_dev"] = std_dev
    results["max_std_dev"] = max_std_dev
    results["consensus_std_dev"] = consensus_std_dev

    # --- Absolute-distance metrics ------------------------------------------
    absolute_distance = calculate_absolute_distance(even_ratings, real_ratings)
    if verbose:
        print(f"\nNormalized absolute distance from even distribution: {absolute_distance:.2f}")
    results["absolute_distance"] = absolute_distance

    constant_absolute_distances = calculate_constant_absolute_distances(real_ratings, num_ratings)
    if verbose:
        print("\nNormalized absolute distances from constant ratings:")
        for rating, distance in constant_absolute_distances.items():
            print(f"All {rating}s: {distance:.2f}")
    results["constant_absolute_distances"] = constant_absolute_distances

    min_const_abs_rating, min_const_abs_distance = find_minimum_constant_absolute_distance(real_ratings, num_ratings)
    if verbose:
        # NOTE: closing ")" restored in the message below (was missing).
        print(f"\nConstant rating with minimum absolute distance: {min_const_abs_rating} (distance: {min_const_abs_distance:.2f})")
    results["min_const_abs_rating"] = min_const_abs_rating
    results["min_const_abs_distance"] = min_const_abs_distance

    total_abs_distance = min_const_abs_distance + absolute_distance
    abs_consensus = absolute_distance / total_abs_distance if total_abs_distance else 1.0
    abs_nonconsensus = 1 - abs_consensus
    if verbose:
        print(f"\nConsensus based on absolute distances: {abs_consensus:.4f}")
        print(f"Non-consensus based on absolute distances: {abs_nonconsensus:.4f}")
    results["total_abs_distance"] = total_abs_distance
    results["abs_consensus"] = abs_consensus
    results["abs_nonconsensus"] = abs_nonconsensus

    return results


def incrementally_evolve_distribution(num_raters, num_ratings):
    """Evolve an even distribution to full consensus, tracking metrics.

    Each step adds 1 to every rating that is below num_ratings, until all
    raters hold the maximum rating.

    Args:
        num_raters: Number of raters.
        num_ratings: Maximum rating value.

    Returns:
        dict with keys:
            'distributions': list of the distribution at each step
            'squared_consensus': squared-distance consensus per step
            'absolute_consensus': absolute-distance consensus per step
            'std_dev_consensus': standard-deviation consensus per step
            'entropy_consensus': entropy-based consensus (0..1) per step
    """
    even_ratings_dict = get_rater_ratings(num_raters, num_ratings)
    current_distribution = [even_ratings_dict[i + 1] for i in range(num_raters)]

    evolution_results = {
        'distributions': [],
        'squared_consensus': [],
        'absolute_consensus': [],
        'std_dev_consensus': [],
        'entropy_consensus': [],
    }

    def _record(distribution):
        # Snapshot the distribution and every consensus metric for one step.
        evolution_results['distributions'].append(distribution.copy())
        metrics = calculate_all_consensus_metrics(distribution, num_raters, num_ratings)
        evolution_results['squared_consensus'].append(metrics['consensus'])
        evolution_results['absolute_consensus'].append(metrics['abs_consensus'])
        evolution_results['std_dev_consensus'].append(metrics['consensus_std_dev'])
        evolution_results['entropy_consensus'].append(metrics['percent_consensus'] / 100)

    _record(current_distribution)

    target = [num_ratings] * num_raters
    while current_distribution != target:
        # Increment every rating still below the maximum.
        for i in range(num_raters):
            if current_distribution[i] < num_ratings:
                current_distribution[i] += 1
        current_distribution.sort()  # keep the low-to-high ordering invariant
        _record(current_distribution)

    return evolution_results


def display_evolution_results(evolution_results, step_interval=1):
    """Display the incremental-evolution results as a table.

    Args:
        evolution_results: Dict returned by incrementally_evolve_distribution.
        step_interval: Rows to skip between printed steps (default 1 = all).
    """
    print("\n" + "=" * 120)
    print("CONSENSUS EVOLUTION RESULTS")
    print("=" * 120)
    print(f"{'Step':5} | {'Distribution':50} | {'Squared':10} | {'Absolute':10} | {'StdDev':10} | {'Entropy':10}")
    print("-" * 120)
    for i in range(0, len(evolution_results['distributions']), step_interval):
        dist_str = str(evolution_results['distributions'][i])
        print(f"{i:5} | {dist_str:50} | {evolution_results['squared_consensus'][i]:10.4f} | "
              f"{evolution_results['absolute_consensus'][i]:10.4f} | "
              f"{evolution_results['std_dev_consensus'][i]:10.4f} | "
              f"{evolution_results['entropy_consensus'][i]:10.4f}")
    print("=" * 120)


# --- Demo configuration -----------------------------------------------------
num_raters = 4
num_ratings = 4

# Real raters' ratings; must be sorted low to high and within 1..num_ratings.
real_ratings = [1, 1, 1, 4]

mode = 'manual'  # 'auto' or 'manual'


def _run_manual():
    """Compute and print a one-off consensus summary for real_ratings."""
    results = calculate_all_consensus_metrics(real_ratings, num_raters, num_ratings)
    if "error" in results:  # validation failed — report instead of crashing
        print(results["error"])
        return
    print("\n" + "=" * 50)
    print("CONSENSUS SUMMARY")
    print("=" * 50)
    print(f"Squared distance consensus: {results['consensus']:.4f}")
    print(f"Absolute distance consensus: {results['abs_consensus']:.4f}")
    print(f"Standard deviation consensus: {results['consensus_std_dev']:.4f}")
    print(f"Entropy-based consensus: {results['percent_consensus']/100:.4f}")
    print("=" * 50)
    print("END OF CONSENSUS SUMMARY")


def _run_auto():
    """Evolve an even distribution to full consensus and display each step."""
    evolution_results = incrementally_evolve_distribution(num_raters, num_ratings)
    display_evolution_results(evolution_results, step_interval=1)


if __name__ == "__main__":
    print('raters_per_rating:', num_raters // num_ratings)
    print('remainder = ', num_raters % num_ratings)
    if mode == 'manual':
        _run_manual()
    elif mode == 'auto':
        _run_auto()
    else:
        print('Invalid mode. Use "manual" or "auto".')