Overview
Presto Index Source is an optimization strategy based on an index provided by the data source. It can improve query performance by avoiding reading data that the join condition would filter out anyway.
With the Index Source strategy, Presto transfers the join keys from the left table (the probe side) to the right table (the Index Source). The Index Source then performs a lookup to fetch the records that match the keys it received, after which the left and right tables are joined. In other words, an index source does not create splits or run a table scan; it reads records directly according to the keys it receives.
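To make this data flow concrete, here is a minimal Java sketch of the idea. It is not Presto's implementation.

- It assumes the right table is exposed as an in-memory map from join key to rows, held in the hypothetical `ToyIndex` class.
- It counts how many right-side rows are actually fetched.
- It deduplicates probe keys before the lookup, which is a simplifying assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of an index join (hypothetical names, not Presto classes):
// the right table is never scanned; only rows matching probe-side keys are fetched.
class IndexJoinSketch {
    // Stands in for the right table's index: join key -> rows with that key
    static final class ToyIndex {
        private final Map<Long, List<String>> rowsByKey;
        int rowsRead = 0; // how many right-side rows were actually fetched

        ToyIndex(Map<Long, List<String>> rowsByKey) {
            this.rowsByKey = rowsByKey;
        }

        List<String> lookup(long key) {
            List<String> rows = rowsByKey.getOrDefault(key, List.of());
            rowsRead += rows.size();
            return rows;
        }
    }

    // probeRows: (join key, left-side payload) pairs coming from the probe side
    static List<String> indexJoin(List<Map.Entry<Long, String>> probeRows, ToyIndex index) {
        // 1. Collect the distinct join keys seen on the probe side
        Set<Long> keys = new LinkedHashSet<>();
        for (Map.Entry<Long, String> row : probeRows) {
            keys.add(row.getKey());
        }
        // 2. Send the keys to the index; each key is looked up once
        Map<Long, List<String>> fetched = new HashMap<>();
        for (long key : keys) {
            fetched.put(key, index.lookup(key));
        }
        // 3. Join every probe row with the right-side rows fetched for its key
        List<String> output = new ArrayList<>();
        for (Map.Entry<Long, String> row : probeRows) {
            for (String right : fetched.get(row.getKey())) {
                output.add(row.getValue() + "|" + right);
            }
        }
        return output;
    }
}
```

Take probe keys 1, 1 and 3 against a three-row right table keyed 1, 2 and 3. Only two right-side rows are fetched, and the row for key 2 is never read. That difference is what matters when the right table is huge.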
Index Source therefore performs well when the right table is extremely large but the join condition selects only a few of its rows. In addition, the right table must offer an efficient way to fetch the rows associated with a given key.
Index Source is somewhat similar to dynamic filtering: both aim to reduce the amount of data that has to be read. However, dynamic filtering is subject to a timeout, and it focuses on reducing what is read from the left table.
Code tracing
To figure out how Index Source works, I traced through the code in a debugger, starting from a simple test case: com.facebook.presto.tests.AbstractTestIndexedQueries#testBasicIndexJoin. Its SQL is:
```sql
SELECT * FROM (SELECT * FROM lineitem WHERE partkey % 8 = 0) l JOIN orders o ON l.orderkey = o.orderkey;
```
The execution plan shows that Index Source has replaced the right table's ScanProjectOperator with an IndexSourceOperator, which appears as the IndexSource node:
```text
IndexSource[tpch_indexed:com.facebook.presto.tests.tpch.TpchIndexHandle@2981c060, lookup = [orderkey_63]] => [orderkey_63:bigint, custkey:bigint, orderstatus:varchar(1), totalprice:double, orderdate:date, orderpriority:varchar(15), clerk:varchar(15), shippriority:integer, comment_64:varchar(79)]
        CPU: 1.68s (30.82%), Scheduled: 1.77s (28.76%), Output: 332605 rows (41.22MB)
        Input avg.: 523.79 rows, Input std.dev.: 102.78%
        orderkey_63 := tpch:orderkey
        custkey := tpch:custkey
        orderstatus := tpch:orderstatus
        totalprice := tpch:totalprice
        orderdate := tpch:orderdate
        orderpriority := tpch:orderpriority
        clerk := tpch:clerk
        shippriority := tpch:shippriority
        comment_64 := tpch:comment
```
IndexSourceOperator is defined in class com.facebook.presto.operator.index.IndexSourceOperator. Its core code is:
```java
@Override
public Supplier<Optional<UpdatablePageSource>> addSplit(Split split)
{
    requireNonNull(split, "split is null");
    checkState(source == null, "Index source split already set");

    IndexSplit indexSplit = (IndexSplit) split.getConnectorSplit();

    // Normalize the incoming RecordSet to something that can be consumed by the index
    RecordSet normalizedRecordSet = probeKeyNormalizer.apply(indexSplit.getKeyRecordSet());

    // Key step: filter the right table's records according to the left table's key set (indexSplit.getKeyRecordSet())
    ConnectorPageSource result = index.lookup(normalizedRecordSet);

    // Create the right table's page source from the lookup result; the result set is read page by page later
    source = new PageSourceOperator(result, operatorContext);

    Object splitInfo = split.getInfo();
    if (splitInfo != null) {
        operatorContext.setInfoSupplier(() -> new SplitOperatorInfo(splitInfo));
    }

    return Optional::empty;
}
```
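The life cycle of this operator can be modelled in a few lines. The sketch below is a hypothetical toy called `ToyIndexSourceOperator`, not Presto's class.

- It accepts exactly one split carrying the probe keys, mirroring the `checkState(source == null, ...)` guard.
- It performs a single lookup through a function that stands in for `ConnectorIndex#lookup`.
- It hands out the result in fixed-size pages.

Unlike the real operator, it materializes the lookup result as a list before paging through it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical toy mirroring the IndexSourceOperator life cycle (not Presto's API):
// one split with the probe keys -> one index lookup -> results emitted page by page.
class ToyIndexSourceOperator {
    private final Function<List<Long>, List<String>> index; // stands in for ConnectorIndex#lookup
    private final int pageSize;
    private Iterator<String> source; // stays null until the split arrives

    ToyIndexSourceOperator(Function<List<Long>, List<String>> index, int pageSize) {
        this.index = index;
        this.pageSize = pageSize;
    }

    void addSplit(List<Long> keys) {
        if (keys == null) {
            throw new NullPointerException("split is null");
        }
        // Same guard as checkState(source == null, ...): only one split is accepted
        if (source != null) {
            throw new IllegalStateException("Index source split already set");
        }
        // Single lookup with the whole key set; getOutput drains the result page by page
        source = index.apply(keys).iterator();
    }

    // Returns up to pageSize rows, or an empty list once the result is exhausted
    List<String> getOutput() {
        List<String> page = new ArrayList<>();
        while (source != null && source.hasNext() && page.size() < pageSize) {
            page.add(source.next());
        }
        return page;
    }
}
```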
The method ConnectorIndex#lookup corresponds to the lookup operation in the execution plan. Let's step into it. In the code base traced here, this interface is implemented only by TpchConnectorIndex:
```java
@Override
public ConnectorPageSource lookup(RecordSet rawInputRecordSet)
{
    // convert the input record set from the column ordering in the query to
    // match the column ordering of the index
    RecordSet inputRecordSet = keyFormatter.apply(rawInputRecordSet);

    // Key step: lookup the values in the index
    RecordSet rawOutputRecordSet = indexedTable.lookupKeys(inputRecordSet);

    // convert the output record set of the index into the column ordering
    // expected by the query
    return new RecordPageSource(outputFormatter.apply(rawOutputRecordSet));
}
```
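According to their comments, keyFormatter and outputFormatter only change the column ordering. Assuming they behave like a positional column remapping, the idea can be sketched as follows. Their actual implementation is not shown above, and `ColumnRemapSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a positional column remapping, the kind of reordering
// keyFormatter/outputFormatter are described as doing (not their actual code).
class ColumnRemapSketch {
    // For each target column, find its position (channel) in the source layout
    static int[] channels(List<String> sourceColumns, List<String> targetColumns) {
        int[] mapping = new int[targetColumns.size()];
        for (int i = 0; i < targetColumns.size(); i++) {
            int position = sourceColumns.indexOf(targetColumns.get(i));
            if (position < 0) {
                throw new IllegalArgumentException("Missing column: " + targetColumns.get(i));
            }
            mapping[i] = position;
        }
        return mapping;
    }

    // Rebuild every row so its values follow the target column order
    static List<List<Object>> remap(List<? extends List<?>> rows, int[] mapping) {
        List<List<Object>> output = new ArrayList<>(rows.size());
        for (List<?> row : rows) {
            List<Object> reordered = new ArrayList<>(mapping.length);
            for (int channel : mapping) {
                reordered.add(row.get(channel));
            }
            output.add(reordered);
        }
        return output;
    }
}
```

Suppose the query supplies key columns as (custkey, orderkey) while the index is keyed on (orderkey, custkey). Then `channels` yields `{1, 0}`, and `remap` swaps the two values in every row.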
Next, let's step into IndexedTable#lookupKeys to see how the filtering is actually done:
```java
public RecordSet lookupKeys(RecordSet recordSet)
{
    // Since we only return a cached copy of IndexedTable, please make sure you reorder the input to the same order as keyColumns
    checkArgument(recordSet.getColumnTypes().equals(keyTypes), "Input RecordSet keys do not match expected key type");

    Iterable<RecordSet> outputRecordSets = Iterables.transform(tupleIterable(recordSet), key -> {
        for (Object value : key.getValues()) {
            if (value == null) {
                throw new IllegalArgumentException("TPCH index does not support null values");
            }
        }
        // Look up the records for this key
        return lookupKey(key);
    });

    // We will return the result in the same order as outputColumns
    return new ConcatRecordSet(outputRecordSets, outputTypes);
}

private RecordSet lookupKey(MaterializedTuple tupleKey)
{
    // Fetch the records from the cached key -> records mapping held in the field keyToValues
    return new MaterializedTupleRecordSet(keyToValues.get(tupleKey), outputTypes);
}
```
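The same logic, reduced to plain collections, looks roughly like the sketch below.

- A `Map` whose missing keys yield an empty list stands in for Guava's ListMultimap, whose `get` returns an empty list for absent keys.
- Lists of objects stand in for MaterializedTuple.
- `LookupKeysSketch` is a hypothetical name.
- This version builds the result eagerly. In the original, Iterables.transform produces a lazy view, so a null key is only rejected when the output is iterated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified sketch of IndexedTable#lookupKeys (hypothetical class, not Presto code):
// reject null keys, fetch the cached rows per key, concatenate results in key order.
class LookupKeysSketch {
    static List<List<Object>> lookupKeys(
            List<List<Object>> keys, Map<List<Object>, List<List<Object>>> keyToValues) {
        List<List<Object>> output = new ArrayList<>();
        for (List<Object> key : keys) {
            for (Object value : key) {
                if (value == null) {
                    throw new IllegalArgumentException("TPCH index does not support null values");
                }
            }
            // An unknown key contributes no rows, like an absent multimap entry
            output.addAll(keyToValues.getOrDefault(key, List.of()));
        }
        return output;
    }
}
```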
Because tpch is just a test connector, com.facebook.presto.tests.tpch.IndexedTpchConnectorFactory reads all the records of table orders while it initializes. It caches them as a key -> records mapping in a ListMultimap named keyToValues. That is why lookupKey can simply fetch the records by key and return them. Below is how the tpch connector caches the data of table orders:
```java
public TpchIndexedData(String connectorId, TpchIndexSpec tpchIndexSpec)
{
    requireNonNull(connectorId, "connectorId is null");
    requireNonNull(tpchIndexSpec, "tpchIndexSpec is null");

    TpchMetadata tpchMetadata = new TpchMetadata(connectorId);
    TpchRecordSetProvider tpchRecordSetProvider = new TpchRecordSetProvider();

    ImmutableMap.Builder<Set<TpchScaledColumn>, IndexedTable> indexedTablesBuilder = ImmutableMap.builder();

    Set<TpchScaledTable> tables = tpchIndexSpec.listIndexedTables();
    for (TpchScaledTable table : tables) {
        SchemaTableName tableName = new SchemaTableName("sf" + table.getScaleFactor(), table.getTableName());
        TpchTableHandle tableHandle = tpchMetadata.getTableHandle(null, tableName);
        Map<String, ColumnHandle> columnHandles = new LinkedHashMap<>(tpchMetadata.getColumnHandles(null, tableHandle));
        for (Set<String> columnNames : tpchIndexSpec.getColumnIndexes(table)) {
            List<String> keyColumnNames = ImmutableList.copyOf(columnNames); // Finalize the key order
            Set<TpchScaledColumn> keyColumns = keyColumnNames.stream()
                    .map(name -> new TpchScaledColumn(table, name))
                    .collect(toImmutableSet());

            TpchTable<?> tpchTable = TpchTable.getTable(table.getTableName());
            RecordSet recordSet = tpchRecordSetProvider.getRecordSet(tpchTable, ImmutableList.copyOf(columnHandles.values()), table.getScaleFactor(), 0, 1, TupleDomain.all());
            IndexedTable indexedTable = indexTable(recordSet, ImmutableList.copyOf(columnHandles.keySet()), keyColumnNames);
            indexedTablesBuilder.put(keyColumns, indexedTable);
        }
    }

    indexedTables = indexedTablesBuilder.build();
}

private static IndexedTable indexTable(RecordSet recordSet, final List<String> outputColumns, List<String> keyColumns)
{
    List<Integer> keyPositions = keyColumns.stream()
            .map(columnName -> {
                int position = outputColumns.indexOf(columnName);
                checkState(position != -1);
                return position;
            })
            .collect(toImmutableList());

    ImmutableListMultimap.Builder<MaterializedTuple, MaterializedTuple> indexedValuesBuilder = ImmutableListMultimap.builder();

    List<Type> outputTypes = recordSet.getColumnTypes();
    List<Type> keyTypes = extractPositionValues(outputTypes, keyPositions);

    RecordCursor cursor = recordSet.cursor();
    while (cursor.advanceNextPosition()) {
        List<Object> values = extractValues(cursor, outputTypes);
        List<Object> keyValues = extractPositionValues(values, keyPositions);

        indexedValuesBuilder.put(new MaterializedTuple(keyValues), new MaterializedTuple(values));
    }

    return new IndexedTable(keyColumns, keyTypes, outputColumns, outputTypes, indexedValuesBuilder.build());
}
```
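Stripped of the TPC-H plumbing, indexTable scans the rows once and groups every full row by its key tuple. The sketch below makes these assumptions:

- Rows are plain Java lists in outputColumns order.
- `IndexTableSketch` is a hypothetical name.
- A LinkedHashMap of lists replaces ImmutableListMultimap.

Building this map once, up front, is what makes each later lookupKey call a simple hash lookup. The cost is holding the whole table in memory, which is reasonable for a small test connector.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of indexTable (hypothetical class, not Presto code):
// resolve key column positions, then group full rows by their key tuple in one pass.
class IndexTableSketch {
    static Map<List<Object>, List<List<Object>>> indexTable(
            List<List<Object>> rows, List<String> outputColumns, List<String> keyColumns) {
        // Position of each key column among the output columns (mirrors checkState(position != -1))
        List<Integer> keyPositions = new ArrayList<>();
        for (String name : keyColumns) {
            int position = outputColumns.indexOf(name);
            if (position == -1) {
                throw new IllegalStateException("Unknown key column: " + name);
            }
            keyPositions.add(position);
        }
        // key tuple -> all rows sharing that key, in scan order
        Map<List<Object>, List<List<Object>>> keyToValues = new LinkedHashMap<>();
        for (List<Object> row : rows) {
            List<Object> key = new ArrayList<>(keyPositions.size());
            for (int position : keyPositions) {
                key.add(row.get(position));
            }
            keyToValues.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        return keyToValues;
    }
}
```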