Live video on the AT Protocol

use xrpc for user follow

+127 -265
+19 -22
js/app/components/follow-button.tsx
··· 26 26 }) => { 27 27 const [isFollowing, setIsFollowing] = useState<boolean | null>(null); 28 28 const [error, setError] = useState<string | null>(null); 29 - const [followRKey, setFollowRKey] = useState<string | null>(null); 29 + const [followUri, setFollowUri] = useState<string | null>(null); 30 30 const { url: streamplaceUrl } = useAppSelector(selectStreamplace); 31 31 const dispatch = useAppDispatch(); 32 32 33 33 // Hide button if not logged in or viewing own stream 34 34 if (!currentUserDID || currentUserDID === streamerDID) return null; 35 35 36 - // Fetch initial follow state 36 + // Fetch initial follow state using xrpc 37 37 useEffect(() => { 38 38 let cancelled = false; 39 39 ··· 43 43 setError(null); 44 44 try { 45 45 const res = await fetch( 46 - `${streamplaceUrl}/api/following/${currentUserDID}`, 46 + `${streamplaceUrl}/xrpc/place.stream.graph.getFollowingUser?subjectDID=${encodeURIComponent(streamerDID)}`, 47 47 { 48 48 credentials: "include", 49 49 headers: { ··· 54 54 55 55 if (!res.ok) { 56 56 const errorText = await res.text(); 57 - throw new Error(`Failed to fetch following list: ${errorText}`); 57 + throw new Error(`Failed to fetch follow status: ${errorText}`); 58 58 } 59 59 60 60 const data = await res.json(); 61 61 if (cancelled) return; 62 62 63 - const following = Array.isArray(data) ? data : []; 64 - const followRecord = following.find( 65 - (f: any) => f.SubjectDID === streamerDID, 66 - ); 67 - 68 - if (followRecord) { 63 + if (data.follow) { 69 64 setIsFollowing(true); 70 - setFollowRKey(followRecord.RKey); 65 + setFollowUri(data.follow.uri); 71 66 } else { 72 67 setIsFollowing(false); 73 - setFollowRKey(null); 68 + setFollowUri(null); 74 69 } 75 70 } catch (err) { 76 71 if (!cancelled) setError("Could not determine follow state"); ··· 87 82 setError(null); 88 83 setIsFollowing(true); // Optimistic 89 84 try { 90 - const result = await dispatch(followUser(streamerDID)).unwrap(); 85 + await dispatch(followUser(streamerDID)).unwrap(); 91 86 setIsFollowing(true); 92 - setFollowRKey(result.rkey); 93 87 onFollowChange?.(true); 94 88 } catch (err) { 95 89 setIsFollowing(false); ··· 100 94 }; 101 95 102 96 const handleUnfollow = async () => { 103 - if (!followRKey) { 104 - setError("Cannot unfollow: missing record key"); 105 - return; 106 - } 107 - 108 97 setError(null); 109 98 setIsFollowing(false); // Optimistic 110 99 try { 111 100 await dispatch( 112 - unfollowUser({ subjectDID: streamerDID, rkey: followRKey }), 101 + unfollowUser({ 102 + subjectDID: streamerDID, 103 + ...(followUri ? { followUri } : {}), 104 + }), 113 105 ).unwrap(); 114 106 setIsFollowing(false); 115 - setFollowRKey(null); 107 + setFollowUri(null); 116 108 onFollowChange?.(false); 117 109 } catch (err) { 118 110 setIsFollowing(true); ··· 124 116 125 117 return ( 126 118 <View flexDirection="row" alignItems="center" gap={8}> 127 - {isFollowing ? ( 119 + {isFollowing === null ? ( 120 + // Skeleton loader to prevent layout shift 121 + <Button backgroundColor="transparent" disabled> 122 + &nbsp; 123 + </Button> 124 + ) : isFollowing ? ( 128 125 <Button 129 126 backgroundColor="transparent" 130 127 onPress={handleUnfollow}
+41 -31
js/app/components/livestream/livestream.tsx
··· 56 56 const video = player.segment?.video?.[0]; 57 57 const [videoWidth, setVideoWidth] = useState(0); 58 58 const [videoHeight, setVideoHeight] = useState(0); 59 - const { isKeyboardVisible, keyboardHeight } = useKeyboard(); 60 - const { isIOS, isWeb } = usePlatform(); 59 + const { keyboardHeight } = useKeyboard(); 60 + const { isIOS } = usePlatform(); 61 61 62 62 const [outerHeight, setOuterHeight] = useState(0); 63 63 const [innerHeight, setInnerHeight] = useState(0); ··· 66 66 67 67 const streamerDID = player.livestream?.author?.did; 68 68 const streamerProfile = streamerDID ? profiles[streamerDID] : undefined; 69 - const streamerHandle = streamerProfile?.handle || streamerDID; 69 + const streamerHandle = streamerProfile?.handle; 70 70 71 71 // this would all be really easy if i had library that would give me the 72 72 // safe area view height and width but i don't. so let's measure ··· 217 217 gap="$2" 218 218 minWidth={0} 219 219 > 220 - {streamerHandle && ( 221 - <Text 222 - onPress={() => 223 - Linking.openURL( 224 - `https://bsky.app/profile/${streamerHandle}`, 225 - ) 226 - } 227 - aria-label={`View @${streamerHandle} on Bluesky`} 228 - style={{ cursor: "pointer" }} 229 - > 230 - {`@${streamerHandle}`} 231 - </Text> 220 + {streamerDID && !streamerHandle ? ( 221 + // Skeleton loader for handle 222 + <Text>&nbsp;</Text> 223 + ) : ( 224 + streamerHandle && ( 225 + <Text 226 + onPress={() => 227 + Linking.openURL( 228 + `https://bsky.app/profile/${streamerHandle}`, 229 + ) 230 + } 231 + aria-label={`View @${streamerHandle} on Bluesky`} 232 + style={{ cursor: "pointer" }} 233 + > 234 + {`@${streamerHandle}`} 235 + </Text> 236 + ) 232 237 )} 233 - {streamerDID && currentUserDID && ( 238 + {streamerDID && streamerHandle && currentUserDID && ( 234 239 <FollowButton 235 240 streamerDID={streamerDID} 236 241 currentUserDID={currentUserDID} ··· 322 327 minWidth: 0, 323 328 }} 324 329 > 325 - {streamerHandle && ( 326 - <Text 327 - onPress={() => 328 - Linking.openURL( 329 - `https://bsky.app/profile/${streamerHandle}`, 330 - ) 331 - } 332 - aria-label={`View @${streamerHandle} on Bluesky`} 333 - style={{ cursor: "pointer" }} 334 - numberOfLines={1} 335 - ellipsizeMode="tail" 336 - > 337 - {`@${streamerHandle}`} 338 - </Text> 330 + {streamerDID && !streamerHandle ? ( 331 + // Skeleton loader for handle 332 + <Text>&nbsp;</Text> 333 + ) : ( 334 + streamerHandle && ( 335 + <Text 336 + onPress={() => 337 + Linking.openURL( 338 + `https://bsky.app/profile/${streamerHandle}`, 339 + ) 340 + } 341 + aria-label={`View @${streamerHandle} on Bluesky`} 342 + style={{ cursor: "pointer" }} 343 + numberOfLines={1} 344 + ellipsizeMode="tail" 345 + > 346 + {`@${streamerHandle}`} 347 + </Text> 348 + ) 339 349 )} 340 - {streamerDID && currentUserDID && ( 350 + {streamerDID && streamerHandle && currentUserDID && ( 341 351 <FollowButton 342 352 streamerDID={streamerDID} 343 353 currentUserDID={currentUserDID}
+26 -42
js/app/features/bluesky/blueskySlice.tsx
··· 2 2 Agent, 3 3 AppBskyFeedPost, 4 4 AppBskyGraphBlock, 5 - AppBskyGraphFollow, 6 5 BlobRef, 7 6 RichText, 8 7 } from "@atproto/api"; ··· 90 89 params.delete("code"); 91 90 u.search = params.toString(); 92 91 window.history.replaceState(null, "", u.toString()); 93 - }; 94 - 95 - // Deterministic rkey for follow 96 - const createDeterministicRKey = ( 97 - userDID: string, 98 - subjectDID: string, 99 - ): string => { 100 - const combinedStr = userDID + ":" + subjectDID; 101 - let hash = 0; 102 - for (let i = 0; i < combinedStr.length; i++) { 103 - const char = combinedStr.charCodeAt(i); 104 - hash = (hash << 5) - hash + char; 105 - hash = hash & hash; // Convert to 32bit integer 106 - } 107 - // Convert to hex string and take first 16 chars 108 - const hexHash = Math.abs(hash).toString(16).padStart(8, "0"); 109 - return hexHash.substring(0, 16); 110 92 }; 111 93 112 94 export const blueskySlice = createAppSlice({ ··· 271 253 }, 272 254 rejected: (state, action) => { 273 255 clearQueryParams(); 274 - console.error("getProfile rejected", action.error); 275 256 // state.status = "failed"; 276 257 }, 277 258 }, ··· 869 850 { red, green, blue }: { red: number; green: number; blue: number }, 870 851 thunkAPI, 871 852 ) => { 872 - const now = new Date(); 873 - const { bluesky, streamplace } = thunkAPI.getState() as { 853 + const { bluesky } = thunkAPI.getState() as { 874 854 bluesky: BlueskyState; 875 - streamplace: StreamplaceState; 876 855 }; 877 856 if (!bluesky.pdsAgent) { 878 857 throw new Error("No agent"); ··· 958 937 if (!did) { 959 938 throw new Error("No DID"); 960 939 } 940 + await bluesky.pdsAgent.follow(subjectDID); 961 941 962 - const rkey = createDeterministicRKey(did, subjectDID); 963 - const record: AppBskyGraphFollow.Record = { 964 - subject: subjectDID, 965 - createdAt: new Date().toISOString(), 966 - }; 967 - await bluesky.pdsAgent.com.atproto.repo.createRecord({ 968 - repo: did, 969 - collection: "app.bsky.graph.follow", 970 - rkey: rkey, 971 - record, 972 - }); 973 - 974 - return { subjectDID, rkey }; 942 + return { subjectDID }; 975 943 }, 976 944 { 977 945 pending: (state) => { ··· 988 956 989 957 unfollowUser: create.asyncThunk( 990 958 async ( 991 - { subjectDID, rkey }: { subjectDID: string; rkey: string }, 959 + { subjectDID, followUri }: { subjectDID: string; followUri?: string }, 992 960 thunkAPI, 993 961 ) => { 994 - const { bluesky } = thunkAPI.getState() as { 962 + const { bluesky, streamplace } = thunkAPI.getState() as { 995 963 bluesky: BlueskyState; 964 + streamplace: StreamplaceState; 996 965 }; 997 966 if (!bluesky.pdsAgent) { 998 967 throw new Error("No agent"); ··· 1002 971 throw new Error("No DID"); 1003 972 } 1004 973 1005 - await bluesky.pdsAgent.com.atproto.repo.deleteRecord({ 1006 - repo: did, 1007 - collection: "app.bsky.graph.follow", 1008 - rkey: rkey, 1009 - }); 974 + if (followUri) { 975 + await bluesky.pdsAgent.deleteFollow(followUri); 976 + } else { 977 + const res = await fetch( 978 + `${streamplace.url}/xrpc/place.stream.graph.getFollowingUser?subjectDID=${encodeURIComponent(subjectDID)}`, 979 + { 980 + credentials: "include", 981 + headers: { 982 + "X-User-DID": did, 983 + }, 984 + }, 985 + ); 986 + const data = await res.json(); 987 + 988 + if (!data.follow || !data.follow.uri) { 989 + throw new Error("Follow record not found"); 990 + } 991 + 992 + await bluesky.pdsAgent.deleteFollow(data.follow.uri); 993 + } 1010 994 1011 995 return { subjectDID }; 1012 996 },
-3
pkg/api/api.go
··· 146 146 } 147 147 apiRouter.GET("/api/live-users", a.HandleLiveUsers(ctx)) 148 148 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx)) 149 - apiRouter.GET("/api/following/:user", a.HandleFollowing(ctx)) 150 - apiRouter.POST("/api/follow/:user", a.HandleFollow(ctx)) 151 - apiRouter.DELETE("/api/follow/:user", a.HandleUnfollow(ctx)) 152 149 apiRouter.NotFound = a.HandleAPI404(ctx) 153 150 router.Handler("GET", "/api/*resource", apiRouter) 154 151 router.Handler("POST", "/api/*resource", apiRouter)
-163
pkg/api/api_internal.go
··· 3 3 import ( 4 4 "bufio" 5 5 "context" 6 - "crypto/sha256" 7 6 "encoding/base64" 8 - "encoding/hex" 9 7 "encoding/json" 10 8 "fmt" 11 9 "io" ··· 21 19 "strings" 22 20 "time" 23 21 24 - "github.com/bluesky-social/indigo/api/bsky" 25 22 "github.com/julienschmidt/httprouter" 26 23 "github.com/prometheus/client_golang/prometheus/promhttp" 27 24 sloghttp "github.com/samber/slog-http" ··· 35 32 "stream.place/streamplace/pkg/renditions" 36 33 v0 "stream.place/streamplace/pkg/schema/v0" 37 34 ) 38 - 39 - // Get current user DID from custom header 40 - func getUserDIDFromHeader(r *http.Request) (string, error) { 41 - didHeader := r.Header.Get("X-User-DID") 42 - if didHeader != "" && strings.HasPrefix(didHeader, "did:") { 43 - return didHeader, nil 44 - } 45 - 46 - return "", fmt.Errorf("could not find user DID in headers") 47 - } 48 - 49 - // Deterministic rkey for a follow relationship 50 - func deterministicRKey(userDID, subjectDID string) string { 51 - h := sha256.New() 52 - h.Write([]byte(userDID + ":" + subjectDID)) 53 - return hex.EncodeToString(h.Sum(nil))[:16] // 16 chars for brevity 54 - } 55 - 56 - // GET /api/following/:user 57 - func (a *StreamplaceAPI) HandleFollowing(ctx context.Context) httprouter.Handle { 58 - return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 59 - user := p.ByName("user") 60 - if user == "" { 61 - log.Error(ctx, "Missing user parameter") 62 - errors.WriteHTTPBadRequest(w, "user required", nil) 63 - return 64 - } 65 - user, err := a.NormalizeUser(ctx, user) 66 - if err != nil { 67 - log.Error(ctx, "Failed to normalize user", "user", user, "error", err) 68 - errors.WriteHTTPNotFound(w, "user not found", err) 69 - return 70 - } 71 - 72 - following, err := a.Model.GetUserFollowing(ctx, user) 73 - if err != nil { 74 - log.Error(ctx, "Failed to get user following", "user", user, "error", err) 75 - errors.WriteHTTPInternalServerError(w, "unable to get following", err) 76 - return 77 - } 78 - 79 - bs, err := json.Marshal(following) 80 - if err != nil { 81 - log.Error(ctx, "Failed to marshal following list", "error", err) 82 - errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 83 - return 84 - } 85 - w.Write(bs) 86 - } 87 - } 88 - 89 - // POST /api/follow/:user 90 - func (a *StreamplaceAPI) HandleFollow(ctx context.Context) httprouter.Handle { 91 - return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 92 - userDID, err := getUserDIDFromHeader(r) 93 - if err != nil { 94 - log.Error(ctx, "Failed to get user DID from session", "error", err) 95 - http.Error(w, fmt.Sprintf("Unauthorized: %v", err), http.StatusUnauthorized) 96 - return 97 - } 98 - 99 - subject := p.ByName("user") 100 - if subject == "" { 101 - http.Error(w, "user required", http.StatusBadRequest) 102 - return 103 - } 104 - subject, err = a.NormalizeUser(ctx, subject) 105 - if err != nil { 106 - log.Error(ctx, "Invalid user", "subject", subject, "error", err) 107 - http.Error(w, "invalid user", http.StatusBadRequest) 108 - return 109 - } 110 - 111 - if userDID == subject { 112 - http.Error(w, "cannot follow yourself", http.StatusBadRequest) 113 - return 114 - } 115 - 116 - // Check if already following 117 - following, err := a.Model.GetUserFollowing(ctx, userDID) 118 - if err != nil { 119 - log.Error(ctx, "Error getting user following", "userDID", userDID, "error", err) 120 - } else { 121 - for _, f := range following { 122 - if f.SubjectDID == subject { 123 - log.Debug(ctx, "User already following subject", "userDID", userDID, "subject", subject) 124 - w.WriteHeader(http.StatusNoContent) 125 - return 126 - } 127 - } 128 - } 129 - 130 - rkey := deterministicRKey(userDID, subject) 131 - createdAt := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") 132 - follow := &bsky.GraphFollow{ 133 - Subject: subject, 134 - CreatedAt: createdAt, 135 - } 136 - 137 - err = a.Model.CreateFollow(ctx, userDID, rkey, follow) 138 - if err != nil { 139 - log.Error(ctx, "Failed to create follow in local database", "userDID", userDID, "subject", subject, "error", err) 140 - http.Error(w, fmt.Sprintf("failed to follow: %v", err), http.StatusInternalServerError) 141 - return 142 - } 143 - } 144 - } 145 - 146 - // DELETE /api/follow/:user 147 - func (a *StreamplaceAPI) HandleUnfollow(ctx context.Context) httprouter.Handle { 148 - return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 149 - userDID, err := getUserDIDFromHeader(r) 150 - if err != nil { 151 - log.Error(ctx, "Failed to get user DID from session", "error", err) 152 - http.Error(w, fmt.Sprintf("Unauthorized: %v", err), http.StatusUnauthorized) 153 - return 154 - } 155 - 156 - subject := p.ByName("user") 157 - if subject == "" { 158 - http.Error(w, "user required", http.StatusBadRequest) 159 - return 160 - } 161 - subject, err = a.NormalizeUser(ctx, subject) 162 - if err != nil { 163 - log.Error(ctx, "Invalid user", "subject", subject, "error", err) 164 - http.Error(w, "invalid user", http.StatusBadRequest) 165 - return 166 - } 167 - 168 - // Find the follow record to get the rkey 169 - following, err := a.Model.GetUserFollowing(ctx, userDID) 170 - if err != nil { 171 - log.Error(ctx, "Error getting user following", "userDID", userDID, "error", err) 172 - http.Error(w, fmt.Sprintf("failed to query follows: %v", err), http.StatusInternalServerError) 173 - return 174 - } 175 - 176 - var rkey string 177 - for _, f := range following { 178 - if f.SubjectDID == subject { 179 - rkey = f.RKey 180 - break 181 - } 182 - } 183 - 184 - if rkey == "" { 185 - log.Debug(ctx, "No follow relationship found to delete", "userDID", userDID, "subject", subject) 186 - w.WriteHeader(http.StatusNoContent) 187 - return 188 - } 189 - 190 - err = a.Model.DeleteFollow(ctx, userDID, rkey) 191 - if err != nil { 192 - log.Error(ctx, "Failed to delete follow from local database", "userDID", userDID, "subject", subject, "rkey", rkey, "error", err) 193 - http.Error(w, fmt.Sprintf("failed to unfollow: %v", err), http.StatusInternalServerError) 194 - return 195 - } 196 - } 197 - } 198 35 199 36 func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error { 200 37 handler, err := a.InternalHandler(ctx)
+37 -3
pkg/spxrpc/graph.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "fmt" 5 6 7 + "github.com/bluesky-social/indigo/api/atproto" 8 + "go.opentelemetry.io/otel" 9 + "stream.place/streamplace/pkg/log" 6 10 placestreamtypes "stream.place/streamplace/pkg/streamplace" 7 11 ) 8 12 9 - func (s *Server) handlePlaceStreamGraphGetFollowingUser(ctx context.Context, subjectDID string) (*placestreamtypes.GraphGetFollowingUser_Output, error) { 10 - // this is where following check needs to be implemented 11 - return nil, nil 13 + func (s *Server) handlePlaceStreamGraphGetFollowingUser(ctx context.Context, userDID string, subjectDID string) (*placestreamtypes.GraphGetFollowingUser_Output, error) { 14 + ctx, span := otel.Tracer("server").Start(ctx, "handlePlaceStreamGraphGetFollowingUser") 15 + defer span.End() 16 + 17 + if userDID == "" || !isValidDID(userDID) { 18 + log.Error(ctx, "Missing or invalid user DID") 19 + return &placestreamtypes.GraphGetFollowingUser_Output{}, nil 20 + } 21 + 22 + follows, err := s.model.GetUserFollowing(ctx, userDID) 23 + if err != nil { 24 + log.Error(ctx, "Failed to get user following", "error", err) 25 + return &placestreamtypes.GraphGetFollowingUser_Output{}, nil 26 + } 27 + 28 + for _, follow := range follows { 29 + if follow.SubjectDID == subjectDID { 30 + // User is following the subject, return the follow reference 31 + return &placestreamtypes.GraphGetFollowingUser_Output{ 32 + Follow: &atproto.RepoStrongRef{ 33 + Cid: "", // We don't store CID in our model 34 + Uri: fmt.Sprintf("at://%s/app.bsky.graph.follow/%s", userDID, follow.RKey), 35 + }, 36 + }, nil 37 + } 38 + } 39 + 40 + // User is not following the subject 41 + return &placestreamtypes.GraphGetFollowingUser_Output{}, nil 42 + } 43 + 44 + func isValidDID(did string) bool { 45 + return len(did) > 0 && (did[:7] == "did:plc" || did[:7] == "did:web") 12 46 }
+4 -1
pkg/spxrpc/stubs.go
··· 56 56 func (s *Server) HandlePlaceStreamGraphGetFollowingUser(c echo.Context) error { 57 57 ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandlePlaceStreamGraphGetFollowingUser") 58 58 defer span.End() 59 + 60 + userDID := c.Request().Header.Get("X-User-DID") 59 61 subjectDID := c.QueryParam("subjectDID") 62 + 60 63 var out *placestreamtypes.GraphGetFollowingUser_Output 61 64 var handleErr error 62 65 // func (s *Server) handlePlaceStreamGraphGetFollowingUser(ctx context.Context,subjectDID string) (*placestreamtypes.GraphGetFollowingUser_Output, error) 63 - out, handleErr = s.handlePlaceStreamGraphGetFollowingUser(ctx, subjectDID) 66 + out, handleErr = s.handlePlaceStreamGraphGetFollowingUser(ctx, userDID, subjectDID) 64 67 if handleErr != nil { 65 68 return handleErr 66 69 }