Live video on the AT Protocol

model: implement reindexing, index things by URI instead of CID

+144 -206
+5 -5
pkg/api/api_internal.go
··· 410 410 } 411 411 }) 412 412 413 - router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 414 - cid := p.ByName("cid") 415 - if cid == "" { 416 - errors.WriteHTTPBadRequest(w, "cid required", nil) 413 + router.GET("/chat/:uri", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 414 + uri := p.ByName("uri") 415 + if uri == "" { 416 + errors.WriteHTTPBadRequest(w, "uri required", nil) 417 417 return 418 418 } 419 - msg, err := a.Model.GetChatMessage(cid) 419 + msg, err := a.Model.GetChatMessage(uri) 420 420 if err != nil { 421 421 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 422 422 return
+2 -2
pkg/atproto/atproto.go
··· 49 49 50 50 ctx = log.WithLogValues(ctx, "did", ident.DID.String(), "handle", ident.Handle.String()) 51 51 52 - handleLock := getHandleLock(ident.DID.String()) 52 + handleLock := handleLocks.GetLock(ident.DID.String()) 53 53 handleLock.Lock() 54 54 defer handleLock.Unlock() 55 55 ··· 81 81 } 82 82 83 83 log.Log(ctx, "resolved bluesky identity", "did", ident.DID, "handle", ident.Handle, "pds", ident.PDSEndpoint()) 84 - pdsLock := getPDSLock(ident.PDSEndpoint()) 84 + pdsLock := pdsLocks.GetLock(ident.PDSEndpoint()) 85 85 xrpcc := xrpc.Client{ 86 86 Host: ident.PDSEndpoint(), 87 87 Client: &aqhttp.Client,
+2 -2
pkg/atproto/labeler_firehose.go
··· 172 172 } 173 173 targetDID = did.String() 174 174 // if it's a chat message, attempt to send it to the streamers' websocket 175 - if aturi.Collection() == "place.stream.chat.message" && l.CID != nil { 176 - msg, err := atsync.Model.GetChatMessage(*l.CID) 175 + if aturi.Collection() == "place.stream.chat.message" { 176 + msg, err := atsync.Model.GetChatMessage(l.URI) 177 177 if err != nil { 178 178 log.Error(ctx, "failed to get chat message for label", "err", err) 179 179 continue
+3 -44
pkg/atproto/locks.go
··· 1 1 package atproto 2 2 3 - import "sync" 4 - 5 - // handleLocks provides per-handle synchronization 6 - var handleLocks = struct { 7 - sync.Mutex 8 - locks map[string]*sync.Mutex 9 - }{ 10 - locks: make(map[string]*sync.Mutex), 11 - } 12 - 13 - // getHandleLock returns a mutex for the given handle 14 - func getHandleLock(handle string) *sync.Mutex { 15 - handleLocks.Lock() 16 - defer handleLocks.Unlock() 17 - 18 - if lock, exists := handleLocks.locks[handle]; exists { 19 - return lock 20 - } 3 + import "stream.place/streamplace/pkg/statedb" 21 4 22 - lock := &sync.Mutex{} 23 - handleLocks.locks[handle] = lock 24 - return lock 25 - } 26 - 27 - // pdsLocks provides per-pds synchronization 28 - var pdsLocks = struct { 29 - sync.Mutex 30 - locks map[string]*sync.Mutex 31 - }{ 32 - locks: make(map[string]*sync.Mutex), 33 - } 34 - 35 - // getpdsLock returns a mutex for the given pds 36 - func getPDSLock(pds string) *sync.Mutex { 37 - pdsLocks.Lock() 38 - defer pdsLocks.Unlock() 39 - 40 - if lock, exists := pdsLocks.locks[pds]; exists { 41 - return lock 42 - } 43 - 44 - lock := &sync.Mutex{} 45 - pdsLocks.locks[pds] = lock 46 - return lock 47 - } 5 + var handleLocks = statedb.NewNamedLocks() 6 + var pdsLocks = statedb.NewNamedLocks()
+91
pkg/atproto/migrate.go
··· 1 + package atproto 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "sync" 7 + "sync/atomic" 8 + "time" 9 + 10 + "golang.org/x/sync/errgroup" 11 + "stream.place/streamplace/pkg/log" 12 + ) 13 + 14 + func (atsync *ATProtoSynchronizer) Migrate(ctx context.Context) error { 15 + var allDIDs []string 16 + offset := 0 17 + for { 18 + repos, err := atsync.StatefulDB.ListRepos(100, offset) 19 + if err != nil { 20 + return err 21 + } 22 + if len(repos) == 0 { 23 + break 24 + } 25 + for _, repo := range repos { 26 + allDIDs = append(allDIDs, repo.DID) 27 + } 28 + offset += len(repos) 29 + } 30 + 31 + log.Log(ctx, "starting migration sync", "totalRepos", len(allDIDs)) 32 + 33 + g, ctx := errgroup.WithContext(ctx) 34 + var syncedCount int64 35 + 36 + syncErrors := map[string]error{} 37 + syncErrorMu := sync.Mutex{} 38 + 39 + // Start progress logging goroutine 40 + progressCtx, cancelProgress := context.WithCancel(ctx) 41 + defer cancelProgress() 42 + 43 + go func() { 44 + ticker := time.NewTicker(10 * time.Second) 45 + defer ticker.Stop() 46 + 47 + for { 48 + select { 49 + case <-progressCtx.Done(): 50 + return 51 + case <-ticker.C: 52 + current := atomic.LoadInt64(&syncedCount) 53 + log.Log(ctx, "migration progress", "synced", current, "total", len(allDIDs)) 54 + } 55 + } 56 + }() 57 + 58 + for i, did := range allDIDs { 59 + currentIndex := i 60 + currentDID := did 61 + g.Go(func() error { 62 + log.Debug(ctx, "syncing repo", "did", currentDID, "progress", currentIndex+1, "total", len(allDIDs)) 63 + _, err := atsync.SyncBlueskyRepoCached(ctx, currentDID, atsync.Model) 64 + if err != nil { 65 + log.Error(ctx, "failed to sync repo", "did", currentDID, "err", err) 66 + syncErrorMu.Lock() 67 + syncErrors[currentDID] = err 68 + syncErrorMu.Unlock() 69 + } else { 70 + atomic.AddInt64(&syncedCount, 1) 71 + } 72 + return nil 73 + }) 74 + } 75 + 76 + if err := g.Wait(); err != nil { 77 + log.Error(ctx, "migration failed", "err", err, "synced", atomic.LoadInt64(&syncedCount), "total", len(allDIDs)) 78 + return err 79 + } 80 + 81 + for did, err := range syncErrors { 82 + log.Error(ctx, "migration failed for user", "did", did, "err", err) 83 + } 84 + 85 + if len(allDIDs) > 0 && len(syncErrors) == len(allDIDs) { 86 + return fmt.Errorf("all users failed to migrate") 87 + } 88 + 89 + log.Log(ctx, "migration completed", "synced", len(allDIDs)) 90 + return nil 91 + }
+3 -3
pkg/atproto/sync.go
··· 120 120 if err != nil { 121 121 log.Error(ctx, "failed to create chat message", "err", err) 122 122 } 123 - mcm, err = atsync.Model.GetChatMessage(cid) 123 + mcm, err = atsync.Model.GetChatMessage(aturi.String()) 124 124 if err != nil { 125 125 log.Error(ctx, "failed to get just-saved chat message", "err", err) 126 126 } ··· 251 251 if rec.Reply == nil || rec.Reply.Root == nil { 252 252 return nil 253 253 } 254 - livestream, err := atsync.Model.GetLivestreamByPostCID(rec.Reply.Root.Cid) 254 + livestream, err := atsync.Model.GetLivestreamByPostURI(rec.Reply.Root.Uri) 255 255 if err != nil { 256 256 return fmt.Errorf("failed to get livestream: %w", err) 257 257 } ··· 285 285 RepoDID: userDID, 286 286 Type: "reply", 287 287 Repo: repo, 288 - ReplyRootCID: &livestream.PostCID, 288 + ReplyRootURI: &livestream.PostURI, 289 289 ReplyRootRepoDID: &livestream.RepoDID, 290 290 URI: aturi.String(), 291 291 IndexedAt: &now,
+7 -5
pkg/cmd/streamplace.go
··· 32 32 "stream.place/streamplace/pkg/notifications" 33 33 "stream.place/streamplace/pkg/replication" 34 34 "stream.place/streamplace/pkg/replication/boring" 35 - "stream.place/streamplace/pkg/resync" 36 35 "stream.place/streamplace/pkg/rtmps" 37 36 v0 "stream.place/streamplace/pkg/schema/v0" 38 37 "stream.place/streamplace/pkg/spmetrics" ··· 216 215 } 217 216 218 217 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 219 - if len(os.Args) > 1 && os.Args[1] == "resync" { 220 - return resync.Resync(ctx, &cli) 221 - } 222 218 223 219 err = os.MkdirAll(cli.DataDir, os.ModePerm) 224 220 if err != nil { ··· 310 306 } 311 307 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 312 308 313 - mod, err := model.MakeDB(cli.IndexDBPath) 309 + mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 314 310 if err != nil { 315 311 return err 316 312 } ··· 321 317 return err 322 318 } 323 319 } 320 + 324 321 out := carstore.SQLiteStore{} 325 322 err = out.Open(":memory:") 326 323 if err != nil { ··· 356 353 Noter: noter, 357 354 Bus: b, 358 355 } 356 + err = atsync.Migrate(ctx) 357 + if err != nil { 358 + return fmt.Errorf("failed to migrate: %w", err) 359 + } 360 + 359 361 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 360 362 if err != nil { 361 363 return err
-2
pkg/config/config.go
··· 52 52 Build *BuildFlags 53 53 DataDir string 54 54 DBURL string 55 - IndexDBPath string 56 55 EthAccountAddr string 57 56 EthKeystorePath string 58 57 EthPassword string ··· 128 127 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 129 128 fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state") 130 129 cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL) 131 - cli.DataDirFlag(fs, &cli.IndexDBPath, "index-db-path", "db.sqlite", "path to sqlite database file for maintaining atproto index") 132 130 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 133 131 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 134 132 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
+4 -4
pkg/model/chat_message.go
··· 14 14 ) 15 15 16 16 type ChatMessage struct { 17 - CID string `json:"cid" gorm:"primaryKey;column:cid"` 18 - URI string `json:"uri" gorm:"column:uri"` 17 + URI string `json:"uri" gorm:"primaryKey;column:uri"` 18 + CID string `json:"cid" gorm:"column:cid"` 19 19 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_recent_messages,priority:2"` 20 20 ChatMessage *[]byte `json:"chatMessage" gorm:"column:chat_message"` 21 21 RepoDID string `json:"repoDID" gorm:"column:repo_did"` ··· 83 83 return m.DB.Create(message).Error 84 84 } 85 85 86 - func (m *DBModel) GetChatMessage(cid string) (*ChatMessage, error) { 86 + func (m *DBModel) GetChatMessage(uri string) (*ChatMessage, error) { 87 87 var message ChatMessage 88 88 err := m.DB. 89 89 Preload("Repo"). ··· 91 91 Preload("ReplyTo"). 92 92 Preload("ReplyTo.Repo"). 93 93 Preload("ReplyTo.ChatProfile"). 94 - Where("cid = ?", cid). 94 + Where("uri = ?", uri). 95 95 First(&message). 96 96 Error 97 97 if errors.Is(err, gorm.ErrRecordNotFound) {
+7 -7
pkg/model/feed_post.go
··· 12 12 ) 13 13 14 14 type FeedPost struct { 15 - CID string `json:"cid" gorm:"primaryKey;column:cid"` 16 - URI string `json:"uri"` 15 + URI string `json:"uri" gorm:"primaryKey;column:uri"` 16 + CID string `json:"cid" gorm:"column:cid"` 17 17 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:recent_replies"` 18 - FeedPost *[]byte `json:"feedPost"` 18 + FeedPost *[]byte `json:"feedPost" gorm:"column:feed_post"` 19 19 RepoDID string `json:"repoDID" gorm:"column:repo_did"` 20 20 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 21 21 Type string `json:"type" gorm:"column:type"` 22 - ReplyRootCID *string `json:"replyRootCID,omitempty" gorm:"column:reply_root_cid"` 23 - ReplyRoot *FeedPost `json:"replyRoot,omitempty" gorm:"foreignKey:cid;references:ReplyRootCID"` 22 + ReplyRootURI *string `json:"replyRootURI,omitempty" gorm:"column:reply_root_uri"` 23 + ReplyRoot *FeedPost `json:"replyRoot,omitempty" gorm:"foreignKey:uri;references:ReplyRootURI"` 24 24 ReplyRootRepoDID *string `json:"replyRootRepoDID,omitempty" gorm:"column:reply_root_repo_did;index:recent_replies"` 25 25 ReplyRootRepo *Repo `json:"replyRootRepo,omitempty" gorm:"foreignKey:DID;references:ReplyRootRepoDID"` 26 26 IndexedAt *time.Time `json:"indexedAt,omitempty" gorm:"column:indexed_at"` ··· 77 77 return posts, nil 78 78 } 79 79 80 - func (m *DBModel) GetFeedPost(cid string) (*FeedPost, error) { 80 + func (m *DBModel) GetFeedPost(uri string) (*FeedPost, error) { 81 81 post := FeedPost{} 82 - err := m.DB.Where("CID = ?", cid).First(&post).Error 82 + err := m.DB.Where("uri = ?", uri).First(&post).Error 83 83 if errors.Is(err, gorm.ErrRecordNotFound) { 84 84 return nil, nil 85 85 }
+7 -7
pkg/model/livestream.go
··· 13 13 ) 14 14 15 15 type Livestream struct { 16 - CID string `json:"cid" gorm:"primaryKey;column:cid"` 17 - URI string `json:"uri"` 16 + URI string `json:"uri" gorm:"primaryKey;column:uri"` 17 + CID string `json:"cid" gorm:"column:cid"` 18 18 CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;index:idx_repo_created,priority:2"` 19 19 Livestream *[]byte `json:"livestream"` 20 20 RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_created,priority:1"` 21 21 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 22 22 Post *FeedPost `json:"post,omitempty" gorm:"foreignKey:CID;references:PostCID"` 23 - PostCID string `json:"postCID" gorm:"column:post_cid;index:idx_post_cid"` 24 - PostURI string `json:"postURI" gorm:"column:post_uri"` 23 + PostCID string `json:"postCID" gorm:"column:post_cid"` 24 + PostURI string `json:"postURI" gorm:"column:post_uri;index:idx_post_uri"` 25 25 } 26 26 27 27 func (ls *Livestream) ToLivestreamView() (*streamplace.Livestream_LivestreamView, error) { ··· 62 62 return &livestream, nil 63 63 } 64 64 65 - func (m *DBModel) GetLivestreamByPostCID(postCID string) (*Livestream, error) { 65 + func (m *DBModel) GetLivestreamByPostURI(postURI string) (*Livestream, error) { 66 66 var livestream Livestream 67 67 err := m.DB. 68 68 Preload("Repo"). 69 69 Preload("Post"). 70 - Where("post_cid = ?", postCID). 70 + Where("post_uri = ?", postURI). 71 71 First(&livestream).Error 72 72 if errors.Is(err, gorm.ErrRecordNotFound) { 73 73 return nil, nil 74 74 } 75 75 if err != nil { 76 - return nil, fmt.Errorf("error retrieving livestream by postCID: %w", err) 76 + return nil, fmt.Errorf("error retrieving livestream by postURI: %w", err) 77 77 } 78 78 return &livestream, nil 79 79 }
+11 -10
pkg/model/model.go
··· 5 5 "fmt" 6 6 "os" 7 7 "path/filepath" 8 - "strings" 9 8 "time" 10 9 11 10 comatproto "github.com/bluesky-social/indigo/api/atproto" ··· 68 67 69 68 CreateLivestream(ctx context.Context, ls *Livestream) error 70 69 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) 71 - GetLivestreamByPostCID(postCID string) (*Livestream, error) 70 + GetLivestreamByPostURI(postURI string) (*Livestream, error) 72 71 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) 73 72 74 73 CreateBlock(ctx context.Context, block *Block) error ··· 100 99 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error) 101 100 } 102 101 102 + var DBRevision = 2 103 + 103 104 func MakeDB(dbURL string) (Model, error) { 104 - log.Log(context.Background(), "starting database", "dbURL", dbURL) 105 105 sqliteSuffix := dbURL 106 106 if dbURL != ":memory:" { 107 - if !strings.HasPrefix(dbURL, "sqlite://") { 108 - dbURL = fmt.Sprintf("sqlite://%s", dbURL) 107 + // Ensure dbURL exists as a directory on the filesystem 108 + if err := os.MkdirAll(dbURL, os.ModePerm); err != nil { 109 + return nil, fmt.Errorf("error creating database directory: %w", err) 109 110 } 110 - sqliteSuffix := dbURL[len("sqlite://"):] 111 + dbPath := filepath.Join(dbURL, fmt.Sprintf("index_%d.sqlite", DBRevision)) 112 + sqliteSuffix = dbPath 111 113 // if this isn't ":memory:", ensure that directory exists (eg, if db 112 114 // file is being initialized) 113 - if !strings.Contains(sqliteSuffix, ":?") { 114 - if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil { 115 - return nil, fmt.Errorf("error creating database path: %w", err) 116 - } 115 + if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil { 116 + return nil, fmt.Errorf("error creating database path: %w", err) 117 117 } 118 118 } 119 + log.Log(context.Background(), "starting database", "dbURL", sqliteSuffix) 119 120 dial := sqlite.Open(sqliteSuffix) 120 121 121 122 gormLogger := slogGorm.New(
-113
pkg/resync/resync.go
··· 1 - package resync 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "time" 7 - 8 - "golang.org/x/sync/errgroup" 9 - "stream.place/streamplace/pkg/atproto" 10 - "stream.place/streamplace/pkg/bus" 11 - "stream.place/streamplace/pkg/config" 12 - "stream.place/streamplace/pkg/log" 13 - "stream.place/streamplace/pkg/model" 14 - ) 15 - 16 - // resync a fresh database from the PDSses, copying over the few pieces of local state 17 - // that we have 18 - func Resync(ctx context.Context, cli *config.CLI) error { 19 - oldMod, err := model.MakeDB(cli.IndexDBPath) 20 - if err != nil { 21 - return err 22 - } 23 - tempDBPath := cli.IndexDBPath + ".temp." + fmt.Sprintf("%d", time.Now().UnixNano()) 24 - newMod, err := model.MakeDB(tempDBPath) 25 - if err != nil { 26 - return err 27 - } 28 - repos, err := oldMod.GetAllRepos() 29 - if err != nil { 30 - return err 31 - } 32 - 33 - atsync := &atproto.ATProtoSynchronizer{ 34 - CLI: cli, 35 - Model: newMod, 36 - StatefulDB: nil, // TODO: Add StatefulDB for resync when migration is ready 37 - Noter: nil, 38 - Bus: bus.NewBus(), 39 - } 40 - 41 - doneMap := make(map[string]bool) 42 - 43 - g, ctx := errgroup.WithContext(ctx) 44 - 45 - doneChan := make(chan string) 46 - go func() { 47 - for { 48 - select { 49 - case <-ctx.Done(): 50 - return 51 - case did := <-doneChan: 52 - doneMap[did] = true 53 - case <-time.After(10 * time.Second): 54 - for _, repo := range repos { 55 - if !doneMap[repo.DID] { 56 - log.Warn(ctx, "remaining repos to sync", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS) 57 - } 58 - } 59 - } 60 - } 61 - }() 62 - 63 - for _, repo := range repos { 64 - repo := repo // capture range variable 65 - doneMap[repo.DID] = false 66 - g.Go(func() error { 67 - log.Warn(ctx, "syncing repo", "did", repo.DID, "handle", repo.Handle) 68 - ctx := log.WithLogValues(ctx, "resyncDID", repo.DID, "resyncHandle", repo.Handle) 69 - _, err := atsync.SyncBlueskyRepoCached(ctx, repo.Handle, newMod) 70 - if err != nil { 71 - log.Error(ctx, "failed to sync repo", "did", repo.DID, "handle", repo.Handle, "err", err) 72 - return nil 73 - } 74 - log.Log(ctx, "synced repo", "did", repo.DID, "handle", repo.Handle) 75 - doneChan <- repo.DID 76 - return nil 77 - }) 78 - } 79 - 80 - if err := g.Wait(); err != nil { 81 - return err 82 - } 83 - 84 - // TODO: Update OAuth session migration to use new statefulDB 85 - // oauthSessions, err := oldMod.ListOAuthSessions() 86 - // if err != nil { 87 - // return err 88 - // } 89 - // for _, session := range oauthSessions { 90 - // err := newMod.CreateOAuthSession(session.DownstreamDPoPJKT, &session) 91 - // if err != nil { 92 - // return fmt.Errorf("failed to create oauth session: %w", err) 93 - // } 94 - // } 95 - // log.Log(ctx, "migrated oauth sessions", "count", len(oauthSessions)) 96 - 97 - // TODO: Update notification migration to use new statefulDB 98 - // notificationTokens, err := oldMod.ListNotifications() 99 - // if err != nil { 100 - // return err 101 - // } 102 - // for _, token := range notificationTokens { 103 - // err := newMod.CreateNotification(token.Token, token.RepoDID) 104 - // if err != nil { 105 - // return fmt.Errorf("failed to create notification: %w", err) 106 - // } 107 - // } 108 - // log.Log(ctx, "migrated notification tokens", "count", len(notificationTokens)) 109 - 110 - log.Log(ctx, "resync complete!", "newDBPath", tempDBPath) 111 - 112 - return nil 113 - }
+1 -1
pkg/spxrpc/com_atproto_moderation.go
··· 65 65 did = aturi.Authority().String() 66 66 // if it's chat, we want the clip from the streamer, not from the chatter 67 67 if aturi.Collection() == "place.stream.chat.message" { 68 - msg, err := s.model.GetChatMessage(body.Subject.RepoStrongRef.Cid) 68 + msg, err := s.model.GetChatMessage(body.Subject.RepoStrongRef.Uri) 69 69 if err != nil { 70 70 log.Error(ctx, "failed to get chat message for chat report", "error", err) 71 71 } else {
+1 -1
pkg/statedb/migrate.go
··· 28 28 return err 29 29 } 30 30 31 - oldDB, err := gorm.Open(sqlite.Open(cli.IndexDBPath), &gorm.Config{ 31 + oldDB, err := gorm.Open(sqlite.Open(cli.DataFilePath([]string{"db.sqlite"})), &gorm.Config{ 32 32 Logger: gormLogger, 33 33 }) 34 34 if err != nil {