This implements the probabilistic F-score (pF-beta) described here:
Instead of using a Python for-loop, we can use the following vectorized methods to speed up the metric calculation. TensorFlow, Torch and NumPy each take roughly 78 ms, compared with the native loop, which takes about 19 s, i.e. the loop is nearly 140x slower for 5M samples. For our competition there will not be that many samples, but we can still save some time. $$ pF_1 = 2\frac{pPrecision \cdot pRecall}{pPrecision + pRecall} $$
Where, $$ pPrecision=\frac{pTP}{pTP+pFP} $$ $$ pRecall=\frac{pTP}{TP+FN} $$
Dummy Labels and Predictions
import numpy as np
# Synthetic benchmark data: 5,000,000 random binary labels (0 or 1) and
# 5,000,000 uniform scores in [0, 1), both stored as float32. No seed is set,
# so the exact values (and hence the scores below) differ from run to run.
labels = np.random.randint(0, 2, size=(5000000)).astype(np.float32)
preds = np.random.uniform(0, 1, size=(5000000)).astype(np.float32)
Naive Python (using a for loop)
def pfbeta(labels, predictions, beta=1):
    """Compute the probabilistic F-beta score with a plain Python loop.

    Each prediction is clipped to [0, 1] and treated as a "soft" count: for
    a positive label it adds to the probabilistic true positives, otherwise
    to the probabilistic false positives.

    Args:
        labels: Indexable sequence of ground-truth labels; any truthy value
            counts as positive.
        predictions: Indexable sequence of scores, one per label.
        beta: Weight of recall relative to precision (1 gives pF1).

    Returns:
        The pF-beta score as a float, or 0.0 when it is undefined (no
        positive labels, or no probability mass predicted at all) or when
        precision/recall are not strictly positive.
    """
    y_true_count = 0
    ctp = 0
    cfp = 0
    for idx, label in enumerate(labels):
        prediction = min(max(predictions[idx], 0), 1)
        if label:
            y_true_count += 1
            ctp += prediction
        else:
            # Only negatives contribute to the false-positive mass.
            cfp += prediction

    # Both ratios below would divide by zero in these cases; the score is
    # defined as 0 there instead of raising ZeroDivisionError.
    if y_true_count == 0 or ctp + cfp == 0:
        return 0.0

    beta_squared = beta * beta
    c_precision = ctp / (ctp + cfp)
    c_recall = ctp / y_true_count
    if c_precision > 0 and c_recall > 0:
        return (1 + beta_squared) * (c_precision * c_recall) / (beta_squared * c_precision + c_recall)
    return 0.0
# Baseline: score the dummy data with the pure-Python loop implementation.
pfbeta(labels, preds)
CPU times: user 23.2 s, sys: 3.52 ms, total: 23.2 s Wall time: 23.2 s
def pfbeta_np(labels, preds, beta=1):
    """Compute the probabilistic F-beta score with vectorized NumPy ops.

    Args:
        labels: Array-like of ground-truth labels; entries equal to 1 are
            positives and entries equal to 0 are negatives.
        preds: Array-like of scores with the same shape as ``labels``;
            values are clipped to [0, 1].
        beta: Weight of recall relative to precision (1 gives pF1).

    Returns:
        The pF-beta score (a NumPy floating scalar), or 0.0 when the score
        is undefined or precision/recall are not strictly positive.
    """
    # Accept plain lists/tuples as well as ndarrays.
    labels = np.asarray(labels)
    preds = np.asarray(preds).clip(0, 1)

    y_true_count = labels.sum()
    ctp = preds[labels == 1].sum()
    cfp = preds[labels == 0].sum()

    # Handle the zero-denominator cases up front so they return 0.0 directly
    # instead of going through NaN and emitting a RuntimeWarning.
    if y_true_count == 0 or ctp + cfp == 0:
        return 0.0

    beta_squared = beta * beta
    c_precision = ctp / (ctp + cfp)
    c_recall = ctp / y_true_count
    if c_precision > 0 and c_recall > 0:
        return (1 + beta_squared) * (c_precision * c_recall) / (beta_squared * c_precision + c_recall)
    return 0.0
