this repo has no description

end to end profile creation

+294 -15
+4
.env
··· 1 + VYLET_INDEXER_DATABASE_HOST="127.0.0.1:9090" 2 + VYLET_BOOTSTRAP_SERVERS="127.0.0.1:9092" 3 + VYLET_INDEXER_INPUT_TOPIC="firehose-events-prod" 4 + VYLET_INDEXER_CONSUMER_GROUP="indexer-consumers"
+3
bus/firehose/handler.go
··· 18 18 19 19 func (kf *KafkaFirehose) handleEvent(ctx context.Context, evt *events.XRPCStreamEvent) error { 20 20 logger := kf.logger.With("name", "handleEvent", "seq", evt.Sequence()) 21 + 21 22 logger.Debug("received event") 22 23 23 24 var kind string ··· 96 97 func() { 97 98 status := "error" 98 99 var collection string 100 + 101 + logger := logger.With("collection", collection) 99 102 100 103 defer func() { 101 104 recordsHandled.WithLabelValues(status, collection).Inc()
+72
cmd/indexer/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log" 7 + "os" 8 + 9 + "github.com/bluesky-social/go-util/pkg/telemetry" 10 + _ "github.com/joho/godotenv/autoload" 11 + "github.com/urfave/cli/v2" 12 + "github.com/vylet-app/go/indexer" 13 + ) 14 + 15 + func main() { 16 + app := cli.App{ 17 + Name: "vylet-database", 18 + Flags: []cli.Flag{ 19 + telemetry.CLIFlagDebug, 20 + telemetry.CLIFlagMetricsListenAddress, 21 + &cli.StringFlag{ 22 + Name: "database-host", 23 + Value: "127.0.0.1:9090", 24 + EnvVars: []string{"VYLET_DATABASE_HOST"}, 25 + }, 26 + &cli.StringSliceFlag{ 27 + Name: "bootstrap-servers", 28 + Value: cli.NewStringSlice("localhost:9092"), 29 + EnvVars: []string{"VYLET_BOOTSTRAP_SERVERS"}, 30 + }, 31 + &cli.StringFlag{ 32 + Name: "input-topic", 33 + Value: "firehose-events-prod", 34 + EnvVars: []string{"VYLET_INDEXER_INPUT_TOPIC"}, 35 + }, 36 + &cli.StringFlag{ 37 + Name: "consumer-group", 38 + Required: true, 39 + EnvVars: []string{"VYLET_INDEXER_CONSUMER_GROUP"}, 40 + }, 41 + }, 42 + Action: run, 43 + } 44 + 45 + if err := app.Run(os.Args); err != nil { 46 + log.Fatal(err) 47 + } 48 + } 49 + 50 + func run(cmd *cli.Context) error { 51 + ctx := context.Background() 52 + 53 + logger := telemetry.StartLogger(cmd) 54 + telemetry.StartMetrics(cmd) 55 + 56 + server, err := indexer.New(&indexer.Args{ 57 + Logger: logger, 58 + BootstrapServers: cmd.StringSlice("bootstrap-servers"), 59 + InputTopic: cmd.String("input-topic"), 60 + ConsumerGroup: cmd.String("consumer-group"), 61 + DatabaseHost: cmd.String("database-host"), 62 + }) 63 + if err != nil { 64 + return fmt.Errorf("failed to create new server: %w", err) 65 + } 66 + 67 + if err := server.Run(ctx); err != nil { 68 + return fmt.Errorf("failed to run server: %w", err) 69 + } 70 + 71 + return nil 72 + }
+44
database/client/client.go
··· 1 + package client 2 + 3 + import ( 4 + "crypto/tls" 5 + "fmt" 6 + 7 + vyletdatabase "github.com/vylet-app/go/database/proto" 8 + "google.golang.org/grpc" 9 + "google.golang.org/grpc/credentials" 10 + ) 11 + 12 + type Client struct { 13 + client *grpc.ClientConn 14 + Profile vyletdatabase.ProfileServiceClient 15 + } 16 + 17 + type Args struct { 18 + Addr string 19 + } 20 + 21 + func New(args *Args) (*Client, error) { 22 + tlsConfig := &tls.Config{ 23 + InsecureSkipVerify: true, 24 + } 25 + creds := credentials.NewTLS(tlsConfig) 26 + 27 + conn, err := grpc.NewClient(args.Addr, grpc.WithTransportCredentials(creds)) 28 + if err != nil { 29 + return nil, fmt.Errorf("failed to connect: %w", err) 30 + } 31 + 32 + profileClient := vyletdatabase.NewProfileServiceClient(conn) 33 + 34 + client := Client{ 35 + client: conn, 36 + Profile: profileClient, 37 + } 38 + 39 + return &client, nil 40 + } 41 + 42 + func (c *Client) Close() error { 43 + return c.client.Close() 44 + }
+14 -2
database/proto/profile.pb.go
··· 114 114 type CreateProfileRequest struct { 115 115 state protoimpl.MessageState `protogen:"open.v1"` 116 116 Did string `protobuf:"bytes,1,opt,name=did,proto3" json:"did,omitempty"` 117 + CreatedAt *string `protobuf:"bytes,2,opt,name=created_at,json=createdAt,proto3,oneof" json:"created_at,omitempty"` 117 118 unknownFields protoimpl.UnknownFields 118 119 sizeCache protoimpl.SizeCache 119 120 } ··· 151 152 func (x *CreateProfileRequest) GetDid() string { 152 153 if x != nil { 153 154 return x.Did 155 + } 156 + return "" 157 + } 158 + 159 + func (x *CreateProfileRequest) GetCreatedAt() string { 160 + if x != nil && x.CreatedAt != nil { 161 + return *x.CreatedAt 154 162 } 155 163 return "" 156 164 } ··· 207 215 "\x11GetProfileRequest\x12\x18\n" + 208 216 "\x03did\x18\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x01R\x03did\".\n" + 209 217 "\x12GetProfileResponse\x12\x18\n" + 210 - "\x03did\x18\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x01R\x03did\"0\n" + 218 + "\x03did\x18\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x01R\x03did\"c\n" + 211 219 "\x14CreateProfileRequest\x12\x18\n" + 212 - "\x03did\x18\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x01R\x03did\"<\n" + 220 + "\x03did\x18\x01 \x01(\tB\x06\xbaH\x03\xc8\x01\x01R\x03did\x12\"\n" + 221 + "\n" + 222 + "created_at\x18\x02 \x01(\tH\x00R\tcreatedAt\x88\x01\x01B\r\n" + 223 + "\v_created_at\"<\n" + 213 224 "\x15CreateProfileResponse\x12\x19\n" + 214 225 "\x05error\x18\x01 \x01(\tH\x00R\x05error\x88\x01\x01B\b\n" + 215 226 "\x06_error2\xbf\x01\n" + ··· 255 266 if File_profile_proto != nil { 256 267 return 257 268 } 269 + file_profile_proto_msgTypes[2].OneofWrappers = []any{} 258 270 file_profile_proto_msgTypes[3].OneofWrappers = []any{} 259 271 type x struct{} 260 272 out := protoimpl.TypeBuilder{
+1
database/proto/profile.proto
··· 28 28 string did = 1 [ 29 29 (buf.validate.field).required = true 30 30 ]; 31 + optional string created_at = 2; 31 32 } 32 33 33 34 message CreateProfileResponse {
+3 -1
database/server/migrations.go
··· 54 54 55 55 // RollbackMigration rolls back the last migration 56 56 func RollbackMigration(session *gocql.Session, migrationsPath string) error { 57 - driver, err := cassandra.WithInstance(session, &cassandra.Config{}) 57 + driver, err := cassandra.WithInstance(session, &cassandra.Config{ 58 + KeyspaceName: session.Query("").Consistency(gocql.One).GetConsistency().String(), 59 + }) 58 60 if err != nil { 59 61 return fmt.Errorf("failed to create migration driver: %w", err) 60 62 }
+31 -1
database/server/profile.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "time" 5 6 6 7 vyletdatabase "github.com/vylet-app/go/database/proto" 8 + "github.com/vylet-app/go/internal/helpers" 7 9 ) 8 10 9 11 func (s *Server) CreateProfile(ctx context.Context, req *vyletdatabase.CreateProfileRequest) (*vyletdatabase.CreateProfileResponse, error) { 10 - return nil, nil 12 + logger := s.logger.With("name", "CreateProfile") 13 + 14 + var createdAt string 15 + if req.CreatedAt != nil { 16 + createdAt = *req.CreatedAt 17 + } 18 + 19 + now := time.Now().UTC() 20 + 21 + createdAtTime, err := time.Parse(time.RFC3339Nano, createdAt) 22 + if err != nil { 23 + createdAtTime = now 24 + } 25 + 26 + if err := s.cqlSession.Query( 27 + ` 28 + INSERT INTO profiles 29 + (did, created_at, indexed_at, updated_at) 30 + VALUES 31 + (?, ?, ?, ?) 32 + `, req.Did, createdAtTime, now, now, 33 + ).WithContext(ctx).Exec(); err != nil { 34 + logger.Error("failed to create profile", "did", req.Did, "err", err) 35 + return &vyletdatabase.CreateProfileResponse{ 36 + Error: helpers.ToStringPtr(err.Error()), 37 + }, nil 38 + } 39 + 40 + return &vyletdatabase.CreateProfileResponse{}, nil 11 41 } 12 42 13 43 func (s *Server) GetProfile(ctx context.Context, req *vyletdatabase.GetProfileRequest) (*vyletdatabase.GetProfileResponse, error) {
+7 -1
database/server/server.go
··· 22 22 vyletdatabase "github.com/vylet-app/go/database/proto" 23 23 "google.golang.org/grpc" 24 24 "google.golang.org/grpc/credentials" 25 + "google.golang.org/grpc/reflection" 25 26 ) 26 27 27 28 const ( ··· 93 94 cassandraAddrs: args.CassandraAddrs, 94 95 cassandraKeyspace: args.CassandraKeyspace, 95 96 97 + listenerAddr: args.ListenAddr, 98 + 96 99 cqlSession: session, 97 100 98 101 grpcServer: grpcServer, ··· 106 109 func (s *Server) Run(ctx context.Context) error { 107 110 logger := s.logger.With("name", "Run") 108 111 112 + logger.Info("attempting to listen", "addr", s.listenerAddr) 113 + 109 114 listener, err := net.Listen("tcp", s.listenerAddr) 110 115 if err != nil { 111 116 return fmt.Errorf("failed to listen: %w", err) 112 117 } 113 118 114 - logger.Info("running gRPC server") 119 + logger.Info("running gRPC server", "addr", s.listenerAddr) 115 120 116 121 grpcServerErr := make(chan error, 1) 117 122 go func() { ··· 150 155 151 156 func (s *Server) registerServices() { 152 157 vyletdatabase.RegisterProfileServiceServer(s.grpcServer, s) 158 + reflection.Register(s.grpcServer) 153 159 } 154 160 155 161 func GenerateTLSCertificate(commonName string) (*tls.Certificate, error) {
+1
go.mod
··· 10 10 github.com/golang-migrate/migrate/v4 v4.19.1 11 11 github.com/gorilla/websocket v1.5.1 12 12 github.com/ipfs/go-cid v0.4.1 13 + github.com/joho/godotenv v1.5.1 13 14 github.com/prometheus/client_golang v1.23.2 14 15 github.com/twmb/franz-go v1.19.5 15 16 github.com/urfave/cli/v2 v2.27.7
+2
go.sum
··· 169 169 github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= 170 170 github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= 171 171 github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= 172 + github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= 173 + github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= 172 174 github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= 173 175 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= 174 176 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
+37
indexer/actorprofile.go
··· 1 + package indexer 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + 8 + vyletkafka "github.com/vylet-app/go/bus/proto" 9 + vyletdatabase "github.com/vylet-app/go/database/proto" 10 + "github.com/vylet-app/go/generated/vylet" 11 + ) 12 + 13 + func (s *Server) handleActorProfile(ctx context.Context, evt *vyletkafka.FirehoseEvent) error { 14 + var rec vylet.ActorProfile 15 + op := evt.Commit 16 + 17 + if op.Operation == vyletkafka.CommitOperation_COMMIT_OPERATION_CREATE { 18 + if err := json.Unmarshal(op.Record, &rec); err != nil { 19 + return fmt.Errorf("failed to unmarshal profile record: %w", err) 20 + } 21 + 22 + resp, err := s.db.Profile.CreateProfile(ctx, &vyletdatabase.CreateProfileRequest{ 23 + Did: evt.Did, 24 + CreatedAt: rec.CreatedAt, 25 + }) 26 + if err != nil { 27 + return fmt.Errorf("failed to create create profile request: %w", err) 28 + } 29 + if resp.Error != nil { 30 + return fmt.Errorf("error creating profile: %s", *resp.Error) 31 + } 32 + } else { 33 + return fmt.Errorf("unhandled operation") 34 + } 35 + 36 + return nil 37 + }
+19
indexer/handler.go
··· 7 7 ) 8 8 9 9 func (s *Server) handleEvent(ctx context.Context, evt *vyletkafka.FirehoseEvent) error { 10 + logger := s.logger.With("name", "handleEvent") 11 + if evt.Commit != nil { 12 + err := s.handleCommit(ctx, evt) 13 + if err != nil { 14 + logger.Error("error handling event", "err", err) 15 + return err 16 + } 17 + return s.handleCommit(ctx, evt) 18 + } 19 + 20 + return nil 21 + } 22 + 23 + func (s *Server) handleCommit(ctx context.Context, evt *vyletkafka.FirehoseEvent) error { 24 + switch evt.Commit.Collection { 25 + case "app.vylet.actor.profile": 26 + return s.handleActorProfile(ctx, evt) 27 + } 28 + 10 29 return nil 11 30 }
+38 -7
indexer/server.go
··· 11 11 12 12 "github.com/bluesky-social/go-util/pkg/bus/consumer" 13 13 vyletkafka "github.com/vylet-app/go/bus/proto" 14 + "github.com/vylet-app/go/database/client" 14 15 ) 15 16 16 17 type Server struct { 17 18 logger *slog.Logger 18 19 19 20 consumer *consumer.Consumer[*vyletkafka.FirehoseEvent] 21 + db *client.Client 20 22 } 21 23 22 24 type Args struct { ··· 25 27 BootstrapServers []string 26 28 InputTopic string 27 29 ConsumerGroup string 30 + 31 + DatabaseHost string 28 32 } 29 33 30 34 func New(args *Args) (*Server, error) { ··· 34 38 35 39 logger := args.Logger 36 40 41 + db, err := client.New(&client.Args{ 42 + Addr: args.DatabaseHost, 43 + }) 44 + if err != nil { 45 + return nil, fmt.Errorf("failed to create a new database client: %w", err) 46 + } 47 + 48 + server := Server{ 49 + logger: logger, 50 + 51 + db: db, 52 + } 53 + 37 54 busConsumer, err := consumer.New( 38 55 logger.With("component", "consumer"), 39 56 args.BootstrapServers, 40 57 args.InputTopic, 41 58 args.ConsumerGroup, 42 59 consumer.WithOffset[*vyletkafka.FirehoseEvent](consumer.OffsetStart), 60 + consumer.WithMessageHandler(server.handleEvent), 43 61 ) 44 62 if err != nil { 45 63 return nil, fmt.Errorf("failed to create new consumer: %w", err) 46 64 } 47 - 48 - server := Server{ 49 - logger: logger, 50 - 51 - consumer: busConsumer, 52 - } 65 + server.consumer = busConsumer 53 66 54 67 return &server, nil 55 68 } ··· 59 72 60 73 shutdownConsumer := make(chan struct{}, 1) 61 74 consumerShutdown := make(chan struct{}, 1) 62 - 75 + consumerErr := make(chan error, 1) 63 76 go func() { 77 + go func() { 78 + if err := s.consumer.Consume(ctx); err != nil { 79 + consumerErr <- err 80 + } 81 + }() 64 82 83 + select { 84 + case <-shutdownConsumer: 85 + case err := <-consumerErr: 86 + s.logger.Error("error consuming", "err", err) 87 + } 88 + 89 + s.consumer.Close() 90 + 91 + close(consumerShutdown) 65 92 }() 66 93 67 94 signals := make(chan os.Signal, 1) ··· 82 109 defer cancel() 83 110 84 111 s.consumer.Close() 112 + 113 + if err := s.db.Close(); err != nil { 114 + logger.Error("failed to close database client", "err", err) 115 + } 85 116 86 117 return nil 87 118 }
+5
internal/helpers/helpers.go
··· 1 + package helpers 2 + 3 + func ToStringPtr(str string) *string { 4 + return &str 5 + }
+11 -2
justfile
··· 12 12 go run ./gen 13 13 14 14 migrate-up: 15 - go run ./cmd/database/migrate up 15 + go run ./cmd/database/migrate -k vylet up 16 16 17 17 migrate-down: 18 - go run ./cmd/database/migrate down 18 + go run ./cmd/database/migrate -k vylet down 19 19 20 20 migrate-create name: 21 21 #!/usr/bin/env bash ··· 35 35 36 36 cassandra-shell: 37 37 docker exec -it cassandra cqlsh 38 + 39 + run-database-server: 40 + go run ./cmd/database 41 + 42 + run-firehose: 43 + go run ./cmd/bus/firehose --desired-collections "app.vylet.*" --websocket-host "wss://bsky.network" --output-topic firehose-events-prod 44 + 45 + run-indexer: 46 + go run ./cmd/indexer
+2 -1
migrations/000001_create_profiles_table.up.cql
··· 1 1 CREATE TABLE IF NOT EXISTS profiles ( 2 2 did TEXT PRIMARY KEY, 3 3 created_at TIMESTAMP, 4 - updated_at TIMESTAMP 4 + indexed_at TIMESTAMP, 5 + updated_at TIMESTAMP, 5 6 );