import random


def create_real_ratings(bins, num_raters):
    """
    Create a random distribution of ratings using values from bins.

    Args:
        bins: List of possible rating values
        num_raters: Number of ratings to generate

    Returns:
        List of randomly generated ratings, sorted from low to high
    """
    # Generate random ratings by selecting from bins
    ratings = [random.choice(bins) for _ in range(num_raters)]

    # Sort the ratings from low to high
    ratings.sort()

    return ratings


def find_binned_ratings(lo, hi, M):
    """
    Build M evenly spaced rating bins spanning [lo, hi].

    Args:
        lo: Lowest rating value
        hi: Highest rating value
        M: Number of bins (must be at least 2)

    Returns:
        List of M evenly spaced bin values from lo to hi
    """
    delta = (hi - lo) / (M - 1)
    bins = [lo + i * delta for i in range(M)]
    print('bins:', bins)
    return bins


def find_even_ratings(bins, N):
    """
    Distribute N raters evenly across the bins, maximizing their distance
    from each other.

    Args:
        bins: List of rating bins (values that raters can choose from)
        N: Number of raters

    Returns:
        List of ratings for each rater, where raters are spread as evenly
        as possible
    """
    M = len(bins)  # Number of bins

    # If there's only one rater, place them at the lowest bin
    if N == 1:
        return [bins[0]]

    # Calculate the spacing between raters.
    # For 2 raters, they should be at the extreme bins;
    # for more raters, they should be evenly spaced.
    if N >= 2:
        indices = [int(i * (M - 1) / (N - 1)) for i in range(N)]
    else:
        indices = [0]  # Fallback; should not happen with the checks above

    # Map indices to actual bin values
    even_ratings = [bins[idx] for idx in indices]
    #print('even_ratings:', even_ratings)
    return even_ratings


def find_even_ratings_when_mlteqn(bins, N):
    """
    Distribute N raters as evenly as possible across M bins where M <= N.
    Each bin will have approximately the same number of raters assigned to it.

    Args:
        bins: List of rating bins (values that raters can choose from)
        N: Number of raters

    Returns:
        List of ratings for each rater, distributed as evenly as possible
        across the bins
    """
    M = len(bins)  # Number of bins

    # Calculate how many raters should be assigned to each bin
    raters_per_bin = N // M
    remainder = N % M

    even_ratings = []

    # Distribute raters evenly across bins; the first `remainder` bins
    # each take one extra rater
    for bin_idx in range(M):
        count = raters_per_bin
        if bin_idx < remainder:
            count += 1

        # Assign this bin value to the appropriate number of raters
        even_ratings.extend([bins[bin_idx]] * count)

    #print('even_ratings:', even_ratings)
    return even_ratings
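# A quick sketch of what the two even-spread helpers return (hand-worked for
# the 11-bin scale this script uses; illustrative only):
#
#   bins = find_binned_ratings(1, 11, 11)    # [1.0, 2.0, ..., 11.0]
#   find_even_ratings(bins, 5)               # [1.0, 3.0, 6.0, 8.0, 11.0]:
#                                            # raters at both extremes, the
#                                            # rest spread between them
#   find_even_ratings_when_mlteqn(bins, 13)  # [1.0, 1.0, 2.0, 2.0, 3.0, ...]:
#                                            # 13 raters over 11 bins, so the
#                                            # first two bins take the extras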
def calculate_average_distance_to_nonconsensus(real_ratings, even_ratings):
    """
    Calculate the average distance between real_ratings and even_ratings.

    This function:
    1. Sorts both real_ratings and even_ratings in ascending order
    2. Calculates the absolute difference between each corresponding pair
    3. Sums these differences and divides by the number of raters

    Args:
        real_ratings: List of actual ratings provided by users
        even_ratings: List of ratings from an even distribution

    Returns:
        The average absolute difference between the real and even ratings
    """
    # Make copies to avoid modifying the original lists
    real_sorted = sorted(real_ratings)
    even_sorted = sorted(even_ratings)

    # Verify that both lists have the same length
    if len(real_sorted) != len(even_sorted):
        raise ValueError("real_ratings and even_ratings must have the same length")

    # Calculate the sum of absolute differences
    total_difference = 0
    for i in range(len(real_sorted)):
        total_difference += abs(real_sorted[i] - even_sorted[i])

    # Calculate the average
    average_distance = total_difference / len(real_sorted)
    #print('average_distance_to_nonconsensus:', average_distance)
    return average_distance


def calculate_average_distance_to_consensus(real_ratings, even_ratings):
    """
    Calculate the minimum average distance between real_ratings and all
    possible perfect consensus configurations.

    This function:
    1. Identifies all unique values in even_ratings
    2. For each unique value, creates a perfect consensus list (all elements
       have this value)
    3. Calculates the average absolute distance between each perfect
       consensus and real_ratings
    4. Returns the minimum of these distances

    Args:
        real_ratings: List of actual ratings provided by users
        even_ratings: List of ratings from an even distribution (used to
            determine possible consensus values)

    Returns:
        A two-element list: the minimum average absolute difference between
        real_ratings and any perfect consensus, and the consensus value that
        achieves it
    """
    # Find all unique rating values from even_ratings
    unique_ratings = sorted(set(even_ratings))

    # Calculate the average distance for each possible perfect consensus
    min_average_distance = float('inf')  # Initialize with infinity
    best_consensus_value = None

    for consensus_value in unique_ratings:
        # Create a perfect consensus list (all values are the same)
        perfect_consensus = [consensus_value] * len(real_ratings)

        # Calculate the sum of absolute differences
        total_difference = 0
        for i in range(len(real_ratings)):
            total_difference += abs(real_ratings[i] - perfect_consensus[i])

        # Calculate the average
        average_distance = total_difference / len(real_ratings)

        # Check if this is the minimum distance so far
        if average_distance < min_average_distance:
            min_average_distance = average_distance
            best_consensus_value = consensus_value

    #print('minimum average_distance_to_consensus:', min_average_distance)
    #print('best consensus value:', best_consensus_value)
    return [min_average_distance, best_consensus_value]


def find_consensus(real_ratings, even_ratings):
    """
    Combine the two distances into a single score:
    percent_consensus = d_nonconsensus / (d_nonconsensus + d_consensus).
    It is 0 when the ratings match the maximally spread-out distribution
    and 1 at perfect consensus.
    """
    avg_distance_to_nonconsensus = calculate_average_distance_to_nonconsensus(real_ratings, even_ratings)
    avg_distance_to_consensus, best_consensus_value = calculate_average_distance_to_consensus(real_ratings, even_ratings)
    total_distance = avg_distance_to_nonconsensus + avg_distance_to_consensus
    percent_consensus = avg_distance_to_nonconsensus / total_distance
    return [percent_consensus, avg_distance_to_nonconsensus, avg_distance_to_consensus, best_consensus_value]
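# Worked example on the data used in __main__ below (hand-computed, values
# approximate):
#
#   real = [1, 2, 2, 2, 4, 4, 5, 5, 6, 9, 11]
#   even = [1.0, 2.0, ..., 11.0]    # 11 raters over 11 bins, one per bin
#
#   distance to non-consensus: sum |real_i - even_i| = 15, so 15/11 ~ 1.364
#   distance to consensus: the best single value is 4.0 (the median), with
#                          sum |real_i - 4| = 25, so 25/11 ~ 2.273
#   percent_consensus = 15 / (15 + 25) = 0.375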
def get_next_bin(bins, current_rating):
    """Return the bin one step above current_rating, or the top bin if
    current_rating is already the highest. (Currently unused helper.)"""
    for i, b in enumerate(bins):
        if b == current_rating:
            if i == len(bins) - 1:
                return bins[i]
            else:
                return bins[i + 1]


def is_perfect_consensus(real_ratings, best_consensus_value):
    if not real_ratings:
        return False

    # Check if all ratings are equal to the reference value
    is_consensus = all(rating == best_consensus_value for rating in real_ratings)

    # Return whether consensus exists
    return is_consensus


def change_rating_in_best_way(real_ratings, best_consensus_value, bins):
    """
    Find the best way to change a single rating to maximize consensus.

    This function:
    1. Tries shifting each rating up or down toward best_consensus_value
    2. For each shift, calculates the new percent_consensus
    3. Finds the shift that maximizes percent_consensus
    4. If multiple shifts maximize percent_consensus, picks the one closest
       to the extremes

    Note: ratings are shifted by +/-1, which assumes unit bin spacing (as
    produced by find_binned_ratings with the defaults in __main__).

    Args:
        real_ratings: List of actual ratings provided by users (assumed to
            be sorted)
        best_consensus_value: The consensus value that minimizes average
            distance
        bins: List of rating bins, used to rebuild the even distribution

    Returns:
        tuple: (modified_ratings, original_index, shift_direction,
                new_percent_consensus)
            modified_ratings: The new ratings list after the best shift
            original_index: The index of the rating that was changed
            shift_direction: "up" or "down" indicating direction of shift
            new_percent_consensus: The percent_consensus after the shift
    """
    # Make a copy of the original ratings to avoid modifying them
    original_ratings = real_ratings.copy()

    # Track the best shift
    best_shift_index = -1
    best_shift_direction = ""
    best_shift_ratings = []
    best_percent_consensus = 0
    ties = []  # To track shifts that tie for max percent_consensus

    # Try each possible single rating shift
    for i in range(len(real_ratings)):
        current_rating = real_ratings[i]

        # Only try shifting if not already at best_consensus_value
        if current_rating != best_consensus_value:
            # Determine shift direction
            shift_direction = "up" if current_rating < best_consensus_value else "down"

            # Create a copy of ratings for testing this shift
            test_ratings = original_ratings.copy()

            # Shift the rating by 1 toward best_consensus_value
            if shift_direction == "up":
                # Don't shift if this would disrupt the sorting
                if i < len(real_ratings) - 1 and current_rating + 1 > real_ratings[i + 1]:
                    continue
                test_ratings[i] += 1
            else:  # shift_direction == "down"
                # Don't shift if this would disrupt the sorting
                if i > 0 and current_rating - 1 < real_ratings[i - 1]:
                    continue
                test_ratings[i] -= 1

            # Ensure test_ratings remains sorted
            test_ratings.sort()

            # Rebuild the even distribution needed for the calculation
            if len(bins) > len(test_ratings):
                even_ratings = find_even_ratings(bins, len(test_ratings))
            else:
                even_ratings = find_even_ratings_when_mlteqn(bins, len(test_ratings))

            # Calculate consensus metrics for this shift
            percent_consensus, _, _, _ = find_consensus(test_ratings, even_ratings)

            # Check if this is the best shift so far
            if percent_consensus > best_percent_consensus:
                best_percent_consensus = percent_consensus
                best_shift_index = i
                best_shift_direction = shift_direction
                best_shift_ratings = test_ratings.copy()
                ties = [(i, shift_direction, test_ratings.copy())]
            elif percent_consensus == best_percent_consensus:
                # This is a tie; add it to the ties list
                ties.append((i, shift_direction, test_ratings.copy()))

    # If we have multiple ties for the best percent_consensus, pick the one
    # closest to the extremes
    if len(ties) > 1:
        # Sort ties by index (to find extremes: lowest and highest indices)
        ties.sort(key=lambda x: x[0])

        # Choose either the lowest or highest index
        if ties[0][0] == 0:  # If the lowest index is 0 (extreme low)
            best_shift_index, best_shift_direction, best_shift_ratings = ties[0]
        else:
            # Otherwise choose the highest index (extreme high)
            best_shift_index, best_shift_direction, best_shift_ratings = ties[-1]

    # If no valid shift was found, return the ratings unchanged rather than
    # an empty list (prevents an infinite loop in the caller)
    if best_shift_index == -1:
        return (original_ratings, -1, "", best_percent_consensus)

    return (best_shift_ratings, best_shift_index, best_shift_direction, best_percent_consensus)
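# Hand-traced example of the sort-preservation guard above (illustrative):
# with ratings [1, 2, 2, 2, 4, ...] and a consensus value of 4, shifting the
# 2 at index 1 or 2 up would overtake the 2 that follows it, so those shifts
# are skipped; only the last 2 (index 3) may move, giving [1, 2, 2, 3, 4, ...].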
def change_rating_in_worst_way(real_ratings, best_consensus_value, bins):
    """
    Find the way to change a single rating that minimizes the gain in
    consensus.

    This function:
    1. Tries shifting each rating up or down toward best_consensus_value
    2. For each shift, calculates the new percent_consensus
    3. Finds the shift that minimizes percent_consensus while still
       improving on the current baseline
    4. If multiple shifts minimize percent_consensus, picks the one closest
       to the extremes

    Args:
        real_ratings: List of actual ratings provided by users (assumed to
            be sorted)
        best_consensus_value: The consensus value that minimizes average
            distance
        bins: List of rating bins, used to rebuild the even distribution

    Returns:
        tuple: (modified_ratings, original_index, shift_direction,
                new_percent_consensus)
            modified_ratings: The new ratings list after the worst shift
            original_index: The index of the rating that was changed
            shift_direction: "up" or "down" indicating direction of shift
            new_percent_consensus: The percent_consensus after the shift
    """
    # Make a copy of the original ratings to avoid modifying them
    original_ratings = real_ratings.copy()

    # Track the worst shift (the one with minimum percent_consensus)
    worst_shift_index = -1
    worst_shift_direction = ""
    worst_shift_ratings = []
    worst_percent_consensus = float('inf')  # Infinity, for finding the minimum
    ties = []  # To track shifts that tie for min percent_consensus

    # Get the current percent_consensus as a baseline
    if len(bins) > len(real_ratings):
        baseline_even_ratings = find_even_ratings(bins, len(real_ratings))
    else:
        baseline_even_ratings = find_even_ratings_when_mlteqn(bins, len(real_ratings))
    baseline_consensus, _, _, _ = find_consensus(real_ratings, baseline_even_ratings)

    # Try each possible single rating shift
    for i in range(len(real_ratings)):
        current_rating = real_ratings[i]

        # Only try shifting if not already at best_consensus_value
        if current_rating != best_consensus_value:
            # Determine shift direction
            shift_direction = "up" if current_rating < best_consensus_value else "down"

            # Create a copy of ratings for testing this shift
            test_ratings = original_ratings.copy()

            # Shift the rating by 1 toward best_consensus_value
            if shift_direction == "up":
                # Don't shift if this would disrupt the sorting
                if i < len(real_ratings) - 1 and current_rating + 1 > real_ratings[i + 1]:
                    continue
                test_ratings[i] += 1
            else:  # shift_direction == "down"
                # Don't shift if this would disrupt the sorting
                if i > 0 and current_rating - 1 < real_ratings[i - 1]:
                    continue
                test_ratings[i] -= 1

            # Ensure test_ratings remains sorted
            test_ratings.sort()

            # Rebuild the even distribution needed for the calculation
            if len(bins) > len(test_ratings):
                even_ratings = find_even_ratings(bins, len(test_ratings))
            else:
                even_ratings = find_even_ratings_when_mlteqn(bins, len(test_ratings))

            # Calculate consensus metrics for this shift
            percent_consensus, _, _, _ = find_consensus(test_ratings, even_ratings)

            # Only consider shifts that improve consensus
            # (percent_consensus > baseline_consensus); otherwise we'd just
            # stay in place and never reach consensus
            if percent_consensus > baseline_consensus:
                # Check if this is the worst shift so far (minimum percent_consensus)
                if percent_consensus < worst_percent_consensus:
                    worst_percent_consensus = percent_consensus
                    worst_shift_index = i
                    worst_shift_direction = shift_direction
                    worst_shift_ratings = test_ratings.copy()
                    ties = [(i, shift_direction, test_ratings.copy())]
                elif percent_consensus == worst_percent_consensus:
                    # This is a tie; add it to the ties list
                    ties.append((i, shift_direction, test_ratings.copy()))

    # If we have multiple ties for the worst percent_consensus, pick the one
    # closest to the extremes
    if len(ties) > 1:
        # Sort ties by index (to find extremes: lowest and highest indices)
        ties.sort(key=lambda x: x[0])

        # Choose either the lowest or highest index
        if ties[0][0] == 0:  # If the lowest index is 0 (extreme low)
            worst_shift_index, worst_shift_direction, worst_shift_ratings = ties[0]
        else:
            # Otherwise choose the highest index (extreme high)
            worst_shift_index, worst_shift_direction, worst_shift_ratings = ties[-1]

    # If we didn't find any valid shifts, return the original ratings
    if worst_shift_index == -1:
        return (original_ratings, -1, "", baseline_consensus)

    return (worst_shift_ratings, worst_shift_index, worst_shift_direction, worst_percent_consensus)
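# Note on the two strategies: each call moves one rating one bin toward the
# consensus value. change_rating_in_best_way takes the step that raises
# percent_consensus the most; change_rating_in_worst_way takes the improving
# step that raises it the least, tracing the slowest possible march toward
# perfect consensus. If no candidate step passes the improvement filter, the
# worst-way function returns the ratings unchanged, which is why the driver
# loop below offers a manual escape.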
if __name__ == "__main__":
    hi = 11
    lo = 1
    num_ratings = 11  # number of bins (M in notes)
    bins = find_binned_ratings(lo, hi, num_ratings)

    #num_raters = 11
    #real_ratings = create_real_ratings(bins, num_raters)
    real_ratings = [1, 2, 2, 2, 4, 4, 5, 5, 6, 9, 11]
    num_raters = len(real_ratings)  # number of raters (N in notes)

    if num_ratings > num_raters:
        even_ratings = find_even_ratings(bins, num_raters)
    else:
        even_ratings = find_even_ratings_when_mlteqn(bins, num_raters)

    # Calculate the distances and consensus
    percent_consensus, avg_distance_to_nonconsensus, avg_distance_to_consensus, best_consensus_value = find_consensus(real_ratings, even_ratings)

    print('num_raters:', num_raters)
    print('real_ratings:', real_ratings)
    print('even_ratings:', even_ratings)
    print('best_consensus_value:', best_consensus_value)
    print('average distance to non-consensus:', avg_distance_to_nonconsensus)
    print('average distance to consensus:', avg_distance_to_consensus)
    print('percent consensus:', percent_consensus)
    print('')

    # Change the rating in the best way until perfect consensus is reached
    print('Best')
    modified_ratings, original_index, shift_direction, new_percent_consensus = change_rating_in_best_way(real_ratings, best_consensus_value, bins)
    print('modified_ratings:', modified_ratings)
    print(new_percent_consensus)

    while not is_perfect_consensus(modified_ratings, best_consensus_value):
        modified_ratings, original_index, shift_direction, new_percent_consensus = change_rating_in_best_way(modified_ratings, best_consensus_value, bins)
        print('modified_ratings:', modified_ratings)
        print(new_percent_consensus)

    print('')

    # Change the rating in the worst way, pausing for input between steps
    print('Worst')
    modified_ratings, original_index, shift_direction, new_percent_consensus = change_rating_in_worst_way(real_ratings, best_consensus_value, bins)
    print('modified_ratings:', modified_ratings)
    print(new_percent_consensus)

    while not is_perfect_consensus(modified_ratings, best_consensus_value):
        modified_ratings, original_index, shift_direction, new_percent_consensus = change_rating_in_worst_way(modified_ratings, best_consensus_value, bins)
        print(new_percent_consensus)
        print('modified_ratings:', modified_ratings)
        a = input('Continue? ')
        if a == 'n':
            break
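# Usage (a sketch; the filename is assumed): running `python consensus.py`
# prints the consensus metrics for the hard-coded ratings, then replays the
# best-case convergence automatically and steps through the worst case
# interactively (enter 'n' at the prompt to stop early).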