# Fixed percentage of received weight delegation algorithm.
#
# Users delegate a percentage of the weight they *receive* to other users.
# The weights are found by iterating delegation rounds until they stop
# changing.  Delegation matrices are indexed D[from_index, to_index].
#
# NOTE: the statements at the bottom of this file run a demo of both
# calculator modes whenever the module is executed or imported.

import numpy as np


def cut_hidelegations_to1(D, hilim):
    """Clamp every entry of D to the range [-hilim, hilim].

    Args:
        D: Delegation matrix (numpy array). It is modified IN PLACE.
        hilim: Largest allowed magnitude of a single delegation.

    Returns:
        The same array object D, after clamping.
    """
    D[D > hilim] = hilim
    D[D < -1.0 * hilim] = -1.0 * hilim
    return D


def factor_totaldelegations(D, hilim):
    """Scale down the rows of D whose total delegation exceeds hilim.

    A row's total is the sum of the absolute values of its entries.  Rows
    whose total is above hilim are multiplied by hilim / total so that the
    total becomes hilim; the other rows are left untouched.

    Args:
        D: 2-D delegation matrix (numpy array). It is modified IN PLACE.
        hilim: Largest allowed total delegation per row.

    Returns:
        The same array object D, after scaling.
    """
    # Use absolute values so negative delegations count towards the total.
    row_sums = np.abs(D).sum(axis=1)
    over_limit = row_sums > hilim
    if np.any(over_limit):
        D[over_limit] *= (hilim / row_sums[over_limit])[:, np.newaxis]
    return D


def calculate_single_round(W, D, Wdel, Wundel):
    """Run one delegation round and return the updated weights.

    The amount delegated is the delegation percentage multiplied by the
    delegatable portion of each user's weight.

    Args:
        W: Current total weight per user (1-D array).
        D: Delegation matrix, D[from_index, to_index].
        Wdel: Current delegatable weight per user (1-D array).  Negative
            entries are treated as 0; the caller's array is not modified.
        Wundel: Unused; the undelegatable weight is recomputed from W.
            Kept so existing callers keep working.

    Returns:
        Tuple (W, Wdel, Wundel) with the new total, delegatable and
        undelegatable weights.
    """
    # You cannot have negative delegatable weight.  np.maximum builds a new
    # array, so the caller's Wdel is left as it was.
    Wdel = np.maximum(Wdel, 0)
    D_amt = Wdel[:, np.newaxis] * D

    # What a user hands out (in absolute value) is subtracted from W; the
    # remainder is the undelegatable weight.
    D_amt_sum = np.abs(D_amt).sum(axis=1)
    Wundel = W - D_amt_sum

    # The new delegatable weight is what each user received, floored at 0.
    D_to = D_amt.sum(axis=0)
    Wdel = np.where(D_to > 0, D_to, 0)

    # The new weight is the sum of the two parts.
    W = Wdel + Wundel
    return W, Wdel, Wundel


def calculate_delegation_rounds(D, initweight, max_rounds=200, rtol=1e-5):
    """Iterate delegation rounds until the weights converge.

    Every user starts with weight initweight, all of it delegatable.

    Args:
        D: Square delegation matrix, D[from_index, to_index].
        initweight: Initial weight given to every user.
        max_rounds: Maximum number of rounds to run (default 200).
        rtol: Relative tolerance passed to np.allclose for the convergence
            check (default 1e-5).

    Returns:
        Tuple (W, Wdel, Wundel) of 1-D arrays: total, delegatable and
        undelegatable weight per user after the last round.
    """
    n_users = D.shape[0]
    W = np.full((n_users,), initweight)
    # Delegatable weight is the initial weight at first ...
    Wdel = W.copy()
    # ... and the undelegatable weight is 0 at first.
    Wundel = np.zeros((n_users,))

    for i in range(max_rounds):
        # Keep the previous values for the convergence check.
        Wold = W.copy()
        Wdelold = Wdel.copy()
        W, Wdel, Wundel = calculate_single_round(W, D, Wdel, Wundel)
        # Stop once neither the weights nor the delegatable weights change
        # significantly.
        if np.allclose(W, Wold, rtol=rtol) and np.allclose(Wdel, Wdelold, rtol=rtol):
            print(f"Converged after {i+1} iterations")
            break

    return W, Wdel, Wundel


