Fix unbounded memory use growth during snapshot application
Not sure what I was thinking when I moved the writes to a goroutine. I suppose I assumed the pipe would be read at roughly the same rate it would be written, which is obviously not true.
···665665 a.importerWg.Go(a.streamingImporter)
666666 }
667667668668- isLastChunk := chunkIndex == len(a.expectedChunkHashes)-1
669669- go func(b []byte) {
670670- // From the docs:
671671- // It is safe to call Read and Write in parallel with each other or with Close.
672672- // Parallel calls to Read and parallel calls to Write are also safe:
673673- // the individual calls will be gated sequentially.
674674-675675- // so even if not everything gets written from this chunk (e.g. because the zstd decoder decided not to advance)
676676- // it'll eventually be written, in the correct order
677677- _, _ = a.pipeWriter.Write(b)
678678- if isLastChunk {
679679- _ = a.pipeWriter.Close()
680680- }
681681- }(chunkBytes)
668668+ _, err := a.pipeWriter.Write(chunkBytes)
669669+ if err != nil {
670670+ return stacktrace.Propagate(err)
671671+ }
682672673673+ isLastChunk := chunkIndex == len(a.expectedChunkHashes)-1
683674 if isLastChunk {
675675+ _ = a.pipeWriter.Close()
684676 // wait for importer to finish reading and importing everything
685677 a.importerWg.Wait()
686678