# 将 Supabase 与 Typesense 同步

Supabase (opens new window) 是一个基于 PostgreSQL 构建的开源开发平台,为应用用户提供安全的直接数据库访问,并包含认证管理、TypeScript 边缘函数、日志记录和存储等一系列功能,使其成为开发者中的热门选择。然而,其基于 PostgreSQL 模糊搜索的搜索功能相比 Typesense 等专业搜索引擎稍显不足。

本指南将引导您完成将用户数据与 Typesense 搜索实例同步的过程,以增强搜索能力并为用户提供更强大的搜索体验。

# 第一步:配置 Supabase

# 创建产品表

Supabase 提供基于浏览器的 SQL 和 GUI 编辑器来执行查询。如果您更喜欢使用 PGAdmin 等数据库管理器,请参考 Supabase 教程 (opens new window) 设置外部连接。以下示例使用一个假设的 products 表,您可以使用您偏好的界面创建该表。

# 产品表

CREATE TABLE public.products (
	id UUID NOT NULL DEFAULT uuid_generate_v4 (),
	product_name TEXT NULL,
	updated_at TIMESTAMPTZ NULL DEFAULT now(),
	user_id UUID NULL, -- 引用 Supabase 管理的 auth.users 数据库中认证用户的 id
	CONSTRAINT products_pkey PRIMARY KEY (id),
	CONSTRAINT products_user_id_fkey FOREIGN KEY (user_id) REFERENCES auth.users (id) ON DELETE CASCADE
);

# 启用行级安全

行级安全(Row Level Security,RLS)可以限制用户对表中行记录的操作权限。本教程将在_同步删除_部分使用此功能来实现_软删除_。Supabase 主要使用两种应用用户角色:

  • anon:匿名用户
  • authenticated:已认证用户

通过 RLS,可以限制只有携带正确 user_id 和会话密钥的认证用户才能访问数据。

# 示例:创建 RLS 策略

CREATE POLICY "仅认证用户可以查看他们在产品表中的对应行记录"
ON public.products
FOR SELECT
TO authenticated
USING  (
	-- auth.uid 是 Supabase 提供的辅助函数
	auth.uid() = user_id
);

需要注意的是,一旦为表启用 RLS,默认情况下会阻止所有非管理员角色(所有应用用户)访问该表,除非编写明确授予权限的策略。

要为 products 表启用 RLS,你可以使用 Supabase 的_表编辑器_,或者执行以下查询:

# 启用 RLS

ALTER TABLE products ENABLE ROW LEVEL SECURITY;

本教程将使用以下策略:

# 产品表 RLS 策略

CREATE POLICY "仅认证用户可以查看他们在 PostgreSQL 中的产品"
ON public.products
FOR SELECT
TO authenticated
USING  (
	auth.uid() = user_id
);

CREATE POLICY "仅认证用户可以向产品表插入数据"
ON public.products
FOR INSERT
TO authenticated
WITH  CHECK  (true);

CREATE POLICY "仅认证用户可以更新他们提供的产品"
ON public.products
FOR UPDATE
TO authenticated
USING  (
	auth.uid()  = user_id
)  WITH  CHECK  (
	auth.uid()  = user_id
);

CREATE POLICY "仅认证用户可以从数据库中删除他们的产品"
ON public.products
FOR DELETE
TO authenticated
USING  (
	auth.uid()  = user_id
);

# 启用相关 PostgreSQL 扩展

本教程将使用以下三个扩展:

  1. PG_NET (opens new window):允许数据库使用 JSON 进行异步 http/https 请求
  2. HTTP (opens new window):允许数据库使用所有数据格式进行同步 http/https 请求
  3. PG_CRON (opens new window):使数据库能够兼任 CRON 服务器

在 Supabase 中,可以通过点击导航菜单中的 Database 图标,然后点击 Extensions 来找到并启用这些扩展。更多指导可参考 Supabase 的文档 (opens new window)

PG_NET 扩展将用于实现 PostgreSQL 与 Typesense 的实时同步。HTTP 和 PG_CRON 扩展将一起用于调度和执行批量同步。

注意:虽然本教程大部分内容使用 PG/plSQL 完成,但 Supabase 也支持 PLV8 (opens new window)PLJAVA (opens new window) 扩展。它们分别允许用户使用 JavaScript 和 Java 编写例程。

# 第二步:配置 Typesense

如果您的 Typesense 实例已在运行并连接到互联网,请跳至 设置 API 密钥 部分。本指南参考了 Typesense 安装文档 (opens new window) 的 Docker 部分,并做了一些修改。

# 使用 Docker 安装

Docker 在 Windows、Linux 和 MacOS 上提供一致的配置流程。首先,如果尚未安装,请安装 Docker Desktop (opens new window)。在 Docker 运行后,执行 Typesense 的 Docker 页面 (opens new window) 上的命令:

# 部署 Docker 镜像与容器

mkdir /tmp/Typesense-data
docker run -p 8108:8108 -v/tmp/data:/data typesense/typesense:0.24.1 --data-dir /data --api-key=Hu52dwsas2AdxdE

可以通过以下 URL 来测试您的实例是否正常运行。如果服务已启动,您将收到返回结果 {"ok":true}

# 在浏览器中测试连接

http://localhost:8108/health

# 设置 API 密钥

Typesense 实例部署时带有一个默认 API 密钥:Hu52dwsas2AdxdE。为了更好的安全性,我们将生成新的密钥。首先执行以下 Shell 命令来替换默认 API 密钥:

注意:建议使用 json_pp、jq 或其他 JSON 格式化工具美化 cURL 响应

# 创建新的管理员 API 密钥

curl 'http://localhost:8108/keys' \
    -X POST \
    -H "X-Typesense-API-KEY: Hu52dwsas2AdxdE" \
    -H 'Content-Type: application/json' \
    -d '{"description":"Admin key.","actions": ["*"], "collections": ["*"]}' 

保存返回的管理员密钥值。这应该是唯一活跃的密钥:

# 列出所有活跃 API 密钥

curl 'http://localhost:8108/keys' \
    -X GET \
    -H "X-Typesense-API-KEY: ${Typesense_API_KEY}" 

现在只剩下一个主密钥,您可以用它来创建产品集合。

# 创建产品集合

curl "http://localhost:8108/collections" \
     -X POST \
     -H "Content-Type: application/json" \
     -H "X-Typesense-API-KEY: ${Typesense_API_KEY}" \
     -d '{
       "name": "products",
       "fields": [
         {"name": "id", "type": "string" },
         {"name": "product_name", "type": "string" },
         {"name": "user_id", "type": "string" },
         {"name": "updated_at", "type": "float" }
       ],
       "default_sorting_field": "updated_at"
     }' 

