Skip to content

Commit 1a06dc0

Browse files
committed
fix: cleaning up of prefixes under heavy concurrency
1 parent a4cc5d5 commit 1a06dc0

File tree

2 files changed

+889
-0
lines changed

2 files changed

+889
-0
lines changed
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
-- Remove the older prefix-maintenance triggers; they conflict with the
-- statement-level GC triggers created by this migration.
-- Triggers on storage.objects:
DROP TRIGGER IF EXISTS objects_update_create_prefix
    ON storage.objects;
DROP TRIGGER IF EXISTS objects_delete_delete_prefix
    ON storage.objects;
-- Trigger on storage.prefixes:
DROP TRIGGER IF EXISTS prefixes_delete_hierarchy
    ON storage.prefixes;
5+
6+
-- Helper: serialize work per "bucket/top_level_prefix".
--
-- bucket_ids / names are parallel arrays; element i of each forms one
-- [bucket_id, name] pair. For every distinct (bucket_id, first path segment of
-- name) an advisory lock is taken with pg_advisory_xact_lock, i.e. a
-- transaction-level lock that is held until the surrounding transaction ends.
-- Pairs whose name is empty are ignored.
--
-- Locks are requested in sorted (bucket_id, top segment) order so that every
-- caller of this helper acquires a given set of keys in the same sequence.
CREATE OR REPLACE FUNCTION storage.lock_top_prefixes(bucket_ids text[], names text[])
RETURNS void
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = storage, pg_temp
AS $$
DECLARE
    r_target record;
BEGIN
    FOR r_target IN
        SELECT
            pairs.bucket_id,
            split_part(pairs.name, '/', 1) AS top_segment
        FROM unnest(bucket_ids, names) AS pairs(bucket_id, name)
        WHERE pairs.name <> ''
        GROUP BY
            pairs.bucket_id,
            split_part(pairs.name, '/', 1)
        ORDER BY
            pairs.bucket_id,
            split_part(pairs.name, '/', 1)
    LOOP
        -- The lock key is the 64-bit hash of "<bucket_id>/<top segment>".
        PERFORM pg_advisory_xact_lock(
            hashtextextended(r_target.bucket_id || '/' || r_target.top_segment, 0)
        );
    END LOOP;
END;
$$;
29+
30+
-- Helper: given parallel arrays of bucket_ids and names, compute all ancestor
-- prefixes (via storage.get_prefixes) and delete those rows of storage.prefixes
-- that are leaves, i.e. have no child object and no child prefix exactly one
-- level below them. Repeats bottom-up until a pass removes no rows, so a chain
-- of now-empty ancestors is pruned one level per pass.
--
-- Child detection uses starts_with() rather than LIKE: with LIKE, the
-- characters '_', '%' and '\' inside a prefix name act as pattern
-- metacharacters, so e.g. 'my_folder' was considered non-empty whenever
-- 'my-folder/x' existed and was never pruned.
--
-- Returns nothing. Callers are expected to hold the relevant locks
-- (see storage.lock_top_prefixes) before calling.
CREATE OR REPLACE FUNCTION storage.delete_leaf_prefixes(bucket_ids text[], names text[])
RETURNS void
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = storage, pg_temp
AS $$
DECLARE
    v_rows_deleted integer;
