一道电面题目, 分为两问:
- 设计一个系统, 不断接收数据包(数据内容可以简单想成一个int值). 给定常量M, 要求从所有获取的数据中随机抽样M个, 每个样本被抽取的概率相等.
- 如果已接收数据包的数量还未超过M个, 则将它们全部返回.
- 接收数据包的总量是未知的, 可能非常大.
- 机器的存储空间有限, 无法存储所有数据包. 但是存储M个数据包还是绰绰有余的.
- 假如有K台机器, 如何将第一问的算法做成分布式的, 以最大化吞吐量?
第一问是一个标准的水塘抽样算法(Reservoir Sampling)问题.
class System {
private:
vector<int> v;
int capacity;
int cnt;
public:
System(int capacity) : capacity(capacity), cnt(0) {}
void put(int n) {
++cnt;
if (cnt <= capacity) {
v.push_back(n);
return;
}
int index = rand() % cnt;
if (index < capacity) {
v[index] = n;
}
}
vector<int> get() {
return v;
}
};
算法思路:
维护一个大小为M的数组. 记当前接收的是第N个数据(从1开始).
- 如果
N<=M, 直接插入 - 如果
N>M, 就取一个1~N之间的随机数index. 如果index在1~M之间, 则用新接收的数据替换第index个数据; 否则丢弃.
证明:
假设当前是第M+1个元素, 它被丢弃的概率是1/(M+1), 留下的概率就是M/(M+1). 对于已经在集合中的M个元素, 每个以1/(M+1)的概率被丢弃, 留下的概率也是M/(M+1).
假设当前是第M+2个元素, 它被丢弃的概率是2/(M+2), 留下的概率是M/(M+2). 对于前M+1个元素, 它们在集合中的概率是M/(M+1)(见上一个分析). 这一次, 它们每个被以1/(M+2)的概率被丢弃, 留下的概率就是 M/(M+1) * (M+1)/(M+2) = M/(M+2)
依次类推, 到接受第N个元素时, 每个元素被抽取的概率就是M/N.
第二问就是分布式的蓄水池抽样问题了.
算法思路是:
假设有K个机器, 每个机器维护大小为M的数组, 并记录该机器接受的数据总数Ni.
- 当机器获取新数据时, 进行单机的蓄水池抽样.
- 当进行采样时, 重复
M次以下操作:
取随机数d在[0,1)之间, 记N=Sum(Ni | i=1...K)- 若
d<N1/N则从第一个机器上等概率抽取一个元素. - 若
N1/N<=d<(N1+N2)/N则从第二个机器上等概率抽取一个元素 - 依此类推.
- 若
假设Ni>M:
因为第i个机器上数据的留存概率为M/Ni, 而采样时又以Ni/N的概率抽取该机器, 又以1/M的概率等概率不放回地选取一个元素, 所以第i个机器上一个数据被抽中的概率为M/Ni * Ni/N * 1/M = 1/N. 这样重复M次, 每个元素被抽取到的概率就是M/N.
假设Ni<=M
第i个机器上数据的留存概率为1, 采样时以Ni/N的概率抽取该机器, 又以1/Ni的概率等概率不放回地选取一个元素, 所以第i个机器上一个数据被抽中的概率为1/N. 同样, 重复M次让每个元素被抽取到的概率为M/N.