我们还需要创建两个密钥:一个用于产品集合的"仅搜索"密钥和一个"主"密钥。

# 创建产品搜索专用 API 密钥

curl 'http://localhost:8108/keys' \
    -X POST \
    -H "X-Typesense-API-KEY: ${Typesense_API_KEY}" \
    -H 'Content-Type: application/json' \
    -d '{"description":"Search-only products key.","actions": ["documents:search"], "collections": ["products"]}' 

# 创建产品管理 API 密钥

curl 'http://localhost:8108/keys' \
    -X POST \
    -H "X-Typesense-API-KEY: ${Typesense_API_KEY}" \
    -H 'Content-Type: application/json' \
    -d '{"description":"Admin products key","actions": ["*"], "collections": ["products"]}' 

# 通过隧道连接至互联网

本 Supabase 教程不涉及在专用服务器上部署自托管实例。您可以使用 Ngrok (opens new window)Cloudflare Tunnel (opens new window) 等服务,安全地将本地机器与互联网之间的流量进行转发。Ngrok 使用稍简便且提供慷慨的免费套餐。注册账户后,请按照 Ngrok 安装指南 (opens new window) 操作。完成后,使用终端命令将您的 Typesense 实例转发至互联网。

# 激活 Ngrok 连接

ngrok http 8108

要测试隧道连接,您可以在浏览器中使用 Ngrok 提供的 URL

# 测试连接

<NGROK URL>/health

若收到响应 {"ok":true},则表示连接成功。

# 步骤 3: PostgreSQL 连接性介绍

PG_NET 扩展允许 Supabase 通过 HTTP/HTTPS 请求进行异步通信。

以下示例从 postman-echo API (opens new window) 请求数据。

# 从端点请求数据

SELECT net.http_get('https://postman-echo.com/get?foo1=bar1&foo2=bar2') AS request_id;

当查询完成后,您可以使用返回的 request_id 在 Supabase 的 net._http_response 表中查找响应。也可以通过以下查询查看响应:

# 查看 HTTP/HTTPS 响应消息

SELECT
  *
FROM net._http_response
WHERE id = <request_id>

注意:在步骤 5.2 结束时,我们将讨论如何使用此表重试失败的同步。

net.http_post 函数只是直接向 Typesense 发送数据的一种方式。但它有一个重要限制:仅支持 JSON 作为 Content-Type,而 Typesense 需要 NDJSON 兼容性。幸运的是,在处理单行数据时,JSON 和 NDJSON 在功能上是等效的。因此,当只需要从 PostgreSQL 向 Typesense 发送一行数据时,以下代码可以正常工作。

如果您转到 Supabase 侧边栏的 Authentication 选项卡,可以创建新用户。使用新用户配置文件,前往 Supabase 的 Table Editor 并手动向 products 表添加新行。

# 连接 Typesense 的初步尝试

SELECT net.http_post(
    -- 添加 TYPESENSE URL
    url := '<TYPESENSE URL>/collections/products/documents/import?action=upsert'::TEXT,
    -- 将 products 表的行格式化为 JSONB 并转换 updated_at 从 TIMESTAMPTZ 类型到 FLOAT 类型
    -- 请求体必须格式化为 JSONB 数据
    body := (
        SELECT
            to_jsonb(rows)
        FROM (
            SELECT
                -- 将 TIMESTAMPTZ 类型转换为 float 类型
                EXTRACT(EPOCH FROM updated_at)::float AS updated_at,
                id,
                product_name,
                user_id
            FROM products
            -- 取消下面行的注释以使查询正常工作
            -- LIMIT 1
        ) rows
    )::JSONB,
    headers := json_build_object(
            'Content-Type', 'application/json',
            'X-Typesense-API-KEY', '<API KEY>' -- 添加 API 密钥
    )::JSONB,
    timeout_milliseconds := 4000
) AS request_id;

如果您尝试批量更新多行数据,将会收到以下错误信息:

执行 SQL 查询失败:用作表达式的子查询返回了多行数据

如前所述,PG_NET 函数不适用于多行更新,但它们在实时同步方面非常强大。PostgreSQL 到 Typesense 的直接实时同步通常使用触发器实现,但这可能会阻塞用户事务。而 PG_NET 函数是异步的,确保不会延迟事务。这一特性也使其成为部署边缘函数的首选方案。该扩展非常健壮,实际上 Supabase 的 webhook 功能底层就是基于此扩展实现的。

另一方面,HTTP 扩展支持 NDJSON 格式,适合直接进行批量更新。它是同步的,因此会等待成功失败响应,这使得错误处理更加简单。它与 PG_CRON 定时任务兼容,这些任务运行在单独的线程上,不会干扰主数据库操作,从而将对用户体验的影响降到最低。

要进行批量更新,必须将行数据转换为 NDJSON 格式。这可以通过在 PostgreSQL 中编写 PL/pgSQL 函数来实现。

# 将表格行格式化为 NDJSON

CREATE OR REPLACE FUNCTION get_products_ndjson()
RETURNS TEXT
AS $$
DECLARE
    ndjson TEXT := '';
BEGIN
    SELECT 
        string_agg(row_to_json(row_data)::text, E'\n')
        INTO ndjson
    FROM (
        SELECT
            p.product_name,
            p.id,
            CAST(EXTRACT(epoch FROM p.updated_at) AS FLOAT) AS updated_at,
            p.user_id
        FROM products p
    ) AS row_data;
    RETURN ndjson;
END;
$$ LANGUAGE plpgSQL;

使用上述函数,您可以通过 HTTP 扩展实现批量更新插入操作:

# 与 Typesense 同步的批量方法

SELECT
    status AS response_status,
    content AS response_body,
    (unnest(headers)).*
FROM http((
    -- HTTP 方法
    'POST'::http_method,
    -- URL (替换为您的 TYPESENSE URL)
    '<TYPESENSE URL>/collections/products/documents/import?action=upsert',
    -- 请求头
    ARRAY[
        -- 添加 API 密钥
        http_header('X-Typesense-API-KEY', '<API KEY>')
    ]::http_header[],
    -- 内容类型
    'application/x-ndjson',
    -- 请求体
    (SELECT get_products_ndjson())
)::http_request);

您可以通过以下 cURL 命令检查 Typesense 是否已更新:

# 搜索 Typesense

curl -H "X-TYPESENSE-API-KEY: <API KEY>" \
    "<TYPESENSE URL>/collections/products/documents/search?q=*"

# 步骤 4:理解 Supabase 中的调度机制

通过 PG_CRON 扩展,您可以设置定时任务来协调与 Typesense 的批量同步、定期重试失败的请求,以及清理日志表中的陈旧数据。

# 定时任务调度:每分钟调用边缘函数

