泽拓昆仑Klustron向量数据管理
泽拓昆仑Klustron向量数据管理
自OpenAI于2022年发布 ChatGPT开始,生成式AI引爆了各大行业,全球各大有研究实力的公司及机构都在持续投入巨资在此领域深耕。随着各种AGI应用的落地,向量数据库也将会得到广泛应用。现有的向量数据库不仅有专用的向量数据库比如Milvus, Pinecone,也有关系型数据库PostgreSQL的PGVector, PGEmbedding扩展。
PostgreSQL以插件方式提供了向量数据库功能pgvector,泽拓昆仑Klustron分布式数据库从1.3版本开始,支持通过挂载PGVector来实现向量数据管理,并且通过支持挂载PostgresML来提供库内机器学习功能。
本文先介绍Klustron的向量数据管理概括,然后以示例方式来直观的展示其基本的使用方法。
01 泽拓昆仑Klustron融合数据管理能力简介
泽拓昆仑(Klustron)分布式数据库根植于MySQL和PostgreSQL开源生态,与开源生态完全融合和兼容,不仅兼容MySQL和PostgreSQL的各类应用系统,还兼容MySQL和PostgreSQL开源社区的第三方功能扩展组件,这使得Klustron成为 一个多模型融合的AI数据底座,可以支撑上层丰富多样的应用场景。
通过利用PostgreSQL社区的功能组件,Klustron的融合能力可以做到:
- 融合多模型数据:在支持关系数据模型的基础之上支持 JSON, GIS,向量三种复杂数据类型
- 融合多种来源的数据:集群内部存储的数据,外部数据库,外部对象存储
- 融合开源生态组件:PostGIS,PostgresML, PGVector,等等
- 融合用户自定义算法:多语言(python, java, perl, lua, javascript, PL/SQL)存储过程。
基于对机器学习,库内计算和向量数据管理的需求,Klustron成为一个强大的AI数据底座,让用户可以使用大量服务器的计算资源(包括CPU,内存,GPU),在数据库集群内部运行机器学习,大幅提升效率降低开销。
Klustron让这些组件突破了PostgreSQL受限于单台服务器硬件资源的扩展性难题,可以利用大量服务器的硬件资源实现数据管理和查询分析功能。
不仅如此,Klustron的多层级并行查询处理查询性能增强技术还可以显著提升OLAP分析任务、存储过程和机器学习等计算的性能,这些能力也是单机PostgreSQL不具备的。
02 向量数据管理的需求分析
让我们从通用的技术角度分析一下向量数据管理的需求和特点。目前大模型的embedding以及图片向量化的矢量,都包含数百个到几千个维度,每个维度的浮点数占用8个字节,也就是说每个向量的数据量大致位于1~16 KB这个范围,因此对于很多实际业务系统来说,管理的向量数据规模是TB到PB级别的,甚至更多。
针对向量数据的最常见的查询就是相似度(距离)查询,比如从1亿张图片中查找与某个目标图片最相似的前100个图片。翻译为技术语言就是:从1亿个向量中查找与目标向量距离最近的前100个向量。1亿张图片看似很多,但是如果业务场景是街头的摄像头抓拍的车辆和行人图片,对于中国的任何一个大城市来说,每天城市内所有摄像头拍摄的图片总量都可能达到甚至超过1亿张图片。
由于向量维度很高,而且向量数量通常较大,所以向量的相似度(距离)计算的计算量非常庞大。向量数据的存储规模和计算量都决定了单机服务器的计算资源很容易就会达到瓶颈,所以向量数据天然地需要分布式数据库系统来管理。
在实际业务场景中,通常需要多种数据类型共同使用,除了常见的数值类型,日期时间,字符串等之外,还有空间位置(GIS),JSON, 向量等类型。比如上述场景中,每个摄像头的ID,以及图片拍摄时间,都需要和图片的向量一起存储起来。为了缩小查询范围,通常可以加上拍摄时间范围,以及摄像头的信息作为过滤条件,包括摄像头的地理区域,所在的街道、行政区域名称,及其技术特征等数值过滤条件。于是实际的查询是一个表连接,查询结果是摄像头信息表cameras和图片向量表img_vecs, 过滤出符合条件的摄像头和符合条件的图片向量之后,join得到的结果。
也就是说,在关系数据库系统中管理向量数据,对于应用系统研发来说更加简单便利。如果使用独立的向量数据库和关系数据库,那么还需要在应用层实现跨库的表连接,应用系统研发和后续维护升级的工作量巨大。而且会因为跨库查询的数据不一致导致逻辑错误,这样的技术问题很难通用地低成本地彻底解决。
泽拓昆仑Klustron分布式数据库通过支持在计算节点中挂载PGVector,具备了PGVector向量数据管理功能。用户可以在同一个表中增加各种标量数据类型的列之外,还可以增加向量、GIS、JSON等复杂数据类型的列,并且在同一个事务中读写访问向量、GIS、JSON、以及各种标量数据类型。向量数据在Klustron集群中存储在Klustron-storage存储节点,而不是计算节点中。Klustron-1.3版本中,向量数据在存储节点中作为JSON数据来存储。
03 向量索引面临的挑战
在上述例子中,图片向量表img_vecs做行过滤然后与摄像头信息表cameras按照摄像头ID做join,得到的结果集含有向量列。但是如果要对这个向量列按照与目标向量的相关度排序,是无法直接使用img_vecs表的向量索引img_vecs_idx的。这是因为img_vecs_idx中的一些向量被查询条件过滤掉了,直接用它返回的向量也有一些不符合其他查询条件。所以经典的HNSW等向量索引算法需要特殊的改造然后才能适应关系数据库中的灵活应用。
另一个问题是,OLTP关系数据库中索引的典型实现,包括在PGVector中也是如此,就是要在主行中存储一个表的每行的每个字段,然后在索引行中存储每个主行的对应于该索引定义的字段值,也就是每行的每个字段如果用在N个索引中,那么一共需要存储(N+1)次。由于向量字段非常巨大,重复存储的空间开销非常显著。
考虑到以上问题,我们在Klustron-1.3.1 中没有提供向量索引功能,预计在Klustron-1.3.2中会提供一个可以解决上述问题的向量索引功能。Klustron集群的多层级并行查询处理能力可以部分解决大数据量情况下向量的相似度排序的性能问题,因为在理想情况下可以用一个集群的所有服务器的所有CPU执行同一个查询,可以在所有存储节点上面实现分布式的向量相似度排序,然后在计算节点归并排序结果完成最终全局排序。Klustron-1.3的计算节点可以自动完成这部分工作。
04 泽拓昆仑Klustron向量数据管理的功能用法示例
4.1 环境准备
本文所指的试验场景中,第一个试验场景只需要一套Klustron的实例,但第二个试验场景(问答知识库)需要手头有可用的openai key及网络有合适的连接配置可访问openai的API服务,并且为确保python代码可正常运行,请确认安装了python3 及相关的支持库
4.1.1 Klustron 安装配置
《略》
Klustron 环境说明:
- XPanel: http://192.168.0.152:40180/KunlunXPanel/#/cluster
- 计算节点:192.168.0.152 ,端口: 47001
- 存储节点(shard1):192.168.0.153, 端口:57003 (主)
- 存储节点(shard2):192.168.0.155, 端口:57005 (主)
- Klustron 安装在kl用户下
4.1.2 pgvector 配置确认
打开一个shell会话,连接至Klustron数据库的计算节点,并登录Klustron数据库,如下操作:
kl@kunlun1:~$ source /data/kl/env.sh
kl@kunlun1:~$ psql -h 192.168.0.152 -p 47001 postgres
postgres=# CREATE EXTENSION vector WITH SCHEMA public ;
4.2 基本实验
4.2.1 如下仅是对pgvector的简单使用操作示例:
创建一个存储向量类型的表(items),用于存储embeddings。
postgres=# CREATE TABLE items (
id bigserial PRIMARY KEY,
item text,
embedding vector(2)
);
将向量数据插入表中:
postgres=# INSERT INTO
items (item, embedding)
VALUES ('苹果', '[1, 1]'), ('香蕉', '[1.2, 0.8]'), ('猫', '[6, 0.4]');
查询刚刚插入的数据:
postgres=# select * from items ;
结果示例如下:
使用余弦相似度操作符 <=> 计算猫与香蕉及苹果之间的相似度:
postgres=# SELECT item, 1 - (embedding <=> '[6,0]') AS cosine_similarity
FROM items ORDER BY cosine_similarity DESC;
结果示例:
在上述结果中:
- 猫的结果为0.97,表示基本上最接近匹配;
- 苹果的结果为0.92,表示苹果与猫有一定相似;
- 香蕉的结果为0.90,表示香蕉与猫相似度更低。
4.2.2 pgvector之问答知识库场景
4.2.2.1 环境要求
本试验要求使用openai 的key,请确保手头有可用的api key,并且有适当渠道访问openai服务,或者可以自已搭一个LLM大模型环境,不受网络访问限制,但测试代码可能需要有一些修改。
4.2.2.2 数据准备
本实验以网上一个文档的文本内容为例,将其拆分并存储到Klustron数据库中。数据准备阶段的关键在于将专属领域知识转化为文本embedding,并有效地存储和匹配这些信息。通过利用LLM的语义理解能力,可以获得与特定领域相关的回答和建议。
先安装如下依赖库:
root@kunlun1:/home/kl# pip3 install openai==0.28 psycopg2 tiktoken requests beautifulsoup4 numpy
创建测试表(本文以rds_pg_help_docs为例),用于存储知识库内容:
kl@kunlun1:~$ source /data/kl/env.sh
kl@kunlun1:~$ psql -h 192.168.0.152 -p 47001 postgres
postgres=# CREATE TABLE rds_pg_help_docs (
id bigserial PRIMARY KEY,
title text, -- 文档标题
description text, -- 描述
doc_chunk text, -- 文档分块
token_size int, -- 文档分块字数
embedding vector(1024)); -- 文本嵌入信息
创建.py
文件(本文以knowledge_chunk_storage.py
为例),拆分知识库文档内容并存储到Klustron数据库中,示例代码如下:
knowledge_chunk_storage.py
---------------------------------------------
import openai
import psycopg2
import tiktoken
import requests
from bs4 import BeautifulSoup
EMBEDDING_MODEL = "text-embedding-ada-002"
tokenizer = tiktoken.get_encoding("cl100k_base")
# 连接Klustron数据库
conn = psycopg2.connect(database="postgres",
host="192.168.0.152",
user="abc",
password="abc",
port="47001")
conn.autocommit = True
# OpenAI的API Key
openai.api_key = "Secret API Key"
# 自定义拆分方法(仅为示例用途)
def get_text_chunks(text, max_chunk_size):
chunks_ = []
soup_ = BeautifulSoup(text, 'html.parser')
content = ''.join(soup_.strings).strip()
length = len(content)
start = 0
while start < length:
end = start + max_chunk_size
if end >= length:
end = length
chunk_ = content[start:end]
chunks_.append(chunk_)
start = end
return chunks_
# 指定需要拆分的网页
url = 'https://help.aliyun.com/document_detail/148038.html'
response = requests.get(url)
if response.status_code == 200:
# 获取网页内容
web_html_data = response.text
soup = BeautifulSoup(web_html_data, 'html.parser')
# 获取标题(H1标签)
title = soup.find('h1').text.strip()
# 获取描述(class为shortdesc的p标签内容)
description = soup.find('p', class_='shortdesc').text.strip()
# 拆分并存储
chunks = get_text_chunks(web_html_data, 500)
for chunk in chunks:
doc_item = {
'title': title,
'description': description,
'doc_chunk': chunk,
'token_size': len(tokenizer.encode(chunk))
}
query_embedding_response = openai.Embedding.create(
model=EMBEDDING_MODEL,
input=chunk,
)
doc_item['embedding'] = query_embedding_response['data'][0]['embedding']
cur = conn.cursor()
insert_query = '''
INSERT INTO rds_pg_help_docs
(title, description, doc_chunk, token_size, embedding) VALUES (%s, %s, %s, %s, %s);
'''
cur.execute(insert_query, (
doc_item['title'], doc_item['description'], doc_item['doc_chunk'], doc_item['token_size'],
doc_item['embedding']))
conn.commit()
else:
print('Failed to fetch web page')
注意:
- 替换 openai.api_key = "Secret API Key" 中的secret API key 为openai 的key;
- 替换 psycopg2.connect = …连接到你环境中的Klustron数据库。
运行python程序:
kl@kunlun1:~$ export all_proxy="socks5h://127.0.0.1:1080" #本地代理配置示例
kl@kunlun1:~$ python3 knowledge_chunk_storage.py
执行示例如下
进入数据库使用如下命令查看是否已将知识库文档内容拆分并存储为向量数据。
kl@kunlun1:~$ psql -h 192.168.0.152 -p 47001 postgres
postgres=# SELECT id,token_size,SUBSTRING(title FROM -1 FOR 30), SUBSTRING(description FROM -1 FOR 30), SUBSTRING(doc_chunk FROM -1 FOR 30),SUBSTRING(embedding FROM -1 FOR 50) from rds_pg_help_docs;
查询结果示例如下:
说明:因完整的行数据太长,上述查询SQL只截取了部分内容显示。
4.2.2.3 问答测试
在python项目中,创建.py
文件(本文以chatbot.py
为例),创建问题并与数据库中的知识库内容比较相似度,返回结果。
chatbot.py
------------------------------------------
import openai
import psycopg2
from psycopg2.extras import DictCursor
GPT_MODEL = "gpt-3.5-turbo"
EMBEDDING_MODEL = "text-embedding-ada-002"
GPT_COMPLETIONS_MODEL = "text-davinci-003"
MAX_TOKENS = 1024
# OpenAI的API Key
openai.api_key = "Secret API Key"
prompt = '如何创建一个RDS PostgreSQL实例'
prompt_response = openai.Embedding.create(
model=EMBEDDING_MODEL,
input=prompt,
)
prompt_embedding = prompt_response['data'][0]['embedding']
# 连接Klustron数据库
conn = psycopg2.connect(database="postgres",
host="192.168.0.152",
user="abc",
password="abc",
port="47001")
conn.autocommit = True
def answer(prompt_doc, prompt):
improved_prompt = f"""
按下面提供的文档和步骤来回答接下来的问题:
(1) 首先,分析文档中的内容,看是否与问题相关
(2) 其次,只能用文档中的内容进行回复,越详细越好,并且以markdown格式输出
(3) 最后,如果问题与RDS PostgreSQL不相关,请回复"我对RDS PostgreSQL以外的知识不是很了解"
文档:
\"\"\"
{prompt_doc}
\"\"\"
问题: {prompt}
"""
response = openai.Completion.create(
model=GPT_COMPLETIONS_MODEL,
prompt=improved_prompt,
temperature=0.2,
max_tokens=MAX_TOKENS
)
print(f"{response['choices'][0]['text']}\n")
similarity_threshold = 0.78
max_matched_doc_counts = 8
# 通过pgvector过滤出相似度大于一定阈值的文档块
similarity_search_sql = f'''
SELECT doc_chunk, token_size, 1 - (embedding <=> '{prompt_embedding}') AS similarity
FROM rds_pg_help_docs WHERE 1 - (embedding <=> '{prompt_embedding}') > {similarity_threshold} ORDER BY id LIMIT {max_matched_doc_counts};
'''
cur = conn.cursor(cursor_factory=DictCursor)
cur.execute(similarity_search_sql)
matched_docs = cur.fetchall()
total_tokens = 0
prompt_doc = ''
print('Answer: \n')
for matched_doc in matched_docs:
if total_tokens + matched_doc['token_size'] <= 1000:
prompt_doc += f"\n---\n{matched_doc['doc_chunk']}"
total_tokens += matched_doc['token_size']
continue
answer(prompt_doc,prompt)
total_tokens = 0
prompt_doc = ''
answer(prompt_doc,prompt)
注意:
- 替换 openai.api_key = "Secret API Key" 中的secret API key 为openai 的key;
- 替换 psycopg2.connect = …连接到你环境中的Klustron数据库。
运行Python程序:
kl@kunlun1:~$ python3 chatbot.py
在运行窗口看到类似如下的对应答案:
注意: 可以对拆分方法以及问题prompt进行优化,以获得更加准确、完善的回答,本文仅为示例。
泽拓产品开发团队提供了另外一个以文搜图的很有意思的关于pgvector的使用场景。
- 源代码链接:https://github.com/zettadb/techtalk/blob/main/pgvector%E7%9A%84%E5%BA%94%E7%94%A8%E5%92%8C%E5%8E%9F%E7%90%86/clip.ipynb
- 关于这个demo的描述,见:https://doc.klustron.com/zh/Application_and_principle_of_pgvector.html#-1