BEGIN
    LOOP
        WITH candidates AS (
            -- Every ancestor prefix of every input name, per bucket.
            SELECT DISTINCT
                t.bucket_id,
                unnest(storage.get_prefixes(t.name)) AS name
            FROM unnest(bucket_ids, names) AS t(bucket_id, name)
        ),
        uniq AS (
            -- De-duplicated candidates with their level attached.
            SELECT
                bucket_id,
                name,
                storage.get_level(name) AS level
            FROM candidates
            WHERE name <> ''
            GROUP BY bucket_id, name
        ),
        leaf AS (
            -- Candidates that exist in storage.prefixes and have no direct children.
            SELECT
                p.bucket_id,
                p.name,
                p.level
            FROM storage.prefixes AS p
            INNER JOIN uniq AS u
                ON u.bucket_id = p.bucket_id
               AND u.name = p.name
               AND u.level = p.level
            WHERE NOT EXISTS (
                SELECT 1
                FROM storage.objects AS o
                WHERE o.bucket_id = p.bucket_id
                  AND storage.get_level(o.name) = p.level + 1
                  -- literal prefix match; no wildcard interpretation of p.name
                  AND starts_with(o.name, p.name || '/')
            )
            AND NOT EXISTS (
                SELECT 1
                FROM storage.prefixes AS c
                WHERE c.bucket_id = p.bucket_id
                  AND c.level = p.level + 1
                  -- literal prefix match; no wildcard interpretation of p.name
                  AND starts_with(c.name, p.name || '/')
            )
        )
        DELETE FROM storage.prefixes AS p
        USING leaf AS l
        WHERE p.bucket_id = l.bucket_id
          AND p.name = l.name
          AND p.level = l.level;

        -- Stop once a pass finds nothing left to prune.
        GET DIAGNOSTICS v_rows_deleted = ROW_COUNT;
        EXIT WHEN v_rows_deleted = 0;
    END LOOP;
END;
$$;
89+
90+
-- After DELETE on storage.objects (statement-level; reads the transition
-- table named `deleted`).
-- - Skips entirely when `storage.gc_running` is already '1' (re-entrancy guard)
-- - Locks top-level prefixes for touched objects
-- - Deletes leaf prefixes derived from deleted object names and their ancestors
-- - Restores `storage.gc_running` afterwards: set_config(..., true) lasts until
--   the end of the transaction, so leaving the flag at '1' would make every
--   later DELETE in the same transaction skip its cleanup.
-- Always returns NULL (the return value of an AFTER trigger is ignored).
CREATE OR REPLACE FUNCTION storage.objects_delete_cleanup()
RETURNS trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = storage, pg_temp
AS $$
DECLARE
    v_bucket_ids text[];
    v_names text[];
    -- value of the guard on entry; NULL when the setting was never defined
    v_prev_gc text;
BEGIN
    v_prev_gc := current_setting('storage.gc_running', true);
    IF v_prev_gc = '1' THEN
        RETURN NULL;
    END IF;

    -- Parallel arrays of (bucket_id, name) for the deleted rows.
    SELECT
        COALESCE(array_agg(d.bucket_id), '{}'),
        COALESCE(array_agg(d.name), '{}')
    INTO v_bucket_ids, v_names
    FROM deleted AS d
    WHERE d.name <> '';

    -- Nothing was deleted (or only empty names): no locks or pruning needed.
    IF array_length(v_names, 1) IS NULL THEN
        RETURN NULL;
    END IF;

    -- Arm the guard so the DELETEs on storage.prefixes issued below do not
    -- re-enter the prefix cleanup trigger.
    PERFORM set_config('storage.gc_running', '1', true);

    PERFORM storage.lock_top_prefixes(v_bucket_ids, v_names);
    PERFORM storage.delete_leaf_prefixes(v_bucket_ids, v_names);

    -- Disarm the guard again for the remainder of the transaction.
    PERFORM set_config('storage.gc_running', COALESCE(v_prev_gc, ''), true);

    RETURN NULL;
END;
$$;
122+
123+
-- After UPDATE on storage.objects (statement-level; reads the transition
-- tables `old_rows` and `new_rows`).
-- - Computes destinations (NEW minus OLD) and sources (OLD minus NEW)
-- - Locks every affected "bucket/top_level_prefix" in ONE call so that all
--   keys are acquired in a single global order; locking sources and
--   destinations as two separately sorted batches lets two concurrent moves in
--   opposite directions deadlock
-- - Creates the prefixes needed by the destinations
-- - Prunes now-empty prefixes derived from the sources, with the
--   `storage.gc_running` guard armed only for the duration of the pruning
-- Always returns NULL (the return value of an AFTER trigger is ignored).
CREATE OR REPLACE FUNCTION storage.objects_update_cleanup()
RETURNS trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = storage, pg_temp
AS $$
DECLARE
    -- NEW - OLD (destinations to create prefixes for)
    v_add_bucket_ids text[];
    v_add_names text[];

    -- OLD - NEW (sources to prune)
    v_src_bucket_ids text[];
    v_src_names text[];

    -- value of the guard on entry; NULL when the setting was never defined
    v_prev_gc text;
