Live video on the AT Protocol

live-dashboard: allow customization of ingest fields

authored by

Eli Mallon and committed by
Natalie B.
c8cadde3 a6f7c3be

+479 -33
+36 -29
js/app/components/live-dashboard/stream-key.tsx
··· 5 5 Code, 6 6 Row, 7 7 Text, 8 + useStreamplaceStore, 8 9 useTheme, 9 10 useToast, 10 11 View, 11 12 zero, 12 13 } from "@streamplace/components"; 14 + import useGetIngests from "@streamplace/components/src/streamplace-store/ingest"; 13 15 import Loading from "components/loading/loading"; 14 16 import { Clipboard, ClipboardCheck } from "lucide-react-native"; 15 17 import { useEffect, useState } from "react"; 16 18 import { ScrollView, TextInput } from "react-native"; 17 19 import { useStore } from "store"; 18 20 import { useIsReady, useUserProfile } from "store/hooks"; 21 + import { PlaceStreamIngestDefs } from "streamplace"; 19 22 20 23 const FormRow = ({ children }: { children: React.ReactNode }) => { 21 24 return ( ··· 42 45 }; 43 46 44 47 export function StreamKeyScreen() { 45 - const [protocol, setProtocol] = useState<"whip" | "rtmp">("rtmp"); 48 + const [ingest, setIngest] = useState<PlaceStreamIngestDefs.Ingest | null>( 49 + null, 50 + ); 46 51 const isReady = useIsReady(); 47 52 const userProfile = useUserProfile(); 48 53 const openLoginModal = useStore((state) => state.openLoginModal); 49 54 const route = useRoute(); 50 55 const url = useStore((state) => state.url); 56 + const ingests = useStreamplaceStore((state) => state.ingests); 57 + const getIngests = useGetIngests(); 58 + useEffect(() => { 59 + getIngests(); 60 + }, []); 51 61 52 62 useEffect(() => { 53 63 if (isReady && !userProfile) { ··· 55 65 } 56 66 }, [isReady, userProfile, openLoginModal, route.name, route.params]); 57 67 68 + useEffect(() => { 69 + if (ingests && ingests.length > 0 && !ingest) { 70 + setIngest(ingests[0]); 71 + } 72 + }, [ingests, ingest]); 73 + 58 74 if (!isReady) { 59 75 return <Loading />; 60 76 } ··· 63 79 return <Loading />; 64 80 } 65 81 82 + if (!ingests) { 83 + return <Loading />; 84 + } 85 + 66 86 return ( 67 87 <ScrollView> 68 88 <View flex={1} align="center" justify="start" padding="md" fullWidth> 69 89 <View fullWidth style={{ maxWidth: 600 }}> 70 90 <FormRow> 71 - <Button 72 - width="min" 73 - variant={protocol !== "rtmp" ? "secondary" : "primary"} 74 - onPress={() => setProtocol("rtmp")} 75 - style={{ 76 - borderTopRightRadius: "0px", 77 - borderBottomRightRadius: "0px", 78 - }} 79 - > 80 - RTMP 81 - </Button> 82 - <Button 83 - width="min" 84 - variant={protocol !== "whip" ? "secondary" : "primary"} 85 - onPress={() => setProtocol("whip")} 86 - style={{ 87 - borderTopLeftRadius: "0px", 88 - borderBottomLeftRadius: "0px", 89 - }} 90 - > 91 - WHIP 92 - </Button> 91 + {ingests.map((ing, i) => ( 92 + <Button 93 + width="min" 94 + key={i} 95 + variant={ingest !== ing ? "secondary" : "primary"} 96 + onPress={() => setIngest(ing)} 97 + > 98 + {ing.type.toUpperCase()} 99 + </Button> 100 + ))} 93 101 </FormRow> 94 - {protocol === "whip" && <WHIPDescription url={url} />} 95 - {protocol === "rtmp" && <RTMPDescription url={url} />} 102 + {ingest?.type === "whip" && <WHIPDescription url={ingest.url} />} 103 + {ingest?.type.startsWith("rtmp") && ( 104 + <RTMPDescription url={ingest.url} /> 105 + )} 96 106 <FormRow> 97 107 <Label>Output Settings</Label> 98 108 <Content> ··· 168 178 } 169 179 170 180 export function RTMPDescription({ url }: { url: string }) { 171 - const u = new URL(url); 172 - const rtmpUrl = `rtmps://${u.host}:1935/live`; 173 - 174 181 return ( 175 182 <> 176 183 <FormRow> ··· 183 190 <Label>Server</Label> 184 191 <Content> 185 192 <TextInput 186 - value={rtmpUrl} 193 + value={url} 187 194 readOnly={true} 188 195 style={[ 189 196 {
+1
js/components/src/streamplace-store/index.tsx
··· 1 1 export * from "./block"; 2 2 export * from "./branding"; 3 + export * from "./ingest"; 3 4 export * from "./moderation"; 4 5 export * from "./moderator-management"; 5 6 export * from "./stream";
+32
js/components/src/streamplace-store/ingest.tsx
··· 1 + import { PlaceStreamIngestDefs } from "streamplace"; 2 + import { useDID, useStreamplaceStore } from "./streamplace-store"; 3 + import { usePDSAgent } from "./xrpc"; 4 + 5 + export default function useGetIngests() { 6 + const pdsAgent = usePDSAgent(); 7 + const did = useDID(); 8 + const setIngests = useStreamplaceStore((state) => state.setIngests); 9 + 10 + return async () => { 11 + if (!pdsAgent || !did) { 12 + throw new Error("No PDS agent or DID available"); 13 + } 14 + 15 + const result = await pdsAgent.place.stream.ingest.getIngestUrls(); 16 + if (!result.success) { 17 + throw new Error("Failed to get ingests"); 18 + } 19 + 20 + const ingests = result.data.ingests 21 + .map((ingest) => { 22 + if (PlaceStreamIngestDefs.isIngest(ingest)) { 23 + return ingest; 24 + } 25 + console.error("Invalid ingest", ingest); 26 + return null; 27 + }) 28 + .filter((ingest) => ingest !== null); 29 + 30 + setIngests(ingests); 31 + }; 32 + }
+11 -2
js/components/src/streamplace-store/streamplace-store.tsx
··· 1 1 import { SessionManager } from "@atproto/api/dist/session-manager"; 2 2 import { useContext } from "react"; 3 - import { PlaceStreamChatProfile, PlaceStreamLivestream } from "streamplace"; 3 + import { 4 + PlaceStreamChatProfile, 5 + PlaceStreamIngestDefs, 6 + PlaceStreamLivestream, 7 + } from "streamplace"; 4 8 import { createStore, StoreApi, useStore } from "zustand"; 5 9 import storage from "../storage"; 6 10 import { StreamplaceContext } from "../streamplace-provider/context"; ··· 40 44 oauthSession: SessionManager | null | undefined; 41 45 handle: string | null; 42 46 chatProfile: PlaceStreamChatProfile.Record | null; 47 + 48 + ingests: PlaceStreamIngestDefs.Ingest[] | null; 49 + setIngests: (ingests: PlaceStreamIngestDefs.Ingest[] | null) => void; 43 50 44 51 // Content metadata state 45 52 contentMetadata: ContentMetadataResult | null; ··· 113 120 oauthSession: null, 114 121 handle: null, 115 122 chatProfile: null, 116 - 123 + ingests: null, 124 + setIngests: (ingests: PlaceStreamIngestDefs.Ingest[] | null) => 125 + set({ ingests: ingests }), 117 126 broadcasterDID: null, 118 127 setBroadcasterDID: (broadcasterDID: string | null) => 119 128 set({ broadcasterDID }),
+52
js/docs/src/content/docs/lex-reference/ingest/place-stream-ingest-defs.md
··· 1 + --- 2 + title: place.stream.ingest.defs 3 + description: Reference for the place.stream.ingest.defs lexicon 4 + --- 5 + 6 + **Lexicon Version:** 1 7 + 8 + ## Definitions 9 + 10 + <a name="ingest"></a> 11 + 12 + ### `ingest` 13 + 14 + **Type:** `object` 15 + 16 + An ingest URL for a Streamplace station. 17 + 18 + **Properties:** 19 + 20 + | Name | Type | Req'd | Description | Constraints | 21 + | ------ | -------- | ----- | ----------------------------------------------------------------------- | ------------- | 22 + | `type` | `string` | ✅ | The type of ingest endpoint, currently 'rtmp' and 'whip' are supported. | | 23 + | `url` | `string` | ✅ | The URL of the ingest endpoint. | Format: `uri` | 24 + 25 + --- 26 + 27 + ## Lexicon Source 28 + 29 + ```json 30 + { 31 + "lexicon": 1, 32 + "id": "place.stream.ingest.defs", 33 + "defs": { 34 + "ingest": { 35 + "type": "object", 36 + "description": "An ingest URL for a Streamplace station.", 37 + "required": ["type", "url"], 38 + "properties": { 39 + "type": { 40 + "type": "string", 41 + "description": "The type of ingest endpoint, currently 'rtmp' and 'whip' are supported." 42 + }, 43 + "url": { 44 + "type": "string", 45 + "format": "uri", 46 + "description": "The URL of the ingest endpoint." 47 + } 48 + } 49 + } 50 + } 51 + } 52 + ```
+67
js/docs/src/content/docs/lex-reference/ingest/place-stream-ingest-getingesturls.md
··· 1 + --- 2 + title: place.stream.ingest.getIngestUrls 3 + description: Reference for the place.stream.ingest.getIngestUrls lexicon 4 + --- 5 + 6 + **Lexicon Version:** 1 7 + 8 + ## Definitions 9 + 10 + <a name="main"></a> 11 + 12 + ### `main` 13 + 14 + **Type:** `query` 15 + 16 + Get ingest URLs for a Streamplace station. 17 + 18 + **Parameters:** _(None defined)_ 19 + 20 + **Output:** 21 + 22 + - **Encoding:** `application/json` 23 + - **Schema:** 24 + 25 + **Schema Type:** `object` 26 + 27 + | Name | Type | Req'd | Description | Constraints | 28 + | --------- | ---------------------------------------------------------------------------------------------------------------------- | ----- | ----------- | ----------- | 29 + | `ingests` | Array of Union of:<br/>&nbsp;&nbsp;[`place.stream.ingest.defs#ingest`](/lex-reference/place-stream-ingest-defs#ingest) | ✅ | | | 30 + 31 + --- 32 + 33 + ## Lexicon Source 34 + 35 + ```json 36 + { 37 + "lexicon": 1, 38 + "id": "place.stream.ingest.getIngestUrls", 39 + "defs": { 40 + "main": { 41 + "type": "query", 42 + "description": "Get ingest URLs for a Streamplace station.", 43 + "parameters": { 44 + "type": "params", 45 + "properties": {} 46 + }, 47 + "output": { 48 + "encoding": "application/json", 49 + "schema": { 50 + "type": "object", 51 + "required": ["ingests"], 52 + "properties": { 53 + "ingests": { 54 + "type": "array", 55 + "items": { 56 + "type": "union", 57 + "refs": ["place.stream.ingest.defs#ingest"] 58 + } 59 + } 60 + } 61 + } 62 + }, 63 + "errors": [] 64 + } 65 + } 66 + } 67 + ```
+48
js/docs/src/content/docs/lex-reference/openapi.json
··· 1683 1683 } 1684 1684 } 1685 1685 }, 1686 + "/xrpc/place.stream.ingest.getIngestUrls": { 1687 + "get": { 1688 + "summary": "Get ingest URLs for a Streamplace station.", 1689 + "operationId": "place.stream.ingest.getIngestUrls", 1690 + "tags": ["place.stream.ingest"], 1691 + "responses": { 1692 + "200": { 1693 + "description": "Success", 1694 + "content": { 1695 + "application/json": { 1696 + "schema": { 1697 + "type": "object", 1698 + "properties": { 1699 + "ingests": { 1700 + "type": "array", 1701 + "items": { 1702 + "oneOf": [ 1703 + { 1704 + "$ref": "#/components/schemas/place.stream.ingest.defs_ingest" 1705 + } 1706 + ] 1707 + } 1708 + } 1709 + }, 1710 + "required": ["ingests"] 1711 + } 1712 + } 1713 + } 1714 + } 1715 + } 1716 + } 1717 + }, 1686 1718 "/xrpc/place.stream.graph.getFollowingUser": { 1687 1719 "get": { 1688 1720 "summary": "Get whether or not user A is following user B.", ··· 3599 3631 "type": "string", 3600 3632 "format": "byte", 3601 3633 "description": "MP4 file of a user's signed livestream segment" 3634 + }, 3635 + "place.stream.ingest.defs_ingest": { 3636 + "type": "object", 3637 + "description": "An ingest URL for a Streamplace station.", 3638 + "properties": { 3639 + "type": { 3640 + "type": "string", 3641 + "description": "The type of ingest endpoint, currently 'rtmp' and 'whip' are supported." 3642 + }, 3643 + "url": { 3644 + "type": "string", 3645 + "description": "The URL of the ingest endpoint.", 3646 + "format": "uri" 3647 + } 3648 + }, 3649 + "required": ["type", "url"] 3602 3650 }, 3603 3651 "com.atproto.repo.strongRef": { 3604 3652 "type": "object",
+22
lexicons/place/stream/ingest/defs.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "place.stream.ingest.defs", 4 + "defs": { 5 + "ingest": { 6 + "type": "object", 7 + "description": "An ingest URL for a Streamplace station.", 8 + "required": ["type", "url"], 9 + "properties": { 10 + "type": { 11 + "type": "string", 12 + "description": "The type of ingest endpoint, currently 'rtmp' and 'whip' are supported." 13 + }, 14 + "url": { 15 + "type": "string", 16 + "format": "uri", 17 + "description": "The URL of the ingest endpoint." 18 + } 19 + } 20 + } 21 + } 22 + }
+31
lexicons/place/stream/ingest/getIngestUrls.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "place.stream.ingest.getIngestUrls", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "Get ingest URLs for a Streamplace station.", 8 + "parameters": { 9 + "type": "params", 10 + "properties": {} 11 + }, 12 + "output": { 13 + "encoding": "application/json", 14 + "schema": { 15 + "type": "object", 16 + "required": ["ingests"], 17 + "properties": { 18 + "ingests": { 19 + "type": "array", 20 + "items": { 21 + "type": "union", 22 + "refs": ["place.stream.ingest.defs#ingest"] 23 + } 24 + } 25 + } 26 + } 27 + }, 28 + "errors": [] 29 + } 30 + } 31 + }
-1
pkg/atproto/sync.go
··· 392 392 // if we check after exactly rec.IdleTimeoutSeconds we might miss the finalization by a few seconds 393 393 scheduledAt = scheduledAt.Add((time.Duration(*rec.IdleTimeoutSeconds) * time.Second) + (10 * time.Second)).UTC() 394 394 taskKey := fmt.Sprintf("finalize-livestream::%s::%s", aturi.String(), scheduledAt.Format(util.ISO8601)) 395 - log.Warn(ctx, "queueing stream finalization task", "taskKey", taskKey, "scheduledAt", scheduledAt) 396 395 _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskFinalizeLivestream, task, statedb.WithTaskKey(taskKey), statedb.WithScheduledAt(scheduledAt)) 397 396 if err != nil { 398 397 return fmt.Errorf("failed to enqueue remove red circle task: %w", err)
+13
pkg/config/config.go
··· 31 31 "stream.place/streamplace/pkg/crypto/aqpub" 32 32 "stream.place/streamplace/pkg/integrations/discord/discordtypes" 33 33 "stream.place/streamplace/pkg/log" 34 + placestream "stream.place/streamplace/pkg/streamplace" 34 35 ) 35 36 36 37 const SPDataDir = "$SP_DATA_DIR" ··· 146 147 AdminDIDs []string 147 148 Syndicate []string 148 149 PlayerTelemetry bool 150 + Ingests *placestream.IngestGetIngestUrls_Output 149 151 } 150 152 151 153 // ContentFilters represents the content filtering configuration ··· 813 815 Value: "sqlite://$SP_DATA_DIR/localdb.sqlite", 814 816 Destination: &cli.LocalDBURL, 815 817 Sources: urfavecli.EnvVars("SP_LOCAL_DB_URL"), 818 + }, 819 + &urfavecli.StringFlag{ 820 + Name: "ingests", 821 + Usage: `JSON array of ingests to return from place.stream.ingest.getIngestUrls. Default is auto-generated ingests for RTMP and WHIP`, 822 + Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error { 823 + if s == "" { 824 + return nil 825 + } 826 + return json.Unmarshal([]byte(s), &cli.Ingests) 827 + }, 828 + Sources: urfavecli.EnvVars("SP_INGESTS"), 816 829 }, 817 830 &urfavecli.BoolFlag{ 818 831 Name: "external-signing",
+79
pkg/spxrpc/place_stream_ingest.go
··· 1 + package spxrpc 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net" 7 + "strings" 8 + 9 + placestream "stream.place/streamplace/pkg/streamplace" 10 + ) 11 + 12 + func hostPort(host, port, defaultPort string) string { 13 + if port == defaultPort { 14 + return host 15 + } 16 + return net.JoinHostPort(host, port) 17 + } 18 + 19 + func (s *Server) handlePlaceStreamIngestGetIngestUrls(ctx context.Context) (*placestream.IngestGetIngestUrls_Output, error) { 20 + if s.cli.Ingests != nil { 21 + return s.cli.Ingests, nil 22 + } 23 + broadcasterDID := s.cli.BroadcasterDID() 24 + broadcasterHost := strings.TrimPrefix(broadcasterDID, "did:web:") 25 + out := &placestream.IngestGetIngestUrls_Output{ 26 + Ingests: []*placestream.IngestGetIngestUrls_Output_Ingests_Elem{}, 27 + } 28 + if !s.cli.Secure { 29 + _, rtmpPort, err := net.SplitHostPort(s.cli.RTMPAddr) 30 + if err != nil { 31 + return nil, err 32 + } 33 + rtmpUrl := fmt.Sprintf("rtmp://%s/live", hostPort(broadcasterHost, rtmpPort, "1935")) 34 + out.Ingests = append(out.Ingests, &placestream.IngestGetIngestUrls_Output_Ingests_Elem{ 35 + IngestDefs_Ingest: &placestream.IngestDefs_Ingest{ 36 + Type: "rtmp", 37 + Url: rtmpUrl, 38 + }, 39 + }) 40 + 41 + _, httpPort, err := net.SplitHostPort(s.cli.HTTPAddr) 42 + if err != nil { 43 + return nil, err 44 + } 45 + whipUrl := fmt.Sprintf("http://%s", hostPort(broadcasterHost, httpPort, "80")) 46 + out.Ingests = append(out.Ingests, &placestream.IngestGetIngestUrls_Output_Ingests_Elem{ 47 + IngestDefs_Ingest: &placestream.IngestDefs_Ingest{ 48 + Type: "whip", 49 + Url: whipUrl, 50 + }, 51 + }) 52 + } else { 53 + _, rtmpsPort, err := net.SplitHostPort(s.cli.RTMPSAddr) 54 + if err != nil { 55 + return nil, err 56 + } 57 + rtmpsUrl := fmt.Sprintf("rtmps://%s:%s/live", broadcasterHost, rtmpsPort) 58 + out.Ingests = append(out.Ingests, &placestream.IngestGetIngestUrls_Output_Ingests_Elem{ 59 + IngestDefs_Ingest: &placestream.IngestDefs_Ingest{ 60 + Type: "rtmps", 61 + Url: rtmpsUrl, 62 + }, 63 + }) 64 + 65 + _, httpsPort, err := net.SplitHostPort(s.cli.HTTPSAddr) 66 + if err != nil { 67 + return nil, err 68 + } 69 + whipUrl := fmt.Sprintf("https://%s", hostPort(broadcasterHost, httpsPort, "443")) 70 + out.Ingests = append(out.Ingests, &placestream.IngestGetIngestUrls_Output_Ingests_Elem{ 71 + IngestDefs_Ingest: &placestream.IngestDefs_Ingest{ 72 + Type: "whip", 73 + Url: whipUrl, 74 + }, 75 + }) 76 + } 77 + 78 + return out, nil 79 + }
+14
pkg/spxrpc/stubs.go
··· 285 285 e.POST("/xrpc/place.stream.branding.updateBlob", s.HandlePlaceStreamBrandingUpdateBlob) 286 286 e.GET("/xrpc/place.stream.broadcast.getBroadcaster", s.HandlePlaceStreamBroadcastGetBroadcaster) 287 287 e.GET("/xrpc/place.stream.graph.getFollowingUser", s.HandlePlaceStreamGraphGetFollowingUser) 288 + e.GET("/xrpc/place.stream.ingest.getIngestUrls", s.HandlePlaceStreamIngestGetIngestUrls) 288 289 e.POST("/xrpc/place.stream.live.denyTeleport", s.HandlePlaceStreamLiveDenyTeleport) 289 290 e.GET("/xrpc/place.stream.live.getLiveUsers", s.HandlePlaceStreamLiveGetLiveUsers) 290 291 e.GET("/xrpc/place.stream.live.getProfileCard", s.HandlePlaceStreamLiveGetProfileCard) ··· 413 414 var handleErr error 414 415 // func (s *Server) handlePlaceStreamGraphGetFollowingUser(ctx context.Context,subjectDID string,userDID string) (*placestream.GraphGetFollowingUser_Output, error) 415 416 out, handleErr = s.handlePlaceStreamGraphGetFollowingUser(ctx, subjectDID, userDID) 417 + if handleErr != nil { 418 + return handleErr 419 + } 420 + return c.JSON(200, out) 421 + } 422 + 423 + func (s *Server) HandlePlaceStreamIngestGetIngestUrls(c echo.Context) error { 424 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandlePlaceStreamIngestGetIngestUrls") 425 + defer span.End() 426 + var out *placestream.IngestGetIngestUrls_Output 427 + var handleErr error 428 + // func (s *Server) handlePlaceStreamIngestGetIngestUrls(ctx context.Context) (*placestream.IngestGetIngestUrls_Output, error) 429 + out, handleErr = s.handlePlaceStreamIngestGetIngestUrls(ctx) 416 430 if handleErr != nil { 417 431 return handleErr 418 432 }
-1
pkg/statedb/queue_processor.go
··· 80 80 func (state *StatefulDB) processFinalizeLivestreamTask(ctx context.Context, task *AppTask) error { 81 81 ctx = log.WithLogValues(ctx, "func", "processFinalizeLivestreamTask") 82 82 log.Debug(ctx, "processing finalize livestream task") 83 - log.Warn(ctx, "processing finalize livestream task") 84 83 var finalizeLivestreamTask FinalizeLivestreamTask 85 84 if err := json.Unmarshal(task.Payload, &finalizeLivestreamTask); err != nil { 86 85 return err
+16
pkg/streamplace/ingestdefs.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + // Lexicon schema: place.stream.ingest.defs 4 + 5 + package streamplace 6 + 7 + // IngestDefs_Ingest is a "ingest" in the place.stream.ingest.defs schema. 8 + // 9 + // An ingest URL for a Streamplace station. 10 + type IngestDefs_Ingest struct { 11 + LexiconTypeID string `json:"$type" cborgen:"$type,const=place.stream.ingest.defs#ingest"` 12 + // type: The type of ingest endpoint, currently 'rtmp' and 'whip' are supported. 13 + Type string `json:"type" cborgen:"type"` 14 + // url: The URL of the ingest endpoint. 15 + Url string `json:"url" cborgen:"url"` 16 + }
+57
pkg/streamplace/ingestgetIngestUrls.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + // Lexicon schema: place.stream.ingest.getIngestUrls 4 + 5 + package streamplace 6 + 7 + import ( 8 + "context" 9 + "encoding/json" 10 + "fmt" 11 + 12 + lexutil "github.com/bluesky-social/indigo/lex/util" 13 + ) 14 + 15 + // IngestGetIngestUrls_Output is the output of a place.stream.ingest.getIngestUrls call. 16 + type IngestGetIngestUrls_Output struct { 17 + Ingests []*IngestGetIngestUrls_Output_Ingests_Elem `json:"ingests" cborgen:"ingests"` 18 + } 19 + 20 + type IngestGetIngestUrls_Output_Ingests_Elem struct { 21 + IngestDefs_Ingest *IngestDefs_Ingest 22 + } 23 + 24 + func (t *IngestGetIngestUrls_Output_Ingests_Elem) MarshalJSON() ([]byte, error) { 25 + if t.IngestDefs_Ingest != nil { 26 + t.IngestDefs_Ingest.LexiconTypeID = "place.stream.ingest.defs#ingest" 27 + return json.Marshal(t.IngestDefs_Ingest) 28 + } 29 + return nil, fmt.Errorf("can not marshal empty union as JSON") 30 + } 31 + 32 + func (t *IngestGetIngestUrls_Output_Ingests_Elem) UnmarshalJSON(b []byte) error { 33 + typ, err := lexutil.TypeExtract(b) 34 + if err != nil { 35 + return err 36 + } 37 + 38 + switch typ { 39 + case "place.stream.ingest.defs#ingest": 40 + t.IngestDefs_Ingest = new(IngestDefs_Ingest) 41 + return json.Unmarshal(b, t.IngestDefs_Ingest) 42 + default: 43 + return nil 44 + } 45 + } 46 + 47 + // IngestGetIngestUrls calls the XRPC method "place.stream.ingest.getIngestUrls". 48 + func IngestGetIngestUrls(ctx context.Context, c lexutil.LexClient) (*IngestGetIngestUrls_Output, error) { 49 + var out IngestGetIngestUrls_Output 50 + 51 + params := map[string]interface{}{} 52 + if err := c.LexDo(ctx, lexutil.Query, "", "place.stream.ingest.getIngestUrls", params, nil, &out); err != nil { 53 + return nil, err 54 + } 55 + 56 + return &out, nil 57 + }