Openstatus www.openstatus.dev

๐Ÿ”ฅ Go tweak (#495)

* ๐Ÿ”ฅ to the moon

* ๐Ÿ”ฅ to the moon

* ๐Ÿ”ฅ to the moon

* ๐Ÿ”ฅ to the moon

* ๐Ÿ”ฅ to the moon

authored by

Thibault Le Ouay and committed by
GitHub
920ab4f4 5d552738

+371 -65
+30
apps/checker/README.md
··· 1 + # OpenStatus Checker 2 + 3 + The checker service to ping external service. 4 + 5 + It pings the service and save thedata to the tinybird 6 + 7 + ## How to run 8 + 9 + ```bash 10 + go run *.go 11 + ``` 12 + 13 + ## How to build 14 + 15 + ```bash 16 + go build -o checker *.go 17 + ``` 18 + 19 + ## How to run in docker 20 + 21 + ```bash 22 + docker build -t checker . 23 + docker run -p 8080:8080 checker 24 + ``` 25 + 26 + ## How to deploy 27 + 28 + ```bash 29 + fly deploy 30 + ```
+12 -5
apps/checker/fly.toml
··· 19 19 [http_service] 20 20 internal_port = 8080 21 21 force_https = true 22 - auto_stop_machines = true 23 - auto_start_machines = true 24 - min_machines_running = 0 22 + auto_stop_machines = false 23 + auto_start_machines = false 25 24 processes = ["app"] 26 25 27 26 [[vm]] 28 27 cpu_kind = "shared" 29 - cpus = 2 30 - memory_mb = 512 28 + cpus = 1 29 + memory_mb = 256 30 + 31 + 32 + [[http_service.checks]] 33 + grace_period = "10s" 34 + interval = "15s" 35 + method = "GET" 36 + timeout = "5s" 37 + path = "/ping"
+134 -48
apps/checker/main.go
··· 1 1 package main 2 2 3 3 import ( 4 - "bytes" 5 4 "encoding/json" 6 5 "fmt" 7 6 "net/http" 8 7 "os" 9 - "time" 8 + "strconv" 10 9 11 10 "github.com/go-chi/chi/v5" 12 11 "github.com/go-chi/chi/v5/middleware" ··· 31 30 r := chi.NewRouter() 32 31 r.Use(middleware.Logger) 33 32 r.Post("/", func(w http.ResponseWriter, r *http.Request) { 34 - if r.Header.Get("Authorization") != "Basic "+ os.Getenv("CRON_SECRET") { 33 + if r.Header.Get("Authorization") != "Basic "+os.Getenv("CRON_SECRET") { 35 34 http.Error(w, "Unauthorized", 401) 36 35 return 37 - } 38 - region := os.Getenv("FLY_REGION") 36 + } 37 + i, err := strconv.Atoi(r.Header.Get("X-CloudTasks-TaskRetryCount")) 38 + if err != nil { 39 + http.Error(w, "Something went whont", 400) 40 + return 41 + } 42 + // If something went wrong we only try it twice 43 + if i > 1 { 44 + w.WriteHeader(http.StatusOK) 45 + w.Write([]byte("Ok")) 46 + return 47 + } 48 + 39 49 if r.Body == nil { 40 50 http.Error(w, "Please send a request body", 400) 41 51 return 42 52 } 43 53 var u InputData 54 + // region := os.Getenv("FLY_REGION") 44 55 45 - err := json.NewDecoder(r.Body).Decode(&u) 56 + err = json.NewDecoder(r.Body).Decode(&u) 46 57 47 - fmt.Printf("Start checker for %+v", u) 58 + fmt.Printf("๐Ÿš€ Start checker for %+v \n", u) 48 59 49 60 if err != nil { 50 - http.Error(w, err.Error(), 400) 61 + w.Write([]byte("Ok")) 62 + w.WriteHeader(200) 51 63 return 52 64 } 53 - request, error := http.NewRequest(u.Method, u.Url, bytes.NewReader([]byte(u.Body))) 54 65 55 - // Setting headers 56 - for _, header := range u.Headers { 57 - fmt.Printf("%+v", header) 58 - if header.Key != "" && header.Value != "" { 59 - request.Header.Set(header.Key, header.Value) 66 + client := &http.Client{} 67 + defer client.CloseIdleConnections() 68 + 69 + response, error := ping(client, u) 70 + 71 + if error != nil { 72 + sendToTinybird(response) 73 + if u.Status == "active" { 74 + // updateStatus(UpdateData{ 75 + // MonitorId: u.MonitorId, 76 + // Status: "error", 77 + // Message: error.Error(), 78 + // Region: region, 79 + // }) 60 80 } 81 + w.Write([]byte("Ok")) 82 + w.WriteHeader(200) 83 + return 61 84 } 62 85 63 - if error != nil { 64 - fmt.Println(error) 86 + sendToTinybird(response) 87 + 88 + if response.StatusCode < 200 || response.StatusCode >= 300 { 89 + // If the status code is not within the 200 range, we update the status to error 90 + // updateStatus(UpdateData{ 91 + // MonitorId: u.MonitorId, 92 + // Status: "error", 93 + // StatusCode: response.StatusCode, 94 + // Region: region, 95 + // }) 96 + } 97 + if u.Status == "error" { 98 + // If the status was error, we update it to active 99 + // updateStatus(UpdateData{ 100 + // MonitorId: u.MonitorId, 101 + // Status: "active", 102 + // Region: region, 103 + // StatusCode: response.StatusCode, 104 + // }) 65 105 } 66 106 67 - client := &http.Client{} 68 - start := time.Now().UTC().UnixMilli() 69 - response, error := client.Do(request) 70 - end := time.Now().UTC().UnixMilli() 107 + fmt.Printf("โฑ๏ธ End checker for %+v with latency %+d and statusCode %+d", u, response.Latency, response.StatusCode) 108 + w.Write([]byte("Ok")) 109 + w.WriteHeader(200) 110 + return 111 + }) 112 + // That's the new checker sending to the correct ingest endpoint 113 + r.Post("/checker", func(w http.ResponseWriter, r *http.Request) { 114 + if r.Header.Get("Authorization") != "Basic "+os.Getenv("CRON_SECRET") { 115 + http.Error(w, "Unauthorized", 401) 116 + return 117 + } 118 + i, err := strconv.Atoi(r.Header.Get("X-CloudTasks-TaskRetryCount")) 119 + if err != nil { 120 + http.Error(w, "Something went whont", 400) 121 + return 122 + } 123 + // If something went wrong we only try it twice 124 + if i > 1 { 125 + w.WriteHeader(http.StatusOK) 126 + w.Write([]byte("Ok")) 127 + return 128 + } 71 129 72 - // Retry if error 73 - if error != nil { 74 - response, error = client.Do(request) 75 - end = time.Now().UTC().UnixMilli() 130 + if r.Body == nil { 131 + http.Error(w, "Please send a request body", 400) 132 + return 76 133 } 134 + var u InputData 135 + // region := os.Getenv("FLY_REGION") 136 + 137 + err = json.NewDecoder(r.Body).Decode(&u) 77 138 78 - latency := end - start 79 - fmt.Println("๐Ÿš€ Checked url: %v with latency %v in region %v ", u.Url, latency, region) 80 - fmt.Printf("Response %+v for %+v", response, u) 139 + fmt.Printf("๐Ÿš€ Start checker for %+v \n", u) 140 + 141 + if err != nil { 142 + w.Write([]byte("Ok")) 143 + w.WriteHeader(200) 144 + return 145 + } 146 + 147 + client := &http.Client{} 148 + defer client.CloseIdleConnections() 149 + 150 + response, error := ping(client, u) 151 + 81 152 if error != nil { 82 - tiny((PingData{ 83 - Latency: (latency), 84 - MonitorId: u.MonitorId, 85 - Region: region, 86 - WorkspaceId: u.WorkspaceId, 87 - Timestamp: time.Now().UTC().UnixMilli(), 88 - Url: u.Url, 89 - Message: error.Error(), 90 - })) 91 - } else { 92 - tiny((PingData{ 93 - Latency: (latency), 94 - MonitorId: u.MonitorId, 95 - Region: region, 96 - WorkspaceId: u.WorkspaceId, 97 - StatusCode: int16(response.StatusCode), 98 - Timestamp: time.Now().UTC().UnixMilli(), 99 - Url: u.Url, 100 - })) 153 + sendToTinybirdNew(response) 154 + if u.Status == "active" { 155 + // updateStatus(UpdateData{ 156 + // MonitorId: u.MonitorId, 157 + // Status: "error", 158 + // Message: error.Error(), 159 + // Region: region, 160 + // }) 161 + } 162 + w.Write([]byte("Ok")) 163 + w.WriteHeader(200) 164 + return 101 165 } 102 166 103 - fmt.Printf("End checker for %+v", u) 167 + sendToTinybirdNew(response) 168 + 169 + if response.StatusCode < 200 || response.StatusCode >= 300 { 170 + // If the status code is not within the 200 range, we update the status to error 171 + // updateStatus(UpdateData{ 172 + // MonitorId: u.MonitorId, 173 + // Status: "error", 174 + // StatusCode: response.StatusCode, 175 + // Region: region, 176 + // }) 177 + } 178 + if u.Status == "error" { 179 + // If the status was error, we update it to active 180 + // updateStatus(UpdateData{ 181 + // MonitorId: u.MonitorId, 182 + // Status: "active", 183 + // Region: region, 184 + // StatusCode: response.StatusCode, 185 + // }) 186 + } 104 187 188 + fmt.Printf("โฑ๏ธ End checker for %+v with latency %+d and statusCode %+d", u, response.Latency, response.StatusCode) 105 189 w.Write([]byte("Ok")) 106 190 w.WriteHeader(200) 191 + return 107 192 }) 108 193 109 194 r.Get("/ping", func(w http.ResponseWriter, r *http.Request) { ··· 116 201 } 117 202 118 203 w.Header().Set("Content-Type", "application/json") 119 - w.WriteHeader(http.StatusCreated) 204 + w.WriteHeader(http.StatusOK) 120 205 json.NewEncoder(w).Encode(data) 206 + return 121 207 }) 122 208 123 209 http.ListenAndServe(":8080", r)
+81 -9
apps/checker/ping.go
··· 6 6 "fmt" 7 7 "io" 8 8 "net/http" 9 + "net/url" 9 10 "os" 10 11 "time" 11 12 ) ··· 14 15 WorkspaceId string `json:"workspaceId"` 15 16 MonitorId string `json:"monitorId"` 16 17 Timestamp int64 `json:"timestamp"` 17 - StatusCode int16 `json:"statusCode"` 18 + StatusCode int `json:"statusCode,omitempty"` 18 19 Latency int64 `json:"latency"` 19 20 CronTimestamp int64 `json:"cronTimestamp"` 20 21 Url string `json:"url"` 21 22 Region string `json:"region"` 22 - Message string `json:"message"` 23 + Message string `json:"message,omitempty"` 23 24 } 24 25 25 - func tiny(pingData PingData) { 26 - url := "https://api.tinybird.co/v0/events?name=golang_ping_response__v1" 27 - fmt.Println("URL:>", url) 26 + func sendToTinybird(pingData PingData) { 27 + url := "https://api.tinybird.co/v0/events?name=golang_ping_response__v3" 28 + fmt.Printf("๐Ÿ“ˆ Sending data to Tinybird for %+v \n", pingData) 28 29 bearer := "Bearer " + os.Getenv("TINYBIRD_TOKEN") 29 30 payloadBuf := new(bytes.Buffer) 30 31 json.NewEncoder(payloadBuf).Encode(pingData) ··· 33 34 req.Header.Set("Content-Type", "application/json") 34 35 35 36 client := &http.Client{Timeout: time.Second * 10} 36 - resp, err := client.Do(req) 37 + _, err = client.Do(req) 37 38 if err != nil { 38 39 fmt.Println(err) 40 + panic(err) 41 + } 42 + // Should we add a retry mechanism here? 43 + 44 + } 39 45 46 + func sendToTinybirdNew(pingData PingData) { 47 + url := "https://api.tinybird.co/v0/events?name=ping_response__v5" 48 + fmt.Printf("๐Ÿ“ˆ Sending data to Tinybird for %+v \n", pingData) 49 + bearer := "Bearer " + os.Getenv("TINYBIRD_TOKEN") 50 + payloadBuf := new(bytes.Buffer) 51 + json.NewEncoder(payloadBuf).Encode(pingData) 52 + req, err := http.NewRequest("POST", url, payloadBuf) 53 + req.Header.Set("Authorization", bearer) 54 + req.Header.Set("Content-Type", "application/json") 55 + 56 + client := &http.Client{Timeout: time.Second * 10} 57 + _, err = client.Do(req) 58 + if err != nil { 59 + fmt.Println(err) 60 + panic(err) 40 61 } 41 - defer resp.Body.Close() 62 + // Should we add a retry mechanism here? 42 63 43 - body, _ := io.ReadAll(resp.Body) 44 - fmt.Println(string(body)) 64 + } 65 + 66 + func ping(client *http.Client, inputData InputData) (PingData, error) { 67 + 68 + region := os.Getenv("FLY_REGION") 69 + request, err := http.NewRequest(inputData.Method, inputData.Url, bytes.NewReader([]byte(inputData.Body))) 70 + 71 + if err != nil { 72 + return PingData{}, fmt.Errorf("Unable to create request: %w", err) 73 + } 74 + request.Header.Set("User-Agent", "OpenStatus/1.0") 75 + 76 + // Setting headers 77 + for _, header := range inputData.Headers { 78 + if header.Key != "" && header.Value != "" { 79 + request.Header.Set(header.Key, header.Value) 80 + } 81 + } 82 + 83 + start := time.Now() 84 + response, err := client.Do(request) 85 + latency := time.Since(start).Milliseconds() 86 + defer response.Body.Close() 87 + 88 + _, err = io.ReadAll(response.Body) 89 + if err != nil { 90 + if urlErr, ok := err.(*url.Error); ok { 91 + if urlErr.Timeout() { 92 + return PingData{ 93 + Latency: latency, 94 + MonitorId: inputData.MonitorId, 95 + Region: region, 96 + WorkspaceId: inputData.WorkspaceId, 97 + Timestamp: time.Now().UTC().UnixMilli(), 98 + Url: inputData.Url, 99 + Message: fmt.Sprintf("Timeout after %d ms", latency), 100 + }, nil 101 + } 102 + } 103 + 104 + return PingData{}, fmt.Errorf("Error while reading body from %s: %w", inputData.Url, err) 105 + } 106 + 107 + return PingData{ 108 + Latency: latency, 109 + StatusCode: response.StatusCode, 110 + MonitorId: inputData.MonitorId, 111 + Region: region, 112 + WorkspaceId: inputData.WorkspaceId, 113 + Timestamp: time.Now().UTC().UnixMilli(), 114 + CronTimestamp: inputData.CronTimestamp, 115 + Url: inputData.Url, 116 + }, nil 45 117 }
+36
apps/checker/update.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "encoding/json" 6 + "fmt" 7 + "net/http" 8 + "os" 9 + "time" 10 + ) 11 + 12 + type UpdateData struct { 13 + MonitorId string `json:"monitorId"` 14 + Status string `json:"status"` 15 + Message string `json:"message,omitempty"` 16 + StatusCode int `json:"statusCode,omitempty"` 17 + Region string `json:"region"` 18 + } 19 + 20 + func updateStatus(updateData UpdateData) { 21 + url := "https://openstatus-api.fly.dev/updateStatus" 22 + fmt.Println("URL:>", url) 23 + bearer := "Bearer " + os.Getenv("CRON_SECRET") 24 + payloadBuf := new(bytes.Buffer) 25 + json.NewEncoder(payloadBuf).Encode(updateData) 26 + req, err := http.NewRequest("POST", url, payloadBuf) 27 + req.Header.Set("Authorization", bearer) 28 + req.Header.Set("Content-Type", "application/json") 29 + 30 + client := &http.Client{Timeout: time.Second * 10} 31 + _, err = client.Do(req) 32 + if err != nil { 33 + panic(err) 34 + } 35 + // Should we add a retry mechanism here? 36 + }
+6 -3
apps/server/src/checker/alerting.ts
··· 1 1 import { db, eq, schema } from "@openstatus/db"; 2 - import type { MonitorRegion, MonitorStatus } from "@openstatus/db/src/schema"; 2 + import type { 3 + MonitorFlyRegion, 4 + MonitorStatus, 5 + } from "@openstatus/db/src/schema"; 3 6 import { 4 7 selectMonitorSchema, 5 8 selectNotificationSchema, 6 9 } from "@openstatus/db/src/schema"; 7 10 import { flyRegionsDict } from "@openstatus/utils"; 8 11 9 - import { env } from "../env"; 10 12 import { checkerAudit } from "../utils/audit-log"; 11 13 import { providerToFunction } from "./utils"; 12 14 ··· 58 60 export const upsertMonitorStatus = async ({ 59 61 monitorId, 60 62 status, 63 + region, 61 64 }: { 62 65 monitorId: string; 63 66 status: MonitorStatus; 67 + region: MonitorFlyRegion; 64 68 }) => { 65 - const region = env.FLY_REGION as MonitorRegion; 66 69 await db 67 70 .insert(schema.monitorStatusTable) 68 71 .values({ status, region, monitorId: Number(monitorId) })
+69
apps/server/src/checker/index.ts
··· 1 1 import { Hono } from "hono"; 2 + import { z } from "zod"; 3 + 4 + import { flyRegions } from "@openstatus/db/src/schema/monitors/constants"; 2 5 3 6 import { env } from "../env"; 7 + import { checkerAudit } from "../utils/audit-log"; 8 + import { upsertMonitorStatus } from "./alerting"; 4 9 import { checkerRetryPolicy } from "./checker"; 5 10 import { payloadSchema } from "./schema"; 6 11 import type { Payload } from "./schema"; ··· 113 118 return c.text("Internal Server Error", 500); 114 119 } 115 120 }); 121 + 122 + checkerRoute.post("/statusChange", async (c) => { 123 + const auth = c.req.header("Authorization"); 124 + if (auth !== `Basic ${env.CRON_SECRET}`) { 125 + console.error("Unauthorized"); 126 + return c.text("Unauthorized", 401); 127 + } 128 + 129 + const json = await c.req.json(); 130 + const schema = z.object({ 131 + monitorId: z.string(), 132 + status: z.enum(["active", "error"]), // that's the new status 133 + message: z.string().optional(), 134 + statusCode: z.number().optional(), 135 + region: z.enum(flyRegions), 136 + }); 137 + 138 + const result = schema.safeParse(json); 139 + if (!result.success) { 140 + // console.error(result.error); 141 + return c.text("Unprocessable Entity", 422); 142 + } 143 + const { monitorId, status, message, region, statusCode } = result.data; 144 + 145 + console.log(`๐Ÿ“ update monitor status ${JSON.stringify(result.data)}`); 146 + 147 + switch (status) { 148 + case "active": 149 + await upsertMonitorStatus({ 150 + monitorId: monitorId, 151 + status: "active", 152 + region: region, 153 + }); 154 + if (!statusCode) { 155 + return; 156 + } 157 + await checkerAudit.publishAuditLog({ 158 + id: `monitor:${monitorId}`, 159 + action: "monitor.recovered", 160 + targets: [{ id: monitorId, type: "monitor" }], 161 + metadata: { region: region, statusCode: statusCode }, 162 + }); 163 + break; 164 + case "error": 165 + await upsertMonitorStatus({ 166 + monitorId: monitorId, 167 + status: "error", 168 + region: region, 169 + }); 170 + // ALPHA 171 + await checkerAudit.publishAuditLog({ 172 + id: `monitor:${monitorId}`, 173 + action: "monitor.failed", 174 + targets: [{ id: monitorId, type: "monitor" }], 175 + metadata: { 176 + region: region, 177 + statusCode: statusCode, 178 + message, 179 + }, 180 + }); 181 + break; 182 + } 183 + return c.text("Ok", 200); 184 + });
+3
packages/db/src/schema/monitors/validation.ts
··· 2 2 import { z } from "zod"; 3 3 4 4 import { 5 + flyRegions, 5 6 monitorJobTypes, 6 7 monitorMethods, 7 8 monitorPeriodicity, ··· 15 16 export const monitorStatusSchema = z.enum(monitorStatus); 16 17 export const monitorRegionSchema = z.enum(monitorRegions); 17 18 export const monitorJobTypesSchema = z.enum(monitorJobTypes); 19 + export const monitorFlyRegionSchema = z.enum(flyRegions); 18 20 19 21 // TODO: shared function 20 22 function stringToArrayProcess<T>(string: T) {} ··· 79 81 export type MonitorPeriodicity = z.infer<typeof monitorPeriodicitySchema>; 80 82 export type MonitorMethod = z.infer<typeof monitorMethodsSchema>; 81 83 export type MonitorRegion = z.infer<typeof monitorRegionSchema>; 84 + export type MonitorFlyRegion = z.infer<typeof monitorFlyRegionSchema>; 82 85 export type MonitorJobType = z.infer<typeof monitorJobTypesSchema>;