def find_optimal_alpha(D, initweight, lolim, tolerance=1e-6, max_iterations=100):
    """Binary search for the largest alpha whose scaled delegations keep all weights >= lolim.

    Each candidate alpha in (0, 1) is tested by running the delegation
    rounds on alpha * D.

    Args:
        D: Delegation matrix.
        initweight: Initial weight for all users.
        lolim: Lower limit that all final weights must satisfy.
        tolerance: Width of the search interval at which the search stops.
        max_iterations: Maximum number of bisection steps.

    Returns:
        Tuple (optimal_alpha, final_weights, final_Wdel, final_Wundel).  If
        no tested alpha satisfies lolim, optimal_alpha is 0.0 and the
        weights are those obtained with an all-zero delegation matrix.
    """
    alpha_low = 0.0
    alpha_high = 1.0
    alpha_best = 0.0

    print(f"Searching for optimal alpha to satisfy lolim = {lolim}")

    for _ in range(max_iterations):
        alpha = (alpha_low + alpha_high) / 2.0

        # Test this alpha by scaling D.
        W_test, _, _ = calculate_delegation_rounds(alpha * D, initweight)

        if np.all(W_test >= lolim):
            # This alpha works, try a higher one.
            alpha_best = alpha
            alpha_low = alpha
        else:
            # This alpha doesn't work, try a lower one.
            alpha_high = alpha

        if (alpha_high - alpha_low) < tolerance:
            print(f"\nConverged! Optimal alpha = {alpha_best:.6f}")
            break

    # Calculate final results with the best alpha found.
    if alpha_best > 0:
        print(f"\nUsing optimal alpha = {alpha_best:.6f}")
        W_final, Wdel_final, Wundel_final = calculate_delegation_rounds(alpha_best * D, initweight)
        print(f"Final minimum weight: {np.min(W_final):.6f}")
        return alpha_best, W_final, Wdel_final, Wundel_final

    print("\nNo valid alpha found that satisfies lolim constraint")
    print("Using alpha = 0 (no delegations)")
    W_final, Wdel_final, Wundel_final = calculate_delegation_rounds(np.zeros_like(D), initweight)
    return 0.0, W_final, Wdel_final, Wundel_final


def find_final_weights_and_delegations(D, initweight, hilim, lolim):
    """Limit the delegations in D and compute the resulting user weights.

    Single delegations are clamped to hilim, row totals are scaled down to
    hilim, and the delegation rounds are run.  If any weight ends up below
    lolim, all delegations are scaled by the alpha found by
    find_optimal_alpha.

    Args:
        D: Delegation matrix.  NOTE: the clamping and scaling steps modify
            this array in place; pass a copy to keep the original.
        initweight: Initial weight for all users.
        hilim: Upper limit for single delegations and for row totals.
        lolim: Lower limit that all final weights should satisfy.

    Returns:
        Tuple (D, W): the final delegation matrix and the final weights.
    """
    print('D from user:')
    print(D)
    print("initweight =", initweight)

    D = cut_hidelegations_to1(D, hilim)
    print("D after cutting high delegations to hilim:")
    print(D)

    D = factor_totaldelegations(D, hilim)
    print("D after factoring total delegations to hilim:")
    print(D)

    W, Wdel, Wundel = calculate_delegation_rounds(D, initweight)

    if np.all(W >= lolim):
        print("All users satisfy the lolim requirement")
    else:
        print('W = ', W)
        print("Some users do not satisfy the lolim requirement")
        # Find the optimal alpha factor and recalculate the weights.
        optimal_alpha, W, Wdel, Wundel = find_optimal_alpha(D, initweight, lolim)
        # Scale D by the same alpha so it is consistent with W.
        D = optimal_alpha * D
        print('Optimal alpha = ', optimal_alpha)

    print('Final weights:')
    print(W)
    print('Final delegations:')
    print(D)
    return D, W


