# 将 Supabase 与 Typesense 同步
Supabase (opens new window) 是一个基于 PostgreSQL 构建的开源开发平台,为应用用户提供安全的直接数据库访问,并包含认证管理、TypeScript 边缘函数、日志记录和存储等一系列功能,使其成为开发者中的热门选择。然而,其基于 PostgreSQL 模糊搜索的搜索功能相比 Typesense 等专业搜索引擎稍显不足。
本指南将引导您完成将用户数据与 Typesense 搜索实例同步的过程,以增强搜索能力并为用户提供更强大的搜索体验。
# 第一步:配置 Supabase
# 创建产品表
Supabase 提供基于浏览器的 SQL 和 GUI 编辑器来执行查询。如果您更喜欢使用 PGAdmin 等数据库管理器,请参考 Supabase 教程 (opens new window) 设置外部连接。以下示例使用一个假设的 products 表,您可以使用您偏好的界面创建该表。
# 产品表
CREATE TABLE public.products (
id UUID NOT NULL DEFAULT uuid_generate_v4 (),
product_name TEXT NULL,
updated_at TIMESTAMPTZ NULL DEFAULT now(),
user_id UUID NULL, -- 引用 Supabase 管理的 auth.users 数据库中认证用户的 id
CONSTRAINT products_pkey PRIMARY KEY (id),
CONSTRAINT products_user_id_fkey FOREIGN KEY (user_id) REFERENCES auth.users (id) ON DELETE CASCADE
);
# 启用行级安全
行级安全(Row Level Security,RLS)可以限制用户对表中行记录的操作权限。本教程将在_同步删除_部分使用此功能来实现_软删除_。Supabase 主要使用两种应用用户角色:
- anon:匿名用户
- authenticated:已认证用户
通过 RLS,可以限制只有携带正确 user_id 和会话密钥的认证用户才能访问数据。
# 示例:创建 RLS 策略
CREATE POLICY "仅认证用户可以查看他们在产品表中的对应行记录"
ON public.products
FOR SELECT
TO authenticated
USING (
-- auth.uid 是 Supabase 提供的辅助函数
auth.uid() = user_id
);
需要注意的是,一旦为表启用 RLS,默认情况下会阻止所有非管理员角色(所有应用用户)访问该表,除非编写明确授予权限的策略。
要为 products 表启用 RLS,你可以使用 Supabase 的_表编辑器_,或者执行以下查询:
# 启用 RLS
ALTER TABLE products ENABLE ROW LEVEL SECURITY;
本教程将使用以下策略:
# 产品表 RLS 策略
CREATE POLICY "仅认证用户可以查看他们在 PostgreSQL 中的产品"
ON public.products
FOR SELECT
TO authenticated
USING (
auth.uid() = user_id
);
CREATE POLICY "仅认证用户可以向产品表插入数据"
ON public.products
FOR INSERT
TO authenticated
WITH CHECK (true);
CREATE POLICY "仅认证用户可以更新他们提供的产品"
ON public.products
FOR UPDATE
TO authenticated
USING (
auth.uid() = user_id
) WITH CHECK (
auth.uid() = user_id
);
CREATE POLICY "仅认证用户可以从数据库中删除他们的产品"
ON public.products
FOR DELETE
TO authenticated
USING (
auth.uid() = user_id
);
# 启用相关 PostgreSQL 扩展
本教程将使用以下三个扩展:
- PG_NET (opens new window):允许数据库使用 JSON 进行异步 http/https 请求
- HTTP (opens new window):允许数据库使用所有数据格式进行同步 http/https 请求
- PG_CRON (opens new window):使数据库能够兼任 CRON 服务器
在 Supabase 中,可以通过点击导航菜单中的 Database 图标,然后点击 Extensions 来找到并启用这些扩展。更多指导可参考 Supabase 的文档 (opens new window)。
PG_NET 扩展将用于实现 PostgreSQL 与 Typesense 的实时同步。HTTP 和 PG_CRON 扩展将一起用于调度和执行批量同步。
注意:虽然本教程大部分内容使用 PG/plSQL 完成,但 Supabase 也支持 PLV8 (opens new window) 和 PLJAVA (opens new window) 扩展。它们分别允许用户使用 JavaScript 和 Java 编写例程。
# 第二步:配置 Typesense
如果您的 Typesense 实例已在运行并连接到互联网,请跳至 设置 API 密钥 部分。本指南参考了 Typesense 安装文档 (opens new window) 的 Docker 部分,并做了一些修改。
# 使用 Docker 安装
Docker 在 Windows、Linux 和 MacOS 上提供一致的配置流程。首先,如果尚未安装,请安装 Docker Desktop (opens new window)。在 Docker 运行后,执行 Typesense 的 Docker 页面 (opens new window) 上的命令:
# 部署 Docker 镜像与容器
mkdir /tmp/Typesense-data
docker run -p 8108:8108 -v/tmp/data:/data typesense/typesense:0.24.1 --data-dir /data --api-key=Hu52dwsas2AdxdE
可以通过以下 URL 来测试您的实例是否正常运行。如果服务已启动,您将收到返回结果 {"ok":true}
# 在浏览器中测试连接
http://localhost:8108/health
# 设置 API 密钥
Typesense 实例部署时带有一个默认 API 密钥:Hu52dwsas2AdxdE。为了更好的安全性,我们将生成新的密钥。首先执行以下 Shell 命令来替换默认 API 密钥:
注意:建议使用 json_pp、jq 或其他 JSON 格式化工具美化 cURL 响应
# 创建新的管理员 API 密钥
curl 'http://localhost:8108/keys' \
-X POST \
-H "X-Typesense-API-KEY: Hu52dwsas2AdxdE" \
-H 'Content-Type: application/json' \
-d '{"description":"Admin key.","actions": ["*"], "collections": ["*"]}'
保存返回的管理员密钥值。这应该是唯一活跃的密钥:
# 列出所有活跃 API 密钥
curl 'http://localhost:8108/keys' \
-X GET \
-H "X-Typesense-API-KEY: ${Typesense_API_KEY}"
现在只剩下一个主密钥,您可以用它来创建产品集合。
# 创建产品集合
curl "http://localhost:8108/collections" \
-X POST \
-H "Content-Type: application/json" \
-H "X-Typesense-API-KEY: ${Typesense_API_KEY}" \
-d '{
"name": "products",
"fields": [
{"name": "id", "type": "string" },
{"name": "product_name", "type": "string" },
{"name": "user_id", "type": "string" },
{"name": "updated_at", "type": "float" }
],
"default_sorting_field": "updated_at"
}'
我们还需要创建两个密钥:一个用于产品集合的"仅搜索"密钥和一个"主"密钥。
# 创建产品搜索专用 API 密钥
curl 'http://localhost:8108/keys' \
-X POST \
-H "X-Typesense-API-KEY: ${Typesense_API_KEY}" \
-H 'Content-Type: application/json' \
-d '{"description":"Search-only products key.","actions": ["documents:search"], "collections": ["products"]}'
# 创建产品管理 API 密钥
curl 'http://localhost:8108/keys' \
-X POST \
-H "X-Typesense-API-KEY: ${Typesense_API_KEY}" \
-H 'Content-Type: application/json' \
-d '{"description":"Admin products key","actions": ["*"], "collections": ["products"]}'
# 通过隧道连接至互联网
本 Supabase 教程不涉及在专用服务器上部署自托管实例。您可以使用 Ngrok (opens new window) 或 Cloudflare Tunnel (opens new window) 等服务,安全地将本地机器与互联网之间的流量进行转发。Ngrok 使用稍简便且提供慷慨的免费套餐。注册账户后,请按照 Ngrok 安装指南 (opens new window) 操作。完成后,使用终端命令将您的 Typesense 实例转发至互联网。
# 激活 Ngrok 连接
ngrok http 8108
要测试隧道连接,您可以在浏览器中使用 Ngrok 提供的 URL
# 测试连接
<NGROK URL>/health
若收到响应 {"ok":true},则表示连接成功。
# 步骤 3: PostgreSQL 连接性介绍
PG_NET 扩展允许 Supabase 通过 HTTP/HTTPS 请求进行异步通信。
以下示例从 postman-echo API (opens new window) 请求数据。
# 从端点请求数据
SELECT net.http_get('https://postman-echo.com/get?foo1=bar1&foo2=bar2') AS request_id;
当查询完成后,您可以使用返回的 request_id 在 Supabase 的 net._http_response 表中查找响应。也可以通过以下查询查看响应:
# 查看 HTTP/HTTPS 响应消息
SELECT
*
FROM net._http_response
WHERE id = <request_id>
注意:在步骤 5.2 结束时,我们将讨论如何使用此表重试失败的同步。
net.http_post 函数只是直接向 Typesense 发送数据的一种方式。但它有一个重要限制:仅支持 JSON 作为 Content-Type,而 Typesense 需要 NDJSON 兼容性。幸运的是,在处理单行数据时,JSON 和 NDJSON 在功能上是等效的。因此,当只需要从 PostgreSQL 向 Typesense 发送一行数据时,以下代码可以正常工作。
如果您转到 Supabase 侧边栏的 Authentication 选项卡,可以创建新用户。使用新用户配置文件,前往 Supabase 的 Table Editor 并手动向 products 表添加新行。
# 连接 Typesense 的初步尝试
SELECT net.http_post(
-- 添加 TYPESENSE URL
url := '<TYPESENSE URL>/collections/products/documents/import?action=upsert'::TEXT,
-- 将 products 表的行格式化为 JSONB 并转换 updated_at 从 TIMESTAMPTZ 类型到 FLOAT 类型
-- 请求体必须格式化为 JSONB 数据
body := (
SELECT
to_jsonb(rows)
FROM (
SELECT
-- 将 TIMESTAMPTZ 类型转换为 float 类型
EXTRACT(EPOCH FROM updated_at)::float AS updated_at,
id,
product_name,
user_id
FROM products
-- 取消下面行的注释以使查询正常工作
-- LIMIT 1
) rows
)::JSONB,
headers := json_build_object(
'Content-Type', 'application/json',
'X-Typesense-API-KEY', '<API KEY>' -- 添加 API 密钥
)::JSONB,
timeout_milliseconds := 4000
) AS request_id;
如果您尝试批量更新多行数据,将会收到以下错误信息:
执行 SQL 查询失败:用作表达式的子查询返回了多行数据
如前所述,PG_NET 函数不适用于多行更新,但它们在实时同步方面非常强大。PostgreSQL 到 Typesense 的直接实时同步通常使用触发器实现,但这可能会阻塞用户事务。而 PG_NET 函数是异步的,确保不会延迟事务。这一特性也使其成为部署边缘函数的首选方案。该扩展非常健壮,实际上 Supabase 的 webhook 功能底层就是基于此扩展实现的。
另一方面,HTTP 扩展支持 NDJSON 格式,适合直接进行批量更新。它是同步的,因此会等待成功或失败响应,这使得错误处理更加简单。它与 PG_CRON 定时任务兼容,这些任务运行在单独的线程上,不会干扰主数据库操作,从而将对用户体验的影响降到最低。
要进行批量更新,必须将行数据转换为 NDJSON 格式。这可以通过在 PostgreSQL 中编写 PL/pgSQL 函数来实现。
# 将表格行格式化为 NDJSON
CREATE OR REPLACE FUNCTION get_products_ndjson()
RETURNS TEXT
AS $$
DECLARE
ndjson TEXT := '';
BEGIN
SELECT
string_agg(row_to_json(row_data)::text, E'\n')
INTO ndjson
FROM (
SELECT
p.product_name,
p.id,
CAST(EXTRACT(epoch FROM p.updated_at) AS FLOAT) AS updated_at,
p.user_id
FROM products p
) AS row_data;
RETURN ndjson;
END;
$$ LANGUAGE plpgSQL;
使用上述函数,您可以通过 HTTP 扩展实现批量更新插入操作:
# 与 Typesense 同步的批量方法
SELECT
status AS response_status,
content AS response_body,
(unnest(headers)).*
FROM http((
-- HTTP 方法
'POST'::http_method,
-- URL (替换为您的 TYPESENSE URL)
'<TYPESENSE URL>/collections/products/documents/import?action=upsert',
-- 请求头
ARRAY[
-- 添加 API 密钥
http_header('X-Typesense-API-KEY', '<API KEY>')
]::http_header[],
-- 内容类型
'application/x-ndjson',
-- 请求体
(SELECT get_products_ndjson())
)::http_request);
您可以通过以下 cURL 命令检查 Typesense 是否已更新:
# 搜索 Typesense
curl -H "X-TYPESENSE-API-KEY: <API KEY>" \
"<TYPESENSE URL>/collections/products/documents/search?q=*"
# 步骤 4:理解 Supabase 中的调度机制
通过 PG_CRON 扩展,您可以设置定时任务来协调与 Typesense 的批量同步、定期重试失败的请求,以及清理日志表中的陈旧数据。
# 定时任务调度:每分钟调用边缘函数
SELECT
cron.schedule(
'cron-job-name',
'* * * * *', -- 每分钟执行一次(cron语法)
$$
-- SQL查询
SELECT net.http_get(
-- Supabase边缘函数的URL
url:='https://<reference id>.functions.Supabase.co/Typesense-example',
headers:='{
"Content-Type": "application/json",
"Authorization": "Bearer <TOKEN>"
}'::JSONB
) as request_id;
$$
);
# 取消已激活的定时任务
SELECT cron.unschedule('cron-job-name')
PostgreSQL中的定时任务使用cron语法 (opens new window)进行时间设定。每个任务最多每分钟执行一次。遗憾的是,Supabase不原生支持秒级间隔的调度。如果这对您的使用场景造成困扰,建议使用更精确的外部定时任务或消息服务器以获得更好的控制。
在Supabase中,定时任务的详细信息记录在cron模式下的两张表中:
- jobs
- job_run_details
这些表对监控和调试非常有帮助。执行日志可以通过Supabase表编辑器查看,也可以直接查询这些表。
# 查询最近10次定时任务执行失败的记录
SELECT
*
FROM cron.job_run_details
INNER JOIN cron.job ON cron.job.jobid = cron.job_run_details.jobid
WHERE
cron.job.jobname = 'cron-job-name'
AND
cron.job_run_details.status = 'failed'
ORDER BY start_time DESC
LIMIT 10;
定时任务是管理Supabase的重要组成部分,在本指南后续内容中将会频繁涉及。
# 步骤 5.1:在 PostgreSQL 中原生批量同步插入/更新操作
PostgreSQL 中存在多种跟踪未同步行的方法,每种方法都有其优缺点。本指南将探讨实时同步和批量同步的各种策略。最终,您需要确定哪些方法最适合您的数据库设计和使用场景。
创建日志表来跟踪未同步行是我们要演示的第一个策略。这种设置相对简单,并且能最大程度地控制每次同步的行数。
# 创建日志表跟踪未同步行
CREATE TABLE public.products_sync_tracker (
product_id UUID NOT NULL PRIMARY KEY,
is_synced BOOLEAN DEFAULT FALSE,
CONSTRAINT product_id_fkey FOREIGN KEY (product_id) REFERENCES public.products (id) ON DELETE CASCADE
);
通过触发器可以将未同步数据填充到上述表中。
# 创建触发器监控产品插入
CREATE OR REPLACE FUNCTION insert_products_trigger_func()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO products_sync_tracker (product_id)
VALUES (NEW.id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER insert_products_trigger
AFTER INSERT ON public.products
FOR EACH ROW
EXECUTE FUNCTION insert_products_trigger_func();
# 创建触发器监控产品更新
CREATE OR REPLACE FUNCTION update_products_trigger_func()
RETURNS TRIGGER AS $$
BEGIN
-- 更新 products_sync_tracker 表
UPDATE products_sync_tracker
SET is_synced = FALSE
WHERE product_id = NEW.id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_products_trigger
BEFORE UPDATE ON public.products
FOR EACH ROW
EXECUTE FUNCTION update_products_trigger_func();
# 使用 Cron Jobs 调度批量更新/插入
以下 PL/pgSQL 函数将未同步的行转换为 NDJSON 格式,然后执行 upsert 操作到 Typesense。如果 upsert 失败,跟踪表 products_sync_tracker 将被回滚以反映此失败。通过将该函数集成到 cron job 中,可以直接在 PostgreSQL 中实现定时同步,无需使用外部服务器。虽然这很强大,但会占用数据库资源。这些函数应设置为快速完成,因此保持较小的负载大小很重要。默认情况下,HTTP 扩展发出的请求会在 5 秒后超时,但可以通过修改全局变量 http.timeout_msec 来调整。
该函数可分为 6 个步骤:
- 确保没有其他同步进程正在运行。如果存在则退出。
- 通过查询 products 表获取未同步的行。
- 将检索到的行转换为 NDJSON(Newline Delimited JSON)格式。
- 将格式化后的行与 Typesense 同步。
- 检查 Typesense 的响应以确定是否有行同步失败。
- 更新 products 表以标记每行的同步状态,记录失败的尝试。
# 批量同步行的函数
CREATE OR REPLACE FUNCTION sync_products_updates()
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
-- 锁键:用作函数'锁'的任意数字
-- 同一时间只能有一个函数实例持有该键并运行
-- 当行正在被更新时,如果同时发送多个请求,
-- 无法确定哪个会先被处理。
-- 这可能导致 Typesense 接收到过时数据。
-- 锁定函数可以避免这种负面情况。
lock_key INTEGER := 123456;
is_locked BOOLEAN := FALSE;
-- 用于告知 Typesense 需要同步的行数
total_rows INTEGER;
-- 用于将行转换为 NDJSON 的变量
ndjson TEXT := '';
-- 用于引用 HTTP 响应值的变量
request_status INTEGER;
response_message TEXT;
BEGIN
-- 创建锁,确保同一时间只有一个函数实例运行
SELECT pg_try_advisory_xact_lock(lock_key) INTO is_locked;
IF NOT is_locked THEN
RAISE EXCEPTION '无法锁定。其他任务正在运行';
END IF;
-- 预先将40个未同步产品标记为已同步
-- 创建临时表存储待更新的行
-- 这些行将与 Typesense 同步
CREATE TEMPORARY TABLE updated_rows (
product_id UUID
) ON COMMIT DROP;
WITH soon_to_be_synced_rows AS (
UPDATE products_sync_tracker
SET is_synced = TRUE
WHERE product_id IN (
SELECT
product_id
FROM products_sync_tracker
WHERE is_synced = FALSE
LIMIT 40
)
RETURNING product_id
)
INSERT INTO updated_rows
SELECT * FROM soon_to_be_synced_rows;
SELECT
COUNT(product_id)
INTO total_rows
FROM updated_rows;
IF total_rows < 1 THEN
RAISE EXCEPTION '没有需要同步的行';
END IF;
-- 将待同步行转换为 Typesense 可解析的格式
WITH row_data AS (
SELECT
p.product_name,
p.id,
CAST(EXTRACT(epoch FROM p.updated_at) AS FLOAT) AS updated_at,
p.user_id
FROM products p
JOIN updated_rows u ON p.id = u.product_id
)
SELECT
string_agg(row_to_json(row_data)::text, E'\n')
INTO ndjson
FROM row_data;
SELECT
status,
content
INTO request_status, response_message
FROM http((
'POST'::http_method,
-- 添加 TYPESENSE URL
'<TYPESENSE URL>/collections/products/documents/import?action=upsert',
ARRAY[
-- 添加 API 密钥
http_header('X-Typesense-API-KEY', '<API KEY>')
]::http_header[],
'application/x-ndjson',
ndjson -- 负载数据
)::http_request);
-- 检查请求是否失败
IF request_status <> 200 THEN
-- 将错误信息存入 Supabase Postgres 日志
RAISE LOG 'Typesense 同步请求失败。请求状态: %。消息: %', request_status, response_message;
-- 抛出异常,回滚事务
RAISE EXCEPTION '更新插入失败';
ELSE
-- 成功响应会包含每行结果的 NDJSON
/* 可能的结果:
{"success": true}
{"success": false, "error": "Bad JSON.", "document": "[bad doc]"}
*/
-- 需要处理这些结果以确定哪些行同步成功,哪些失败
WITH ndjson_from_response AS (
SELECT unnest(string_to_array(response_message, E'\n')) AS ndjson_line
),
ndjson_to_json_data AS (
SELECT
ndjson_line::JSON AS json_line
FROM ndjson_from_response
),
failed_syncs AS (
SELECT
json_line
FROM ndjson_to_json_data
WHERE (json_line->>'success')::BOOLEAN = FALSE
),
unsynced_ids AS (
SELECT
((json_line->>'document')::JSON->>'id')::UUID AS ids
FROM failed_syncs
)
UPDATE public.products_sync_tracker
SET is_synced = FALSE
WHERE product_id IN (SELECT ids FROM unsynced_ids);
END IF;
END;
$$;
# 定时任务批量更新/插入 Typesense 数据
SELECT cron.schedule(
'update-insert-Typesense-job',
'* * * * *',
$$
-- SQL 查询语句
SELECT sync_products_updates();
$$
);
要测试 Typesense 是否同步成功,可以通过 Supabase 的表格编辑器手动向 products 表添加一行数据。然后在同一表格编辑器中查看 cron 模式下的执行记录,观察定时任务的执行时间。
# 搜索 Typesense 中的同步数据
curl -X GET "<TYPESENSE URL>/collections/products/documents/search?q=*" \
-H "X-TYPESENSE-API-KEY: <API KEY>"
# 步骤 5.2:使用边缘函数批量同步插入/更新
部分用户可能更倾向于使用服务器作为中间层与 Typesense 通信。这种方式能减轻数据库压力,并能处理相对高容量的同步操作。当需要对数据进行清洗或重新格式化时,这种方法尤为有用。幸运的是,Supabase 提供了基于 Deno (TypeScript) 的无服务器边缘函数。
如果边缘函数执行中途失败,必须有机制能够捕获并解决故障。在前面的示例中,我们通过基于行更新的 products_sync_tracker 表来跟踪行同步状态。但对于边缘函数,我们可以引入另一种更具优势的结构。利用 products 表的 updated_at 列,可以通过时间戳来跟踪未同步的行,并在边缘函数失败时重新发送。为实现这一点,需要移除之前 products 表上的触发器,替换为监控 updated_at 列的新触发器。
# 移除原有触发器
DROP TRIGGER IF EXISTS update_products_trigger
ON products;
DROP FUNCTION IF EXISTS update_products_trigger_func;
DROP TRIGGER IF EXISTS insert_products_trigger
ON products;
DROP FUNCTION IF EXISTS insert_products_trigger_func;
# 为 updated_at 列添加触发器
CREATE OR REPLACE FUNCTION update_products_time_func()
RETURNS TRIGGER AS $$
BEGIN
-- 更新 products.updated_at 列
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_at_products_trigger
BEFORE UPDATE ON public.products
FOR EACH ROW
EXECUTE FUNCTION update_products_time_func();
同时需要追踪边缘函数调用,以便监控失败情况。创建一个 edge_function_tracker 表。
# 创建边缘部署监控表
CREATE TABLE edge_function_tracker(
id UUID NOT NULL DEFAULT uuid_generate_v4 (),
start_time TIMESTAMPTZ DEFAULT NOW (),
start_time_of_prev_func TIMESTAMPTZ,
attempts INTEGER DEFAULT 1,
func_status TEXT NOT NULL DEFAULT 'PENDING' CHECK (func_status IN ('SUCCEEDED', 'FAILED', 'PENDING')),
request_id BIGINT NULL,
CONSTRAINT edge_function_tracker_pkey PRIMARY KEY (id)
);
部署边缘函数的 PG_NET 函数需要封装在 PG/plSQL 函数中,以便在部署前记录函数信息。该过程可分为四个步骤:
- 在 edge_function_tracker 表中创建新条目
- 收集识别未同步行所需的必要信息
- 发送包含必要详情的边缘函数请求
- 记录与边缘部署相关的元数据
# 部署/追踪边缘部署的包装函数
CREATE OR REPLACE FUNCTION edge_deployment_wrapper()
RETURNS VOID
AS $$
DECLARE
func_request_id BIGINT;
payload JSONB;
prev_start_time TIMESTAMPTZ;
BEGIN
-- 在 edge_function_tracker 表中为新部署的边缘函数创建新条目
WITH func_default_vals AS (
INSERT INTO edge_function_tracker
DEFAULT VALUES
RETURNING id, start_time
)
SELECT
row_to_json(func_default_vals.*)::JSONB
INTO payload
FROM func_default_vals;
-- 上次部署后更新的产品处于未同步状态
-- 获取前一个函数时间戳以帮助查询未同步行进行后续处理
SELECT
start_time
INTO prev_start_time
FROM edge_function_tracker
WHERE start_time < (payload->>'start_time')::TIMESTAMPTZ
ORDER BY id DESC
LIMIT 1;
-- 如果没有先前的开始时间,将 prev_start_time 设置为 0
IF NOT FOUND THEN
prev_start_time := '1970-01-01 00:00:00+00'::TIMESTAMPTZ;
END IF;
-- 重新格式化 payload 以包含 prev_start_time
payload := payload || jsonb_build_object('start_time_of_prev_func', prev_start_time);
-- 发送请求到边缘函数
SELECT net.http_post(
url := '<FUNCTION URL>',
body := payload,
headers := '{
"Content-Type": "application/json",
"Authorization": "Bearer <SUPABASE ANON KEY>"
}'::JSONB,
timeout_milliseconds := 4000
) INTO func_request_id;
-- 记录 request_id 和 start_time_of_prev_func
-- 这将用于后续重新部署失败的请求
UPDATE edge_function_tracker
SET
request_id = func_request_id,
start_time_of_prev_func = (payload->>'start_time_of_prev_func')::TIMESTAMPTZ
WHERE id = (payload->>'id')::UUID;
END;
$$ LANGUAGE plpgsql;
这个跟踪用的 PG/plSQL 函数可以通过 cron 作业调用来启动批量同步过程。
# 执行边缘函数的定时任务(Cron Job)
SELECT
cron.schedule(
'update-insert-Typesense-job',
'* * * * *', -- 每分钟执行一次(cron语法)
$$
-- SQL查询
SELECT edge_deployment_wrapper();
$$
);
通常,创建能够在单个请求中执行多次交互的PG/plSQL函数比让边缘函数多次查询数据库获取少量数据更为高效。以下函数将被边缘函数调用,以简化和加速成功请求的同步拒绝处理。
CREATE OR REPLACE FUNCTION report_failed_syncs (func_id UUID, failed_rows_id UUID[])
RETURNS VOID
AS $$
BEGIN
-- 更新"edge_function_tracker"表以反映
-- 函数调用的状态
UPDATE edge_function_tracker
SET func_status = 'SUCCEEDED'
WHERE id = func_id;
-- 这行代码禁用事务的updated_at触发器
SET LOCAL session_replication_role = 'replica';
-- 将失败的product同步更新为NOW(),这样它们会被包含在
-- 下一次同步函数中
UPDATE products
SET updated_at = NOW()
WHERE id = ANY(failed_rows_id);
END;
$$ LANGUAGE plpgsql;
要部署边缘函数,您需要安装Node包管理器,如NPM、Yarn或PNPM。可以通过官方下载页面 (opens new window)下载Node.JS来安装NPM。
创建第一个函数时,请新建一个以函数名命名的文件夹,并在其中添加index.ts文件。以下代码是Supabase PostgreSQL演示 (opens new window)的修改版本,使用了前面提到的_get_products_updates_from_edge_ PL/pgSQL函数。
注意:所有函数及其日志都可以在Supabase仪表板的Edge Functions部分找到。
使用的边缘函数可以分解为7个步骤:
- 解析请求体以识别需要同步的特定行
- 通过查询products表检索未同步的行
- 将检索到的行转换为NDJSON(Newline Delimited JSON)格式
- 将格式化后的行与Typesense同步
- 检查Typesense的响应以确定是否有行同步失败
- 更新products表以指示每行的同步状态,标记任何不成功的尝试
- 在edge_function_tracker表中记录边缘函数的整体成功或失败情况,用于监控和分析
# 更新 Typesense 的边缘函数
import * as postgres from 'https://deno.land/x/postgres@v0.14.2/mod.ts';
import { serve } from 'https://deno.land/std@0.177.0/http/server.ts';
// 在顶部定义你的类型
type TProductRows = {
id: string;
product_name: string;
updated_at: string;
user_id: string;
}[];
type TRequestJSON = {
id: string;
start_time: string;
start_time_of_prev_func: string;
};
// 数据库连接URL默认在所有边缘函数中可用
// 如果遇到问题,可以通过仪表板中的项目设置 > 数据库选项卡获取
const databaseUrl = Deno.env.get('SUPABASE_DB_URL')!;
// 创建一个包含三个延迟建立的连接池
const pool = new postgres.Pool(databaseUrl, 3, true);
serve(async (req) => {
try {
// 从连接池获取连接
const connection = await pool.connect();
// 1. 解析请求体:
const reqJSON = (await req.json()) as TRequestJSON;
try {
// 2. 从数据库检索未同步的产品
const unsyncedRows = (
await connection.queryObject({
args: [reqJSON.start_time_of_prev_func, reqJSON.start_time],
text: `SELECT
products.id,
products.product_name,
CAST(EXTRACT(epoch FROM products.updated_at) AS FLOAT) AS updated_at,
products.user_id
FROM products
-- 只同步自上次同步后更新的产品
WHERE updated_at BETWEEN $1::TIMESTAMPTZ AND $2::TIMESTAMPTZ;
-- 如果使用软删除,需要过滤掉已软删除的行
`,
})
).rows as TProductRows;
// 如果没有未同步的行,函数可以提前返回
if (!unsyncedRows.length) {
const res = await connection.queryObject({
args: [reqJSON.id],
text: `
UPDATE edge_function_tracker
SET func_status = 'SUCCEEDED'
WHERE id = $1::UUID;
`,
});
return new Response('没有需要同步的行', {
status: 200,
headers: {
'Content-Type': 'application/json; charset=utf-8',
},
});
}
// 3. 将行转换为NDJSON格式
const unsyncedProductsNDJSON: string = unsyncedRows
.map((product) =>
JSON.stringify({
...product,
updated_at: parseFloat(product.updated_at),
})
)
.join('\n');
// 4. 将新产品与Typesense同步
const response = await fetch(
// 添加你的TYPESENSE URL
'<TYPESENSE URL>/collections/products/documents/import?action=upsert',
{
method: 'POST',
headers: {
'Content-Type': 'application/x-ndjson',
//添加你的TYPESENSE API密钥
'X-TYPESENSE-API-KEY': '<API KEY>',
},
body: unsyncedProductsNDJSON,
}
);
// 响应将包含结果的NDJSON。如果某些更新失败,
// 我们需要在products_sync_tracker表中反映这个失败
/* 可能的响应结果
{"success": true}
{"success": false, "error": "Bad JSON.", "document": "[bad doc]"}
*/
const ndJsonResponse = await response.text();
// 将响应转换为对象数组
let JSONStringArr = ndJsonResponse.split('\n');
const parsedJSON = JSONStringArr.map((res) => JSON.parse(res));
// 5. 过滤出同步失败的ID
const failedSyncIds = parsedJSON
.filter((doc) => !doc.success)
.map((doc) => JSON.parse(doc.document).id) as string[];
// 如果没有失败的同步,函数可以提前返回
if (!failedSyncIds.length) {
const res = await connection.queryObject({
args: [reqJSON.id],
text: `
UPDATE edge_function_tracker
SET func_status = 'SUCCEEDED'
WHERE id = $1::UUID;
`,
});
return new Response('没有需要同步的行', {
status: 200,
headers: {
'Content-Type': 'application/json; charset=utf-8',
},
});
}
// 6/7. 更新products表中的"updated_at"列为当前时间
// 这样失败的行将在下次边缘函数运行时重新同步
// 声明函数调用成功
const result = await connection.queryObject({
args: [reqJSON.id, failedSyncIds],
text: `
SELECT report_failed_syncs ($1::UUID, $2::UUID[])
`,
});
// 返回带有正确内容类型头的响应
return new Response(
`已同步${reqJSON.start_time_of_prev_func}到${reqJSON.start_time}之间的行`,
{
status: 200,
headers: {
'Content-Type': 'application/x-ndjson; charset=utf-8',
},
}
);
} finally {
// 将连接释放回连接池
connection.release();
}
} catch (err) {
console.error(err);
return new Response(String(err?.message ?? err), {
status: 500,
});
}
});
设置好函数后,在终端中导航到函数的父目录并执行以下命令
# 登录 Supabase 命令行工具
npx supabase login
系统会提示您输入密码,并引导您访问生成访问令牌的链接。登录成功后,您就可以部署函数了。
# 将边缘函数部署到 Supabase
npx supabase functions deploy <您的函数目录名称>
您将收到一个链接,可以查看新函数的相关信息。调用该函数时,还需要使用项目的 ANON KEY(匿名密钥),该密钥可在项目设置的 API 选项卡 中找到。您可以通过 cron 任务来调度函数以同步 Typesense。
需要注意的是,并非所有同步都能成功。建立重试机制非常重要。此外,edge_function_tracker 表可能会变得臃肿并需要清理。下面的 PG/plSQL 函数通过以下步骤解决了这些问题:
- 从 edge_function_tracker 表中删除成功的同步记录
- 在满足特定条件时,将 'PENDING'(待处理)状态的函数更新为 'FAILED'(失败)状态
- 重新尝试失败的同步请求
# 用于管理边缘重新部署和维护的 PL/pgSQL 函数
CREATE OR REPLACE FUNCTION edge_function_maintainer()
RETURNS VOID
AS $$
DECLARE
func_request_id BIGINT;
payload JSONB;
BEGIN
-- 任何位于两个'SUCCEEDED'状态行之间的成功同步记录不再需要维护
-- 为防止表无限增长,将删除这些行
WITH organized_edge_function_tracker AS (
SELECT id,
func_status,
LAG(func_status) OVER (ORDER BY start_time) AS prev_func_status,
LEAD(func_status) OVER (ORDER BY start_time) AS next_func_status
FROM edge_function_tracker
),
success_conditions AS (
SELECT
id
FROM organized_edge_function_tracker
WHERE
func_status = 'SUCCEEDED'
AND
prev_func_status = 'SUCCEEDED'
AND
next_func_status = 'SUCCEEDED'
)
DELETE FROM edge_function_tracker
WHERE id IN (SELECT id FROM success_conditions);
-- 将edge_function_tracker表中所有非200的请求更新为'FAILED'状态
-- 注意:存储所有PG_NET请求状态的_http_response表是"unlogged"的
-- 这意味着在崩溃情况下会丢失所有数据
-- 作为安全措施,所有超过2分钟未响应的函数将被视为失败,即使找不到对应的请求记录
WITH failed_rows AS (
SELECT
edge_function_tracker.id
FROM edge_function_tracker
INNER JOIN net._http_response ON net._http_response.id = edge_function_tracker.request_id
WHERE net._http_response.status_code NOT BETWEEN 200 AND 202
)
UPDATE edge_function_tracker
SET func_status = 'FAILED'
FROM failed_rows
WHERE
failed_rows.id = edge_function_tracker.id
OR
(func_status = 'PENDING' AND (NOW() - start_time > '2 minutes'::interval))
;
-- 获取失败请求的数据,以便放入payload中用于重试
WITH selected_row AS (
SELECT
id,
start_time,
start_time_of_prev_func
FROM edge_function_tracker
WHERE func_status = 'FAILED' AND attempts < 3 -- 如果尝试次数超过3次,则认为该部分数据无法同步并停止尝试
ORDER BY start_time
LIMIT 1
)
SELECT to_jsonb(selected_row.*) INTO payload
FROM selected_row;
-- 检查是否存在可重试的请求
IF (payload->>'id')::UUID IS NOT NULL THEN
-- 向边缘函数发送重试请求
SELECT net.http_post(
url := '<EDGE FUNCTION URL>',
body := payload,
headers := '{
"Content-Type": "application/json",
"Authorization": "Bearer <SUPABASE ANON KEY>"
}'::JSONB,
timeout_milliseconds := 4000
) INTO func_request_id;
-- 记录新的request_id
UPDATE edge_function_tracker
SET
request_id = func_request_id,
attempts = attempts + 1
WHERE id = (payload->>'id')::UUID;
END IF;
END;
$$ LANGUAGE plpgSQL;
该维护函数可以通过cron job定时执行
# 每分钟重试失败函数
SELECT
cron.schedule(
'edge_function_maintainer',
'* * * * *', -- 每分钟执行一次(cron语法)
$$
SELECT edge_function_maintainer();
$$
);
# 步骤6:使用触发器实现实时更新/插入
在某些情况下,实时同步可能很重要。这只能通过触发器来实现。下面的示例直接在Supabase和Typesense之间同步,但你也可以使用触发器调用执行相同操作的Edge函数。
# 使用触发器同步数据
-- 创建一个函数,用于products表的更新/插入触发器
CREATE OR REPLACE FUNCTION public.sync_products()
RETURNS TRIGGER AS $$
BEGIN
-- 向Typesense服务器发起https请求
PERFORM net.http_post(
-- 添加TYPESENSE URL
url := '<TYPESENSE URL>/collections/products/documents/import?action=upsert'::TEXT,
-- NEW关键字代表新行数据
body := (
SELECT
to_jsonb(row.*)
FROM (
SELECT
-- 将TIMESTAMPTZ类型转换为float类型
EXTRACT(EPOCH FROM NEW.updated_at)::float AS updated_at,
NEW.id,
NEW.product_name,
NEW.user_id
) AS row
)::JSONB,
headers := json_build_object(
'Content-Type', 'application/json',
-- 添加API密钥
'X-Typesense-API-KEY', '<API KEY>'
)::JSONB
);
RETURN NEW;
END;
$$ LANGUAGE plpgSQL;
-- 在products表发生任何插入或更新后运行的触发器
CREATE TRIGGER sync_updates_and_inserts_in_Typesense
AFTER INSERT OR UPDATE ON products
FOR EACH ROW
EXECUTE FUNCTION sync_products();
警告:必须再次强调这些请求是异步的,这可以避免阻塞用户事务。请求发送后,后台工作进程会监听响应并将其添加到net._http_response表中。可以通过cron作业或触发器监控net._http_response表中的更新/插入,如步骤5.2末尾所述,重试失败的同步。但需要注意的是,使用触发器立即重试可能很危险,特别是当数据与Typesense不兼容时。如果没有明确的退出条件,可能会进入无限循环。
# 步骤7:批量同步删除操作
# 使用 Cron Jobs 调度批量删除
在 Supabase 和 Typesense 之间同步批量删除操作可以通过以下两种方式实现:
- 将被删除行的 ID 临时保存在中间表中,直到可以从 Typesense 中移除
- 对表中的行进行软删除,直到它们从 Typesense 中移除
注意:由于删除操作不需要 NDJSON 格式,HTTP 和 PG_NET 扩展都可以用于直接从 PostgreSQL 到 Typesense 的删除操作,或者用于触发边缘函数。
本节主要使用 HTTP 扩展。如果计划使用 PG_NET,则需要延迟实际删除行的操作,直到 net._http_response 表中该请求的状态为 200。步骤 5.2 提供了关于如何处理异步恢复的示例。
# 方法一:使用中间表
要实现第一种方法,需要创建一个表来存储被删除行的 ID,直到可以与 Typesense 同步。
# deleted_rows 表
CREATE TABLE deleted_rows(
table_name TEXT, --假设表名和 Typesense 集合名相同
deleted_row_id UUID,
CONSTRAINT deleted_rows_pkey PRIMARY KEY (deleted_row_id)
);
每当主表中的行被删除时,触发器应记录其 ID 的副本。
# 保存 ID 列的触发器
-- 创建函数,在删除时将 id 复制到 deleted_rows 表
CREATE OR REPLACE FUNCTION copy_deleted_product_id()
RETURNS TRIGGER
LANGUAGE plpgSQL
AS $$
BEGIN
INSERT INTO deleted_rows (table_name, deleted_row_id)
VALUES ('products', OLD.id);
RETURN OLD;
END $$;
-- 创建触发器,当从 products 表删除记录时调用该函数
CREATE TRIGGER copy_id_on_delete
AFTER DELETE ON public.products
FOR EACH ROW
EXECUTE FUNCTION copy_deleted_product_id();
在 Typesense 中,可以通过包含删除条件的 DELETE 请求执行批量删除操作,这些条件作为查询参数包含在 URL 中。
# 示例:删除请求
curl -g -H "X-TYPESENSE-API-KEY: ${TYPESENSE_API_KEY}" -X DELETE \
"http://localhost:8108/collections/companies/documents?filter_by=id:[id1, id2, id3]"
注意:可能需要使用 URL 编码字符替代方括号 "[ ]",分别是 %5B 和 %5D
可以编写一个 PL/pgSQL 函数,在从 deleted_rows 表中移除副本之前,将这些删除操作与 Typesense 同步。
# 批量同步已删除行
-- 创建用于从Typesense删除记录的函数
CREATE OR REPLACE FUNCTION bulk_delete_products()
RETURNS VOID
LANGUAGE plpgSQL
AS $$
DECLARE
id_arr UUID[];
query_params TEXT;
request_status INTEGER;
response_message TEXT;
BEGIN
-- 从deleted_rows表中选择ID
SELECT
-- 将值存入数组以便后续删除
array_agg(deleted_row_id),
-- 格式化行数据用于filter_by参数
string_agg(deleted_row_id::text, ',')
INTO id_arr, query_params
FROM deleted_rows
LIMIT 40;
-- 参数必须用方括号"[ ]"格式化,方括号需要URL编码为%5B和%5D
query_params:= '%5B' || query_params || '%5D';
SELECT
status,
content
INTO request_status, response_message
FROM http((
'DELETE'::http_method,
-- 添加TYPESENSE URL
'<TYPESENSE URL>/collections/products/documents?filter_by=id:' || query_params,
ARRAY[
-- 添加API密钥
http_header('X-Typesense-API-KEY', '<API KEY>')
]::http_header[],
'application/json',
NULL
)::http_request);
-- 检查请求是否失败
IF request_status <> 200 THEN
-- 将错误信息存入Supabase Postgres日志
RAISE LOG 'HTTP DELETE请求失败。消息: %', response_message;
-- 抛出异常,回滚事务
RAISE EXCEPTION '删除失败';
ELSE
DELETE FROM deleted_rows WHERE deleted_row_id = ANY(id_arr);
END IF;
END;
$$;
该函数可以通过Cron Job每分钟调用一次。
# 每分钟同步删除的行记录
SELECT
cron.schedule(
'bulk-delete-from-typesese',
'* * * * *', -- 每分钟执行一次(cron语法)
$$
-- SQL查询语句
SELECT bulk_delete_products();
$$
);
# 方法二:软删除方案
在该方案中,必须限制用户直接从 products 表中删除行记录。为确保这一点,关键是要修改行级安全策略(RLS),撤销用户的删除权限。若不进行此修改,Supabase 和 Typesense 之间必然会出现数据不一致的情况。
# 修改RLS策略以禁止用户删除行记录
ALTER POLICY "only an authenticated user is allowed to remove their products "
ON public.products
TO authenticated
USING(
FALSE
);
用户不应直接删除行记录,而是需要通过修改行的方式来标记其为不可用状态。在本例中,将 user_id 列设为 NULL 将使该行对所有应用用户不可见。
可以使用 PG_NET 或 HTTP 扩展创建一个 PG/plSQL 函数,用于将 user_id 列为空的记录同步到 Typesense 后再删除:
# 在删除前同步已置为 Null 的行
-- 创建从 Typesense 删除记录的函数
CREATE OR REPLACE FUNCTION bulk_delete_products()
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
-- 存储并格式化已删除行的 ID
deleted_id_arr UUID[];
query_params TEXT;
-- 监控 Typesense 的响应
request_status INTEGER;
response_message TEXT;
BEGIN
-- 从 deleted_rows 中选择并格式化 ID
SELECT
string_agg(id::text, ','),
array_agg(id)
INTO query_params, deleted_id_arr
FROM products
WHERE user_id IS NULL
LIMIT 40;
-- 将 ID 列表放入 URL 编码的方括号 [ ] 中,即 %5B 和 %5D
query_params := '%5B' || query_params || '%5D';
-- 向 Typesense 服务器发送删除请求
SELECT
status,
content
INTO request_status, response_message
FROM http((
'DELETE'::http_method,
-- 添加 TYPESENSE URL
'<TYPESENSE URL>/collections/products/documents?filter_by=id:' || query_params,
ARRAY[
-- 添加 API 密钥
http_header('X-Typesense-API-KEY', '<API KEY>')
]::http_header[],
'application/json',
NULL
)::http_request);
-- 检查请求是否失败
IF request_status <> 200 THEN
-- 将错误信息存储到 Supabase Postgres 日志中
RAISE LOG 'HTTP DELETE 请求失败。消息: %', response_message;
-- 抛出异常,回滚事务
RAISE EXCEPTION '删除失败';
ELSE
DELETE FROM products WHERE id = ANY(deleted_id_arr);
END IF;
END;
$$;
可以通过 cron 任务每分钟调用该函数。
# 批量删除任务调度
SELECT
cron.schedule(
'delete-from-typesese',
'* * * * *', -- 每分钟执行一次(cron语法)
$$
-- SQL查询语句
SELECT bulk_delete_products();
$$
);
# 步骤8:实时同步删除操作
# 同步单个删除
在Typesense中,可以通过以文档ID作为路径参数发起请求来删除单个文档。
# cURL请求:删除指定ID的文档
curl -H "X-Typesense-API-KEY: ${Typesense_API_KEY}" -X DELETE \
"http://localhost:8108/collections/products/documents/<id>"
根据数据库的组织方式,您可能会遇到删除某行数据导致另一张表级联删除的情况。这可以通过查询共享字段(如外键)来处理。
# cURL请求:通过共享字段删除多个文档
curl -H "X-Typesense-API-KEY: ${Typesense_API_KEY}" -X DELETE \
"http://localhost:8108/collections/products/documents?filter_by=<shared_field>:=<value>"
实时同步删除操作可以通过触发器来实现管理。
# 使用触发器实现单行删除
-- 创建从Typesense删除记录的函数
CREATE OR REPLACE FUNCTION delete_products()
RETURNS TRIGGER
LANGUAGE plpgSQL
AS $$
BEGIN
SELECT net.http_delete(
url := format('<TYPESENSE URL>/collections/products/documents/%s', OLD.id),
headers := '{"X-Typesense-API-KEY": "<Typesense_API_KEY>"}'
)
RETURN OLD;
END $$;
-- 创建触发器,当从products表删除记录时调用该函数
CREATE TRIGGER delete_products_trigger
AFTER DELETE ON public.products
FOR EACH ROW
EXECUTE FUNCTION delete_products();
需要注意的是,如果同步过程因任何原因失败,将没有剩余数据可供后续尝试参考。为了解决这个问题,您可以实现行的软删除,或者创建一个临时表来存储已删除的查询值,直到确认同步成功完成。后一种方案将在下文针对"按共享字段删除"的部分详细说明。
# 存储待同步确认的删除值表
CREATE TABLE deleted_query_values (
filter_by_field TEXT NOT NULL,
shared_value TEXT NOT NULL,
request_id BIGINT,
created_at TIMESTAMPTZ NULL DEFAULT now()
)
可以修改触发器函数来保留被删除的值。
-- 创建从Typesense删除记录的函数
CREATE OR REPLACE FUNCTION delete_products_trigger()
RETURNS TRIGGER
LANGUAGE plpgSQL
AS $$
DECLARE
func_request_id BIGINT;
BEGIN
SELECT net.http_delete(
-- 添加TYPESENSE URL
url := '<TYPESENSE URL>/collections/products/documents?filter_by=<FILTER_BY_FIELD>:=%s' || OLD.<SHARED_VALUE>::TEXT,
-- 添加API密钥
headers := '{"X-Typesense-API-KEY": "<API KEY>"}'
) INTO func_request_id;
-- 填充表数据
INSERT INTO deleted_query_values (filter_by_field, shared_value, request_id)
VALUES ('<FILTER_BY_FIELD>', OLD.<SHARED_VALUE>::TEXT, func_request_id);
RETURN OLD;
END $$;
可以通过定时任务定期检查 deleted_query_values 和 net._http_response 表,重试失败的删除操作。
CREATE OR REPLACE FUNCTION retry_bulk_deletes()
RETURNS VOID
LANGUAGE plpgSQL
AS $$
DECLARE
value TEXT;
field TEXT;
func_request_id BIGINT;
BEGIN
-- 从deleted_query_values表中清除已同步的行
DELETE FROM deleted_query_values
USING net._http_response
WHERE
net._http_response.status_code = 200
AND
net._http_response.id = deleted_query_values.request_id
-- 获取最早失败的删除查询
SELECT
shared_value,
filter_by_field
INTO value, field
FROM deleted_query_values
INNER JOIN net._http_response ON net._http_response.id = deleted_query_values.request_id
WHERE net._http_response.status_code NOT BETWEEN 200 AND 202
ORDER BY created_at
LIMIT 1;
-- 重新尝试删除同步
SELECT net.http_delete(
-- 添加TYPESENSE URL
url := FORMAT('<TYPESENSE URL>/collections/products/documents?filter_by=%s:=%s', field, value),
-- 添加API密钥
headers := '{"X-Typesense-API-KEY": "<API KEY>"}'
) INTO func_request_id;
-- 更新deleted_query_values表中的新值
UPDATE deleted_query_values
SET
request_id = func_request_id,
created_at = NOW()
WHERE query_param = id;
END $$;
# 总结
在本教程中,我们探讨了将 Supabase 与 Typesense 数据同步的不同方法,确保搜索引擎能及时反映数据库的最新变更。我们涵盖了通过定时轮询和实时策略来同步插入、更新和删除操作,使用了触发器、函数和定时任务等技术。
通过实施这些技术,您可以为应用程序构建一个健壮、高效且响应迅速的搜索系统,为用户提供流畅而精准的搜索体验。
如果您对本指南有任何改进建议,请点击下方的"编辑页面"链接向我们提交拉取请求。