SELECT
  cron.schedule(
    'cron-job-name',
    '* * * * *', -- 每分钟执行一次(cron语法)
	$$
    -- SQL查询
    SELECT net.http_get(
        -- Supabase边缘函数的URL
        url:='https://<reference id>.functions.Supabase.co/Typesense-example',
        headers:='{
            "Content-Type": "application/json", 
            "Authorization": "Bearer <TOKEN>"
        }'::JSONB
    ) as request_id;
	$$
  );

# 取消已激活的定时任务

SELECT cron.unschedule('cron-job-name')

PostgreSQL中的定时任务使用cron语法 (opens new window)进行时间设定。每个任务最多每分钟执行一次。遗憾的是,Supabase不原生支持秒级间隔的调度。如果这对您的使用场景造成困扰,建议使用更精确的外部定时任务或消息服务器以获得更好的控制。

在Supabase中,定时任务的详细信息记录在cron模式下的两张表中:

  • jobs
  • job_run_details

这些表对监控和调试非常有帮助。执行日志可以通过Supabase表编辑器查看,也可以直接查询这些表。

# 查询最近10次定时任务执行失败的记录

SELECT
    *
FROM cron.job_run_details
INNER JOIN cron.job ON cron.job.jobid = cron.job_run_details.jobid
WHERE 
    cron.job.jobname = 'cron-job-name' 
        AND 
    cron.job_run_details.status = 'failed'
ORDER BY start_time DESC
LIMIT 10;

定时任务是管理Supabase的重要组成部分,在本指南后续内容中将会频繁涉及。

# 步骤 5.1:在 PostgreSQL 中原生批量同步插入/更新操作

PostgreSQL 中存在多种跟踪未同步行的方法,每种方法都有其优缺点。本指南将探讨实时同步和批量同步的各种策略。最终,您需要确定哪些方法最适合您的数据库设计和使用场景。

创建日志表来跟踪未同步行是我们要演示的第一个策略。这种设置相对简单,并且能最大程度地控制每次同步的行数。

# 创建日志表跟踪未同步行

CREATE TABLE public.products_sync_tracker (
    product_id UUID NOT NULL PRIMARY KEY,
    is_synced BOOLEAN DEFAULT FALSE,
    CONSTRAINT product_id_fkey FOREIGN KEY (product_id) REFERENCES public.products (id) ON DELETE CASCADE
);

通过触发器可以将未同步数据填充到上述表中。

# 创建触发器监控产品插入

CREATE OR REPLACE FUNCTION insert_products_trigger_func()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO products_sync_tracker (product_id)
    VALUES (NEW.id);

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;


CREATE TRIGGER insert_products_trigger
    AFTER INSERT ON public.products
    FOR EACH ROW
    EXECUTE FUNCTION insert_products_trigger_func();

# 创建触发器监控产品更新

CREATE OR REPLACE FUNCTION update_products_trigger_func()
RETURNS TRIGGER AS $$
BEGIN
    -- 更新 products_sync_tracker 表
    UPDATE products_sync_tracker
    SET is_synced = FALSE
    WHERE product_id = NEW.id;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;


CREATE TRIGGER update_products_trigger
    BEFORE UPDATE ON public.products
    FOR EACH ROW
    EXECUTE FUNCTION update_products_trigger_func();

# 使用 Cron Jobs 调度批量更新/插入

以下 PL/pgSQL 函数将未同步的行转换为 NDJSON 格式,然后执行 upsert 操作到 Typesense。如果 upsert 失败,跟踪表 products_sync_tracker 将被回滚以反映此失败。通过将该函数集成到 cron job 中,可以直接在 PostgreSQL 中实现定时同步,无需使用外部服务器。虽然这很强大,但会占用数据库资源。这些函数应设置为快速完成,因此保持较小的负载大小很重要。默认情况下,HTTP 扩展发出的请求会在 5 秒后超时,但可以通过修改全局变量 http.timeout_msec 来调整。

该函数可分为 6 个步骤:

  1. 确保没有其他同步进程正在运行。如果存在则退出。
  2. 通过查询 products 表获取未同步的行。
  3. 将检索到的行转换为 NDJSON(Newline Delimited JSON)格式。
  4. 将格式化后的行与 Typesense 同步。
  5. 检查 Typesense 的响应以确定是否有行同步失败。
  6. 更新 products 表以标记每行的同步状态,记录失败的尝试。

# 批量同步行的函数

CREATE OR REPLACE FUNCTION sync_products_updates()
RETURNS VOID
LANGUAGE plpgsql
AS $$
    DECLARE
        -- 锁键:用作函数'锁'的任意数字
        -- 同一时间只能有一个函数实例持有该键并运行
        -- 当行正在被更新时,如果同时发送多个请求,
        -- 无法确定哪个会先被处理。
        -- 这可能导致 Typesense 接收到过时数据。
        -- 锁定函数可以避免这种负面情况。
        lock_key INTEGER := 123456;
        is_locked BOOLEAN := FALSE;

        -- 用于告知 Typesense 需要同步的行数
        total_rows INTEGER;

        -- 用于将行转换为 NDJSON 的变量
        ndjson TEXT := '';

        -- 用于引用 HTTP 响应值的变量
        request_status INTEGER;
        response_message TEXT;
    BEGIN
        -- 创建锁,确保同一时间只有一个函数实例运行
        SELECT pg_try_advisory_xact_lock(lock_key) INTO is_locked;
        IF NOT is_locked THEN
            RAISE EXCEPTION '无法锁定。其他任务正在运行';
        END IF;

        -- 预先将40个未同步产品标记为已同步
        -- 创建临时表存储待更新的行
        -- 这些行将与 Typesense 同步
        CREATE TEMPORARY TABLE updated_rows (
            product_id UUID
        ) ON COMMIT DROP; 

        WITH soon_to_be_synced_rows AS (
            UPDATE products_sync_tracker
            SET is_synced = TRUE
            WHERE product_id IN (
                SELECT 
                    product_id
                FROM products_sync_tracker
                WHERE is_synced = FALSE
                LIMIT 40
            )
            RETURNING product_id
        )
        INSERT INTO updated_rows
        SELECT * FROM soon_to_be_synced_rows;


        SELECT 
            COUNT(product_id) 
            INTO total_rows
        FROM updated_rows;

        IF total_rows < 1 THEN 
            RAISE EXCEPTION '没有需要同步的行';
        END IF;

        -- 将待同步行转换为 Typesense 可解析的格式
        WITH row_data AS (
            SELECT
                p.product_name,
                p.id,
                CAST(EXTRACT(epoch FROM p.updated_at) AS FLOAT) AS updated_at,
                p.user_id
            FROM products p
            JOIN updated_rows u ON p.id = u.product_id
        )
        SELECT 
            string_agg(row_to_json(row_data)::text, E'\n')
        INTO ndjson
        FROM row_data;

        SELECT
            status,
            content
            INTO request_status, response_message
        FROM http((
            'POST'::http_method,
            -- 添加 TYPESENSE URL
            '<TYPESENSE URL>/collections/products/documents/import?action=upsert',
            ARRAY[
                -- 添加 API 密钥
                http_header('X-Typesense-API-KEY', '<API KEY>')
            ]::http_header[],
            'application/x-ndjson',
            ndjson -- 负载数据
        )::http_request);

        -- 检查请求是否失败
        IF request_status <> 200 THEN
            -- 将错误信息存入 Supabase Postgres 日志
            RAISE LOG 'Typesense 同步请求失败。请求状态: %。消息: %', request_status, response_message;
            -- 抛出异常,回滚事务
            RAISE EXCEPTION '更新插入失败';
        ELSE
            -- 成功响应会包含每行结果的 NDJSON
            /* 可能的结果:
                {"success": true}
                {"success": false, "error": "Bad JSON.", "document": "[bad doc]"}
            */ 
            -- 需要处理这些结果以确定哪些行同步成功,哪些失败
            WITH ndjson_from_response AS (
                SELECT unnest(string_to_array(response_message, E'\n')) AS ndjson_line
            ),
            ndjson_to_json_data AS (
                SELECT 
                    ndjson_line::JSON AS json_line
                FROM ndjson_from_response
            ),
            failed_syncs AS (
                SELECT 
                    json_line
                FROM ndjson_to_json_data
                WHERE (json_line->>'success')::BOOLEAN = FALSE
            ),
            unsynced_ids AS (
                SELECT 
                    ((json_line->>'document')::JSON->>'id')::UUID AS ids
                FROM failed_syncs
            )
            UPDATE public.products_sync_tracker
            SET is_synced = FALSE
            WHERE product_id IN (SELECT ids FROM unsynced_ids);
        END IF;
    END;
