21、预测延迟
from collections import defaultdict
import time
import gc
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import Ridge
from sklearn.linear_model import SGDRegressor
from sklearn.svm import SVR
from sklearn.utils import shuffle
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
def _not_in_sphinx():
# Hack to detect whether we are running by the sphinx builder
return '__file__' in globals()
def atomic_benchmark_estimator(estimator, X_test, verbose=False):
"""Measure runtime prediction of each instance."""
n_instances = X_test.shape[0]
runtimes = np.zeros(n_instances, dtype=np.float)
for i in range(n_instances):
instance = X_test[[i], :]
start = time.time()
estimator.predict(instance)
runtimes[i] = time.time() - start
if verbose:
print("atomic_benchmark runtimes:", min(runtimes), np.percentile(
runtimes, 50), max(runtimes))
return runtimes
def bulk_benchmark_estimator(estimator, X_test, n_bulk_repeats, verbose):
"""Measure runtime prediction of the whole input."""
n_instances = X_test.shape[0]
runtimes = np.zeros(n_bulk_repeats, dtype=np.float)
for i in range(n_bulk_repeats):
start = time.time()
estimator.predict(X_test)
runtimes[i] = time.time() - start
runtimes = np.array(list(map(lambda x: x / float(n_instances), runtimes)))
if verbose:
print("bulk_benchmark runtimes:", min(runtimes), np.percentile(
runtimes, 50), max(runtimes))
return runtimes
def benchmark_estimator(estimator, X_test, n_bulk_repeats=30, verbose=False):
atomic_runtimes = atomic_benchmark_estimator(estimator, X_test, verbose)
bulk_runtimes = bulk_benchmark_estimator(estimator, X_test, n_bulk_repeats,
verbose)
return atomic_runtimes, bulk_runtimes
def generate_dataset(n_train, n_test, n_features, noise=0.1, verbose=False):
if verbose:
print("generating dataset...")
X, y, coef = make_regression(n_samples=n_train + n_test,
n_features=n_features, noise=noise, coef=True)
random_seed = 13
X_train, X_test, y_train, y_test = train_test_split(
X, y, train_size=n_train, test_size=n_test, random_state=random_seed)
X_train, y_train = shuffle(X_train, y_train, random_state=random_seed)
X_scaler = StandardScaler()
X_train = X_scaler.fit_transform(X_train)
X_test = X_scaler.transform(X_test)
y_scaler = StandardScaler()
y_train = y_scaler.fit_transform(y_train[:, None])[:, 0]
y_test = y_scaler.transform(y_test[:, None])[:, 0]
gc.collect()
if verbose:
print("ok")
return X_train, y_train, X_test, y_test
def boxplot_runtimes(runtimes, pred_type, configuration):
fig, ax1 = plt.subplots(figsize=(10, 6))
bp = plt.boxplot(runtimes, )
cls_infos = ['%s\n(%d %s)' % (estimator_conf['name'],
estimator_conf['complexity_computer'](
estimator_conf['instance']),
estimator_conf['complexity_label']) for
estimator_conf in configuration['estimators']]
plt.setp(ax1, xticklabels=cls_infos)
plt.setp(bp['boxes'], color='black')
plt.setp(bp['whiskers'], color='black')
plt.setp(bp['fliers'], color='red', marker='+')
ax1.yaxis.grid(True, linestyle='-', which='major', color='lightgrey',
alpha=0.5)
ax1.set_axisbelow(True)
ax1.set_title('每个实例的预测时间 - %s, %d feats.' % (
pred_type.capitalize(),
configuration['n_features']))
ax1.set_ylabel('Prediction Time (us)')
plt.show()
def benchmark(configuration):
X_train, y_train, X_test, y_test = generate_dataset(
configuration['n_train'], configuration['n_test'],
configuration['n_features'])
stats = {}
for estimator_conf in configuration['estimators']:
print("Benchmarking", estimator_conf['instance'])
estimator_conf['instance'].fit(X_train, y_train)
gc.collect()
a, b = benchmark_estimator(estimator_conf['instance'], X_test)
stats[estimator_conf['name']] = {'atomic': a, 'bulk': b}
cls_names = [estimator_conf['name'] for estimator_conf in configuration[
'estimators']]
runtimes = [1e6 * stats[clf_name]['atomic'] for clf_name in cls_names]
boxplot_runtimes(runtimes, 'atomic', configuration)
runtimes = [1e6 * stats[clf_name]['bulk'] for clf_name in cls_names]
boxplot_runtimes(runtimes, 'bulk (%d)' % configuration['n_test'],
configuration)
def n_feature_influence(estimators, n_train, n_test, n_features, percentile):
percentiles = defaultdict(defaultdict)
for n in n_features:
print("benchmarking with %d features" % n)
X_train, y_train, X_test, y_test = generate_dataset(n_train, n_test, n)
for cls_name, estimator in estimators.items():
estimator.fit(X_train, y_train)
gc.collect()
runtimes = bulk_benchmark_estimator(estimator, X_test, 30, False)
percentiles[cls_name][n] = 1e6 * np.percentile(runtimes,
percentile)
return percentiles
def plot_n_features_influence(percentiles, percentile):
fig, ax1 = plt.subplots(figsize=(10, 6))
colors = ['r', 'g', 'b']
for i, cls_name in enumerate(percentiles.keys()):
x = np.array(sorted([n for n in percentiles[cls_name].keys()]))
y = np.array([percentiles[cls_name][n] for n in x])
plt.plot(x, y, color=colors[i], )
ax1.yaxis.grid(True, linestyle='-', which='major', color='lightgrey',
alpha=0.5)
ax1.set_axisbelow(True)
ax1.set_title('具有特征的预测时间演化')
ax1.set_xlabel('#Features')
ax1.set_ylabel('Prediction Time at %d%%-ile (us)' % percentile)
plt.show()
def benchmark_throughputs(configuration, duration_secs=0.1):
X_train, y_train, X_test, y_test = generate_dataset(
configuration['n_train'], configuration['n_test'],
configuration['n_features'])
throughputs = dict()
for estimator_config in configuration['estimators']:
estimator_config['instance'].fit(X_train, y_train)
start_time = time.time()
n_predictions = 0
while (time.time() - start_time) < duration_secs:
estimator_config['instance'].predict(X_test[[0]])
n_predictions += 1
throughputs[estimator_config['name']] = n_predictions / duration_secs
return throughputs
def plot_benchmark_throughput(throughputs, configuration):
fig, ax = plt.subplots(figsize=(10, 6))
colors = ['r', 'g', 'b']
cls_infos = ['%s\n(%d %s)' % (estimator_conf['name'],
estimator_conf['complexity_computer'](
estimator_conf['instance']),
estimator_conf['complexity_label']) for
estimator_conf in configuration['estimators']]
cls_values = [throughputs[estimator_conf['name']] for estimator_conf in
configuration['estimators']]
plt.bar(range(len(throughputs)), cls_values, width=0.5, color=colors)
ax.set_xticks(np.linspace(0.25, len(throughputs) - 0.75, len(throughputs)))
ax.set_xticklabels(cls_infos, fontsize=10)
ymax = max(cls_values) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel('Throughput (predictions/sec)')
ax.set_title('不同估计量的预测吞吐量 (%d '
'features)' % configuration['n_features'])
plt.show()
start_time = time.time()
# 各种回归器的基准体/原子预测速度
configuration = {
'n_train': int(1e3),
'n_test': int(1e2),
'n_features': int(1e2),
'estimators': [
{'name': 'Linear Model',
'instance': SGDRegressor(penalty='elasticnet', alpha=0.01,
l1_ratio=0.25, tol=1e-4),
'complexity_label': 'non-zero coefficients',
'complexity_computer': lambda clf: np.count_nonzero(clf.coef_)},
{'name': 'RandomForest',
'instance': RandomForestRegressor(),
'complexity_label': 'estimators',
'complexity_computer': lambda clf: clf.n_estimators},
{'name': 'SVR',
'instance': SVR(kernel='rbf'),
'complexity_label': 'support vectors',
'complexity_computer': lambda clf: len(clf.support_vectors_)},
]
}
benchmark(configuration)
# 基准n特征对预测速度的影响
percentile = 90
percentiles = n_feature_influence({'ridge': Ridge()},
configuration['n_train'],
configuration['n_test'],
[100, 250, 500], percentile)
plot_n_features_influence(percentiles, percentile)
# 基准吞吐量
throughputs = benchmark_throughputs(configuration)
plot_benchmark_throughput(throughputs, configuration)
stop_time = time.time()
print("example run in %.2fs" % (stop_time - start_time))