Live video on the AT Protocol

fix teleport navigation: pass navigate to all LivestreamProviders and other misc fixes

authored by

Natalie B. and committed by
Eli Mallon
64b9e908 241494d2

+301 -46
+5 -2
js/app/components/mobile/player.tsx
··· 43 43 export function Player( 44 44 props: Partial<PlayerProps> & { 45 45 setFullscreen?: (fullscreen: boolean) => void; 46 + onTeleport?: (targetHandle: string, targetDID: string) => void; 46 47 }, 47 48 ) { 48 49 return ( ··· 188 189 ); 189 190 } 190 191 191 - const handleTeleport = (targetHandle: string, targetDID: string) => { 192 + const defaultHandleTeleport = (targetHandle: string, targetDID: string) => { 192 193 navigation.navigate("Home", { 193 194 screen: "Stream", 194 - params: { user: targetDID }, 195 + params: { user: targetHandle }, 195 196 }); 196 197 }; 198 + 199 + const handleTeleport = props.onTeleport || defaultHandleTeleport; 197 200 198 201 return ( 199 202 <RotationProvider enabled={Platform.OS !== "web"}>
+23 -3
js/app/src/screens/mobile-stream.tsx
··· 1 + import { useNavigation } from "@react-navigation/native"; 1 2 import { 2 3 KeepAwake, 3 4 LivestreamProvider, ··· 33 34 user, 34 35 src, 35 36 extraProps, 37 + onTeleport, 36 38 }: { 37 39 user: string; 38 40 src: string; 39 41 extraProps: Partial<PlayerProps>; 42 + onTeleport?: (targetHandle: string, targetDID: string) => void; 40 43 }) { 41 44 const problems = useLivestreamStore((x) => x.problems); 42 45 ··· 52 55 <> 53 56 <KeepAwake /> 54 57 <FullscreenProvider> 55 - <Player key={src} src={src} {...extraProps} /> 58 + <Player key={src} src={src} {...extraProps} onTeleport={onTeleport} /> 56 59 </FullscreenProvider> 57 60 </> 58 61 ); ··· 60 63 61 64 export default function MobileStream({ route }) { 62 65 const { user, protocol, url } = route?.params ?? {}; 66 + let navi = useNavigation(); 63 67 let extraProps: Partial<PlayerProps> = {}; 64 68 if (isWeb) { 65 69 extraProps = queryToProps(new URLSearchParams(window.location.search)); ··· 69 73 src = url; 70 74 } 71 75 76 + const handleTeleport = (targetHandle: string, targetDID?: string) => { 77 + if (!navi || (!targetHandle && !targetDID)) { 78 + console.error("Navigation or target info missing for teleport"); 79 + return; 80 + } 81 + navi.navigate("Home", { 82 + screen: "Stream", 83 + params: { user: targetHandle }, 84 + }); 85 + }; 86 + 72 87 return ( 73 - <LivestreamProvider src={src}> 88 + <LivestreamProvider key={src} src={src} onTeleport={handleTeleport}> 74 89 <PlayerProvider> 75 - <MobileStreamInner user={user} src={src} extraProps={extraProps} /> 90 + <MobileStreamInner 91 + user={user} 92 + src={src} 93 + extraProps={extraProps} 94 + onTeleport={handleTeleport} 95 + /> 76 96 </PlayerProvider> 77 97 </LivestreamProvider> 78 98 );
+3 -1
js/components/src/components/chat/chat-box.tsx
··· 505 505 targetHandle: "test.bsky.social", 506 506 targetDID: "did:plc:test", 507 507 countdown: 30, 508 - onCancel: () => console.log("teleport cancelled"), 508 + canCancel: true, 509 + onDismiss: (reason) => 510 + console.log("teleport dismissed:", reason), 509 511 }); 510 512 }} 511 513 >
+3 -12
js/components/src/components/stream-notification/stream-notification-manager.ts
··· 3 3 message?: string; 4 4 render?: ( 5 5 isExiting: boolean, 6 - onDismiss: () => void, 6 + onDismiss: (reason?: "user" | "auto") => void, 7 7 startTime?: number, 8 8 ) => React.ReactNode; 9 9 duration?: number; // seconds, 0 = manual dismiss only 10 10 actionLabel?: string; 11 11 onAction?: () => void; 12 - onDismiss?: () => void; 13 - onUserDismiss?: () => void; 14 - onAutoDismiss?: () => void; 12 + onDismiss?: (reason?: "user" | "auto") => void; 15 13 variant?: "default" | "info" | "warning"; 16 14 }; 17 15 ··· 39 37 actionLabel: config.actionLabel, 40 38 onAction: config.onAction, 41 39 onDismiss: config.onDismiss, 42 - onUserDismiss: config.onUserDismiss, 43 - onAutoDismiss: config.onAutoDismiss, 44 40 variant: config.variant ?? "default", 45 41 visible: true, 46 42 startTime: Date.now(), ··· 115 111 ); 116 112 this.notifyListeners(); 117 113 118 - notification.onDismiss?.(); 119 - if (reason === "user") { 120 - notification.onUserDismiss?.(); 121 - } else { 122 - notification.onAutoDismiss?.(); 123 - } 114 + notification.onDismiss?.(reason); 124 115 } 125 116 126 117 getAll(): StreamNotification[] {
+11 -7
js/components/src/components/stream-notification/teleport-notification.tsx
··· 12 12 export function TeleportNotification({ 13 13 targetHandle, 14 14 countdown, 15 + canCancel, 15 16 startTime, 16 17 onDismiss, 17 18 }: { 18 19 targetHandle: string; 19 20 countdown: number; 21 + canCancel: boolean; 20 22 startTime?: number; 21 23 onDismiss: (reason?: "user" | "auto") => void; 22 24 }) { ··· 129 131 ]} 130 132 > 131 133 <Text color="muted">{timeLeft}s</Text> 132 - <Button 133 - onPress={() => onDismiss("user")} 134 - width="min" 135 - variant="destructive" 136 - > 137 - Cancel 138 - </Button> 134 + {canCancel && ( 135 + <Button 136 + onPress={() => onDismiss("user")} 137 + width="min" 138 + variant="destructive" 139 + > 140 + Cancel 141 + </Button> 142 + )} 139 143 </View> 140 144 </View> 141 145 <View
+16 -8
js/components/src/lib/slash-commands/teleport.ts
··· 47 47 }; 48 48 } 49 49 50 - let durationSeconds: number | undefined; 50 + let countdownSeconds = 10; 51 51 if (args.length > 1) { 52 52 const parsedDuration = parseInt(args[1], 10); 53 53 if (isNaN(parsedDuration)) { 54 54 return { 55 55 handled: true, 56 - error: "Duration must be a number (seconds)", 56 + error: "Countdown must be a number (seconds)", 57 57 }; 58 58 } 59 - if (parsedDuration < 60 || parsedDuration > 32400) { 59 + if (parsedDuration < 5 || parsedDuration > 300) { 60 60 return { 61 61 handled: true, 62 - error: 63 - "Duration must be between 60 seconds and 32400 seconds (9 hours)", 62 + error: "Countdown must be between 5 seconds and 5 minutes", 64 63 }; 65 64 } 66 - durationSeconds = parsedDuration; 65 + countdownSeconds = parsedDuration; 67 66 } 68 67 69 68 let targetDID: string; ··· 79 78 }; 80 79 } 81 80 82 - const startsAt = new Date(Date.now() + 30000).toISOString(); 81 + if (targetDID === userDID) { 82 + return { 83 + handled: true, 84 + error: "You cannot teleport to yourself", 85 + }; 86 + } 87 + 88 + const startsAt = new Date( 89 + Date.now() + countdownSeconds * 1000, 90 + ).toISOString(); 83 91 84 92 const record: PlaceStreamLiveTeleport.Record = { 85 93 $type: "place.stream.live.teleport", 86 94 streamer: targetDID, 87 95 startsAt, 88 - ...(durationSeconds ? { durationSeconds } : {}), 96 + countdownSeconds, 89 97 }; 90 98 91 99 try {
+4 -4
js/components/src/lib/stream-notifications.ts
··· 7 7 targetHandle: string; 8 8 targetDID: string; 9 9 countdown: number; 10 - onCancel?: () => void; 11 - onAutoDismiss?: () => void; 10 + canCancel: boolean; 11 + onDismiss?: (reason?: "user" | "auto") => void; 12 12 }) => { 13 13 streamNotification.show({ 14 14 id: "teleport", ··· 16 16 return React.createElement(TeleportNotification, { 17 17 targetHandle: params.targetHandle, 18 18 countdown: params.countdown, 19 + canCancel: params.canCancel, 19 20 startTime: startTime, 20 21 onDismiss: onDismiss, 21 22 }); 22 23 }, 23 24 duration: 0, // manually dismissed by countdown or user cancel 24 25 variant: "warning", 25 - onUserDismiss: params.onCancel, 26 - onAutoDismiss: params.onAutoDismiss, 26 + onDismiss: params.onDismiss, 27 27 }); 28 28 }, 29 29
+4 -1
js/components/src/lib/system-messages.ts
··· 82 82 count: number, 83 83 chatProfile?: any, 84 84 ): ChatMessageViewHydrated => { 85 - const text = `${count} viewer${count !== 1 ? "s" : ""} teleported from ${streamerName}'s stream! Say hi!`; 85 + const text = 86 + count > 0 87 + ? `${count} viewer${count !== 1 ? "s" : ""} teleported from ${streamerName}'s stream! Say hi!` 88 + : `Someone teleported from ${streamerName}'s stream! Say hi!`; 86 89 const message = createSystemMessage(SystemMessageType.notification, text, { 87 90 streamerName, 88 91 count,
+23 -5
js/components/src/livestream-provider/index.tsx
··· 56 56 (state) => state.activeTeleportUri, 57 57 ); 58 58 const profile = useAvatars(activeTeleport ? [activeTeleport.streamer] : []); 59 + const livestreamProfile = useLivestreamStore((state) => state.profile); 59 60 const pdsAgent = usePDSAgent(); 60 61 const userDID = useDID(); 61 62 const prevActiveTeleportRef = useRef(activeTeleport); ··· 74 75 const targetHandle = 75 76 profile[activeTeleport.streamer]?.handle || activeTeleport.streamer; 76 77 78 + // check if the current user is the streamer of the current livestream 79 + const canCancel = livestreamProfile?.did === userDID; 80 + 77 81 StreamNotifications.teleport({ 78 82 targetHandle: targetHandle, 79 83 targetDID: activeTeleport.streamer, 80 84 countdown: countdown, 81 - onCancel: async () => { 82 - if (activeTeleportUri && pdsAgent && userDID) { 85 + canCancel: canCancel, 86 + onDismiss: async (reason) => { 87 + console.log( 88 + "🔍 StreamNotifications.onDismiss called with reason:", 89 + reason, 90 + ); 91 + if (reason === "user" && activeTeleportUri && pdsAgent && userDID) { 83 92 try { 84 93 await deleteTeleport(pdsAgent, userDID, activeTeleportUri); 85 94 } catch (err) { 86 95 console.error("Failed to delete teleport:", err); 87 96 } 88 97 } 89 - }, 90 - onAutoDismiss: () => { 91 - if (onTeleport) { 98 + if (reason === "auto" && onTeleport) { 99 + console.log( 100 + "🔍 Calling onTeleport with:", 101 + targetHandle, 102 + activeTeleport.streamer, 103 + ); 92 104 onTeleport(targetHandle, activeTeleport.streamer); 105 + } else if (reason === "auto" && !onTeleport) { 106 + console.log("🔍 onTeleport is not defined!"); 107 + } else if (reason === "auto") { 108 + console.log( 109 + "🔍 Reason is auto but teleport function not called for unknown reason", 110 + ); 93 111 } 94 112 }, 95 113 });
+4 -2
pkg/atproto/sync.go
··· 389 389 log.Error(ctx, "failed to parse startsAt", "err", err) 390 390 return nil 391 391 } 392 + viewerCount := atsync.Bus.GetViewerCount(userDID) 392 393 tp := &model.Teleport{ 393 394 CID: cid, 394 395 URI: aturi.String(), 395 396 StartsAt: startsAt, 396 397 DurationSeconds: rec.DurationSeconds, 398 + ViewerCount: int64(viewerCount), 397 399 Teleport: recCBOR, 398 400 RepoDID: userDID, 399 401 TargetDID: rec.Streamer, ··· 412 414 } 413 415 414 416 time.AfterFunc(waitDuration, func() { 415 - // verify the teleport still exists 417 + // verify teleport still exists 416 418 existingTp, err := atsync.Model.GetTeleportByURI(aturi.String()) 417 419 if err != nil { 418 420 log.Error(ctx, "failed to get teleport by uri", "err", err) ··· 430 432 return 431 433 } 432 434 433 - viewerCount := atsync.Bus.GetViewerCount(userDID) 435 + viewerCount := existingTp.ViewerCount 434 436 435 437 arrivalMsg := &streamplace.Livestream_TeleportArrival{ 436 438 LexiconTypeID: "place.stream.livestream#teleportArrival",
+2 -1
pkg/model/teleport.go
··· 15 15 CID string `json:"cid" gorm:"column:cid"` 16 16 StartsAt time.Time `json:"startsAt" gorm:"column:starts_at;index:idx_repo_starts,priority:2"` 17 17 DurationSeconds *int64 `json:"durationSeconds" gorm:"column:duration_seconds"` 18 + ViewerCount int64 `json:"viewerCount" gorm:"column:viewer_count;default:0"` 18 19 Teleport *[]byte `json:"teleport"` 19 20 RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_starts,priority:1"` 20 21 TargetDID string `json:"targetDID" gorm:"column:target_did;index:idx_target_did"` ··· 26 27 func (m *DBModel) CreateTeleport(ctx context.Context, tp *Teleport) error { 27 28 return m.DB.Clauses(clause.OnConflict{ 28 29 Columns: []clause.Column{{Name: "uri"}}, 29 - DoUpdates: clause.AssignmentColumns([]string{"cid", "starts_at", "duration_seconds", "teleport", "repo_did", "target_did"}), 30 + DoUpdates: clause.AssignmentColumns([]string{"cid", "starts_at", "duration_seconds", "viewer_count", "teleport", "repo_did", "target_did"}), 30 31 }).Create(tp).Error 31 32 } 32 33
+203
pkg/streamplace/cbor_gen.go
··· 5763 5763 5764 5764 return nil 5765 5765 } 5766 + func (t *LiveRecommendations) MarshalCBOR(w io.Writer) error { 5767 + if t == nil { 5768 + _, err := w.Write(cbg.CborNull) 5769 + return err 5770 + } 5771 + 5772 + cw := cbg.NewCborWriter(w) 5773 + 5774 + if _, err := cw.Write([]byte{163}); err != nil { 5775 + return err 5776 + } 5777 + 5778 + // t.LexiconTypeID (string) (string) 5779 + if len("$type") > 1000000 { 5780 + return xerrors.Errorf("Value in field \"$type\" was too long") 5781 + } 5782 + 5783 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("$type"))); err != nil { 5784 + return err 5785 + } 5786 + if _, err := cw.WriteString(string("$type")); err != nil { 5787 + return err 5788 + } 5789 + 5790 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("place.stream.live.recommendations"))); err != nil { 5791 + return err 5792 + } 5793 + if _, err := cw.WriteString(string("place.stream.live.recommendations")); err != nil { 5794 + return err 5795 + } 5796 + 5797 + // t.CreatedAt (string) (string) 5798 + if len("createdAt") > 1000000 { 5799 + return xerrors.Errorf("Value in field \"createdAt\" was too long") 5800 + } 5801 + 5802 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("createdAt"))); err != nil { 5803 + return err 5804 + } 5805 + if _, err := cw.WriteString(string("createdAt")); err != nil { 5806 + return err 5807 + } 5808 + 5809 + if len(t.CreatedAt) > 1000000 { 5810 + return xerrors.Errorf("Value in field t.CreatedAt was too long") 5811 + } 5812 + 5813 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.CreatedAt))); err != nil { 5814 + return err 5815 + } 5816 + if _, err := cw.WriteString(string(t.CreatedAt)); err != nil { 5817 + return err 5818 + } 5819 + 5820 + // t.Streamers ([]string) (slice) 5821 + if len("streamers") > 1000000 { 5822 + return xerrors.Errorf("Value in field \"streamers\" was too long") 5823 + } 5824 + 5825 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("streamers"))); err != nil { 5826 + return err 5827 + } 5828 + if _, err := cw.WriteString(string("streamers")); err != nil { 5829 + return err 5830 + } 5831 + 5832 + if len(t.Streamers) > 8192 { 5833 + return xerrors.Errorf("Slice value in field t.Streamers was too long") 5834 + } 5835 + 5836 + if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Streamers))); err != nil { 5837 + return err 5838 + } 5839 + for _, v := range t.Streamers { 5840 + if len(v) > 1000000 { 5841 + return xerrors.Errorf("Value in field v was too long") 5842 + } 5843 + 5844 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(v))); err != nil { 5845 + return err 5846 + } 5847 + if _, err := cw.WriteString(string(v)); err != nil { 5848 + return err 5849 + } 5850 + 5851 + } 5852 + return nil 5853 + } 5854 + 5855 + func (t *LiveRecommendations) UnmarshalCBOR(r io.Reader) (err error) { 5856 + *t = LiveRecommendations{} 5857 + 5858 + cr := cbg.NewCborReader(r) 5859 + 5860 + maj, extra, err := cr.ReadHeader() 5861 + if err != nil { 5862 + return err 5863 + } 5864 + defer func() { 5865 + if err == io.EOF { 5866 + err = io.ErrUnexpectedEOF 5867 + } 5868 + }() 5869 + 5870 + if maj != cbg.MajMap { 5871 + return fmt.Errorf("cbor input should be of type map") 5872 + } 5873 + 5874 + if extra > cbg.MaxLength { 5875 + return fmt.Errorf("LiveRecommendations: map struct too large (%d)", extra) 5876 + } 5877 + 5878 + n := extra 5879 + 5880 + nameBuf := make([]byte, 9) 5881 + for i := uint64(0); i < n; i++ { 5882 + nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 5883 + if err != nil { 5884 + return err 5885 + } 5886 + 5887 + if !ok { 5888 + // Field doesn't exist on this type, so ignore it 5889 + if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { 5890 + return err 5891 + } 5892 + continue 5893 + } 5894 + 5895 + switch string(nameBuf[:nameLen]) { 5896 + // t.LexiconTypeID (string) (string) 5897 + case "$type": 5898 + 5899 + { 5900 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 5901 + if err != nil { 5902 + return err 5903 + } 5904 + 5905 + t.LexiconTypeID = string(sval) 5906 + } 5907 + // t.CreatedAt (string) (string) 5908 + case "createdAt": 5909 + 5910 + { 5911 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 5912 + if err != nil { 5913 + return err 5914 + } 5915 + 5916 + t.CreatedAt = string(sval) 5917 + } 5918 + // t.Streamers ([]string) (slice) 5919 + case "streamers": 5920 + 5921 + maj, extra, err = cr.ReadHeader() 5922 + if err != nil { 5923 + return err 5924 + } 5925 + 5926 + if extra > 8192 { 5927 + return fmt.Errorf("t.Streamers: array too large (%d)", extra) 5928 + } 5929 + 5930 + if maj != cbg.MajArray { 5931 + return fmt.Errorf("expected cbor array") 5932 + } 5933 + 5934 + if extra > 0 { 5935 + t.Streamers = make([]string, extra) 5936 + } 5937 + 5938 + for i := 0; i < int(extra); i++ { 5939 + { 5940 + var maj byte 5941 + var extra uint64 5942 + var err error 5943 + _ = maj 5944 + _ = extra 5945 + _ = err 5946 + 5947 + { 5948 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 5949 + if err != nil { 5950 + return err 5951 + } 5952 + 5953 + t.Streamers[i] = string(sval) 5954 + } 5955 + 5956 + } 5957 + } 5958 + 5959 + default: 5960 + // Field doesn't exist on this type, so ignore it 5961 + if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { 5962 + return err 5963 + } 5964 + } 5965 + } 5966 + 5967 + return nil 5968 + }