BM25 功能
BM25 功能可将原始文本转换为稀疏向量,并根据词汇相关性对文档进行评分,从而实现全文检索。它应用基于术语的匹配和频率感知加权,支持高效检索与查询术语密切匹配的文本文档。
作为一个本地文本函数,BM25 函数在 Milvus 内部运行,不需要模型推理或外部集成。它为基于文本的搜索场景提供了一种确定且透明的检索机制。
BM25 如何工作
BM25算法是一种基于术语的相关性评分算法,广泛应用于全文检索。在 Milvus 中,BM25 以稀疏检索管道的形式实现,可将文本转换为术语权重表示,并使用分布式稀疏索引检索前K 个文档。
整个工作流程由两条对称路径组成:文档摄取和查询文本处理,它们共享相同的文本分析逻辑。
文档摄取:从文本到稀疏表示
插入文档时,首先由分析器处理原始文本,将文本标记为单个术语。
例如,文档
"We are loving Milvus!"
可以分析为以下术语:
["we", "love", "milvus"]
然后,每个文档都会以词频(TF)表示,记录每个词在文档中出现的次数。举例来说
{
"we": 1,
"love": 1,
"milvus": 1
}
与此同时,Milvus 还会更新语料库级别的统计数据,包括
每个术语的文档频率(DF)
文档平均长度
将每个术语映射到包含该术语的文档的发布列表
将文档的 TF 表示插入稀疏嵌入(sparse embeddings),其中的术语张贴被划分到各个节点,以实现可扩展的检索。
查询文本处理:应用 IDF 加权
当发出基于文本的查询时,它将由在文档摄取过程中使用的同一分析器进行处理,以确保术语分割的一致性。
例如,查询
"who loves Milvus?"
可以分析为
["who", "love", "milvus"]
对于每个查询词,Milvus 会从语料库统计数据中查找其反向文档频率(IDF)。IDF 反映了一个术语在整个数据集中的信息量:罕见术语的权重较高,而常见术语的权重较低。
从概念上讲,这会产生一组 IDF 加权的查询词,例如
{
"who": 0.1,
"love": 0.5,
"milvus": 1.2
}
BM25 评分和前 K 检索
BM25 通过计算基于匹配查询词的相关性得分来对文档进行排序。评分在术语层面进行,并在文档层面汇总。
术语级评分
对于文档中出现的每个查询词,BM25 都会计算一个词级得分:
term_score =
IDF(term) ×
TF_boost(term, document, k1) ×
length_normalization(document, b)
其中
IDF(term)反映了术语在 Collections 中的罕见程度
TF_boost(..., k1)随着术语频率的增加而增加,但随着频率的增加而饱和
length_normalization(..., b)根据文档长度调整得分
文档级评分和 Top-K 检索
文档的最终得分是所有匹配查询词的词级得分之和:
document_score =
sum of term_score over all matched query terms
文档按其最终得分排序,并返回得分最高的前 K 个文档。
开始使用前
在使用 BM25 功能之前,请规划好你的 Collections Schema,确保它支持词法全文检索:
原始内容的文本字段
您的 Collections 必须包含一个
VARCHAR字段来存储原始文本。该字段是将被处理用于全文搜索的文本来源。文本字段的分析器
文本字段必须启用分析器。分析器定义了在 BM25 函数计算词汇相关性之前,如何对文本进行标记化和规范化。
默认情况下,Milvus 提供一个内置分析器,根据空白和标点符号对文本进行标记化。如果你的应用程序需要自定义标记化或规范化行为,你可以定义一个自定义分析器。有关详情,请参阅 "根据用例选择合适的分析器"。
用于 BM25 输出的稀疏向量
您的 Collections 必须包含一个
SPARSE_FLOAT_VECTOR字段,用于存储 BM25 函数生成的稀疏表示。该字段用于全文搜索时的索引和检索。
在弄清这些 Schema 层面的注意事项后,继续创建 Collections 并使用 BM25 函数。
步骤 1:使用 BM25 函数创建 Collections
要使用 BM25 函数,必须在创建 Collections 时对其进行定义。该函数将成为 Collections Schema 的一部分,并在数据插入和搜索时自动应用。
定义 Schema 字段
您的 Collections 模式必须包含至少三个必填字段:
主字段:唯一标识 Collections 中的每个实体。
文本字段(
VARCHAR):存储原始文本文档。必须设置enable_analyzer=True,以便 Milvus 处理文本,进行 BM25 相关性排序。默认情况下,Milvus 使用standard分析器进行文本分析。要配置不同的分析器,请参阅分析器概述。稀疏向量场(
SPARSE_FLOAT_VECTOR):存储由 BM25 函数自动生成的稀疏嵌入。
from pymilvus import MilvusClient, DataType, Function, FunctionType
client = MilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)
schema = client.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True) # Primary field
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=1000, enable_analyzer=True) # Text field
schema.add_field(field_name="sparse", datatype=DataType.SPARSE_FLOAT_VECTOR) # Sparse vector field; no dim required for sparse vectors
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder()
.build();
schema.addField(AddFieldReq.builder()
.fieldName("id")
.dataType(DataType.Int64)
.isPrimaryKey(true)
.autoID(true)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("text")
.dataType(DataType.VarChar)
.maxLength(1000)
.enableAnalyzer(true)
.build());
schema.addField(AddFieldReq.builder()
.fieldName("sparse")
.dataType(DataType.SparseFloatVector)
.build());
import (
"context"
"fmt"
"github.com/milvus-io/milvus/client/v2/column"
"github.com/milvus-io/milvus/client/v2/entity"
"github.com/milvus-io/milvus/client/v2/index"
"github.com/milvus-io/milvus/client/v2/milvusclient"
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
milvusAddr := "http://localhost:19530"
token := "root:Milvus"
client, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
Address: milvusAddr,
APIKey: token
})
if err != nil {
fmt.Println(err.Error())
// handle error
}
defer client.Close(ctx)
schema := entity.NewSchema()
schema.WithField(entity.NewField().
WithName("id").
WithDataType(entity.FieldTypeInt64).
WithIsPrimaryKey(true).
WithIsAutoID(true),
).WithField(entity.NewField().
WithName("text").
WithDataType(entity.FieldTypeVarChar).
WithEnableAnalyzer(true).
WithMaxLength(1000),
).WithField(entity.NewField().
WithName("sparse").
WithDataType(entity.FieldTypeSparseVector),
)
import { MilvusClient, DataType } from "@zilliz/milvus2-sdk-node";
const address = "http://localhost:19530";
const token = "root:Milvus";
const client = new MilvusClient({address, token});
const schema = [
{
name: "id",
data_type: DataType.Int64,
is_primary_key: true,
},
{
name: "text",
data_type: "VarChar",
enable_analyzer: true,
enable_match: true,
max_length: 1000,
},
{
name: "sparse",
data_type: DataType.SparseFloatVector,
},
];
console.log(res.results)
export schema='{
"autoId": true,
"enabledDynamicField": false,
"fields": [
{
"fieldName": "id",
"dataType": "Int64",
"isPrimary": true
},
{
"fieldName": "text",
"dataType": "VarChar",
"elementTypeParams": {
"max_length": 1000,
"enable_analyzer": true
}
},
{
"fieldName": "sparse",
"dataType": "SparseFloatVector"
}
]
}'
定义 BM25 函数
BM25 函数将标记化文本转换为支持 BM25 评分的稀疏向量。
定义该函数并将其添加到 Schema 中:
bm25_function = Function(
name="text_bm25_emb", # Function name
input_field_names=["text"], # Name of the VARCHAR field containing raw text data
output_field_names=["sparse"], # Name of the SPARSE_FLOAT_VECTOR field reserved to store generated embeddings
function_type=FunctionType.BM25, # Set to `BM25`
)
schema.add_function(bm25_function)
import io.milvus.common.clientenum.FunctionType;
import io.milvus.v2.service.collection.request.CreateCollectionReq.Function;
import java.util.*;
schema.addFunction(Function.builder()
.functionType(FunctionType.BM25)
.name("text_bm25_emb")
.inputFieldNames(Collections.singletonList("text"))
.outputFieldNames(Collections.singletonList("sparse"))
.build());
function := entity.NewFunction().
WithName("text_bm25_emb").
WithInputFields("text").
WithOutputFields("sparse").
WithType(entity.FunctionTypeBM25)
schema.WithFunction(function)
const functions = [
{
name: 'text_bm25_emb',
description: 'bm25 function',
type: FunctionType.BM25,
input_field_names: ['text'],
output_field_names: ['sparse'],
params: {},
},
];
export schema='{
"autoId": true,
"enabledDynamicField": false,
"fields": [
{
"fieldName": "id",
"dataType": "Int64",
"isPrimary": true
},
{
"fieldName": "text",
"dataType": "VarChar",
"elementTypeParams": {
"max_length": 1000,
"enable_analyzer": true
}
},
{
"fieldName": "sparse",
"dataType": "SparseFloatVector"
}
],
"functions": [
{
"name": "text_bm25_emb",
"type": "BM25",
"inputFieldNames": ["text"],
"outputFieldNames": ["sparse"],
"params": {}
}
]
}'
配置索引
用必要的字段和内置函数定义模式 Schema 后,为你的 Collections 设置索引。
index_params = client.prepare_index_params()
index_params.add_index(
field_name="sparse",
index_type="SPARSE_INVERTED_INDEX",
metric_type="BM25",
params={
"inverted_index_algo": "DAAT_MAXSCORE",
"bm25_k1": 1.2,
"bm25_b": 0.75
}
)
import io.milvus.v2.common.IndexParam;
Map<String,Object> params = new HashMap<>();
params.put("inverted_index_algo", "DAAT_MAXSCORE");
params.put("bm25_k1", 1.2);
params.put("bm25_b", 0.75);
List<IndexParam> indexes = new ArrayList<>();
indexes.add(IndexParam.builder()
.fieldName("sparse")
.indexType(IndexParam.IndexType.AUTOINDEX)
.metricType(IndexParam.MetricType.BM25)
.extraParams(params)
.build());
indexOption := milvusclient.NewCreateIndexOption("my_collection", "sparse",
index.NewAutoIndex(entity.MetricType(entity.BM25)))
.WithExtraParam("inverted_index_algo", "DAAT_MAXSCORE")
.WithExtraParam("bm25_k1", 1.2)
.WithExtraParam("bm25_b", 0.75)
const index_params = [
{
field_name: "sparse",
metric_type: "BM25",
index_type: "SPARSE_INVERTED_INDEX",
params: {
"inverted_index_algo": "DAAT_MAXSCORE",
"bm25_k1": 1.2,
"bm25_b": 0.75
}
},
];
export indexParams='[
{
"fieldName": "sparse",
"metricType": "BM25",
"indexType": "AUTOINDEX",
"params":{
"inverted_index_algo": "DAAT_MAXSCORE",
"bm25_k1": 1.2,
"bm25_b": 0.75
}
}
]'
创建 Collections
现在使用定义的 Schema 和索引参数创建 Collections:
client.create_collection(
collection_name='my_collection',
schema=schema,
index_params=index_params
)
import io.milvus.v2.service.collection.request.CreateCollectionReq;
CreateCollectionReq requestCreate = CreateCollectionReq.builder()
.collectionName("my_collection")
.collectionSchema(schema)
.indexParams(indexes)
.build();
client.createCollection(requestCreate);
err = client.CreateCollection(ctx,
milvusclient.NewCreateCollectionOption("my_collection", schema).
WithIndexOptions(indexOption))
if err != nil {
fmt.Println(err.Error())
// handle error
}
await client.create_collection(
collection_name: 'my_collection',
schema: schema,
index_params: index_params,
functions: functions
);
export CLUSTER_ENDPOINT="http://localhost:19530"
export TOKEN="root:Milvus"
curl --request POST \
--url "${CLUSTER_ENDPOINT}/v2/vectordb/collections/create" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d "{
\"collectionName\": \"my_collection\",
\"schema\": $schema,
\"indexParams\": $indexParams
}"
一旦创建了具有 BM25 功能的 Collections,就可以插入文本并根据文本查询执行词法搜索。
步骤 2:将文本数据插入 Collections
设置好集合和索引后,就可以插入文本数据了。在此过程中,您只需提供原始文本。我们之前定义的 BM25 函数会自动为每个文本条目生成稀疏向量。
client.insert('my_collection', [
{'text': 'information retrieval is a field of study.'},
{'text': 'information retrieval focuses on finding relevant information in large datasets.'},
{'text': 'data mining and information retrieval overlap in research.'},
])
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.v2.service.vector.request.InsertReq;
Gson gson = new Gson();
List<JsonObject> rows = Arrays.asList(
gson.fromJson("{\"text\": \"information retrieval is a field of study.\"}", JsonObject.class),
gson.fromJson("{\"text\": \"information retrieval focuses on finding relevant information in large datasets.\"}", JsonObject.class),
gson.fromJson("{\"text\": \"data mining and information retrieval overlap in research.\"}", JsonObject.class)
);
client.insert(InsertReq.builder()
.collectionName("my_collection")
.data(rows)
.build());
// go
await client.insert({
collection_name: 'my_collection',
data: [
{'text': 'information retrieval is a field of study.'},
{'text': 'information retrieval focuses on finding relevant information in large datasets.'},
{'text': 'data mining and information retrieval overlap in research.'},
]);
curl --request POST \
--url "${CLUSTER_ENDPOINT}/v2/vectordb/entities/insert" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
-d '{
"data": [
{"text": "information retrieval is a field of study."},
{"text": "information retrieval focuses on finding relevant information in large datasets."},
{"text": "data mining and information retrieval overlap in research."}
],
"collectionName": "my_collection"
}'
第 3 步:使用文本查询进行搜索
将数据插入 Collections 后,您就可以使用原始文本查询执行全文检索了。Milvus 会自动将你的查询转换成稀疏向量,并使用 BM25 算法对匹配的搜索结果进行排序,然后返回 topK (limit) 结果。
search_params = {
}
res = client.search(
collection_name='my_collection',
data=['whats the focus of information retrieval?'],
anns_field='sparse',
output_fields=['text'], # Fields to return in search results; sparse field cannot be output
limit=3,
search_params=search_params
)
print(res)
import io.milvus.v2.service.vector.request.SearchReq;
import io.milvus.v2.service.vector.request.data.EmbeddedText;
import io.milvus.v2.service.vector.response.SearchResp;
Map<String,Object> searchParams = new HashMap<>();
SearchResp searchResp = client.search(SearchReq.builder()
.collectionName("my_collection")
.data(Collections.singletonList(new EmbeddedText("whats the focus of information retrieval?")))
.annsField("sparse")
.topK(3)
.searchParams(searchParams)
.outputFields(Collections.singletonList("text"))
.build());
annSearchParams := index.NewCustomAnnParam()
resultSets, err := client.Search(ctx, milvusclient.NewSearchOption(
"my_collection", // collectionName
3, // limit
[]entity.Vector{entity.Text("whats the focus of information retrieval?")},
).WithConsistencyLevel(entity.ClStrong).
WithANNSField("sparse").
WithAnnParam(annSearchParams).
WithOutputFields("text"))
if err != nil {
fmt.Println(err.Error())
// handle error
}
for _, resultSet := range resultSets {
fmt.Println("IDs: ", resultSet.IDs.FieldData().GetScalars())
fmt.Println("Scores: ", resultSet.Scores)
fmt.Println("text: ", resultSet.GetColumn("text").FieldData().GetScalars())
}
await client.search(
collection_name: 'my_collection',
data: ['whats the focus of information retrieval?'],
anns_field: 'sparse',
output_fields: ['text'],
limit: 3,
)
curl --request POST \
--url "${CLUSTER_ENDPOINT}/v2/vectordb/entities/search" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
--header "Request-Timeout: 10" \
--data-raw '{
"collectionName": "my_collection",
"data": [
"whats the focus of information retrieval?"
],
"annsField": "sparse",
"limit": 3,
"outputFields": [
"text"
],
"searchParams":{
"params":{}
}
}'