$$;

# 定时任务批量更新/插入 Typesense 数据

SELECT cron.schedule(
    'update-insert-Typesense-job',
    '* * * * *',
    $$
    -- SQL 查询语句
        SELECT sync_products_updates();
    $$
);

要测试 Typesense 是否同步成功,可以通过 Supabase 的表格编辑器手动向 products 表添加一行数据。然后在同一表格编辑器中查看 cron 模式下的执行记录,观察定时任务的执行时间。

# 搜索 Typesense 中的同步数据

curl -X GET "<TYPESENSE URL>/collections/products/documents/search?q=*" \
    -H "X-TYPESENSE-API-KEY: <API KEY>"

# 步骤 5.2:使用边缘函数批量同步插入/更新

部分用户可能更倾向于使用服务器作为中间层与 Typesense 通信。这种方式能减轻数据库压力,并能处理相对高容量的同步操作。当需要对数据进行清洗或重新格式化时,这种方法尤为有用。幸运的是,Supabase 提供了基于 Deno (TypeScript) 的无服务器边缘函数。

如果边缘函数执行中途失败,必须有机制能够捕获并解决故障。在前面的示例中,我们通过基于行更新的 products_sync_tracker 表来跟踪行同步状态。但对于边缘函数,我们可以引入另一种更具优势的结构。利用 products 表的 updated_at 列,可以通过时间戳来跟踪未同步的行,并在边缘函数失败时重新发送。为实现这一点,需要移除之前 products 表上的触发器,替换为监控 updated_at 列的新触发器。

# 移除原有触发器

DROP TRIGGER IF EXISTS update_products_trigger 
ON products;

DROP FUNCTION IF EXISTS update_products_trigger_func;

DROP TRIGGER IF EXISTS insert_products_trigger
ON products;

DROP FUNCTION IF EXISTS insert_products_trigger_func;

# 为 updated_at 列添加触发器

CREATE OR REPLACE FUNCTION update_products_time_func()
RETURNS TRIGGER AS $$
BEGIN
    -- 更新 products.updated_at 列
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;


CREATE TRIGGER update_at_products_trigger
    BEFORE UPDATE ON public.products
    FOR EACH ROW
    EXECUTE FUNCTION update_products_time_func();

同时需要追踪边缘函数调用,以便监控失败情况。创建一个 edge_function_tracker 表。

# 创建边缘部署监控表

CREATE TABLE edge_function_tracker(
    id UUID NOT NULL DEFAULT uuid_generate_v4 (),
    start_time TIMESTAMPTZ DEFAULT NOW (),
    start_time_of_prev_func TIMESTAMPTZ,
    attempts INTEGER DEFAULT 1,
    func_status TEXT NOT NULL DEFAULT 'PENDING' CHECK (func_status IN ('SUCCEEDED', 'FAILED', 'PENDING')),
    request_id BIGINT NULL,
    CONSTRAINT edge_function_tracker_pkey PRIMARY KEY (id)
);

部署边缘函数的 PG_NET 函数需要封装在 PG/plSQL 函数中,以便在部署前记录函数信息。该过程可分为四个步骤:

  1. edge_function_tracker 表中创建新条目
  2. 收集识别未同步行所需的必要信息
  3. 发送包含必要详情的边缘函数请求
  4. 记录与边缘部署相关的元数据

# 部署/追踪边缘部署的包装函数

CREATE OR REPLACE FUNCTION edge_deployment_wrapper()
RETURNS VOID
AS $$
DECLARE
    func_request_id BIGINT;
    payload JSONB;
    prev_start_time TIMESTAMPTZ;
BEGIN
    -- 在 edge_function_tracker 表中为新部署的边缘函数创建新条目
    WITH func_default_vals AS (
        INSERT INTO edge_function_tracker 
        DEFAULT VALUES
        RETURNING id, start_time
    )
    SELECT 
        row_to_json(func_default_vals.*)::JSONB
        INTO payload
    FROM func_default_vals;

    -- 上次部署后更新的产品处于未同步状态
    -- 获取前一个函数时间戳以帮助查询未同步行进行后续处理
    SELECT 
        start_time
        INTO prev_start_time
    FROM edge_function_tracker
    WHERE start_time < (payload->>'start_time')::TIMESTAMPTZ
    ORDER BY id DESC
    LIMIT 1;

    -- 如果没有先前的开始时间,将 prev_start_time 设置为 0
    IF NOT FOUND THEN
        prev_start_time := '1970-01-01 00:00:00+00'::TIMESTAMPTZ;
    END IF;

    -- 重新格式化 payload 以包含 prev_start_time
    payload := payload || jsonb_build_object('start_time_of_prev_func', prev_start_time);
    -- 发送请求到边缘函数
    SELECT net.http_post(
        url := '<FUNCTION URL>',
        body := payload,
        headers := '{
            "Content-Type": "application/json", 
            "Authorization": "Bearer <SUPABASE ANON KEY>"
        }'::JSONB,
        timeout_milliseconds := 4000
    ) INTO func_request_id;

    -- 记录 request_id 和 start_time_of_prev_func
    -- 这将用于后续重新部署失败的请求
    UPDATE edge_function_tracker
    SET 
        request_id = func_request_id,
        start_time_of_prev_func = (payload->>'start_time_of_prev_func')::TIMESTAMPTZ
    WHERE id = (payload->>'id')::UUID;