def find_delegation_matrix_for_users_who_delegate(U):
    """Build the delegation matrix for the users involved in delegations.

    A participant is any user with a non-empty delegation dictionary, plus
    every user id that appears as a delegation target.

    Args:
        U: List of users, each [userid, vote, clawback, {to_userid: value}].

    Returns:
        Tuple (D, delegation_participants_list, userid_to_index):
        D is an n x n array with D[from_index, to_index] = value,
        delegation_participants_list is the sorted list of participant ids,
        userid_to_index maps a participant id to its row/column in D.
    """
    # Step 1: find all delegation participants.
    participants = set()
    for user in U:
        delegations = user[3]
        if delegations:
            participants.add(user[0])
            # Everyone they delegate to is a participant as well.
            participants.update(delegations)

    # Step 2: sort participants from lowest to highest.
    delegation_participants_list = sorted(participants)
    n = len(delegation_participants_list)
    print(f"Delegation participants (sorted): {delegation_participants_list}")

    # Step 3: map each user id to its matrix index.
    userid_to_index = {userid: i for i, userid in enumerate(delegation_participants_list)}

    # Steps 4-5: create and fill the n x n delegation matrix.
    D = np.zeros((n, n))
    for user in U:
        userid = user[0]
        delegations = user[3]
        if userid in userid_to_index and delegations:
            from_index = userid_to_index[userid]
            for delegate_to_userid, percentage in delegations.items():
                if delegate_to_userid in userid_to_index:
                    D[from_index, userid_to_index[delegate_to_userid]] = percentage

    print(f"User ID to matrix index mapping: {userid_to_index}")
    print(f"Delegation matrix shape: {D.shape}")
    print("Delegation matrix:")
    print(D)
    return D, delegation_participants_list, userid_to_index


def is_number(value):
    """Return True if value is a finite int or float (bools are excluded).

    Strings, None, NaN and +/-infinity all give False.
    """
    if isinstance(value, bool):
        return False
    if isinstance(value, int):
        # Python ints are always finite; answering here also avoids handing
        # numpy an int that is too large to convert to a float.
        return True
    if isinstance(value, float):
        return bool(np.isfinite(value))
    return False


def adjust_delegations_for_clawback(U, Dnovote, delegation_participants_list, userid_to_index):
    """Zero the delegations of users who voted and have clawback enabled.

    Args:
        U: List of users, each [userid, vote, clawback, {delegations}].
        Dnovote: Delegation matrix ignoring votes; it is not modified.
        delegation_participants_list: User ids that have a row in Dnovote.
        userid_to_index: Maps a participant's user id to its matrix index.

    Returns:
        A copy of Dnovote in which the row of every participant who voted
        with a real number and has clawback enabled is set to zero.
    """
    Dclawback = Dnovote.copy()

    # Matrix indices follow the sorted participant list while U is in
    # caller order and may contain non-participants, so a user's record has
    # to be found by id, not by matrix index.
    users_by_id = {user[0]: user for user in U}

    for participant_id in delegation_participants_list:
        user = users_by_id.get(participant_id)
        if user is None:
            # Delegation target without a record in U: no vote to act on.
            continue
        # The user voted (with a real number) and has clawback enabled.
        if is_number(user[1]) and user[2] == True:  # noqa: E712
            Dclawback[userid_to_index[participant_id], :] = 0.0

    return Dclawback


def create_uvector_from_users(users):
    """Split users with per-tag delegations into one Uvector per tag.

    Input format:  [userid, vote, clawback, [tagid1, {delegations1}], [tagid2, {delegations2}], ...]
    Output format: {tagid: [U0, U1, U2, ...], ...}
                   where each U = [userid, vote, clawback, {delegations}]

    Tag entries with fewer than 2 elements are ignored.  A user without an
    entry for a tag gets an empty delegation dictionary for that tag.

    Args:
        users: List of users in the extended multi-tag format.

    Returns:
        tuple: (uvectors_dict, tagid_to_index, index_to_tagid)
    """
    # Step 1: find all unique tag ids (elements 0-2 are userid, vote, clawback).
    tag_ids = set()
    for user in users:
        for tag_entry in user[3:]:
            if len(tag_entry) >= 2:
                tag_ids.add(tag_entry[0])

    # Step 2: sort tag ids for consistent ordering.
    sorted_tag_ids = sorted(tag_ids)

    # Step 3: create the mapping dictionaries.
    tagid_to_index = {tagid: i for i, tagid in enumerate(sorted_tag_ids)}
    index_to_tagid = dict(enumerate(sorted_tag_ids))

    # Step 4: create one Uvector per tag.
    uvectors_dict = {}
    for tagid in sorted_tag_ids:
        uvector = []
        for user in users:
            # The first entry for this tag wins.
            delegations = {}
            for tag_entry in user[3:]:
                if len(tag_entry) >= 2 and tag_entry[0] == tagid:
                    delegations = tag_entry[1]
                    break
            uvector.append([user[0], user[1], user[2], delegations])
        uvectors_dict[tagid] = uvector

    print(f"Created Uvectors for tags: {sorted_tag_ids}")
    print(f"Tag ID to index mapping: {tagid_to_index}")
    return uvectors_dict, tagid_to_index, index_to_tagid


