Live video on the AT Protocol

Revert "devenv: remove for now, it broke @atproto versioning"

This reverts commit d3bce1d0304bd318f1d22412a910f8b428c9b9fb.

+480 -150
+5
js/dev-env/README.md
··· 1 + # dev-env 2 + 3 + Borrowed directly from 4 + [atcute](https://github.com/mary-ext/atcute/tree/trunk/packages/internal/dev-env), 5 + MIT licensed. Used primarily as a CI environment.
+2
js/dev-env/lib/constants.ts
··· 1 + export const ADMIN_PASSWORD = "admin-pass"; 2 + export const JWT_SECRET = "jwt-secret";
+5
js/dev-env/lib/index.ts
··· 1 + export * from "./constants.js"; 2 + export * from "./network.js"; 3 + export * from "./pds.js"; 4 + export * from "./plc.js"; 5 + export * from "./utils.js";
+32
js/dev-env/lib/network.ts
··· 1 + import { TestPdsServer, type PdsServerOptions } from "./pds.js"; 2 + import { TestPlcServer, type PlcServerOptions } from "./plc.js"; 3 + import { mockNetworkUtilities } from "./utils.js"; 4 + 5 + export type NetworkConfig = { 6 + pds: Partial<PdsServerOptions>; 7 + plc: Partial<PlcServerOptions>; 8 + }; 9 + 10 + export class TestNetwork { 11 + constructor( 12 + public readonly plc: TestPlcServer, 13 + public readonly pds: TestPdsServer, 14 + ) {} 15 + 16 + static async create(cfg: Partial<NetworkConfig>): Promise<TestNetwork> { 17 + const plc = await TestPlcServer.create(cfg.plc ?? {}); 18 + const pds = await TestPdsServer.create({ didPlcUrl: plc.url, ...cfg.pds }); 19 + 20 + mockNetworkUtilities(pds); 21 + 22 + return new TestNetwork(plc, pds); 23 + } 24 + 25 + async processAll() { 26 + await this.pds.processAll(); 27 + } 28 + 29 + async close() { 30 + await Promise.all([this.plc.close(), this.pds.close()]); 31 + } 32 + }
+122
js/dev-env/lib/pds.ts
··· 1 + import fs from "node:fs/promises"; 2 + import os from "node:os"; 3 + import path from "node:path"; 4 + 5 + import { Secp256k1Keypair, randomStr } from "@atproto/crypto"; 6 + import * as pds from "@atproto/pds"; 7 + 8 + import getPort from "get-port"; 9 + import * as ui8 from "uint8arrays"; 10 + 11 + import { ADMIN_PASSWORD, JWT_SECRET } from "./constants.js"; 12 + 13 + export interface PdsServerOptions extends Partial<pds.ServerEnvironment> { 14 + didPlcUrl: string; 15 + } 16 + 17 + export interface AdditionalPdsContext { 18 + dataDirectory: string; 19 + blobstoreLoc: string; 20 + } 21 + 22 + export class TestPdsServer { 23 + constructor( 24 + public readonly server: pds.PDS, 25 + public readonly url: string, 26 + public readonly port: number, 27 + public readonly additional: AdditionalPdsContext, 28 + ) {} 29 + 30 + static async create(config: PdsServerOptions): Promise<TestPdsServer> { 31 + const plcRotationKey = await Secp256k1Keypair.create({ exportable: true }); 32 + const plcRotationPriv = ui8.toString(await plcRotationKey.export(), "hex"); 33 + const recoveryKey = (await Secp256k1Keypair.create()).did(); 34 + 35 + const port = config.port || (await getPort()); 36 + const url = `http://localhost:${port}`; 37 + 38 + const blobstoreLoc = path.join(os.tmpdir(), randomStr(8, "base32")); 39 + const dataDirectory = path.join(os.tmpdir(), randomStr(8, "base32")); 40 + 41 + await fs.mkdir(dataDirectory, { recursive: true }); 42 + 43 + const env: pds.ServerEnvironment = { 44 + devMode: true, 45 + port, 46 + dataDirectory: dataDirectory, 47 + blobstoreDiskLocation: blobstoreLoc, 48 + recoveryDidKey: recoveryKey, 49 + adminPassword: ADMIN_PASSWORD, 50 + jwtSecret: JWT_SECRET, 51 + serviceHandleDomains: [".test"], 52 + bskyAppViewUrl: "https://appview.invalid", 53 + bskyAppViewDid: "did:example:invalid", 54 + bskyAppViewCdnUrlPattern: "http://cdn.appview.com/%s/%s/%s", 55 + modServiceUrl: "https://moderator.invalid", 56 + modServiceDid: "did:example:invalid", 57 + plcRotationKeyK256PrivateKeyHex: plcRotationPriv, 58 + inviteRequired: false, 59 + disableSsrfProtection: true, 60 + serviceName: "Development PDS", 61 + // brandColor: "#ffcb1e", 62 + errorColor: undefined, 63 + logoUrl: 64 + "https://uxwing.com/wp-content/themes/uxwing/download/animals-and-birds/bee-icon.png", 65 + homeUrl: "https://bsky.social/", 66 + termsOfServiceUrl: "https://bsky.social/about/support/tos", 67 + privacyPolicyUrl: "https://bsky.social/about/support/privacy-policy", 68 + supportUrl: "https://blueskyweb.zendesk.com/hc/en-us", 69 + ...config, 70 + }; 71 + 72 + const cfg = pds.envToCfg(env); 73 + const secrets = pds.envToSecrets(env); 74 + 75 + const server = await pds.PDS.create(cfg, secrets); 76 + 77 + await server.start(); 78 + 79 + return new TestPdsServer(server, url, port, { 80 + dataDirectory: dataDirectory, 81 + blobstoreLoc: blobstoreLoc, 82 + }); 83 + } 84 + 85 + get ctx(): pds.AppContext { 86 + return this.server.ctx; 87 + } 88 + 89 + adminAuth(): string { 90 + return ( 91 + "Basic " + 92 + ui8.toString( 93 + ui8.fromString(`admin:${ADMIN_PASSWORD}`, "utf8"), 94 + "base64pad", 95 + ) 96 + ); 97 + } 98 + 99 + adminAuthHeaders() { 100 + return { 101 + authorization: this.adminAuth(), 102 + }; 103 + } 104 + 105 + jwtSecretKey() { 106 + return pds.createSecretKeyObject(JWT_SECRET); 107 + } 108 + 109 + async processAll() { 110 + await this.ctx.backgroundQueue.processAll(); 111 + } 112 + 113 + async close() { 114 + await this.server.destroy(); 115 + 116 + await fs.rm(this.additional.dataDirectory, { 117 + recursive: true, 118 + force: true, 119 + }); 120 + await fs.rm(this.additional.blobstoreLoc, { force: true }); 121 + } 122 + }
+35
js/dev-env/lib/plc.ts
··· 1 + import { AppContext, Database, PlcServer } from "@did-plc/server"; 2 + 3 + import getPort from "get-port"; 4 + 5 + export interface PlcServerOptions { 6 + port?: number; 7 + } 8 + 9 + export class TestPlcServer { 10 + constructor( 11 + public readonly server: PlcServer, 12 + public readonly url: string, 13 + public readonly port: number, 14 + ) {} 15 + 16 + static async create(cfg: PlcServerOptions = {}): Promise<TestPlcServer> { 17 + const port = cfg.port ?? (await getPort()); 18 + const url = `http://localhost:${port}`; 19 + 20 + const db = Database.mock(); 21 + const server = PlcServer.create({ db, port }); 22 + 23 + await server.start(); 24 + 25 + return new TestPlcServer(server, url, port); 26 + } 27 + 28 + get context(): AppContext { 29 + return this.server.ctx; 30 + } 31 + 32 + async close() { 33 + await this.server.destroy(); 34 + } 35 + }
+49
js/dev-env/lib/utils.ts
··· 1 + import type { IdResolver } from "@atproto/identity"; 2 + 3 + import axios from "axios"; 4 + 5 + import type { TestPdsServer } from "./pds.js"; 6 + 7 + export const mockNetworkUtilities = (pds: TestPdsServer) => { 8 + mockResolvers(pds.ctx.idResolver, pds); 9 + }; 10 + 11 + export const mockResolvers = (idResolver: IdResolver, pds: TestPdsServer) => { 12 + // Map pds public url to its local url when resolving from plc 13 + const origResolveDid = idResolver.did.resolveNoCache; 14 + idResolver.did.resolveNoCache = async (did: string) => { 15 + const result = await (origResolveDid.call( 16 + idResolver.did, 17 + did, 18 + ) as ReturnType<typeof origResolveDid>); 19 + const service = result?.service?.find((svc) => svc.id === "#atproto_pds"); 20 + 21 + if (typeof service?.serviceEndpoint === "string") { 22 + service.serviceEndpoint = service.serviceEndpoint.replace( 23 + pds.ctx.cfg.service.publicUrl, 24 + `http://localhost:${pds.port}`, 25 + ); 26 + } 27 + 28 + return result; 29 + }; 30 + 31 + const origResolveHandleDns = idResolver.handle.resolveDns; 32 + idResolver.handle.resolve = async (handle: string) => { 33 + const isPdsHandle = pds.ctx.cfg.identity.serviceHandleDomains.some( 34 + (domain) => handle.endsWith(domain), 35 + ); 36 + 37 + if (!isPdsHandle) { 38 + return origResolveHandleDns.call(idResolver.handle, handle); 39 + } 40 + 41 + const url = `${pds.url}/.well-known/atproto-did`; 42 + try { 43 + const res = await axios.get(url, { headers: { host: handle } }); 44 + return res.data; 45 + } catch (err) { 46 + return undefined; 47 + } 48 + }; 49 + };
+23
js/dev-env/package.json
··· 1 + { 2 + "private": true, 3 + "type": "module", 4 + "name": "@atcute/internal-dev-env", 5 + "version": "0.7.21", 6 + "dependencies": { 7 + "@atproto/crypto": "^0.4.3", 8 + "@atproto/identity": "^0.4.5", 9 + "@atproto/pds": "^0.4.90", 10 + "@did-plc/server": "^0.0.1", 11 + "axios": "^1.7.9", 12 + "better-sqlite3": "10.1.0", 13 + "get-port": "^7.1.0", 14 + "typescript": "^5.8.2", 15 + "uint8arrays": "^5.1.0" 16 + }, 17 + "exports": { 18 + ".": "./dist/index.js" 19 + }, 20 + "scripts": { 21 + "prepare": "tsc" 22 + } 23 + }
+8
js/dev-env/run.mjs
··· 1 + import { TestNetwork } from "./dist/index.js"; 2 + 3 + (async () => { 4 + const network = await TestNetwork.create({}); 5 + console.log( 6 + JSON.stringify({ "pds-url": network.pds.url, "plc-url": network.plc.url }), 7 + ); 8 + })();
+6
js/dev-env/run.ts
··· 1 + import { TestNetwork } from "./lib"; 2 + 3 + (async () => { 4 + const network = await TestNetwork.create({}); 5 + console.log("hi"); 6 + })();
+22
js/dev-env/tsconfig.json
··· 1 + { 2 + "compilerOptions": { 3 + "outDir": "dist/", 4 + "esModuleInterop": true, 5 + "skipLibCheck": true, 6 + "target": "ESNext", 7 + "allowJs": true, 8 + "resolveJsonModule": true, 9 + "moduleDetection": "force", 10 + "isolatedModules": true, 11 + "verbatimModuleSyntax": true, 12 + "strict": true, 13 + "noImplicitOverride": true, 14 + "noUnusedLocals": true, 15 + "noUnusedParameters": true, 16 + "noFallthroughCasesInSwitch": true, 17 + "module": "NodeNext", 18 + "sourceMap": true, 19 + "declaration": true 20 + }, 21 + "include": ["lib"] 22 + }
+171 -150
pkg/atproto/chat_message_test.go
··· 1 1 package atproto 2 2 3 - // func TestChatMessage(t *testing.T) { 4 - // dev := devenv.WithDevEnv(t) 5 - // t.Logf("dev: %+v", dev) 6 - // cli := config.CLI{ 7 - // PublicHost: "example.com", 8 - // DBURL: ":memory:", 9 - // RelayHost: strings.ReplaceAll(dev.PDSURL, "http://", "ws://"), 10 - // PLCURL: dev.PLCURL, 11 - // } 12 - // t.Logf("cli: %+v", cli) 13 - // b := bus.NewBus() 14 - // cli.DataDir = t.TempDir() 15 - // mod, err := model.MakeDB(":memory:") 16 - // require.NoError(t, err) 17 - // state, err := statedb.MakeDB(&cli, nil, mod) 18 - // require.NoError(t, err) 19 - // atsync := &ATProtoSynchronizer{ 20 - // CLI: &cli, 21 - // StatefulDB: state, 22 - // Model: mod, 23 - // Bus: b, 24 - // } 3 + import ( 4 + "context" 5 + "fmt" 6 + "slices" 7 + "strings" 8 + "testing" 9 + "time" 10 + 11 + comatproto "github.com/bluesky-social/indigo/api/atproto" 12 + lexutil "github.com/bluesky-social/indigo/lex/util" 13 + "github.com/bluesky-social/indigo/util" 14 + "github.com/cenkalti/backoff" 15 + "github.com/stretchr/testify/require" 16 + "stream.place/streamplace/pkg/bus" 17 + "stream.place/streamplace/pkg/config" 18 + "stream.place/streamplace/pkg/devenv" 19 + "stream.place/streamplace/pkg/model" 20 + "stream.place/streamplace/pkg/statedb" 21 + "stream.place/streamplace/pkg/streamplace" 22 + ) 23 + 24 + func TestChatMessage(t *testing.T) { 25 + dev := devenv.WithDevEnv(t) 26 + t.Logf("dev: %+v", dev) 27 + cli := config.CLI{ 28 + PublicHost: "example.com", 29 + DBURL: ":memory:", 30 + RelayHost: strings.ReplaceAll(dev.PDSURL, "http://", "ws://"), 31 + PLCURL: dev.PLCURL, 32 + } 33 + t.Logf("cli: %+v", cli) 34 + b := bus.NewBus() 35 + cli.DataDir = t.TempDir() 36 + mod, err := model.MakeDB(":memory:") 37 + require.NoError(t, err) 38 + state, err := statedb.MakeDB(&cli, nil, mod) 39 + require.NoError(t, err) 40 + atsync := &ATProtoSynchronizer{ 41 + CLI: &cli, 42 + StatefulDB: state, 43 + Model: mod, 44 + Bus: b, 45 + } 25 46 26 - // ctx, cancel := context.WithCancel(context.Background()) 47 + ctx, cancel := context.WithCancel(context.Background()) 27 48 28 - // done := make(chan struct{}) 49 + done := make(chan struct{}) 29 50 30 - // go func() { 31 - // err := atsync.StartFirehose(ctx) 32 - // require.NoError(t, err) 33 - // close(done) 34 - // }() 51 + go func() { 52 + err := atsync.StartFirehose(ctx) 53 + require.NoError(t, err) 54 + close(done) 55 + }() 35 56 36 - // user := dev.CreateAccount(t) 37 - // user2 := dev.CreateAccount(t) 57 + user := dev.CreateAccount(t) 58 + user2 := dev.CreateAccount(t) 38 59 39 - // ch := b.Subscribe(user.DID) 40 - // defer b.Unsubscribe(user.DID, ch) 60 + ch := b.Subscribe(user.DID) 61 + defer b.Unsubscribe(user.DID, ch) 41 62 42 - // busMessages := []bus.Message{} 43 - // go func() { 44 - // for msg := range ch { 45 - // t.Logf("message: %+v", msg) 46 - // busMessages = append(busMessages, msg) 47 - // } 48 - // }() 63 + busMessages := []bus.Message{} 64 + go func() { 65 + for msg := range ch { 66 + t.Logf("message: %+v", msg) 67 + busMessages = append(busMessages, msg) 68 + } 69 + }() 49 70 50 - // msg := &streamplace.ChatMessage{ 51 - // LexiconTypeID: "place.stream.chat.message", 52 - // Text: "Hello, world!", 53 - // CreatedAt: time.Now().Add(-time.Second).Format(util.ISO8601), 54 - // Streamer: user.DID, 55 - // } 71 + msg := &streamplace.ChatMessage{ 72 + LexiconTypeID: "place.stream.chat.message", 73 + Text: "Hello, world!", 74 + CreatedAt: time.Now().Add(-time.Second).Format(util.ISO8601), 75 + Streamer: user.DID, 76 + } 56 77 57 - // rec1, err := comatproto.RepoCreateRecord(ctx, user.XRPC, &comatproto.RepoCreateRecord_Input{ 58 - // Collection: "place.stream.chat.message", 59 - // Repo: user.DID, 60 - // Record: &lexutil.LexiconTypeDecoder{Val: msg}, 61 - // }) 62 - // require.NoError(t, err) 78 + rec1, err := comatproto.RepoCreateRecord(ctx, user.XRPC, &comatproto.RepoCreateRecord_Input{ 79 + Collection: "place.stream.chat.message", 80 + Repo: user.DID, 81 + Record: &lexutil.LexiconTypeDecoder{Val: msg}, 82 + }) 83 + require.NoError(t, err) 63 84 64 - // msg2 := &streamplace.ChatMessage{ 65 - // LexiconTypeID: "place.stream.chat.message", 66 - // Text: "Hello, world 2!", 67 - // CreatedAt: time.Now().Format(util.ISO8601), 68 - // Streamer: user.DID, 69 - // } 85 + msg2 := &streamplace.ChatMessage{ 86 + LexiconTypeID: "place.stream.chat.message", 87 + Text: "Hello, world 2!", 88 + CreatedAt: time.Now().Format(util.ISO8601), 89 + Streamer: user.DID, 90 + } 70 91 71 - // _, err = comatproto.RepoCreateRecord(ctx, user2.XRPC, &comatproto.RepoCreateRecord_Input{ 72 - // Collection: "place.stream.chat.message", 73 - // Repo: user2.DID, 74 - // Record: &lexutil.LexiconTypeDecoder{Val: msg2}, 75 - // }) 76 - // require.NoError(t, err) 92 + _, err = comatproto.RepoCreateRecord(ctx, user2.XRPC, &comatproto.RepoCreateRecord_Input{ 93 + Collection: "place.stream.chat.message", 94 + Repo: user2.DID, 95 + Record: &lexutil.LexiconTypeDecoder{Val: msg2}, 96 + }) 97 + require.NoError(t, err) 77 98 78 - // messages := []*streamplace.ChatDefs_MessageView{} 79 - // err = untilNoErrors(t, func() error { 80 - // messages, err = mod.MostRecentChatMessages(user.DID) 81 - // if err != nil { 82 - // return err 83 - // } 84 - // if len(messages) != 2 { 85 - // return fmt.Errorf("expected 2 messages, got %d", len(messages)) 86 - // } 87 - // if len(busMessages) != 2 { 88 - // return fmt.Errorf("expected 2 bus messages, got %d", len(busMessages)) 89 - // } 90 - // return nil 91 - // }) 92 - // // Reverse the messages slice to match expected order (most recent first) 93 - // slices.SortFunc(messages, func(a, b *streamplace.ChatDefs_MessageView) int { 94 - // aTime := a.Record.Val.(*streamplace.ChatMessage).CreatedAt 95 - // bTime := b.Record.Val.(*streamplace.ChatMessage).CreatedAt 96 - // if aTime < bTime { 97 - // return -1 98 - // } else if aTime > bTime { 99 - // return 1 100 - // } 101 - // return 0 102 - // }) 103 - // require.Equal(t, msg.Text, messages[0].Record.Val.(*streamplace.ChatMessage).Text) 104 - // require.Equal(t, msg2.Text, messages[1].Record.Val.(*streamplace.ChatMessage).Text) 105 - // busMessage1 := busMessages[0].(*streamplace.ChatDefs_MessageView) 106 - // busMessage2 := busMessages[1].(*streamplace.ChatDefs_MessageView) 107 - // require.Equal(t, msg.Text, busMessage1.Record.Val.(*streamplace.ChatMessage).Text) 108 - // require.Equal(t, msg2.Text, busMessage2.Record.Val.(*streamplace.ChatMessage).Text) 99 + messages := []*streamplace.ChatDefs_MessageView{} 100 + err = untilNoErrors(t, func() error { 101 + messages, err = mod.MostRecentChatMessages(user.DID) 102 + if err != nil { 103 + return err 104 + } 105 + if len(messages) != 2 { 106 + return fmt.Errorf("expected 2 messages, got %d", len(messages)) 107 + } 108 + if len(busMessages) != 2 { 109 + return fmt.Errorf("expected 2 bus messages, got %d", len(busMessages)) 110 + } 111 + return nil 112 + }) 113 + // Reverse the messages slice to match expected order (most recent first) 114 + slices.SortFunc(messages, func(a, b *streamplace.ChatDefs_MessageView) int { 115 + aTime := a.Record.Val.(*streamplace.ChatMessage).CreatedAt 116 + bTime := b.Record.Val.(*streamplace.ChatMessage).CreatedAt 117 + if aTime < bTime { 118 + return -1 119 + } else if aTime > bTime { 120 + return 1 121 + } 122 + return 0 123 + }) 124 + require.Equal(t, msg.Text, messages[0].Record.Val.(*streamplace.ChatMessage).Text) 125 + require.Equal(t, msg2.Text, messages[1].Record.Val.(*streamplace.ChatMessage).Text) 126 + busMessage1 := busMessages[0].(*streamplace.ChatDefs_MessageView) 127 + busMessage2 := busMessages[1].(*streamplace.ChatDefs_MessageView) 128 + require.Equal(t, msg.Text, busMessage1.Record.Val.(*streamplace.ChatMessage).Text) 129 + require.Equal(t, msg2.Text, busMessage2.Record.Val.(*streamplace.ChatMessage).Text) 109 130 110 - // rkey := strings.TrimPrefix(rec1.Uri, fmt.Sprintf("at://%s/place.stream.chat.message/", user.DID)) 131 + rkey := strings.TrimPrefix(rec1.Uri, fmt.Sprintf("at://%s/place.stream.chat.message/", user.DID)) 111 132 112 - // _, err = comatproto.RepoDeleteRecord(ctx, user.XRPC, &comatproto.RepoDeleteRecord_Input{ 113 - // Collection: "place.stream.chat.message", 114 - // Repo: user.DID, 115 - // Rkey: rkey, 116 - // }) 133 + _, err = comatproto.RepoDeleteRecord(ctx, user.XRPC, &comatproto.RepoDeleteRecord_Input{ 134 + Collection: "place.stream.chat.message", 135 + Repo: user.DID, 136 + Rkey: rkey, 137 + }) 117 138 118 - // require.NoError(t, err) 139 + require.NoError(t, err) 119 140 120 - // err = untilNoErrors(t, func() error { 121 - // messages, err = mod.MostRecentChatMessages(user.DID) 122 - // if err != nil { 123 - // return err 124 - // } 125 - // if len(messages) != 1 { 126 - // return fmt.Errorf("expected 1 message, got %d", len(messages)) 127 - // } 128 - // if len(busMessages) != 3 { 129 - // return fmt.Errorf("expected 3 bus messages, got %d", len(busMessages)) 130 - // } 131 - // return nil 132 - // }) 133 - // require.NoError(t, err) 134 - // require.Equal(t, msg2.Text, messages[0].Record.Val.(*streamplace.ChatMessage).Text) 135 - // busMessage3 := busMessages[2].(*streamplace.ChatDefs_MessageView) 136 - // require.Equal(t, true, *busMessage3.Deleted) 141 + err = untilNoErrors(t, func() error { 142 + messages, err = mod.MostRecentChatMessages(user.DID) 143 + if err != nil { 144 + return err 145 + } 146 + if len(messages) != 1 { 147 + return fmt.Errorf("expected 1 message, got %d", len(messages)) 148 + } 149 + if len(busMessages) != 3 { 150 + return fmt.Errorf("expected 3 bus messages, got %d", len(busMessages)) 151 + } 152 + return nil 153 + }) 154 + require.NoError(t, err) 155 + require.Equal(t, msg2.Text, messages[0].Record.Val.(*streamplace.ChatMessage).Text) 156 + busMessage3 := busMessages[2].(*streamplace.ChatDefs_MessageView) 157 + require.Equal(t, true, *busMessage3.Deleted) 137 158 138 - // cancel() 139 - // <-done 140 - // } 159 + cancel() 160 + <-done 161 + } 141 162 142 - // func untilNoErrors(t *testing.T, f func() error) error { 143 - // ticker := backoff.NewTicker(NewExponentialBackOff()) 144 - // defer ticker.Stop() 145 - // var err error 146 - // for i := 0; i < 10; i++ { 147 - // err = f() 148 - // if err == nil { 149 - // return err 150 - // } 151 - // if i < 9 { 152 - // <-ticker.C 153 - // } 154 - // } 155 - // return err 156 - // } 163 + func untilNoErrors(t *testing.T, f func() error) error { 164 + ticker := backoff.NewTicker(NewExponentialBackOff()) 165 + defer ticker.Stop() 166 + var err error 167 + for i := 0; i < 10; i++ { 168 + err = f() 169 + if err == nil { 170 + return err 171 + } 172 + if i < 9 { 173 + <-ticker.C 174 + } 175 + } 176 + return err 177 + } 157 178 158 - // // More aggressive backoff for tests 159 - // func NewExponentialBackOff() *backoff.ExponentialBackOff { 160 - // b := &backoff.ExponentialBackOff{ 161 - // InitialInterval: 100 * time.Millisecond, 162 - // RandomizationFactor: backoff.DefaultRandomizationFactor, 163 - // Multiplier: backoff.DefaultMultiplier, 164 - // MaxInterval: 2 * time.Second, 165 - // MaxElapsedTime: 10 * time.Second, 166 - // Clock: backoff.SystemClock, 167 - // } 168 - // b.Reset() 169 - // return b 170 - // } 179 + // More aggressive backoff for tests 180 + func NewExponentialBackOff() *backoff.ExponentialBackOff { 181 + b := &backoff.ExponentialBackOff{ 182 + InitialInterval: 100 * time.Millisecond, 183 + RandomizationFactor: backoff.DefaultRandomizationFactor, 184 + Multiplier: backoff.DefaultMultiplier, 185 + MaxInterval: 2 * time.Second, 186 + MaxElapsedTime: 10 * time.Second, 187 + Clock: backoff.SystemClock, 188 + } 189 + b.Reset() 190 + return b 191 + }