tangled
alpha
login
or
join now
mackuba.eu
/
lycan
35
fork
atom
Don't forget to lycansubscribe
35
fork
atom
overview
issues
1
pulls
pipelines
import likes & other things in the firehose client
mackuba.eu
5 months ago
f00d16b6
6c653133
+57
-1
2 changed files
expand all
collapse all
unified
split
app
firehose_client.rb
models
user.rb
+53
-1
app/firehose_client.rb
···
1
1
require 'skyfall'
2
2
3
3
require_relative 'init'
4
4
+
require_relative 'models/post'
4
5
require_relative 'models/subscription'
6
6
+
require_relative 'models/user'
5
7
6
8
class FirehoseClient
7
9
attr_accessor :start_cursor, :service
···
15
17
16
18
def start
17
19
return if @sky
20
20
+
21
21
+
@active_users = load_users
18
22
19
23
log "Starting firehose process (YJIT = #{RubyVM::YJIT.enabled? ? 'on' : 'off'})"
20
24
···
94
98
Subscription.where(service: @service).update_all(cursor: cursor)
95
99
end
96
100
101
101
+
def load_users
102
102
+
User.active.map { |u| [u.did, u] }.then { |list| Hash[list] }
103
103
+
end
104
104
+
97
105
def process_message(msg)
98
106
save_cursor(msg.seq) if msg.seq % 1000 == 0
99
107
···
108
116
@replaying = false
109
117
end
110
118
119
119
+
@current_user = @active_users[msg.repo]
120
120
+
111
121
msg.operations.each do |op|
112
122
case op.type
123
123
+
when :bsky_like
124
124
+
process_like(msg, op)
125
125
+
when :bsky_repost
126
126
+
process_repost(msg, op)
113
127
when :bsky_post
114
114
-
# ...
128
128
+
process_post(msg, op)
115
129
end
116
130
end
117
131
end
···
120
134
def process_account_event(msg)
121
135
if msg.status == :deleted
122
136
# ...
137
137
+
end
138
138
+
end
139
139
+
140
140
+
def process_like(msg, op)
141
141
+
return unless @current_user
142
142
+
143
143
+
if op.action == :create
144
144
+
@current_user.likes.import_from_record(op.uri, op.raw_record)
145
145
+
elsif op.action == :delete
146
146
+
@current_user.likes.where(rkey: op.rkey).delete_all
147
147
+
end
148
148
+
end
149
149
+
150
150
+
def process_repost(msg, op)
151
151
+
return unless @current_user
152
152
+
153
153
+
if op.action == :create
154
154
+
@current_user.reposts.import_from_record(op.uri, op.raw_record)
155
155
+
elsif op.action == :delete
156
156
+
@current_user.reposts.where(rkey: op.rkey).delete_all
157
157
+
end
158
158
+
end
159
159
+
160
160
+
def process_post(msg, op)
161
161
+
if op.action == :create
162
162
+
if @current_user
163
163
+
@current_user.quotes.import_from_record(op.uri, op.raw_record)
164
164
+
@current_user.pins.import_from_record(op.uri, op.raw_record)
165
165
+
end
166
166
+
elsif op.action == :delete
167
167
+
if @current_user
168
168
+
@current_user.quotes.where(rkey: op.rkey).delete_all
169
169
+
@current_user.pins.where(rkey: op.rkey).delete_all
170
170
+
end
171
171
+
172
172
+
if post = Post.find_by_at_uri(op.uri)
173
173
+
post.destroy
174
174
+
end
123
175
end
124
176
end
125
177
+4
app/models/user.rb
···
50
50
end
51
51
end
52
52
53
53
+
def self.active
54
54
+
self.joins(:imports).distinct
55
55
+
end
56
56
+
53
57
def all_pending_items
54
58
[:likes, :reposts, :quotes, :pins].map { |x| self.send(x).pending.to_a }.reduce(&:+)
55
59
end