def process_multi_tag_delegation(users, initweight=100.0, hilimfac=1.0, lolim=40.0):
    """Run the delegation calculation separately for every tag.

    Args:
        users: List of users in the extended multi-tag format.
        initweight: Initial weight for each user.
        hilimfac: High limit factor for delegations.
        lolim: Low limit for final weights.

    Returns:
        tuple: (tag_results, tagid_to_index, index_to_tagid), where
        tag_results maps each tag id to a dict with that tag's Uvector,
        delegation matrices (original, after clawback, final), final
        weights, participants and user-id-to-index mapping.
    """
    hilim = hilimfac  # The true high limit is 100% so this is some number below that

    print("\n" + "="*60)
    print("MULTI-TAG DELEGATION PROCESSING")
    print("="*60)

    # Step 1: create Uvectors for each tag.
    print("\n1. Creating Uvectors for each tag...")
    uvectors_dict, tagid_to_index, index_to_tagid = create_uvector_from_users(users)

    # Step 2: process each tag separately.
    tag_results = {}
    for tagid in sorted(uvectors_dict.keys()):
        print(f"\n{'-'*50}")
        print(f"PROCESSING TAG {tagid}")
        print(f"{'-'*50}")

        U = uvectors_dict[tagid]
        print(f"Uvector for tag {tagid}: {U}")

        # Step 2.1: calculate the delegation matrix.
        print(f"\n2.1. Calculating delegation matrix for tag {tagid}...")
        D, delegation_participants_list, userid_to_index = find_delegation_matrix_for_users_who_delegate(U)
        print(f"Delegation Matrix for tag {tagid}:")
        print(D)
        print(f"Participants: {delegation_participants_list}")
        print(f"userid_to_index: {userid_to_index}")

        # Step 2.2: calculate the clawback delegation.
        print(f"\n2.2. Adjusting delegations for clawback for tag {tagid}...")
        Dclawback = adjust_delegations_for_clawback(U, D, delegation_participants_list, userid_to_index)
        print(f"After clawback, Dclawback for tag {tagid}:")
        print(Dclawback)

        # Step 2.3: calculate the final delegations and weights.
        print(f"\n2.3. Calculating final weights and delegations for tag {tagid}...")
        # Copies are taken first because the next call modifies its matrix
        # argument in place.
        D_orig = D.copy()
        D_orig_clawback = Dclawback.copy()
        D_final, W_final = find_final_weights_and_delegations(Dclawback, initweight, hilim, lolim)
        print(f"Final weights and delegations for tag {tagid}:")
        print("D_final:")
        print(D_final)
        print("W_final:")
        print(W_final)

        # Store results for this tag.
        tag_results[tagid] = {
            'uvector': U,
            'delegation_matrix_original': D_orig,
            'delegation_matrix_clawback': D_orig_clawback,
            'delegation_matrix_final': D_final,
            'final_weights': W_final,
            'participants': delegation_participants_list,
            'userid_to_index': userid_to_index,
        }

    print(f"\n{'='*60}")
    print("MULTI-TAG PROCESSING COMPLETE")
    print(f"{'='*60}")
    print(f"Processed tags: {sorted(tag_results.keys())}")
    return tag_results, tagid_to_index, index_to_tagid