BEGIN
    IF TG_OP <> 'UPDATE' THEN
        RETURN NULL;
    END IF;

    -- 1) Compute NEW-OLD (added paths) and OLD-NEW (moved-away paths)
    WITH added AS (
        -- only names containing '/' have ancestor prefixes to create
        SELECT n.bucket_id, n.name
        FROM new_rows AS n
        WHERE n.name <> ''
          AND position('/' in n.name) > 0
        EXCEPT
        SELECT o.bucket_id, o.name
        FROM old_rows AS o
        WHERE o.name <> ''
    ),
    moved AS (
        SELECT o.bucket_id, o.name
        FROM old_rows AS o
        WHERE o.name <> ''
        EXCEPT
        SELECT n.bucket_id, n.name
        FROM new_rows AS n
        WHERE n.name <> ''
    )
    SELECT
        -- arrays for ADDED (dest) in stable order
        COALESCE((SELECT array_agg(a.bucket_id ORDER BY a.bucket_id, a.name) FROM added AS a), '{}'),
        COALESCE((SELECT array_agg(a.name ORDER BY a.bucket_id, a.name) FROM added AS a), '{}'),
        -- arrays for MOVED (src) in stable order
        COALESCE((SELECT array_agg(m.bucket_id ORDER BY m.bucket_id, m.name) FROM moved AS m), '{}'),
        COALESCE((SELECT array_agg(m.name ORDER BY m.bucket_id, m.name) FROM moved AS m), '{}')
    INTO v_add_bucket_ids, v_add_names, v_src_bucket_ids, v_src_names;

    -- Nothing to do?
    IF (array_length(v_add_bucket_ids, 1) IS NULL) AND (array_length(v_src_bucket_ids, 1) IS NULL) THEN
        RETURN NULL;
    END IF;

    -- 2) Take per-(bucket, top) locks for sources AND destinations in a single
    --    call. The helper de-duplicates and sorts the keys, so every
    --    transaction acquires them in the same global order. The arrays are
    --    never NULL here (COALESCE above) and each pair has matching lengths,
    --    so concatenation keeps bucket_ids and names aligned.
    PERFORM storage.lock_top_prefixes(
        v_src_bucket_ids || v_add_bucket_ids,
        v_src_names || v_add_names
    );

    -- 3) Create destination prefixes (NEW-OLD) BEFORE pruning sources
    IF array_length(v_add_bucket_ids, 1) IS NOT NULL THEN
        WITH candidates AS (
            SELECT DISTINCT
                t.bucket_id,
                unnest(storage.get_prefixes(t.name)) AS name
            FROM unnest(v_add_bucket_ids, v_add_names) AS t(bucket_id, name)
            WHERE t.name <> ''
        )
        INSERT INTO storage.prefixes (bucket_id, name)
        SELECT c.bucket_id, c.name
        FROM candidates AS c
        ON CONFLICT DO NOTHING;
    END IF;

    -- 4) Prune source prefixes bottom-up for OLD-NEW
    IF array_length(v_src_bucket_ids, 1) IS NOT NULL THEN
        -- Re-entrancy guard so DELETE on prefixes won't recurse.
        -- IS DISTINCT FROM is required: the setting is NULL when it was never
        -- defined, and `NULL <> '1'` would silently skip arming the guard.
        v_prev_gc := current_setting('storage.gc_running', true);
        IF v_prev_gc IS DISTINCT FROM '1' THEN
            PERFORM set_config('storage.gc_running', '1', true);
        END IF;

        PERFORM storage.delete_leaf_prefixes(v_src_bucket_ids, v_src_names);

        -- set_config(..., true) lasts until the end of the transaction; put the
        -- previous value back so later statements still get their cleanup.
        IF v_prev_gc IS DISTINCT FROM '1' THEN
            PERFORM set_config('storage.gc_running', COALESCE(v_prev_gc, ''), true);
        END IF;
    END IF;

    RETURN NULL;