# Score the same dummy data with the vectorized NumPy implementation.
pfbeta_np(labels, preds)
CPU times: user 107 ms, sys: 12.9 ms, total: 120 ms Wall time: 119 ms
from numba import njit, jit
# @jit(nopython=True)
def pfbeta_numba(labels, preds, beta=1):
    """Probabilistic F-beta score, written as a candidate for Numba JIT.

    The body is array code identical in effect to the NumPy version; it only
    runs compiled if a Numba decorator is applied to it.

    Args:
        labels: Array of ground-truth labels (1 = positive, 0 = negative).
        preds: Array of scores, same shape as ``labels``; clipped to [0, 1].
        beta: Weight of recall relative to precision (1 gives pF1).

    Returns:
        The pF-beta score, or 0.0 unless both precision and recall are
        strictly positive.
    """
    clipped = preds.clip(0, 1)
    n_positive = labels.sum()
    prob_tp = clipped[labels == 1].sum()
    prob_fp = clipped[labels == 0].sum()

    precision = prob_tp / (prob_tp + prob_fp)
    recall = prob_tp / n_positive
    # NaN compares false against 0, so undefined ratios also land here.
    if not (precision > 0 and recall > 0):
        return 0.0

    b2 = beta * beta
    return (1 + b2) * (precision * recall) / (b2 * precision + recall)
# Call twice: with a JIT decorator applied, the first call would pay the
# one-off compile cost and only the second call would be a fair timing.
# NOTE(review): the @jit line above the definition is commented out, so as
# written no compilation happens and both calls run the plain NumPy code.
pfbeta_numba(labels, preds)
pfbeta_numba(labels, preds)
CPU times: user 85.8 ms, sys: 0 ns, total: 85.8 ms Wall time: 86.3 ms
def pfbeta_tf(labels, preds, beta=1):
    """Compute the probabilistic F-beta score with TensorFlow ops.

    Relies on the module-level ``tf`` name being imported before the call.
    The Python-level ``if`` on tensor values means this is written for eager
    execution.

    Args:
        labels: Tensor of ground-truth labels (1 = positive, 0 = negative).
        preds: Tensor of scores, same shape as ``labels``; clipped to [0, 1].
        beta: Weight of recall relative to precision (1 gives pF1).

    Returns:
        A scalar tensor holding the pF-beta score, or the Python float 0.0
        unless both precision and recall are strictly positive.
    """
    clipped = tf.clip_by_value(preds, 0, 1)
    n_positive = tf.reduce_sum(labels)

    is_positive = labels == 1
    is_negative = labels == 0
    prob_tp = tf.reduce_sum(clipped[is_positive])
    prob_fp = tf.reduce_sum(clipped[is_negative])

    precision = prob_tp / (prob_tp + prob_fp)
    recall = prob_tp / n_positive
    b2 = beta * beta
    if (precision > 0 and recall > 0):
        weighted_product = (1 + b2) * (precision * recall)
        return weighted_product / (b2 * precision + recall)
    return 0.0