def calculate_aggregate_vote(users, tag_results, tagid_to_index, initweight=100.0):
    """Calculate the aggregate vote across all tags for a proposition.

    Process:
    1. Keep only the users who voted with a real number (see is_number).
    2. For each tag, use the delegation weight for participants and
       initweight for non-participants.
    3. Calculate the weighted average vote for the tag (0.0 when the total
       weight is not positive).
    4. Take the simple average of all tag weighted averages.

    Args:
        users: Original user data with votes.
        tag_results: Results from process_multi_tag_delegation().
        tagid_to_index: Tag ID to index mapping (not used by the calculation).
        initweight: Weight for users who are not delegation participants.

    Returns:
        dict: 'tag_weighted_averages', 'aggregate_vote' and 'total_tags'.
    """
    print("\n" + "="*60)
    print("CALCULATING AGGREGATE VOTE")
    print("="*60)

    # The voters are the same for every tag, so find them once.  is_number
    # is the same test the clawback step uses; it rejects NaN/inf votes,
    # which would otherwise turn the whole aggregate into NaN.
    voting_users = [[user[0], user[1]] for user in users if is_number(user[1])]

    tag_weighted_averages = {}

    for tagid in sorted(tag_results.keys()):
        print(f"\n{'-'*40}")
        print(f"PROCESSING TAG {tagid} FOR VOTING")
        print(f"{'-'*40}")

        result = tag_results[tagid]
        final_weights = result['final_weights']
        userid_to_index = result['userid_to_index']

        print(f"Users who voted: {voting_users}")

        total_weighted_vote = 0.0
        total_weight = 0.0

        for userid, vote in voting_users:
            if userid in userid_to_index:
                # User participated in delegation - use the calculated weight.
                user_weight = final_weights[userid_to_index[userid]]
                weight_source = "delegation"
            else:
                # User did not participate in delegation - use initweight.
                user_weight = initweight
                weight_source = "initweight"

            weighted_vote = vote * user_weight
            total_weighted_vote += weighted_vote
            total_weight += user_weight

            print(f"  User {userid}: vote={vote}, weight={user_weight:.2f} ({weight_source}), weighted_vote={weighted_vote:.2f}")

        if total_weight > 0:
            tag_weighted_average = total_weighted_vote / total_weight
            tag_weighted_averages[tagid] = tag_weighted_average
            print(f"\nTag {tagid} weighted average: {total_weighted_vote:.2f} / {total_weight:.2f} = {tag_weighted_average:.4f}")
        else:
            print(f"\nTag {tagid}: No voting users found")
            tag_weighted_averages[tagid] = 0.0

    # Simple average of all tag weighted averages.
    print(f"\n{'='*60}")
    print("FINAL AGGREGATE CALCULATION")
    print(f"{'='*60}")

    if tag_weighted_averages:
        aggregate_vote = sum(tag_weighted_averages.values()) / len(tag_weighted_averages)
        print(f"Tag weighted averages: {tag_weighted_averages}")
        print(f"Final aggregate vote: {sum(tag_weighted_averages.values()):.4f} / {len(tag_weighted_averages)} = {aggregate_vote:.4f}")
    else:
        aggregate_vote = 0.0
        print("No tags with voting users found")

    return {
        'tag_weighted_averages': tag_weighted_averages,
        'aggregate_vote': aggregate_vote,
        'total_tags': len(tag_weighted_averages),
    }


