今天记录一下最近将深度学习方法用于智能合约漏洞检测的第一次实验,顺便给研究这方向的同行们提供一点借鉴意义。这个方法跟NLP有点相似,但又不太一样,因为操作码序列虽然具备一定语义信息,但偏向底层机器语言,所以刚开始我并不确定最终能不能达到很好的训练效果。这个实验的完整过程如下:首先通过插桩在本地链上同步当前以太坊的部分区块交易数据,借此拿到每笔交易的操作码序列、合约地址等等原始数据;接着通过word2vec或one-hot编码将每个操作码转成词向量;最后搭建CNN+LSTM的深度学习模型完成多分类训练。
0. 导包
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.layers import Embedding
from keras.layers import LSTM
from keras.layers import Convolution1D, MaxPooling1D
from keras.saving.save import load_model
from keras.utils import to_categorical
import gensim
from gensim.models import word2vec
import tensorflow as tf
import numpy as np
import pickle as pkl
from gensim.corpora.dictionary import Dictionary
from sklearn.model_selection import train_test_split
import pandas as pd
from tensorflow.keras.preprocessing import sequence
import matplotlib.pyplot as plt
import logging
1. 从数据库获取数据
我使用的是MongoDB数据库,因此可以使用可视化软件MongoDB Compass或Navicat for MongoDB直接查询获取,同样也可以通过python连接数据库获取,本质上都是编写MongoDB的查询语句,导出格式一般是csv或txt,csv因为携带了标签比较容易处理。
# 连接数据库
client = MongoClient('mongodb://admin:123456@172.22.60.69:27017/')
db = client.geth
# 获取对应数据库表的对象
collection = db.transaction
i = 0
for collection in collection.find({}, {"tx_trace": 1, "_id": 0}):
i = i + 1
with open('./trace.txt', 'a') as f:
f.write(collection.get('tx_trace'))
f.write('\n')
if i == 3685499:
break
2. 训练词向量
对于文本的向量化其实有很多方式,包括独热(one-hot),词袋模型(bag of words),逆文本特征频率(tf-idf)和word2vec等,在本实验中我使用现成的word2vec预训练模型直接训练得到词向量,同样也可以用自己搭建的模型训练。word2vec是使用深度学习的方式将词映射为一个多维向量,维度可以自行选择。
# 设置输出日志
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
# 直接用gemsim提供的API去读取txt文件,读取文件的API有LineSentence和Text8Corpus, PathLineSentences等。
sentences = word2vec.LineSentence("./trace.txt")
print("*****************")
# 训练模型,词向量的维度设置为128,迭代次数为8,采用skip-gram模型,模型保存为bin格式,并发数为5
model = gensim.models.Word2Vec(sentences, vector_size=128, sg=1, epochs=8, min_count=1, negative=8, workers=5)
model.save("word2vec1.model")
model.wv.save_word2vec_format("./word2vec1.bin", binary=True)
# 加载bin格式的模型
model1 = gensim.models.KeyedVectors.load_word2vec_format("word2vec1.bin", binary=True)
model1.save_word2vec_format('./vector1.txt', binary=False)
3. 数据预处理
读取csv或txt格式的原始数据并处理成嵌套列表的形式。
# 读取csv源数据并处理成嵌套列表格式
def csv_to_trace_list(csv_path, trace_list1):
trace = pd.read_csv(csv_path, usecols=['tx_trace'])
# 将DataFrame转成list
p_list = np.array(trace).tolist()
# 将每个序列分词并转成list,并将每个list存入trace_list1,此时trace_list1为嵌套列表
for i in p_list:
trace_list1.append(i[0].split())
4. 根据词向量创建词语字典
加载上面生成的word2vec模型创建出对应的词向量字典(操作码向量字典)。
def create_dictionaries(model1):
# 创建词语字典,并返回word2vec模型中词语的索引,词向量
gensim_dict = Dictionary() # 创建词语词典
gensim_dict.doc2bow(model1.wv.index_to_key, allow_update=True)
w2index = {v: k + 1 for k, v in gensim_dict.items()} # 词语的索引,从1开始编号
w2vec = {word: model1.wv[word] for word in w2index.keys()} # 词语的词向量
return w2index, w2vec
model = word2vec.Word2Vec.load("word2vec1.model")
index_dict, word_vectors = create_dictionaries(model) # 索引字典、词向量字典
5. 将序列文本转成字典索引数字
将输入操作码序列中出现在词向量字典中的操作码转换为索引数字,未出现的转换为0即可。
def text_to_index_array(p_new_dic, p_sen):
# 文本或列表转换为索引数字
if type(p_sen) == list:
new_sentences = []
for sen in p_sen:
new_sen = []
for word in sen:
try:
new_sen.append(p_new_dic[word]) # 单词转索引数字
except:
new_sen.append(0) # 索引字典里没有的词转为数字0
new_sentences.append(new_sen)
return np.array(new_sentences) # 转numpy数组
else:
new_sentences = []
sentences = []
p_sen = p_sen.split(" ")
for word in p_sen:
try:
sentences.append(p_new_dic[word]) # 单词转索引数字
except:
sentences.append(0) # 索引字典里没有的词转为数字0
new_sentences.append(sentences)
return new_sentences
6. 统一序列长度
定义必要参数,加载词向量数据并填充词向量矩阵。
# 参数设置
vocab_dim = 128 # 词向量维度
max_len = 5000 # 序列保留的最大长度
batch_size = 100 # 训练过程中每次传入模型的特征数量
n_epoch = 5 # 迭代次数
# Convolution 卷积
filter_length = 3 # 滤波器长度
nb_filter = 64 # 滤波器个数
pool_size = 4 # 池化长度
将原始数据集分为训练集和测试集,把序列操作码转成对应索引并统一长度。
# 划分训练集和测试集,此时都是list列表
X_train_l, X_test_l, y_train_l, y_test_l = train_test_split(trace_list, label_list, test_size=0.2)
x_train = text_to_index_array(index_dict, X_train_l)
x_test = text_to_index_array(index_dict, X_test_l)
y_train = np.array(y_train_l) # 转numpy数组
y_test = np.array(y_test_l)
# 将不够max_len的用0补足,超过max_len的去掉
x_train = sequence.pad_sequences(x_train, maxlen=max_len)
x_test = sequence.pad_sequences(x_test, maxlen=max_len)
print('训练集shape:', x_train.shape)
print('测试集shape:', x_test.shape)
7. 搭建模型并训练
本来我只用了LSTM,但因为序列长度太大,所以只能加上CNN层来对矩阵进行降维,这样训练速度会更快,结果发现效果也还行。
def train_lstm(p_n_symbols, p_embedding_weights, p_X_train, p_y_train, p_X_test, p_y_test, X_test_l):
print('创建模型......')
model = Sequential()
model.add(Embedding(output_dim=vocab_dim, # 输出向量维度
input_dim=p_n_symbols, # 输入向量维度
mask_zero=True, # 使我们填补的0值在后续训练中不产生影响(屏蔽0值)
weights=[p_embedding_weights], # 对数据加权
input_length=max_len)) # 每个特征的长度
# 1D 卷积层,对词嵌入层输出做卷积操作
model.add(Convolution1D(filters=nb_filter,
kernel_size=filter_length,
activation='relu'))
# 池化层
model.add(MaxPooling1D(pool_size=pool_size))
model.add(LSTM(units=128,
activation='sigmoid',
recurrent_activation='hard_sigmoid'))
model.add(Dropout(0.5)) # 每次迭代丢弃50神经元 防止过拟合
model.add(Dense(units=512,
activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(units=6, # 输出层1个神经元 1代表正面 0代表负面
activation='softmax'))
model.summary()
print('编译模型......')
model.compile(loss='categorical_crossentropy',
optimizer='rmsprop',
metrics=['accuracy'])
print("训练......")
train_history = model.fit(p_X_train, p_y_train, batch_size=batch_size, epochs=n_epoch,
validation_data=(p_X_test, p_y_test))
print("评估......")
score, acc = model.evaluate(p_X_test, p_y_test, batch_size=batch_size)
label = model.predict(p_X_test)
print('Test score:', score)
print('Test accuracy:', acc)
logging.info('Test score: {}'.format(score))
logging.info('Test accuracy: {}'.format(acc))
for (a, b, c) in zip(p_y_test, X_test_l, label):
logging.info("原文为:{}".format(b))
logging.info("预测倾向为{}".format(a))
logging.info("真实倾向为{}".format(c))
"""保存模型"""
model.save('./model_LSTM2.h5')
print("模型保存成功")
show_train_history(train_history, 'accuracy', 'val_accuracy') # 训练集准确率与验证集准确率 折线图
show_train_history(train_history, 'loss', 'val_loss') # 训练集误差率与验证集误差率 折线图
8. 输出可视化训练图表
可以通过show_train_history函数打印的训练集曲线来判断模型是否过拟合。
def show_train_history(train_history, train, velidation):
# 可视化训练过程 对比
plt.plot(train_history.history[train])
plt.plot(train_history.history[velidation])
plt.title("Train History") # 标题
plt.xlabel('Epoch') # x轴标题
plt.ylabel(train) # y轴标题
plt.legend(['train', 'test'], loc='upper left') # 图例 左上角
plt.show()
借鉴项目:https://github.com/sph116/lstm_emotion(大家自行搜索)
部分结果截图(仅供参考):