END;
$$ LANGUAGE plpgsql;

这个跟踪用的 PG/plSQL 函数可以通过 cron 作业调用来启动批量同步过程。

# 执行边缘函数的定时任务(Cron Job)

SELECT
  cron.schedule(
    'update-insert-Typesense-job',
    '* * * * *', -- 每分钟执行一次(cron语法)
	$$
    -- SQL查询
        SELECT edge_deployment_wrapper();
	$$
);

通常,创建能够在单个请求中执行多次交互的PG/plSQL函数比让边缘函数多次查询数据库获取少量数据更为高效。以下函数将被边缘函数调用,以简化和加速成功请求的同步拒绝处理。

CREATE OR REPLACE FUNCTION report_failed_syncs (func_id UUID, failed_rows_id UUID[])
RETURNS VOID
AS $$
BEGIN
    -- 更新"edge_function_tracker"表以反映
    -- 函数调用的状态

    UPDATE edge_function_tracker
    SET func_status = 'SUCCEEDED'
    WHERE id = func_id;

    -- 这行代码禁用事务的updated_at触发器

    SET LOCAL session_replication_role = 'replica';

    -- 将失败的product同步更新为NOW(),这样它们会被包含在
    -- 下一次同步函数中

    UPDATE products
    SET updated_at = NOW() 
    WHERE id = ANY(failed_rows_id);
END;
$$ LANGUAGE plpgsql;

要部署边缘函数,您需要安装Node包管理器,如NPM、Yarn或PNPM。可以通过官方下载页面 (opens new window)下载Node.JS来安装NPM。

创建第一个函数时,请新建一个以函数名命名的文件夹,并在其中添加index.ts文件。以下代码是Supabase PostgreSQL演示 (opens new window)的修改版本,使用了前面提到的_get_products_updates_from_edge_ PL/pgSQL函数。

注意:所有函数及其日志都可以在Supabase仪表板的Edge Functions部分找到。

使用的边缘函数可以分解为7个步骤:

  1. 解析请求体以识别需要同步的特定行
  2. 通过查询products表检索未同步的行
  3. 将检索到的行转换为NDJSON(Newline Delimited JSON)格式
  4. 将格式化后的行与Typesense同步
  5. 检查Typesense的响应以确定是否有行同步失败
  6. 更新products表以指示每行的同步状态,标记任何不成功的尝试
  7. 在edge_function_tracker表中记录边缘函数的整体成功或失败情况,用于监控和分析

# 更新 Typesense 的边缘函数

import * as postgres from 'https://deno.land/x/postgres@v0.14.2/mod.ts';
import { serve } from 'https://deno.land/std@0.177.0/http/server.ts';

// 在顶部定义你的类型
type TProductRows = {
	id: string;
	product_name: string;
	updated_at: string;
	user_id: string;
}[];

type TRequestJSON = {
	id: string;
	start_time: string;
	start_time_of_prev_func: string;
};

// 数据库连接URL默认在所有边缘函数中可用
// 如果遇到问题,可以通过仪表板中的项目设置 > 数据库选项卡获取

const databaseUrl = Deno.env.get('SUPABASE_DB_URL')!;

// 创建一个包含三个延迟建立的连接池
const pool = new postgres.Pool(databaseUrl, 3, true);

serve(async (req) => {
	try {
		// 从连接池获取连接
		const connection = await pool.connect();
		// 1. 解析请求体:
		const reqJSON = (await req.json()) as TRequestJSON;
		try {
			// 2. 从数据库检索未同步的产品
			const unsyncedRows = (
				await connection.queryObject({
					args: [reqJSON.start_time_of_prev_func, reqJSON.start_time],
					text: `SELECT 
							products.id,
							products.product_name,
							CAST(EXTRACT(epoch FROM products.updated_at) AS FLOAT) AS updated_at,
							products.user_id
						FROM products
						-- 只同步自上次同步后更新的产品
						WHERE updated_at BETWEEN $1::TIMESTAMPTZ AND $2::TIMESTAMPTZ;
						-- 如果使用软删除,需要过滤掉已软删除的行
					`,
				})
			).rows as TProductRows;
			// 如果没有未同步的行,函数可以提前返回
			if (!unsyncedRows.length) {
				const res = await connection.queryObject({
					args: [reqJSON.id],
					text: `
						UPDATE edge_function_tracker
						SET func_status = 'SUCCEEDED'
						WHERE id = $1::UUID;
					`,
				});
				return new Response('没有需要同步的行', {
					status: 200,
					headers: {
						'Content-Type': 'application/json; charset=utf-8',
					},
				});
			}
			// 3. 将行转换为NDJSON格式
			const unsyncedProductsNDJSON: string = unsyncedRows
				.map((product) =>
					JSON.stringify({
						...product,
						updated_at: parseFloat(product.updated_at),
					})
				)
				.join('\n');
			// 4. 将新产品与Typesense同步
			const response = await fetch(
				// 添加你的TYPESENSE URL
				'<TYPESENSE URL>/collections/products/documents/import?action=upsert',
				{
					method: 'POST',
					headers: {
						'Content-Type': 'application/x-ndjson',
						//添加你的TYPESENSE API密钥
						'X-TYPESENSE-API-KEY': '<API KEY>',
					},
					body: unsyncedProductsNDJSON,
				}
			);

			// 响应将包含结果的NDJSON。如果某些更新失败,
			// 我们需要在products_sync_tracker表中反映这个失败
			/* 可能的响应结果
				{"success": true}
				{"success": false, "error": "Bad JSON.", "document": "[bad doc]"}
			*/
			const ndJsonResponse = await response.text();

			// 将响应转换为对象数组
			let JSONStringArr = ndJsonResponse.split('\n');
			const parsedJSON = JSONStringArr.map((res) => JSON.parse(res));

			// 5. 过滤出同步失败的ID
			const failedSyncIds = parsedJSON
				.filter((doc) => !doc.success)
				.map((doc) => JSON.parse(doc.document).id) as string[];

			// 如果没有失败的同步,函数可以提前返回
			if (!failedSyncIds.length) {
				const res = await connection.queryObject({
					args: [reqJSON.id],
					text: `
						UPDATE edge_function_tracker
						SET func_status = 'SUCCEEDED'
						WHERE id = $1::UUID;
					`,
				});
				return new Response('没有需要同步的行', {
					status: 200,
					headers: {
						'Content-Type': 'application/json; charset=utf-8',
					},
				});
			}

			// 6/7. 更新products表中的"updated_at"列为当前时间
			// 这样失败的行将在下次边缘函数运行时重新同步
			// 声明函数调用成功
			const result = await connection.queryObject({
				args: [reqJSON.id, failedSyncIds],
				text: `
					SELECT report_failed_syncs ($1::UUID,  $2::UUID[])
				`,
			});

			// 返回带有正确内容类型头的响应
			return new Response(
				`已同步${reqJSON.start_time_of_prev_func}${reqJSON.start_time}之间的行`,
				{
					status: 200,
					headers: {
						'Content-Type': 'application/x-ndjson; charset=utf-8',
					},
				}
			);
		} finally {
			// 将连接释放回连接池
			connection.release();
		}
	} catch (err) {
		console.error(err);
		return new Response(String(err?.message ?? err), {
			status: 500,
		});
	}
});

