src.pm_rank¶
pm_rank: A toolkit for scoring and ranking prediction market forecasters.
Submodules¶
Classes¶
ForecastEvent | Individual forecast from a user for a specific problem.
ForecastProblem | A prediction problem with multiple options and forecasts.
ForecastChallenge | A collection of forecast problems with validation and computed properties.
ChallengeLoader | Abstract base class for loading forecast challenges from different data sources.
GJOChallengeLoader | Load forecast challenges from GJO (Good Judgment Open) data format.
ProphetArenaChallengeLoader | Load forecast challenges from Prophet Arena data format.
GeneralizedBT | Generalized Bradley-Terry model for ranking forecasters in prediction markets.
BrierScoringRule | Brier scoring rule for evaluating probabilistic forecasts.
LogScoringRule | Logarithmic scoring rule for evaluating probabilistic forecasts.
SphericalScoringRule | Spherical scoring rule for evaluating probabilistic forecasts.
AverageReturn | Average Return Model for ranking forecasters based on their expected market returns.
Functions¶
spearman_correlation | Compute the Spearman correlation between two rankings.
kendall_correlation | Compute the Kendall correlation between two rankings.
Package Contents¶
- class src.pm_rank.ForecastEvent¶
Bases:
pydantic.BaseModel
Individual forecast from a user for a specific problem.
- problem_id: str¶
- username: str¶
- timestamp: datetime.datetime¶
- probs: List[float]¶
- unnormalized_probs: List[float] | None¶
- validate_probabilities(v)¶
Validate that probabilities sum to 1 and are non-negative.
- set_unnormalized_probs_default()¶
Set unnormalized_probs to probs if not provided.
- validate_unnormalized_probabilities(v)¶
Validate that unnormalized probabilities are non-negative. The only requirements are that every value lies in [0, 1] and that the vector length equals the number of options.
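The validation rules above can be sketched without pydantic. The following is a minimal stand-in, not the library's implementation: the class name `SimpleForecast`, the tolerance `1e-6`, and the choice to compare the length of `unnormalized_probs` against `probs` are assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class SimpleForecast:
    """Hypothetical stand-in for ForecastEvent (plain dataclass, no pydantic)."""

    problem_id: str
    username: str
    timestamp: datetime
    probs: List[float]
    unnormalized_probs: Optional[List[float]] = None

    def __post_init__(self) -> None:
        # probs must be non-negative and sum to 1 (the tolerance is an assumption).
        if any(p < 0 for p in self.probs):
            raise ValueError("probs must be non-negative")
        if abs(sum(self.probs) - 1.0) > 1e-6:
            raise ValueError("probs must sum to 1")
        # Default unnormalized_probs to a copy of probs when not provided.
        if self.unnormalized_probs is None:
            self.unnormalized_probs = list(self.probs)
        # Unnormalized values only need to lie in [0, 1] and match the vector length.
        if len(self.unnormalized_probs) != len(self.probs):
            raise ValueError("unnormalized_probs has the wrong length")
        if any(not 0.0 <= p <= 1.0 for p in self.unnormalized_probs):
            raise ValueError("unnormalized_probs must lie in [0, 1]")
```

Constructing `SimpleForecast("q1", "alice", datetime(2024, 1, 1), [0.25, 0.75])` succeeds and fills `unnormalized_probs` from `probs`; a vector such as `[0.5, 0.6]` raises `ValueError`.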
- class src.pm_rank.ForecastProblem¶
Bases:
pydantic.BaseModel
A prediction problem with multiple options and forecasts.
- title: str¶
- problem_id: str¶
- options: List[str]¶
- correct_option_idx: List[int]¶
- forecasts: List[ForecastEvent]¶
- end_time: datetime.datetime¶
- num_forecasters: int¶
- url: str | None¶
- odds: List[float] | None¶
- category: str | None¶
- validate_correct_option_idx(v, info)¶
Validate that every index in correct_option_idx refers to an entry in the options list.
- validate_forecasts(v, info)¶
Validate that all forecasts have correct number of probabilities.
- validate_odds(v, info)¶
Validate that odds match the number of options if provided.
- smooth_odds()¶
Smooth the odds to not be too close to 0 or 1.
- property has_odds: bool¶
Check if the problem has odds data.
- property crowd_probs: List[float]¶
Calculate crowd probabilities from the forecasts.
- property unique_forecasters: List[str]¶
Get list of unique forecasters for this problem.
- property option_payoffs: List[Tuple[int, float]]¶
Obtain a sorted list of (option_idx, payoff) for this problem. The payoff is 1 / odds for a correct option and 0 for an incorrect option.
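As a rough illustration of the payoff rule described for option_payoffs, the sketch below computes the pairs from plain lists. The function name `payoffs_sketch` is hypothetical, and returning the pairs in option-index order is an assumption, since the source does not state the sort key.

```python
from typing import List, Tuple


def payoffs_sketch(odds: List[float], correct_option_idx: List[int]) -> List[Tuple[int, float]]:
    """Return (option_idx, payoff) pairs: 1 / odds for a correct option, else 0."""
    correct = set(correct_option_idx)
    # enumerate() already yields the options in index order (assumed sort key).
    return [
        (idx, 1.0 / price if idx in correct else 0.0)
        for idx, price in enumerate(odds)
    ]
```

With odds `[0.2, 0.5, 0.3]` and option 1 correct, only option 1 has a non-zero payoff (1 / 0.5).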
- class src.pm_rank.ForecastChallenge¶
Bases:
pydantic.BaseModel
A collection of forecast problems with validation and computed properties.
- title: str¶
- forecast_problems: List[ForecastProblem]¶
- categories: List[str] | None¶
- validate_problems(v)¶
Validate that there are problems and they have unique IDs.
- validate_categories(v, info)¶
Validate that categories are a list of strings.
- property forecaster_map: Dict[str, List[ForecastEvent]]¶
Map from forecaster username to their forecasts across all problems.
- property num_forecasters: int¶
Total number of unique forecasters across all problems.
- property unique_forecasters: List[str]¶
List of unique forecaster usernames.
- property problem_option_payoffs: Dict[str, List[Tuple[int, float]]]¶
Map from problem_id to a sorted list of (option_idx, payoff) for this problem.
- get_forecaster_problems(username: str) List[ForecastProblem] ¶
Get all problems that a specific forecaster participated in.
- get_problem_by_id(problem_id: str) ForecastProblem | None ¶
Get a specific problem by its ID.
- get_problems(nums: int = -1) List[ForecastProblem] ¶
Get a list of problems. If nums is -1, return all problems.
- stream_problems(order: Literal['sequential', 'random', 'time'] = 'sequential', increment: int = 100) Iterator[List[ForecastProblem]] ¶
Stream the problems in the challenge in batches: in sequential order, in random order, or ordered by problem end time.
- Args:
order: The order in which to stream the problems.
increment: The number of problems to stream in each iteration.
- Returns:
An iterator of lists of problems.
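A minimal sketch of batch streaming along these lines, under stated assumptions: problems are represented as hypothetical `(problem_id, end_time)` tuples, each batch is a disjoint chunk of `increment` problems (the source does not say whether batches are disjoint or cumulative), and the `seed` argument is added here only to make the random order reproducible.

```python
import random
from typing import Iterator, List, Sequence, Tuple

# Hypothetical stand-in for a problem: (problem_id, end_time as a sortable number).
Problem = Tuple[str, int]


def stream_in_batches(problems: Sequence[Problem], order: str = "sequential",
                      increment: int = 100, seed: int = 0) -> Iterator[List[Problem]]:
    """Yield the problems in chunks of at most `increment` items."""
    items = list(problems)
    if order == "random":
        random.Random(seed).shuffle(items)
    elif order == "time":
        items.sort(key=lambda p: p[1])
    elif order != "sequential":
        raise ValueError(f"unknown order: {order}")
    for start in range(0, len(items), increment):
        yield items[start:start + increment]
```

Because the function is a generator, nothing is sorted or shuffled until the first batch is requested.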
- stream_problems_over_time(increment_by: Literal['day', 'week', 'month'] = 'day', min_bucket_size: int = 1) Iterator[Tuple[str, List[ForecastProblem]]] ¶
Stream all problems in chronological buckets.
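Chronological bucketing of this kind might look like the sketch below. It is not the library's code: problems are hypothetical `(problem_id, end_time)` tuples, the bucket label formats are invented for illustration, and the handling of `min_bucket_size` (a bucket that is too small is carried into the next one) is an assumption.

```python
from datetime import datetime
from typing import Dict, Iterator, List, Optional, Tuple

Problem = Tuple[str, datetime]  # hypothetical (problem_id, end_time) pair


def bucket_key(ts: datetime, increment_by: str) -> str:
    """Label the day, ISO week, or month that a timestamp falls into."""
    if increment_by == "day":
        return ts.strftime("%Y-%m-%d")
    if increment_by == "week":
        year, week, _ = ts.isocalendar()
        return f"{year}-W{week:02d}"
    if increment_by == "month":
        return ts.strftime("%Y-%m")
    raise ValueError(f"unknown increment: {increment_by}")


def stream_over_time(problems: List[Problem], increment_by: str = "day",
                     min_bucket_size: int = 1) -> Iterator[Tuple[str, List[Problem]]]:
    buckets: Dict[str, List[Problem]] = {}
    for problem in sorted(problems, key=lambda p: p[1]):
        buckets.setdefault(bucket_key(problem[1], increment_by), []).append(problem)
    carried: List[Problem] = []
    last_key: Optional[str] = None
    # Dicts preserve insertion order, so buckets are visited chronologically.
    for key, bucket in buckets.items():
        carried.extend(bucket)
        last_key = key
        if len(carried) >= min_bucket_size:
            yield key, carried
            carried = []
    if carried and last_key is not None:
        yield last_key, carried  # leftover problems that never reached the minimum
```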
- fill_problem_with_fair_odds(force: bool = False) None ¶
Some challenges do not have odds data; this method fills in fair (uniform) odds for each problem. If force is True, the odds are filled in without checking whether the problem already has odds data.
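The idea of fair odds can be shown with a small helper. This is only a sketch: the function name `fill_fair_odds` is hypothetical, and keeping existing odds when `force` is False is an assumption about the intended behavior.

```python
from typing import List, Optional


def fill_fair_odds(odds: Optional[List[float]], num_options: int,
                   force: bool = False) -> List[float]:
    """Return uniform odds of 1 / num_options unless usable odds already exist."""
    if odds is not None and not force:
        return odds  # assumed behavior: existing odds are left untouched
    return [1.0 / num_options] * num_options
```

For a four-option problem without odds this yields `[0.25, 0.25, 0.25, 0.25]`.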
- class src.pm_rank.ChallengeLoader¶
Bases:
abc.ABC
Abstract base class for loading forecast challenges from different data sources. This separates the loading logic from the data model.
- abstract load_challenge() ForecastChallenge ¶
Load and return a ForecastChallenge from the data source.
- abstract get_challenge_metadata() Dict[str, Any] ¶
Get metadata about the challenge without loading all data.
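To illustrate how a loader separates loading logic from the data model, here is a self-contained sketch that mirrors the two abstract methods. `LoaderSketch` and `InMemoryLoader` are hypothetical names, plain dictionaries stand in for ForecastChallenge, and the row keys `problem_id` and `username` are assumptions.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class LoaderSketch(ABC):
    """Hypothetical stand-in mirroring the two abstract methods of ChallengeLoader."""

    @abstractmethod
    def load_challenge(self) -> Dict[str, Any]:
        """Load and return the full challenge."""

    @abstractmethod
    def get_challenge_metadata(self) -> Dict[str, Any]:
        """Return summary information only."""


class InMemoryLoader(LoaderSketch):
    """Hypothetical loader over rows that carry 'problem_id' and 'username' keys."""

    def __init__(self, title: str, rows: List[Dict[str, Any]]) -> None:
        self.title = title
        self.rows = rows

    def load_challenge(self) -> Dict[str, Any]:
        # Group forecast rows by problem; a real loader would build model objects here.
        problems: Dict[str, List[Dict[str, Any]]] = {}
        for row in self.rows:
            problems.setdefault(row["problem_id"], []).append(row)
        return {"title": self.title, "forecast_problems": problems}

    def get_challenge_metadata(self) -> Dict[str, Any]:
        return {
            "title": self.title,
            "num_problems": len({row["problem_id"] for row in self.rows}),
            "num_forecasters": len({row["username"] for row in self.rows}),
        }
```

Instantiating `LoaderSketch` directly raises `TypeError`, which is how the abstract base class forces each data source to supply both methods.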
- class src.pm_rank.GJOChallengeLoader(predictions_df: pandas.DataFrame | None = None, predictions_file: str | None = None, metadata_file: str | None = None, challenge_title: str = '')¶
Bases:
src.pm_rank.data.base.ChallengeLoader
Load forecast challenges from GJO (Good Judgment Open) data format.
Initialize the GJOChallengeLoader. The challenge can be loaded either from a given pd.DataFrame or from the files at predictions_file and metadata_file.
- Args:
predictions_df (pd.DataFrame): A pd.DataFrame containing the predictions. If provided, predictions_file and metadata_file will be ignored.
predictions_file (str): The path to the predictions file.
metadata_file (str): The path to the metadata file.
challenge_title (str): The title of the challenge.
- challenge_title = ''¶
- logger¶
- load_challenge(forecaster_filter: int = 0, problem_filter: int = 0) src.pm_rank.data.base.ForecastChallenge ¶
Load challenge data from GJO format files.
- Args:
forecaster_filter: Minimum number of events for a forecaster to be included.
problem_filter: Minimum number of events for a problem to be included.
- Returns:
ForecastChallenge: a ForecastChallenge object containing the forecast problems and events
- get_challenge_metadata() Dict[str, Any] ¶
Get basic metadata about the GJO challenge.
- class src.pm_rank.ProphetArenaChallengeLoader(predictions_df: pandas.DataFrame | None = None, predictions_file: str | None = None, challenge_title: str = '', use_bid_for_odds: bool = False, use_open_time: bool = False)¶
Bases:
src.pm_rank.data.base.ChallengeLoader
Load forecast challenges from Prophet Arena data format.
Initialize the ProphetArenaChallengeLoader.
The challenge can be loaded either from a given pd.DataFrame or from a path to a predictions file.
- Parameters:
predictions_df – A pd.DataFrame containing the predictions. If provided, predictions_file will be ignored.
predictions_file – The path to the predictions file.
challenge_title – The title of the challenge.
use_bid_for_odds – Whether to use the yes_bid field for the implied probability calculation. If True, the implied probability is calculated as (yes_bid + no_bid) / 2. If False, the implied probability is simply yes_ask (normalized to sum to 1).
use_open_time – Whether to use the open_time field for the end_time of the problem. If True, the end_time will be the open_time of the problem. If False, the end_time will be the close_time of the problem.
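The default branch (use_bid_for_odds=False) normalizes the yes_ask prices so that they sum to 1. A minimal sketch of that normalization follows; the function name `implied_probs_from_asks` is hypothetical and the input is assumed to be one ask price per option.

```python
from typing import List


def implied_probs_from_asks(yes_asks: List[float]) -> List[float]:
    """Normalize per-option yes_ask prices into probabilities that sum to 1."""
    total = sum(yes_asks)
    if total <= 0:
        raise ValueError("ask prices must have a positive sum")
    return [ask / total for ask in yes_asks]
```

For example, ask prices of 60, 30, and 30 become 0.5, 0.25, and 0.25.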
- challenge_title = ''¶
- use_bid_for_odds = False¶
- use_open_time = False¶
- logger¶
- load_challenge(add_market_baseline: bool = False) src.pm_rank.data.base.ForecastChallenge ¶
Load challenge data from Prophet Arena data format. Group by submission_id, then for each group, build the list of forecasts, then the ForecastProblem.
- Parameters:
add_market_baseline – Whether to add the market baseline as a forecaster
- Returns:
A ForecastChallenge object containing the forecast problems and events.
- get_challenge_metadata() Dict[str, Any] ¶
Get basic metadata about the Prophet Arena challenge using pandas groupby (no full parsing).
- class src.pm_rank.GeneralizedBT(method: Literal['MM', 'Elo'] = 'MM', num_iter: int = 100, threshold: float = 0.001, verbose: bool = False)¶
Bases:
object
Generalized Bradley-Terry model for ranking forecasters in prediction markets.
This class implements a generalization of the traditional Bradley-Terry model to handle prediction market scenarios. Each event outcome is treated as a contest between two “pseudo-teams”: a winning team (the realized outcome) and a losing team (all other outcomes). Each forecaster contributes fractions of their capability proportional to their predicted probabilities.
The model estimates skill parameters for each forecaster using an iterative Majorization-Minimization (MM) algorithm, which provides convergence guarantees and intuitive comparative scores similar to Elo ratings.
- Parameters:
method – Optimization method to use: “MM” (Majorization-Minimization, the default) or “Elo”.
num_iter – Maximum number of iterations for the MM algorithm (default: 100).
threshold – Convergence threshold for parameter updates (default: 1e-3).
verbose – Whether to enable verbose logging (default: False).
Initialize the generalized Bradley-Terry model.
- method = 'MM'¶
- num_iter = 100¶
- threshold = 0.001¶
- verbose = False¶
- logger¶
- fit(problems: List[pm_rank.data.base.ForecastProblem], include_scores: bool = True) Tuple[Dict[str, Any], Dict[str, int]] | Dict[str, int] ¶
Fit the generalized Bradley-Terry model to the given problems.
This method estimates skill parameters for each forecaster using the MM algorithm and returns rankings based on these parameters. The skill parameters represent the relative predictive ability of each forecaster.
- Parameters:
problems – List of ForecastProblem instances to evaluate.
include_scores – Whether to include scores in the results (default: True).
- Returns:
Ranking results, either as a tuple of (scores, rankings) or just rankings.
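The pseudo-team idea can be made concrete with a small MM iteration. This is a sketch derived under stated assumptions, not the library's algorithm: the winning team's strength is taken as the sum of skill times the probability placed on the realized outcome, the two teams together have strength equal to the sum of skills, and the update rule, the `eps` floor, and the rescaling to mean 1 are choices made here for illustration. The function name `fit_generalized_bt` and the event representation are hypothetical.

```python
from typing import Dict, List, Tuple

# One event: the realized option index plus each forecaster's probability vector.
Event = Tuple[int, Dict[str, List[float]]]


def fit_generalized_bt(events: List[Event], num_iter: int = 100,
                       threshold: float = 1e-3, eps: float = 1e-9) -> Dict[str, float]:
    names = sorted({name for _, forecasts in events for name in forecasts})
    theta = {name: 1.0 for name in names}
    for _ in range(num_iter):
        numer = {name: 0.0 for name in names}
        denom = {name: 0.0 for name in names}
        for winner, forecasts in events:
            # Each forecaster contributes skill * p(realized outcome) to the winning team.
            shares = {n: theta[n] * max(p[winner], eps) for n, p in forecasts.items()}
            win_strength = sum(shares.values())
            # Winning plus losing team: probabilities sum to 1, so this is the skill total.
            total_strength = sum(theta[n] for n in forecasts)
            for n in forecasts:
                numer[n] += shares[n] / win_strength
                denom[n] += 1.0 / total_strength
        new_theta = {n: numer[n] / denom[n] for n in names}
        # Skills are identified only up to scale, so rescale them to average 1.
        scale = len(names) / sum(new_theta.values())
        new_theta = {n: value * scale for n, value in new_theta.items()}
        delta = max(abs(new_theta[n] - theta[n]) for n in names)
        theta = new_theta
        if delta < threshold:
            break
    return theta
```

Sorting forecasters by the fitted skill in descending order gives a ranking; a forecaster who consistently places more probability on realized outcomes ends up with a larger skill value.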
- class src.pm_rank.BrierScoringRule(negate: bool = True, verbose: bool = False)¶
Bases:
ScoringRule
Brier scoring rule for evaluating probabilistic forecasts.
The Brier score is a quadratic proper scoring rule that measures the squared difference between predicted probabilities and actual outcomes. It is widely used in prediction markets and provides a good balance between rewarding accuracy and calibration.
- Parameters:
negate – Whether to negate the scores so that higher values are better (default: True).
verbose – Whether to enable verbose logging (default: False).
Initialize the Brier scoring rule.
- negate = True¶
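For reference, a multi-option Brier score can be computed as below. This is a sketch of the standard quadratic rule rather than the library's code: the function name `brier_score` is hypothetical, and summing over options without dividing by their number is an assumption.

```python
from typing import List


def brier_score(probs: List[float], correct_option_idx: List[int],
                negate: bool = True) -> float:
    """Sum of squared differences between probabilities and the 0/1 outcome vector."""
    correct = set(correct_option_idx)
    loss = sum(
        (p - (1.0 if idx in correct else 0.0)) ** 2
        for idx, p in enumerate(probs)
    )
    # With negate=True, higher values are better, matching the parameter description.
    return -loss if negate else loss
```

A forecast of `[0.5, 0.5]` with option 0 correct has loss 0.25 + 0.25 = 0.5, so the negated score is -0.5, while a perfect forecast scores 0.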
- class src.pm_rank.LogScoringRule(clip_prob: float = 0.01, verbose: bool = False)¶
Bases:
ScoringRule
Logarithmic scoring rule for evaluating probabilistic forecasts.
The logarithmic scoring rule is a proper scoring rule that rewards forecasters based on the logarithm of their predicted probability for the actual outcome. This rule heavily penalizes overconfident predictions and rewards well-calibrated forecasts.
- Parameters:
clip_prob – Minimum probability value to prevent log(0) (default: 0.01).
verbose – Whether to enable verbose logging (default: False).
Initialize the logarithmic scoring rule.
- clip_prob = 0.01¶
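The effect of clip_prob is easiest to see in code. The sketch below applies the standard logarithmic rule with a lower clip; the function name `log_score` is hypothetical, and clipping only from below is an assumption.

```python
import math
from typing import List


def log_score(probs: List[float], correct_idx: int, clip_prob: float = 0.01) -> float:
    """Log of the probability assigned to the realized option, clipped from below."""
    clipped = max(probs[correct_idx], clip_prob)  # avoids log(0)
    return math.log(clipped)
```

A forecaster who assigns probability 0 to the realized option receives log(0.01) instead of negative infinity, so a single confident miss is costly but bounded.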
- class src.pm_rank.SphericalScoringRule(verbose: bool = False)¶
Bases:
ScoringRule
Spherical scoring rule for evaluating probabilistic forecasts.
The spherical scoring rule normalizes probability vectors to unit vectors and measures the cosine similarity with the actual outcome. This rule is less sensitive to extreme probability values compared to the logarithmic rule.
- Parameters:
verbose – Whether to enable verbose logging (default: False).
Initialize the spherical scoring rule.
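A sketch of the spherical rule as described: the probability on the realized option divided by the Euclidean norm of the probability vector, which equals the cosine similarity with the one-hot outcome vector. The function name `spherical_score` is hypothetical, and a single realized option is assumed.

```python
import math
from typing import List


def spherical_score(probs: List[float], correct_idx: int) -> float:
    """Cosine similarity between the probability vector and the one-hot outcome."""
    norm = math.sqrt(sum(p * p for p in probs))
    if norm == 0.0:
        raise ValueError("probability vector must not be all zeros")
    return probs[correct_idx] / norm
```

The score is bounded between 0 and 1, which is why a confident miss costs far less here than under the logarithmic rule.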
- class src.pm_rank.AverageReturn(num_money_per_round: int = 1, risk_aversion: float = 0.0, use_approximate: bool = False, verbose: bool = False)¶
Average Return Model for ranking forecasters based on their expected market returns.
This class implements a ranking algorithm that evaluates forecasters based on how much money they could earn from prediction markets using different risk aversion strategies. The model calculates expected returns for each forecaster and ranks them accordingly.
Initialize the AverageReturn model.
- Parameters:
num_money_per_round – Amount of money to bet per round (default: 1).
risk_aversion – Risk aversion parameter between 0 and 1 (default: 0.0).
use_approximate – Whether to use the approximate CRRA betting strategy (default: False).
verbose – Whether to enable verbose logging (default: False).
- Raises:
AssertionError – If risk_aversion is not between 0 and 1.
- num_money_per_round = 1¶
- risk_aversion = 0.0¶
- use_approximate = False¶
- verbose = False¶
- logger¶
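To give a feel for how risk aversion changes returns, the sketch below computes the realized return at the two ends of the range under CRRA utility: risk_aversion 0 (risk neutral) and risk_aversion 1 (log utility). It is an illustration under assumptions, not the library's betting strategy: odds are treated as prices that sum to 1, one unit of money buys 1 / odds contracts, and both function names are hypothetical.

```python
from typing import List


def risk_neutral_return(probs: List[float], odds: List[float],
                        correct_idx: int, money: float = 1.0) -> float:
    """Stake everything on the option with the highest believed value per unit price."""
    best = max(range(len(probs)), key=lambda i: probs[i] / odds[i])
    return money / odds[best] if best == correct_idx else 0.0


def log_utility_return(probs: List[float], odds: List[float],
                       correct_idx: int, money: float = 1.0) -> float:
    """Split the stake in proportion to the forecaster's beliefs (Kelly-style)."""
    stake_on_correct = money * probs[correct_idx]
    return stake_on_correct / odds[correct_idx]
```

With beliefs `[0.7, 0.3]` against even prices `[0.5, 0.5]`, the risk-neutral bettor earns 2.0 or nothing, while the log-utility bettor earns 1.4 when option 0 is realized and 0.6 otherwise.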
- fit(problems: List[pm_rank.data.base.ForecastProblem], include_scores: bool = True, include_per_problem_info: bool = False) Tuple[Dict[str, Any], Dict[str, int]] | Dict[str, int] ¶
Fit the average return model to the given problems.
This method processes all problems at once and returns the final rankings based on average returns across all problems.
- Parameters:
problems – List of ForecastProblem instances to process.
include_scores – Whether to include scores in the results (default: True).
include_per_problem_info – Whether to include per-problem info in the results (default: False).
- Returns:
Ranking results, either as a tuple of (scores, rankings) or just rankings. If include_per_problem_info is True, returns a tuple of (scores, rankings, per_problem_info).
- fit_stream(problem_iter: Iterator[List[pm_rank.data.base.ForecastProblem]], include_scores: bool = True) Dict[int, Tuple[Dict[str, Any], Dict[str, int]] | Dict[str, int]] ¶
Fit the model to streaming problems and return incremental results.
This method processes problems as they arrive and returns rankings after each batch, allowing for incremental analysis of forecaster performance.
- Parameters:
problem_iter – Iterator over batches of ForecastProblem instances.
include_scores – Whether to include scores in the results (default: True).
- Returns:
Mapping of batch indices to ranking results.
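The shape of the incremental result can be sketched with per-problem scores in place of real problems. Everything here is an assumption for illustration: each batch is a list of hypothetical `(username, score)` pairs, results are cumulative across batches, batch indices start at 0, and ties are broken by username.

```python
from typing import Dict, Iterable, List, Tuple

ScoreRow = Tuple[str, float]  # hypothetical (username, score on one problem) pair


def rank_from_scores(scores: Dict[str, float]) -> Dict[str, int]:
    """Rank 1 goes to the highest score; ties are broken alphabetically."""
    ordered = sorted(scores, key=lambda name: (-scores[name], name))
    return {name: rank for rank, name in enumerate(ordered, start=1)}


def fit_stream_sketch(
    batches: Iterable[List[ScoreRow]],
) -> Dict[int, Tuple[Dict[str, float], Dict[str, int]]]:
    totals: Dict[str, float] = {}
    counts: Dict[str, int] = {}
    results: Dict[int, Tuple[Dict[str, float], Dict[str, int]]] = {}
    for batch_idx, batch in enumerate(batches):
        for name, score in batch:
            totals[name] = totals.get(name, 0.0) + score
            counts[name] = counts.get(name, 0) + 1
        # Average over everything seen so far, then re-rank after each batch.
        averages = {name: totals[name] / counts[name] for name in totals}
        results[batch_idx] = (averages, rank_from_scores(averages))
    return results
```

Comparing the rankings stored under consecutive batch indices shows how a forecaster's position moves as more problems resolve.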
- fit_stream_with_timestamp(problem_time_iter: Iterator[Tuple[str, List[pm_rank.data.base.ForecastProblem]]], include_scores: bool = True) collections.OrderedDict ¶
Fit the model to streaming problems with timestamps and return incremental results.
This method processes problems with associated timestamps and returns rankings after each batch, maintaining chronological order.
- Parameters:
problem_time_iter – Iterator over (timestamp, problems) tuples.
include_scores – Whether to include scores in the results (default: True).
- Returns:
Chronologically ordered mapping of timestamps to ranking results.
- fit_by_category(problems: List[pm_rank.data.base.ForecastProblem], include_scores: bool = True, stream_with_timestamp: bool = False, stream_increment_by: Literal['day', 'week', 'month'] = 'day', min_bucket_size: int = 1) Tuple[Dict[str, Any], Dict[str, int]] | Dict[str, int] ¶
Fit the average return model to the given problems by category.
This method processes all problems at once and returns the final rankings based on average returns across all problems.
- Parameters:
problems – List of ForecastProblem instances to process.
include_scores – Whether to include scores in the results (default: True).
stream_with_timestamp – Whether to stream problems with timestamps (default: False).
stream_increment_by – The increment by which to stream problems (default: “day”).
min_bucket_size – The minimum number of problems to include in a bucket (default: 1).
- src.pm_rank.spearman_correlation(rank_dict_a: Dict[str, int], rank_dict_b: Dict[str, int]) float ¶
Compute the Spearman correlation between two rankings. Reference: https://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient
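For two rank dictionaries over the same forecasters, the Spearman coefficient can be computed with the classic no-ties formula. This is a pure-Python sketch, not necessarily how the library computes it; the function name `spearman_from_ranks` is hypothetical, and both inputs are assumed to hold distinct ranks 1..n over identical keys.

```python
from typing import Dict


def spearman_from_ranks(rank_a: Dict[str, int], rank_b: Dict[str, int]) -> float:
    """Spearman's rho via 1 - 6 * sum(d^2) / (n * (n^2 - 1)), valid without ties."""
    if set(rank_a) != set(rank_b):
        raise ValueError("both rankings must cover the same forecasters")
    n = len(rank_a)
    if n < 2:
        raise ValueError("need at least two forecasters")
    d_squared = sum((rank_a[name] - rank_b[name]) ** 2 for name in rank_a)
    return 1.0 - 6.0 * d_squared / (n * (n * n - 1))
```

Identical rankings give 1.0 and exactly reversed rankings give -1.0.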
- src.pm_rank.kendall_correlation(rank_dict_a: Dict[str, int], rank_dict_b: Dict[str, int]) float ¶
Compute the Kendall correlation between two rankings. Reference: https://en.wikipedia.org/wiki/Kendall_rank_correlation_coefficient
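Kendall's coefficient counts concordant and discordant pairs. The sketch below computes the tau-a variant in pure Python; the function name `kendall_from_ranks` is hypothetical, identical key sets are assumed, and the library may use a different variant (for example one that adjusts for ties).

```python
from itertools import combinations
from typing import Dict


def kendall_from_ranks(rank_a: Dict[str, int], rank_b: Dict[str, int]) -> float:
    """Kendall tau-a: (concordant - discordant) / number of pairs."""
    if set(rank_a) != set(rank_b):
        raise ValueError("both rankings must cover the same forecasters")
    names = sorted(rank_a)
    if len(names) < 2:
        raise ValueError("need at least two forecasters")
    concordant = discordant = 0
    for x, y in combinations(names, 2):
        # The pair is concordant when both rankings order x and y the same way.
        sign = (rank_a[x] - rank_a[y]) * (rank_b[x] - rank_b[y])
        if sign > 0:
            concordant += 1
        elif sign < 0:
            discordant += 1
    num_pairs = len(names) * (len(names) - 1) / 2
    return (concordant - discordant) / num_pairs
```

Swapping two adjacent forecasters in a three-way ranking leaves two concordant pairs and one discordant pair, giving a coefficient of 1/3.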