import tensorflow as tf
# Rebind the dummy data as TensorFlow tensors; from here on `labels` and
# `preds` are no longer NumPy arrays.
labels = tf.convert_to_tensor(labels)
preds = tf.convert_to_tensor(preds)
2022-12-29 15:00:09.520919: I tensorflow/core/common_runtime/] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
# Score the tensor-backed dummy data with the TensorFlow implementation.
pfbeta_tf(labels, preds)
CPU times: user 181 ms, sys: 56.7 ms, total: 238 ms Wall time: 210 ms
<tf.Tensor: shape=(), dtype=float32, numpy=0.49990934>
Unlike the previous function, this metric holds intermediate state, so it can aggregate the overall result across batches.
class pFBeta(tf.keras.metrics.Metric):
    """Streaming probabilistic F-beta metric.

    Accumulates, across ``update_state`` calls, the number of positives and
    the probabilistic true/false positive mass, so ``result()`` reports the
    score over everything seen so far rather than over the last batch.

    Args:
        beta: Weight of recall relative to precision (1 gives pF1).
        epsilon: Small constant added to the precision and recall
            denominators for numerical stability; may be 0.
        name: Metric name.
        **kwargs: Forwarded to ``tf.keras.metrics.Metric``.
    """

    def __init__(self, beta=1, epsilon=1e-5, name='pfbeta', **kwargs):
        super().__init__(name=name, **kwargs)
        self.beta = beta
        self.epsilon = epsilon
        # Running totals; all start at zero.
        self.pos = self.add_weight(name='pos', initializer='zeros')
        self.ctp = self.add_weight(name='ctp', initializer='zeros')
        self.cfp = self.add_weight(name='cfp', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Fold one batch into the running totals.

        Args:
            y_true: Labels (1 = positive, 0 = negative); cast to float32.
            y_pred: Scores, same shape as ``y_true``; cast to float32 and
                clipped to [0, 1].
            sample_weight: Accepted for API compatibility; not used.
        """
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.clip_by_value(tf.cast(y_pred, tf.float32), 0, 1)
        pos = tf.reduce_sum(y_true)
        ctp = tf.reduce_sum(y_pred[y_true == 1])
        cfp = tf.reduce_sum(y_pred[y_true == 0])
        # Persist the batch statistics; without these the metric state would
        # never change and result() would always be 0.
        self.pos.assign_add(pos)
        self.ctp.assign_add(ctp)
        self.cfp.assign_add(cfp)

    def result(self):
        """Return the pF-beta score over all batches seen so far.

        Returns:
            A scalar tensor; 0 when precision or recall is zero/undefined.
        """
        beta_squared = self.beta * self.beta
        # divide_no_nan yields 0 for a zero denominator, which covers the
        # epsilon=0 case and removes the need for a tensor-valued branch.
        c_precision = tf.math.divide_no_nan(self.ctp, self.ctp + self.cfp + self.epsilon)
        c_recall = tf.math.divide_no_nan(self.ctp, self.pos + self.epsilon)
        return tf.math.divide_no_nan(
            (1 + beta_squared) * (c_precision * c_recall),
            beta_squared * c_precision + c_recall,
        )
# Build the streaming metric (epsilon=0 adds nothing to the denominators)
# and feed it the first batch.
pfbeta_tf_v2 = pFBeta(beta=1, epsilon=0)
pfbeta_tf_v2.update_state(labels, preds)
# update_state only accumulates; the score has to be read with result().
pfbeta_tf_v2.result()
CPU times: user 530 ms, sys: 1.83 ms, total: 532 ms Wall time: 469 ms
<tf.Tensor: shape=(), dtype=float32, numpy=0.49990934>
This gives you the pFBeta score over all the data seen during training; this perk comes from using tf.keras.metrics.Metric. For example, below I update the state again with another batch of samples. When I call result(), it shows the overall computed result rather than a batch-wise one.
# Feed a second batch (same labels, scores shifted by +0.05), then read the
# score aggregated over both batches; update_state itself returns no score.
pfbeta_tf_v2.update_state(labels, preds+0.05)
pfbeta_tf_v2.result()
<tf.Tensor: shape=(), dtype=float32, numpy=0.5118099>
You would get the same result if you concatenated these two batches and computed the metric in one go. Here is an example:
# One-shot check: score both batches concatenated along axis 0 with the
# stateless TensorFlow function.
pfbeta_tf(tf.concat([labels, labels],0), tf.concat([preds, preds+0.05],0))
<tf.Tensor: shape=(), dtype=float32, numpy=0.51180995>
Torch (same as NumPy)
def pfbeta_torch(labels, preds, beta=1):
    """Compute the probabilistic F-beta score with Torch tensor ops.

    Args:
        labels: Tensor of ground-truth labels (1 = positive, 0 = negative).
        preds: Tensor of scores, same shape as ``labels``; clipped to [0, 1].
        beta: Weight of recall relative to precision (1 gives pF1).

    Returns:
        A scalar tensor holding the pF-beta score, or the Python float 0.0
        unless both precision and recall are strictly positive.
    """
    clipped = preds.clip(0, 1)
    n_positive = labels.sum()
    prob_tp = clipped[labels == 1].sum()
    prob_fp = clipped[labels == 0].sum()

    precision = prob_tp / (prob_tp + prob_fp)
    recall = prob_tp / n_positive
    # NaN compares false against 0, so undefined ratios also land here.
    if not (precision > 0 and recall > 0):
        return 0.0

    b2 = beta * beta
    return (1 + b2) * (precision * recall) / (b2 * precision + recall)
import torch
# Rebind the dummy data as Torch tensors by round-tripping through NumPy
# (`labels`/`preds` are TensorFlow tensors at this point, hence `.numpy()`).
labels = torch.Tensor(labels.numpy())
preds = torch.Tensor(preds.numpy())
# Score the Torch-backed dummy data with the Torch implementation.
pfbeta_torch(labels, preds)
CPU times: user 199 ms, sys: 73.1 ms, total: 272 ms Wall time: 147 ms