Machine Learning 8: XGBoost (with Code)

XGBoost (eXtreme Gradient Boosting), also known as extreme gradient boosted trees, was proposed by Tianqi Chen in 2016 and is essentially decision trees + Boosting. It handles regression and classification with both continuous and discrete features, and on datasets containing binary or multi-class categorical variables, boosted-tree ensembles often outperform existing deep learning methods. XGBoost is the forerunner of Microsoft's later LightGBM, a heavyweight in Kaggle competitions, and arguably the crowning achievement of the decision-tree family of algorithms.

Paper: "XGBoost: A Scalable Tree Boosting System" (KDD 2016)

Introductory material on XGBoost is already abundant online, so it is not repeated here. Instead, the algorithm is derived and organized from the angle of the three elements of machine learning, placing it within the basic machine-learning paradigm:

1. Dataset and feature space

2. Define the hypothesis space
2.1 For regression:

2.2 For binary classification:
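Since the formula images from the original post are not reproduced here, a sketch of the standard additive formulation (notation assumed; M trees f_m drawn from the space of regression trees):

$$\text{Regression:}\qquad \hat{y}_i \;=\; \sum_{m=1}^{M} f_m(x_i)$$

$$\text{Binary classification:}\qquad P(y_i = 1 \mid x_i) \;=\; \sigma\!\Big(\sum_{m=1}^{M} f_m(x_i)\Big) \;=\; \frac{1}{1 + e^{-\sum_m f_m(x_i)}}$$

Note that the hand-written code below takes a shortcut for classification: it accumulates the probability directly from an initial 0.5 instead of passing a raw score through the sigmoid.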

3. Derive the objective function (second-order Taylor expansion)
3.1 Starting from the general form of the loss, apply a second-order Taylor expansion and work down to the score wj of each leaf that the samples of the current tree fall into, giving the objective:
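A reconstruction of the resulting expression, following the paper's notation plus the alpha term discussed in the note below. At round m, with the previous prediction fixed, let $g_i$ and $h_i$ be the first and second derivatives of the loss at $\hat{y}_i^{(m-1)}$, T the number of leaves, and $w_j$ the score of leaf j; then

$$Obj^{(m)} \;\simeq\; \sum_{i=1}^{n}\Big[g_i f_m(x_i) + \tfrac{1}{2} h_i f_m^2(x_i)\Big] + \gamma T + \alpha \sum_{j=1}^{T} |w_j| + \tfrac{1}{2}\lambda \sum_{j=1}^{T} w_j^2$$

Grouping samples by the leaf $I_j$ they land in, with $G_j = \sum_{i \in I_j} g_i$ and $H_j = \sum_{i \in I_j} h_i$:

$$Obj^{(m)} \;=\; \sum_{j=1}^{T}\Big[G_j w_j + \tfrac{1}{2}(H_j + \lambda)\, w_j^2 + \alpha |w_j|\Big] + \gamma T$$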

Note: the original paper does not include an L1 penalty term alpha, but the interface documentation does expose an L1 hyperparameter, so alpha is carried through the derivation here to make it easier to see, end to end, how each parameter affects the algorithm. (Perhaps the paper omits it for rigor, since the L1 term is non-differentiable.)

https://xgboost.readthedocs.io/en/latest/parameter.html
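For reference, the hand-written hyperparameters below roughly map onto the library's (this correspondence is my reading, not an official table): alpha ↔ reg_alpha, lamb ↔ reg_lambda, subsample ↔ subsample, colsample ↔ colsample_bytree, learning_rate ↔ learning_rate (eta), n_estimators ↔ n_estimators; min_sample_split plays a role loosely similar to min_child_weight (a minimum-sample rather than minimum-hessian stopping rule).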

3.2 Convexity of the objective: the Lagrange multiplier (regularization coefficient) lambda is non-negative, and the summed second derivatives satisfy H > 0 for the usual losses (squared error and cross-entropy), so the objective is convex in each wj.
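A one-line check, using the per-leaf form of the objective above: each leaf's term is a quadratic in $w_j$ with

$$\frac{\partial^2 Obj^{(m)}}{\partial w_j^2} \;=\; H_j + \lambda \;>\; 0,$$

so every per-leaf term is strictly convex and has a unique minimizer.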

3.3 Derive the expression for wj: wj* is the minimizer of the objective and, at the same time, the optimal value of the sub-model in the hypothesis function. Then, by a greedy argument, derive the gain-maximization criterion that recursive tree building must follow:
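A sketch of the standard result (ignoring the non-differentiable alpha term): setting the per-leaf derivative to zero gives

$$w_j^* \;=\; -\frac{G_j}{H_j + \lambda}, \qquad Obj^* \;=\; -\frac{1}{2}\sum_{j=1}^{T}\frac{G_j^2}{H_j + \lambda} + \gamma T$$

and the greedy split gain for a candidate left/right partition is

$$Gain \;=\; \frac{1}{2}\left[\frac{G_L^2}{H_L + \lambda} + \frac{G_R^2}{H_R + \lambda} - \frac{(G_L + G_R)^2}{H_L + H_R + \lambda}\right] - \gamma$$

The implementation below drops the 1/2 and the $\gamma$ term, neither of which changes the best split; folding $\alpha$ into the numerator as $(G+\alpha)^2/(H+\lambda)$ is the author's own variant.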

4. Loss functions and their gradients
4.1 For regression:

4.2 For binary classification:
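The derivatives actually used in fit() below (cross-entropy is differentiated with respect to the probability y_hat directly, matching the code's shortcut):

Squared error $l = \tfrac{1}{2}(y_i - \hat{y}_i)^2$:

$$g_i = \hat{y}_i - y_i, \qquad h_i = 1$$

Cross-entropy $l = -y_i \ln \hat{y}_i - (1 - y_i)\ln(1 - \hat{y}_i)$:

$$g_i = -\frac{y_i}{\hat{y}_i} + \frac{1 - y_i}{1 - \hat{y}_i}, \qquad h_i = \frac{y_i}{\hat{y}_i^2} + \frac{1 - y_i}{(1 - \hat{y}_i)^2}$$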

5. Optimization steps
5.1 From the task type, decide whether it is regression or binary classification and which loss to use
5.2 Compute the first derivative gi and second derivative hi of the loss at every sample; as a rule of thumb, initialize y_hat to 0 for regression and to 0.5 for binary classification
5.3 Row-sample and column-sample the dataset to get this round's training subset; note that gi and hi must be sampled along with the rows
5.4 Recursively build a tree on the subset, obtain the score wj of each leaf (see the numeric sketch after this list), and save the tree structure
5.5 Add the newly grown tree fm to the ensemble hypothesis function to update y_hat, remembering to apply the learning rate
5.6 Compute and record the cumulative train and test loss up to this round
5.7 Loop over 5.2 through 5.6 until the test loss stops decreasing, then stop to avoid overfitting
5.8 Use the learned "forest" to make predictions
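As a numeric sanity check of step 5.4, a minimal sketch of the leaf-score formula w_j* = -G/(H + lambda), assuming the squared-error loss and the y_hat = 0 initialization used below:

import numpy as np

# Toy leaf holding three samples; squared-error loss 1/2*(y - y_hat)^2 at y_hat = 0
y = np.array([2.0, 3.0, 4.0])
g = -y                     # first derivatives: g_i = y_hat_i - y_i = -y_i
h = np.ones_like(y)        # second derivatives: h_i = 1 for squared error

lamb = 1.0                 # L2 regularization (reg_lambda)
w_star = -g.sum() / (h.sum() + lamb)
print(w_star)              # 9 / 4 = 2.25, shrunk toward 0 relative to the leaf mean 3.0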


Hand-Written XGBoost, Compared Against Sklearn

# -*- coding: utf-8 -*-
import os
import random as rd
import numpy as np
import pandas as pd
#from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
#from sklearn.preprocessing import StandardScaler
#import matplotlib.pyplot as plt
import xgboost as xgb
from xgboost.sklearn import (XGBRegressor, XGBClassifier)

class XGBoostModel(object):
    def __init__(self, job_type, learning_rate=0.1, alpha=0, lamb=1, min_sample_split=1,
                 subsample=0.9, colsample=0.9, n_estimators=100, tol=0.0001,
                 use_early_stopping=True, verbose=True):
        self.job_type = job_type
        self.learning_rate = learning_rate
        self.alpha = alpha
        self.lamb = lamb
        self.min_sample_split = min_sample_split
        self.subsample = subsample
        self.colsample = colsample
        self.n_estimators = n_estimators
        self.use_early_stopping = use_early_stopping
        self.tol = tol
        self.verbose = verbose
    
    
    def check_params(self):
        '''Validate hyperparameters'''
        if self.learning_rate <= 0:
            raise ValueError("learning_rate must > 0, float")
        if self.alpha < 0:
            raise ValueError("alpha must >= 0, default = 0")
        if self.lamb < 0:
            raise ValueError("lambda must >= 0, default = 1")
        if self.min_sample_split <= 0:
            raise ValueError("min_sample_split must >= 1, int")
        if self.subsample <= 0 or self.subsample > 1:
            raise ValueError("subsample range (0, 1], float")
        if self.colsample <= 0 or self.colsample > 1:
            raise ValueError("colsample range (0, 1], float")
        if self.n_estimators < 1:
            raise ValueError("n_estimators must >= 1, int")
        return None
    
    
    def data_augment(self, x):
        '''
        Data augmentation:
        1. Rename columns to integers
        2. Add a tiny jitter N(0, 10**-8) to every continuous variable so that
           each of its values is unique, avoiding split-point errors later when
           building trees on duplicated values
        '''
        N, _ = x.shape
        x.columns = range(len(x.columns))
        
        for j in x.columns:
            if isinstance(x[j].iloc[0], np.float64):
                randn = pd.Series(np.random.normal(0.0, 10**-8, N), index=x.index)
                x[j] += randn
            else:
                continue
        return x
    
    
    def data_sampling(self, x, y):
        '''
        Sampling:
        1. Row sampling
        2. Column sampling
        3. Extract the subset given the sampled indices
        '''
        # Row sampling & column sampling
        idx_row = rd.sample(x.index.tolist(), int(len(x)*self.subsample))
        idx_col = rd.sample(x.columns.tolist(), int(len(x.columns)*self.colsample))
        # Extract the subset
        x_sub = x.loc[idx_row, idx_col]
        y_sub = y[idx_row]
        return x_sub, y_sub
    
    
    def get_info(self, g, h):
        '''Similarity score (G + alpha)^2 / (H + lambda), used to compute information gain'''
        fenzi = (g.sum() + self.alpha)**2
        fenmu = h.sum() + self.lamb
        info = fenzi / (fenmu + 10**-8)
        return info
    
    def get_w(self, g, h):
        '''Leaf score w_j for the samples falling into a leaf, i.e. the hypothesis function'''
        fenzi = g.sum() + self.alpha
        fenmu = h.sum() + self.lamb
        weight = -fenzi / (fenmu + 10**-8)
        return weight
    
    def split_data(self, x, g, h, best_feature, best_value):
        '''Split the node at the gain-maximizing feature and value'''
        # Split the samples
        x_left = x[x[best_feature] < best_value]
        x_right = x[x[best_feature] >= best_value]
        # Split the first derivatives
        g_left = g[x[best_feature] < best_value]
        g_right = g[x[best_feature] >= best_value]
        # Split the second derivatives
        h_left = h[x[best_feature] < best_value]
        h_right = h[x[best_feature] >= best_value]
        return x_left, x_right, g_left, g_right, h_left, h_right
    
    def search_info(self, x, g, h, i, j, df_info_gain_res):
        '''Evaluate the information gain of splitting column j just before row i'''
        g_left = g.iloc[0:i]
        g_right = g.iloc[i:]
        
        h_left = h.iloc[0:i]
        h_right = h.iloc[i:]
        
        info_parent = self.get_info(g, h)
        info_left = self.get_info(g_left, h_left)
        info_right = self.get_info(g_right, h_right)
        info_gain = info_left + info_right - info_parent
        
        df_info_gain = pd.DataFrame({"feature": [j],
                                     "value": x[j].iloc[i],
                                     "info_gain": [info_gain]},
                                     columns=["feature", "value", "info_gain"])
        df_info_gain_res = pd.concat([df_info_gain_res, df_info_gain], 
                                     axis=0, 
                                     ignore_index=True)
        return df_info_gain_res
    
    def create_tree(self, x, g, h):
        '''Recursively build a tree'''
        N, _ = x.shape
        # If the node has reached the minimum-split sample count, stop splitting
        # and turn it into a leaf with its score
        if N <= self.min_sample_split:
            wj = self.get_w(g, h)
            return wj
        
        # Table of candidate splits and their information gains
        df_info_gain_res = pd.DataFrame()
        # Loop over columns
        for j in x.columns:
            # Sort by the column
            x = x.sort_values(by=j, ascending=True)
            g = g.reindex(index=x.index)
            h = h.reindex(index=x.index)
            # Continuous feature: scan every row for the best split point
            if isinstance(x[j].iloc[0], np.float64):
                for i in range(1, N):
                    df_info_gain_res = self.search_info(x, g, h, i, j, df_info_gain_res)
            # Discrete feature: only evaluate a split where the value changes
            elif isinstance(x[j].iloc[0], np.int64):
                for i in range(1, N):
                    if x[j].iloc[i] != x[j].iloc[i-1]:
                        df_info_gain_res = self.search_info(x, g, h, i, j, df_info_gain_res)
            else:
                raise TypeError("dtypes of data should be np.float64 or np.int64")
        
        # If no candidate split exists (e.g. all feature values identical),
        # turn the node into a leaf
        if df_info_gain_res.empty:
            return self.get_w(g, h)
        
        # Best gain, best feature and best split value
        best_gain = df_info_gain_res.info_gain.max()
        best_feature = df_info_gain_res.loc[df_info_gain_res.info_gain == best_gain, "feature"].values[0]
        best_value = df_info_gain_res.loc[df_info_gain_res.info_gain == best_gain, "value"].values[0]
        # Edge conditions for the left and right subtrees
        left_rule = "< " + str(best_value)
        right_rule = ">= " + str(best_value)
        # Split
        x_left, x_right, g_left, g_right, h_left, h_right = self.split_data(x, g, h, best_feature, best_value)
        # Recurse
        tree = {best_feature: {}}
        tree[best_feature][left_rule] = self.create_tree(x_left, g_left, h_left)
        tree[best_feature][right_rule] = self.create_tree(x_right, g_right, h_right)
        return tree
        
    
    def predict_single(self, tree, x_sub):
        '''Predict a single sample'''
        # Recursively walk the trained tree
        for (feature, branch) in tree.items():
            # Read the feature value
            value = x_sub[feature]
            # Fetch the branch conditions
            left_rule = list(branch.keys())[0]
            right_rule = list(branch.keys())[1]
            # Does the sample satisfy the left branch? (eval on e.g. "3.2< 4.5")
            if eval(str(value) + left_rule):
                tree = list(branch.values())[0]
                if isinstance(tree, np.float64):
                    return tree
                else:
                    tree = self.predict_single(tree, x_sub)
            # Does the sample satisfy the right branch?
            if eval(str(value) + right_rule):
                tree = list(branch.values())[1]
                if isinstance(tree, np.float64):
                    return tree
                else:
                    tree = self.predict_single(tree, x_sub)
        return tree
    
    def predict_batch(self, tree, x):
        '''Batch prediction, one row at a time'''
        y_hat_list = []
        for i in range(len(x)):
            x_sub = x.iloc[i, :]
            y_hat_sub = self.predict_single(tree, x_sub)
            y_hat_list.append(y_hat_sub)
        
        y_hat = pd.Series(y_hat_list, index=x.index)
        return y_hat
    
    def predict(self, forest, x):
        '''Model prediction (regression)'''
        y_pred = pd.Series(np.zeros(len(x)), index=x.index)
        for i in range(len(forest)):
            y_hat = self.predict_batch(forest[i], x)
            y_pred += self.learning_rate * y_hat
        return y_pred
    
    def predict_proba(self, forest, x):
        '''Model prediction (binary classification), accumulated from the 0.5 base probability'''
        y_pred = pd.Series(np.zeros(len(x)), index=x.index) + 0.5
        for i in range(len(forest)):
            y_hat = self.predict_batch(forest[i], x)
            y_pred += self.learning_rate * y_hat
        return y_pred
    
    
    def loss_reg(self, y_true, y_pred):
        '''Root mean squared error'''
        return np.sqrt(np.mean((y_true - y_pred)**2))
    
    def loss_bina(self, y_true, y_pred):
        '''Cross-entropy'''
        res = - y_true * np.log(y_pred) - (1 - y_true) * np.log(1 - y_pred)
        return np.mean(res)
    
    
    def fit(self, x_train, y_train, x_test, y_test):
        '''Train the model'''
        # Validate hyperparameters
        self.check_params()
        # Initialize predictions and derivatives
        if self.job_type == "Regressor":
            y_pred_train = pd.Series(np.zeros_like(y_train), index=y_train.index)
            y_pred_test = pd.Series(np.zeros_like(y_test), index=y_test.index)
            g = y_pred_train - y_train
            h = pd.Series(np.ones_like(y_train), index=y_train.index)
        elif self.job_type == "Classifier":
            y_pred_train = pd.Series(np.zeros_like(y_train), index=y_train.index) + 0.5
            y_pred_test = pd.Series(np.zeros_like(y_test), index=y_test.index) + 0.5
            g = -(y_train / y_pred_train) + ((1 - y_train) / (1 - y_pred_train))
            h = (y_train / y_pred_train**2) + ((1 - y_train) / (1 - y_pred_train)**2)
        else:
            raise ValueError("job_type must be 'Regressor' or 'Classifier'")
        
        loss_train_list = []
        loss_test_list = []
        forest = []
        
        # Iteratively fit the sub-tree models
        for epoch in range(self.n_estimators):
            # Row and column sampling
            x_sub, y_sub = self.data_sampling(x_train, y_train)
            g_sub = g[x_sub.index]
            h_sub = h[x_sub.index]
            # Recursively build this round's tree
            tree = self.create_tree(x_sub, g_sub, h_sub)
            # Update the train and test predictions with the new tree
            y_hat_train = self.predict_batch(tree, x_train)
            y_hat_test = self.predict_batch(tree, x_test)
            y_pred_train += self.learning_rate * y_hat_train
            y_pred_test += self.learning_rate * y_hat_test
            # Update derivatives and losses
            if self.job_type == "Regressor":
                # Update the first derivative (h stays 1 for squared error)
                g = y_pred_train - y_train
                loss_train = self.loss_reg(y_train, y_pred_train)
                loss_test = self.loss_reg(y_test, y_pred_test)
            elif self.job_type == "Classifier":
                # Clip predictions into [0.01, 0.99] to keep log() finite
                y_pred_train = y_pred_train.clip(0.01, 0.99)
                y_pred_test = y_pred_test.clip(0.01, 0.99)
                # Update the first and second derivatives
                g = -(y_train / y_pred_train) + ((1 - y_train) / (1 - y_pred_train))
                h = (y_train / y_pred_train**2) + ((1 - y_train) / (1 - y_pred_train)**2)
                loss_train = self.loss_bina(y_train, y_pred_train)
                loss_test = self.loss_bina(y_test, y_pred_test)
            else:
                raise ValueError("job_type must be 'Regressor' or 'Classifier'")
                
            # Print progress
            if self.verbose:
                print(f"epoch {epoch}  loss_train {loss_train}  loss_test {loss_test}")
            
            loss_train_list.append(loss_train)
            loss_test_list.append(loss_test)
            forest.append(tree)
            
            # Check the early-stopping condition
            if self.use_early_stopping and (len(loss_test_list) >= 2):
                err = loss_test_list[-2] - loss_test_list[-1]
                if err <= self.tol:
                    print(f"best iteration at epoch {epoch-1}")
                    # Drop the last round, which no longer improved the test loss
                    loss_train_list = loss_train_list[:-1]
                    loss_test_list = loss_test_list[:-1]
                    forest = forest[:-1]
                    break
        
        # Assemble the results here so they exist even if early stopping never fires
        evals_result = {"loss_train": loss_train_list, "loss_test": loss_test_list}
        return forest, evals_result
            
        
    def get_score(self, y_true, y_pred):
        '''Accuracy'''
        score = sum(y_true == y_pred) / len(y_true)
        return score

if __name__ == "__main__":
    file_path = os.getcwd()
    dataSet = pd.read_csv(file_path + "/swiss.csv")
    
    # 1. Regression
    # 1.1 Hand-written algorithm
    df = dataSet[["Fertility", "Agriculture", "Catholic", "InfantMortality", "Examination", "Education"]]
    y = df.iloc[:, 0]
    x = df.iloc[:, 1:]
    
    model = XGBoostModel(job_type="Regressor", learning_rate=0.3)
    x = model.data_augment(x)
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0)
    forest, evals_result = model.fit(x_train, y_train, x_test, y_test)
    df_evals = pd.DataFrame({"loss_train": evals_result.get("loss_train"),
                             "loss_test": evals_result.get("loss_test")})
    df_evals.plot()
    y_pred = model.predict(forest, x_test)
    model.loss_reg(y_test, y_pred)
    
    # 1.2 sklearn
    d_train = xgb.DMatrix(x_train, y_train)
    d_test = xgb.DMatrix(x_test, y_test)
    watchlist = [(d_train, "train"), (d_test, "test")]
    reg = XGBRegressor(learning_rate=0.3, n_estimators=100,
                       objective="reg:squarederror",  # "reg:linear" was renamed in newer xgboost
                       eval_metric="rmse",
                       min_child_weight=1, subsample=0.8, colsample_bytree=0.8,
                       reg_alpha=0, reg_lambda=1, n_jobs=-1, seed=3)
    params = reg.get_params()
    evals_res = {}
    model_sklearn = xgb.train(params=params, dtrain=d_train, evals=watchlist, 
                              evals_result=evals_res, early_stopping_rounds=10, 
                              verbose_eval=True)
    y_pred = model_sklearn.predict(d_test)
    df_evals = pd.DataFrame({"loss_train": evals_res.get("train").get("rmse"),
                             "loss_test": evals_res.get("test").get("rmse")})
    df_evals.plot()
    model.loss_reg(y_test, y_pred)
    
    # 2. Binary classification
    # 2.1 Hand-written algorithm
    df = dataSet[["Fertility2", "Agriculture", "Catholic", "InfantMortality", "Examination", "Education"]]
    y = df.iloc[:, 0]
    x = df.iloc[:, 1:]
    
    model = XGBoostModel(job_type="Classifier", learning_rate=0.1)
    x = model.data_augment(x)
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0)
    forest, evals_result = model.fit(x_train, y_train, x_test, y_test)
    df_evals = pd.DataFrame({"loss_train": evals_result.get("loss_train"),
                             "loss_test": evals_result.get("loss_test")})
    df_evals.plot()
            
    y_hat = model.predict_proba(forest, x_test)
    y_pred = y_hat.apply(lambda y: 0 if y <= 0.5 else 1)
    model.get_score(y_test, y_pred)
        
    # 2.2 sklearn
    d_train = xgb.DMatrix(x_train, y_train)
    d_test = xgb.DMatrix(x_test, y_test)
    watchlist = [(d_train, "train"), (d_test, "test")]
    clf = XGBClassifier(learning_rate=0.1, n_estimators=100,
                       objective="binary:logistic", eval_metric="logloss", 
                       min_child_weight=1, subsample=0.8, colsample_bytree=0.8, 
                       reg_alpha=0, reg_lambda=1, n_jobs=-1, seed=3)
    params = clf.get_params()
    evals_res = {}
    model_sklearn = xgb.train(params=params, dtrain=d_train, evals=watchlist, 
                              evals_result=evals_res, early_stopping_rounds=10, 
                              verbose_eval=True)
    y_hat = model_sklearn.predict(d_test)
    df_evals = pd.DataFrame({"loss_train": evals_res.get("train").get("logloss"),
                             "loss_test": evals_res.get("test").get("logloss")})
    df_evals.plot()
    
    y_pred = np.where(y_hat <= 0.5, 0, 1)
    model.get_score(y_test, y_pred)

1. Regression
1.1 Hand-written algorithm

Stopped after 33 iterations.
[Figure: loss curves of the hand-written algorithm]
[Figure: RMSE of the hand-written algorithm]

1.2 sklearn

Stopped after 9 iterations.
[Figure: sklearn loss curves]
[Figure: sklearn RMSE]

2. Binary classification
2.1 Hand-written algorithm

[Figure: loss curves of the hand-written algorithm]
[Figure: accuracy of the hand-written algorithm]

2.2 sklearn

[Figure: sklearn loss curves]
[Figure: sklearn accuracy]

Summary:
1. With the same hyperparameters, the hand-written algorithm generalizes slightly worse than the Sklearn library on regression: it takes more iterations and overfits more visibly;
2. With the same hyperparameters, the hand-written algorithm generalizes slightly better than the Sklearn library on classification: it takes fewer iterations and its accuracy is about 10% higher;
3. The tests here cover only a small dataset; the point is to understand the essence of the algorithm more deeply by writing the XGBoost internals by hand, which in turn helps with engineering work and hyperparameter tuning;
4. Descendants such as LightGBM speed up the handling of continuous variables with a histogram algorithm. That is not implemented here, but one could try binning continuous variables during tree construction and treating them as discrete, as in the sketch below.
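A minimal sketch of that binning idea, assuming quantile-based bins (the function name, bin count, and quantile strategy are illustrative choices, not LightGBM's actual algorithm):

import numpy as np
import pandas as pd

def bin_continuous(col, n_bins=32):
    '''Map a continuous column to integer bin ids so that create_tree can treat
    it like a discrete np.int64 feature and only probe bin boundaries.'''
    # Interior quantile edges give roughly equal-population bins
    edges = np.unique(np.quantile(col, np.linspace(0, 1, n_bins + 1)[1:-1]))
    # np.digitize assigns each value the index of the bin it falls into
    return pd.Series(np.digitize(col, edges).astype(np.int64), index=col.index)

With at most n_bins distinct values per column, the split search drops from a scan over every row to a scan over bin boundaries.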

