GitHub Repository: ethen8181/machine-learning
Path: blob/master/model_selection/prob_calibration/calibration_module/utils.py
import os
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sklearn.metrics as metrics
from typing import Dict, List, Tuple, Optional
from sklearn.utils import check_consistent_length, column_or_1d
from sklearn.calibration import calibration_curve


__all__ = [
    'compute_calibration_error',
    'create_binned_data',
    'get_bin_boundaries',
    'compute_binary_score',
    'compute_calibration_summary',
]


def compute_calibration_error(
    y_true: np.ndarray,
    y_prob: np.ndarray,
    n_bins: int=15,
    round_digits: int=4) -> float:
    """
    Computes the calibration error for binary classification by binning
    data points into the specified number of bins. Samples with similar
    ``y_prob`` will be grouped into the same bin. The bin boundaries are
    determined so that each bin contains a similar number of samples.

    Parameters
    ----------
    y_true : 1d ndarray
        Binary true targets.

    y_prob : 1d ndarray
        Raw probability/score of the positive class.

    n_bins : int, default 15
        A larger number of bins requires more data. In general,
        the more bins we use, the closer the estimate gets to the
        true calibration error.

    round_digits : int, default 4
        Number of decimal places to round the calibration error to.

    Returns
    -------
    calibration_error : float
        RMSE between the average positive label and the average predicted
        probability within each bin.
    """
    y_true = column_or_1d(y_true)
    y_prob = column_or_1d(y_prob)
    check_consistent_length(y_true, y_prob)

    binned_y_true, binned_y_prob = create_binned_data(y_true, y_prob, n_bins)

    # looping shouldn't be a bottleneck, as n_bins is typically a small number.
    bin_errors = 0.0
    for bin_y_true, bin_y_prob in zip(binned_y_true, binned_y_prob):
        avg_y_true = np.mean(bin_y_true)
        avg_y_score = np.mean(bin_y_prob)
        bin_error = (avg_y_score - avg_y_true) ** 2
        bin_errors += bin_error * len(bin_y_true)

    calibration_error = math.sqrt(bin_errors / len(y_true))
    return round(calibration_error, round_digits)
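
# Illustrative usage sketch (not part of the original module), assuming synthetic
# data whose labels are drawn from the predicted probabilities, i.e. perfectly
# calibrated scores, so the error should be close to 0 up to sampling noise:
#
#   rng = np.random.default_rng(0)
#   y_prob = rng.uniform(size=1000)
#   y_true = rng.binomial(1, y_prob)
#   compute_calibration_error(y_true, y_prob, n_bins=10)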


def create_binned_data(
    y_true: np.ndarray,
    y_prob: np.ndarray,
    n_bins: int) -> Tuple[List[np.ndarray], List[np.ndarray]]:
    """
    Bin ``y_true`` and ``y_prob`` by the distribution of ``y_prob``,
    i.e. each bin will contain approximately an equal number of
    data points. Bins are sorted in ascending order of ``y_prob``.

    Parameters
    ----------
    y_true : 1d ndarray
        Binary true targets.

    y_prob : 1d ndarray
        Raw probability/score of the positive class.

    n_bins : int
        A larger number of bins requires more data.

    Returns
    -------
    binned_y_true/binned_y_prob : list of 1d ndarray
        Each element in the list stores the data for that bin.
    """
    sorted_indices = np.argsort(y_prob)
    sorted_y_true = y_true[sorted_indices]
    sorted_y_prob = y_prob[sorted_indices]
    binned_y_true = np.array_split(sorted_y_true, n_bins)
    binned_y_prob = np.array_split(sorted_y_prob, n_bins)
    return binned_y_true, binned_y_prob
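
# Illustrative sketch (not part of the original module): with 6 scores and
# n_bins=3, np.array_split yields 3 bins of 2 sorted scores each, e.g.
#
#   y_prob = np.array([0.9, 0.1, 0.5, 0.3, 0.8, 0.2])
#   y_true = np.array([1, 0, 1, 0, 1, 0])
#   binned_y_true, binned_y_prob = create_binned_data(y_true, y_prob, n_bins=3)
#   # binned_y_prob -> [array([0.1, 0.2]), array([0.3, 0.5]), array([0.8, 0.9])]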


def get_bin_boundaries(binned_y_prob: List[np.ndarray]) -> np.ndarray:
    """
    Given ``binned_y_prob`` from ``create_binned_data``, get the
    boundaries for each bin.

    Parameters
    ----------
    binned_y_prob : list
        Each element in the list stores the data for that bin.

    Returns
    -------
    bins : 1d ndarray
        Boundaries for each bin.
    """
    # each boundary is the midpoint between the largest score of one bin
    # and the smallest score of the next bin; the last boundary is fixed at 1.0
    bins = []
    for i in range(len(binned_y_prob) - 1):
        last_prob = binned_y_prob[i][-1]
        next_first_prob = binned_y_prob[i + 1][0]
        bins.append((last_prob + next_first_prob) / 2.0)

    bins.append(1.0)
    return np.array(bins)
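
# Illustrative sketch (not part of the original module), continuing the
# ``create_binned_data`` example above:
#
#   binned_y_prob = [np.array([0.1, 0.2]), np.array([0.3, 0.5]), np.array([0.8, 0.9])]
#   get_bin_boundaries(binned_y_prob)  # array([0.25, 0.65, 1.  ])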


def compute_binary_score(
    y_true: np.ndarray,
    y_prob: np.ndarray,
    round_digits: int=4) -> Dict[str, float]:
    """
    Compute various evaluation metrics for binary classification,
    including auc, precision, recall, f1, log loss and brier score.
    The precision and recall numbers are reported at the threshold
    that gives the best f1 score.

    Parameters
    ----------
    y_true : 1d ndarray
        Binary true targets.

    y_prob : 1d ndarray
        Raw probability/score of the positive class.

    round_digits : int, default 4
        Number of decimal places to round each evaluation metric to.

    Returns
    -------
    metrics_dict : dict
        Metrics are stored in key value pairs. ::

            {
                'auc': 0.82,
                'precision': 0.56,
                'recall': 0.61,
                'f1': 0.59,
                'log_loss': 0.42,
                'brier': 0.12
            }
    """
    auc = round(metrics.roc_auc_score(y_true, y_prob), round_digits)
    log_loss = round(metrics.log_loss(y_true, y_prob), round_digits)
    brier_score = round(metrics.brier_score_loss(y_true, y_prob), round_digits)

    precision, recall, threshold = metrics.precision_recall_curve(y_true, y_prob)
    f1 = 2 * (precision * recall) / (precision + recall)

    # drop the nan f1 scores that arise when both precision and recall are 0
    mask = ~np.isnan(f1)
    f1 = f1[mask]
    precision = precision[mask]
    recall = recall[mask]

    best_index = np.argmax(f1)
    precision = round(precision[best_index], round_digits)
    recall = round(recall[best_index], round_digits)
    f1 = round(f1[best_index], round_digits)
    return {
        'auc': auc,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'log_loss': log_loss,
        'brier': brier_score
    }
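
# Illustrative usage sketch (not part of the original module), re-using the
# synthetic ``y_true``/``y_prob`` from the first sketch above:
#
#   compute_binary_score(y_true, y_prob)
#   # {'auc': ..., 'precision': ..., 'recall': ..., 'f1': ..., 'log_loss': ..., 'brier': ...}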


def compute_calibration_summary(
    eval_dict: Dict[str, pd.DataFrame],
    label_col: str='label',
    score_col: str='score',
    n_bins: int=15,
    strategy: str='quantile',
    round_digits: int=4,
    show: bool=True,
    save_plot_path: Optional[str]=None) -> pd.DataFrame:
    """
    Plots the calibration curve and computes the summary statistics for the model.

    Parameters
    ----------
    eval_dict : dict
        We can evaluate multiple calibration models' performance in one go. The key
        is the model name used to distinguish the different calibration models, the
        value is the dataframe that stores the binary true targets and the predicted
        score for the positive class.

    label_col : str
        Column name for the dataframe in ``eval_dict`` that stores the binary true targets.

    score_col : str
        Column name for the dataframe in ``eval_dict`` that stores the predicted score.

    n_bins : int, default 15
        Number of bins to discretize the calibration curve plot and calibration error statistics.
        A bigger number requires more data, but the estimate will be closer to the
        true calibration error.

    strategy : {'uniform', 'quantile'}, default 'quantile'
        Strategy used to define the boundary of the bins.

        - uniform: The bins have identical widths.
        - quantile: The bins have the same number of samples and depend on the predicted score.

    round_digits : int, default 4
        Number of decimal places to round each evaluation metric to.

    show : bool, default True
        Whether to display the plots in the console or Jupyter notebook.

    save_plot_path : str, default None
        Path where we'll store the calibration plot. None means the plot will not be saved.

    Returns
    -------
    df_metrics : pd.DataFrame
        One row of metrics for each input dataframe.
    """

    fig, (ax1, ax2) = plt.subplots(2)

    # estimator_metrics stores list of dict, e.g.
    # [{'auc': 0.776, 'name': 'xgb'}]
    estimator_metrics = []
    for name, df_eval in eval_dict.items():
        prob_true, prob_pred = calibration_curve(
            df_eval[label_col],
            df_eval[score_col],
            n_bins=n_bins,
            strategy=strategy)

        calibration_error = compute_calibration_error(
            df_eval[label_col], df_eval[score_col], n_bins, round_digits)
        metrics_dict = compute_binary_score(df_eval[label_col], df_eval[score_col], round_digits)
        metrics_dict['calibration_error'] = calibration_error
        metrics_dict['name'] = name
        estimator_metrics.append(metrics_dict)

        ax1.plot(prob_pred, prob_true, 's-', label=name)
        ax2.hist(df_eval[score_col], range=(0, 1), bins=n_bins, label=name, histtype='step', lw=2)

    ax1.plot([0, 1], [0, 1], 'k:', label='perfect')

    ax1.set_xlabel('Fraction of positives (Predicted)')
    ax1.set_ylabel('Fraction of positives (Actual)')
    ax1.set_ylim([-0.05, 1.05])
    ax1.legend(loc='upper left', ncol=2)
    ax1.set_title('Calibration Plots (Reliability Curve)')

    ax2.set_xlabel('Predicted scores')
    ax2.set_ylabel('Count')
    ax2.set_title('Histogram of Predicted Scores')
    ax2.legend(loc='upper right', ncol=2)

    plt.tight_layout()
    if show:
        plt.show()

    if save_plot_path is not None:
        save_dir = os.path.dirname(save_plot_path)
        if save_dir:
            os.makedirs(save_dir, exist_ok=True)

        fig.savefig(save_plot_path, dpi=300, bbox_inches='tight')

    plt.close(fig)

    df_metrics = pd.DataFrame(estimator_metrics)
    return df_metrics
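
# Illustrative usage sketch (not part of the original module); 'xgb' and the
# dataframe below are hypothetical, with column names matching the defaults:
#
#   df_eval = pd.DataFrame({'label': y_true, 'score': y_prob})
#   df_metrics = compute_calibration_summary({'xgb': df_eval}, show=False)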