tangled
alpha
login
or
join now
8bit.lol
/
pegasus
forked from
futur.blue/pegasus
0
fork
atom
objective categorical abstract machine language personal data server
0
fork
atom
overview
issues
pulls
pipelines
Replace manual queries with hermes
futur.blue
2 months ago
4c433e14
e2053c26
verified
This commit was signed with the committer's
known signature
.
futur.blue
SSH Key Fingerprint:
SHA256:QHGqHWNpqYyw9bt8KmPuJIyeZX9SZewBZ0PR1COtKQ0=
+51
-82
3 changed files
expand all
collapse all
unified
split
pegasus
lib
api
proxy
appBskyFeedGetFeed.ml
repo
getRecord.ml
sequencer.ml
+24
-34
pegasus/lib/api/proxy/appBskyFeedGetFeed.ml
···
28
28
| None ->
29
29
Errors.invalid_request "feed publisher has no PDS endpoint"
30
30
in
31
31
-
let get_record_uri =
32
32
-
Uri.make ~scheme:"https" ~host:pds_host
33
33
-
~path:"/xrpc/com.atproto.repo.getRecord"
34
34
-
~query:
35
35
-
[ ("repo", [repo])
36
36
-
; ("collection", [collection])
37
37
-
; ("rkey", [rkey]) ]
38
38
-
()
39
39
-
in
40
40
-
let%lwt res, body = Util.http_get get_record_uri in
41
41
-
match res.status with
42
42
-
| `OK -> (
43
43
-
let%lwt body_str = Cohttp_lwt.Body.to_string body in
44
44
-
let json = Yojson.Safe.from_string body_str in
45
45
-
let value = Yojson.Safe.Util.(json |> member "value") in
46
46
-
let feed_generator_did =
47
47
-
Yojson.Safe.Util.(value |> member "did" |> to_string_option)
48
48
-
in
49
49
-
match feed_generator_did with
31
31
+
try%lwt
32
32
+
let client = Hermes.make_client ~service:pds_host () in
33
33
+
let%lwt {value= record; _} =
34
34
+
Lexicons.([%xrpc get "com.atproto.repo.getRecord"])
35
35
+
~repo ~collection ~rkey client
36
36
+
in
37
37
+
let feed_generator_did =
38
38
+
Yojson.Safe.Util.(record |> member "did" |> to_string_option)
39
39
+
in
40
40
+
match feed_generator_did with
41
41
+
| None ->
42
42
+
Errors.invalid_request
43
43
+
"feed generator record missing 'did' field"
44
44
+
| Some fg_did -> (
45
45
+
match Dream.header ctx.req "atproto-proxy" with
46
46
+
| Some appview ->
47
47
+
Auth.assert_rpc_scope ctx.auth ~lxm:"app.bsky.feed.getFeed"
48
48
+
~aud:appview ;
49
49
+
Xrpc.service_proxy ctx ~aud:fg_did
50
50
+
~lxm:"app.bsky.feed.getFeedSkeleton"
50
51
| None ->
51
51
-
Errors.invalid_request
52
52
-
"feed generator record missing 'did' field"
53
53
-
| Some fg_did -> (
54
54
-
match Dream.header ctx.req "atproto-proxy" with
55
55
-
| Some appview ->
56
56
-
Auth.assert_rpc_scope ctx.auth
57
57
-
~lxm:"app.bsky.feed.getFeed" ~aud:appview ;
58
58
-
Xrpc.service_proxy ctx ~aud:fg_did
59
59
-
~lxm:"app.bsky.feed.getFeedSkeleton"
60
60
-
| None ->
61
61
-
Errors.invalid_request "missing proxy header" ) )
62
62
-
| _ ->
63
63
-
let%lwt () = Cohttp_lwt.Body.drain_body body in
64
64
-
Errors.internal_error
65
65
-
~msg:"failed to fetch feed generator record" () ) ) )
52
52
+
Errors.invalid_request "missing proxy header" )
53
53
+
with _ ->
54
54
+
Errors.internal_error ~msg:"failed to fetch feed generator record"
55
55
+
() ) ) )
+15
-23
pegasus/lib/api/repo/getRecord.ml
···
56
56
Errors.internal_error ~name:"RecordNotFound"
57
57
~msg:("could not resolve user " ^ input.repo)
58
58
() ;
59
59
-
let get_uri = Uri.of_string pds in
60
60
-
let get_uri =
61
61
-
Uri.with_path get_uri "/xrpc/com.atproto.repo.getRecord"
62
62
-
in
63
63
-
let get_uri = Uri.with_query get_uri (Util.copy_query ctx.req) in
64
64
-
let%lwt res, body =
65
65
-
Util.http_get get_uri
66
66
-
~headers:(Cohttp.Header.of_list [("Accept", "application/json")])
67
67
-
in
68
68
-
match res.status with
69
69
-
| `OK ->
70
70
-
let%lwt json = Cohttp_lwt.Body.to_string body in
71
71
-
let%lwt () = Cohttp_lwt.Body.drain_body body in
72
72
-
Dream.json json
73
73
-
| _ ->
74
74
-
let%lwt () = Cohttp_lwt.Body.drain_body body in
75
75
-
Errors.internal_error ~name:"RecordNotFound"
76
76
-
~msg:
77
77
-
( "could not find record "
78
78
-
^ Util.make_at_uri ~repo:input.repo
79
79
-
~collection:input.collection ~rkey:input.rkey
80
80
-
~fragment:None )
81
81
-
() ) )
59
59
+
let client = Hermes.make_client ~service:pds () in
60
60
+
try%lwt
61
61
+
let%lwt record =
62
62
+
Lexicons.([%xrpc get "com.atproto.repo.getRecord"])
63
63
+
~repo:input_did ~collection:input.collection ~rkey:input.rkey
64
64
+
client
65
65
+
in
66
66
+
record |> output_to_yojson |> Yojson.Safe.to_string |> Dream.json
67
67
+
with _ ->
68
68
+
Errors.internal_error ~name:"RecordNotFound"
69
69
+
~msg:
70
70
+
( "could not find record "
71
71
+
^ Util.make_at_uri ~repo:input.repo ~collection:input.collection
72
72
+
~rkey:input.rkey ~fragment:None )
73
73
+
() ) )
+12
-25
pegasus/lib/sequencer.ml
···
491
491
last_notified := now ;
492
492
List.iter
493
493
(fun crawler ->
494
494
-
let uri =
495
495
-
Uri.with_path crawler "/xrpc/com.atproto.sync.requestCrawl"
496
496
-
in
494
494
+
let service = Uri.to_string crawler in
495
495
+
let client = Hermes.make_client ~service () in
497
496
Lwt.dont_wait
498
497
(fun () ->
499
499
-
let%lwt res, _ =
500
500
-
Cohttp_lwt_unix.Client.post
501
501
-
~headers:
502
502
-
(Cohttp.Header.of_list
503
503
-
[("Content-Type", "application/json")] )
504
504
-
~body:
505
505
-
(Printf.ksprintf Cohttp_lwt.Body.of_string
506
506
-
{|{ "hostname": "%s" }|} Env.hostname )
507
507
-
uri
508
508
-
in
509
509
-
match res.status with
510
510
-
| `OK ->
511
511
-
Lwt.return_unit
512
512
-
| status ->
513
513
-
failwith
514
514
-
("errored with status " ^ Http.Status.to_string status) )
498
498
+
Lexicons.([%xrpc post "com.atproto.sync.requestCrawl"])
499
499
+
~hostname:Env.hostname client )
515
500
(fun exn ->
516
501
Dream.warning (fun log ->
517
517
-
log "failed to requestCrawl %s: %s"
518
518
-
(Uri.to_string crawler) (Printexc.to_string exn) ) ) )
502
502
+
log "failed to requestCrawl %s: %s" service
503
503
+
(Printexc.to_string exn) ) ) )
519
504
Env.crawlers
520
505
end ;
521
506
let to_remove = ref [] in
···
631
616
in
632
617
send (Frame.encode_error err)
633
618
634
634
-
let live_loop ~(conn : Data_store.t) ~(sub : Bus.subscriber) ~(send : bytes -> unit Lwt.t)
635
635
-
~(start_seq : int) : unit Lwt.t =
619
619
+
let live_loop ~(conn : Data_store.t) ~(sub : Bus.subscriber)
620
620
+
~(send : bytes -> unit Lwt.t) ~(start_seq : int) : unit Lwt.t =
636
621
let rec loop last =
637
622
if sub.Bus.closed then
638
623
match sub.Bus.close_reason with
···
675
660
in
676
661
loop start_seq
677
662
678
678
-
let stream_live ~(conn : Data_store.t) ~(send : bytes -> unit Lwt.t) : unit Lwt.t =
663
663
+
let stream_live ~(conn : Data_store.t) ~(send : bytes -> unit Lwt.t) :
664
664
+
unit Lwt.t =
679
665
let%lwt sub = Bus.subscribe () in
680
666
Lwt.finalize
681
667
(fun () ->
···
722
708
Lwt.return_unit
723
709
| Message (payload, _) ->
724
710
send
725
725
-
(Frame.encode_message ~seq:ev.seq ~time:ev.time payload) )
711
711
+
(Frame.encode_message ~seq:ev.seq ~time:ev.time
712
712
+
payload ) )
726
713
events )
727
714
>>= fun () ->
728
715
(* bail if consumer too slow *)