设置好函数后,在终端中导航到函数的父目录并执行以下命令

# 登录 Supabase 命令行工具

npx supabase login

系统会提示您输入密码,并引导您访问生成访问令牌的链接。登录成功后,您就可以部署函数了。

# 将边缘函数部署到 Supabase

npx supabase functions deploy <您的函数目录名称>

您将收到一个链接,可以查看新函数的相关信息。调用该函数时,还需要使用项目的 ANON KEY(匿名密钥),该密钥可在项目设置的 API 选项卡 中找到。您可以通过 cron 任务来调度函数以同步 Typesense。

需要注意的是,并非所有同步都能成功。建立重试机制非常重要。此外,edge_function_tracker 表可能会变得臃肿并需要清理。下面的 PG/plSQL 函数通过以下步骤解决了这些问题:

  1. edge_function_tracker 表中删除成功的同步记录
  2. 在满足特定条件时,将 'PENDING'(待处理)状态的函数更新为 'FAILED'(失败)状态
  3. 重新尝试失败的同步请求

# 用于管理边缘重新部署和维护的 PL/pgSQL 函数

CREATE OR REPLACE FUNCTION edge_function_maintainer()
RETURNS VOID
AS $$
DECLARE
    func_request_id BIGINT;
    payload JSONB;
BEGIN
    -- 任何位于两个'SUCCEEDED'状态行之间的成功同步记录不再需要维护
    -- 为防止表无限增长,将删除这些行
    WITH organized_edge_function_tracker AS (
        SELECT id,
            func_status,
            LAG(func_status) OVER (ORDER BY start_time) AS prev_func_status,
            LEAD(func_status) OVER (ORDER BY start_time) AS next_func_status
        FROM edge_function_tracker
    ),
    success_conditions AS (
        SELECT
            id
        FROM organized_edge_function_tracker
        WHERE 
            func_status = 'SUCCEEDED' 
                AND 
            prev_func_status = 'SUCCEEDED' 
                AND 
            next_func_status = 'SUCCEEDED'
    )
    DELETE FROM edge_function_tracker
    WHERE id IN (SELECT id FROM success_conditions);

    -- 将edge_function_tracker表中所有非200的请求更新为'FAILED'状态
    -- 注意:存储所有PG_NET请求状态的_http_response表是"unlogged"的
    -- 这意味着在崩溃情况下会丢失所有数据
    -- 作为安全措施,所有超过2分钟未响应的函数将被视为失败,即使找不到对应的请求记录
    WITH failed_rows AS (
        SELECT 
            edge_function_tracker.id
        FROM edge_function_tracker
        INNER JOIN net._http_response ON net._http_response.id = edge_function_tracker.request_id
        WHERE net._http_response.status_code NOT BETWEEN 200 AND 202
    )
    UPDATE edge_function_tracker
    SET func_status = 'FAILED'
    FROM failed_rows
    WHERE 
        failed_rows.id = edge_function_tracker.id
            OR
        (func_status = 'PENDING' AND (NOW() - start_time > '2 minutes'::interval))
    ;


    -- 获取失败请求的数据,以便放入payload中用于重试
    WITH selected_row AS (
        SELECT 
            id,
            start_time,
            start_time_of_prev_func
        FROM edge_function_tracker
        WHERE func_status = 'FAILED' AND attempts < 3 -- 如果尝试次数超过3次,则认为该部分数据无法同步并停止尝试
        ORDER BY start_time
        LIMIT 1
    )
    SELECT to_jsonb(selected_row.*) INTO payload
    FROM selected_row;


    -- 检查是否存在可重试的请求
    IF (payload->>'id')::UUID IS NOT NULL THEN
        -- 向边缘函数发送重试请求
        SELECT net.http_post(
            url := '<EDGE FUNCTION URL>',
            body := payload,
            headers := '{
                "Content-Type": "application/json", 
                "Authorization": "Bearer <SUPABASE ANON KEY>"
            }'::JSONB,
            timeout_milliseconds := 4000
        ) INTO func_request_id;

        -- 记录新的request_id
        UPDATE edge_function_tracker
        SET 
            request_id = func_request_id,
            attempts = attempts + 1
        WHERE id = (payload->>'id')::UUID;
    END IF;
END;
$$ LANGUAGE plpgSQL;

该维护函数可以通过cron job定时执行

# 每分钟重试失败函数

SELECT
  cron.schedule(
    'edge_function_maintainer',
    '* * * * *', -- 每分钟执行一次(cron语法)
	$$
        SELECT edge_function_maintainer();
	$$
  );

# 步骤6:使用触发器实现实时更新/插入

在某些情况下,实时同步可能很重要。这只能通过触发器来实现。下面的示例直接在Supabase和Typesense之间同步,但你也可以使用触发器调用执行相同操作的Edge函数。

# 使用触发器同步数据

-- 创建一个函数,用于products表的更新/插入触发器
CREATE OR REPLACE FUNCTION public.sync_products()
RETURNS TRIGGER AS $$
BEGIN
    -- 向Typesense服务器发起https请求
	PERFORM net.http_post(
        -- 添加TYPESENSE URL
        url := '<TYPESENSE URL>/collections/products/documents/import?action=upsert'::TEXT,
        -- NEW关键字代表新行数据
        body := (
            SELECT
                to_jsonb(row.*)
            FROM (
                SELECT
                    -- 将TIMESTAMPTZ类型转换为float类型
                    EXTRACT(EPOCH FROM NEW.updated_at)::float AS updated_at,
                    NEW.id,
                    NEW.product_name,
                    NEW.user_id
            ) AS row
        )::JSONB,
        headers := json_build_object(
            'Content-Type', 'application/json',
            -- 添加API密钥
            'X-Typesense-API-KEY', '<API KEY>'
        )::JSONB
    );
	RETURN NEW;
