基类: BasePydanticReader
从Elasticsearch/Opensearch索引中读取文档。
这些文档随后可用于下游的Llama Index数据结构中。
参数:
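| 名称 | 类型 | 描述 | 默认值 |
| --- | --- | --- | --- |
| `endpoint` | `str` | 集群的URL(http/https)。 | required |
| `index` | `str` | 索引名称。 | required |
| `httpx_client_args` | `dict` | 传递给 `httpx.Client` 的可选附加参数。 | `None` |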
Source code in `llama-index-integrations/readers/llama-index-readers-elasticsearch/llama_index/readers/elasticsearch/base.py`(第15–98行):
class ElasticsearchReader(BasePydanticReader):
    """
    Read documents from an Elasticsearch/Opensearch index.

    These documents can then be used in a downstream Llama Index data structure.

    Args:
        endpoint (str): URL (http/https) of cluster
        index (str): Name of the index (required)
        httpx_client_args (dict): Optional additional args to pass to the
            `httpx.Client`

    """

    is_remote: bool = True
    endpoint: str
    index: str
    httpx_client_args: Optional[dict] = None

    # HTTP client created in __init__ with `endpoint` as its base URL; declared
    # as a private attribute so it is not one of the model's fields.
    _client: Any = PrivateAttr()

    def __init__(
        self, endpoint: str, index: str, httpx_client_args: Optional[dict] = None
    ):
        """
        Store the connection settings and create the underlying `httpx.Client`.

        Args:
            endpoint (str): URL (http/https) of cluster; used as the client's
                base URL.
            index (str): Name of the index to search.
            httpx_client_args (Optional[dict]): Extra keyword arguments
                forwarded to `httpx.Client`.

        Raises:
            ImportError: If the `httpx` package is not installed.

        """
        super().__init__(
            endpoint=endpoint, index=index, httpx_client_args=httpx_client_args
        )
        import_err_msg = """
            `httpx` package not found. Install via `pip install httpx`
        """
        try:
            import httpx
        except ImportError as err:
            # Chain the original error so the underlying import failure stays
            # visible in the traceback.
            raise ImportError(import_err_msg) from err
        self._client = httpx.Client(base_url=endpoint, **(httpx_client_args or {}))

    @classmethod
    def class_name(cls) -> str:
        """Return the name identifying this reader class."""
        return "ElasticsearchReader"

    def load_data(
        self,
        field: str,
        query: Optional[dict] = None,
        embedding_field: Optional[str] = None,
        metadata_fields: Optional[List[str]] = None,
    ) -> List[Document]:
        """
        Read data from the Elasticsearch index.

        Args:
            field (str): Field in the document to retrieve text from
            query (Optional[dict]): Elasticsearch JSON query DSL object.
                For example:
                {"query": {"match": {"message": {"query": "this is a test"}}}}
            embedding_field (Optional[str]): If there are embeddings stored in
                this index, this field can be used
                to set the embedding field on the returned Document list.
            metadata_fields (Optional[List[str]]): Fields used as metadata. Default
                is all fields in the document except those specified by the
                field and embedding_field parameters.

        Returns:
            List[Document]: A list of documents.

        Raises:
            httpx.HTTPStatusError: If the search request does not return a
                success status.
            KeyError: If a hit's `_source` does not contain `field`.

        """
        response = self._client.post(f"{self.index}/_search", json=query)
        # Surface the actual HTTP failure instead of letting an error payload
        # (which has no "hits" key) fail later with an opaque KeyError.
        response.raise_for_status()
        res = response.json()

        # Loop-invariant lookups: build the membership sets once.
        selected = set(metadata_fields) if metadata_fields else None
        excluded = {field, embedding_field}

        documents = []
        for hit in res["hits"]["hits"]:
            doc_id = hit["_id"]
            source = hit["_source"]
            value = source[field]
            embedding = source.get(embedding_field) if embedding_field else None
            if selected is not None:
                metadata = {k: v for k, v in source.items() if k in selected}
            else:
                # Build a new dict rather than popping keys out of the parsed
                # response, so the response payload is left untouched.
                metadata = {k: v for k, v in source.items() if k not in excluded}
            documents.append(
                Document(id_=doc_id, text=value, metadata=metadata, embedding=embedding)
            )
        return documents
|
加载数据
load_data(field: str, query: Optional[dict] = None, embedding_field: Optional[str] = None, metadata_fields: Optional[List[str]] = None) -> List[Document]
从Elasticsearch索引读取数据。
参数:
| 名称 | 类型 | 描述 | 默认值 |
| --- | --- | --- | --- |
| `field` | `str` | 文档中用于提取文本的字段。 | required |
| `query` | `Optional[dict]` | Elasticsearch JSON查询DSL对象。例如:`{"query": {"match": {"message": {"query": "this is a test"}}}}` | `None` |
| `embedding_field` | `Optional[str]` | 如果该索引中存储了嵌入向量,可以使用此字段来设置返回文档列表上的嵌入字段。 | `None` |
| `metadata_fields` | `Optional[List[str]]` | 用作元数据的字段。默认情况下是文档中除field和embedding_field参数指定的字段外的所有字段。 | `None` |
返回:

| 类型 | 描述 |
| --- | --- |
| `List[Document]` | 文档列表。 |
Source code in `llama-index-integrations/readers/llama-index-readers-elasticsearch/llama_index/readers/elasticsearch/base.py`(第55–98行):
def load_data(
    self,
    field: str,
    query: Optional[dict] = None,
    embedding_field: Optional[str] = None,
    metadata_fields: Optional[List[str]] = None,
) -> List[Document]:
    """
    Read data from the Elasticsearch index.

    Args:
        field (str): Field in the document to retrieve text from
        query (Optional[dict]): Elasticsearch JSON query DSL object.
            For example:
            {"query": {"match": {"message": {"query": "this is a test"}}}}
        embedding_field (Optional[str]): If there are embeddings stored in
            this index, this field can be used
            to set the embedding field on the returned Document list.
        metadata_fields (Optional[List[str]]): Fields used as metadata. Default
            is all fields in the document except those specified by the
            field and embedding_field parameters.

    Returns:
        List[Document]: A list of documents.

    Raises:
        httpx.HTTPStatusError: If the search request does not return a
            success status.
        KeyError: If a hit's `_source` does not contain `field`.

    """
    response = self._client.post(f"{self.index}/_search", json=query)
    # Surface the actual HTTP failure instead of letting an error payload
    # (which has no "hits" key) fail later with an opaque KeyError.
    response.raise_for_status()
    res = response.json()

    # Loop-invariant lookups: build the membership sets once.
    selected = set(metadata_fields) if metadata_fields else None
    excluded = {field, embedding_field}

    documents = []
    for hit in res["hits"]["hits"]:
        doc_id = hit["_id"]
        source = hit["_source"]
        value = source[field]
        embedding = source.get(embedding_field) if embedding_field else None
        if selected is not None:
            metadata = {k: v for k, v in source.items() if k in selected}
        else:
            # Build a new dict rather than popping keys out of the parsed
            # response, so the response payload is left untouched.
            metadata = {k: v for k, v in source.items() if k not in excluded}
        documents.append(
            Document(id_=doc_id, text=value, metadata=metadata, embedding=embedding)
        )
    return documents
|