原文地址
手把手教你:基于粒子群优化算法(PSO)优化卷积神经网络(CNN)的文本分类
<hr style=" border:solid; width:100px; height:1px;" color=#000000 size=1">
@TOC
</font>
<hr style=" border:solid; width:100px; height:1px;" color=#000000 size=1">
项目简介
本文主要介绍如何使用python搭建:一个基于:粒子群优化算法(PSO:Particle swarm optimization) 优化CNN网络,并实现文本的分类。
<font color=#999AAA >博主也参考过网上其他博主介绍:粒子群优化算法(PSO)的文章,但大多是理论大于方法。并且很少有用到优化CNN或其他网络的代码。很多同学肯定对原理不需要过多了解,只需要搭建出一个分类或预测系统即可。</font>
本文只会告诉你如何快速搭建一个基于粒子群优化算法优化CNN的系统并运行,原理的东西可以参考其他博主。
也正是因为我发现网上大多的帖子只是针对原理进行介绍,功能实现的相对很少。
如果您有以上想法,那就找对地方了!
<hr style=" border:solid; width:100px; height:1px;" color=#000000 size=1">
<font color=#999AAA >提示:以下是本篇文章正文内容
一、粒子群算法(PSO)简介
粒子群优化算法(PSO)是一种最优化算法,源于对鸟群捕食的行为研究。粒子群优化算法的基本思想:是通过群体中个体之间的协作和信息共享来寻找最优解.
PSO的优势:在于简单容易实现并且没有许多参数的调节。
目前已被广泛应用于函数优化、神经网络训练、模糊系统控制以及其他遗传算法的应用领域
二、项目展示
模型预测:
[图片上传失败...(image-8cff2e-1649642682961)]
模型训练:
[图片上传失败...(image-c75bc9-1649642682961)]
特征工程:
[图片上传失败...(image-ca8231-1649642682961)]
项目演示可以参考我在B站发的同名视频:
手把手教你:基于粒子群优化算法(PSO)优化卷积神经网络(CNN)的文本分类
二、环境需求
因为本项目基于TensorFlow因此需要以下环境:
- tensorflow==2.0
- pandas
- scikit-learn
- numpy
- Word2Vec
环境安装实例
环境都可以通过pip进行安装。如果只是想要功能跑起来,这边建议tensorflow安装cpu版的。
<font color=#999AAA >如果没使用过pycharm通过pip安装包的同学可以参考如下:
点开“终端”,然后通过pip进行安装tensorflow,其他环境包也可以通过上面的方法安装。
三、重要功能模块介绍
1.数据预处理模块(data_create.py)
首先读取书本信息内容并构建训练数据
部分功能代码:
def read_book_path(path):
"""
读取书本内容
:param path:
:return:
"""
book_msg_list = []
for root, dirs, files in os.walk(path):
# 获取作者
for author_dir in dirs:
# 子文件夹目录
file_path = os.path.join(root, author_dir)
for root_2, dirs_2, files_2 in os.walk(file_path):
for file in files_2:
# 获取书本地址
book_path = os.path.join(file_path, file)
# 获取书本名称
book_name = str(file.split('.txt')[0])
with open(book_path, "r", encoding='utf-8') as f: # 打开文件
try:
txt_data = f.read() # 读取文件
except UnicodeDecodeError:
with open(book_path, "r", encoding='gbk') as f:
try:
txt_data = f.read() # 读取文件
except UnicodeDecodeError as e:
print(e)
print("错误文件:" + str(book_path))
# gbk编码
else:
book_msg = [author_dir, book_name, txt_data]
book_msg_list.append(book_msg)
# utf-8编码
else:
book_msg = [author_dir, book_name, txt_data]
book_msg_list.append(book_msg)
return book_msg_list
def word_2_vec(data_in, words_num=2000, vec_num=128):
"""
构建书本-文字特征向量
:param vec_num: 单词的文本向量维度大小
:param words_num:段落大小
:param data_in:输入数据,格式为list:作者,书名,内容
:return:
"""
label_list = []
name_list = []
word_split_list = []
# 获取所有文本,文本分割
for i in tqdm(range(len(data_in))):
label = data_in[i][0]
book_name = data_in[i][1]
content = data_in[i][2]
# 文本处理
new_content = words_regularized(content)
words_list = new_content.split()
# 文本截取
words_list = words_list[:len(words_list) - len(words_list) % words_num]
# 将文本按定义的维度进行截取
for e in range(1, int(len(words_list) / words_num) + 1):
word_split_list.append(words_list[(e - 1) * words_num:e * words_num])
label_list.append(label)
name_list.append(book_name)
time.sleep(1)
print("完成文本预处理,共计获取:", len(word_split_list), "个段落。")
time_s = datetime.now()
print("****开始预训练词向量,此处预计耗时20秒(根据文本多少变化)")
# 词向量训练
# model = Word2Vec(sentences=word_split_list, vector_size=vec_num, min_count=1)
time_e = datetime.now()
time_cql = int((time_e - time_s).total_seconds())
model = Word2Vec.load('models/word2vec.model')
# model.save('models/word2vec.model')
print("完成文本词向量特征预训练,耗时:", time_cql, "秒。", "预训练词向量保存地址:models/word2vec.model")
# 完成特征构建
words_vec_list = []
for i in tqdm(range(len(word_split_list))):
content = word_split_list[i]
vec_list = []
for word in content:
vec = model.wv[word]
vec_list.append(vec)
words_vec_list.append(vec_list)
time.sleep(1)
print("完成特征构建。")
return words_vec_list, label_list, name_list
def words_regularized(text):
"""
过滤特殊符号以及还原常见缩写单词
:param text:原始文本
:return:处理后文本
"""
2.定义粒子群优化算法(n_PSO.py)
部分功能代码:
import numpy as np
import random
import n_model as md
import tensorflow as tf
def fit_fun(param, X): # 适应函数,此处为模型训练
# 设置GPU按需使用
gpus = tf.config.experimental.list_physical_devices('GPU')
tf.config.experimental.set_virtual_device_configuration(gpus[0], [
tf.config.experimental.VirtualDeviceConfiguration(memory_limit=5120)])
# 获取模型参数
label_count = param['label_count']
a_shape = param['a_shape']
b_shape = param['b_shape']
train_data = param['data']
train_label = param['label']
model = md.cnn_model(label_count, data_shape=(a_shape, b_shape))
# 传入待优化参数learning_rate
res_model = model.model_create(X[-1])
history = res_model.fit(train_data, train_label, epochs=5, batch_size=8, validation_split=0.2)
# 获取最小的loss值,优化为loss最小时learning_rate
val_loss = 1 - max(history.history['val_acc'])
return val_loss
class Particle:
# 初始化
def __init__(self, model_param, x_max, x_min, max_vel, dim):
self.__pos = [random.uniform(x_min, x_max) for i in range(dim)] # 粒子的位置
self.__vel = [random.uniform(-max_vel, max_vel) for i in range(dim)] # 粒子的速度
self.__bestPos = [0.0 for i in range(dim)] # 粒子最好的位置
self.__fitnessValue = fit_fun(model_param, self.__pos) # 适应度函数值
def set_pos(self, i, value):
self.__pos[i] = value
def get_pos(self):
return self.__pos
def set_best_pos(self, i, value):
self.__bestPos[i] = value
def get_best_pos(self):
return self.__bestPos
def set_vel(self, i, value):
self.__vel[i] = value
def get_vel(self):
return self.__vel
def set_fitness_value(self, value):
self.__fitnessValue = value
def get_fitness_value(self):
return self.__fitnessValue
class PSO:
def __init__(self, model_param, pso_param, best_fitness_value=float('Inf'), C1=2,
C2=2, W=1):
self.C1 = C1
self.C2 = C2
self.W = W
self.dim = pso_param['dim'] # 粒子的维度
self.size = pso_param['size'] # 粒子个数
self.iter_num = pso_param['iter_num'] # 迭代次数
self.x_max = pso_param['x_max'] # 粒子最大位置
self.x_min = pso_param['x_min'] # 粒子最小位置
self.max_vel = pso_param['max_vel'] # 粒子最大速度
self.best_position = [0.0 for i in range(pso_param['dim'])] # 种群最优位置
self.model_param = model_param # 模型参数
self.best_fitness_value = best_fitness_value
self.fitness_val_list = [] # 每次迭代最优适应值
# 对种群进行初始化
self.Particle_list = [Particle(self.model_param, self.x_max, self.x_min, self.max_vel, self.dim) for i in
range(self.size)]
3.定义被优化CNN模型
部分功能代码:
from tensorflow import keras
from tensorflow.keras import layers, models
class cnn_model:
def __init__(self, label_num, data_shape=(2000, 128)): # 默认输入张量为(2000,128)
# res块数量
self.num_blocks = 2
# 过滤器数量
self.filters = 64
# 步长
self.conv_size = 3
# 分类类别数
self.label_num = label_num
# 数据输入的shape
self.data_shape = data_shape
self.loss = 'sparse_categorical_crossentropy'
self.metrics = ['acc']
def res_net_block(self, input_data):
# CNN层
x = layers.Conv1D(self.filters, self.conv_size, activation='relu', padding='same')(input_data)
x = layers.BatchNormalization()(x)
x = layers.Conv1D(self.filters, self.conv_size, activation=None, padding='same')(x)
# 第二层没有激活函数
def model_create(self, learning_rate):
4.使用PSO优化CNN初始化学习率(ModelTrain.py)
import os
from collections import Counter
import numpy as np
from n_PSO import PSO
import n_model as md
import tensorflow as tf
import json
if __name__ == '__main__':
# 加载数据
data, label, label_count = load_data()
# 生成训练集测试集
train_data, train_label, val_data, val_label = create_train_data(data, label, 0.9)
# 输入数据的shape值,用于模型构造
a_shape = data.shape[1]
b_shape = data.shape[2]
# 模型参数
model_param = {
"a_shape": a_shape,
"b_shape": b_shape,
"label_count": label_count,
"data": train_data,
"label": train_label
}
"""
用粒子群优化算法对训练模型初始化参数进行优化
"""
# 设置粒子群优化参数
# 被优化参数
dim = 1
# 粒子数
size = 5
# 优化算法迭代次数
iter_num = 20
# 被优化参数最大值
x_max = 0.01
# 被优化参数最小值
x_min = 0.00001
# 每个粒子每次移动的最大速度
max_vel = 0.0005
# 粒子群算法参数
pso_param = {
"dim": dim,
"size": size,
"iter_num": iter_num,
"x_max": x_max,
"x_min": x_min,
"max_vel": max_vel
}
# 实例化粒子群算法
pso = PSO(model_param, pso_param)
# 寻最优解
best_err, best_learn_rate = pso.update()
print("粒子群优化后最优准确率为:", 1 - best_err)
print("粒子群优化后最优初始化learning_rate:", best_learn_rate)
# 保存pso优化后参数
os.path.join("app210323")
best_param = {
"acc": 1 - best_err,
"learn_rate": best_learn_rate,
}
b = json.dumps(best_param)
file = open('models/pso_out_param.json', 'w')
file.write(b)
file.close()
"""
使用最优化初始参数进行训练
"""
# 模型训练
model = md.cnn_model(label_count, data_shape=(a_shape, b_shape))
# 使用最优学习率进行训练
cnn_model = model.model_create(best_learn_rate)
5.模型分类预测
import tensorflow as tf
import numpy as np
from collections import Counter
from tensorflow import keras
# 评估模型
from sklearn.metrics import roc_curve, roc_auc_score, classification_report, accuracy_score
if __name__ == '__main__':
# -------------------设置显存按需分配-----------------
# 设置显存
gpu_memory = 5120
gpus = tf.config.experimental.list_physical_devices('GPU')
# 对需要进行限制的GPU进行设置
tf.config.experimental.set_virtual_device_configuration(gpus[0], [
tf.config.experimental.VirtualDeviceConfiguration(memory_limit=gpu_memory)])
# 查看GPU是否可用
print("检查GPU是否可用:", tf.test.is_gpu_available())
# 加载测试数据
val_data, val_label = load_data()
# 加载模型
# 模型地址名称,如重新训练了模型,请在此处修改地址和名称
model_path = 'models/cnn_model_epoch-30_valAcc-0.94545454.h5'
model = keras.models.load_model(model_path)
print("模型:", model_path, "。加载成功!")
print("*****完成预处理,进行模型评估*****")
y_pred = model.predict(val_data)
y_pred = [np.argmax(x) for x in y_pred]
print('------------------测试集上得分:------------------------')
print('*' * 5)
print('测试集准确率得分:', accuracy_score(val_label, y_pred))
print('*' * 5)
print('准确率、召回率、f1-值测试报告如下:\n', classification_report(val_label, y_pred))
四、完整代码地址
由于项目代码量较大,感兴趣的同学可以下载完整代码,使用过程中如遇到任何问题可以私信我,我都会一一解答。