END;
$$ LANGUAGE plpgSQL;

-- 在products表发生任何插入或更新后运行的触发器
CREATE TRIGGER sync_updates_and_inserts_in_Typesense
AFTER INSERT OR UPDATE ON products
FOR EACH ROW
EXECUTE FUNCTION sync_products();

警告:必须再次强调这些请求是异步的,这可以避免阻塞用户事务。请求发送后,后台工作进程会监听响应并将其添加到net._http_response表中。可以通过cron作业或触发器监控net._http_response表中的更新/插入,如步骤5.2末尾所述,重试失败的同步。但需要注意的是,使用触发器立即重试可能很危险,特别是当数据与Typesense不兼容时。如果没有明确的退出条件,可能会进入无限循环。

# 步骤7:批量同步删除操作

# 使用 Cron Jobs 调度批量删除

在 Supabase 和 Typesense 之间同步批量删除操作可以通过以下两种方式实现:

  1. 将被删除行的 ID 临时保存在中间表中,直到可以从 Typesense 中移除
  2. 对表中的行进行软删除,直到它们从 Typesense 中移除

注意:由于删除操作不需要 NDJSON 格式,HTTPPG_NET 扩展都可以用于直接从 PostgreSQL 到 Typesense 的删除操作,或者用于触发边缘函数。

本节主要使用 HTTP 扩展。如果计划使用 PG_NET,则需要延迟实际删除行的操作,直到 net._http_response 表中该请求的状态为 200。步骤 5.2 提供了关于如何处理异步恢复的示例。

# 方法一:使用中间表

要实现第一种方法,需要创建一个表来存储被删除行的 ID,直到可以与 Typesense 同步。

# deleted_rows 表

CREATE TABLE deleted_rows(
    table_name TEXT, --假设表名和 Typesense 集合名相同
    deleted_row_id UUID,
    CONSTRAINT deleted_rows_pkey PRIMARY KEY (deleted_row_id)
);

每当主表中的行被删除时,触发器应记录其 ID 的副本。

# 保存 ID 列的触发器

-- 创建函数,在删除时将 id 复制到 deleted_rows 表
CREATE OR REPLACE FUNCTION copy_deleted_product_id()
    RETURNS TRIGGER
    LANGUAGE plpgSQL
AS $$
BEGIN
    INSERT INTO deleted_rows (table_name, deleted_row_id)
    VALUES ('products', OLD.id);
    RETURN OLD;
END $$;

-- 创建触发器,当从 products 表删除记录时调用该函数
CREATE TRIGGER copy_id_on_delete
    AFTER DELETE ON public.products
    FOR EACH ROW
    EXECUTE FUNCTION copy_deleted_product_id();

在 Typesense 中,可以通过包含删除条件的 DELETE 请求执行批量删除操作,这些条件作为查询参数包含在 URL 中。

# 示例:删除请求

curl -g -H "X-TYPESENSE-API-KEY: ${TYPESENSE_API_KEY}" -X DELETE \
"http://localhost:8108/collections/companies/documents?filter_by=id:[id1, id2, id3]"

注意:可能需要使用 URL 编码字符替代方括号 "[ ]",分别是 %5B 和 %5D

可以编写一个 PL/pgSQL 函数,在从 deleted_rows 表中移除副本之前,将这些删除操作与 Typesense 同步。

# 批量同步已删除行

-- 创建用于从Typesense删除记录的函数
CREATE OR REPLACE FUNCTION bulk_delete_products()
    RETURNS VOID
    LANGUAGE plpgSQL
AS $$
    DECLARE
        id_arr UUID[];
        query_params TEXT;
        request_status INTEGER;
        response_message TEXT;
    BEGIN
        -- 从deleted_rows表中选择ID
        SELECT
            -- 将值存入数组以便后续删除
            array_agg(deleted_row_id),
            -- 格式化行数据用于filter_by参数
            string_agg(deleted_row_id::text, ',')
            INTO id_arr, query_params
        FROM deleted_rows
        LIMIT 40;

        -- 参数必须用方括号"[ ]"格式化,方括号需要URL编码为%5B和%5D
        query_params:= '%5B' || query_params || '%5D';

        SELECT
            status,
            content
            INTO request_status, response_message
        FROM http((
            'DELETE'::http_method,
            -- 添加TYPESENSE URL
            '<TYPESENSE URL>/collections/products/documents?filter_by=id:' || query_params,
            ARRAY[
                -- 添加API密钥
                http_header('X-Typesense-API-KEY', '<API KEY>')
            ]::http_header[],
            'application/json',
            NULL
        )::http_request);

        -- 检查请求是否失败
        IF request_status <> 200 THEN
            -- 将错误信息存入Supabase Postgres日志
            RAISE LOG 'HTTP DELETE请求失败。消息: %', response_message;
            -- 抛出异常,回滚事务
            RAISE EXCEPTION '删除失败';
        ELSE
            DELETE FROM deleted_rows WHERE deleted_row_id = ANY(id_arr);
        END IF;
    END;
$$;

该函数可以通过Cron Job每分钟调用一次。

# 每分钟同步删除的行记录

SELECT
  cron.schedule(
    'bulk-delete-from-typesese',
    '* * * * *', -- 每分钟执行一次(cron语法)
	$$
    -- SQL查询语句
        SELECT bulk_delete_products();
	$$
  );

# 方法二:软删除方案

在该方案中,必须限制用户直接从 products 表中删除行记录。为确保这一点,关键是要修改行级安全策略(RLS),撤销用户的删除权限。若不进行此修改,Supabase 和 Typesense 之间必然会出现数据不一致的情况。

# 修改RLS策略以禁止用户删除行记录

ALTER POLICY "only an authenticated user is allowed to remove their products " 
ON public.products 
TO authenticated
USING(
  FALSE
);

用户不应直接删除行记录,而是需要通过修改行的方式来标记其为不可用状态。在本例中,将 user_id 列设为 NULL 将使该行对所有应用用户不可见。

可以使用 PG_NETHTTP 扩展创建一个 PG/plSQL 函数,用于将 user_id 列为空的记录同步到 Typesense 后再删除:

# 在删除前同步已置为 Null 的行

