Live video on the AT Protocol

prelive: implement place.stream.live.startStopLivestream

+808 -167
+1
Makefile
··· 384 384 && sed -i.bak 's/AppBskyGraphBlock\.Main/AppBskyGraphBlock\.Record/' $$(find ./js/streamplace/src/lexicons/types/place/stream -type f) \ 385 385 && sed -i.bak 's/PlaceStreamMultistreamTarget\.Main/PlaceStreamMultistreamTarget\.Record/' $$(find ./js/streamplace/src/lexicons/types/place/stream -type f) \ 386 386 && sed -i.bak 's/PlaceStreamChatProfile\.Main/PlaceStreamChatProfile\.Record/' $$(find ./js/streamplace/src/lexicons/types/place/stream -type f) \ 387 + && sed -i.bak 's/PlaceStreamLivestream\.Main/PlaceStreamLivestream\.Record/' $$(find ./js/streamplace/src/lexicons/types/place/stream/live -type f) \ 387 388 && for x in $$(find ./js/streamplace/src/lexicons -type f -name '*.ts'); do \ 388 389 echo 'import { ComAtprotoSyncGetRepo, AppBskyRichtextFacet, AppBskyGraphBlock, ComAtprotoRepoStrongRef, AppBskyActorDefs, ComAtprotoSyncListRepos, AppBskyActorGetProfile, AppBskyFeedGetFeedSkeleton, ComAtprotoIdentityResolveHandle, ComAtprotoModerationCreateReport, ComAtprotoRepoCreateRecord, ComAtprotoRepoDeleteRecord, ComAtprotoRepoDescribeRepo, ComAtprotoRepoGetRecord, ComAtprotoRepoListRecords, ComAtprotoRepoPutRecord, ComAtprotoRepoUploadBlob, ComAtprotoServerDescribeServer, ComAtprotoSyncGetRecord, ComAtprotoSyncListReposComAtprotoRepoCreateRecord, ComAtprotoRepoDeleteRecord, ComAtprotoRepoGetRecord, ComAtprotoRepoListRecords, ComAtprotoIdentityRefreshIdentity } from "@atproto/api"' >> $$x; \ 389 390 done \
+1 -1
js/app/components/live-dashboard/livestream-panel.tsx
··· 353 353 if (!livestream) return; 354 354 setEndingLivestream(true); 355 355 try { 356 - await endLivestream(livestream); 356 + await endLivestream(); 357 357 } catch (error) { 358 358 console.error("Error ending livestream:", error); 359 359 toast.show("Error", "Failed to end livestream", {
+8 -5
js/components/src/components/mobile-player/video.tsx
··· 544 544 export function WebcamIngestPlayer(props: VideoProps) { 545 545 const ingestMediaSource = usePlayerStore((x) => x.ingestMediaSource); 546 546 const ingestAutoStart = usePlayerStore((x) => x.ingestAutoStart); 547 + const setIngestLive = usePlayerStore((x) => x.setIngestLive); 547 548 548 549 const [error, setError] = useState<Error | null>(null); 549 550 ··· 606 607 }, [ingestMediaSource]); 607 608 608 609 useEffect(() => { 609 - if (!ingestAutoStart) { 610 - setRemoteMediaStream(null); 611 - return; 612 - } 610 + // if (!ingestAutoStart) { 611 + // setRemoteMediaStream(null); 612 + // return; 613 + // } 613 614 if (!localMediaStream) { 614 615 return; 615 616 } 617 + console.log("setting remote media stream", localMediaStream); 618 + setIngestLive(true); 616 619 setRemoteMediaStream(localMediaStream); 617 - }, [localMediaStream, ingestAutoStart]); 620 + }, [localMediaStream, setIngestLive, setRemoteMediaStream]); 618 621 619 622 useEffect(() => { 620 623 if (!videoElement) {
+3 -4
js/components/src/hooks/useLivestreamInfo.ts
··· 1 1 import { useState } from "react"; 2 2 import { useLivestreamStore } from "../livestream-store"; 3 3 import { usePlayerStore } from "../player-store"; 4 - import { useCreateStreamRecord } from "../streamplace-store"; 4 + import { useCreateStreamRecord, useEndLivestream } from "../streamplace-store"; 5 5 6 6 export function useLivestreamInfo(url?: string) { 7 7 const ingest = usePlayerStore((x) => x.ingestConnectionState); 8 8 const profile = useLivestreamStore((x) => x.profile); 9 - const setIngestLive = usePlayerStore((x) => x.setIngestLive); 10 - const stopIngest = usePlayerStore((x) => x.stopIngest); 9 + const endLivestream = useEndLivestream(); 11 10 12 11 const createStreamRecord = useCreateStreamRecord(); 13 12 ··· 49 48 // Stop the current broadcast 50 49 const toggleStopStream = () => { 51 50 console.log("Stopping stream..."); 52 - stopIngest(); 51 + endLivestream(); 53 52 }; 54 53 55 54 return {
+21 -126
js/components/src/streamplace-store/stream.tsx
··· 127 127 let agent = usePDSAgent(); 128 128 let url = useUrl(); 129 129 const uploadThumbnail = useUploadThumbnail(); 130 - 131 130 return async ({ 132 131 title, 133 132 customThumbnail, ··· 143 142 notificationSettings?: PlaceStreamLivestream.NotificationSettings; 144 143 idleTimeoutSeconds?: number; 145 144 }) => { 146 - if (typeof submitPost !== "boolean") { 147 - submitPost = true; 148 - } 149 145 if (!agent) { 150 146 throw new Error("No PDS agent found"); 151 147 } 152 148 153 - if (!agent.did) { 154 - throw new Error("No user DID found, assuming not logged in"); 155 - } 156 - 157 - const u = new URL(url); 158 - 159 - // let thumbnail: BlobRef | undefined = undefined; 160 - 161 - // if (customThumbnail) { 162 - // try { 163 - // thumbnail = await uploadThumbnail(agent, customThumbnail); 164 - // } catch (e) { 165 - // throw new Error(`Custom thumbnail upload failed ${e}`); 166 - // } 167 - // } else { 168 - // // No custom thumbnail: fetch the server-side image and upload it 169 - // // try thrice lel 170 - // let tries = 0; 171 - // try { 172 - // for (; tries < 3; tries++) { 173 - // try { 174 - // console.log( 175 - // `Fetching thumbnail from ${u.protocol}//${u.host}/api/playback/${agent.did}/stream.png`, 176 - // ); 177 - // const thumbnailRes = await fetch( 178 - // `${u.protocol}//${u.host}/api/playback/${agent.did}/stream.png`, 179 - // ); 180 - // if (!thumbnailRes.ok) { 181 - // throw new Error( 182 - // `Failed to fetch thumbnail: ${thumbnailRes.status})`, 183 - // ); 184 - // } 185 - // const thumbnailBlob = await thumbnailRes.blob(); 186 - // console.log(thumbnailBlob); 187 - // thumbnail = await uploadThumbnail(agent, thumbnailBlob); 188 - // } catch (e) { 189 - // console.warn( 190 - // `Failed to fetch thumbnail, retrying (${tries + 1}/3): ${e}`, 191 - // ); 192 - // // Wait 1 second before retrying 193 - // await new Promise((resolve) => setTimeout(resolve, 2000)); 194 - // if (tries === 2) { 195 - // throw new Error(`Failed to fetch thumbnail after 3 tries: ${e}`); 196 - // } 197 - // } 198 - // } 199 - // } catch (e) { 200 - // throw new Error(`Thumbnail upload failed ${e}`); 201 - // } 202 - // } 203 - 204 - let newPost: undefined | { uri: string; cid: string } = undefined; 205 - 206 - const did = agent.did; 207 - const profile = await agent.getProfile({ actor: did }); 208 - 209 - if (submitPost) { 210 - if (!profile) { 211 - throw new Error("No profile found for the user DID"); 212 - } 213 - 214 - const params = new URLSearchParams({ 215 - did: did, 216 - time: new Date().toISOString(), 217 - }); 218 - 219 - let post = await buildGoLivePost( 220 - title, 221 - u, 222 - profile.data, 223 - params, 224 - undefined, 225 - agent, 226 - ); 227 - 228 - newPost = await createNewPost(agent, post); 229 - 230 - if (!newPost.uri || !newPost.cid) { 231 - throw new Error( 232 - "Cannot read properties of undefined (reading 'uri' or 'cid')", 233 - ); 234 - } 235 - } 236 - 237 149 let platform: string = Platform.OS; 238 150 let platVersion: string = Platform.Version 239 151 ? Platform.Version.toString() ··· 246 158 ) { 247 159 platVersion = getBrowserName(window.navigator.userAgent); 248 160 } 161 + if (!agent.did) { 162 + throw new Error("No user DID found, assuming not logged in"); 163 + } 249 164 250 - const thisUrl = `${url}/${profile.data.handle}`; 251 - if (!canonicalUrl) { 252 - canonicalUrl = thisUrl; 253 - } 165 + const thisUrl = `${url}/${agent.did}`; 254 166 255 167 const record: PlaceStreamLivestream.Record = { 256 168 $type: "place.stream.livestream", ··· 263 175 // user agent style string 264 176 // e.g. `@streamplace/components/0.1.0 (ios, 32.0)` 265 177 agent: `@streamplace/components/${PackageJson.version} (${platform}, ${platVersion})`, 266 - post: newPost, 267 - // thumb: thumbnail, 268 178 idleTimeoutSeconds: idleTimeoutSeconds, 269 179 }; 270 - console.log("record", record); 271 180 272 181 if (notificationSettings) { 273 182 record.notificationSettings = notificationSettings; 274 183 } 275 184 276 - await agent.com.atproto.repo.createRecord({ 277 - repo: agent.did, 278 - collection: "place.stream.livestream", 279 - record, 185 + if (customThumbnail) { 186 + try { 187 + const thumbnail = await uploadThumbnail(agent, customThumbnail); 188 + record.thumb = thumbnail; 189 + } catch (e) { 190 + throw new Error(`Custom thumbnail upload failed ${e}`); 191 + } 192 + } 193 + 194 + const output = await agent.place.stream.live.startLivestream({ 195 + livestream: record, 196 + streamer: agent.did, 197 + createBlueskyPost: submitPost, 280 198 }); 281 - return record; 199 + 200 + return output; 282 201 }; 283 202 } 284 203 ··· 347 266 348 267 export function useEndLivestream() { 349 268 let agent = usePDSAgent(); 350 - return async (livestream: LivestreamViewHydrated | null) => { 269 + return async () => { 351 270 if (!agent) { 352 271 throw new Error("No PDS agent found"); 353 272 } ··· 356 275 throw new Error("No user DID found, assuming not logged in"); 357 276 } 358 277 359 - if (!livestream) { 360 - throw new Error("No latest record"); 361 - } 362 - 363 - let rkey = livestream.uri.split("/").pop(); 364 - if (!rkey) { 365 - throw new Error("No rkey?"); 366 - } 367 - 368 - if (livestream.record.endedAt) { 369 - throw new Error("Livestream already ended"); 370 - } 371 - 372 - let record: PlaceStreamLivestream.Record = { 373 - ...livestream.record, 374 - endedAt: new Date().toISOString(), 375 - }; 376 - 377 - await agent.com.atproto.repo.putRecord({ 378 - repo: agent.did, 379 - collection: "place.stream.livestream", 380 - rkey, 381 - record, 382 - }); 383 - return record; 278 + return await agent.place.stream.live.stopLivestream({}); 384 279 }; 385 280 }
+101
js/docs/src/content/docs/lex-reference/live/place-stream-live-startlivestream.md
··· 1 + --- 2 + title: place.stream.live.startLivestream 3 + description: Reference for the place.stream.live.startLivestream lexicon 4 + --- 5 + 6 + **Lexicon Version:** 1 7 + 8 + ## Definitions 9 + 10 + <a name="main"></a> 11 + 12 + ### `main` 13 + 14 + **Type:** `procedure` 15 + 16 + Create a new place.stream.livestream record, automatically populating a thumbnail and creating a Bluesky post and whatnot. You can do this manually by creating a record but this method can work better for mobile livestreaming and such. 17 + 18 + **Parameters:** _(None defined)_ 19 + 20 + **Input:** 21 + 22 + - **Encoding:** `application/json` 23 + - **Schema:** 24 + 25 + **Schema Type:** `object` 26 + 27 + | Name | Type | Req'd | Description | Constraints | 28 + | ------------------- | ------------------------------------------------------------------- | ----- | ----------------------------------------------------------- | ------------- | 29 + | `livestream` | [`place.stream.livestream`](/lex-reference/place-stream-livestream) | ✅ | | | 30 + | `streamer` | `string` | ✅ | The DID of the streamer. | Format: `did` | 31 + | `createBlueskyPost` | `boolean` | ❌ | Whether to create a Bluesky post announcing the livestream. | | 32 + 33 + **Output:** 34 + 35 + - **Encoding:** `application/json` 36 + - **Schema:** 37 + 38 + **Schema Type:** `object` 39 + 40 + | Name | Type | Req'd | Description | Constraints | 41 + | ----- | -------- | ----- | --------------------------------- | ------------- | 42 + | `uri` | `string` | ✅ | The URI of the livestream record. | Format: `uri` | 43 + | `cid` | `string` | ✅ | The CID of the livestream record. | Format: `cid` | 44 + 45 + --- 46 + 47 + ## Lexicon Source 48 + 49 + ```json 50 + { 51 + "lexicon": 1, 52 + "id": "place.stream.live.startLivestream", 53 + "defs": { 54 + "main": { 55 + "type": "procedure", 56 + "description": "Create a new place.stream.livestream record, automatically populating a thumbnail and creating a Bluesky post and whatnot. You can do this manually by creating a record but this method can work better for mobile livestreaming and such.", 57 + "input": { 58 + "encoding": "application/json", 59 + "schema": { 60 + "type": "object", 61 + "required": ["streamer", "livestream"], 62 + "properties": { 63 + "livestream": { 64 + "type": "ref", 65 + "ref": "place.stream.livestream" 66 + }, 67 + "streamer": { 68 + "type": "string", 69 + "format": "did", 70 + "description": "The DID of the streamer." 71 + }, 72 + "createBlueskyPost": { 73 + "type": "boolean", 74 + "description": "Whether to create a Bluesky post announcing the livestream." 75 + } 76 + } 77 + } 78 + }, 79 + "output": { 80 + "encoding": "application/json", 81 + "schema": { 82 + "type": "object", 83 + "required": ["uri", "cid"], 84 + "properties": { 85 + "uri": { 86 + "type": "string", 87 + "format": "uri", 88 + "description": "The URI of the livestream record." 89 + }, 90 + "cid": { 91 + "type": "string", 92 + "format": "cid", 93 + "description": "The CID of the livestream record." 94 + } 95 + } 96 + } 97 + } 98 + } 99 + } 100 + } 101 + ```
+82
js/docs/src/content/docs/lex-reference/live/place-stream-live-stoplivestream.md
··· 1 + --- 2 + title: place.stream.live.stopLivestream 3 + description: Reference for the place.stream.live.stopLivestream lexicon 4 + --- 5 + 6 + **Lexicon Version:** 1 7 + 8 + ## Definitions 9 + 10 + <a name="main"></a> 11 + 12 + ### `main` 13 + 14 + **Type:** `procedure` 15 + 16 + Stop your current livestream, updating your current place.stream.livestream record and ceasing the flow of video. 17 + 18 + **Parameters:** _(None defined)_ 19 + 20 + **Input:** 21 + 22 + - **Encoding:** `application/json` 23 + - **Schema:** 24 + 25 + **Schema Type:** `object` 26 + 27 + _(No properties defined)_ 28 + **Output:** 29 + 30 + - **Encoding:** `application/json` 31 + - **Schema:** 32 + 33 + **Schema Type:** `object` 34 + 35 + | Name | Type | Req'd | Description | Constraints | 36 + | ----- | -------- | ----- | --------------------------------------------- | ------------- | 37 + | `uri` | `string` | ✅ | The URI of the stopped livestream record. | Format: `uri` | 38 + | `cid` | `string` | ✅ | The new CID of the stopped livestream record. | Format: `cid` | 39 + 40 + --- 41 + 42 + ## Lexicon Source 43 + 44 + ```json 45 + { 46 + "lexicon": 1, 47 + "id": "place.stream.live.stopLivestream", 48 + "defs": { 49 + "main": { 50 + "type": "procedure", 51 + "description": "Stop your current livestream, updating your current place.stream.livestream record and ceasing the flow of video.", 52 + "input": { 53 + "encoding": "application/json", 54 + "schema": { 55 + "type": "object", 56 + "required": [], 57 + "properties": {} 58 + } 59 + }, 60 + "output": { 61 + "encoding": "application/json", 62 + "schema": { 63 + "type": "object", 64 + "required": ["uri", "cid"], 65 + "properties": { 66 + "uri": { 67 + "type": "string", 68 + "format": "uri", 69 + "description": "The URI of the stopped livestream record." 70 + }, 71 + "cid": { 72 + "type": "string", 73 + "format": "cid", 74 + "description": "The new CID of the stopped livestream record." 75 + } 76 + } 77 + } 78 + } 79 + } 80 + } 81 + } 82 + ```
+103
js/docs/src/content/docs/lex-reference/openapi.json
··· 1553 1553 ] 1554 1554 } 1555 1555 }, 1556 + "/xrpc/place.stream.live.startLivestream": { 1557 + "post": { 1558 + "summary": "Create a new place.stream.livestream record, automatically populating a thumbnail and creating a Bluesky post and whatnot. You can do this manually by creating a record but this method can work better for mobile livestreaming and such.", 1559 + "operationId": "place.stream.live.startLivestream", 1560 + "tags": ["place.stream.live"], 1561 + "responses": { 1562 + "200": { 1563 + "description": "Success", 1564 + "content": { 1565 + "application/json": { 1566 + "schema": { 1567 + "type": "object", 1568 + "properties": { 1569 + "uri": { 1570 + "type": "string", 1571 + "description": "The URI of the livestream record.", 1572 + "format": "uri" 1573 + }, 1574 + "cid": { 1575 + "type": "string", 1576 + "description": "The CID of the livestream record.", 1577 + "format": "cid" 1578 + } 1579 + }, 1580 + "required": ["uri", "cid"] 1581 + } 1582 + } 1583 + } 1584 + } 1585 + }, 1586 + "requestBody": { 1587 + "required": true, 1588 + "content": { 1589 + "application/json": { 1590 + "schema": { 1591 + "type": "object", 1592 + "properties": { 1593 + "livestream": { 1594 + "$ref": "#/components/schemas/place.stream.livestream" 1595 + }, 1596 + "streamer": { 1597 + "type": "string", 1598 + "description": "The DID of the streamer.", 1599 + "format": "did" 1600 + }, 1601 + "createBlueskyPost": { 1602 + "type": "boolean", 1603 + "description": "Whether to create a Bluesky post announcing the livestream." 1604 + } 1605 + }, 1606 + "required": ["streamer", "livestream"] 1607 + } 1608 + } 1609 + } 1610 + } 1611 + } 1612 + }, 1613 + "/xrpc/place.stream.live.stopLivestream": { 1614 + "post": { 1615 + "summary": "Stop your current livestream, updating your current place.stream.livestream record and ceasing the flow of video.", 1616 + "operationId": "place.stream.live.stopLivestream", 1617 + "tags": ["place.stream.live"], 1618 + "responses": { 1619 + "200": { 1620 + "description": "Success", 1621 + "content": { 1622 + "application/json": { 1623 + "schema": { 1624 + "type": "object", 1625 + "properties": { 1626 + "uri": { 1627 + "type": "string", 1628 + "description": "The URI of the stopped livestream record.", 1629 + "format": "uri" 1630 + }, 1631 + "cid": { 1632 + "type": "string", 1633 + "description": "The new CID of the stopped livestream record.", 1634 + "format": "cid" 1635 + } 1636 + }, 1637 + "required": ["uri", "cid"] 1638 + } 1639 + } 1640 + } 1641 + } 1642 + }, 1643 + "requestBody": { 1644 + "required": true, 1645 + "content": { 1646 + "application/json": { 1647 + "schema": { 1648 + "type": "object", 1649 + "properties": {} 1650 + } 1651 + } 1652 + } 1653 + } 1654 + } 1655 + }, 1556 1656 "/xrpc/place.stream.live.subscribeSegments": { 1557 1657 "get": { 1558 1658 "summary": "Subscribe to a stream's new segments as they come in!", ··· 3450 3550 } 3451 3551 }, 3452 3552 "required": ["did", "handle"] 3553 + }, 3554 + "place.stream.livestream": { 3555 + "description": "Unknown type" 3453 3556 }, 3454 3557 "place.stream.live.subscribeSegments_segment": { 3455 3558 "type": "string",
+51
lexicons/place/stream/live/startLivestream.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "place.stream.live.startLivestream", 4 + "defs": { 5 + "main": { 6 + "type": "procedure", 7 + "description": "Create a new place.stream.livestream record, automatically populating a thumbnail and creating a Bluesky post and whatnot. You can do this manually by creating a record but this method can work better for mobile livestreaming and such.", 8 + "input": { 9 + "encoding": "application/json", 10 + "schema": { 11 + "type": "object", 12 + "required": ["streamer", "livestream"], 13 + "properties": { 14 + "livestream": { 15 + "type": "ref", 16 + "ref": "place.stream.livestream" 17 + }, 18 + "streamer": { 19 + "type": "string", 20 + "format": "did", 21 + "description": "The DID of the streamer." 22 + }, 23 + "createBlueskyPost": { 24 + "type": "boolean", 25 + "description": "Whether to create a Bluesky post announcing the livestream." 26 + } 27 + } 28 + } 29 + }, 30 + "output": { 31 + "encoding": "application/json", 32 + "schema": { 33 + "type": "object", 34 + "required": ["uri", "cid"], 35 + "properties": { 36 + "uri": { 37 + "type": "string", 38 + "format": "uri", 39 + "description": "The URI of the livestream record." 40 + }, 41 + "cid": { 42 + "type": "string", 43 + "format": "cid", 44 + "description": "The CID of the livestream record." 45 + } 46 + } 47 + } 48 + } 49 + } 50 + } 51 + }
+37
lexicons/place/stream/live/stopLivestream.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "place.stream.live.stopLivestream", 4 + "defs": { 5 + "main": { 6 + "type": "procedure", 7 + "description": "Stop your current livestream, updating your current place.stream.livestream record and ceasing the flow of video.", 8 + "input": { 9 + "encoding": "application/json", 10 + "schema": { 11 + "type": "object", 12 + "required": [], 13 + "properties": {} 14 + } 15 + }, 16 + "output": { 17 + "encoding": "application/json", 18 + "schema": { 19 + "type": "object", 20 + "required": ["uri", "cid"], 21 + "properties": { 22 + "uri": { 23 + "type": "string", 24 + "format": "uri", 25 + "description": "The URI of the stopped livestream record." 26 + }, 27 + "cid": { 28 + "type": "string", 29 + "format": "cid", 30 + "description": "The new CID of the stopped livestream record." 31 + } 32 + } 33 + } 34 + } 35 + } 36 + } 37 + }
+291 -31
pkg/spxrpc/place_stream_live.go
··· 1 1 package spxrpc 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "encoding/json" 6 7 "fmt" 7 8 "net/http" 8 9 "net/url" 10 + "os" 9 11 "strconv" 10 12 "time" 11 13 12 - "github.com/bluesky-social/indigo/lex/util" 14 + comatproto "github.com/bluesky-social/indigo/api/atproto" 15 + bsky "github.com/bluesky-social/indigo/api/bsky" 16 + "github.com/bluesky-social/indigo/atproto/syntax" 17 + lexutil "github.com/bluesky-social/indigo/lex/util" 18 + "github.com/bluesky-social/indigo/util" 19 + "github.com/bluesky-social/indigo/xrpc" 13 20 "github.com/gorilla/websocket" 14 21 "github.com/labstack/echo/v4" 15 22 "github.com/streamplace/oatproxy/pkg/oatproxy" 23 + "stream.place/streamplace/pkg/aqtime" 16 24 "stream.place/streamplace/pkg/log" 17 25 "stream.place/streamplace/pkg/spid" 18 26 "stream.place/streamplace/pkg/spmetrics" 19 27 20 - placestreamtypes "stream.place/streamplace/pkg/streamplace" 28 + placestream "stream.place/streamplace/pkg/streamplace" 21 29 ) 22 30 23 - func (s *Server) handlePlaceStreamLiveDenyTeleport(ctx context.Context, input *placestreamtypes.LiveDenyTeleport_Input) (*placestreamtypes.LiveDenyTeleport_Output, error) { 31 + func (s *Server) handlePlaceStreamLiveDenyTeleport(ctx context.Context, input *placestream.LiveDenyTeleport_Input) (*placestream.LiveDenyTeleport_Output, error) { 24 32 session, _ := oatproxy.GetOAuthSession(ctx) 25 33 if session == nil { 26 34 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found") ··· 50 58 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to deny teleport") 51 59 } 52 60 53 - cancelMsg := &placestreamtypes.Livestream_TeleportCanceled{ 61 + cancelMsg := &placestream.Livestream_TeleportCanceled{ 54 62 LexiconTypeID: "place.stream.livestream#teleportCanceled", 55 63 TeleportUri: input.Uri, 56 64 Reason: "denied", ··· 59 67 s.bus.Publish(teleport.RepoDID, cancelMsg) 60 68 s.bus.Publish(teleport.TargetDID, cancelMsg) 61 69 62 - return &placestreamtypes.LiveDenyTeleport_Output{ 70 + return &placestream.LiveDenyTeleport_Output{ 63 71 Success: true, 64 72 }, nil 65 73 } ··· 72 80 }, 73 81 } 74 82 75 - func (s *Server) handlePlaceStreamLiveGetSegments(ctx context.Context, before string, limit int, userDID string) (*placestreamtypes.LiveGetSegments_Output, error) { 83 + func (s *Server) handlePlaceStreamLiveGetSegments(ctx context.Context, before string, limit int, userDID string) (*placestream.LiveGetSegments_Output, error) { 76 84 if userDID == "" { 77 85 return nil, echo.NewHTTPError(http.StatusBadRequest, "User DID is required") 78 86 } ··· 102 110 if err != nil { 103 111 return nil, fmt.Errorf("error proxying to peer: %w", err) 104 112 } 105 - var output placestreamtypes.LiveGetSegments_Output 113 + var output placestream.LiveGetSegments_Output 106 114 err = json.Unmarshal(data, &output) 107 115 if err != nil { 108 116 return nil, fmt.Errorf("error unmarshalling response: %w", err) ··· 123 131 } 124 132 125 133 // Convert segments to the expected output format 126 - output := &placestreamtypes.LiveGetSegments_Output{ 127 - Segments: make([]*placestreamtypes.Segment_SegmentView, len(segments)), 134 + output := &placestream.LiveGetSegments_Output{ 135 + Segments: make([]*placestream.Segment_SegmentView, len(segments)), 128 136 } 129 137 130 138 for i, segment := range segments { ··· 136 144 if err != nil { 137 145 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to get CID: %s", err)) 138 146 } 139 - ltd := &util.LexiconTypeDecoder{Val: record} 147 + ltd := &lexutil.LexiconTypeDecoder{Val: record} 140 148 141 - output.Segments[i] = &placestreamtypes.Segment_SegmentView{ 149 + output.Segments[i] = &placestream.Segment_SegmentView{ 142 150 Record: ltd, 143 151 Cid: c.String(), 144 152 } ··· 147 155 return output, nil 148 156 } 149 157 150 - func (s *Server) handlePlaceStreamLiveGetLiveUsers(ctx context.Context, before string, limit int) (*placestreamtypes.LiveGetLiveUsers_Output, error) { 158 + func (s *Server) handlePlaceStreamLiveGetLiveUsers(ctx context.Context, before string, limit int) (*placestream.LiveGetLiveUsers_Output, error) { 151 159 // Check cache first 152 160 cacheKey := fmt.Sprintf("live_users_%s_%d", before, limit) 153 161 if cached, found := s.LiveUsersCache.Get(cacheKey); found { 154 - return cached.(*placestreamtypes.LiveGetLiveUsers_Output), nil 162 + return cached.(*placestream.LiveGetLiveUsers_Output), nil 155 163 } 156 164 157 165 var beforeTime *time.Time ··· 175 183 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to fetch livestreams") 176 184 } 177 185 178 - streams := make([]*placestreamtypes.Livestream_LivestreamView, len(ls)) 186 + streams := make([]*placestream.Livestream_LivestreamView, len(ls)) 179 187 180 188 for i, l := range ls { 181 189 stream, err := l.ToLivestreamView() ··· 183 191 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to convert livestream to streamplace livestream: %s", err)) 184 192 } 185 193 viewers := spmetrics.GetViewCount(stream.Author.Did) 186 - stream.ViewerCount = &placestreamtypes.Livestream_ViewerCount{ 194 + stream.ViewerCount = &placestream.Livestream_ViewerCount{ 187 195 LexiconTypeID: "place.stream.livestream#viewerCount", 188 196 Count: int64(viewers), 189 197 } 190 198 streams[i] = stream 191 199 } 192 200 193 - liveUsers := &placestreamtypes.LiveGetLiveUsers_Output{ 201 + liveUsers := &placestream.LiveGetLiveUsers_Output{ 194 202 Streams: streams, 195 203 } 196 204 ··· 254 262 } 255 263 } 256 264 257 - func (s *Server) handlePlaceStreamLiveGetRecommendations(ctx context.Context, userDID string) (*placestreamtypes.LiveGetRecommendations_Output, error) { 265 + func (s *Server) handlePlaceStreamLiveGetRecommendations(ctx context.Context, userDID string) (*placestream.LiveGetRecommendations_Output, error) { 258 266 if userDID == "" { 259 267 return nil, echo.NewHTTPError(http.StatusBadRequest, "userDID is required") 260 268 } ··· 275 283 } 276 284 277 285 if len(liveStreamers) > 0 { 278 - var recommendations []*placestreamtypes.LiveGetRecommendations_Output_Recommendations_Elem 286 + var recommendations []*placestream.LiveGetRecommendations_Output_Recommendations_Elem 279 287 for _, did := range liveStreamers { 280 - recommendations = append(recommendations, &placestreamtypes.LiveGetRecommendations_Output_Recommendations_Elem{ 281 - LiveGetRecommendations_LivestreamRecommendation: &placestreamtypes.LiveGetRecommendations_LivestreamRecommendation{ 288 + recommendations = append(recommendations, &placestream.LiveGetRecommendations_Output_Recommendations_Elem{ 289 + LiveGetRecommendations_LivestreamRecommendation: &placestream.LiveGetRecommendations_LivestreamRecommendation{ 282 290 Did: did, 283 291 Source: "streamer", 284 292 }, 285 293 }) 286 294 } 287 - return &placestreamtypes.LiveGetRecommendations_Output{ 295 + return &placestream.LiveGetRecommendations_Output{ 288 296 Recommendations: recommendations, 289 297 UserDID: &userDID, 290 298 }, nil ··· 308 316 } 309 317 310 318 if len(liveFollows) > 0 { 311 - var recommendations []*placestreamtypes.LiveGetRecommendations_Output_Recommendations_Elem 319 + var recommendations []*placestream.LiveGetRecommendations_Output_Recommendations_Elem 312 320 for _, did := range liveFollows { 313 - recommendations = append(recommendations, &placestreamtypes.LiveGetRecommendations_Output_Recommendations_Elem{ 314 - LiveGetRecommendations_LivestreamRecommendation: &placestreamtypes.LiveGetRecommendations_LivestreamRecommendation{ 321 + recommendations = append(recommendations, &placestream.LiveGetRecommendations_Output_Recommendations_Elem{ 322 + LiveGetRecommendations_LivestreamRecommendation: &placestream.LiveGetRecommendations_LivestreamRecommendation{ 315 323 Did: did, 316 324 Source: "follows", 317 325 }, 318 326 }) 319 327 } 320 - return &placestreamtypes.LiveGetRecommendations_Output{ 328 + return &placestream.LiveGetRecommendations_Output{ 321 329 Recommendations: recommendations, 322 330 UserDID: &userDID, 323 331 }, nil ··· 331 339 if err != nil { 332 340 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to filter default streamers") 333 341 } 334 - var recommendations []*placestreamtypes.LiveGetRecommendations_Output_Recommendations_Elem 342 + var recommendations []*placestream.LiveGetRecommendations_Output_Recommendations_Elem 335 343 for _, did := range liveDefaults { 336 - recommendations = append(recommendations, &placestreamtypes.LiveGetRecommendations_Output_Recommendations_Elem{ 337 - LiveGetRecommendations_LivestreamRecommendation: &placestreamtypes.LiveGetRecommendations_LivestreamRecommendation{ 344 + recommendations = append(recommendations, &placestream.LiveGetRecommendations_Output_Recommendations_Elem{ 345 + LiveGetRecommendations_LivestreamRecommendation: &placestream.LiveGetRecommendations_LivestreamRecommendation{ 338 346 Did: did, 339 347 Source: "host", 340 348 }, 341 349 }) 342 350 } 343 - return &placestreamtypes.LiveGetRecommendations_Output{ 351 + return &placestream.LiveGetRecommendations_Output{ 344 352 Recommendations: recommendations, 345 353 UserDID: &userDID, 346 354 }, nil 347 355 } 348 356 349 357 // No recommendations available 350 - return &placestreamtypes.LiveGetRecommendations_Output{ 351 - Recommendations: []*placestreamtypes.LiveGetRecommendations_Output_Recommendations_Elem{}, 358 + return &placestream.LiveGetRecommendations_Output{ 359 + Recommendations: []*placestream.LiveGetRecommendations_Output_Recommendations_Elem{}, 352 360 UserDID: &userDID, 353 361 }, nil 354 362 } 363 + 364 + func (s *Server) handlePlaceStreamLiveStartLivestream(ctx context.Context, body *placestream.LiveStartLivestream_Input) (*placestream.LiveStartLivestream_Output, error) { 365 + session, _ := oatproxy.GetOAuthSession(ctx) 366 + if session != nil { 367 + if session.DID != body.Streamer { 368 + return nil, echo.NewHTTPError(http.StatusForbidden, "you are not the streamer") 369 + } 370 + } else { 371 + svc := GetServiceAuth(ctx) 372 + if svc != nil { 373 + streamerSession, err := s.statefulDB.GetSessionByDID(body.Streamer) 374 + if err != nil { 375 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "error getting streamer session", err) 376 + } 377 + if streamerSession == nil { 378 + return nil, echo.NewHTTPError(http.StatusNotFound, "streamer session not found") 379 + } 380 + session = streamerSession 381 + } else { 382 + return nil, echo.NewHTTPError(http.StatusUnauthorized, "you are not authorized") 383 + } 384 + } 385 + 386 + // proxy to the origin node if the streamer is broadcasting elsewhere 387 + origin, err := s.statefulDB.GetLatestBroadcastOriginForStreamer(session.DID) 388 + if err != nil { 389 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "error getting broadcast origin", err) 390 + } 391 + myDID := s.cli.ServerDID() 392 + if origin != nil && origin.ServerDID != myDID { 393 + bs, err := json.Marshal(body) 394 + if err != nil { 395 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "error marshalling body", err) 396 + } 397 + data, err := s.ProxyServiceRequest(ctx, origin.ServerDID, "POST", "place.stream.live.startLivestream", 398 + url.Values{}, 399 + bytes.NewReader(bs), "application/json") 400 + if err != nil { 401 + return nil, err 402 + } 403 + var output placestream.LiveStartLivestream_Output 404 + err = json.Unmarshal(data, &output) 405 + if err != nil { 406 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "error unmarshalling response", err) 407 + } 408 + return &output, nil 409 + } 410 + 411 + _, client := oatproxy.GetOAuthSession(ctx) 412 + if client == nil { 413 + return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session required to start livestream") 414 + } 415 + 416 + livestream := body.Livestream 417 + now := time.Now().UTC().Format(time.RFC3339) 418 + livestream.LexiconTypeID = "place.stream.livestream" 419 + livestream.CreatedAt = now 420 + livestream.LastSeenAt = &now 421 + 422 + if livestream.Thumb == nil { 423 + // Step 1: get latest thumbnail from localDB and upload to user's PDS 424 + var thumb *lexutil.LexBlob 425 + dbThumb, err := s.localDB.LatestThumbnailForUser(session.DID) 426 + if err != nil { 427 + log.Error(ctx, "failed to get latest thumbnail", "err", err) 428 + } 429 + if dbThumb != nil { 430 + aqt := aqtime.FromTime(dbThumb.Segment.StartTime) 431 + fpath, err := s.cli.SegmentFilePath(session.DID, fmt.Sprintf("%s.%s", aqt.String(), dbThumb.Format)) 432 + if err != nil { 433 + log.Error(ctx, "failed to get thumbnail file path", "err", err) 434 + } else { 435 + thumbData, err := os.ReadFile(fpath) 436 + if err != nil { 437 + log.Error(ctx, "failed to read thumbnail file", "err", err) 438 + } else { 439 + mimeType := "image/jpeg" 440 + if dbThumb.Format == "png" { 441 + mimeType = "image/png" 442 + } 443 + 444 + // Step 2: upload to user's PDS 445 + var uploadOut comatproto.RepoUploadBlob_Output 446 + err = client.Do(ctx, xrpc.Procedure, mimeType, "com.atproto.repo.uploadBlob", nil, bytes.NewReader(thumbData), &uploadOut) 447 + if err != nil { 448 + log.Error(ctx, "failed to upload thumbnail to PDS", "err", err) 449 + } else { 450 + thumb = uploadOut.Blob 451 + } 452 + } 453 + } 454 + } 455 + livestream.Thumb = thumb 456 + } 457 + 458 + // Step 3: create a Bluesky post announcing the livestream 459 + repo, err := s.model.GetRepo(session.DID) 460 + if err != nil { 461 + log.Error(ctx, "failed to get repo", "err", err) 462 + } 463 + 464 + handle := session.DID 465 + if repo != nil && repo.Handle != "" { 466 + handle = repo.Handle 467 + } 468 + 469 + canonicalUrl := fmt.Sprintf("https://%s/%s", s.cli.BroadcasterHost, handle) 470 + if livestream.CanonicalUrl != nil && *livestream.CanonicalUrl != "" { 471 + canonicalUrl = *livestream.CanonicalUrl 472 + } 473 + 474 + if body.CreateBlueskyPost == nil || *body.CreateBlueskyPost { 475 + prefix := "🔴 LIVE " 476 + suffix := " " + livestream.Title 477 + postText := prefix + canonicalUrl + suffix 478 + 479 + linkStart := int64(len(prefix)) 480 + linkEnd := linkStart + int64(len(canonicalUrl)) 481 + 482 + postRecord := &bsky.FeedPost{ 483 + LexiconTypeID: "app.bsky.feed.post", 484 + Text: postText, 485 + CreatedAt: now, 486 + Langs: []string{"en"}, 487 + Facets: []*bsky.RichtextFacet{ 488 + { 489 + Index: &bsky.RichtextFacet_ByteSlice{ 490 + ByteStart: linkStart, 491 + ByteEnd: linkEnd, 492 + }, 493 + Features: []*bsky.RichtextFacet_Features_Elem{ 494 + { 495 + RichtextFacet_Link: &bsky.RichtextFacet_Link{ 496 + LexiconTypeID: "app.bsky.richtext.facet#link", 497 + Uri: canonicalUrl, 498 + }, 499 + }, 500 + }, 501 + }, 502 + }, 503 + Embed: &bsky.FeedPost_Embed{ 504 + EmbedExternal: &bsky.EmbedExternal{ 505 + External: &bsky.EmbedExternal_External{ 506 + Title: fmt.Sprintf("@%s is 🔴LIVE on %s!", handle, s.cli.BroadcasterHost), 507 + Uri: canonicalUrl, 508 + Description: livestream.Title, 509 + Thumb: livestream.Thumb, 510 + }, 511 + }, 512 + }, 513 + } 514 + 515 + postInput := comatproto.RepoCreateRecord_Input{ 516 + Collection: "app.bsky.feed.post", 517 + Record: &lexutil.LexiconTypeDecoder{Val: postRecord}, 518 + Repo: session.DID, 519 + } 520 + var postOutput comatproto.RepoCreateRecord_Output 521 + err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.createRecord", map[string]any{}, postInput, &postOutput) 522 + if err != nil { 523 + log.Error(ctx, "failed to create bluesky post", "err", err) 524 + } else { 525 + livestream.Post = &comatproto.RepoStrongRef{ 526 + Uri: postOutput.Uri, 527 + Cid: postOutput.Cid, 528 + } 529 + } 530 + } 531 + 532 + // Step 4: create the place.stream.livestream record 533 + lsInput := comatproto.RepoCreateRecord_Input{ 534 + Collection: "place.stream.livestream", 535 + Record: &lexutil.LexiconTypeDecoder{Val: livestream}, 536 + Repo: session.DID, 537 + } 538 + var lsOutput comatproto.RepoCreateRecord_Output 539 + err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.createRecord", map[string]any{}, lsInput, &lsOutput) 540 + if err != nil { 541 + return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to create livestream record: %v", err)) 542 + } 543 + 544 + return &placestream.LiveStartLivestream_Output{ 545 + Uri: lsOutput.Uri, 546 + Cid: lsOutput.Cid, 547 + }, nil 548 + } 549 + 550 + func (s *Server) handlePlaceStreamLiveStopLivestream(ctx context.Context, body *placestream.LiveStopLivestream_Input) (*placestream.LiveStopLivestream_Output, error) { 551 + now := time.Now().UTC().Format(util.ISO8601) 552 + session, _ := oatproxy.GetOAuthSession(ctx) 553 + 554 + _, client := oatproxy.GetOAuthSession(ctx) 555 + if client == nil { 556 + return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session required to stop livestream") 557 + } 558 + 559 + livestream, err := s.model.GetLatestLivestreamForRepo(session.DID) 560 + if err != nil { 561 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "error getting livestream", err) 562 + } 563 + 564 + livestreamView, err := livestream.ToLivestreamView() 565 + if err != nil { 566 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "error converting livestream to view", err) 567 + } 568 + 569 + livestreamRecord, ok := livestreamView.Record.Val.(*placestream.Livestream) 570 + if !ok { 571 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "livestream is not a streamplace livestream") 572 + } 573 + 574 + if livestreamRecord.EndedAt != nil { 575 + return nil, echo.NewHTTPError(http.StatusBadRequest, "livestream has already ended") 576 + } 577 + 578 + livestreamRecord.EndedAt = &now 579 + 580 + aturi, err := syntax.ParseATURI(livestreamView.Uri) 581 + if err != nil { 582 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "error parsing ATURI", err) 583 + } 584 + 585 + var swapRecord *string 586 + getOutput := comatproto.RepoGetRecord_Output{} 587 + err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 588 + "repo": session.DID, 589 + "collection": "place.stream.livestream", 590 + "rkey": aturi.RecordKey().String(), 591 + }, nil, &getOutput) 592 + if err != nil { 593 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "error getting livestream record", err) 594 + } 595 + swapRecord = getOutput.Cid 596 + 597 + lsInput := comatproto.RepoPutRecord_Input{ 598 + Collection: "place.stream.livestream", 599 + Record: &lexutil.LexiconTypeDecoder{Val: livestreamRecord}, 600 + Rkey: aturi.RecordKey().String(), 601 + Repo: session.DID, 602 + SwapRecord: swapRecord, 603 + } 604 + var lsOutput comatproto.RepoPutRecord_Output 605 + err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, lsInput, &lsOutput) 606 + if err != nil { 607 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "error updating livestream record", err) 608 + } 609 + 610 + return &placestream.LiveStopLivestream_Output{ 611 + Uri: lsOutput.Uri, 612 + Cid: lsOutput.Cid, 613 + }, nil 614 + }
+38
pkg/spxrpc/stubs.go
··· 290 290 e.GET("/xrpc/place.stream.live.getRecommendations", s.HandlePlaceStreamLiveGetRecommendations) 291 291 e.GET("/xrpc/place.stream.live.getSegments", s.HandlePlaceStreamLiveGetSegments) 292 292 e.GET("/xrpc/place.stream.live.searchActorsTypeahead", s.HandlePlaceStreamLiveSearchActorsTypeahead) 293 + e.POST("/xrpc/place.stream.live.startLivestream", s.HandlePlaceStreamLiveStartLivestream) 294 + e.POST("/xrpc/place.stream.live.stopLivestream", s.HandlePlaceStreamLiveStopLivestream) 293 295 e.POST("/xrpc/place.stream.moderation.createBlock", s.HandlePlaceStreamModerationCreateBlock) 294 296 e.POST("/xrpc/place.stream.moderation.createGate", s.HandlePlaceStreamModerationCreateGate) 295 297 e.POST("/xrpc/place.stream.moderation.deleteBlock", s.HandlePlaceStreamModerationDeleteBlock) ··· 518 520 var handleErr error 519 521 // func (s *Server) handlePlaceStreamLiveSearchActorsTypeahead(ctx context.Context,limit int,q string) (*placestream.LiveSearchActorsTypeahead_Output, error) 520 522 out, handleErr = s.handlePlaceStreamLiveSearchActorsTypeahead(ctx, limit, q) 523 + if handleErr != nil { 524 + return handleErr 525 + } 526 + return c.JSON(200, out) 527 + } 528 + 529 + func (s *Server) HandlePlaceStreamLiveStartLivestream(c echo.Context) error { 530 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandlePlaceStreamLiveStartLivestream") 531 + defer span.End() 532 + 533 + var body placestream.LiveStartLivestream_Input 534 + if err := c.Bind(&body); err != nil { 535 + return err 536 + } 537 + var out *placestream.LiveStartLivestream_Output 538 + var handleErr error 539 + // func (s *Server) handlePlaceStreamLiveStartLivestream(ctx context.Context,body *placestream.LiveStartLivestream_Input) (*placestream.LiveStartLivestream_Output, error) 540 + out, handleErr = s.handlePlaceStreamLiveStartLivestream(ctx, &body) 541 + if handleErr != nil { 542 + return handleErr 543 + } 544 + return c.JSON(200, out) 545 + } 546 + 547 + func (s *Server) HandlePlaceStreamLiveStopLivestream(c echo.Context) error { 548 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandlePlaceStreamLiveStopLivestream") 549 + defer span.End() 550 + 551 + var body placestream.LiveStopLivestream_Input 552 + if err := c.Bind(&body); err != nil { 553 + return err 554 + } 555 + var out *placestream.LiveStopLivestream_Output 556 + var handleErr error 557 + // func (s *Server) handlePlaceStreamLiveStopLivestream(ctx context.Context,body *placestream.LiveStopLivestream_Input) (*placestream.LiveStopLivestream_Output, error) 558 + out, handleErr = s.handlePlaceStreamLiveStopLivestream(ctx, &body) 521 559 if handleErr != nil { 522 560 return handleErr 523 561 }
+38
pkg/streamplace/livestartLivestream.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + // Lexicon schema: place.stream.live.startLivestream 4 + 5 + package streamplace 6 + 7 + import ( 8 + "context" 9 + 10 + lexutil "github.com/bluesky-social/indigo/lex/util" 11 + ) 12 + 13 + // LiveStartLivestream_Input is the input argument to a place.stream.live.startLivestream call. 14 + type LiveStartLivestream_Input struct { 15 + // createBlueskyPost: Whether to create a Bluesky post announcing the livestream. 16 + CreateBlueskyPost *bool `json:"createBlueskyPost,omitempty" cborgen:"createBlueskyPost,omitempty"` 17 + Livestream *Livestream `json:"livestream" cborgen:"livestream"` 18 + // streamer: The DID of the streamer. 19 + Streamer string `json:"streamer" cborgen:"streamer"` 20 + } 21 + 22 + // LiveStartLivestream_Output is the output of a place.stream.live.startLivestream call. 23 + type LiveStartLivestream_Output struct { 24 + // cid: The CID of the livestream record. 25 + Cid string `json:"cid" cborgen:"cid"` 26 + // uri: The URI of the livestream record. 27 + Uri string `json:"uri" cborgen:"uri"` 28 + } 29 + 30 + // LiveStartLivestream calls the XRPC method "place.stream.live.startLivestream". 31 + func LiveStartLivestream(ctx context.Context, c lexutil.LexClient, input *LiveStartLivestream_Input) (*LiveStartLivestream_Output, error) { 32 + var out LiveStartLivestream_Output 33 + if err := c.LexDo(ctx, lexutil.Procedure, "application/json", "place.stream.live.startLivestream", nil, input, &out); err != nil { 34 + return nil, err 35 + } 36 + 37 + return &out, nil 38 + }
+33
pkg/streamplace/livestopLivestream.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + // Lexicon schema: place.stream.live.stopLivestream 4 + 5 + package streamplace 6 + 7 + import ( 8 + "context" 9 + 10 + lexutil "github.com/bluesky-social/indigo/lex/util" 11 + ) 12 + 13 + // LiveStopLivestream_Input is the input argument to a place.stream.live.stopLivestream call. 14 + type LiveStopLivestream_Input struct { 15 + } 16 + 17 + // LiveStopLivestream_Output is the output of a place.stream.live.stopLivestream call. 18 + type LiveStopLivestream_Output struct { 19 + // cid: The new CID of the stopped livestream record. 20 + Cid string `json:"cid" cborgen:"cid"` 21 + // uri: The URI of the stopped livestream record. 22 + Uri string `json:"uri" cborgen:"uri"` 23 + } 24 + 25 + // LiveStopLivestream calls the XRPC method "place.stream.live.stopLivestream". 26 + func LiveStopLivestream(ctx context.Context, c lexutil.LexClient, input *LiveStopLivestream_Input) (*LiveStopLivestream_Output, error) { 27 + var out LiveStopLivestream_Output 28 + if err := c.LexDo(ctx, lexutil.Procedure, "application/json", "place.stream.live.stopLivestream", nil, input, &out); err != nil { 29 + return nil, err 30 + } 31 + 32 + return &out, nil 33 + }