Live video on the AT Protocol

prelive: checkbox to decide whether stream ends

+204 -61
+23 -33
js/app/components/live-dashboard/livestream-panel.tsx
··· 1 1 import { 2 - Admonition, 3 2 Button, 4 3 Checkbox, 5 4 ContentMetadataForm, 6 5 Dashboard, 7 6 formatHandle, 8 - formatHandleWithAt, 9 7 getBlob, 10 8 Input, 11 9 resolveDIDDocument, ··· 21 19 zero, 22 20 } from "@streamplace/components"; 23 21 import { error } from "console"; 24 - import { ArrowRight, ImagePlus, X } from "lucide-react-native"; 22 + import { ImagePlus, X } from "lucide-react-native"; 25 23 import { useCallback, useEffect, useMemo, useState } from "react"; 26 24 import { 27 25 Image, 28 26 Platform, 29 - Pressable, 30 27 ScrollView, 31 28 TouchableOpacity, 32 29 View, ··· 113 110 r.md, 114 111 layout.flex.center, 115 112 { 116 - height: 200, 113 + height: 100, 117 114 borderStyle: "dashed", 118 115 }, 119 116 ], ··· 186 183 )} 187 184 </> 188 185 )} 189 - <View style={{ marginTop: 8 }}> 186 + {/* <View style={{ marginTop: 8 }}> 190 187 <Admonition variant="info" size="sm"> 191 188 <Text size="sm"> 192 189 You are required to disclose if your content is not suitable for ··· 199 196 </Text> 200 197 </Pressable> 201 198 </Admonition> 202 - </View> 199 + </View> */} 203 200 </View> 204 201 ); 205 202 }; ··· 226 223 ); 227 224 228 225 const [createPost, setCreatePost] = useState(true); 226 + const [idleTimeout, setIdleTimeout] = useState(true); 229 227 const [sendPushNotification, setSendPushNotification] = useState(true); 230 228 const [canonicalUrl, setCanonicalUrl] = useState<string>(""); 231 229 const defaultCanonicalUrl = useMemo(() => { ··· 299 297 pushNotification: sendPushNotification, 300 298 }, 301 299 canonicalUrl: canonicalUrl || undefined, 300 + idleTimeoutSeconds: idleTimeout ? 300 : 0, 302 301 }); 303 302 } else { 304 303 await updateStreamRecord( ··· 421 420 ? "Waiting for stream to start..." 422 421 : "Waiting for stream to start..."; 423 422 } 424 - return mode === "create" ? "Announce Livestream!" : "Update Livestream!"; 425 - }, [loading, userIsLive, mode]); 423 + if (!livestream || livestream.record.endedAt !== undefined) { 424 + return "Start Livestream!"; 425 + } 426 + return "Update Livestream!"; 427 + }, [loading, userIsLive, mode, livestream]); 426 428 427 429 const Wrapper = scrollable ? ScrollView : View; 428 430 const wrapperProps = scrollable ··· 516 518 { minWidth: 100, textAlign: "left", paddingBottom: 8 }, 517 519 ]} 518 520 > 519 - Streamer 520 - </Text> 521 - <Text 522 - style={[ 523 - text.white, 524 - { fontWeight: "bold", paddingBottom: 8 }, 525 - ]} 526 - > 527 - {profile && formatHandleWithAt(profile)} 528 - </Text> 529 - </View> 530 - <View 531 - style={[ 532 - layout.flex.row, 533 - layout.flex.alignCenter, 534 - w.percent[100], 535 - ]} 536 - > 537 - <Text 538 - style={[ 539 - text.neutral[300], 540 - { minWidth: 100, textAlign: "left", paddingBottom: 8 }, 541 - ]} 542 - > 543 521 Title 544 522 </Text> 545 523 <View style={[flex.values[1]]}> ··· 650 628 setSendPushNotification(checked) 651 629 } 652 630 label={"Send push notification"} 631 + style={[{ fontSize: 12 }]} 632 + /> 633 + </Tooltip> 634 + 635 + <Tooltip 636 + content="Enabling this setting will turn your livestream off after 5 minutes of inactivity, and you'll need to press the 'Start Livestream' button again to start it again next time you stream." 637 + position="top" 638 + > 639 + <Checkbox 640 + checked={idleTimeout} 641 + onCheckedChange={(checked) => setIdleTimeout(checked)} 642 + label={"End livestream automatically"} 653 643 style={[{ fontSize: 12 }]} 654 644 /> 655 645 </Tooltip>
+60 -8
js/app/components/live-dashboard/stream-monitor.tsx
··· 12 12 import { Eye, EyeOff, Signal, Wifi, WifiOff } from "lucide-react-native"; 13 13 import { useEffect, useState } from "react"; 14 14 import { Image, TouchableOpacity, View } from "react-native"; 15 + import Animated from "react-native-reanimated"; 15 16 import { useLiveUser } from "../../hooks/useLiveUser"; 16 17 import { useSegmentTiming } from "../../hooks/useSegmentTiming"; 17 18 import StreamScreen from "./live-selector"; ··· 22 23 userProfile?: any; 23 24 isLive?: boolean; 24 25 videoRef?: any; 26 + } 27 + 28 + function PreviewOverlay() { 29 + // const opacity = useSharedValue(1); 30 + 31 + // useEffect(() => { 32 + // opacity.value = withRepeat(withTiming(0.8, { duration: 1500 }), -1, true); 33 + // }, [opacity]); 34 + 35 + // const animatedStyle = useAnimatedStyle(() => ({ 36 + // opacity: opacity.value, 37 + // })); 38 + 39 + return ( 40 + <View 41 + style={{ 42 + position: "absolute", 43 + top: 0, 44 + left: 0, 45 + right: 0, 46 + bottom: 0, 47 + justifyContent: "flex-start", 48 + alignItems: "flex-start", 49 + pointerEvents: "none", 50 + }} 51 + > 52 + <Animated.Text 53 + style={[ 54 + // animatedStyle, 55 + { 56 + paddingLeft: 16, 57 + paddingTop: 16, 58 + fontSize: 32, 59 + fontWeight: "800", 60 + color: "white", 61 + letterSpacing: 4, 62 + textShadowColor: "rgba(0, 0, 0, 0.9)", 63 + textShadowOffset: { width: 0, height: 2 }, 64 + textShadowRadius: 8, 65 + }, 66 + ]} 67 + > 68 + PREVIEW (NOT LIVE) 69 + </Animated.Text> 70 + </View> 71 + ); 25 72 } 26 73 27 74 export default function StreamMonitor({ ··· 139 186 <View style={[flex.values[1], layout.flex.center, bg.neutral[900]]}> 140 187 {isLive && userProfile ? ( 141 188 isStreamVisible ? ( 142 - <Player 143 - src={userProfile.did} 144 - name={userProfile.handle} 145 - muted={true} 189 + <View 190 + style={{ position: "relative", width: "100%", height: "100%" }} 146 191 > 147 - <DesktopUi /> 148 - <PlayerUI.ViewerLoadingOverlay /> 149 - <OfflineCounter isMobile={true} /> 150 - </Player> 192 + <Player 193 + src={userProfile.did} 194 + name={userProfile.handle} 195 + muted={true} 196 + > 197 + <DesktopUi /> 198 + <PlayerUI.ViewerLoadingOverlay /> 199 + <OfflineCounter isMobile={true} /> 200 + </Player> 201 + {!ls && <PreviewOverlay />} 202 + </View> 151 203 ) : ( 152 204 <View 153 205 style={[
+11 -3
js/app/components/mobile/desktop-ui/live-bubble.tsx
··· 1 - import { Code, useSegment, View, zero } from "@streamplace/components"; 1 + import { 2 + Code, 3 + useLivestream, 4 + useSegment, 5 + View, 6 + zero, 7 + } from "@streamplace/components"; 2 8 import { useMemo } from "react"; 3 9 4 10 const { borders, gap, h, w, px, bg, text } = zero; ··· 6 12 export function LiveBubble() { 7 13 // are we actually live? (is the most recent segment <= 10 seconds old?) 8 14 let seg = useSegment(); 15 + 16 + const livestream = useLivestream(); 9 17 10 18 let segDate = useMemo(() => { 11 19 return seg?.startTime ? new Date(seg.startTime) : undefined; ··· 51 59 { flexDirection: "row", alignItems: "center" }, 52 60 gap.all[1], 53 61 px[2], 54 - bg.destructive[500], 62 + livestream ? bg.destructive[500] : bg.gray[500], 55 63 borders.color.gray[800], 56 64 { paddingVertical: 3 }, 57 65 ]} ··· 67 75 }, 68 76 ]} 69 77 > 70 - LIVE 78 + {livestream ? "LIVE" : "NOT LIVE"} 71 79 </Code> 72 80 </View> 73 81 </View>
+3
js/components/src/streamplace-store/stream.tsx
··· 134 134 submitPost, 135 135 canonicalUrl, 136 136 notificationSettings, 137 + idleTimeoutSeconds, 137 138 }: { 138 139 title: string; 139 140 customThumbnail?: Blob; 140 141 submitPost?: boolean; 141 142 canonicalUrl?: string; 142 143 notificationSettings?: PlaceStreamLivestream.NotificationSettings; 144 + idleTimeoutSeconds?: number; 143 145 }) => { 144 146 if (typeof submitPost !== "boolean") { 145 147 submitPost = true; ··· 263 265 agent: `@streamplace/components/${PackageJson.version} (${platform}, ${platVersion})`, 264 266 post: newPost, 265 267 thumb: thumbnail, 268 + idleTimeoutSeconds: idleTimeoutSeconds, 266 269 }; 267 270 console.log("record", record); 268 271
+5
js/docs/src/content/docs/lex-reference/place-stream-livestream.md
··· 26 26 | `createdAt` | `string` | ✅ | Client-declared timestamp when this livestream started. | Format: `datetime` | 27 27 | `lastSeenAt` | `string` | ❌ | Client-declared timestamp when this livestream was last seen by the Streamplace station. | Format: `datetime` | 28 28 | `endedAt` | `string` | ❌ | Client-declared timestamp when this livestream ended. Ended livestreams are not supposed to start up again. | Format: `datetime` | 29 + | `idleTimeoutSeconds` | `integer` | ❌ | Time in seconds after which this livestream should be automatically ended if idle. Zero means no timeout. | | 29 30 | `post` | [`com.atproto.repo.strongRef`](https://github.com/bluesky-social/atproto/tree/main/lexicons/com/atproto/repo/strongref.json#undefined) | ❌ | The post that announced this livestream. | | 30 31 | `agent` | `string` | ❌ | The source of the livestream, if available, in a User Agent format: `<product> / <product-version> <comment>` e.g. Streamplace/0.7.5 iOS | | 31 32 | `canonicalUrl` | `string` | ❌ | The primary URL where this livestream can be viewed, if available. | Format: `uri` | ··· 168 169 "type": "string", 169 170 "format": "datetime", 170 171 "description": "Client-declared timestamp when this livestream ended. Ended livestreams are not supposed to start up again." 172 + }, 173 + "idleTimeoutSeconds": { 174 + "type": "integer", 175 + "description": "Time in seconds after which this livestream should be automatically ended if idle. Zero means no timeout." 171 176 }, 172 177 "post": { 173 178 "type": "ref",
+4
lexicons/place/stream/livestream.json
··· 36 36 "format": "datetime", 37 37 "description": "Client-declared timestamp when this livestream ended. Ended livestreams are not supposed to start up again." 38 38 }, 39 + "idleTimeoutSeconds": { 40 + "type": "integer", 41 + "description": "Time in seconds after which this livestream should be automatically ended if idle. Zero means no timeout." 42 + }, 39 43 "post": { 40 44 "type": "ref", 41 45 "ref": "com.atproto.repo.strongRef",
+17 -13
pkg/atproto/sync.go
··· 372 372 task := &statedb.FinalizeLivestreamTask{ 373 373 LivestreamURI: aturi.String(), 374 374 } 375 - if rec.LastSeenAt != nil { 376 - scheduledAt, err := time.Parse(time.RFC3339, *rec.LastSeenAt) 377 - if err == nil { 378 - scheduledAt = scheduledAt.Add(constants.LivestreamInactiveCheckInterval).UTC() 379 - taskKey := fmt.Sprintf("finalize-livestream::%s::%s", aturi.String(), scheduledAt.Format(util.ISO8601)) 380 - log.Warn(ctx, "queueing remove red circle task", "taskKey", taskKey, "scheduledAt", scheduledAt) 381 - _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskFinalizeLivestream, task, statedb.WithTaskKey(taskKey), statedb.WithScheduledAt(scheduledAt)) 382 - if err != nil { 383 - return fmt.Errorf("failed to enqueue remove red circle task: %w", err) 384 - } 385 - } else { 386 - log.Error(ctx, "failed to parse last seen at", "err", err) 387 - } 375 + if rec.LastSeenAt == nil || rec.IdleTimeoutSeconds == nil || *rec.IdleTimeoutSeconds == 0 || rec.EndedAt != nil { 376 + return nil 388 377 } 378 + scheduledAt, err := time.Parse(time.RFC3339, *rec.LastSeenAt) 379 + if err != nil { 380 + log.Error(ctx, "failed to parse last seen at", "err", err) 381 + return nil 382 + } 383 + 384 + // if we check after exactly rec.IdleTimeoutSeconds we might miss the finalization by a few seconds 385 + scheduledAt = scheduledAt.Add((time.Duration(*rec.IdleTimeoutSeconds) * time.Second) + (10 * time.Second)).UTC() 386 + taskKey := fmt.Sprintf("finalize-livestream::%s::%s", aturi.String(), scheduledAt.Format(util.ISO8601)) 387 + log.Warn(ctx, "queueing stream finalization task", "taskKey", taskKey, "scheduledAt", scheduledAt) 388 + _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskFinalizeLivestream, task, statedb.WithTaskKey(taskKey), statedb.WithScheduledAt(scheduledAt)) 389 + if err != nil { 390 + return fmt.Errorf("failed to enqueue remove red circle task: %w", err) 391 + } 392 + 389 393 } 390 394 391 395 case *streamplace.LiveTeleport:
+6 -3
pkg/statedb/queue_processor.go
··· 12 12 lexutil "github.com/bluesky-social/indigo/lex/util" 13 13 "github.com/bluesky-social/indigo/xrpc" 14 14 "gorm.io/gorm" 15 - "stream.place/streamplace/pkg/constants" 16 15 "stream.place/streamplace/pkg/integrations/webhook" 17 16 "stream.place/streamplace/pkg/log" 18 17 notificationpkg "stream.place/streamplace/pkg/notifications" ··· 105 104 if err != nil { 106 105 return fmt.Errorf("could not parse last seen at: %w", err) 107 106 } 108 - if time.Since(lastSeenTime) < constants.LivestreamConsideredInactiveAfter { 109 - log.Warn(ctx, "livestream is active, skipping removal of red circle", "lastSeenAt", lastSeenTime) 107 + if rec.IdleTimeoutSeconds == nil || *rec.IdleTimeoutSeconds == 0 { 108 + log.Warn(ctx, "livestream has no idle timeout, skipping finalization", "uri", livestream.URI) 109 + return nil 110 + } 111 + if time.Since(lastSeenTime) < (time.Duration(*rec.IdleTimeoutSeconds) * time.Second) { 112 + log.Warn(ctx, "livestream is active, skipping finalization", "lastSeenAt", lastSeenTime) 110 113 return nil 111 114 } 112 115 session, err := state.GetSessionByDID(livestream.RepoDID)
+73 -1
pkg/streamplace/cbor_gen.go
··· 250 250 } 251 251 252 252 cw := cbg.NewCborWriter(w) 253 - fieldCount := 11 253 + fieldCount := 12 254 254 255 255 if t.Agent == nil { 256 256 fieldCount-- ··· 261 261 } 262 262 263 263 if t.EndedAt == nil { 264 + fieldCount-- 265 + } 266 + 267 + if t.IdleTimeoutSeconds == nil { 264 268 fieldCount-- 265 269 } 266 270 ··· 551 555 } 552 556 } 553 557 558 + // t.IdleTimeoutSeconds (int64) (int64) 559 + if t.IdleTimeoutSeconds != nil { 560 + 561 + if len("idleTimeoutSeconds") > 1000000 { 562 + return xerrors.Errorf("Value in field \"idleTimeoutSeconds\" was too long") 563 + } 564 + 565 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("idleTimeoutSeconds"))); err != nil { 566 + return err 567 + } 568 + if _, err := cw.WriteString(string("idleTimeoutSeconds")); err != nil { 569 + return err 570 + } 571 + 572 + if t.IdleTimeoutSeconds == nil { 573 + if _, err := cw.Write(cbg.CborNull); err != nil { 574 + return err 575 + } 576 + } else { 577 + if *t.IdleTimeoutSeconds >= 0 { 578 + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(*t.IdleTimeoutSeconds)); err != nil { 579 + return err 580 + } 581 + } else { 582 + if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-*t.IdleTimeoutSeconds-1)); err != nil { 583 + return err 584 + } 585 + } 586 + } 587 + 588 + } 589 + 554 590 // t.NotificationSettings (streamplace.Livestream_NotificationSettings) (struct) 555 591 if t.NotificationSettings != nil { 556 592 ··· 789 825 } 790 826 791 827 t.CanonicalUrl = (*string)(&sval) 828 + } 829 + } 830 + // t.IdleTimeoutSeconds (int64) (int64) 831 + case "idleTimeoutSeconds": 832 + { 833 + 834 + b, err := cr.ReadByte() 835 + if err != nil { 836 + return err 837 + } 838 + if b != cbg.CborNull[0] { 839 + if err := cr.UnreadByte(); err != nil { 840 + return err 841 + } 842 + maj, extra, err := cr.ReadHeader() 843 + if err != nil { 844 + return err 845 + } 846 + var extraI int64 847 + switch maj { 848 + case cbg.MajUnsignedInt: 849 + extraI = int64(extra) 850 + if extraI < 0 { 851 + return fmt.Errorf("int64 positive overflow") 852 + } 853 + case cbg.MajNegativeInt: 854 + extraI = int64(extra) 855 + if extraI < 0 { 856 + return fmt.Errorf("int64 negative overflow") 857 + } 858 + extraI = -1 - extraI 859 + default: 860 + return fmt.Errorf("wrong type for int64 field: %d", maj) 861 + } 862 + 863 + t.IdleTimeoutSeconds = (*int64)(&extraI) 792 864 } 793 865 } 794 866 // t.NotificationSettings (streamplace.Livestream_NotificationSettings) (struct)
+2
pkg/streamplace/streamlivestream.go
··· 27 27 CreatedAt string `json:"createdAt" cborgen:"createdAt"` 28 28 // endedAt: Client-declared timestamp when this livestream ended. Ended livestreams are not supposed to start up again. 29 29 EndedAt *string `json:"endedAt,omitempty" cborgen:"endedAt,omitempty"` 30 + // idleTimeoutSeconds: Time in seconds after which this livestream should be automatically ended if idle. Zero means no timeout. 31 + IdleTimeoutSeconds *int64 `json:"idleTimeoutSeconds,omitempty" cborgen:"idleTimeoutSeconds,omitempty"` 30 32 // lastSeenAt: Client-declared timestamp when this livestream was last seen by the Streamplace station. 31 33 LastSeenAt *string `json:"lastSeenAt,omitempty" cborgen:"lastSeenAt,omitempty"` 32 34 NotificationSettings *Livestream_NotificationSettings `json:"notificationSettings,omitempty" cborgen:"notificationSettings,omitempty"`