backend for xcvr appview
at main 271 lines 6.1 kB view raw
1package model 2 3import ( 4 "context" 5 "errors" 6 "net/http" 7 "os" 8 "rvcx/internal/db" 9 "rvcx/internal/log" 10 "rvcx/internal/oauth" 11 "rvcx/internal/recordmanager" 12 "rvcx/internal/types" 13 "sync" 14 "time" 15 16 "github.com/rachel-mp4/lrcd" 17 lrcpb "github.com/rachel-mp4/lrcproto/gen/go" 18) 19 20type Model struct { 21 store *db.Store 22 uriMap map[string]*channelModel 23 logger *log.Logger 24 cli *oauth.PasswordClient 25 mu sync.Mutex 26 rm *recordmanager.RecordManager 27} 28 29type channelModel struct { 30 uri string 31 logger *log.Logger 32 valid bool 33 34 welcome string 35 server *lrcd.Server 36 lastID uint32 37 initChan <-chan lrcd.InitChanMsg 38 mediainitChan <-chan lrcd.MediaInitChanMsg 39 ctx context.Context 40 cancel func() 41 42 clients map[*client]bool 43 clientsmu sync.Mutex 44} 45 46func (m *Model) GetWSHandlerFrom(uri string) (http.HandlerFunc, error) { 47 server, err := m.getServer(uri) 48 if err != nil { 49 return nil, err 50 } 51 return server.WSHandler(), nil 52} 53 54func (m *Model) GetLexStreamFrom(uri string) (http.HandlerFunc, error) { 55 cm, ok := m.uriMap[uri] 56 if !ok { 57 return nil, errors.New("not a valid server") 58 } 59 return cm.WSHandler(uri, m), nil 60} 61 62func Init(store *db.Store, logger *log.Logger, cli *oauth.PasswordClient, rm *recordmanager.RecordManager) *Model { 63 uris, err := store.GetChannelURIs(context.Background()) 64 if err != nil { 65 panic(err) 66 } 67 uriToServerModel := make(map[string]*channelModel, len(uris)) 68 myid := os.Getenv("MY_DID") 69 for _, uri := range uris { 70 valid := (uri.Host == myid) 71 beep := channelModel{ 72 welcome: uri.Topic, 73 uri: uri.URI, 74 logger: logger, 75 lastID: uri.LastID, 76 valid: valid, 77 clients: make(map[*client]bool), 78 clientsmu: sync.Mutex{}, 79 } 80 uriToServerModel[uri.URI] = &beep 81 } 82 return &Model{ 83 store, 84 uriToServerModel, 85 logger, 86 cli, 87 sync.Mutex{}, 88 rm, 89 } 90} 91 92func (m *Model) AddChannel(c *types.Channel) error { 93 _, ok := m.uriMap[c.URI] 94 if ok { 95 return errors.New("tried to add existing server!") 96 } 97 valid := (c.Host == os.Getenv("MY_DID")) 98 var welcome string 99 if c.Topic == nil { 100 welcome = "and now you're connected" 101 } else { 102 welcome = *c.Topic 103 } 104 beep := channelModel{ 105 welcome: welcome, 106 uri: c.URI, 107 logger: m.logger, 108 lastID: 1, 109 valid: valid, 110 clients: make(map[*client]bool), 111 clientsmu: sync.Mutex{}, 112 } 113 m.uriMap[c.URI] = &beep 114 return nil 115} 116 117func (m *Model) UpdateChannel(c *types.Channel) error { 118 cm, ok := m.uriMap[c.URI] 119 if !ok { 120 return m.AddChannel(c) 121 } 122 valid := (c.Host == os.Getenv("my_IDENTITY")) 123 if valid != cm.valid { 124 if valid { 125 cm.valid = true 126 } else { 127 cm.valid = false 128 cm.cancel() 129 } 130 } 131 var welcome string 132 if c.Topic == nil { 133 welcome = "and now you're connected" 134 } else { 135 welcome = *c.Topic 136 } 137 cm.welcome = welcome 138 return nil 139} 140 141func (m *Model) DeleteChannel(uri string) error { 142 cm, ok := m.uriMap[uri] 143 if !ok { 144 return nil 145 } 146 delete(m.uriMap, uri) 147 // this case is for if a malformed channel record is ingested which 148 // doesn't create a channel, but it still shows up in uriMap. probs 149 // shouldn't be in uriMap but idk 150 if cm != nil { 151 cm.cancel() 152 } 153 return nil 154} 155 156func (m *Model) getServer(uri string) (*lrcd.Server, error) { 157 m.mu.Lock() 158 defer m.mu.Unlock() 159 160 cm := m.uriMap[uri] 161 if cm == nil { 162 return nil, errors.New("uri doesn't refer to a channel i am aware of") 163 } 164 if !cm.valid { 165 return nil, errors.New("Not hosted on this backend!") 166 } 167 168 if cm.server == nil { 169 m.logger.Deprintln("i think the server should exist, so i'm making it") 170 var err error 171 lastID := cm.lastID 172 initChan := make(chan lrcd.InitChanMsg, 100) 173 mediainitChan := make(chan lrcd.MediaInitChanMsg, 100) 174 175 server, err := lrcd.NewServer( 176 lrcd.WithResolver(func(externalID string, ctx context.Context) *string { 177 did, err := m.store.FullResolveHandle(externalID, ctx) 178 if err != nil { 179 select { 180 case <-ctx.Done(): 181 return nil 182 default: 183 return &did 184 } 185 } 186 return &did 187 }), 188 lrcd.WithWelcome(cm.welcome), 189 lrcd.WithLogging(os.Stdout, true), 190 lrcd.WithInitialID(lastID), 191 lrcd.WithInitChannel(initChan), 192 lrcd.WithMediainitChannel(mediainitChan), 193 lrcd.WithServerURIAndSecret(uri, os.Getenv("LRCD_SECRET")), 194 ) 195 if err != nil { 196 return nil, errors.New("Error creating server") 197 } 198 199 err = server.Start() 200 if err != nil { 201 return nil, errors.New("Error starting server") 202 } 203 204 if cm.cancel != nil { 205 m.logger.Println("that's weird, old cancel lying around") 206 cm.cancel() 207 } 208 209 ctx, cancel := context.WithCancel(context.Background()) 210 cm.server = server 211 cm.initChan = initChan 212 cm.mediainitChan = mediainitChan 213 cm.cancel = cancel 214 cm.ctx = ctx 215 216 go m.handleInitEvents(cm) 217 } 218 return cm.server, nil 219} 220 221func (m *Model) handleInitEvents(cm *channelModel) { 222 ticker := time.NewTicker(5 * time.Minute) 223 defer ticker.Stop() 224 225 for { 226 select { 227 case <-cm.ctx.Done(): 228 cm.logger.Deprintln("i'm a handleinitevent goroutine and my context is done") 229 return 230 case <-ticker.C: 231 c := cm.server.Connected() 232 if c == 0 { 233 cm.logger.Deprintln("i think the server is empty! gonna break some things") 234 lastID, err := cm.server.Stop() 235 if err != nil { 236 panic(err) 237 } 238 cm.lastID = lastID 239 cm.server = nil 240 cm.initChan = nil 241 cm.cancel() 242 cm.cancel = nil 243 return 244 } 245 case e, ok := <-cm.initChan: 246 if !ok { 247 cm.logger.Println("this is a weird case!") 248 return 249 } 250 err := m.rm.PostSignet(e.ResolvedId, e.Init, cm.uri, context.Background()) 251 if err != nil { 252 m.logger.Println("error posting signet: " + err.Error()) 253 } 254 case me, ok := <-cm.mediainitChan: 255 if !ok { 256 cm.logger.Println("this is a weird case!") 257 return 258 } 259 e := lrcpb.Event_Init{ 260 Init: &lrcpb.Init{ 261 Id: me.Mediainit.Mediainit.Id, 262 ExternalID: me.Mediainit.Mediainit.ExternalID, 263 }, 264 } 265 err := m.rm.PostSignet(me.ResolvedId, e, cm.uri, context.Background()) 266 if err != nil { 267 m.logger.Println("error posting signet: " + err.Error()) 268 } 269 } 270 } 271}