Client-side atproto account migrator in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow
at feat/batch-one-qol 106 lines 4.7 kB view raw
-- gen_random_bytes() used by generate_ulid() is provided by pgcrypto.
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- generate_ulid(): returns a 26-character ULID-style identifier as TEXT.
--
-- Layout of the 16 raw bytes before encoding:
--   bytes 0..5   milliseconds since the Unix epoch (from CLOCK_TIMESTAMP()),
--                most significant byte first
--   bytes 6..15  10 random bytes from gen_random_bytes()
--
-- The 128 bits are rendered with Crockford's Base32 alphabet, 5 bits per
-- character. 26 characters carry 130 bits, so the stream is left-padded
-- with two zero bits.
CREATE OR REPLACE FUNCTION generate_ulid()
RETURNS TEXT
AS $$
DECLARE
    -- Crockford's Base32
    alphabet    CONSTANT TEXT := '0123456789ABCDEFGHJKMNPQRSTVWXYZ';
    epoch_ms    BIGINT;
    raw_id      BYTEA := E'\\000\\000\\000\\000\\000\\000';
    -- Starts with the two zero pad bits so that 5-bit groups line up.
    bit_stream  BIT VARYING := B'00';
    encoded     TEXT := '';
BEGIN
    epoch_ms := (EXTRACT(EPOCH FROM CLOCK_TIMESTAMP()) * 1000)::BIGINT;

    -- 6 timestamp bytes, big-endian; ::BIT(8) keeps only the low 8 bits
    -- of each shifted value.
    FOR byte_pos IN 0..5 LOOP
        raw_id := SET_BYTE(
            raw_id,
            byte_pos,
            (epoch_ms >> (8 * (5 - byte_pos)))::BIT(8)::INTEGER
        );
    END LOOP;

    -- 10 entropy bytes
    raw_id := raw_id || gen_random_bytes(10);

    -- Flatten all 16 bytes into one bit string behind the pad bits.
    FOR byte_pos IN 0..15 LOOP
        bit_stream := bit_stream || GET_BYTE(raw_id, byte_pos)::BIT(8);
    END LOOP;

    -- Emit one alphabet character per 5-bit group: the first 10 characters
    -- encode the timestamp, the remaining 16 encode the entropy.
    FOR char_pos IN 0..25 LOOP
        encoded := encoded || SUBSTR(
            alphabet,
            SUBSTRING(bit_stream FROM char_pos * 5 + 1 FOR 5)::BIT(5)::INTEGER + 1,
            1
        );
    END LOOP;

    RETURN encoded;
END
$$
LANGUAGE plpgsql
VOLATILE;


-- apalis.push_job(): inserts one row into apalis.jobs and returns it.
--
-- Parameters:
--   job_type      job type label; rejected when longer than 512 characters
--   job           JSON payload (default NULL)
--   status        initial status (default 'Pending')
--   run_at        timestamp stored with the job (default NOW())
--   max_attempts  attempt limit (default 25); rejected when below 1
--
-- Returns: the inserted apalis.jobs row.
-- Raises:  SQLSTATE 'APAJT' when job_type exceeds 512 characters,
--          SQLSTATE 'APAMA' when max_attempts is less than 1.
CREATE OR REPLACE FUNCTION apalis.push_job(
    job_type text,
    job json DEFAULT NULL::json,
    status text DEFAULT 'Pending'::text,
    run_at timestamptz DEFAULT NOW()::timestamptz,
    max_attempts integer DEFAULT 25::integer
) RETURNS apalis.jobs AS $$
DECLARE
    inserted_job apalis.jobs;
    new_job_id   text;
BEGIN
    IF job_type IS NOT NULL AND length(job_type) > 512 THEN
        RAISE EXCEPTION 'Job_type is too long (max length: 512).'
            USING ERRCODE = 'APAJT';
    END IF;

    IF max_attempts < 1 THEN
        RAISE EXCEPTION 'Job maximum attempts must be at least 1.'
            USING ERRCODE = 'APAMA';
    END IF;

    -- Job ids are a fresh ULID with a 'JID-' prefix.
    new_job_id := 'JID-' || generate_ulid();

    -- Positional insert: the value order must match the column order of
    -- apalis.jobs. NOTE(review): the table definition is not visible in
    -- this file, so the meaning of the literal 0 and the four NULLs should
    -- be confirmed against it.
    INSERT INTO apalis.jobs
    VALUES (
        job,
        new_job_id,
        job_type,
        status,
        0,
        max_attempts,
        run_at,
        NULL,
        NULL,
        NULL,
        NULL
    )
    RETURNING * INTO inserted_job;

    RETURN inserted_job;
END;
$$ LANGUAGE plpgsql VOLATILE;