什么是N+1问题
{
topStories(limit: 10) {
title
author {
name
email
}
}
}
这个简单的查询, 在Query
resolve里面会查询到top10 story, 但是StoryType
却会调用10次author
查询
大概的SQL查询:
select * from stories limit 10
select * from author where stories.author_id = x
select * from author where stories.author_id = x
select * from author where stories.author_id = x
...
select * from author where stories.author_id = x
这个简单的Query就会查询11次数据库.
我们要知道Graphql的图形结构可能是个闭环结构, 例如, stories->authors->...->stories(这个树形图, 可以使用voyager来查看), 所以如果不解决N+1
问题, 那么Graphql
接口将会严重拖慢的Server
的性能.
解决完N+1
问题的效果
优化前
优化后
sql结果
解决方案
目前GraphQL
通用的解决方案是Facebook
提供的Dataloader, 他的核心思想是Batch Query
和Cached
. 每种语言都有自己的DataLoader
的实现, 去Github都能找到
Laravel N+1的解决方法
目前我们采用的是PHP的Laravel GraphQL框架. 这个框架有两种解决N+1
的问题
-
DataLoader
, 这个有个现成的Laravel的Demo -
GraphQL\Deferred
, laravel-graphql使用的是webonyx/graphql-php, 这个框架封装了Deferred
的方法.
经过综合考虑, 选择了Deferred
方法.
Laravel Deferred
以下面BlogStory
为例:
<?php
'resolve' => function($blogStory) {
MyUserBuffer::add($blogStory['authorId']);
return new GraphQL\Deferred(function () use ($blogStory) {
MyUserBuffer::loadBuffered();
return MyUserBuffer::get($blogStory['authorId']);
});
}
整体思路:
- 先将要查询的字段先存起来
- 在
resolve
的GraphQL\Deferred
返回一个Promise
, 这个Promise
不会真正查询数据. - 当所有的逻辑全部执行完毕后, 才会真正查询数据库.
这里MyUserBuffer
需要自己实现.
我的实现
$fieldAndIds = [
'user_id' => $root['id'],
'order_by' => [
'id' => 'DESC',
],
];
UsersRobotsChild::add($fieldAndIds);
return new Deferred(function () use ($fieldAndIds) {
UsersRobotsChild::loadBuffers();
return UsersRobotsChild::get($fieldAndIds);
});
trait Deferred
{
private static $fieldAndIds = [];
private static $buffer = [];
/**
* @param $arr
*
* 这个函数的目的:
* 首先获取这个函数是被哪个类调用的 + 按照查询的字段, 放入到数组中.
* 例如:
* UsersRobotsChild::where('user_id', 11)->where('child_id', 1)->get()
* 就会调用UsersRobotsChild::add(['user_id', 11, 'child_Id', 1]);
* 这个时候就会变成下面这个样子:
* $fieldAndIds['Luka\Models\Account\UsersRobotsChild']['user_id-child_id'][] = ['user_id', 11, 'child_Id', 1];
*/
public static function add($arr)
{
$className = get_called_class();
$fieldKey = null;
foreach ($arr as $field => $id) {
if ($fieldKey) {
$fieldKey = $fieldKey . '-' . $field;
} else {
$fieldKey = $field;
}
}
self::$fieldAndIds[$className][$fieldKey][] = $arr;
}
public static function get($arr)
{
$name = get_called_class();
$key = self::encode($arr, $name);
return isset(self::$buffer[$key]) ? self::$buffer[$key] : null;
}
public static function first($arr)
{
$name = get_called_class();
$key = self::encode($arr, $name);
if (!isset(self::$buffer[$key])) {
return null;
}
if (count(self::$buffer[$key]) < 1) {
return null;
}
return self::$buffer[$key][0];
}
public static function loadBuffers()
{
$t1 = microtime(true);
$name = get_called_class();
if (!isset(self::$fieldAndIds[$name])) {
return;
}
$allFields = self::$fieldAndIds[$name];
// 1. 将这个$name的所有记录全部一次查出来
foreach ($allFields as $fieldSplice => $allField) {
// 将同一种相关字段查询的集合起来一次查询完毕. 比如:
// UsersRobotsChild::where('user_id', 15)->where('child_id', 100)->first();
// UsersRobotsChild::where('user_id', 16)->where('child_id', 201)->first();
// 经过下面的操作之后会变成这样:
// $valuesOfField = [
// 'user_id' => [15, 16],
// 'child_id' => [100, 201'],
// ];
$valuesOfField = [];
$orderBy = [];
$isContinueQueryDb = false;
foreach ($allField as $singleFieldAndId) {
$key = self::encode($singleFieldAndId, $name);
if (array_key_exists($key, self::$buffer)) {
continue;
}
foreach ($singleFieldAndId as $field => $id) {
if ($field == 'order_by') {
$orderBy = $id;
continue;
}
$isContinueQueryDb = true;
$valuesOfField[$field][] = $id;
$valuesOfField[$field] = array_unique($valuesOfField[$field]);
}
}
if (!$isContinueQueryDb) {
return;
}
// 拿到$valuesOfField后, 做一次查询
// $values = UsersRobotsChild::whereIn('user_id', $valuesOfField['user_id'])
// ->whereIn('child_id', $valuesOfField['child_id'])
// ->get();
$values = $name::deferredField($valuesOfField, $orderBy)->get();
$fields = explode('-', $fieldSplice);
// 3. 将结果写入到Buffer里
foreach ($values as $value) {
// 下面这段程序的目的是:
// $values查出来是个集合, 现在需要根据查出来的数据encode出来一个key, 然后把结果放入到buffer中.
$fieldAndId = [];
foreach ($fields as $field) {
if ($field == 'order_by') {
continue;
}
$fieldAndId[$field] = $value->$field;
}
$key = self::encode($fieldAndId, $name);
self::$buffer[$key][] = self::filter($value, $name);
}
}
// 2. 将$fieldAndIds这个key的记录清空
if (isset(self::$fieldAndIds[$name])) {
unset(self::$fieldAndIds[$name]);
}
$t2 = microtime(true);
\Log::info((($t2 - $t1) * 1000));
}
public function scopeDeferredField($query, $fieldAndId, $orderBy)
{
foreach ($fieldAndId as $field => $id) {
if ($field == 'order_by') {
continue;
}
$query->whereIn($field, $id);
}
foreach ($orderBy as $key => $value) {
$query->orderBy($key, $value);
}
return $query;
}
private static function encode($arr, $name)
{
if (isset($arr['order_by'])) {
unset($arr['order_by']);
}
return $name . ' - ' . json_encode($arr);
}
private static function decode($key)
{
$arr = explode(' - ', $key);
if (count($arr) != 2) {
return null;
}
return json_decode($arr[1], true);
}
}
注意: 我这里的实现利用了PHP的特性, 每次请求完毕都会释放内存, 所以buffer我就没有管理. 如果使用其他语言实现时, 一定要注意要释放内存.