线性分类器Model
X = tf.constant([[1., 2., 3.], [4., 5., 6.]])
y = tf.constant([[10.], [20.]])
# 线性分类器
class Linear(tf.keras.Model):
def __init__(self):
super().__init__()
self.dense = tf.keras.layers.Dense(
units=1,
activation=None,
kernel_initializer=tf.zeros_initializer(),
bias_initializer = tf.zeros_initializer()
)
def call(self, input):
output = self.dense(input)
return output
model = Linear()
model.compile(optimizer='SGD', loss='mse', metrics=['mae'])
model.fit(X, y, epochs=100, verbose=1)
model.evaluate(X, y)
weights = model.variables[0]
bias = model.variables[1]
y_pred = tf.matmul(X, weights) + bias
数据获取及预处理: tf.keras.datasets
class MNISTLoader():
def __init__(self):
mnist = tf.keras.datasets.mnist
# train_data shape (60000, 28, 28)
# test_data shape (10000, 28, 28)
(self.train_data, self.train_label), (self.test_data, self.test_label) = mnist.load_data()
# 以下代码将其归一化到0-1之间的浮点数,并在最后增加一维作为颜色通道
self.train_data = np.expand_dims(self.train_data.astype(np.float32) / 255.0, axis=-1)
self.test_data = np.expand_dims(self.test_data.astype(np.float32) / 255.0, axis=-1)
self.train_label = self.train_label.astype(np.int32)
self.test_label = self.test_label.astype(np.int32)
self.num_train_data, self.num_test.data = self.train_data.shape[0], self.test_data.shape[0]
def get_batch(self, batch_size):
# 从数据集中随机取出batch_size个元素并返回
index = np.random.randint(0, self.num_train_data, batch_size)
return self.train_data[index, :], self.train_label[index]
模型的构建: tf.keras.Model
和 tf.keras.layers
多层感知机的模型类实现与上面的线性模型类似,使用 tf.keras.Model
和 tf.keras.layers
构建,所不同的地方在于层数增加了(顾名思义,“多层” 感知机),以及引入了非线性激活函数(这里使用了 ReLU 函数 , 即下方的 activation=tf.nn.relu
)。该模型输入一个向量(比如这里是拉直的 1×784
手写体数字图片),输出 10 维的向量,分别代表这张图片属于 0 到 9 的概率。
class MLP(tf.keras.Model):
def __init__(self):
super().__init__()
self.flatten = tf.keras.layers.Flatten()
self.dense1 = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)
self.dense2 = tf.keras.layers.Dense(units=10)
def call(self, inputs):
x = self.flatten(inputs)
x = self.dense1(x)
x = self.dense2(x)
output = tf.nn.softmax(x)
return output
这里,因为我们希望输出 “输入图片分别属于 0 到 9 的概率”,也就是一个 10 维的离散概率分布,所以我们希望这个 10 维向量至少满足两个条件:
- 该向量中的每个元素均在[0, 1]之间;
- 该向量的所有元素之和为 1。
为了使得模型的输出能始终满足这两个条件,我们使用 Softmax 函数 (归一化指数函数,tf.nn.softmax
)对模型的原始输出进行归一化。其形式为 。不仅如此,softmax 函数能够凸显原始向量中最大的值,并抑制远低于最大值的其他分量,这也是该函数被称作 softmax 函数的原因(即平滑化的 argmax 函数)。
自定义层
自定义层需要继承 tf.keras.layers.Layer
类,并重写 __init__
、 build
和 call
三个方法,如下所示:
# 线性层 相当于Dense实现
class LinearLayer(tf.keras.layers.Layer):
def __init__(self, units):
super().__init__()
self.units = units
def build(self, input_shape):
self.w = self.add_weight(name='w', shape=[input_shape[-1], self.units], initializer=tf.zeros_initializer())
self.b = self.add_weight(name='b', shape=[self.units], initializer=tf.zeros_initializer())
def call(self, inputs):
y_pred = tf.matmul(inputs, self.w) + self.b
return y_pred
# 线性模型
class LinearModel(tf.keras.Model):
def __init__(self):
super().__init__()
self.layer = LinearLayer(units=1)
def call(self, inputs):
output = self.layer(inputs)
return output
linearModel = LinearModel()
# 同LinearLayer
class MyDenseLayer(tf.keras.layers.Layer):
def __init__(self, num_outputs):
super(MyDenseLayer, self).__init__()
self.num_outputs = num_outputs
def build(self, input_shape):
self.kernel = self.add_variable("kernel",
shape=[int(input_shape[-1]),
self.num_outputs])
def call(self, input):
return tf.matmul(input, self.kernel)
layer = MyDenseLayer(10)
自定义损失函数和评估指标
自定义损失函数需要继承 tf.keras.losses.Loss
类,重写 call
方法即可,输入真实值 y_true
和模型预测值 y_pred
,输出模型预测值和真实值之间通过自定义的损失函数计算出的损失值。下面的示例为均方差损失函数:
class MeanSquaredError(tf.keras.losses.Loss):
def call(self, y_true, y_pred):
return tf.reduce_mean(tf.square(y_pred-y_true))
meanSquaredError = MeanSquaredError()
meanSquaredError(np.array([1,0]), np.array([0.9, 0.01]))
自定义评估指标需要继承 tf.keras.metrics.Metric
类,并重写 __init__
、 update_state
和 result
三个方法。下面的示例对前面用到的 SparseCategoricalAccuracy
评估指标类做了一个简单的重实现:
class SparseCategoricalAccuracy(tf.keras.metrics.Metric):
def __init__(self):
super().__init__()
self.total = self.add_weight(name='total', dtype=tf.int32, initializer=tf.zeros_initializer())
self.count = self.add_weight(name='count', dtype=tf.int32, initializer=tf.zeros_initializer())
def update_state(self, y_true, y_pred, sample_weight=None):
values = tf.cast(tf.equal(y_true, tf.argmax(y_pred, axis=-1, output_type=tf.int32)), tf.int32)
self.total.assign_add(tf.shape(y_true)[0])
self.count.assign_add(tf.reduce_sum(values))
def result(self):
return self.count / self.total