[mirror] Scalable static site server for Git forges (like GitHub Pages)
at 3830af5392aea8cd8f2bfafb04842d9f038eeb64 399 lines 12 kB view raw
1package git_pages 2 3import ( 4 "cmp" 5 "context" 6 "fmt" 7 "io" 8 "net/http" 9 "os" 10 "os/exec" 11 "path" 12 "path/filepath" 13 "strconv" 14 "strings" 15 "time" 16 17 exponential "github.com/jpillora/backoff" 18 "github.com/kankanreno/go-snowflake" 19 "github.com/prometheus/client_golang/prometheus" 20 "github.com/prometheus/client_golang/prometheus/promauto" 21 "google.golang.org/protobuf/encoding/protojson" 22 "google.golang.org/protobuf/proto" 23 timestamppb "google.golang.org/protobuf/types/known/timestamppb" 24) 25 26var ( 27 auditNotifyOkCount = promauto.NewCounter(prometheus.CounterOpts{ 28 Name: "git_pages_audit_notify_ok", 29 Help: "Count of successful audit notifications", 30 }) 31 auditNotifyErrorCount = promauto.NewCounter(prometheus.CounterOpts{ 32 Name: "git_pages_audit_notify_error", 33 Help: "Count of failed audit notifications", 34 }) 35) 36 37type principalKey struct{} 38 39var PrincipalKey = principalKey{} 40 41func WithPrincipal(ctx context.Context) context.Context { 42 principal := &Principal{} 43 return context.WithValue(ctx, PrincipalKey, principal) 44} 45 46func GetPrincipal(ctx context.Context) *Principal { 47 if principal, ok := ctx.Value(PrincipalKey).(*Principal); ok { 48 return principal 49 } 50 return nil 51} 52 53type AuditID int64 54 55func GenerateAuditID() AuditID { 56 inner, err := snowflake.NextID() 57 if err != nil { 58 panic(err) 59 } 60 return AuditID(inner) 61} 62 63func ParseAuditID(repr string) (AuditID, error) { 64 inner, err := strconv.ParseInt(repr, 16, 64) 65 if err != nil { 66 return AuditID(0), err 67 } 68 return AuditID(inner), nil 69} 70 71func (id AuditID) String() string { 72 return fmt.Sprintf("%016x", int64(id)) 73} 74 75func (id AuditID) CompareTime(when time.Time) int { 76 idMillis := int64(id) >> (snowflake.MachineIDLength + snowflake.SequenceLength) 77 whenMillis := when.UTC().UnixNano() / 1e6 78 return cmp.Compare(idMillis, whenMillis) 79} 80 81func EncodeAuditRecord(record *AuditRecord) (data []byte) { 82 data, err := proto.MarshalOptions{Deterministic: true}.Marshal(record) 83 if err != nil { 84 panic(err) 85 } 86 return 87} 88 89func DecodeAuditRecord(data []byte) (record *AuditRecord, err error) { 90 record = &AuditRecord{} 91 err = proto.Unmarshal(data, record) 92 return 93} 94 95func (record *AuditRecord) GetAuditID() AuditID { 96 return AuditID(record.GetId()) 97} 98 99func (record *AuditRecord) DescribePrincipal() string { 100 var items []string 101 if record.Principal != nil { 102 if record.Principal.GetIpAddress() != "" { 103 items = append(items, record.Principal.GetIpAddress()) 104 } 105 if record.Principal.GetForgeUser() != nil { 106 items = append(items, fmt.Sprintf("%s/%s(%d)", 107 record.Principal.GetForgeUser().GetOrigin(), 108 record.Principal.GetForgeUser().GetHandle(), 109 record.Principal.GetForgeUser().GetId())) 110 } 111 if record.Principal.GetCliAdmin() { 112 items = append(items, "<cli-admin>") 113 } 114 } 115 if len(items) > 0 { 116 return strings.Join(items, ";") 117 } else { 118 return "<unknown>" 119 } 120} 121 122func (record *AuditRecord) DescribeResource() string { 123 desc := "<unknown>" 124 if record.Domain != nil && record.Project != nil { 125 desc = path.Join(*record.Domain, *record.Project) 126 } else if record.Domain != nil { 127 desc = *record.Domain 128 } 129 return desc 130} 131 132type AuditRecordScope int 133 134const ( 135 AuditRecordComplete AuditRecordScope = iota 136 AuditRecordNoManifest 137) 138 139func AuditRecordJSON(record *AuditRecord, scope AuditRecordScope) []byte { 140 switch scope { 141 case AuditRecordComplete: 142 // as-is 143 case AuditRecordNoManifest: 144 // trim the manifest 145 newRecord := &AuditRecord{} 146 proto.Merge(newRecord, record) 147 newRecord.Manifest = nil 148 record = newRecord 149 } 150 151 json, err := protojson.MarshalOptions{ 152 Multiline: true, 153 EmitDefaultValues: true, 154 }.Marshal(record) 155 if err != nil { 156 panic(err) 157 } 158 return json 159} 160 161// This function receives `id` and `record` separately because the record itself may have its 162// ID missing or mismatched. While this is very unlikely, using the actual primary key as 163// the filename is more robust. 164func ExtractAuditRecord(ctx context.Context, id AuditID, record *AuditRecord, dest string) error { 165 const mode = 0o400 // readable by current user, not writable 166 167 err := os.WriteFile(filepath.Join(dest, fmt.Sprintf("%s-event.json", id)), 168 AuditRecordJSON(record, AuditRecordNoManifest), mode) 169 if err != nil { 170 return err 171 } 172 173 if record.Manifest != nil { 174 err = os.WriteFile(filepath.Join(dest, fmt.Sprintf("%s-manifest.json", id)), 175 ManifestJSON(record.Manifest), mode) 176 if err != nil { 177 return err 178 } 179 180 archive, err := os.OpenFile(filepath.Join(dest, fmt.Sprintf("%s-archive.tar", id)), 181 os.O_CREATE|os.O_TRUNC|os.O_WRONLY, mode) 182 if err != nil { 183 return err 184 } 185 defer archive.Close() 186 187 err = CollectTar(ctx, archive, record.Manifest, ManifestMetadata{}) 188 if err != nil { 189 return err 190 } 191 } 192 193 return nil 194} 195 196func AuditEventProcessor(command string, args []string) (http.Handler, error) { 197 var err error 198 199 // Resolve the command to an absolute path, as it will be run from a different current 200 // directory, which would break e.g. `git-pages -audit-server tcp/:3004 ./handler.sh`. 201 if command, err = exec.LookPath(command); err != nil { 202 return nil, err 203 } 204 if command, err = filepath.Abs(command); err != nil { 205 return nil, err 206 } 207 208 router := http.NewServeMux() 209 router.Handle("GET /", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 210 // Go will cancel the request context if the client drops the connection. We don't want 211 // that to interrupt processing. However, we also want the client (not the server) to 212 // handle retries, so instead of spawning a goroutine to process the event, we do this 213 // within the HTTP handler. If an error is returned, the notify goroutine in the worker 214 // will retry the HTTP request (with backoff) until it succeeds. 215 // 216 // This is a somewhat idiosyncratic design and it's not clear that this is the best 217 // possible approach (e.g. if the worker gets restarted and the event processing fails, 218 // it will not be retried), but it should do the job for now. It is expected that 219 // some form of observability is used to highlight event processor errors. 220 ctx := context.WithoutCancel(r.Context()) 221 222 id, err := ParseAuditID(r.URL.RawQuery) 223 if err != nil { 224 logc.Printf(ctx, "audit process err: malformed query\n") 225 http.Error(w, "malformed query", http.StatusBadRequest) 226 return 227 } else { 228 logc.Printf(ctx, "audit process %s", id) 229 } 230 231 record, err := backend.QueryAuditLog(ctx, id) 232 if err != nil { 233 logc.Printf(ctx, "audit process err: missing record\n") 234 http.Error(w, "missing record", http.StatusNotFound) 235 return 236 } 237 238 args := append(args, id.String(), record.GetEvent().String()) 239 cmd := exec.CommandContext(ctx, command, args...) 240 if cmd.Dir, err = os.MkdirTemp("", "auditRecord"); err != nil { 241 panic(fmt.Errorf("mkdtemp: %w", err)) 242 } 243 defer os.RemoveAll(cmd.Dir) 244 245 if err = ExtractAuditRecord(ctx, id, record, cmd.Dir); err != nil { 246 logc.Printf(ctx, "audit process %s err: %s\n", id, err) 247 http.Error(w, err.Error(), http.StatusInternalServerError) 248 return 249 } 250 251 output, err := cmd.CombinedOutput() 252 if err != nil { 253 logc.Printf(ctx, "audit process %s err: %s; %s\n", id, err, string(output)) 254 w.WriteHeader(http.StatusServiceUnavailable) 255 if len(output) == 0 { 256 fmt.Fprintln(w, err.Error()) 257 } 258 } else { 259 logc.Printf(ctx, "audit process %s ok: %s\n", id, string(output)) 260 w.WriteHeader(http.StatusOK) 261 } 262 w.Write(output) 263 })) 264 return router, nil 265} 266 267type auditedBackend struct { 268 Backend 269} 270 271var _ Backend = (*auditedBackend)(nil) 272 273func NewAuditedBackend(backend Backend) Backend { 274 return &auditedBackend{backend} 275} 276 277// This function does not retry appending audit records; as such, if it returns an error, 278// this error must interrupt whatever operation it was auditing. A corollary is that it is 279// possible that appending an audit record succeeds but the audited operation fails. 280// This is considered fine since the purpose of auditing is to record end user intent, not 281// to be a 100% accurate reflection of performed actions. When in doubt, the audit records 282// should be examined together with the application logs. 283func (audited *auditedBackend) appendNewAuditRecord(ctx context.Context, record *AuditRecord) (err error) { 284 if config.Audit.Collect { 285 id := GenerateAuditID() 286 record.Id = proto.Int64(int64(id)) 287 record.Timestamp = timestamppb.Now() 288 record.Principal = GetPrincipal(ctx) 289 290 err = audited.Backend.AppendAuditLog(ctx, id, record) 291 if err != nil { 292 err = fmt.Errorf("audit: %w", err) 293 } else { 294 var subject string 295 if record.Project == nil { 296 subject = *record.Domain 297 } else { 298 subject = path.Join(*record.Domain, *record.Project) 299 } 300 logc.Printf(ctx, "audit %s ok: %s %s\n", subject, id, record.Event.String()) 301 302 // Send a notification to the audit server, if configured, and try to make sure 303 // it is delivered by retrying with exponential backoff on errors. 304 notifyAudit(context.WithoutCancel(ctx), id) 305 } 306 } 307 return 308} 309 310func notifyAudit(ctx context.Context, id AuditID) { 311 if config.Audit.NotifyURL != nil { 312 notifyURL := config.Audit.NotifyURL.URL 313 notifyURL.RawQuery = id.String() 314 315 // See also the explanation in `AuditEventProcessor` above. 316 go func() { 317 backoff := exponential.Backoff{ 318 Jitter: true, 319 Min: time.Second * 1, 320 Max: time.Second * 60, 321 } 322 for { 323 resp, err := http.Get(notifyURL.String()) 324 var body []byte 325 if err == nil { 326 defer resp.Body.Close() 327 body, _ = io.ReadAll(resp.Body) 328 } 329 if err == nil && resp.StatusCode == http.StatusOK { 330 logc.Printf(ctx, "audit notify %s ok: %s\n", id, string(body)) 331 auditNotifyOkCount.Inc() 332 break 333 } else { 334 sleepFor := backoff.Duration() 335 if err != nil { 336 logc.Printf(ctx, "audit notify %s err: %s (retry in %s)", 337 id, err, sleepFor) 338 } else { 339 logc.Printf(ctx, "audit notify %s fail: %s (retry in %s); %s", 340 id, resp.Status, sleepFor, string(body)) 341 } 342 auditNotifyErrorCount.Inc() 343 time.Sleep(sleepFor) 344 } 345 } 346 }() 347 } 348} 349 350func (audited *auditedBackend) CommitManifest( 351 ctx context.Context, name string, manifest *Manifest, opts ModifyManifestOptions, 352) (err error) { 353 domain, project, ok := strings.Cut(name, "/") 354 if !ok { 355 panic("malformed manifest name") 356 } 357 audited.appendNewAuditRecord(ctx, &AuditRecord{ 358 Event: AuditEvent_CommitManifest.Enum(), 359 Domain: proto.String(domain), 360 Project: proto.String(project), 361 Manifest: manifest, 362 }) 363 364 return audited.Backend.CommitManifest(ctx, name, manifest, opts) 365} 366 367func (audited *auditedBackend) DeleteManifest( 368 ctx context.Context, name string, opts ModifyManifestOptions, 369) (err error) { 370 domain, project, ok := strings.Cut(name, "/") 371 if !ok { 372 panic("malformed manifest name") 373 } 374 audited.appendNewAuditRecord(ctx, &AuditRecord{ 375 Event: AuditEvent_DeleteManifest.Enum(), 376 Domain: proto.String(domain), 377 Project: proto.String(project), 378 }) 379 380 return audited.Backend.DeleteManifest(ctx, name, opts) 381} 382 383func (audited *auditedBackend) FreezeDomain(ctx context.Context, domain string) (err error) { 384 audited.appendNewAuditRecord(ctx, &AuditRecord{ 385 Event: AuditEvent_FreezeDomain.Enum(), 386 Domain: proto.String(domain), 387 }) 388 389 return audited.Backend.FreezeDomain(ctx, domain) 390} 391 392func (audited *auditedBackend) UnfreezeDomain(ctx context.Context, domain string) (err error) { 393 audited.appendNewAuditRecord(ctx, &AuditRecord{ 394 Event: AuditEvent_UnfreezeDomain.Enum(), 395 Domain: proto.String(domain), 396 }) 397 398 return audited.Backend.UnfreezeDomain(ctx, domain) 399}