参考:
Introducing TensorFlow Federated
Federated Learning
Federated learning for image classification
Demo1:Image Classification
1. 开始之前
测试tff是否正常
from __future__ import absolute_import, division, print_function
import collections
import warnings
from six.moves import range
import numpy as np
import six
import tensorflow as tf
import tensorflow_federated as tff
warnings.simplefilter('ignore')
tf.compat.v1.enable_v2_behavior()
np.random.seed(0)
# NOTE: If the statement below fails, it means that you are
# using an older version of TFF without the high-performance
# executor stack. Call `tff.framework.set_default_executor()`
# instead to use the default reference runtime.
if six.PY3:
tff.framework.set_default_executor(tff.framework.create_local_executor())
print(tff.federated_computation(lambda: 'Hello, World!')())
# 输出
b'Hello World!'
2. 准备输入数据
数据集来源:
- 经过 Leaf 处理过的用于FL的mnist数据集:femnist
- tff已经集成的emnist数据集
MNIST 的原始数据集为 NIST,其中包含 81 万张手写的数字,由 3600多个志愿者提供,目标是建立一个识别数字的 ML 模型。
通过调用 TFF 的 FL API,使用已由 GitHub 上的Leaf
项目处理的 NIST 数据集版本来分隔每个数据提供者所写的数字
数据集一览:
by_write 以用户划分,每个用户一个id,共3600多份,每个用户包含4份文件,分别是数字,大写字母,小写字母,混合字母
by_class 按字符类别划分, 每个文件夹对应一个字符,以字符的十六进制acsii码命名,如
30-39
对应数字0-9
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
print(len(emnist_train.client_ids))
# 输出 3383
load_data()
函数得到的 emnist_train
和 emnist_test
是 tff.simulation.ClientData
的一个实例/接口,允许枚举一系列用户
The data sets returned by
load_data()
are instances oftff.simulation.ClientData
, an interface that allows you to enumerate the set of users, to construct atf.data.Dataset
that represents the data of a particular user, and to query the structure of individual elements. Here's how you can use this interface to explore the content of the data set. Keep in mind that while this interface allows you to iterate over clients ids, this is only a feature of the simulation data. As you will see shortly, client identities are not used by the federated learning framework - their only purpose is to allow you to select subsets of the data for simulations.
第一次运行会下载所需的数据集:
查看数据信息:
example_dataset = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[0])
example_element = iter(example_dataset).next()
print(emnist_train.element_type_structure)
print(example_element['label'].numpy())
plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal')
plt.show()
可以看到数据集的结构如下:
(OrderedDict([('pixels', tf.float32), ('label', tf.int32)]), OrderedDict([('pixels', TensorShape([28, 28])), ('label', TensorShape([]))]))
以字典形式存储,pixels是图片的key,label是对应图片标签的key,后面预处理将他们改为x和y
数据预处理
- flatten the 28x28 images into 784-element arrays
- shuffle the individual examples
- organize them into batches
- renames the features from pixels and label to x and y for use with Keras
NUM_CLIENTS = 10
NUM_EPOCHS = 10
BATCH_SIZE = 20
SHUFFLE_BUFFER = 500
def preprocess(dataset):
def element_fn(element):
return collections.OrderedDict([
('x', tf.reshape(element['pixels'], [-1])),
('y', tf.reshape(element['label'], [1])),
])
return dataset.repeat(NUM_EPOCHS).map(element_fn).shuffle(
SHUFFLE_BUFFER).batch(BATCH_SIZE)
preprocessed_example_dataset = preprocess(example_dataset)
sample_batch = tf.nest.map_structure(
lambda x: x.numpy(), iter(preprocessed_example_dataset).next())
print(sample_batch)
sample_batch输出如下:每个batch_sample含20对数据
OrderedDict([('x', array([[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
...,
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.]], dtype=float32)), ('y', array([[7],
[7],
[4],
[0],
[9],
[1],
[9],
[5],
[4],
[8],
[0],
[4],
[0],
[9],
[7],
[0],
[6],
[7],
[4],
[1]], dtype=int32))])
选择用户并生成对应用户的数据集
在模拟中向TFF提供联合数据的一种方法是简单地将其作为一个Python列表,该列表的每个元素都包含单个用户的数据,不管是作为列表还是tf.data.Dataset。
既然我们已经有了提供后者的接口,让我们使用它。下面是一个简单的帮助函数,它将构造来自给定用户集的数据集列表,作为一轮培训或评估的输入。
def make_federated_data(client_data, client_ids):
return [preprocess(client_data.create_tf_dataset_for_client(x))
for x in client_ids]
当然,我们是在一个模拟环境中,所有的数据都是本地可用的。通常情况下,当运行模拟时,我们会简单地对每一轮训练中涉及的客户的随机子集进行抽样,通常在每一轮中是不同的。
sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]
federated_train_data = make_federated_data(emnist_train, sample_clients)
print(len(federated_train_data))
print(federated_train_data[0])
为了简化,这里做的是对客户端集进行一次抽样(取10个客户端),然后每一轮都重复使用相同的这10个客户端,以加速收敛,故意对这几个用户的数据进行过度拟合)。
我们把它作为一个练习留给读者来修改本教程来模拟随机抽样——这是相当容易做到的(一旦你这样做了,记住如果每轮选择不同客户端,让模型收敛可能需要一段时间)。
3. 用Keras创建模型
如果您正在使用Keras,那么您可能已经有了构建Keras模型的代码。下面是一个简单模型的例子。
def create_compiled_keras_model():
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(
10, activation=tf.nn.softmax, kernel_initializer='zeros', input_shape=(784,))])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.02),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
return model
关于编译的一个重要注意事项。
如下所示,在联邦平均算法中使用时,优化器只是整个优化算法的一半,因为它只用于计算每个客户机上的本地模型更新。算法的其余部分涉及如何在客户机上平均执行这些更新,以及如何将它们应用到服务器上的全局模型。
特别是,这意味着这里使用的优化器和学习率的选择可能需要不同于您在标准的i.i.d.数据集上训练模型的选择。
我们建议从常规的SGD开始,学习速度要比平时慢。我们在这里使用的学习速度没有经过仔细调整,请随意尝试。
为了使用TFE的任何模型。它需要包装在一个 tff.learning.Model 接口的实例中。
与Keras类似,它公开了对模型的前向传递、元数据属性等进行标记的方法,但也引入了其他元素,例如控制计算联邦度量的过程的方法。
现在我们先不要担心这个,如果你有一个编译过的Keras模型,就像我们上面定义的那样,你可以通过调用 tf.learning.from_compiled_keras_model 让TFF为你包装它。from_compiled_keras_model,将模型和样本数据批处理作为参数传递,如下所示
def model_fn():
keras_model = create_compiled_keras_model()
return tff.learning.from_compiled_keras_model(keras_model, sample_batch)
4. 训练模型
FedSGD与FedAVG
参数说明:
C:每轮执行计算的client的比例fraction
E:每轮客户端对其本地数据集的训练遍数epochs
B:用于客户端更新的本地mini-batch大小。
当B取无穷,E取1时,代表每个client使用本地所有数据集作为一个batch,并且在一轮中训练了一个epoch,就变成了FedSGD
关键一句:
iterative_process = tff.learning.build_federated_averaging_process(model_fn)
调用 tff.learning.build_federated_averaging_process()
,将会返回一个 IterativeProcess 的实例,包含两个函数:initialize()
和 next()
-
initialize()
用于初始化,返回的是训练开始时的state -
next()
输入当前的state,执行一轮计算,得到新的state
next,代表了一轮Federated Averaging,它包括将服务器状态(包括模型参数)推给客户机,对它们的本地数据进行设备上的培训,收集和平均模型更新,并在服务器上生成一个新的更新模型。
state = iterative_process.initialize()
# 训练10轮,并输出每轮精度
for round_num in range(1, 11):
state, metrics = iterative_process.next(state, federated_train_data)
print('round {:2d}, metrics={}'.format(round_num, metrics))
输出结果:
round 1, metrics=<sparse_categorical_accuracy=0.19063962996006012,loss=2.719102621078491>
round 2, metrics=<sparse_categorical_accuracy=0.28754550218582153,loss=2.2646403312683105>
round 3, metrics=<sparse_categorical_accuracy=0.3825013041496277,loss=1.9177477359771729>
round 4, metrics=<sparse_categorical_accuracy=0.46357253193855286,loss=1.6692872047424316>
round 5, metrics=<sparse_categorical_accuracy=0.5483359098434448,loss=1.4488725662231445>
round 6, metrics=<sparse_categorical_accuracy=0.6206448078155518,loss=1.2644093036651611>
round 7, metrics=<sparse_categorical_accuracy=0.6653406023979187,loss=1.148783564567566>
round 8, metrics=<sparse_categorical_accuracy=0.6995579600334167,loss=1.051501989364624>
round 9, metrics=<sparse_categorical_accuracy=0.7348414063453674,loss=0.9653234481811523>
round 10, metrics=<sparse_categorical_accuracy=0.7528861165046692,loss=0.90737384557724>
实际上每一轮都可能选择不同的clients,这里为了简化,重复使用一开始选中的那10个clients
5. 测试精度
evaluation = tff.learning.build_federated_evaluation(model_fn)
train_metrics = evaluation(state.model, federated_train_data)
print(train_metrics)
federated_test_data = make_federated_data(emnist_test, sample_clients)
test_metrics = evaluation(state.model, federated_test_data)
print(test_metrics)
前面使用的是keras来快速搭建模型,下面 使用自定义模型
从头开始创建模型
1. Defining model variables, forward pass, and metrics
第一步是识别我们要使用的TensorFlow变量。
为了使代码更加清晰,让我们定义一个数据结构来表示整个集。
这将包括变量如权重W
和偏置B
,各种累积统计信息如loss_sum
,accuracy_sum
,num_examples
MnistVariables = collections.namedtuple('MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')
定义创建变量的函数
def create_mnist_variables():
return MnistVariables(
weights = tf.Variable(
lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
name='weights',
trainable=True),
bias = tf.Variable(
lambda: tf.zeros(dtype=tf.float32, shape=(10)),
name='bias',
trainable=True),
num_examples = tf.Variable(0.0, name='num_examples', trainable=False),
loss_sum = tf.Variable(0.0, name='loss_sum', trainable=False),
accuracy_sum = tf.Variable(0.0, name='accuracy_sum', trainable=False))
有了变量之后,定义forward方式,根据输入的变量和批大小,得到loss值和预测结果
def mnist_forward_pass(variables, batch):
y = tf.nn.softmax(tf.matmul(batch['x'], variables.weights) + variables.bias)
predictions = tf.cast(tf.argmax(y, 1), tf.int32)
flat_labels = tf.reshape(batch['y'], [-1])
loss = -tf.reduce_mean(tf.reduce_sum(
tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
accuracy = tf.reduce_mean(
tf.cast(tf.equal(predictions, flat_labels), tf.float32))
num_examples = tf.cast(tf.size(batch['y']), tf.float32)
variables.num_examples.assign_add(num_examples)
variables.loss_sum.assign_add(loss * num_examples)
variables.accuracy_sum.assign_add(accuracy * num_examples)
return loss, predictions
def get_local_mnist_metrics(variables):
return collections.OrderedDict([
('num_examples', variables.num_examples),
('loss', variables.loss_sum / variables.num_examples),
('accuracy', variables.accuracy_sum / variables.num_examples)
])
Demo2:Text Ceneration
1. 开始之前
测试tff是否正常
2. 加载预训练的模型
生成vocab查找表
# A fixed vocabularly of ASCII chars that occur in the works of Shakespeare and Dickens:
vocab = list('dhlptx@DHLPTX $(,048cgkoswCGKOSW[_#\'/37;?bfjnrvzBFJNRVZ"&*.26:\naeimquyAEIMQUY]!%)-159\r')
# Creating a mapping from unique characters to indices
char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)
加载预训练模型并生成一些文本
def load_model(batch_size):
urls = {
1: 'https://storage.googleapis.com/tff-models-public/dickens_rnn.batch1.kerasmodel',
8: 'https://storage.googleapis.com/tff-models-public/dickens_rnn.batch8.kerasmodel'}
assert batch_size in urls, 'batch_size must be in ' + str(urls.keys())
url = urls[batch_size]
local_file = tf.keras.utils.get_file(os.path.basename(url), origin=url)
return tf.keras.models.load_model(local_file, compile=False)
def generate_text(model, start_string):
# From https://www.tensorflow.org/tutorials/sequences/text_generation
num_generate = 200
input_eval = [char2idx[s] for s in start_string]
input_eval = tf.expand_dims(input_eval, 0)
text_generated = []
temperature = 1.0
model.reset_states()
for i in range(num_generate):
predictions = model(input_eval)
predictions = tf.squeeze(predictions, 0)
predictions = predictions / temperature
predicted_id = tf.random.categorical(
predictions, num_samples=1)[-1, 0].numpy()
input_eval = tf.expand_dims([predicted_id], 0)
text_generated.append(idx2char[predicted_id])
return (start_string + ''.join(text_generated))
# Text generation requires a batch_size=1 model.
keras_model_batch1 = load_model(batch_size=1)
print(generate_text(keras_model_batch1, 'What of TensorFlow Federated, you ask? '))
outputs:
Downloading data from https://storage.googleapis.com/tff-models-public/dickens_rnn.batch1.kerasmodel
16195584/16193984 [==============================] - 58s 4us/step
What of TensorFlow Federated, you ask? Says
allary officers to guide on a difference of my death; I had known a buh to make, Surely, she called Lucie.
You know the Doctor had quite grim, and many frafe, with a protest from
weithered
3. 加载和预处理莎士比亚数据
train_data, test_data = tff.simulation.datasets.shakespeare.load_data()
# Here the play is "The Tragedy of King Lear" and the character is "King".
raw_example_dataset = train_data.create_tf_dataset_for_client(
'THE_TRAGEDY_OF_KING_LEAR_KING')
# To allow for future extensions, each entry x
# is an OrderedDict with a single key 'snippets' which contains the text.
# for x in raw_example_dataset.take(2):
# print(x['snippets'])
# Input pre-processing parameters
SEQ_LENGTH = 100
BATCH_SIZE = 8
BUFFER_SIZE = 10000 # For dataset shuffling
# Construct a lookup table to map string chars to indexes,
# using the vocab loaded above:
table = tf.lookup.StaticHashTable(
tf.lookup.KeyValueTensorInitializer(
keys=vocab, values=tf.constant(list(range(len(vocab))),
dtype=tf.int64)),
default_value=0)
def to_ids(x):
s = tf.reshape(x['snippets'], shape=[1])
chars = tf.strings.bytes_split(s).values
ids = table.lookup(chars)
return ids
def split_input_target(chunk):
input_text = tf.map_fn(lambda x: x[:-1], chunk)
target_text = tf.map_fn(lambda x: x[1:], chunk)
return (input_text, target_text)
def preprocess(dataset):
return (
# Map ASCII chars to int64 indexes using the vocab
dataset.map(to_ids)
# Split into individual chars
.unbatch()
# Form example sequences of SEQ_LENGTH +1
.batch(SEQ_LENGTH + 1, drop_remainder=True)
# Shuffle and form minibatches
.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
# And finally split into (input, target) tuples,
# each of length SEQ_LENGTH.
.map(split_input_target))
example_dataset = preprocess(raw_example_dataset)
print(tf.data.experimental.get_structure(example_dataset))
4. 编译模型并在预处理后的数据上进行训练
class FlattenedCategoricalAccuracy(tf.keras.metrics.SparseCategoricalAccuracy):
def __init__(self, name='accuracy', dtype=None):
super(FlattenedCategoricalAccuracy, self).__init__(name, dtype=dtype)
def update_state(self, y_true, y_pred, sample_weight=None):
y_true = tf.reshape(y_true, [-1, 1])
y_pred = tf.reshape(y_pred, [-1, len(vocab), 1])
return super(FlattenedCategoricalAccuracy, self).update_state(
y_true, y_pred, sample_weight)
def compile(keras_model):
keras_model.compile(
optimizer=tf.keras.optimizers.SGD(lr=0.5),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[FlattenedCategoricalAccuracy()])
return keras_model
BATCH_SIZE = 8 # The training and eval batch size for the rest of this tutorial.
keras_model = load_model(batch_size=BATCH_SIZE)
compile(keras_model)
# Confirm that loss is much lower on Shakespeare than on random data
print('Evaluating on an example Shakespeare character:')
keras_model.evaluate(example_dataset.take(1))
# As a sanity check, we can construct some completely random data, where we expect
# the accuracy to be essentially random:
random_indexes = np.random.randint(
low=0, high=len(vocab), size=1 * BATCH_SIZE * (SEQ_LENGTH + 1))
data = {
'snippets':
tf.constant(''.join(np.array(vocab)[random_indexes]), shape=[1, 1])
}
random_dataset = preprocess(tf.data.Dataset.from_tensor_slices(data))
print('\nExpected accuracy for random guessing: {:.3f}'.format(1.0 / len(vocab)))
print('Evaluating on completely random data:')
keras_model.evaluate(random_dataset, steps=1)
outputs: