[mirror] Scalable static site server for Git forges (like GitHub Pages)
1package git_pages
2
3import (
4 "cmp"
5 "context"
6 "fmt"
7 "io"
8 "net/http"
9 "os"
10 "os/exec"
11 "path"
12 "path/filepath"
13 "strconv"
14 "strings"
15 "time"
16
17 exponential "github.com/jpillora/backoff"
18 "github.com/kankanreno/go-snowflake"
19 "github.com/prometheus/client_golang/prometheus"
20 "github.com/prometheus/client_golang/prometheus/promauto"
21 "google.golang.org/protobuf/encoding/protojson"
22 "google.golang.org/protobuf/proto"
23 timestamppb "google.golang.org/protobuf/types/known/timestamppb"
24)
25
26var (
27 auditNotifyOkCount = promauto.NewCounter(prometheus.CounterOpts{
28 Name: "git_pages_audit_notify_ok",
29 Help: "Count of successful audit notifications",
30 })
31 auditNotifyErrorCount = promauto.NewCounter(prometheus.CounterOpts{
32 Name: "git_pages_audit_notify_error",
33 Help: "Count of failed audit notifications",
34 })
35)
36
37type principalKey struct{}
38
39var PrincipalKey = principalKey{}
40
41func WithPrincipal(ctx context.Context) context.Context {
42 principal := &Principal{}
43 return context.WithValue(ctx, PrincipalKey, principal)
44}
45
46func GetPrincipal(ctx context.Context) *Principal {
47 if principal, ok := ctx.Value(PrincipalKey).(*Principal); ok {
48 return principal
49 }
50 return nil
51}
52
53type AuditID int64
54
55func GenerateAuditID() AuditID {
56 inner, err := snowflake.NextID()
57 if err != nil {
58 panic(err)
59 }
60 return AuditID(inner)
61}
62
63func ParseAuditID(repr string) (AuditID, error) {
64 inner, err := strconv.ParseInt(repr, 16, 64)
65 if err != nil {
66 return AuditID(0), err
67 }
68 return AuditID(inner), nil
69}
70
71func (id AuditID) String() string {
72 return fmt.Sprintf("%016x", int64(id))
73}
74
75func (id AuditID) CompareTime(when time.Time) int {
76 idMillis := int64(id) >> (snowflake.MachineIDLength + snowflake.SequenceLength)
77 whenMillis := when.UTC().UnixNano() / 1e6
78 return cmp.Compare(idMillis, whenMillis)
79}
80
81func EncodeAuditRecord(record *AuditRecord) (data []byte) {
82 data, err := proto.MarshalOptions{Deterministic: true}.Marshal(record)
83 if err != nil {
84 panic(err)
85 }
86 return
87}
88
89func DecodeAuditRecord(data []byte) (record *AuditRecord, err error) {
90 record = &AuditRecord{}
91 err = proto.Unmarshal(data, record)
92 return
93}
94
95func (record *AuditRecord) GetAuditID() AuditID {
96 return AuditID(record.GetId())
97}
98
99func (record *AuditRecord) DescribePrincipal() string {
100 var items []string
101 if record.Principal != nil {
102 if record.Principal.GetIpAddress() != "" {
103 items = append(items, record.Principal.GetIpAddress())
104 }
105 if record.Principal.GetForgeUser() != nil {
106 items = append(items, fmt.Sprintf("%s/%s(%d)",
107 record.Principal.GetForgeUser().GetOrigin(),
108 record.Principal.GetForgeUser().GetHandle(),
109 record.Principal.GetForgeUser().GetId()))
110 }
111 if record.Principal.GetCliAdmin() {
112 items = append(items, "<cli-admin>")
113 }
114 }
115 if len(items) > 0 {
116 return strings.Join(items, ";")
117 } else {
118 return "<unknown>"
119 }
120}
121
122func (record *AuditRecord) DescribeResource() string {
123 desc := "<unknown>"
124 if record.Domain != nil && record.Project != nil {
125 desc = path.Join(*record.Domain, *record.Project)
126 } else if record.Domain != nil {
127 desc = *record.Domain
128 }
129 return desc
130}
131
132type AuditRecordScope int
133
134const (
135 AuditRecordComplete AuditRecordScope = iota
136 AuditRecordNoManifest
137)
138
139func AuditRecordJSON(record *AuditRecord, scope AuditRecordScope) []byte {
140 switch scope {
141 case AuditRecordComplete:
142 // as-is
143 case AuditRecordNoManifest:
144 // trim the manifest
145 newRecord := &AuditRecord{}
146 proto.Merge(newRecord, record)
147 newRecord.Manifest = nil
148 record = newRecord
149 }
150
151 json, err := protojson.MarshalOptions{
152 Multiline: true,
153 EmitDefaultValues: true,
154 }.Marshal(record)
155 if err != nil {
156 panic(err)
157 }
158 return json
159}
160
161// This function receives `id` and `record` separately because the record itself may have its
162// ID missing or mismatched. While this is very unlikely, using the actual primary key as
163// the filename is more robust.
164func ExtractAuditRecord(ctx context.Context, id AuditID, record *AuditRecord, dest string) error {
165 const mode = 0o400 // readable by current user, not writable
166
167 err := os.WriteFile(filepath.Join(dest, fmt.Sprintf("%s-event.json", id)),
168 AuditRecordJSON(record, AuditRecordNoManifest), mode)
169 if err != nil {
170 return err
171 }
172
173 if record.Manifest != nil {
174 err = os.WriteFile(filepath.Join(dest, fmt.Sprintf("%s-manifest.json", id)),
175 ManifestJSON(record.Manifest), mode)
176 if err != nil {
177 return err
178 }
179
180 archive, err := os.OpenFile(filepath.Join(dest, fmt.Sprintf("%s-archive.tar", id)),
181 os.O_CREATE|os.O_TRUNC|os.O_WRONLY, mode)
182 if err != nil {
183 return err
184 }
185 defer archive.Close()
186
187 err = CollectTar(ctx, archive, record.Manifest, ManifestMetadata{})
188 if err != nil {
189 return err
190 }
191 }
192
193 return nil
194}
195
196func AuditEventProcessor(command string, args []string) (http.Handler, error) {
197 var err error
198
199 // Resolve the command to an absolute path, as it will be run from a different current
200 // directory, which would break e.g. `git-pages -audit-server tcp/:3004 ./handler.sh`.
201 if command, err = exec.LookPath(command); err != nil {
202 return nil, err
203 }
204 if command, err = filepath.Abs(command); err != nil {
205 return nil, err
206 }
207
208 router := http.NewServeMux()
209 router.Handle("GET /", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
210 // Go will cancel the request context if the client drops the connection. We don't want
211 // that to interrupt processing. However, we also want the client (not the server) to
212 // handle retries, so instead of spawning a goroutine to process the event, we do this
213 // within the HTTP handler. If an error is returned, the notify goroutine in the worker
214 // will retry the HTTP request (with backoff) until it succeeds.
215 //
216 // This is a somewhat idiosyncratic design and it's not clear that this is the best
217 // possible approach (e.g. if the worker gets restarted and the event processing fails,
218 // it will not be retried), but it should do the job for now. It is expected that
219 // some form of observability is used to highlight event processor errors.
220 ctx := context.WithoutCancel(r.Context())
221
222 id, err := ParseAuditID(r.URL.RawQuery)
223 if err != nil {
224 logc.Printf(ctx, "audit process err: malformed query\n")
225 http.Error(w, "malformed query", http.StatusBadRequest)
226 return
227 } else {
228 logc.Printf(ctx, "audit process %s", id)
229 }
230
231 record, err := backend.QueryAuditLog(ctx, id)
232 if err != nil {
233 logc.Printf(ctx, "audit process err: missing record\n")
234 http.Error(w, "missing record", http.StatusNotFound)
235 return
236 }
237
238 args := append(args, id.String(), record.GetEvent().String())
239 cmd := exec.CommandContext(ctx, command, args...)
240 if cmd.Dir, err = os.MkdirTemp("", "auditRecord"); err != nil {
241 panic(fmt.Errorf("mkdtemp: %w", err))
242 }
243 defer os.RemoveAll(cmd.Dir)
244
245 if err = ExtractAuditRecord(ctx, id, record, cmd.Dir); err != nil {
246 logc.Printf(ctx, "audit process %s err: %s\n", id, err)
247 http.Error(w, err.Error(), http.StatusInternalServerError)
248 return
249 }
250
251 output, err := cmd.CombinedOutput()
252 if err != nil {
253 logc.Printf(ctx, "audit process %s err: %s; %s\n", id, err, string(output))
254 w.WriteHeader(http.StatusServiceUnavailable)
255 if len(output) == 0 {
256 fmt.Fprintln(w, err.Error())
257 }
258 } else {
259 logc.Printf(ctx, "audit process %s ok: %s\n", id, string(output))
260 w.WriteHeader(http.StatusOK)
261 }
262 w.Write(output)
263 }))
264 return router, nil
265}
266
267type auditedBackend struct {
268 Backend
269}
270
271var _ Backend = (*auditedBackend)(nil)
272
273func NewAuditedBackend(backend Backend) Backend {
274 return &auditedBackend{backend}
275}
276
277// This function does not retry appending audit records; as such, if it returns an error,
278// this error must interrupt whatever operation it was auditing. A corollary is that it is
279// possible that appending an audit record succeeds but the audited operation fails.
280// This is considered fine since the purpose of auditing is to record end user intent, not
281// to be a 100% accurate reflection of performed actions. When in doubt, the audit records
282// should be examined together with the application logs.
283func (audited *auditedBackend) appendNewAuditRecord(ctx context.Context, record *AuditRecord) (err error) {
284 if config.Audit.Collect {
285 id := GenerateAuditID()
286 record.Id = proto.Int64(int64(id))
287 record.Timestamp = timestamppb.Now()
288 record.Principal = GetPrincipal(ctx)
289
290 err = audited.Backend.AppendAuditLog(ctx, id, record)
291 if err != nil {
292 err = fmt.Errorf("audit: %w", err)
293 } else {
294 var subject string
295 if record.Project == nil {
296 subject = *record.Domain
297 } else {
298 subject = path.Join(*record.Domain, *record.Project)
299 }
300 logc.Printf(ctx, "audit %s ok: %s %s\n", subject, id, record.Event.String())
301
302 // Send a notification to the audit server, if configured, and try to make sure
303 // it is delivered by retrying with exponential backoff on errors.
304 notifyAudit(context.WithoutCancel(ctx), id)
305 }
306 }
307 return
308}
309
310func notifyAudit(ctx context.Context, id AuditID) {
311 if config.Audit.NotifyURL != nil {
312 notifyURL := config.Audit.NotifyURL.URL
313 notifyURL.RawQuery = id.String()
314
315 // See also the explanation in `AuditEventProcessor` above.
316 go func() {
317 backoff := exponential.Backoff{
318 Jitter: true,
319 Min: time.Second * 1,
320 Max: time.Second * 60,
321 }
322 for {
323 resp, err := http.Get(notifyURL.String())
324 var body []byte
325 if err == nil {
326 defer resp.Body.Close()
327 body, _ = io.ReadAll(resp.Body)
328 }
329 if err == nil && resp.StatusCode == http.StatusOK {
330 logc.Printf(ctx, "audit notify %s ok: %s\n", id, string(body))
331 auditNotifyOkCount.Inc()
332 break
333 } else {
334 sleepFor := backoff.Duration()
335 if err != nil {
336 logc.Printf(ctx, "audit notify %s err: %s (retry in %s)",
337 id, err, sleepFor)
338 } else {
339 logc.Printf(ctx, "audit notify %s fail: %s (retry in %s); %s",
340 id, resp.Status, sleepFor, string(body))
341 }
342 auditNotifyErrorCount.Inc()
343 time.Sleep(sleepFor)
344 }
345 }
346 }()
347 }
348}
349
350func (audited *auditedBackend) CommitManifest(
351 ctx context.Context, name string, manifest *Manifest, opts ModifyManifestOptions,
352) (err error) {
353 domain, project, ok := strings.Cut(name, "/")
354 if !ok {
355 panic("malformed manifest name")
356 }
357 audited.appendNewAuditRecord(ctx, &AuditRecord{
358 Event: AuditEvent_CommitManifest.Enum(),
359 Domain: proto.String(domain),
360 Project: proto.String(project),
361 Manifest: manifest,
362 })
363
364 return audited.Backend.CommitManifest(ctx, name, manifest, opts)
365}
366
367func (audited *auditedBackend) DeleteManifest(
368 ctx context.Context, name string, opts ModifyManifestOptions,
369) (err error) {
370 domain, project, ok := strings.Cut(name, "/")
371 if !ok {
372 panic("malformed manifest name")
373 }
374 audited.appendNewAuditRecord(ctx, &AuditRecord{
375 Event: AuditEvent_DeleteManifest.Enum(),
376 Domain: proto.String(domain),
377 Project: proto.String(project),
378 })
379
380 return audited.Backend.DeleteManifest(ctx, name, opts)
381}
382
383func (audited *auditedBackend) FreezeDomain(ctx context.Context, domain string) (err error) {
384 audited.appendNewAuditRecord(ctx, &AuditRecord{
385 Event: AuditEvent_FreezeDomain.Enum(),
386 Domain: proto.String(domain),
387 })
388
389 return audited.Backend.FreezeDomain(ctx, domain)
390}
391
392func (audited *auditedBackend) UnfreezeDomain(ctx context.Context, domain string) (err error) {
393 audited.appendNewAuditRecord(ctx, &AuditRecord{
394 Event: AuditEvent_UnfreezeDomain.Enum(),
395 Domain: proto.String(domain),
396 })
397
398 return audited.Backend.UnfreezeDomain(ctx, domain)
399}