-- 创建从 Typesense 删除记录的函数
CREATE OR REPLACE FUNCTION bulk_delete_products()
RETURNS VOID
LANGUAGE plpgsql
AS $$
    DECLARE
        -- 存储并格式化已删除行的 ID
        deleted_id_arr UUID[];
        query_params TEXT;

        -- 监控 Typesense 的响应
        request_status INTEGER;
        response_message TEXT;
    BEGIN
        -- 从 deleted_rows 中选择并格式化 ID
        SELECT
            string_agg(id::text, ','),
            array_agg(id)
            INTO query_params, deleted_id_arr
        FROM products
        WHERE user_id IS NULL
        LIMIT 40;

        -- 将 ID 列表放入 URL 编码的方括号 [ ] 中,即 %5B 和 %5D
        query_params :=  '%5B' || query_params || '%5D';

        -- 向 Typesense 服务器发送删除请求
        SELECT
            status,
            content
            INTO request_status, response_message
        FROM http((
            'DELETE'::http_method,
            -- 添加 TYPESENSE URL
            '<TYPESENSE URL>/collections/products/documents?filter_by=id:' || query_params, 
            ARRAY[
                -- 添加 API 密钥
                http_header('X-Typesense-API-KEY', '<API KEY>')
            ]::http_header[],
            'application/json',
            NULL
        )::http_request);

        -- 检查请求是否失败
        IF request_status <> 200 THEN
            -- 将错误信息存储到 Supabase Postgres 日志中
            RAISE LOG 'HTTP DELETE 请求失败。消息: %', response_message;
            -- 抛出异常,回滚事务
            RAISE EXCEPTION '删除失败';
        ELSE
            DELETE FROM products WHERE id = ANY(deleted_id_arr);
        END IF;
    END;
$$;

可以通过 cron 任务每分钟调用该函数。

# 批量删除任务调度

SELECT
  cron.schedule(
    'delete-from-typesese',
    '* * * * *', -- 每分钟执行一次(cron语法)
	$$
    -- SQL查询语句
        SELECT bulk_delete_products();
	$$
  );

# 步骤8:实时同步删除操作

# 同步单个删除

在Typesense中,可以通过以文档ID作为路径参数发起请求来删除单个文档。

# cURL请求:删除指定ID的文档

curl -H "X-Typesense-API-KEY: ${Typesense_API_KEY}" -X DELETE \
    "http://localhost:8108/collections/products/documents/<id>"

根据数据库的组织方式,您可能会遇到删除某行数据导致另一张表级联删除的情况。这可以通过查询共享字段(如外键)来处理。

# cURL请求:通过共享字段删除多个文档

curl -H "X-Typesense-API-KEY: ${Typesense_API_KEY}" -X DELETE \
    "http://localhost:8108/collections/products/documents?filter_by=<shared_field>:=<value>"

实时同步删除操作可以通过触发器来实现管理。

# 使用触发器实现单行删除

-- 创建从Typesense删除记录的函数
CREATE OR REPLACE FUNCTION delete_products()
    RETURNS TRIGGER
    LANGUAGE plpgSQL
AS $$
BEGIN
    SELECT net.http_delete(
        url := format('<TYPESENSE URL>/collections/products/documents/%s', OLD.id),
        headers := '{"X-Typesense-API-KEY": "<Typesense_API_KEY>"}'
    )
    RETURN OLD;
END $$;

-- 创建触发器,当从products表删除记录时调用该函数
CREATE TRIGGER delete_products_trigger
    AFTER DELETE ON public.products
    FOR EACH ROW
    EXECUTE FUNCTION delete_products();

需要注意的是,如果同步过程因任何原因失败,将没有剩余数据可供后续尝试参考。为了解决这个问题,您可以实现行的软删除,或者创建一个临时表来存储已删除的查询值,直到确认同步成功完成。后一种方案将在下文针对"按共享字段删除"的部分详细说明。

# 存储待同步确认的删除值表

CREATE TABLE deleted_query_values (
    filter_by_field TEXT NOT NULL,
    shared_value TEXT NOT NULL, 
    request_id BIGINT,
    created_at TIMESTAMPTZ NULL DEFAULT now()
)

可以修改触发器函数来保留被删除的值。

-- 创建从Typesense删除记录的函数
CREATE OR REPLACE FUNCTION delete_products_trigger()
    RETURNS TRIGGER
    LANGUAGE plpgSQL
AS $$
DECLARE
    func_request_id BIGINT;
BEGIN
    SELECT net.http_delete(
        -- 添加TYPESENSE URL
        url := '<TYPESENSE URL>/collections/products/documents?filter_by=<FILTER_BY_FIELD>:=%s' || OLD.<SHARED_VALUE>::TEXT,
        -- 添加API密钥
        headers := '{"X-Typesense-API-KEY": "<API KEY>"}'
    ) INTO func_request_id;

    -- 填充表数据
    INSERT INTO deleted_query_values (filter_by_field, shared_value, request_id)
    VALUES ('<FILTER_BY_FIELD>', OLD.<SHARED_VALUE>::TEXT, func_request_id);

    RETURN OLD;
END $$;

可以通过定时任务定期检查 deleted_query_valuesnet._http_response 表,重试失败的删除操作。

CREATE OR REPLACE FUNCTION retry_bulk_deletes()
RETURNS VOID
LANGUAGE plpgSQL
AS $$
DECLARE
    value TEXT;
    field TEXT;
    func_request_id BIGINT;
BEGIN
    -- 从deleted_query_values表中清除已同步的行
    DELETE FROM deleted_query_values 
    USING net._http_response
    WHERE 
        net._http_response.status_code = 200
            AND
        net._http_response.id = deleted_query_values.request_id
        

    -- 获取最早失败的删除查询
    SELECT 
        shared_value,
        filter_by_field
        INTO value, field
    FROM deleted_query_values
    INNER JOIN net._http_response ON net._http_response.id = deleted_query_values.request_id
    WHERE net._http_response.status_code NOT BETWEEN 200 AND 202
    ORDER BY created_at
    LIMIT 1;

    -- 重新尝试删除同步
    SELECT net.http_delete(
        -- 添加TYPESENSE URL
        url := FORMAT('<TYPESENSE URL>/collections/products/documents?filter_by=%s:=%s', field, value),
        -- 添加API密钥
        headers := '{"X-Typesense-API-KEY": "<API KEY>"}'
    ) INTO func_request_id;

    -- 更新deleted_query_values表中的新值
    UPDATE deleted_query_values
    SET 
        request_id = func_request_id,
        created_at = NOW()
    WHERE query_param = id;
END $$;

# 总结

在本教程中,我们探讨了将 Supabase 与 Typesense 数据同步的不同方法,确保搜索引擎能及时反映数据库的最新变更。我们涵盖了通过定时轮询和实时策略来同步插入、更新和删除操作,使用了触发器、函数和定时任务等技术。

通过实施这些技术,您可以为应用程序构建一个健壮、高效且响应迅速的搜索系统,为用户提供流畅而精准的搜索体验。

如果您对本指南有任何改进建议,请点击下方的"编辑页面"链接向我们提交拉取请求。