···153 return unless @current_user
154155 if op.action == :create
156- @current_user.likes.import_from_record(op.uri, op.raw_record)
157 elsif op.action == :delete
158 @current_user.likes.where(rkey: op.rkey).delete_all
159 end
···163 return unless @current_user
164165 if op.action == :create
166- @current_user.reposts.import_from_record(op.uri, op.raw_record)
167 elsif op.action == :delete
168 @current_user.reposts.where(rkey: op.rkey).delete_all
169 end
···172 def process_post(msg, op)
173 if op.action == :create
174 if @current_user
175- @current_user.quotes.import_from_record(op.uri, op.raw_record)
176- @current_user.pins.import_from_record(op.uri, op.raw_record)
177 end
178 elsif op.action == :delete
179 if @current_user
···153 return unless @current_user
154155 if op.action == :create
156+ @current_user.likes.import_from_record(op.uri, op.raw_record, queue: :firehose)
157 elsif op.action == :delete
158 @current_user.likes.where(rkey: op.rkey).delete_all
159 end
···163 return unless @current_user
164165 if op.action == :create
166+ @current_user.reposts.import_from_record(op.uri, op.raw_record, queue: :firehose)
167 elsif op.action == :delete
168 @current_user.reposts.where(rkey: op.rkey).delete_all
169 end
···172 def process_post(msg, op)
173 if op.action == :create
174 if @current_user
175+ @current_user.quotes.import_from_record(op.uri, op.raw_record, queue: :firehose)
176+ @current_user.pins.import_from_record(op.uri, op.raw_record, queue: :firehose)
177 end
178 elsif op.action == :delete
179 if @current_user
+5-5
app/import_manager.rb
···11 @user = user
12 end
1314- def start(sets, include_pending)
15 queued_items = []
16 importers = []
17 sets = [sets] unless sets.is_a?(Array)
···19 sets.each do |set|
20 case set
21 when 'likes'
22- queued_items += @user.likes.pending.to_a if include_pending
23 importers << LikesImporter.new(@user)
24 when 'reposts'
25- queued_items += @user.reposts.pending.to_a if include_pending
26 importers << RepostsImporter.new(@user)
27 when 'posts'
28- queued_items += @user.quotes.pending.to_a + @user.pins.pending.to_a if include_pending
29 importers << PostsImporter.new(@user)
30 when 'all'
31- queued_items += @user.all_pending_items if include_pending
32 importers += [
33 LikesImporter.new(@user),
34 RepostsImporter.new(@user),
···11 @user = user
12 end
1314+ def start(sets)
15 queued_items = []
16 importers = []
17 sets = [sets] unless sets.is_a?(Array)
···19 sets.each do |set|
20 case set
21 when 'likes'
22+ queued_items += @user.likes.in_queue(:import).to_a
23 importers << LikesImporter.new(@user)
24 when 'reposts'
25+ queued_items += @user.reposts.in_queue(:import).to_a
26 importers << RepostsImporter.new(@user)
27 when 'posts'
28+ queued_items += @user.quotes.in_queue(:import).to_a + @user.pins.in_queue(:import).to_a
29 importers << PostsImporter.new(@user)
30 when 'all'
31+ queued_items += @user.all_items_in_queue(:import)
32 importers += [
33 LikesImporter.new(@user),
34 RepostsImporter.new(@user),
+1-1
app/import_worker.rb
···23 def run(collections)
24 import = ImportManager.new(@user)
25 import.report = BasicReport.new if @verbose
26- import.start(collections, false)
27 end
28 end
29
···23 def run(collections)
24 import = ImportManager.new(@user)
25 import.report = BasicReport.new if @verbose
26+ import.start(collections)
27 end
28 end
29
+1-1
app/importers/likes_importer.rb
···1819 records.each do |record|
20 begin
21- like = @user.likes.import_from_record(record['uri'], record['value'])
2223 if like && like.pending? && @item_queue
24 @item_queue.push(like)
···1819 records.each do |record|
20 begin
21+ like = @user.likes.import_from_record(record['uri'], record['value'], queue: :import)
2223 if like && like.pending? && @item_queue
24 @item_queue.push(like)
+2-2
app/importers/posts_importer.rb
···1819 records.each do |record|
20 begin
21- quote = @user.quotes.import_from_record(record['uri'], record['value'])
22- pin = @user.pins.import_from_record(record['uri'], record['value'])
2324 if @item_queue
25 if quote && quote.pending?
···1819 records.each do |record|
20 begin
21+ quote = @user.quotes.import_from_record(record['uri'], record['value'], queue: :import)
22+ pin = @user.pins.import_from_record(record['uri'], record['value'], queue: :import)
2324 if @item_queue
25 if quote && quote.pending?
+1-1
app/importers/reposts_importer.rb
···1819 records.each do |record|
20 begin
21- repost = @user.reposts.import_from_record(record['uri'], record['value'])
2223 if repost && repost.pending? && @item_queue
24 @item_queue.push(repost)
···1819 records.each do |record|
20 begin
21+ repost = @user.reposts.import_from_record(record['uri'], record['value'], queue: :import)
2223 if repost && repost.pending? && @item_queue
24 @item_queue.push(repost)
+11
app/models/importable.rb
···89 included do
10 scope :pending, -> { where(post: nil) }
0000001112 def pending?
13 post_uri != nil
14 end
15000016 def import_item!
17 post_uri = AT_URI(self.post_uri)
18 return nil if !post_uri.is_post?
···20 if post = Post.find_by_at_uri(post_uri)
21 self.post = post
22 self.post_uri = nil
023 end
2425 self.save!
···89 included do
10 scope :pending, -> { where(post: nil) }
11+ scope :in_queue, ->(q) { where(queue: q) }
12+13+ enum :queue, { firehose: 0, import: 1 }
14+15+ validates_presence_of :post_uri, if: -> { post_id.nil? }
16+ validate :check_queue
1718 def pending?
19 post_uri != nil
20 end
2122+ def check_queue
23+ errors.add(:queue, 'must be nil if already processed') if queue && post
24+ end
25+26 def import_item!
27 post_uri = AT_URI(self.post_uri)
28 return nil if !post_uri.is_post?
···30 if post = Post.find_by_at_uri(post_uri)
31 self.post = post
32 self.post_uri = nil
33+ self.queue = nil
34 end
3536 self.save!
···19 before_destroy :delete_posts_cascading
2021 has_many :likes, foreign_key: 'actor_id', dependent: :delete_all do
22- def import_from_record(like_uri, record)
23 like = self.new_from_record(like_uri, record)
24 return nil if like.nil? || self.where(rkey: like.rkey).exists?
25026 like.import_item!
27 end
28 end
2930 has_many :reposts, foreign_key: 'actor_id', dependent: :delete_all do
31- def import_from_record(repost_uri, record)
32 repost = self.new_from_record(repost_uri, record)
33 return nil if repost.nil? || self.where(rkey: repost.rkey).exists?
34035 repost.import_item!
36 end
37 end
3839 has_many :quotes, foreign_key: 'actor_id', dependent: :delete_all do
40- def import_from_record(post_uri, record)
41 quote = self.new_from_record(post_uri, record)
42 return nil if quote.nil? || self.where(rkey: quote.rkey).exists?
43044 quote.import_item!
45 end
46 end
4748 has_many :pins, foreign_key: 'actor_id', dependent: :delete_all do
49- def import_from_record(post_uri, record)
50 pin = self.new_from_record(post_uri, record)
51 return nil if pin.nil? || self.where(rkey: pin.rkey).exists?
52053 pin.import_item!
54 end
55 end
···6869 def all_pending_items
70 [:likes, :reposts, :quotes, :pins].map { |x| self.send(x).pending.to_a }.reduce(&:+)
000071 end
7273 def delete_posts_cascading
···19 before_destroy :delete_posts_cascading
2021 has_many :likes, foreign_key: 'actor_id', dependent: :delete_all do
22+ def import_from_record(like_uri, record, **args)
23 like = self.new_from_record(like_uri, record)
24 return nil if like.nil? || self.where(rkey: like.rkey).exists?
2526+ like.assign_attributes(args)
27 like.import_item!
28 end
29 end
3031 has_many :reposts, foreign_key: 'actor_id', dependent: :delete_all do
32+ def import_from_record(repost_uri, record, **args)
33 repost = self.new_from_record(repost_uri, record)
34 return nil if repost.nil? || self.where(rkey: repost.rkey).exists?
3536+ repost.assign_attributes(args)
37 repost.import_item!
38 end
39 end
4041 has_many :quotes, foreign_key: 'actor_id', dependent: :delete_all do
42+ def import_from_record(post_uri, record, **args)
43 quote = self.new_from_record(post_uri, record)
44 return nil if quote.nil? || self.where(rkey: quote.rkey).exists?
4546+ quote.assign_attributes(args)
47 quote.import_item!
48 end
49 end
5051 has_many :pins, foreign_key: 'actor_id', dependent: :delete_all do
52+ def import_from_record(post_uri, record, **args)
53 pin = self.new_from_record(post_uri, record)
54 return nil if pin.nil? || self.where(rkey: pin.rkey).exists?
5556+ pin.assign_attributes(args)
57 pin.import_item!
58 end
59 end
···7273 def all_pending_items
74 [:likes, :reposts, :quotes, :pins].map { |x| self.send(x).pending.to_a }.reduce(&:+)
75+ end
76+77+ def all_items_in_queue(queue)
78+ [:likes, :reposts, :quotes, :pins].map { |x| self.send(x).in_queue(queue).to_a }.reduce(&:+)
79 end
8081 def delete_posts_cascading
+5-1
app/post_downloader.rb
···88 end
8990 def update_item(item, post)
91- item.update!(post: post, post_uri: nil)
9293 @total_count += 1
94 @oldest_imported = [@oldest_imported, item.time].min
···147 # delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
148 item.destroy if hostname == 'bsky.social'
149 end
0000150 end
151 end
152 end
···88 end
8990 def update_item(item, post)
91+ item.update!(post: post, post_uri: nil, queue: nil)
9293 @total_count += 1
94 @oldest_imported = [@oldest_imported, item.time].min
···147 # delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
148 item.destroy if hostname == 'bsky.social'
149 end
150+ end
151+152+ if !item.destroyed?
153+ item.update!(queue: nil)
154 end
155 end
156 end
+7
db/migrate/20250923014702_add_queued_field.rb
···0000000
···1+class AddQueuedField < ActiveRecord::Migration[7.2]
2+ def change
3+ [:likes, :reposts, :quotes, :pins].each do |table|
4+ add_column(table, :queue, :smallint, null: true)
5+ end
6+ end
7+end
+5-1
db/schema.rb
···10#
11# It's strongly recommended that you check this file into your version control system.
1213-ActiveRecord::Schema[7.2].define(version: 2025_09_20_182018) do
14 # These are extensions that must be enabled in order to support this database
15 enable_extension "plpgsql"
16···34 t.datetime "time", null: false
35 t.bigint "post_id"
36 t.string "post_uri"
037 t.index ["actor_id", "rkey"], name: "index_likes_on_actor_id_and_rkey", unique: true
38 t.index ["actor_id", "time", "id"], name: "index_likes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
39 end
···45 t.text "pin_text", null: false
46 t.bigint "post_id"
47 t.string "post_uri"
048 t.index ["actor_id", "rkey"], name: "index_pins_on_actor_id_and_rkey", unique: true
49 t.index ["actor_id", "time", "id"], name: "index_pins_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
50 end
···66 t.text "quote_text", null: false
67 t.bigint "post_id"
68 t.string "post_uri"
069 t.index ["actor_id", "rkey"], name: "index_quotes_on_actor_id_and_rkey", unique: true
70 t.index ["actor_id", "time", "id"], name: "index_quotes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
71 end
···76 t.datetime "time", null: false
77 t.bigint "post_id"
78 t.string "post_uri"
079 t.index ["actor_id", "rkey"], name: "index_reposts_on_actor_id_and_rkey", unique: true
80 t.index ["actor_id", "time", "id"], name: "index_reposts_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
81 end
···10#
11# It's strongly recommended that you check this file into your version control system.
1213+ActiveRecord::Schema[7.2].define(version: 2025_09_23_014702) do
14 # These are extensions that must be enabled in order to support this database
15 enable_extension "plpgsql"
16···34 t.datetime "time", null: false
35 t.bigint "post_id"
36 t.string "post_uri"
37+ t.integer "queue", limit: 2
38 t.index ["actor_id", "rkey"], name: "index_likes_on_actor_id_and_rkey", unique: true
39 t.index ["actor_id", "time", "id"], name: "index_likes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
40 end
···46 t.text "pin_text", null: false
47 t.bigint "post_id"
48 t.string "post_uri"
49+ t.integer "queue", limit: 2
50 t.index ["actor_id", "rkey"], name: "index_pins_on_actor_id_and_rkey", unique: true
51 t.index ["actor_id", "time", "id"], name: "index_pins_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
52 end
···68 t.text "quote_text", null: false
69 t.bigint "post_id"
70 t.string "post_uri"
71+ t.integer "queue", limit: 2
72 t.index ["actor_id", "rkey"], name: "index_quotes_on_actor_id_and_rkey", unique: true
73 t.index ["actor_id", "time", "id"], name: "index_quotes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
74 end
···79 t.datetime "time", null: false
80 t.bigint "post_id"
81 t.string "post_uri"
82+ t.integer "queue", limit: 2
83 t.index ["actor_id", "rkey"], name: "index_reposts_on_actor_id_and_rkey", unique: true
84 t.index ["actor_id", "time", "id"], name: "index_reposts_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
85 end