本文是halide编程指南的连载,已同步至公众号
第八章 多级管道调度
#include "Halide.h"
#include <stdio.h>
using namespace Halide;
int main(int argc, char **argv) {
// 首先,我们将在下面声明一些Vars.
Var x("x"), y("y");
// 让我们研究一个简单的两阶段管道的各种调度选项。 我们将从默认时间表开始:
{
Func producer("producer_default"), consumer("consumer_default");
// 第一阶段将是一些简单的逐点数学运算,类似于我们熟悉的渐变函数。 x,y位置的值是x和y乘积的正弦值.
producer(x, y) = sin(x * y);
// 现在,我们将添加第二阶段,该阶段将第一阶段的多个点平均在一起.
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
//我们将打开这两个功能的跟踪。
consumer.trace_stores();
producer.trace_stores();
// 并在4x4块上进行评估。
printf("\nEvaluating producer-consumer pipeline with default schedule\n");
consumer.realize(4, 4);
-----------------------------------------------------------------
> Begin pipeline consumer_default.0()
> Store consumer_default.0(0, 0) = 0.210368
> Store consumer_default.0(1, 0) = 0.437692
> Store consumer_default.0(2, 0) = 0.262604
> Store consumer_default.0(3, 0) = -0.153921
> Store consumer_default.0(0, 1) = 0.437692
> Store consumer_default.0(1, 1) = 0.475816
> Store consumer_default.0(2, 1) = 0.003550
> Store consumer_default.0(3, 1) = 0.023565
> Store consumer_default.0(0, 2) = 0.262604
> Store consumer_default.0(1, 2) = 0.003550
> Store consumer_default.0(2, 2) = -0.225879
> Store consumer_default.0(3, 2) = 0.146372
> Store consumer_default.0(0, 3) = -0.153921
> Store consumer_default.0(1, 3) = 0.023565
> Store consumer_default.0(2, 3) = 0.146372
> Store consumer_default.0(3, 3) = -0.237233
> End pipeline consumer_default.0()
------------------------------------------------------------------------
// 没有有关计算生产者价值的消息。这是因为默认计划将“producer”完全内联到“consumer”中。 就像我们写了以下代码一样:
// consumer(x, y) = (sin(x * y) +
// sin(x * (y + 1)) +
// sin((x + 1) * y) +
// sin((x + 1) * (y + 1))/4);
// 所有对“producer”的调用均已替换为“producer”的主体,并用参数代替了变量。
// 等效的c代码为:
float result[4][4];
for (int y = 0; y < 4; y++) {
for (int x = 0; x < 4; x++) {
result[y][x] = (sin(x*y) +
sin(x*(y+1)) +
sin((x+1)*y) +
sin((x+1)*(y+1)))/4;
}
}
printf("\n");
// 如果我们看一下循环嵌套,producer根本就不会出现。 它已内联到consumer中。
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
> produce consumer_default:
> for y:
> for x:
> consumer_default(...) = ...
--------------------------------------------------------------------------
}
// 接下来,我们将研究下一个最简单的选项-在计算任何consumer之前,计算producer中所需的所有值。 我们称此时间表为“root”。
{
// 从相同的函数开始:
Func producer("producer_root"), consumer("consumer_root");
producer(x, y) = sin(x * y);
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 告诉Halide在评估任何producer之前先评估所有consumer.
producer.compute_root();
// 打开跟踪.
consumer.trace_stores();
producer.trace_stores();
// 编译运行.
printf("\nEvaluating producer.compute_root()\n");
consumer.realize(4, 4);
--------------------------------------------------------------------------
> Begin pipeline consumer_root.0()
> Store producer_root.0(0, 0) = 0.000000
> Store producer_root.0(1, 0) = 0.000000
> Store producer_root.0(2, 0) = 0.000000
> Store producer_root.0(3, 0) = 0.000000
> Store producer_root.0(4, 0) = 0.000000
> Store producer_root.0(0, 1) = 0.000000
> Store producer_root.0(1, 1) = 0.841471
> Store producer_root.0(2, 1) = 0.909297
> Store producer_root.0(3, 1) = 0.141120
> Store producer_root.0(4, 1) = -0.756802
> Store producer_root.0(0, 2) = 0.000000
> Store producer_root.0(1, 2) = 0.909297
> Store producer_root.0(2, 2) = -0.756802
> Store producer_root.0(3, 2) = -0.279415
> Store producer_root.0(4, 2) = 0.989358
> Store producer_root.0(0, 3) = 0.000000
> Store producer_root.0(1, 3) = 0.141120
> Store producer_root.0(2, 3) = -0.279415
> Store producer_root.0(3, 3) = 0.412118
> Store producer_root.0(4, 3) = -0.536573
> Store producer_root.0(0, 4) = 0.000000
> Store producer_root.0(1, 4) = -0.756802
> Store producer_root.0(2, 4) = 0.989358
> Store producer_root.0(3, 4) = -0.536573
> Store producer_root.0(4, 4) = -0.287903
> Store consumer_root.0(0, 0) = 0.210368
> Store consumer_root.0(1, 0) = 0.437692
> Store consumer_root.0(2, 0) = 0.262604
> Store consumer_root.0(3, 0) = -0.153921
> Store consumer_root.0(0, 1) = 0.437692
> Store consumer_root.0(1, 1) = 0.475816
> Store consumer_root.0(2, 1) = 0.003550
> Store consumer_root.0(3, 1) = 0.023565
> Store consumer_root.0(0, 2) = 0.262604
> Store consumer_root.0(1, 2) = 0.003550
> Store consumer_root.0(2, 2) = -0.225879
> Store consumer_root.0(3, 2) = 0.146372
> Store consumer_root.0(0, 3) = -0.153921
> Store consumer_root.0(1, 3) = 0.023565
> Store consumer_root.0(2, 3) = 0.146372
> Store consumer_root.0(3, 3) = -0.237233
> End pipeline consumer_root.0()
-------------------------------------------------------------------------------
// 从输出中可以看到:
// A) 有关于producer的store
// B) 在consumer的store之前就已经发生了
// 如下是可视化图.
// producer在左边,consumer在右边。 store用橙色标记,load用蓝色标记。
图81
// 等效c代码:
float result[4][4];
// 为producer分配一些临时存储.
float producer_storage[5][5];
// 计算 producer.
for (int y = 0; y < 5; y++) {
for (int x = 0; x < 5; x++) {
producer_storage[y][x] = sin(x * y);
}
}
// 计算consumer. 跳过打印,不看打印的中间结果了.
for (int y = 0; y < 4; y++) {
for (int x = 0; x < 4; x++) {
result[y][x] = (producer_storage[y][x] +
producer_storage[y+1][x] +
producer_storage[y][x+1] +
producer_storage[y+1][x+1])/4;
}
}
// 请注意,对consumer的评估是在4x4的box上进行的,因此Halide会自动推断出在5x5的box上需要producer。 这与我们在上一课中看到的“边界推断”逻辑相同,在该逻辑中,该逻辑用于检测和避免输入图像的越界读取。
// 如果我们打印循环嵌套,我们将看到与上面的C非常相似的内容.
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
-----------------------------------------------------------------
> produce producer_root:
> for y:
> for x:
> producer_root(...) = ...
> consume producer_root:
> produce consumer_root:
> for y:
> for x:
> consumer_root(...) = ...
--------------------------------------------------------------------
}
// 让我们从性能的角度比较上述两种方法。
// 全内联 (默认的步骤):
// - 分配的临时内存: 0
// - Loads: 0
// - Stores: 16
// - Calls to sin: 64
// producer.compute_root():
// - 分配的临时内存: 25 floats
// - Loads: 64
// - Stores: 41
// - Calls to sin: 25
// 这里需要权衡。 全内联使用了最少的临时内存和内存带宽,但是做了一大堆多余的昂贵数学运算(称为sin)。 它四次评估了“生产者”中的大多数得分。 第二个调度程序producer.compute_root()执行了对sin的最小调用,但是使用了更多的临时内存和更多的内存带宽。
// 在任何给定情况下,都很难做出正确的选择。 如果您的内存带宽有限,或者没有太多的内存(例如,因为您使用的是旧手机),那么进行冗余数学运算就很有意义。 另一方面,Sin代价昂贵,因此,如果您的计算受到限制,那么对sin的调用会减少,从而使您的程序更快。 添加矢量化或多核并行度会倾斜比例,以便进行冗余工作,因为启动多个cpu核会增加每秒可以执行的数学运算量,但不会增加系统内存带宽或容量。
// 我们可以在完整内联和compute_root之间进行选择。 接下来,我们将在根据每个扫描线计算生产者和消费者之间交替进行:
{
// 从相同的函数定义开始:
Func producer("producer_y"), consumer("consumer_y");
producer(x, y) = sin(x * y);
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 告诉Halide,对于consumer的每个y坐标,根据需要评估producer:
producer.compute_at(consumer, y);
// 这会将计算producer的代码放在y的for循环内*,如下面的等效C所示.
// 打开跟踪.
producer.trace_stores();
consumer.trace_stores();
// 编译运行.
printf("\nEvaluating producer.compute_at(consumer, y)\n");
consumer.realize(4, 4);
----------------------------------------------------------------------
> Begin pipeline consumer_y.0()
> Store producer_y.0(0, 0) = 0.000000
> Store producer_y.0(1, 0) = 0.000000
> Store producer_y.0(2, 0) = 0.000000
> Store producer_y.0(3, 0) = 0.000000
> Store producer_y.0(4, 0) = 0.000000
> Store producer_y.0(0, 1) = 0.000000
> Store producer_y.0(1, 1) = 0.841471
> Store producer_y.0(2, 1) = 0.909297
> Store producer_y.0(3, 1) = 0.141120
> Store producer_y.0(4, 1) = -0.756802
> Store consumer_y.0(0, 0) = 0.210368
> Store consumer_y.0(1, 0) = 0.437692
> Store consumer_y.0(2, 0) = 0.262604
> Store consumer_y.0(3, 0) = -0.153921
> Store producer_y.0(0, 1) = 0.000000
> Store producer_y.0(1, 1) = 0.841471
> Store producer_y.0(2, 1) = 0.909297
> Store producer_y.0(3, 1) = 0.141120
> Store producer_y.0(4, 1) = -0.756802
> Store producer_y.0(0, 2) = 0.000000
> Store producer_y.0(1, 2) = 0.909297
> Store producer_y.0(2, 2) = -0.756802
> Store producer_y.0(3, 2) = -0.279415
> Store producer_y.0(4, 2) = 0.989358
> Store consumer_y.0(0, 1) = 0.437692
> Store consumer_y.0(1, 1) = 0.475816
> Store consumer_y.0(2, 1) = 0.003550
> Store consumer_y.0(3, 1) = 0.023565
> Store producer_y.0(0, 2) = 0.000000
> Store producer_y.0(1, 2) = 0.909297
> Store producer_y.0(2, 2) = -0.756802
> Store producer_y.0(3, 2) = -0.279415
> Store producer_y.0(4, 2) = 0.989358
> Store producer_y.0(0, 3) = 0.000000
> Store producer_y.0(1, 3) = 0.141120
> Store producer_y.0(2, 3) = -0.279415
> Store producer_y.0(3, 3) = 0.412118
> Store producer_y.0(4, 3) = -0.536573
> Store consumer_y.0(0, 2) = 0.262604
> Store consumer_y.0(1, 2) = 0.003550
> Store consumer_y.0(2, 2) = -0.225879
> Store consumer_y.0(3, 2) = 0.146372
> Store producer_y.0(0, 3) = 0.000000
> Store producer_y.0(1, 3) = 0.141120
> Store producer_y.0(2, 3) = -0.279415
> Store producer_y.0(3, 3) = 0.412118
> Store producer_y.0(4, 3) = -0.536573
> Store producer_y.0(0, 4) = 0.000000
> Store producer_y.0(1, 4) = -0.756802
> Store producer_y.0(2, 4) = 0.989358
> Store producer_y.0(3, 4) = -0.536573
> Store producer_y.0(4, 4) = -0.287903
> Store consumer_y.0(0, 3) = -0.153921
> Store consumer_y.0(1, 3) = 0.023565
> Store consumer_y.0(2, 3) = 0.146372
> Store consumer_y.0(3, 3) = -0.237233
> End pipeline consumer_y.0()
---------------------------------------------------------------------
// 如下可视化图.
图82
// 阅读日志或查看该图,您应该看到producer和consumer在每条扫描线的基础上交替出现。 让我们看看等效的C:
float result[4][4];
// consumer的扫描线存在外循环:
for (int y = 0; y < 4; y++) {
// 分配空间并计算producer的足够数量,以满足consumer的这一条扫描线。 这意味着producer的box大小为5x2。
float producer_storage[2][5];
for (int py = y; py < y + 2; py++) {
for (int px = 0; px < 5; px++) {
producer_storage[py-y][px] = sin(px * py);
}
}
// 计算consumer的扫描线 .
for (int x = 0; x < 4; x++) {
result[y][x] = (producer_storage[0][x] +
producer_storage[1][x] +
producer_storage[0][x+1] +
producer_storage[1][x+1])/4;
}
}
// 同样,如果我们打印循环嵌套,我们将看到与上面的C非常相似的内容。
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
-------------------------------------------------------------------------------
> produce consumer_y:
> for y:
> produce producer_y:
> for y:
> for x:
> producer_y(...) = ...
> consume producer_y:
> for x:
> consumer_y(...) = ...
------------------------------------------------------------------------------
// 该策略的性能特征介于内联和计算root之间。 我们仍然分配一些临时内存,但分配的内存要少一些,并且具有更好的局部性(在写入后不久便从中加载它,因此对于较大的图像,值仍应位于缓存中)。 我们仍然做了一些多余的工作,但还不到完整的内联:
// producer.compute_at(consumer, y):
// - 分配的临时内存: 10 floats
// - Loads: 64
// - Stores: 56
// - Calls to sin: 40
}
// 我们也可以说producer.compute_at(consumer,x),但这与完全内联(默认计划)非常相似。 相反,让我们区分为producer分配存储的循环级别和实际计算存储的循环级别。 这解锁了一些优化.
{
Func producer("producer_root_y"), consumer("consumer_root_y");
producer(x, y) = sin(x * y);
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 告诉Halide创建一个缓冲区,以在最外层存储所有producer:
producer.store_root();
// ... 但需要根据consumer的y坐标进行计算.
producer.compute_at(consumer, y);
producer.trace_stores();
consumer.trace_stores();
printf("\nEvaluating producer.store_root().compute_at(consumer, y)\n");
consumer.realize(4, 4);
-------------------------------------------------------------------
> Begin pipeline consumer_root_y.0()
> Store producer_root_y.0(0, 0) = 0.000000
> Store producer_root_y.0(1, 0) = 0.000000
> Store producer_root_y.0(2, 0) = 0.000000
> Store producer_root_y.0(3, 0) = 0.000000
> Store producer_root_y.0(4, 0) = 0.000000
> Store producer_root_y.0(0, 1) = 0.000000
> Store producer_root_y.0(1, 1) = 0.841471
> Store producer_root_y.0(2, 1) = 0.909297
> Store producer_root_y.0(3, 1) = 0.141120
> Store producer_root_y.0(4, 1) = -0.756802
> Store consumer_root_y.0(0, 0) = 0.210368
> Store consumer_root_y.0(1, 0) = 0.437692
> Store consumer_root_y.0(2, 0) = 0.262604
> Store consumer_root_y.0(3, 0) = -0.153921
> Store producer_root_y.0(0, 2) = 0.000000
> Store producer_root_y.0(1, 2) = 0.909297
> Store producer_root_y.0(2, 2) = -0.756802
> Store producer_root_y.0(3, 2) = -0.279415
> Store producer_root_y.0(4, 2) = 0.989358
> Store consumer_root_y.0(0, 1) = 0.437692
> Store consumer_root_y.0(1, 1) = 0.475816
> Store consumer_root_y.0(2, 1) = 0.003550
> Store consumer_root_y.0(3, 1) = 0.023565
> Store producer_root_y.0(0, 3) = 0.000000
> Store producer_root_y.0(1, 3) = 0.141120
> Store producer_root_y.0(2, 3) = -0.279415
> Store producer_root_y.0(3, 3) = 0.412118
> Store producer_root_y.0(4, 3) = -0.536573
> Store consumer_root_y.0(0, 2) = 0.262604
> Store consumer_root_y.0(1, 2) = 0.003550
> Store consumer_root_y.0(2, 2) = -0.225879
> Store consumer_root_y.0(3, 2) = 0.146372
> Store producer_root_y.0(0, 4) = 0.000000
> Store producer_root_y.0(1, 4) = -0.756802
> Store producer_root_y.0(2, 4) = 0.989358
> Store producer_root_y.0(3, 4) = -0.536573
> Store producer_root_y.0(4, 4) = -0.287903
> Store consumer_root_y.0(0, 3) = -0.153921
> Store consumer_root_y.0(1, 3) = 0.023565
> Store consumer_root_y.0(2, 3) = 0.146372
> Store consumer_root_y.0(3, 3) = -0.237233
> End pipeline consumer_root_y.0()
-------------------------------------------------------------------
//下图是可视化图。
图83
// 阅读日志或查看该图,您应该看到producer和consumer再次基于每个扫描线交替使用。 它计算producer的5x2框以满足consumer的第一条扫描线,但是之后,它仅为consumer的每条新扫描线计算输出的5x1框!
//
// Halide已检测到,除第一个扫描线外,所有扫描线都可以重用已经存在于我们为producer分配的缓冲区中的值。 让我们看一下等效的C:
float result[4][4];
// producer.store_root() 意味着存储在这里:
float producer_storage[5][5];
// consumer的扫描线存在外部循环:
for (int y = 0; y < 4; y++) {
//计算足够的producer以满足consumer的需求。
for (int py = y; py < y + 2; py++) {
// 跳过我们在上一次迭代中已经计算出的producer行.
if (y > 0 && py == y) continue;
for (int px = 0; px < 5; px++) {
producer_storage[py][px] = sin(px * py);
}
}
// 计算consumer的扫描线.
for (int x = 0; x < 4; x++) {
result[y][x] = (producer_storage[y][x] +
producer_storage[y+1][x] +
producer_storage[y][x+1] +
producer_storage[y+1][x+1])/4;
}
}
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
--------------------------------------------------------------------------
> store producer_root_y:
> produce consumer_root_y:
> for y:
> produce producer_root_y:
> for y:
> for x:
> producer_root_y(...) = ...
> consume producer_root_y:
> for x:
> consumer_root_y(...) = ...
-------------------------------------------------------------------
// 此策略的性能特征非常好! 这些数字与compute_root类似,但局部性更好。 我们正在执行最少数量的sin调用,并且在存储值之后立即加载值,因此我们可能会充分利用缓存:
// producer.store_root().compute_at(consumer, y):
// - 分配的临时内存: 10 floats
// - Loads: 64
// - Stores: 39
// - Calls to sin: 25
// 请注意,我声称的分配的内存量与参考C代码不匹配。 Halide正在后台进行另一项优化。 它将producer的存储折叠到两条扫描线的循环缓冲区中。 等效的C实际上看起来像这样:
{
// 实际存储2条扫描线,而不是5条
float producer_storage[2][5];
for (int y = 0; y < 4; y++) {
for (int py = y; py < y + 2; py++) {
if (y > 0 && py == y) continue;
for (int px = 0; px < 5; px++) {
// 到producer_storage的存储的y坐标是位掩码的.
producer_storage[py & 1][px] = sin(px * py);
}
}
// 计算consumer的扫描线.
for (int x = 0; x < 4; x++) {
// 来自producer_storage的负载的y坐标为位掩码.
result[y][x] = (producer_storage[y & 1][x] +
producer_storage[(y+1) & 1][x] +
producer_storage[y & 1][x+1] +
producer_storage[(y+1) & 1][x+1])/4;
}
}
}
}
// 我们可以做得更好,将存储留在最外层,但是将计算移到最内层的循环中:
{
Func producer("producer_root_x"), consumer("consumer_root_x");
producer(x, y) = sin(x * y);
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 存储最外层,计算最内层。
producer.store_root().compute_at(consumer, x);
producer.trace_stores();
consumer.trace_stores();
printf("\nEvaluating producer.store_root().compute_at(consumer, x)\n");
consumer.realize(4, 4);
------------------------------------------------------------------
> Begin pipeline consumer_root_x.0()
> Store producer_root_x.0(0, 0) = 0.000000
> Store producer_root_x.0(1, 0) = 0.000000
> Store producer_root_x.0(0, 1) = 0.000000
> Store producer_root_x.0(1, 1) = 0.841471
> Store consumer_root_x.0(0, 0) = 0.210368
> Store producer_root_x.0(2, 0) = 0.000000
> Store producer_root_x.0(2, 1) = 0.909297
> Store consumer_root_x.0(1, 0) = 0.437692
> Store producer_root_x.0(3, 0) = 0.000000
> Store producer_root_x.0(3, 1) = 0.141120
> Store consumer_root_x.0(2, 0) = 0.262604
> Store producer_root_x.0(4, 0) = 0.000000
> Store producer_root_x.0(4, 1) = -0.756802
> Store consumer_root_x.0(3, 0) = -0.153921
> Store producer_root_x.0(0, 2) = 0.000000
> Store producer_root_x.0(1, 2) = 0.909297
> Store consumer_root_x.0(0, 1) = 0.437692
> Store producer_root_x.0(2, 2) = -0.756802
> Store consumer_root_x.0(1, 1) = 0.475816
> Store producer_root_x.0(3, 2) = -0.279415
> Store consumer_root_x.0(2, 1) = 0.003550
> Store producer_root_x.0(4, 2) = 0.989358
> Store consumer_root_x.0(3, 1) = 0.023565
> Store producer_root_x.0(0, 3) = 0.000000
> Store producer_root_x.0(1, 3) = 0.141120
> Store consumer_root_x.0(0, 2) = 0.262604
> Store producer_root_x.0(2, 3) = -0.279415
> Store consumer_root_x.0(1, 2) = 0.003550
> Store producer_root_x.0(3, 3) = 0.412118
> Store consumer_root_x.0(2, 2) = -0.225879
> Store producer_root_x.0(4, 3) = -0.536573
> Store consumer_root_x.0(3, 2) = 0.146372
> Store producer_root_x.0(0, 4) = 0.000000
> Store producer_root_x.0(1, 4) = -0.756802
> Store consumer_root_x.0(0, 3) = -0.153921
> Store producer_root_x.0(2, 4) = 0.989358
> Store consumer_root_x.0(1, 3) = 0.023565
> Store producer_root_x.0(3, 4) = -0.536573
> Store consumer_root_x.0(2, 3) = 0.146372
> Store producer_root_x.0(4, 4) = -0.287903
> Store consumer_root_x.0(3, 3) = -0.237233
> End pipeline consumer_root_x.0()
--------------------------------------------------------------------
// 可视化图如下
图84
// 您应该看到producer和consumer现在按像素交替。 这是等效的C:
float result[4][4];
// producer.store_root() 意味着存储在这里,但是我们可以将其折叠到两条扫描线的循环缓冲区中:
float producer_storage[2][5];
// 对于consumer的每个像素:
for (int y = 0; y < 4; y++) {
for (int x = 0; x < 4; x++) {
// 计算足够的producer以满足consumer的这个像素,但是跳过我们已经计算出的值:
if (y == 0 && x == 0)
producer_storage[y & 1][x] = sin(x*y);
if (y == 0)
producer_storage[y & 1][x+1] = sin((x+1)*y);
if (x == 0)
producer_storage[(y+1) & 1][x] = sin(x*(y+1));
producer_storage[(y+1) & 1][x+1] = sin((x+1)*(y+1));
result[y][x] = (producer_storage[y & 1][x] +
producer_storage[(y+1) & 1][x] +
producer_storage[y & 1][x+1] +
producer_storage[(y+1) & 1][x+1])/4;
}
}
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
----------------------------------------------------------------
> store producer_root_x:
> produce consumer_root_x:
> for y:
> for x:
> produce producer_root_x:
> for y:
> for x:
> producer_root_x(...) = ...
> consume producer_root_x:
> consumer_root_x(...) = ...
--------------------------------------------------------------
// 该策略的性能特征是迄今为止最好的。 我们需要的producer的四个值之一可能仍然位于寄存器中,因此我不会将其视为负载:
// producer.store_root().compute_at(consumer, x):
// - 分配的临时内存: 10 floats
// - Loads: 48
// - Stores: 56
// - Calls to sin: 40
}
// 那么有什么收获呢? 为什么不总是为这种类型的代码始终使用producer.store_root()。compute_at(consumer,x)?
//
// 答案是并行性。 在前两种策略中,我们都假设先前迭代计算出的值可供我们重用。 这假定x或y的先前值在时间上较早发生并已完成。 如果并行化或向量化任一循环,则情况并非如此。 如果您进行并行化,那么在store_at级别和compute_at级别之间存在并行循环时,Halide将不会注入跳过已经完成的优化的操作,并且也不会将存储向下折叠到循环缓冲区中,这会使我们的store_root毫无意义 。
// 我们的选项已用完。 我们可以拆分创建新的。 我们可以在consumer的自然变量(x和y)上存储store_at或compute_at,也可以将x或y拆分为新的内部和外部子变量,然后针对它们进行调度。 我们将使用它来表示图块中的融合:
{
Func producer("producer_tile"), consumer("consumer_tile");
producer(x, y) = sin(x * y);
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 我们将以4x4格来计算8x8的consumer.
Var x_outer, y_outer, x_inner, y_inner;
consumer.tile(x, y, x_outer, y_outer, x_inner, y_inner, 4, 4);
// 根据consumer的每块计算producer
producer.compute_at(consumer, x_outer);
// 请注意,我是从管道(consumer)的末端开始编写时间表的。 这是因为producer的计划表引用了x_outer,这是我们在平铺consumer时引入的。 您可以按其他顺序编写它,但是它往往很难阅读。
// 打开追踪.
producer.trace_stores();
consumer.trace_stores();
printf("\nEvaluating:\n"
"consumer.tile(x, y, x_outer, y_outer, x_inner, y_inner, 4, 4);\n"
"producer.compute_at(consumer, x_outer);\n");
consumer.realize(8, 8);
---------------------------------------------------------------------
> Begin pipeline consumer_tile.0()
> Store producer_tile.0(0, 0) = 0.000000
> Store producer_tile.0(1, 0) = 0.000000
> Store producer_tile.0(2, 0) = 0.000000
> Store producer_tile.0(3, 0) = 0.000000
> Store producer_tile.0(4, 0) = 0.000000
> Store producer_tile.0(0, 1) = 0.000000
> Store producer_tile.0(1, 1) = 0.841471
> Store producer_tile.0(2, 1) = 0.909297
> Store producer_tile.0(3, 1) = 0.141120
> Store producer_tile.0(4, 1) = -0.756802
> Store producer_tile.0(0, 2) = 0.000000
> Store producer_tile.0(1, 2) = 0.909297
> Store producer_tile.0(2, 2) = -0.756802
> Store producer_tile.0(3, 2) = -0.279415
> Store producer_tile.0(4, 2) = 0.989358
> Store producer_tile.0(0, 3) = 0.000000
> Store producer_tile.0(1, 3) = 0.141120
> Store producer_tile.0(2, 3) = -0.279415
> Store producer_tile.0(3, 3) = 0.412118
> Store producer_tile.0(4, 3) = -0.536573
> Store producer_tile.0(0, 4) = 0.000000
> Store producer_tile.0(1, 4) = -0.756802
> Store producer_tile.0(2, 4) = 0.989358
> Store producer_tile.0(3, 4) = -0.536573
> Store producer_tile.0(4, 4) = -0.287903
> Store consumer_tile.0(0, 0) = 0.210368
> Store consumer_tile.0(1, 0) = 0.437692
> Store consumer_tile.0(2, 0) = 0.262604
> Store consumer_tile.0(3, 0) = -0.153921
> Store consumer_tile.0(0, 1) = 0.437692
> Store consumer_tile.0(1, 1) = 0.475816
> Store consumer_tile.0(2, 1) = 0.003550
> Store consumer_tile.0(3, 1) = 0.023565
> Store consumer_tile.0(0, 2) = 0.262604
> Store consumer_tile.0(1, 2) = 0.003550
> Store consumer_tile.0(2, 2) = -0.225879
> Store consumer_tile.0(3, 2) = 0.146372
> Store consumer_tile.0(0, 3) = -0.153921
> Store consumer_tile.0(1, 3) = 0.023565
> Store consumer_tile.0(2, 3) = 0.146372
> Store consumer_tile.0(3, 3) = -0.237233
> Store producer_tile.0(4, 0) = 0.000000
> Store producer_tile.0(5, 0) = 0.000000
> Store producer_tile.0(6, 0) = 0.000000
> Store producer_tile.0(7, 0) = 0.000000
> Store producer_tile.0(8, 0) = 0.000000
> Store producer_tile.0(4, 1) = -0.756802
> Store producer_tile.0(5, 1) = -0.958924
> Store producer_tile.0(6, 1) = -0.279415
> Store producer_tile.0(7, 1) = 0.656987
> Store producer_tile.0(8, 1) = 0.989358
> Store producer_tile.0(4, 2) = 0.989358
> Store producer_tile.0(5, 2) = -0.544021
> Store producer_tile.0(6, 2) = -0.536573
> Store producer_tile.0(7, 2) = 0.990607
> Store producer_tile.0(8, 2) = -0.287903
> Store producer_tile.0(4, 3) = -0.536573
> Store producer_tile.0(5, 3) = 0.650288
> Store producer_tile.0(6, 3) = -0.750987
> Store producer_tile.0(7, 3) = 0.836656
> Store producer_tile.0(8, 3) = -0.905578
> Store producer_tile.0(4, 4) = -0.287903
> Store producer_tile.0(5, 4) = 0.912945
> Store producer_tile.0(6, 4) = -0.905578
> Store producer_tile.0(7, 4) = 0.270906
> Store producer_tile.0(8, 4) = 0.551427
> Store consumer_tile.0(4, 0) = -0.428932
> Store consumer_tile.0(5, 0) = -0.309585
> Store consumer_tile.0(6, 0) = 0.094393
> Store consumer_tile.0(7, 0) = 0.411586
> Store consumer_tile.0(4, 1) = -0.317597
> Store consumer_tile.0(5, 1) = -0.579733
> Store consumer_tile.0(6, 1) = 0.207901
> Store consumer_tile.0(7, 1) = 0.587262
> Store consumer_tile.0(4, 2) = 0.139763
> Store consumer_tile.0(5, 2) = -0.295323
> Store consumer_tile.0(6, 2) = 0.134926
> Store consumer_tile.0(7, 2) = 0.158445
> Store consumer_tile.0(4, 3) = 0.184689
> Store consumer_tile.0(5, 3) = -0.023333
> Store consumer_tile.0(6, 3) = -0.137251
> Store consumer_tile.0(7, 3) = 0.188352
> Store producer_tile.0(0, 4) = 0.000000
> Store producer_tile.0(1, 4) = -0.756802
> Store producer_tile.0(2, 4) = 0.989358
> Store producer_tile.0(3, 4) = -0.536573
> Store producer_tile.0(4, 4) = -0.287903
> Store producer_tile.0(0, 5) = 0.000000
> Store producer_tile.0(1, 5) = -0.958924
> Store producer_tile.0(2, 5) = -0.544021
> Store producer_tile.0(3, 5) = 0.650288
> Store producer_tile.0(4, 5) = 0.912945
> Store producer_tile.0(0, 6) = 0.000000
> Store producer_tile.0(1, 6) = -0.279415
> Store producer_tile.0(2, 6) = -0.536573
> Store producer_tile.0(3, 6) = -0.750987
> Store producer_tile.0(4, 6) = -0.905578
> Store producer_tile.0(0, 7) = 0.000000
> Store producer_tile.0(1, 7) = 0.656987
> Store producer_tile.0(2, 7) = 0.990607
> Store producer_tile.0(3, 7) = 0.836656
> Store producer_tile.0(4, 7) = 0.270906
> Store producer_tile.0(0, 8) = 0.000000
> Store producer_tile.0(1, 8) = 0.989358
> Store producer_tile.0(2, 8) = -0.287903
> Store producer_tile.0(3, 8) = -0.905578
> Store producer_tile.0(4, 8) = 0.551427
> Store consumer_tile.0(0, 4) = -0.428932
> Store consumer_tile.0(1, 4) = -0.317597
> Store consumer_tile.0(2, 4) = 0.139763
> Store consumer_tile.0(3, 4) = 0.184689
> Store consumer_tile.0(0, 5) = -0.309585
> Store consumer_tile.0(1, 5) = -0.579733
> Store consumer_tile.0(2, 5) = -0.295323
> Store consumer_tile.0(3, 5) = -0.023333
> Store consumer_tile.0(0, 6) = 0.094393
> Store consumer_tile.0(1, 6) = 0.207901
> Store consumer_tile.0(2, 6) = 0.134926
> Store consumer_tile.0(3, 6) = -0.137251
> Store consumer_tile.0(0, 7) = 0.411586
> Store consumer_tile.0(1, 7) = 0.587262
> Store consumer_tile.0(2, 7) = 0.158445
> Store consumer_tile.0(3, 7) = 0.188352
> Store producer_tile.0(4, 4) = -0.287903
> Store producer_tile.0(5, 4) = 0.912945
> Store producer_tile.0(6, 4) = -0.905578
> Store producer_tile.0(7, 4) = 0.270906
> Store producer_tile.0(8, 4) = 0.551427
> Store producer_tile.0(4, 5) = 0.912945
> Store producer_tile.0(5, 5) = -0.132352
> Store producer_tile.0(6, 5) = -0.988032
> Store producer_tile.0(7, 5) = -0.428183
> Store producer_tile.0(8, 5) = 0.745113
> Store producer_tile.0(4, 6) = -0.905578
> Store producer_tile.0(5, 6) = -0.988032
> Store producer_tile.0(6, 6) = -0.991779
> Store producer_tile.0(7, 6) = -0.916522
> Store producer_tile.0(8, 6) = -0.768255
> Store producer_tile.0(4, 7) = 0.270906
> Store producer_tile.0(5, 7) = -0.428183
> Store producer_tile.0(6, 7) = -0.916522
> Store producer_tile.0(7, 7) = -0.953753
> Store producer_tile.0(8, 7) = -0.521551
> Store producer_tile.0(4, 8) = 0.551427
> Store producer_tile.0(5, 8) = 0.745113
> Store producer_tile.0(6, 8) = -0.768255
> Store producer_tile.0(7, 8) = -0.521551
> Store producer_tile.0(8, 8) = 0.920026
> Store consumer_tile.0(4, 4) = 0.351409
> Store consumer_tile.0(5, 4) = -0.278254
> Store consumer_tile.0(6, 4) = -0.512722
> Store consumer_tile.0(7, 4) = 0.284816
> Store consumer_tile.0(4, 5) = -0.278254
> Store consumer_tile.0(5, 5) = -0.775048
> Store consumer_tile.0(6, 5) = -0.831129
> Store consumer_tile.0(7, 5) = -0.341961
> Store consumer_tile.0(4, 6) = -0.512722
> Store consumer_tile.0(5, 6) = -0.831129
> Store consumer_tile.0(6, 6) = -0.944644
> Store consumer_tile.0(7, 6) = -0.790020
> Store consumer_tile.0(4, 7) = 0.284816
> Store consumer_tile.0(5, 7) = -0.341961
> Store consumer_tile.0(6, 7) = -0.790020
> Store consumer_tile.0(7, 7) = -0.269207
> End pipeline consumer_tile.0()
-------------------------------------------------------------------
// 可视化图如下.
图85
// producer和consumer现在按每个区块交替。 这是等效的C:
float result[8][8];
// 对于consumer的每块:
for (int y_outer = 0; y_outer < 2; y_outer++) {
for (int x_outer = 0; x_outer < 2; x_outer++) {
// 计算此图块开始处的x和y坐标.
int x_base = x_outer*4;
int y_base = y_outer*4;
// 计算足够的producer来满足此图块。 consumer的4x4拼贴需要producer的5x5拼贴。
float producer_storage[5][5];
for (int py = y_base; py < y_base + 5; py++) {
for (int px = x_base; px < x_base + 5; px++) {
producer_storage[py-y_base][px-x_base] = sin(px * py);
}
}
// 计算消费者的这个图块
for (int y_inner = 0; y_inner < 4; y_inner++) {
for (int x_inner = 0; x_inner < 4; x_inner++) {
int x = x_base + x_inner;
int y = y_base + y_inner;
result[y][x] =
(producer_storage[y - y_base][x - x_base] +
producer_storage[y - y_base + 1][x - x_base] +
producer_storage[y - y_base][x - x_base + 1] +
producer_storage[y - y_base + 1][x - x_base + 1])/4;
}
}
}
}
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
-------------------------------------------------------------------
> produce consumer_tile:
> for y.y_outer:
> for x.x_outer:
> produce producer_tile:
> for y:
> for x:
> producer_tile(...) = ...
> consume producer_tile:
> for y.y_inner in [0, 3]:
> for x.x_inner in [0, 3]:
> consumer_tile(...) = ...
------------------------------------------------------------------
// 对于像这样的问题,使用在x和y处向外延伸的模具,平铺可能很有意义。 每个图块可以并行独立地计算,并且一旦图块变得足够大,每个图块完成的冗余工作就不会太糟糕。
}
// 让我们尝试一种混合策略,该策略将我们已完成的拆分,并行化和向量化处理结合在一起。 对于大型图像,这通常在实践中效果很好。 如果您了解此进程,那么您将了解Halide中95%的进程。
{
Func producer("producer_mixed"), consumer("consumer_mixed");
producer(x, y) = sin(x * y);
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 将consumer的y坐标分成16条扫描线:
Var yo, yi;
consumer.split(y, yo, yi, 16);
// 使用线程池和任务队列计算strip.
consumer.parallel(yo);
// 在x上向量化四倍.
consumer.vectorize(x, 4);
// 现在按条存储producer。 这将是producer的17条扫描线(16 + 1),但希望它将折叠成两条扫描线的循环缓冲区:
producer.store_at(consumer, yo);
// 在每个strip中,计算consumer每条扫描线的生产者,而跳过先前扫描线完成的工作。
producer.compute_at(consumer, yi);
// 还要对producer进行矢量化处理(因为可以使用SSE在x86上对sin进行矢量化处理)。
producer.vectorize(x, 4);
// 我们这次不进行跟踪,因为我们将在更大的图像上进行评估。
// consumer.trace_stores();
// producer.trace_stores();
Buffer<float> halide_result = consumer.realize(160, 160);
// 下图是可视化图
图86
// 等效的C代码:
float c_result[160][160];
// 对于16条扫描线的每个strip(在Halide版本中此循环是并行的)
for (int yo = 0; yo < 160/16 + 1; yo++) {
// 16不会除以160,因此将最后一个片段向上推以适合[0,159](请参阅第05章)。
int y_base = yo * 16;
if (y_base > 160-16) y_base = 160-16;
// 为producer分配两个扫描线循环缓冲区
float producer_storage[2][161];
// 对于16 strip中的每条扫描线
for (int yi = 0; yi < 16; yi++) {
int y = y_base + yi;
for (int py = y; py < y+2; py++) {
// 跳过已经在此任务内计算出的扫描线
if (yi > 0 && py == y) continue;
// 用4宽度向量计算producer的扫描线
for (int x_vec = 0; x_vec < 160/4 + 1; x_vec++) {
int x_base = x_vec*4;
// 4不会除以161,因此将最后一个向量向左推(请参阅第05章).
if (x_base > 161 - 4) x_base = 161 - 4;
// 如果您使用的是x86,则Halide会为此部分生成SSE代码:
int x[] = {x_base, x_base + 1, x_base + 2, x_base + 3};
float vec[4] = {sinf(x[0] * py), sinf(x[1] * py),
sinf(x[2] * py), sinf(x[3] * py)};
producer_storage[py & 1][x[0]] = vec[0];
producer_storage[py & 1][x[1]] = vec[1];
producer_storage[py & 1][x[2]] = vec[2];
producer_storage[py & 1][x[3]] = vec[3];
}
}
// 现在计算此扫描线的consumer:
for (int x_vec = 0; x_vec < 160/4; x_vec++) {
int x_base = x_vec * 4;
// 同样,此处的Halide等效项使用SSE.
int x[] = {x_base, x_base + 1, x_base + 2, x_base + 3};
float vec[] = {
(producer_storage[y & 1][x[0]] +
producer_storage[(y+1) & 1][x[0]] +
producer_storage[y & 1][x[0]+1] +
producer_storage[(y+1) & 1][x[0]+1])/4,
(producer_storage[y & 1][x[1]] +
producer_storage[(y+1) & 1][x[1]] +
producer_storage[y & 1][x[1]+1] +
producer_storage[(y+1) & 1][x[1]+1])/4,
(producer_storage[y & 1][x[2]] +
producer_storage[(y+1) & 1][x[2]] +
producer_storage[y & 1][x[2]+1] +
producer_storage[(y+1) & 1][x[2]+1])/4,
(producer_storage[y & 1][x[3]] +
producer_storage[(y+1) & 1][x[3]] +
producer_storage[y & 1][x[3]+1] +
producer_storage[(y+1) & 1][x[3]+1])/4};
c_result[y][x[0]] = vec[0];
c_result[y][x[1]] = vec[1];
c_result[y][x[2]] = vec[2];
c_result[y][x[3]] = vec[3];
}
}
}
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
----------------------------------------------------------------
> produce consumer_mixed:
> parallel y.yo:
> store producer_mixed:
> for y.yi in [0, 15]:
> produce producer_mixed:
> for y:
> for x.x:
> vectorized x.v39 in [0, 3]:
> producer_mixed(...) = ...
> consume producer_mixed:
> for x.x:
> vectorized x.v36 in [0, 3]:
> consumer_mixed(...) = ...
------------------------------------------------------------------
// 看我的代码,你们强大而绝望!
// 让我们对照halide结果检查C结果。 这样做后,我在C实现中发现了几个错误,这些错误应该告诉您一些信息。
for (int y = 0; y < 160; y++) {
for (int x = 0; x < 160; x++) {
float error = halide_result(x, y) - c_result[y][x];
// 这是浮点数学运算,所以我们允许一些误差:
if (error < -0.001f || error > 0.001f) {
printf("halide_result(%d, %d) = %f instead of %f\n",
x, y, halide_result(x, y), c_result[y][x]);
return -1;
}
}
}
}
// 这东西很难。 我们最终在内存带宽,冗余工作和并行性之间进行了三步权衡。 halide无法自动为您做出正确的选择(抱歉)。 相反,它试图使您更轻松地探索各种选项,而不会弄乱您的程序。 实际上,Halide承诺,诸如compute_root之类的调度调用不会改变算法的含义-无论如何调度,您都应该获得相同的位。
// 因此要凭经验! 试用各种时间表,并记录性能。 形成假设,然后尝试证明自己是错误的。 不必假设您只需要将代码矢量化四倍,然后在八个内核上运行它,就会使速度提高32倍。 这几乎是行不通的。 现代系统非常复杂,以至于如果不运行代码就无法可靠地预测性能。
//我们建议您首先安排所有非平凡阶段的compute_root,然后从流水线末端开始向上,依次内联,并行化和向量化各个阶段,直到到达顶部为止。
//Halide不仅涉及向量化和并行化代码。 这还不足以使您走得更远。 Halide旨在为您提供工具,以帮助您快速探索局部性,冗余工作和并行性之间的不同权衡,而不会弄乱您要计算的实际结果。
printf("Success!\n");
return 0;
}