END;
$$;
208+
209+
-- After DELETE on storage.prefixes (statement-level; reads the transition
-- table named `deleted`).
-- - When prefixes are deleted, remove now-empty ancestor prefixes
-- - Skips entirely when `storage.gc_running` is already '1', which is how the
--   DELETEs issued by the pruning helper avoid re-entering this trigger
-- - Locks top-level prefixes, then prunes leaves derived from deleted prefixes
-- - Restores `storage.gc_running` afterwards: set_config(..., true) lasts until
--   the end of the transaction, so leaving the flag at '1' would make every
--   later DELETE in the same transaction skip its cleanup.
-- Always returns NULL (the return value of an AFTER trigger is ignored).
CREATE OR REPLACE FUNCTION storage.prefixes_delete_cleanup()
RETURNS trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = storage, pg_temp
AS $$
DECLARE
    v_bucket_ids text[];
    v_names text[];
    -- value of the guard on entry; NULL when the setting was never defined
    v_prev_gc text;
BEGIN
    v_prev_gc := current_setting('storage.gc_running', true);
    IF v_prev_gc = '1' THEN
        RETURN NULL;
    END IF;

    -- Parallel arrays of (bucket_id, name) for the deleted prefixes.
    SELECT
        COALESCE(array_agg(d.bucket_id), '{}'),
        COALESCE(array_agg(d.name), '{}')
    INTO v_bucket_ids, v_names
    FROM deleted AS d
    WHERE d.name <> '';

    -- Nothing was deleted (or only empty names): no locks or pruning needed.
    IF array_length(v_names, 1) IS NULL THEN
        RETURN NULL;
    END IF;

    -- Arm the guard so the nested DELETEs below do not recurse into this trigger.
    PERFORM set_config('storage.gc_running', '1', true);

    PERFORM storage.lock_top_prefixes(v_bucket_ids, v_names);
    PERFORM storage.delete_leaf_prefixes(v_bucket_ids, v_names);

    -- Disarm the guard again for the remainder of the transaction.
    PERFORM set_config('storage.gc_running', COALESCE(v_prev_gc, ''), true);

    RETURN NULL;
END;
$$;
240+
241+
-- Trigger bindings
-- Each trigger is dropped first so this section can be re-run safely, in line
-- with the CREATE OR REPLACE / DROP ... IF EXISTS statements used for the
-- functions and legacy triggers in this migration.
-- All three are statement-level AFTER triggers that expose the affected rows
-- through transition tables, under the names the trigger functions read.

DROP TRIGGER IF EXISTS objects_delete_cleanup ON storage.objects;
CREATE TRIGGER objects_delete_cleanup
    AFTER DELETE ON storage.objects
    REFERENCING OLD TABLE AS deleted
    FOR EACH STATEMENT
    EXECUTE FUNCTION storage.objects_delete_cleanup();

DROP TRIGGER IF EXISTS prefixes_delete_cleanup ON storage.prefixes;
CREATE TRIGGER prefixes_delete_cleanup
    AFTER DELETE ON storage.prefixes
    REFERENCING OLD TABLE AS deleted
    FOR EACH STATEMENT
    EXECUTE FUNCTION storage.prefixes_delete_cleanup();

DROP TRIGGER IF EXISTS objects_update_cleanup ON storage.objects;
CREATE TRIGGER objects_update_cleanup
    AFTER UPDATE ON storage.objects
    REFERENCING OLD TABLE AS old_rows NEW TABLE AS new_rows
    FOR EACH STATEMENT
    EXECUTE FUNCTION storage.objects_update_cleanup();

0 commit comments

Comments
 (0)