背景
之前公司有个需求,需要存储多值
的标签+分值
类型的数据,并用来检索,其中标签
是有限的,分值
最大为1,示例数据如下:
id | 字段值 |
---|---|
doc1 | (标签1,0.95) (标签2,0.8) (标签3,0.6) |
doc2 | (标签2,0.88) (标签3,0.73) (标签4,0.4) |
doc3 | (标签1,0.0.96) (标签3,0.85) (标签5,0.77) |
标签数值化
针对于这种需求,我们提供了第一版的解决方案,将数据的标签和分值进行整合,将标签数值化,那么整体数据变换为多值
的数值
。
标签 | 对应数值标签 |
---|---|
标签1 | 01 |
标签2 | 02 |
标签3 | 03 |
标签4 | 04 |
标签5 | 05 |
在实际使用过程中,为了方便,我们将小数的分值也*100,变换为了一个整数类型,那么最终的数据就类似于如下:
id | 字段值 |
---|---|
doc1 | 1095,2080,3060 |
doc2 | 2088,3073,4040 |
doc3 | 1096,3085,5077 |
这样我们就能方便的使用范围查询来召回我们需要的标签及分值的数据。比如,查询标签1,分值在0.8-1范围内的数据,查询条件写成[1080 TO 1100]即可,但是存在一个缺陷,无法按照分值进行排序。
Payload改造
偶然的机会,在社区看到一位朋友的回复,可以通过lucene的payload机制来进行查询及排序,刚好适应这种场景。那位朋友介绍可以通过自定义一个新的query plugin,我这边考虑采用script score的形式去实现,实现了一个script plugin,最终的结果还是比较满意的,能近乎完美的满足我们的需求。
1. 思路
es的底层已经对lucene的score方法进行了完善的封装,适应绝大多数用途,这里使用的script plugin的核心是ScoreScript
,只需要在ScoreScript
中添加对doc
的payload
获取及分值计算就可以了。
2. 实现效果展示
2.1 定义索引
PUT payload_tags
{
"mappings": {
"properties": {
"tags": {
"type": "text",
"term_vector": "with_positions_payloads",
"analyzer": "payload_delimiter"
}
}
},
"settings": {
"analysis": {
"analyzer": {
"payload_delimiter": {
"tokenizer": "whitespace",
"filter": [ "delimited_payload" ]
}
}
}
}
}
2.2 样例数据
POST /payload_tags/_doc/1
{
"tags": "标签1|0.95 标签2|0.8 标签3|0.6",
"text": "北京"
}
POST /payload_tags/_doc/2
{
"tags": "标签2|0.88 标签3|0.73 标签4|0.4",
"text": "北京北京"
}
POST /payload_tags/_doc/3
{
"tags": "标签1|0.96 标签3|0.85 标签5|0.77",
"text": "北京北京北京"
}
2.3 基本用法
查询 标签1的数据,按照分值排序
GET payload_tags/_search
{
"query": {
"script_score": {
"query": {
"match": {
"tags": "标签1"
}
},
"script": {
"source": "payload_rank",
"lang": "payload",
"params": {
"field": "tags",
"term": "标签1"
}
}
}
}
}
示例结果:
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 0.96,
"hits" : [
{
"_index" : "payload_tags",
"_type" : "_doc",
"_id" : "3",
"_score" : 0.96,
"_source" : {
"tags" : "标签1|0.96 标签3|0.85 标签5|0.77",
"text" : "北京北京北京"
}
},
{
"_index" : "payload_tags",
"_type" : "_doc",
"_id" : "1",
"_score" : 0.95,
"_source" : {
"tags" : "标签1|0.95 标签2|0.8 标签3|0.6",
"text" : "北京"
}
}
]
}
}
2.4 function score 实现其他关键词的_score与payload分值的联合计算
GET payload_tags/_search
{
"query": {
"function_score": {
"query": {
"match": {
"text": "北京"
}
},
"functions": [
{
"script_score": {
"script": {
"source": "payload_rank",
"lang": "payload",
"params": {
"field": "tags",
"term": "标签1"
}
}
}
}
]
}
}
}
示例结果:
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 0.36389458,
"hits" : [
{
"_index" : "payload_tags",
"_type" : "_doc",
"_id" : "3",
"_score" : 0.36389458,
"_source" : {
"tags" : "标签1|0.96 标签3|0.85 标签5|0.77",
"text" : "北京北京北京"
}
},
{
"_index" : "payload_tags",
"_type" : "_doc",
"_id" : "1",
"_score" : 0.31894928,
"_source" : {
"tags" : "标签1|0.95 标签2|0.8 标签3|0.6",
"text" : "北京"
}
},
{
"_index" : "payload_tags",
"_type" : "_doc",
"_id" : "2",
"_score" : 0.0,
"_source" : {
"tags" : "标签2|0.88 标签3|0.73 标签4|0.4",
"text" : "北京北京"
}
}
]
}
}
2.5 script_score 实现仅payload分值排序
GET payload_tags/_search
{
"query": {
"script_score": {
"query": {
"match": {
"text": "北京"
}
},
"script": {
"source": "payload_rank",
"lang": "payload",
"params": {
"field": "tags",
"term": "标签1"
}
}
}
}
}
示例结果:
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 0.96,
"hits" : [
{
"_index" : "payload_tags",
"_type" : "_doc",
"_id" : "3",
"_score" : 0.96,
"_source" : {
"tags" : "标签1|0.96 标签3|0.85 标签5|0.77",
"text" : "北京北京北京"
}
},
{
"_index" : "payload_tags",
"_type" : "_doc",
"_id" : "1",
"_score" : 0.95,
"_source" : {
"tags" : "标签1|0.95 标签2|0.8 标签3|0.6",
"text" : "北京"
}
},
{
"_index" : "payload_tags",
"_type" : "_doc",
"_id" : "2",
"_score" : 0.0,
"_source" : {
"tags" : "标签2|0.88 标签3|0.73 标签4|0.4",
"text" : "北京北京"
}
}
]
}
}
2.6 min_score 实现对分值的过滤
下面这个请求,查询包含 标签2 的数据, 使用 标签1 的分值进行排序,同时过滤分值0.2以下的数据
GET payload_tags/_search
{
"query": {
"script_score": {
"query": {
"match": {
"tags": "标签2"
}
},
"script": {
"source": "payload_rank",
"lang": "payload",
"params": {
"field": "tags",
"term": "标签1"
}
},
"min_score":0.2
}
}
}
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 0.95,
"hits" : [
{
"_index" : "payload_tags",
"_type" : "_doc",
"_id" : "1",
"_score" : 0.95,
"_source" : {
"tags" : "标签1|0.95 标签2|0.8 标签3|0.6",
"text" : "北京"
}
}
]
}
}
3. 实现过程
- 定义一个Plugin类,继承
Plugin
父类,这是es所有自定义插件的入口,以便在es启动时加载到guice
中。同时实现ScriptPlugin
接口,这个是Script插件的入口,使组件正确的加载到Script模块中。如下,只需要重写一个方法。
private final SetOnce<PayloadScriptEngine> payloadsScriptEngine = new SetOnce<>();
@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
payloadsScriptEngine.set(new PayloadScriptEngine());
return payloadsScriptEngine.get();
}
- 定义一个
PayloadScriptEngine
类,需要实现ScriptEngine
接口,将脚本引擎正确的载入到es的模块中。 介绍一下ScriptEngine
接口,内容很少,只有如下内容,都已经加上注释了,应该很好理解。
/**
* Script名称
*/
String getType();
/**
* 编译脚本,核心内容在这里,
* @param name Script查询中的name
* @param source Script查询中的source
* @param context scipt的作用域 有很多,比如score filter field等,这里我们只用到score
* @param params 编译时的参数,这里我们不会用到
* @return 返回一个ScriptFactory,这个是我们实现的Script的工厂类
*/
<FactoryType> FactoryType compile(
String name,
String code,
ScriptContext<FactoryType> context,
Map<String, String> params
);
/**
* 需要使用后停用的资源应该在这里关闭
*/
@Override
default void close() throws IOException {}
/**
* 这里配置script支持的作用域,这里我们只需要支持ScoreScript.CONTEXT就可以了
*/
Set<ScriptContext<?>> getSupportedContexts();
-
compile()
方法中,需要返回一个ScriptFactory
,需要做的是实现ScoreScript.Factory
和
ScriptFactory
。
private static class PayloadFactory implements ScoreScript.Factory,
ScriptFactory {
@Override
public boolean isResultDeterministic() {
return true;
}
/**
* ScoreScript 实例的工厂类
*/
@Override
public ScoreScript.LeafFactory newFactory(
Map<String, Object> params,
SearchLookup lookup
) {
// 请注意这里,我们需要将params传进去,因为需要使用params中的两个值 field 和 term,以确定我们获取什么字段的哪个term的payload信息。
return new PayloadLeafFactory(params, lookup);
}
}
-
PayloadLeafFactory
是自定义的一个工厂类,实现ScoreScript.LeafFactory
接口,用来生成ScroreScript实例,核心方法为newInstance(LeafReaderContext context)
,接口详解如下:
/**
* Return {@code true} if the script needs {@code _score} calculated, or {@code false} otherwise.
*/
boolean needs_score();
// 生成ScoreScript实例 ScoreScript是对
ScoreScript newInstance(LeafReaderContext ctx) throws IOException;
- 最后的,也是本插件最核心的
ScoreScript
,这个类会遍历每个doc,然后我们可以在其中获取每个doc的payload值,然后依据payload返回相关的分数。
new ScoreScript(params, lookup, context) {
// payload值,放到成员变量,方便计算完成后返回相关值
BytesRef payload = null;
/*
* 每次一个docid,在这里会遍历符合条件的所有id
*/
@Override
public void setDocument(int docid) {
try {
// 根据docid和field获取terms的信息,这里会取到多个term
Terms terms = context.reader().getTermVector(docid, field);
if (terms != null) {
TermsEnum termsEnum = terms.iterator();
if (termsEnum.seekExact(new BytesRef(term.getBytes()))) { // 获取我们需要关注的term
PostingsEnum postings = termsEnum.postings(null, PostingsEnum.ALL);
postings.nextDoc();
postings.nextPosition();
// 这里获取payload
payload = postings.getPayload();
// 查到之后就跳出方法
return;
}
}
} catch (IOException e) {
logger.debug("docid:{},field:{},term:{},get payload error!,stack:{}", docid, field, term, e.getStackTrace());
}
// 没有查到的情况下,将payload置为null
payload = null;
}
/**
* 这里会生成对应id的分值
*/
@Override
public double execute(ExplanationHolder explanation) {
// payload 为null,可能是没有这个term,或者其他情况,分值置为0
if (payload == null) {
return 0.0d;
}
// 否则置为payload存储的数值
return PayloadHelper.decodeFloat(payload.bytes, payload.offset);
}
};
- 综上,整体的插件的实现已基本完成。
补充优化
该文章的插件后续有过一些优化,在此贴出相关代码,主要是调整term对应payload的获取方式,修改5对应的内容
public ScoreScript newInstance(LeafReaderContext context) throws IOException {
final PostingsEnum postings = context.reader().postings(new Term(field, term), PostingsEnum.PAYLOADS);
if (postings == null) {
return new ScoreScript(params, lookup, context) {
@Override
public double execute(ExplanationHolder explanation) {
return 0;
}
};
}
return new ScoreScript(params, lookup, context) {
int currentDocId = -1;
/*
* 每次一个docid,在这里会遍历符合条件的所有id
*/
@Override
public void setDocument(int docid) {
if (postings.docID() < docid) {
try {
postings.advance(docid);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
currentDocId = docid;
}
/**
* 这里会生成对应id的分值
*/
@Override
public double execute(ExplanationHolder explanation) {
double score = 0;
try {
if (postings.docID() == currentDocId) {
postings.nextPosition();
// 这里获取payload,如果同一个term出现多次,也仅取了第一个的数值.可以取多次,根据场景需要再调整
BytesRef payload = postings.getPayload();
score = PayloadHelper.decodeFloat(payload.bytes, payload.offset);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return score;
}
};
}
使用方式
- 将插件编译完成,放置于plugins目录下
- 启动es集群
- 接下来就可以按照示例,愉悦的使用payload进行检索和排序了
结语
小小插件虽然简单,也颇费心思,感谢阅读!各位朋友如有其它见解和想法也请留言讨论,谢谢~