Live video on the AT Protocol

prelive: add logic for endedAt in livestream

+172 -7
+24
js/app/components/live-dashboard/livestream-panel.tsx
··· 13 13 Textarea, 14 14 Tooltip, 15 15 useCreateStreamRecord, 16 + useEndLivestream, 16 17 useLivestream, 17 18 useToast, 18 19 useUpdateStreamRecord, ··· 211 212 const livestream = useLivestream(); 212 213 const createStreamRecord = useCreateStreamRecord(); 213 214 const updateStreamRecord = useUpdateStreamRecord(); 215 + const endLivestream = useEndLivestream(); 214 216 const url = useUrl(); 215 217 216 218 const [title, setTitle] = useState(""); ··· 348 350 livestream, 349 351 ]); 350 352 353 + const handleEndLivestream = useCallback(async () => { 354 + if (!livestream) return; 355 + await endLivestream(livestream); 356 + }, [livestream, endLivestream]); 357 + 351 358 const handleImageSelect = useCallback(() => { 352 359 // Default web file picker behavior 353 360 const input = document.createElement("input"); ··· 658 665 style={[text.white, { fontSize: 16, fontWeight: "bold" }]} 659 666 > 660 667 {buttonText} 668 + </Text> 669 + </Button> 670 + <Button 671 + variant="destructive" 672 + onPress={handleEndLivestream} 673 + style={[ 674 + r.md, 675 + py[3], 676 + w.percent[100], 677 + layout.flex.center, 678 + { opacity: disabled ? 0.5 : 1 }, 679 + ]} 680 + > 681 + <Text 682 + style={[text.white, { fontSize: 16, fontWeight: "bold" }]} 683 + > 684 + End Livestream 661 685 </Text> 662 686 </Button> 663 687 </View>
+39
js/components/src/streamplace-store/stream.tsx
··· 341 341 return record; 342 342 }; 343 343 } 344 + 345 + export function useEndLivestream() { 346 + let agent = usePDSAgent(); 347 + return async (livestream: LivestreamViewHydrated | null) => { 348 + if (!agent) { 349 + throw new Error("No PDS agent found"); 350 + } 351 + 352 + if (!agent.did) { 353 + throw new Error("No user DID found, assuming not logged in"); 354 + } 355 + 356 + if (!livestream) { 357 + throw new Error("No latest record"); 358 + } 359 + 360 + let rkey = livestream.uri.split("/").pop(); 361 + if (!rkey) { 362 + throw new Error("No rkey?"); 363 + } 364 + 365 + if (livestream.record.endedAt) { 366 + throw new Error("Livestream already ended"); 367 + } 368 + 369 + let record: PlaceStreamLivestream.Record = { 370 + ...livestream.record, 371 + endedAt: new Date().toISOString(), 372 + }; 373 + 374 + await agent.com.atproto.repo.putRecord({ 375 + repo: agent.did, 376 + collection: "place.stream.livestream", 377 + rkey, 378 + record, 379 + }); 380 + return record; 381 + }; 382 + }
+8 -2
js/docs/src/content/docs/lex-reference/place-stream-livestream.md
··· 24 24 | `title` | `string` | ✅ | The title of the livestream, as it will be announced to followers. | Max Length: 1400<br/>Max Graphemes: 140 | 25 25 | `url` | `string` | ❌ | The URL where this stream can be found. This is primarily a hint for other Streamplace nodes to locate and replicate the stream. | Format: `uri` | 26 26 | `createdAt` | `string` | ✅ | Client-declared timestamp when this livestream started. | Format: `datetime` | 27 - | `lastSeenAt` | `string` | ❌ | Client-declared timestamp when this livestream was last seen by the user. | Format: `datetime` | 27 + | `lastSeenAt` | `string` | ❌ | Client-declared timestamp when this livestream was last seen by the Streamplace station. | Format: `datetime` | 28 + | `endedAt` | `string` | ❌ | Client-declared timestamp when this livestream ended. Ended livestreams are not supposed to start up again. | Format: `datetime` | 28 29 | `post` | [`com.atproto.repo.strongRef`](https://github.com/bluesky-social/atproto/tree/main/lexicons/com/atproto/repo/strongref.json#undefined) | ❌ | The post that announced this livestream. | | 29 30 | `agent` | `string` | ❌ | The source of the livestream, if available, in a User Agent format: `<product> / <product-version> <comment>` e.g. Streamplace/0.7.5 iOS | | 30 31 | `canonicalUrl` | `string` | ❌ | The primary URL where this livestream can be viewed, if available. | Format: `uri` | ··· 161 162 "lastSeenAt": { 162 163 "type": "string", 163 164 "format": "datetime", 164 - "description": "Client-declared timestamp when this livestream was last seen by the user." 165 + "description": "Client-declared timestamp when this livestream was last seen by the Streamplace station." 166 + }, 167 + "endedAt": { 168 + "type": "string", 169 + "format": "datetime", 170 + "description": "Client-declared timestamp when this livestream ended. Ended livestreams are not supposed to start up again." 165 171 }, 166 172 "post": { 167 173 "type": "ref",
+6 -1
lexicons/place/stream/livestream.json
··· 29 29 "lastSeenAt": { 30 30 "type": "string", 31 31 "format": "datetime", 32 - "description": "Client-declared timestamp when this livestream was last seen by the user." 32 + "description": "Client-declared timestamp when this livestream was last seen by the Streamplace station." 33 + }, 34 + "endedAt": { 35 + "type": "string", 36 + "format": "datetime", 37 + "description": "Client-declared timestamp when this livestream ended. Ended livestreams are not supposed to start up again." 33 38 }, 34 39 "post": { 35 40 "type": "ref",
+34 -2
pkg/statedb/queue_processor.go
··· 8 8 "time" 9 9 10 10 "github.com/bluesky-social/indigo/api/bsky" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + lexutil "github.com/bluesky-social/indigo/lex/util" 13 + "github.com/bluesky-social/indigo/xrpc" 11 14 "gorm.io/gorm" 12 15 "stream.place/streamplace/pkg/constants" 13 16 "stream.place/streamplace/pkg/integrations/webhook" 14 17 "stream.place/streamplace/pkg/log" 15 18 notificationpkg "stream.place/streamplace/pkg/notifications" 16 19 "stream.place/streamplace/pkg/streamplace" 20 + 21 + comatproto "github.com/bluesky-social/indigo/api/atproto" 17 22 ) 18 23 19 24 var TaskNotification = "notification" ··· 112 117 if err != nil { 113 118 return fmt.Errorf("failed to refresh session: %w", err) 114 119 } 115 - _, err = state.OATProxy.GetXrpcClient(session) 120 + client, err := state.OATProxy.GetXrpcClient(session) 116 121 if err != nil { 117 122 return fmt.Errorf("failed to get xrpc client: %w", err) 118 123 } 119 - log.Warn(ctx, "livestream is inactive, finalizing", "uri", livestream.URI, "lastSeenAt", lastSeenTime) 124 + if rec.EndedAt != nil { 125 + log.Warn(ctx, "livestream has already ended, skipping", "uri", livestream.URI, "endedAt", *rec.EndedAt) 126 + return nil 127 + } 128 + 129 + uri, err := syntax.ParseATURI(livestream.URI) 130 + if err != nil { 131 + return fmt.Errorf("failed to parse ATURI: %w", err) 132 + } 133 + 134 + rec.EndedAt = rec.LastSeenAt 135 + 136 + inp := comatproto.RepoPutRecord_Input{ 137 + Collection: "place.stream.livestream", 138 + Record: &lexutil.LexiconTypeDecoder{Val: rec}, 139 + Rkey: uri.RecordKey().String(), 140 + Repo: livestream.RepoDID, 141 + SwapRecord: &livestream.CID, 142 + } 143 + out := comatproto.RepoPutRecord_Output{} 144 + 145 + err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 146 + if err != nil { 147 + return fmt.Errorf("failed to update livestream record: %w", err) 148 + } 149 + 150 + log.Log(ctx, "livestream finalized", "uri", livestream.URI, "endedAt", *rec.EndedAt) 151 + 120 152 return nil 121 153 } 122 154
+58 -1
pkg/streamplace/cbor_gen.go
··· 250 250 } 251 251 252 252 cw := cbg.NewCborWriter(w) 253 - fieldCount := 10 253 + fieldCount := 11 254 254 255 255 if t.Agent == nil { 256 256 fieldCount-- 257 257 } 258 258 259 259 if t.CanonicalUrl == nil { 260 + fieldCount-- 261 + } 262 + 263 + if t.EndedAt == nil { 260 264 fieldCount-- 261 265 } 262 266 ··· 426 430 } 427 431 if _, err := cw.WriteString(string(t.Title)); err != nil { 428 432 return err 433 + } 434 + 435 + // t.EndedAt (string) (string) 436 + if t.EndedAt != nil { 437 + 438 + if len("endedAt") > 1000000 { 439 + return xerrors.Errorf("Value in field \"endedAt\" was too long") 440 + } 441 + 442 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("endedAt"))); err != nil { 443 + return err 444 + } 445 + if _, err := cw.WriteString(string("endedAt")); err != nil { 446 + return err 447 + } 448 + 449 + if t.EndedAt == nil { 450 + if _, err := cw.Write(cbg.CborNull); err != nil { 451 + return err 452 + } 453 + } else { 454 + if len(*t.EndedAt) > 1000000 { 455 + return xerrors.Errorf("Value in field t.EndedAt was too long") 456 + } 457 + 458 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(*t.EndedAt))); err != nil { 459 + return err 460 + } 461 + if _, err := cw.WriteString(string(*t.EndedAt)); err != nil { 462 + return err 463 + } 464 + } 429 465 } 430 466 431 467 // t.CreatedAt (string) (string) ··· 680 716 } 681 717 682 718 t.Title = string(sval) 719 + } 720 + // t.EndedAt (string) (string) 721 + case "endedAt": 722 + 723 + { 724 + b, err := cr.ReadByte() 725 + if err != nil { 726 + return err 727 + } 728 + if b != cbg.CborNull[0] { 729 + if err := cr.UnreadByte(); err != nil { 730 + return err 731 + } 732 + 733 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 734 + if err != nil { 735 + return err 736 + } 737 + 738 + t.EndedAt = (*string)(&sval) 739 + } 683 740 } 684 741 // t.CreatedAt (string) (string) 685 742 case "createdAt":
+3 -1
pkg/streamplace/streamlivestream.go
··· 25 25 CanonicalUrl *string `json:"canonicalUrl,omitempty" cborgen:"canonicalUrl,omitempty"` 26 26 // createdAt: Client-declared timestamp when this livestream started. 27 27 CreatedAt string `json:"createdAt" cborgen:"createdAt"` 28 - // lastSeenAt: Client-declared timestamp when this livestream was last seen by the user. 28 + // endedAt: Client-declared timestamp when this livestream ended. Ended livestreams are not supposed to start up again. 29 + EndedAt *string `json:"endedAt,omitempty" cborgen:"endedAt,omitempty"` 30 + // lastSeenAt: Client-declared timestamp when this livestream was last seen by the Streamplace station. 29 31 LastSeenAt *string `json:"lastSeenAt,omitempty" cborgen:"lastSeenAt,omitempty"` 30 32 NotificationSettings *Livestream_NotificationSettings `json:"notificationSettings,omitempty" cborgen:"notificationSettings,omitempty"` 31 33 // post: The post that announced this livestream.