数据完整存储与内存的数据集类
一、InMemoryDataset基类简介
在PyG中,通过继承InMemoryDataset类来自定义一个数据可全部存储到内存的数据集类。
class InMemoryDataset(root: Optional[str] = None, transform: Optional[Callable] = None, pre_transform: Optional[Callable] = None, pre_filter: Optional[Callable] = None)
每个数据集都要有一个根文件夹(root),指示数据集应该被保存在哪里。在根目录下至少有两个文件夹:
(1)raw_dir,用于存储未处理的文件,保存从网络上下载的数据集文件;
(2)processed_dir,保存处理后的数据集被保存到这里。
创建一个InMemoryDataset,我们需要实现四个基本方法:
(1)raw_file_names()是一个属性方法,返回一个文件名列表,文件应该能在raw_dir文件夹中找到,否则调用download()函数下载文件到raw_dir文件夹。
(2)processed_raw_file_names()是一个属性方法,返回一个文件名列表,文件应该能在processed_dir文件夹中找到,否则调用process()函数对样本做预处理然后保存到processed_dir文件夹。
(3)download()将原始数据文件下载到raw_dir文件夹。
(4)process()对样本做预处理然后保存到processed_dir文件夹。
import torch
from torch_geometric.data import InMemoryDataset, download_url
class MyOwnDataset(InMemoryDataset):
def __init__(self, root, transform=None, pre_transform=None, pre_filter=None):
super().__init__(root=root, transform=transform, pre_transform=pre_transform, pre_filter=pre_filter)
self.data, self.slices = torch.load(self.processed_paths[0])
@property
def raw_file_names(self):
return ['some_file_1', 'some_file_2', ...]
@property
def processed_file_names(self):
return ['data.pt']
def download(self):
download_url(url, self.raw_dir)
def process(self):
data_list = [...]
if self.pre_filter is not None:
data_list = [data for data in data_list if self.pre_filter(data)]
if self.pre_transform is not None:
data_list = [self.pre_transform(data) for data in data_list]
data, slices = self.collate(data_list)
torch.save((data, slices), self.processed_paths[0])
样本从原始文件转换成 Data类对象的过程定义在process函数中。
需要读取和创建一个 Data对象的列表,并将其保存到processed_dir中。在构造函数中把Data对象和切片字典分别加载到属性self.data和self.slices中。
二、定义一个InMemoryDataset子类
以公开数据集PubMed为例。PubMed数据集存储的是文章引用网络,文章对应图的结点,如果两篇文章存在引用关系,则这两篇文章对应的结点之间存在边。
基于PyG中的Planetoid类修改得到下面的PlanetoidPubMed数据集类。
import os.pathas osp
import torch
from torch_geometric.dataimport (InMemoryDataset, download_url)
from torch_geometric.ioimport read_planetoid_data
class PlanetoidPubMed(InMemoryDataset):
url ='https://github.com/kimiyoung/planetoid/raw/master/data'
def __init__(self, root, split="public", num_train_per_class=20,
num_val=500, num_test=1000, transform=None,
pre_transform=None):
super(PlanetoidPubMed,self).__init__(root, transform, pre_transform)
self.data,self.slices = torch.load(self.processed_paths[0])
self.split = split
assert self.splitin ['public','full','random']
if split =='full':
data =self.get(0)
data.train_mask.fill_(True)
data.train_mask[data.val_mask | data.test_mask] =False
self.data,self.slices =self.collate([data])
elif split =='random':
data =self.get(0)
data.train_mask.fill_(False)
for cin range(self.num_classes):
idx = (data.y == c).nonzero(as_tuple=False).view(-1)
idx = idx[torch.randperm(idx.size(0))[:num_train_per_class]]
data.train_mask[idx] =True
remaining = (~data.train_mask).nonzero(as_tuple=False).view(-1)
remaining = remaining[torch.randperm(remaining.size(0))]
data.val_mask.fill_(False)
data.val_mask[remaining[:num_val]] =True
data.test_mask.fill_(False)
data.test_mask[remaining[num_val:num_val + num_test]] =True
self.data,self.slices =self.collate([data])
def raw_dir(self):
return osp.join(self.root,'raw')
def processed_dir(self):
return osp.join(self.root,'processed')
def raw_file_names(self):
names = ['x','tx','allx','y','ty','ally','graph','test.index']
return ['ind.pubmed.{}'.format(name)for namein names]
def processed_file_names(self):
return 'data.pt'
def download(self):
for namein self.raw_file_names:
download_url('{}/{}'.format(self.url, name),self.raw_dir)
def process(self):
data = read_planetoid_data(self.raw_dir,'pubmed')
data = dataif self.pre_transformis None else self.pre_transform(data)
torch.save(self.collate([data]),self.processed_paths[0])
def __repr__(self):
return '{}()'.format(self.name)
在生成一个PlanetoidPubMed类的对象时,程序运行流程如下:
(1)检查数据原始文件是否已下载:
检查self.raw_dir目录下是否存在raw_file_names()属性方法返回的每个文件,如有文件不存在,则调用download()方法执行原始文件下载。其中self.raw_dir为osp.join(self.root, 'raw')。
(2)检查数据是否经过处理:
检查之前对数据做变换的方法:检查self.processed_dir目录下是否存在pre_transform.pt文件:如果存在,意味着之前进行过数据变换,则需加载该文件获取之前所用的数据变换的方法,并检查它与当前pre_transform参数指定的方法是否相同;如果不相同则会报出一个警告,“The pre_transform argument differs from the one used in ……”。
检查之前的样本过滤的方法:检查self.processed_dir目录下是否存在pre_filter.pt文件,如果存在,意味着之前进行过样本过滤,则需加载该文件获取之前所用的样本过滤的方法,并检查它与当前pre_filter参数指定的方法是否相同,如果不相同则会报出一个警告,“The pre_filter argument differs from the one used in ……”。其中self.processed_dir为osp.join(self.root, 'processed')。
检查是否存在处理好的数据:检查self.processed_dir目录下是否存在self.processed_paths方法返回的所有文件,如有文件不存在,意味着不存在已经处理好的样本的文件,如需执行以下的操作:
(a)调用process方法,进行数据处理。
(b)如果pre_transform参数不为None,则调用pre_transform方法进行数据处理。
(c)如果pre_filter参数不为None,则进行样本过滤。
(d)保存处理好的数据到文件,文件存储在processed_paths()属性方法返回的路径。如果将数据保存到多个文件中,则返回的路径有多个。这些路径都在self.processed_dir目录下,以processed_file_names()属性方法的返回值为文件名。
(e)保存新的pre_transform.pt文件和pre_filter.pt文件,其中分别存储当前使用的数据处理方法和样本过滤方法。
查看数据集
将下载好的数据包复制到项目的/dataset/raw文件夹中,直接读取本地路径下的数据包。
代码如下:
dataset= PlanetoidPubMed('dataset')
print(dataset.num_classes)
print(dataset[0].num_nodes)
print(dataset[0].num_edges)
print(dataset[0].num_features)
运行结果如下:
根据运行结果可以得出:该数据集包含3个分类任务,19,717个结点,88,648条边,500个结点特征维度。
结点预测与边预测任务实践
一、结点预测任务实践
重定义一个GAT神经网络,使其能够通过参数定义GATConv的层数,以及每一层GATConv的out_channels。
神经网络定义如下:
class GAT(torch.nn.Module):
def __init__(self, num_features, hidden_channels_list, num_classes):
super(GAT, self).__init__()
torch.manual_seed(12345)
hns = [num_features] + hidden_channels_list
conv_list = []
for idx in range(len(hidden_channels_list)):
conv_list.append((GATConv(hns[idx], hns[idx+1]), 'x, edge_index -> x'))
conv_list.append(ReLU(inplace=True),)
self.convseq = Sequential('x, edge_index', conv_list)
self.linear = Linear(hidden_channels_list[-1], num_classes)
def forward(self, x, edge_index):
x = self.convseq(x, edge_index)
x = F.dropout(x, p=0.5, training=self.training)
x = self.linear(x)
return x
二、边预测任务实践
边预测任务是预测两个结点之间是否存在边。
对于图数据集,存在结点特征矩阵x,和哪些结点之间存在边的信息edge_index。edge_index存储的便是正样本,为了构建边预测任务,需要生成一些负样本,即采样一些不存在边的节点对作为负样本边,正负样本应平衡。要将样本分为训练集、验证集和测试集三个集合。
PyG中提供了现成的方法,train_test_split_edges(data, val_ratio=0.05, test_ratio=0.1),其第一个参数为torch_geometric.data.Data对象,第二参数为验证集所占比例,第三个参数为测试集所占比例。该函数将自动地采样得到负样本,并将正负样本分成训练集、验证集和测试集三个集合。使用Cora数据集作为例子进行边预测任务说明。
获取数据集并进行分析
代码如下:
import os.path as osp
from torch_geometric.utils import negative_sampling
from torch_geometric.datasets import Planetoid
import torch_geometric.transforms as T
from torch_geometric.utils import train_test_split_edges
dataset = 'Cora'
path = osp.join(osp.dirname(osp.realpath(__file__)), '..', 'data', dataset)
dataset = Planetoid(path, dataset, transform=T.NormalizeFeatures())
data = dataset[0]
data.train_mask = data.val_mask = data.test_mask = data.y = None
data = train_test_split_edges(data)
print(data.edge_index.shape)
for key in data.keys:
print(key, getattr(data, key).shape)
运行结果如下:
根据运行结果可以得出:三个集合中正样本边的数量之和不等于原始边的数量。这是因为原始边的数量统计的是双向边的数量,在验证集正样本边和测试集正样本边中只需对一个方向的边做预测精度的衡量,对另一个方向的预测精度衡量属于重复。
构建神经网络模型
import torch
from torch_geometric.nn import GCNConv
class Net(torch.nn.Module):
def __init__(self, in_channels, out_channels):
super(Net, self).__init__()
self.conv1 = GCNConv(in_channels, 128)
self.conv2 = GCNConv(128, out_channels)
def encode(self, x, edge_index):
x = self.conv1(x, edge_index)
x = x.relu()
return self.conv2(x, edge_index)
def decode(self, z, pos_edge_index, neg_edge_index):
edge_index = torch.cat([pos_edge_index, neg_edge_index], dim=-1)
return (z[edge_index[0]] * z[edge_index[1]]).sum(dim=-1)
def decode_all(self, z):
prob_adj = z @ z.t()
return (prob_adj > 0).nonzero(as_tuple=False).t()
用于做边预测的神经网络主要由两部分组成:
其一是编码(encode),与生成结点表征是一样的;其二是解码(decode),为边两端结点的表征生成边为真的几率(odds)。decode_all(self, z)用于推断(inference)阶段,对输入结点的所有结点对预测存在边的几率。
定义单个epoch的训练过程
def get_link_labels(pos_edge_index, neg_edge_index):
num_links = pos_edge_index.size(1) + neg_edge_index.size(1)
link_labels = torch.zeros(num_links, dtype=torch.float)
link_labels[:pos_edge_index.size(1)] = 1.
return link_labels
def train(data, model, optimizer):
model.train()
neg_edge_index = negative_sampling(
edge_index=data.train_pos_edge_index,
num_nodes=data.num_nodes,
num_neg_samples=data.train_pos_edge_index.size(1))
optimizer.zero_grad()
z = model.encode(data.x, data.train_pos_edge_index)
link_logits = model.decode(z, data.train_pos_edge_index, neg_edge_index)
link_labels = get_link_labels(data.train_pos_edge_index, neg_edge_index).to(data.x.device)
loss = F.binary_cross_entropy_with_logits(link_logits, link_labels)
loss.backward()
optimizer.step()
return loss
通常在图上存在边的结点对的数量往往少于不存在边的结点对的数量。为了类平衡,在每一个epoch的训练过程中,只需要用到与正样本一样数量的负样本。
综合以上两点原因,在每一个epoch的训练过程中都采样与正样本数量一样的负样本,这样既做到了类平衡,又增加了训练负样本的丰富性。get_link_labels函数用于生成完整训练集的标签。在负样本采样时,传递了train_pos_edge_index为参数,于是negative_sampling函数只会在训练集中不存在边的结点对中采样。
在训练阶段,应该只见训练集,对验证集与测试集都是不可见的,在此阶段应该要完成对所有结点的编码,假设此处正样本训练集涉及到了所有的结点,这样就能实现对所有结点的编码。
定义单个epoch验证与测试过程
@torch.no_grad()
def test(data, model):
model.eval()
z = model.encode(data.x, data.train_pos_edge_index)
results = []
for prefix in ['val', 'test']:
pos_edge_index = data[f'{prefix}_pos_edge_index']
neg_edge_index = data[f'{prefix}_neg_edge_index']
link_logits = model.decode(z, pos_edge_index, neg_edge_index)
link_probs = link_logits.sigmoid()
link_labels = get_link_labels(pos_edge_index, neg_edge_index)
results.append(roc_auc_score(link_labels.cpu(), link_probs.cpu()))
return results
在验证与测试过程中,只用正样本边训练集做节点特征编码。
运行完整的训练、验证与测试
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
dataset = 'Cora'
path = osp.join(osp.dirname(osp.realpath(__file__)), '..', 'data', dataset)
dataset = Planetoid(path, dataset, transform=T.NormalizeFeatures())
data = dataset[0]
ground_truth_edge_index = data.edge_index.to(device)
data.train_mask = data.val_mask = data.test_mask = data.y = None
data = train_test_split_edges(data)
data = data.to(device)
model = Net(dataset.num_features, 64).to(device)
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.01)
best_val_auc = test_auc = 0
for epoch in range(1, 101):
loss = train(data, model, optimizer)
val_auc, tmp_test_auc = test(data, model)
if val_auc > best_val_auc:
best_val_auc = val_auc
test_auc = tmp_test_auc
print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}, Val: {val_auc:.4f}, '
f'Test: {test_auc:.4f}')
z = model.encode(data.x, data.train_pos_edge_index)
final_edge_index = model.decode_all(z)
三、总结
定义一个数据可全部存储于内存的数据集类的方法,并且实践结点预测任务和边预测任务。
要重点关注InMemoryDataset子类的运行流程与实现四个函数的规范。在图神经网络的实现中,可以适用torch_geometric.nn.Sequential容器对神经网络的多个模块顺序相连。
四、作业
对结点预测任务,尝试用PyG中的不同的网络层去代替GCNConv,以及不同的层数和不同的out_channels。
使用GAT代替GCN
class GAT(torch.nn.Module):
def __init__(self, num_features, hidden_channels_list, num_classes):
super(GAT, self).__init__()
torch.manual_seed(12345)
hns = [num_features] + hidden_channels_list
conv_list = []
for idx in range(len(hidden_channels_list)):
conv_list.append((GATConv(hns[idx], hns[idx+1]), 'x, edge_index -> x'))
conv_list.append(ReLU(inplace=True),)
self.convseq = Sequential('x, edge_index', conv_list)
self.linear = Linear(hidden_channels_list[-1], num_classes)
def forward(self, x, edge_index):
x = self.convseq(x, edge_index)
x = F.dropout(x, p=0.5, training=self.training)
x = self.linear(x)
return x
对边预测任务,尝试用用torch_geometric.nn.Sequential容器构造图神经网络。
class GCN(torch.nn.Module):
def __init__(self, num_features, hidden_channels_list, num_classes):
super(GCN, self).__init__()
torch.manual_seed(12345)
hns = [num_features] + hidden_channels_list
conv_list = []
for idx in range(len(hidden_channels_list)):
conv_list.append((GCNConv(hns[idx], hns[idx+1]), 'x, edge_index -> x'))
conv_list.append(ReLU(inplace=True))
self.conseq = Sequential('x, edge_index', conv_list)
self.linear = Linear(hidden_channels_list[-1], num_classes)
def encode(self, x, edge_index):
x = self.conseq(x, edge_index)
x = F.dropout(x, p=0.5, training=self.training)
x = self.linear(x)
return x
def decode(self, z, pos_edge_index, neg_edge_index):
edge_index = torch.cat([pos_edge_index, neg_edge_index], dim=-1)
return (z[edge_index[0]] * z[edge_index[1]]).sum(dim=-1)
def decode_all(self, z):
prob_adj = z @ z.t()
return (prob_adj > 0).nonzero(as_tuple=False).t()
如下方代码所示,以data.train_pos_edge_index为实际参数,这样采样得到的负样本可能包含验证集正样本或测试集正样本,即可能将真实的正样本标记为负样本,由此会产生冲突。为什么这么做?为什么在验证与测试阶段只根据data.train_pos_edge_index做结点表征的编码?
数据集实际上就是正样本多,负样本少。生成负样本是为了使正负样本平衡。采样的负样本中存在部分正样本,对于数据集来说是极少数的,不影响正负样本平衡。
只根据data.train_pos_edge_index做结点表征的编码,忽略验证集和测试集的具体情况,减少影响因素,提高准确性。
DataWhale开源学习资料:
https://github.com/datawhalechina/team-learning-nlp/tree/master/GNN