def unified_delegation_calculator(mode="preview", users_data=None):
    """Run the delegation calculation in one of two modes.

    Mode 1 ("preview"):   single-tag delegation weight preview on built-in
                          sample users.
    Mode 2 ("aggregate"): multi-tag delegation with aggregate vote
                          calculation.

    Both modes use the fixed settings initweight=1.0, hilim=1.0, lolim=0.0.

    Args:
        mode: "preview" or "aggregate".
        users_data: Optional user data for aggregate mode (if None, default
            test data is used).  Ignored in preview mode.

    Returns:
        dict: Results based on mode.

    Raises:
        ValueError: If mode is neither "preview" nor "aggregate".
    """
    # Settings section
    initweight = 1.0
    hilimfac = 1.0
    hilim = hilimfac  # The true high limit is 100% so this is some number below that
    lolim = 0.0

    print(f"\n{'='*60}")
    print(f"UNIFIED DELEGATION CALCULATOR - MODE: {mode.upper()}")
    print(f"{'='*60}")
    print(f"Settings: initweight={initweight}, hilim={hilim}, lolim={lolim}")

    if mode == "preview":
        print("\n--- MODE 1: SINGLE-TAG DELEGATION WEIGHT PREVIEW ---")

        # [userid, vote, clawback, {delegationtouserid: value, ...}]
        # vote and clawback don't matter here, so they are arbitrary values.
        U0 = [27, None, False, {42: 0.4, 56: 0.6}]
        U1 = [42, None, False, {27: 0.2, 56: 0.0}]
        U2 = [56, None, False, {27: 0.2, 42: 0.0}]
        U = [U0, U1, U2]
        print(f"Preview delegation users: {U}")

        # Calculate delegation matrix and weights.
        D, delegation_participants_list, userid_to_index = find_delegation_matrix_for_users_who_delegate(U)
        print(f'Delegation matrix D:\n{D}')
        print(f'Delegation participants: {delegation_participants_list}')
        print(f'User ID to index mapping: {userid_to_index}')

        # Copy first: the next call modifies D in place.
        D_orig = D.copy()

        # Weights/delegations without votes (ignore clawback, vote, abstain).
        Dprevweights, Wprevweights = find_final_weights_and_delegations(D, initweight, hilim, lolim)
        print("Final delegation matrix (Dprevweights):")
        print(Dprevweights)
        print("Final weights (Wprevweights):")
        print(Wprevweights)

        return {
            'mode': 'preview',
            'delegation_matrix_original': D_orig,
            'delegation_matrix_final': Dprevweights,
            'final_weights': Wprevweights,
            'participants': delegation_participants_list,
            'userid_to_index': userid_to_index,
            'settings': {'initweight': initweight, 'hilim': hilim, 'lolim': lolim},
        }

    elif mode == "aggregate":
        print("\n--- MODE 2: MULTI-TAG AGGREGATE VOTE CALCULATION ---")

        # Use provided user data or default test data.
        if users_data is None:
            test_users = [
                [27, 0.2, False, [789, {36: 0.4, 42: 0.6}]],
                [36, 0.8, False, [789, {27: 0.2}]],
                [42, 0.9, False, [789, {27: 0.2}]],
                [59, 'nevervoted', False, [789, {36: 1.0}]],
            ]
        else:
            test_users = users_data

        print(f"Aggregate vote users: {test_users}")

        # Step 1: process multi-tag delegation.
        tag_results, tagid_to_index, index_to_tagid = process_multi_tag_delegation(
            test_users, initweight, hilimfac, lolim
        )

        # Step 2: calculate the aggregate vote.
        aggregate_result = calculate_aggregate_vote(test_users, tag_results, tagid_to_index, initweight)

        return {
            'mode': 'aggregate',
            'tag_results': tag_results,
            'tagid_to_index': tagid_to_index,
            'index_to_tagid': index_to_tagid,
            'aggregate_result': aggregate_result,
            'final_aggregate_vote': aggregate_result['aggregate_vote'],
            'tag_weighted_averages': aggregate_result['tag_weighted_averages'],
            'settings': {'initweight': initweight, 'hilim': hilim, 'lolim': lolim},
        }

    else:
        raise ValueError(f"Invalid mode: {mode}. Must be 'preview' or 'aggregate'")


# Test the unified delegation calculator
print("\n" + "="*80)
print("TESTING UNIFIED DELEGATION CALCULATOR")
print("="*80)

# Test Mode 1: Preview
print("\n" + "="*40 + " MODE 1 TEST " + "="*40)
preview_result = unified_delegation_calculator(mode="preview")
print(f"\nMode 1 Results Summary:")
print(f"  Participants: {preview_result['participants']}")
print(f"  Final weights: {preview_result['final_weights']}")
print(f"  Weight sum: {np.sum(preview_result['final_weights']):.2f}")

# Test Mode 2: Aggregate
print("\n" + "="*40 + " MODE 2 TEST " + "="*40)
aggregate_result = unified_delegation_calculator(mode="aggregate")
print(f"\nMode 2 Results Summary:")
print(f"  Final aggregate vote: {aggregate_result['final_aggregate_vote']:.4f}")
print(f"  Tags processed: {list(aggregate_result['tag_weighted_averages'].keys())}")
for tagid, avg in aggregate_result['tag_weighted_averages'].items():
    print(f"    Tag {tagid}: {avg:.4f}")

print(f"\n{'='*80}")
print("UNIFIED CALCULATOR TESTING COMPLETE")
print(f"{'='*80}")
print("✅ Mode 1 (Preview): Single-tag delegation weights calculated")
print("✅ Mode 2 (Aggregate): Multi-tag aggregate vote calculated")
print("🔧 Ready for database integration!")