A privacy-first, self-hosted, fully open-source personal knowledge management software, written in TypeScript and Go. (PERSONAL FORK)
1// SiYuan - Refactor your thinking
2// Copyright (c) 2020-present, b3log.org
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Affero General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17package model
18
19import (
20 "bytes"
21 "crypto/rand"
22 "crypto/sha1"
23 "crypto/sha256"
24 "encoding/base64"
25 "errors"
26 "fmt"
27 "math"
28 mathRand "math/rand"
29 "mime"
30 "net/http"
31 "os"
32 "path"
33 "path/filepath"
34 "sort"
35 "strings"
36 "sync"
37 "sync/atomic"
38 "time"
39
40 "github.com/88250/go-humanize"
41 "github.com/88250/gulu"
42 "github.com/88250/lute"
43 "github.com/88250/lute/ast"
44 "github.com/88250/lute/html"
45 "github.com/88250/lute/parse"
46 "github.com/88250/lute/render"
47 "github.com/emirpasic/gods/sets/hashset"
48 "github.com/siyuan-note/dataparser"
49 "github.com/siyuan-note/dejavu"
50 "github.com/siyuan-note/dejavu/cloud"
51 "github.com/siyuan-note/dejavu/entity"
52 "github.com/siyuan-note/encryption"
53 "github.com/siyuan-note/eventbus"
54 "github.com/siyuan-note/httpclient"
55 "github.com/siyuan-note/logging"
56 "github.com/siyuan-note/siyuan/kernel/conf"
57 "github.com/siyuan-note/siyuan/kernel/task"
58 "github.com/siyuan-note/siyuan/kernel/treenode"
59 "github.com/siyuan-note/siyuan/kernel/util"
60 "github.com/studio-b12/gowebdav"
61)
62
63// AutoPurgeRepoJob 自动清理数据仓库 https://github.com/siyuan-note/siyuan/issues/13091
64func AutoPurgeRepoJob() {
65 task.AppendTaskWithTimeout(task.RepoAutoPurge, 12*time.Hour, autoPurgeRepo, true)
66}
67
var (
	// autoPurgeRepoAfterFirstSync gates cron-triggered purges: they are skipped
	// until the first non-cron purge call has run (see autoPurgeRepo).
	autoPurgeRepoAfterFirstSync = false
	// lastAutoPurgeRepo records when the last purge finished; purges are
	// throttled to at most once every 6 hours.
	lastAutoPurgeRepo = time.Time{}
)
72
73func autoPurgeRepo(cron bool) {
74 if cron && !autoPurgeRepoAfterFirstSync {
75 return
76 }
77 if time.Since(lastAutoPurgeRepo) < 6*time.Hour {
78 return
79 }
80
81 autoPurgeRepoAfterFirstSync = true
82 defer func() {
83 lastAutoPurgeRepo = time.Now()
84 }()
85
86 if 1 > len(Conf.Repo.Key) {
87 return
88 }
89
90 repo, err := newRepository()
91 if err != nil {
92 return
93 }
94
95 now := time.Now()
96
97 dateGroupedIndexes := map[string][]*entity.Index{} // 按照日期分组
98 // 收集指定日期内需要保留的索引
99 var date string
100 page := 1
101 for {
102 indexes, pageCount, _, err := repo.GetIndexes(page, 512)
103 if nil != err {
104 logging.LogErrorf("get data repo index logs failed: %s", err)
105 return
106 }
107 if 1 > len(indexes) {
108 break
109 }
110
111 tooOld := false
112 for _, index := range indexes {
113 if now.UnixMilli()-index.Created <= int64(Conf.Repo.IndexRetentionDays)*24*60*60*1000 {
114 date = time.UnixMilli(index.Created).Format("2006-01-02")
115 if _, ok := dateGroupedIndexes[date]; !ok {
116 dateGroupedIndexes[date] = []*entity.Index{}
117 }
118 dateGroupedIndexes[date] = append(dateGroupedIndexes[date], index)
119 } else {
120 tooOld = true
121 break
122 }
123 }
124 if tooOld {
125 break
126 }
127 page++
128 if page > pageCount {
129 break
130 }
131 }
132
133 todayDate := now.Format("2006-01-02")
134 // 筛选出每日需要保留的索引
135 var retentionIndexIDs []string
136 for date, indexes := range dateGroupedIndexes {
137 if len(indexes) <= Conf.Repo.RetentionIndexesDaily || todayDate == date {
138 for _, index := range indexes {
139 retentionIndexIDs = append(retentionIndexIDs, index.ID)
140 }
141 continue
142 }
143
144 keepIndexes := hashset.New()
145 keepIndexes.Add(indexes[0]) // 每天最后一个固定保留
146 // 随机保留指定数量的索引
147 for i := 0; i < Conf.Repo.RetentionIndexesDaily*7; i++ {
148 keepIndexes.Add(indexes[mathRand.Intn(len(indexes)-1)])
149 if keepIndexes.Size() >= Conf.Repo.RetentionIndexesDaily {
150 break
151 }
152 }
153
154 for _, keepIndex := range keepIndexes.Values() {
155 retentionIndexIDs = append(retentionIndexIDs, keepIndex.(*entity.Index).ID)
156 }
157 }
158
159 retentionIndexIDs = gulu.Str.RemoveDuplicatedElem(retentionIndexIDs)
160 if 3 > len(retentionIndexIDs) {
161 logging.LogInfof("no index to purge [ellapsed=%.2fs]", time.Since(now).Seconds())
162 return
163 }
164
165 _, err = repo.Purge(retentionIndexIDs...)
166}
167
168func GetRepoFile(fileID string) (ret []byte, p string, err error) {
169 if 1 > len(Conf.Repo.Key) {
170 err = errors.New(Conf.Language(26))
171 return
172 }
173
174 repo, err := newRepository()
175 if err != nil {
176 return
177 }
178
179 file, err := repo.GetFile(fileID)
180 if err != nil {
181 return
182 }
183
184 ret, err = repo.OpenFile(file)
185 p = file.Path
186 return
187}
188
189func OpenRepoSnapshotDoc(fileID string) (title, content string, displayInText bool, updated int64, err error) {
190 if 1 > len(Conf.Repo.Key) {
191 err = errors.New(Conf.Language(26))
192 return
193 }
194
195 repo, err := newRepository()
196 if err != nil {
197 return
198 }
199
200 file, err := repo.GetFile(fileID)
201 if err != nil {
202 return
203 }
204
205 data, err := repo.OpenFile(file)
206 if err != nil {
207 return
208 }
209
210 updated = file.Updated
211
212 if strings.HasSuffix(file.Path, ".sy") {
213 luteEngine := NewLute()
214 var snapshotTree *parse.Tree
215 displayInText, snapshotTree, err = parseTreeInSnapshot(data, luteEngine)
216 if err != nil {
217 logging.LogErrorf("parse tree from snapshot file [%s] failed", fileID)
218 return
219 }
220 title = snapshotTree.Root.IALAttr("title")
221
222 if !displayInText {
223 renderTree := &parse.Tree{Root: &ast.Node{Type: ast.NodeDocument}}
224 var unlinks []*ast.Node
225 ast.Walk(snapshotTree.Root, func(n *ast.Node, entering bool) ast.WalkStatus {
226 if !entering {
227 return ast.WalkContinue
228 }
229
230 n.RemoveIALAttr("heading-fold")
231 n.RemoveIALAttr("fold")
232 return ast.WalkContinue
233 })
234
235 for _, unlink := range unlinks {
236 unlink.Unlink()
237 }
238
239 var appends []*ast.Node
240 for n := snapshotTree.Root.FirstChild; nil != n; n = n.Next {
241 appends = append(appends, n)
242 }
243 for _, n := range appends {
244 renderTree.Root.AppendChild(n)
245 }
246
247 snapshotTree = renderTree
248 }
249
250 luteEngine.RenderOptions.ProtyleContenteditable = false
251 if displayInText {
252 util.PushMsg(Conf.Language(36), 5000)
253 formatRenderer := render.NewFormatRenderer(snapshotTree, luteEngine.RenderOptions)
254 content = gulu.Str.FromBytes(formatRenderer.Render())
255 } else {
256 content = luteEngine.Tree2BlockDOM(snapshotTree, luteEngine.RenderOptions)
257 }
258 } else {
259 displayInText = true
260 title = file.Path
261 if mimeType := mime.TypeByExtension(filepath.Ext(file.Path)); strings.HasPrefix(mimeType, "text/") || strings.Contains(mimeType, "json") {
262 // 如果是文本文件,直接返回文本内容
263 // All plain text formats are supported when comparing data snapshots https://github.com/siyuan-note/siyuan/issues/12975
264 content = gulu.Str.FromBytes(data)
265 } else {
266 if strings.Contains(file.Path, "assets/") { // 剔除笔记本级或者文档级资源文件路径前缀
267 file.Path = file.Path[strings.Index(file.Path, "assets/"):]
268 if util.IsDisplayableAsset(file.Path) {
269 dir, f := filepath.Split(file.Path)
270 tempRepoDiffDir := filepath.Join(util.TempDir, "repo", "diff", dir)
271 if mkErr := os.MkdirAll(tempRepoDiffDir, 0755); nil != mkErr {
272 logging.LogErrorf("mkdir [%s] failed: %v", tempRepoDiffDir, mkErr)
273 } else {
274 if wrErr := os.WriteFile(filepath.Join(tempRepoDiffDir, f), data, 0644); nil != wrErr {
275 logging.LogErrorf("write file [%s] failed: %v", filepath.Join(tempRepoDiffDir, file.Path), wrErr)
276 }
277 }
278 content = path.Join("repo", "diff", file.Path)
279 }
280 } else {
281 content = file.Path
282 }
283 }
284 }
285 return
286}
287
// LeftRightDiff describes the differences between two snapshots ("left" and
// "right") for the snapshot comparison UI.
type LeftRightDiff struct {
	LeftIndex    *DiffIndex  `json:"leftIndex"`
	RightIndex   *DiffIndex  `json:"rightIndex"`
	AddsLeft     []*DiffFile `json:"addsLeft"`
	UpdatesLeft  []*DiffFile `json:"updatesLeft"`
	UpdatesRight []*DiffFile `json:"updatesRight"`
	RemovesRight []*DiffFile `json:"removesRight"`
}

// DiffFile is one changed file in a snapshot diff, with a human-readable size.
type DiffFile struct {
	FileID  string `json:"fileID"`
	Title   string `json:"title"`
	Path    string `json:"path"`
	HSize   string `json:"hSize"` // human-readable size, e.g. "1.25 MB"
	Updated int64  `json:"updated"`
}

// DiffIndex identifies one side (an index) of a snapshot diff.
type DiffIndex struct {
	ID      string `json:"id"`
	Created int64  `json:"created"`
}
309
310func DiffRepoSnapshots(left, right string) (ret *LeftRightDiff, err error) {
311 if 1 > len(Conf.Repo.Key) {
312 err = errors.New(Conf.Language(26))
313 return
314 }
315
316 repo, err := newRepository()
317 if err != nil {
318 return
319 }
320
321 diff, err := repo.DiffIndex(left, right)
322 if err != nil {
323 return
324 }
325
326 ret = &LeftRightDiff{
327 LeftIndex: &DiffIndex{
328 ID: diff.LeftIndex.ID,
329 Created: diff.LeftIndex.Created,
330 },
331 RightIndex: &DiffIndex{
332 ID: diff.RightIndex.ID,
333 Created: diff.RightIndex.Created,
334 },
335 }
336 luteEngine := NewLute()
337 for _, removeRight := range diff.RemovesRight {
338 title, parseErr := parseTitleInSnapshot(removeRight.ID, repo, luteEngine)
339 if "" == title || nil != parseErr {
340 continue
341 }
342
343 ret.AddsLeft = append(ret.AddsLeft, &DiffFile{
344 FileID: removeRight.ID,
345 Title: title,
346 Path: removeRight.Path,
347 HSize: humanize.BytesCustomCeil(uint64(removeRight.Size), 2),
348 Updated: removeRight.Updated,
349 })
350 }
351 if 1 > len(ret.AddsLeft) {
352 ret.AddsLeft = []*DiffFile{}
353 }
354
355 for _, addLeft := range diff.AddsLeft {
356 title, parseErr := parseTitleInSnapshot(addLeft.ID, repo, luteEngine)
357 if "" == title || nil != parseErr {
358 continue
359 }
360
361 ret.RemovesRight = append(ret.RemovesRight, &DiffFile{
362 FileID: addLeft.ID,
363 Title: title,
364 Path: addLeft.Path,
365 HSize: humanize.BytesCustomCeil(uint64(addLeft.Size), 2),
366 Updated: addLeft.Updated,
367 })
368 }
369 if 1 > len(ret.RemovesRight) {
370 ret.RemovesRight = []*DiffFile{}
371 }
372
373 for _, updateLeft := range diff.UpdatesLeft {
374 title, parseErr := parseTitleInSnapshot(updateLeft.ID, repo, luteEngine)
375 if "" == title || nil != parseErr {
376 continue
377 }
378
379 ret.UpdatesLeft = append(ret.UpdatesLeft, &DiffFile{
380 FileID: updateLeft.ID,
381 Title: title,
382 Path: updateLeft.Path,
383 HSize: humanize.BytesCustomCeil(uint64(updateLeft.Size), 2),
384 Updated: updateLeft.Updated,
385 })
386 }
387 if 1 > len(ret.UpdatesLeft) {
388 ret.UpdatesLeft = []*DiffFile{}
389 }
390
391 for _, updateRight := range diff.UpdatesRight {
392 title, parseErr := parseTitleInSnapshot(updateRight.ID, repo, luteEngine)
393 if "" == title || nil != parseErr {
394 continue
395 }
396
397 ret.UpdatesRight = append(ret.UpdatesRight, &DiffFile{
398 FileID: updateRight.ID,
399 Title: title,
400 Path: updateRight.Path,
401 HSize: humanize.BytesCustomCeil(uint64(updateRight.Size), 2),
402 Updated: updateRight.Updated,
403 })
404 }
405 if 1 > len(ret.UpdatesRight) {
406 ret.UpdatesRight = []*DiffFile{}
407 }
408 return
409}
410
411func parseTitleInSnapshot(fileID string, repo *dejavu.Repo, luteEngine *lute.Lute) (title string, err error) {
412 file, err := repo.GetFile(fileID)
413 if err != nil {
414 logging.LogErrorf("get file [%s] failed: %s", fileID, err)
415 return
416 }
417
418 title = path.Base(file.Path)
419 if strings.HasSuffix(file.Path, ".sy") {
420 var data []byte
421 data, err = repo.OpenFile(file)
422 if err != nil {
423 logging.LogErrorf("open file [%s] failed: %s", fileID, err)
424 return
425 }
426
427 var tree *parse.Tree
428 tree, err = dataparser.ParseJSONWithoutFix(data, luteEngine.ParseOptions)
429 if err != nil {
430 logging.LogErrorf("parse file [%s] failed: %s", fileID, err)
431 return
432 }
433
434 title = tree.Root.IALAttr("title")
435 }
436 return
437}
438
439func parseTreeInSnapshot(data []byte, luteEngine *lute.Lute) (isLargeDoc bool, tree *parse.Tree, err error) {
440 isLargeDoc = 1024*1024*1 <= len(data)
441 tree, err = dataparser.ParseJSONWithoutFix(data, luteEngine.ParseOptions)
442 if err != nil {
443 return
444 }
445 return
446}
447
// Snapshot is a dejavu index log enriched with per-file-type statistics for
// the UI. The embedded Log's Files field is cleared before it is returned to
// the frontend (see buildSnapshots).
type Snapshot struct {
	*dejavu.Log
	TypesCount []*TypeCount `json:"typesCount"`
}

// TypeCount is the number of files of one extension within a snapshot.
type TypeCount struct {
	Type  string `json:"type"`
	Count int    `json:"count"`
}
457
458func GetRepoSnapshots(page int) (ret []*Snapshot, pageCount, totalCount int, err error) {
459 ret = []*Snapshot{}
460 if 1 > len(Conf.Repo.Key) {
461 err = errors.New(Conf.Language(26))
462 return
463 }
464
465 repo, err := newRepository()
466 if err != nil {
467 return
468 }
469
470 logs, pageCount, totalCount, err := repo.GetIndexLogs(page, 32)
471 if err != nil {
472 if dejavu.ErrNotFoundIndex == err {
473 logs = []*dejavu.Log{}
474 err = nil
475 return
476 }
477
478 logging.LogErrorf("get data repo index logs failed: %s", err)
479 return
480 }
481
482 ret = buildSnapshots(logs)
483 if 1 > len(ret) {
484 ret = []*Snapshot{}
485 }
486 return
487}
488
489func buildSnapshots(logs []*dejavu.Log) (ret []*Snapshot) {
490 for _, l := range logs {
491 typesCount := statTypesByPath(l.Files)
492 l.Files = nil // 置空,否则返回前端数据量太大
493 ret = append(ret, &Snapshot{
494 Log: l,
495 TypesCount: typesCount,
496 })
497 }
498 return
499}
500
501func statTypesByPath(files []*entity.File) (ret []*TypeCount) {
502 for _, f := range files {
503 ext := util.Ext(f.Path)
504 if "" == ext {
505 ext = "NoExt"
506 }
507
508 found := false
509 for _, tc := range ret {
510
511 if tc.Type == ext {
512 tc.Count++
513 found = true
514 break
515 }
516 }
517 if !found {
518 ret = append(ret, &TypeCount{Type: ext, Count: 1})
519 }
520 }
521
522 sort.Slice(ret, func(i, j int) bool { return ret[i].Count > ret[j].Count })
523 if 10 < len(ret) {
524 otherCount := 0
525 for _, tc := range ret[10:] {
526 tc.Count += otherCount
527 }
528 other := &TypeCount{
529 Type: "Other",
530 Count: otherCount,
531 }
532 ret = append(ret[:10], other)
533 }
534 return
535}
536
537func ImportRepoKey(base64Key string) (retKey string, err error) {
538 util.PushMsg(Conf.Language(136), 3000)
539
540 retKey = strings.TrimSpace(base64Key)
541 retKey = gulu.Str.RemoveInvisible(retKey)
542 if 1 > len(retKey) {
543 err = errors.New(Conf.Language(142))
544 return
545 }
546
547 key, err := base64.StdEncoding.DecodeString(retKey)
548 if err != nil {
549 logging.LogErrorf("import data repo key failed: %s", err)
550 return "", errors.New(Conf.Language(157))
551 }
552 if 32 != len(key) {
553 return "", errors.New(Conf.Language(157))
554 }
555
556 Conf.Repo.Key = key
557 Conf.Save()
558 logging.LogInfof("imported repo key [%x]", sha1.Sum(Conf.Repo.Key))
559
560 if err = os.RemoveAll(Conf.Repo.GetSaveDir()); err != nil {
561 return
562 }
563 if err = os.MkdirAll(Conf.Repo.GetSaveDir(), 0755); err != nil {
564 return
565 }
566
567 initDataRepo()
568 return
569}
570
571func ResetRepo() (err error) {
572 logging.LogInfof("resetting data repo...")
573 msgId := util.PushMsg(Conf.Language(144), 1000*60)
574
575 repo, err := newRepository()
576 if err != nil {
577 return
578 }
579
580 if err = repo.Reset(); err != nil {
581 logging.LogErrorf("reset data repo failed: %s", err)
582 return
583 }
584 logging.LogInfof("reset data repo completed")
585
586 Conf.Repo.Key = nil
587 Conf.Sync.Enabled = false
588 Conf.Save()
589
590 util.PushUpdateMsg(msgId, Conf.Language(145), 3000)
591 task.AppendAsyncTaskWithDelay(task.ReloadUI, 2*time.Second, util.ReloadUI)
592 return
593}
594
595func PurgeCloud() (err error) {
596 msg := Conf.Language(223)
597 util.PushEndlessProgress(msg)
598 defer util.PushClearProgress()
599
600 repo, err := newRepository()
601 if err != nil {
602 return
603 }
604
605 stat, err := repo.PurgeCloud()
606 if err != nil {
607 return
608 }
609
610 deletedIndexes := stat.Indexes
611 deletedObjects := stat.Objects
612 deletedSize := humanize.BytesCustomCeil(uint64(stat.Size), 2)
613 msg = fmt.Sprintf(Conf.Language(232), deletedIndexes, deletedObjects, deletedSize)
614 util.PushMsg(msg, 7000)
615 return
616}
617
618func PurgeRepo() (err error) {
619 msg := Conf.Language(202)
620 util.PushEndlessProgress(msg)
621 defer util.PushClearProgress()
622
623 repo, err := newRepository()
624 if err != nil {
625 return
626 }
627
628 stat, err := repo.Purge()
629 if err != nil {
630 return
631 }
632
633 deletedIndexes := stat.Indexes
634 deletedObjects := stat.Objects
635 deletedSize := humanize.BytesCustomCeil(uint64(stat.Size), 2)
636 msg = fmt.Sprintf(Conf.Language(203), deletedIndexes, deletedObjects, deletedSize)
637 util.PushMsg(msg, 7000)
638 return
639}
640
641func InitRepoKeyFromPassphrase(passphrase string) (err error) {
642 passphrase = gulu.Str.RemoveInvisible(passphrase)
643 passphrase = strings.TrimSpace(passphrase)
644 if "" == passphrase {
645 return errors.New(Conf.Language(142))
646 }
647
648 util.PushMsg(Conf.Language(136), 3000)
649 if err = os.RemoveAll(Conf.Repo.GetSaveDir()); err != nil {
650 return
651 }
652 if err = os.MkdirAll(Conf.Repo.GetSaveDir(), 0755); err != nil {
653 return
654 }
655
656 var key []byte
657 base64Data, base64Err := base64.StdEncoding.DecodeString(passphrase)
658 if nil == base64Err && 32 == len(base64Data) {
659 // 改进数据仓库 `通过密码生成密钥` https://github.com/siyuan-note/siyuan/issues/6782
660 logging.LogInfof("passphrase is base64 encoded, use it as key directly")
661 key = base64Data
662 } else {
663 salt := fmt.Sprintf("%x", sha256.Sum256([]byte(passphrase)))[:16]
664 key, err = encryption.KDF(passphrase, salt)
665 if err != nil {
666 logging.LogErrorf("init data repo key failed: %s", err)
667 return
668 }
669 }
670
671 Conf.Repo.Key = key
672 Conf.Save()
673 logging.LogInfof("inited repo key [%x]", sha1.Sum(Conf.Repo.Key))
674
675 initDataRepo()
676 return
677}
678
679func InitRepoKey() (err error) {
680 util.PushMsg(Conf.Language(136), 3000)
681
682 if err = os.RemoveAll(Conf.Repo.GetSaveDir()); err != nil {
683 return
684 }
685 if err = os.MkdirAll(Conf.Repo.GetSaveDir(), 0755); err != nil {
686 return
687 }
688
689 randomBytes := make([]byte, 16)
690 _, err = rand.Read(randomBytes)
691 if err != nil {
692 return
693 }
694 password := string(randomBytes)
695 randomBytes = make([]byte, 16)
696 _, err = rand.Read(randomBytes)
697 if err != nil {
698 logging.LogErrorf("init data repo key failed: %s", err)
699 return
700 }
701 salt := string(randomBytes)
702
703 key, err := encryption.KDF(password, salt)
704 if err != nil {
705 logging.LogErrorf("init data repo key failed: %s", err)
706 return
707 }
708 Conf.Repo.Key = key
709 Conf.Save()
710 logging.LogInfof("inited repo key [%x]", sha1.Sum(Conf.Repo.Key))
711
712 initDataRepo()
713 return
714}
715
// initDataRepo performs the initial local repo index after a key has been
// created or imported, pushing progress messages to the UI.
func initDataRepo() {
	// NOTE(review): the sleeps presumably let the UI display the preceding
	// message before the next one is pushed — confirm before changing.
	time.Sleep(1 * time.Second)
	util.PushMsg(Conf.Language(138), 3000)
	time.Sleep(1 * time.Second)
	if initErr := IndexRepo("[Init] Init local data repo"); nil != initErr {
		util.PushErrMsg(fmt.Sprintf(Conf.Language(140), initErr), 0)
	}
}
724
725func CheckoutRepo(id string) {
726 task.AppendTask(task.RepoCheckout, checkoutRepo, id)
727}
728
// checkoutRepo rolls the workspace back to the snapshot with the given index
// ID: it flushes pending transactions, pauses the asset/emoji watchers,
// disables sync, snapshots the current data first, then checks out the target
// snapshot and triggers a full reindex.
func checkoutRepo(id string) {
	var err error
	if 1 > len(Conf.Repo.Key) {
		util.PushErrMsg(Conf.Language(26), 7000)
		return
	}

	repo, err := newRepository()
	if err != nil {
		logging.LogErrorf("new repository failed: %s", err)
		util.PushErrMsg(Conf.Language(141), 7000)
		return
	}

	util.PushEndlessProgress(Conf.Language(63))
	FlushTxQueue()
	CloseWatchAssets()
	defer WatchAssets()
	CloseWatchEmojis()
	defer WatchEmojis()

	// Automatically pause syncing while restoring a snapshot to avoid the
	// just-restored data being overwritten by a sync.
	syncEnabled := Conf.Sync.Enabled
	Conf.Sync.Enabled = false
	Conf.Save()

	// Create a snapshot of the current data before rolling back.
	// When rolling back a snapshot, a snapshot is created for the current data by default https://github.com/siyuan-note/siyuan/issues/12470
	FlushTxQueue()
	_, err = repo.Index("Backup before checkout", false, map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress})
	if err != nil {
		logging.LogErrorf("index repository failed: %s", err)
		util.PushClearProgress()
		util.PushErrMsg(fmt.Sprintf(Conf.Language(140), err), 0)
		return
	}

	_, _, err = repo.Checkout(id, map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress})
	if err != nil {
		logging.LogErrorf("checkout repository failed: %s", err)
		util.PushClearProgress()
		util.PushErrMsg(Conf.Language(141), 7000)
		return
	}

	FullReindex()

	if syncEnabled {
		// Sync was disabled above and is NOT re-enabled here; this delayed
		// message presumably informs the user — confirm Language(134).
		task.AppendAsyncTaskWithDelay(task.PushMsg, 7*time.Second, util.PushMsg, Conf.Language(134), 0)
	}
	return
}
781
782func DownloadCloudSnapshot(tag, id string) (err error) {
783 if 1 > len(Conf.Repo.Key) {
784 err = errors.New(Conf.Language(26))
785 return
786 }
787
788 repo, err := newRepository()
789 if err != nil {
790 return
791 }
792
793 switch Conf.Sync.Provider {
794 case conf.ProviderSiYuan:
795 if !IsSubscriber() {
796 util.PushErrMsg(Conf.Language(29), 5000)
797 return
798 }
799 case conf.ProviderWebDAV, conf.ProviderS3, conf.ProviderLocal:
800 if !IsPaidUser() {
801 util.PushErrMsg(Conf.Language(214), 5000)
802 return
803 }
804 }
805
806 defer util.PushClearProgress()
807
808 var downloadFileCount, downloadChunkCount int
809 var downloadBytes int64
810 if "" == tag {
811 downloadFileCount, downloadChunkCount, downloadBytes, err = repo.DownloadIndex(id, map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress})
812 } else {
813 downloadFileCount, downloadChunkCount, downloadBytes, err = repo.DownloadTagIndex(tag, id, map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress})
814 }
815 if err != nil {
816 return
817 }
818 msg := fmt.Sprintf(Conf.Language(153), downloadFileCount, downloadChunkCount, humanize.BytesCustomCeil(uint64(downloadBytes), 2))
819 util.PushMsg(msg, 5000)
820 util.PushStatusBar(msg)
821 return
822}
823
824func UploadCloudSnapshot(tag, id string) (err error) {
825 if 1 > len(Conf.Repo.Key) {
826 err = errors.New(Conf.Language(26))
827 return
828 }
829
830 repo, err := newRepository()
831 if err != nil {
832 return
833 }
834
835 switch Conf.Sync.Provider {
836 case conf.ProviderSiYuan:
837 if !IsSubscriber() {
838 util.PushErrMsg(Conf.Language(29), 5000)
839 return
840 }
841 case conf.ProviderWebDAV, conf.ProviderS3, conf.ProviderLocal:
842 if !IsPaidUser() {
843 util.PushErrMsg(Conf.Language(214), 5000)
844 return
845 }
846 }
847
848 util.PushEndlessProgress(Conf.Language(116))
849 defer util.PushClearProgress()
850 uploadFileCount, uploadChunkCount, uploadBytes, err := repo.UploadTagIndex(tag, id, map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress})
851 if err != nil {
852 if errors.Is(err, dejavu.ErrCloudBackupCountExceeded) {
853 err = fmt.Errorf(Conf.Language(84), Conf.Language(154))
854 return
855 }
856 err = errors.New(fmt.Sprintf(Conf.Language(84), formatRepoErrorMsg(err)))
857 return
858 }
859 msg := fmt.Sprintf(Conf.Language(152), uploadFileCount, uploadChunkCount, humanize.BytesCustomCeil(uint64(uploadBytes), 2))
860 util.PushMsg(msg, 5000)
861 util.PushStatusBar(msg)
862 return
863}
864
865func RemoveCloudRepoTag(tag string) (err error) {
866 if 1 > len(Conf.Repo.Key) {
867 err = errors.New(Conf.Language(26))
868 return
869 }
870
871 if "" == tag {
872 err = errors.New("tag is empty")
873 return
874 }
875
876 repo, err := newRepository()
877 if err != nil {
878 return
879 }
880
881 switch Conf.Sync.Provider {
882 case conf.ProviderSiYuan:
883 if !IsSubscriber() {
884 util.PushErrMsg(Conf.Language(29), 5000)
885 return
886 }
887 case conf.ProviderWebDAV, conf.ProviderS3, conf.ProviderLocal:
888 if !IsPaidUser() {
889 util.PushErrMsg(Conf.Language(214), 5000)
890 return
891 }
892 }
893
894 err = repo.RemoveCloudRepoTag(tag)
895 if err != nil {
896 return
897 }
898 return
899}
900
901func GetCloudRepoTagSnapshots() (ret []*dejavu.Log, err error) {
902 ret = []*dejavu.Log{}
903 if 1 > len(Conf.Repo.Key) {
904 err = errors.New(Conf.Language(26))
905 return
906 }
907
908 repo, err := newRepository()
909 if err != nil {
910 return
911 }
912
913 switch Conf.Sync.Provider {
914 case conf.ProviderSiYuan:
915 if !IsSubscriber() {
916 util.PushErrMsg(Conf.Language(29), 5000)
917 return
918 }
919 case conf.ProviderWebDAV, conf.ProviderS3, conf.ProviderLocal:
920 if !IsPaidUser() {
921 util.PushErrMsg(Conf.Language(214), 5000)
922 return
923 }
924 }
925
926 logs, err := repo.GetCloudRepoTagLogs(map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar})
927 if err != nil {
928 return
929 }
930 ret = logs
931 if 1 > len(ret) {
932 ret = []*dejavu.Log{}
933 }
934 return
935}
936
937func GetCloudRepoSnapshots(page int) (ret []*dejavu.Log, pageCount, totalCount int, err error) {
938 ret = []*dejavu.Log{}
939 if 1 > len(Conf.Repo.Key) {
940 err = errors.New(Conf.Language(26))
941 return
942 }
943
944 repo, err := newRepository()
945 if err != nil {
946 return
947 }
948
949 switch Conf.Sync.Provider {
950 case conf.ProviderSiYuan:
951 if !IsSubscriber() {
952 util.PushErrMsg(Conf.Language(29), 5000)
953 return
954 }
955 case conf.ProviderWebDAV, conf.ProviderS3, conf.ProviderLocal:
956 if !IsPaidUser() {
957 util.PushErrMsg(Conf.Language(214), 5000)
958 return
959 }
960 }
961
962 if 1 > page {
963 page = 1
964 }
965
966 logs, pageCount, totalCount, err := repo.GetCloudRepoLogs(page)
967 if err != nil {
968 return
969 }
970 ret = logs
971 if 1 > len(ret) {
972 ret = []*dejavu.Log{}
973 }
974 return
975}
976
977func GetTagSnapshots() (ret []*Snapshot, err error) {
978 ret = []*Snapshot{}
979 if 1 > len(Conf.Repo.Key) {
980 err = errors.New(Conf.Language(26))
981 return
982 }
983
984 repo, err := newRepository()
985 if err != nil {
986 return
987 }
988
989 logs, err := repo.GetTagLogs()
990 if err != nil {
991 return
992 }
993 ret = buildSnapshots(logs)
994 if 1 > len(ret) {
995 ret = []*Snapshot{}
996 }
997 return
998}
999
1000func RemoveTagSnapshot(tag string) (err error) {
1001 if 1 > len(Conf.Repo.Key) {
1002 err = errors.New(Conf.Language(26))
1003 return
1004 }
1005
1006 repo, err := newRepository()
1007 if err != nil {
1008 return
1009 }
1010
1011 err = repo.RemoveTag(tag)
1012 return
1013}
1014
1015func TagSnapshot(id, name string) (err error) {
1016 if 1 > len(Conf.Repo.Key) {
1017 err = errors.New(Conf.Language(26))
1018 return
1019 }
1020
1021 name = strings.TrimSpace(name)
1022 name = util.RemoveInvalid(name)
1023 if "" == name {
1024 err = errors.New(Conf.Language(142))
1025 return
1026 }
1027
1028 if !gulu.File.IsValidFilename(name) {
1029 err = errors.New(Conf.Language(151))
1030 return
1031 }
1032
1033 repo, err := newRepository()
1034 if err != nil {
1035 return
1036 }
1037
1038 index, err := repo.GetIndex(id)
1039 if err != nil {
1040 return
1041 }
1042
1043 if err = repo.AddTag(index.ID, name); err != nil {
1044 msg := fmt.Sprintf("Add tag to data snapshot [%s] failed: %s", index.ID, err)
1045 util.PushStatusBar(msg)
1046 return
1047 }
1048 return
1049}
1050
1051func IndexRepo(memo string) (err error) {
1052 if 1 > len(Conf.Repo.Key) {
1053 err = errors.New(Conf.Language(26))
1054 return
1055 }
1056
1057 memo = strings.TrimSpace(memo)
1058 memo = gulu.Str.RemoveInvisible(memo)
1059 if "" == memo {
1060 err = errors.New(Conf.Language(142))
1061 return
1062 }
1063
1064 repo, err := newRepository()
1065 if err != nil {
1066 return
1067 }
1068
1069 util.PushEndlessProgress(Conf.Language(143))
1070
1071 start := time.Now()
1072 latest, _ := repo.Latest()
1073 FlushTxQueue()
1074 index, err := repo.Index(memo, true, map[string]interface{}{
1075 eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress,
1076 })
1077 if err != nil {
1078 util.PushStatusBar("Index data repo failed: " + html.EscapeString(err.Error()))
1079 return
1080 }
1081 elapsed := time.Since(start)
1082
1083 if nil == latest || latest.ID != index.ID {
1084 msg := fmt.Sprintf(Conf.Language(147), elapsed.Seconds())
1085 util.PushStatusBar(msg)
1086 util.PushMsg(msg, 5000)
1087 } else {
1088 msg := fmt.Sprintf(Conf.Language(148), elapsed.Seconds())
1089 util.PushStatusBar(msg)
1090 util.PushMsg(msg, 5000)
1091 }
1092 util.PushClearProgress()
1093 return
1094}
1095
// syncingFiles tracks the root IDs of documents currently being synced
// (presence in the map means "syncing"; see IsSyncingFile).
var syncingFiles = sync.Map{}

// syncingStorages is set while storage data is being synced (see isSyncingStorages).
var syncingStorages = atomic.Bool{}
1098
1099func waitForSyncingStorages() {
1100 for isSyncingStorages() {
1101 time.Sleep(time.Second)
1102 }
1103}
1104
1105func isSyncingStorages() bool {
1106 return syncingStorages.Load() || isBootSyncing.Load()
1107}
1108
1109func IsSyncingFile(rootID string) (ret bool) {
1110 _, ret = syncingFiles.Load(rootID)
1111 return
1112}
1113
1114func syncRepoDownload() (err error) {
1115 if 1 > len(Conf.Repo.Key) {
1116 planSyncAfter(fixSyncInterval)
1117
1118 msg := Conf.Language(26)
1119 util.PushStatusBar(msg)
1120 util.PushErrMsg(msg, 0)
1121 err = errors.New(msg)
1122 return
1123 }
1124
1125 repo, err := newRepository()
1126 if err != nil {
1127 planSyncAfter(fixSyncInterval)
1128
1129 msg := fmt.Sprintf("sync repo failed: %s", err)
1130 logging.LogErrorf(msg)
1131 util.PushStatusBar(msg)
1132 util.PushErrMsg(msg, 0)
1133 return
1134 }
1135
1136 logging.LogInfof("downloading data repo [device=%s, kernel=%s, provider=%d, mode=%s/%t]", Conf.System.ID, KernelID, Conf.Sync.Provider, "d", true)
1137 start := time.Now()
1138 _, _, err = indexRepoBeforeCloudSync(repo)
1139 if err != nil {
1140 planSyncAfter(fixSyncInterval)
1141
1142 logging.LogErrorf("sync data repo download failed: %s", err)
1143 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err))
1144 Conf.Sync.Stat = msg
1145 Conf.Save()
1146 util.PushStatusBar(msg)
1147 util.PushErrMsg(msg, 0)
1148 return
1149 }
1150
1151 beforeSyncPetals := getPetals()
1152
1153 syncContext := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
1154 mergeResult, trafficStat, err := repo.SyncDownload(syncContext)
1155 elapsed := time.Since(start)
1156 if err != nil {
1157 planSyncAfter(fixSyncInterval)
1158
1159 logging.LogErrorf("sync data repo download failed: %s", err)
1160 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err))
1161 if errors.Is(err, dejavu.ErrCloudStorageSizeExceeded) {
1162 u := Conf.GetUser()
1163 msg = fmt.Sprintf(Conf.Language(43), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2))
1164 if 2 == u.UserSiYuanSubscriptionPlan {
1165 msg = fmt.Sprintf(Conf.Language(68), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2))
1166 }
1167 }
1168 Conf.Sync.Stat = msg
1169 Conf.Save()
1170 util.PushStatusBar(msg)
1171 util.PushErrMsg(msg, 0)
1172 return
1173 }
1174
1175 util.PushStatusBar(fmt.Sprintf(Conf.Language(149), elapsed.Seconds()))
1176 Conf.Sync.Synced = util.CurrentTimeMillis()
1177 msg := fmt.Sprintf(Conf.Language(150), trafficStat.UploadFileCount, trafficStat.DownloadFileCount, trafficStat.UploadChunkCount, trafficStat.DownloadChunkCount, humanize.BytesCustomCeil(uint64(trafficStat.UploadBytes), 2), humanize.BytesCustomFloor(uint64(trafficStat.DownloadBytes), 2))
1178 Conf.Sync.Stat = msg
1179 Conf.Save()
1180 autoSyncErrCount = 0
1181 BootSyncSucc = 0
1182
1183 calcPetalDiff(beforeSyncPetals, mergeResult)
1184 processSyncMergeResult(false, true, mergeResult, trafficStat, "d", elapsed)
1185 return
1186}
1187
1188func syncRepoUpload() (err error) {
1189 if 1 > len(Conf.Repo.Key) {
1190 planSyncAfter(fixSyncInterval)
1191
1192 msg := Conf.Language(26)
1193 util.PushStatusBar(msg)
1194 util.PushErrMsg(msg, 0)
1195 err = errors.New(msg)
1196 return
1197 }
1198
1199 repo, err := newRepository()
1200 if err != nil {
1201 planSyncAfter(fixSyncInterval)
1202
1203 msg := fmt.Sprintf("sync repo failed: %s", err)
1204 logging.LogErrorf(msg)
1205 util.PushStatusBar(msg)
1206 util.PushErrMsg(msg, 0)
1207 return
1208 }
1209
1210 logging.LogInfof("uploading data repo [device=%s, kernel=%s, provider=%d, mode=%s/%t]", Conf.System.ID, KernelID, Conf.Sync.Provider, "u", true)
1211 start := time.Now()
1212 _, _, err = indexRepoBeforeCloudSync(repo)
1213 if err != nil {
1214 planSyncAfter(fixSyncInterval)
1215
1216 logging.LogErrorf("sync data repo upload failed: %s", err)
1217 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err))
1218 Conf.Sync.Stat = msg
1219 Conf.Save()
1220 util.PushStatusBar(msg)
1221 util.PushErrMsg(msg, 0)
1222 return
1223 }
1224
1225 syncContext := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
1226 trafficStat, err := repo.SyncUpload(syncContext)
1227 elapsed := time.Since(start)
1228 if err != nil {
1229 planSyncAfter(fixSyncInterval)
1230
1231 logging.LogErrorf("sync data repo upload failed: %s", err)
1232 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err))
1233 if errors.Is(err, dejavu.ErrCloudStorageSizeExceeded) {
1234 u := Conf.GetUser()
1235 msg = fmt.Sprintf(Conf.Language(43), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2))
1236 if 2 == u.UserSiYuanSubscriptionPlan {
1237 msg = fmt.Sprintf(Conf.Language(68), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2))
1238 }
1239 }
1240 Conf.Sync.Stat = msg
1241 Conf.Save()
1242 util.PushStatusBar(msg)
1243 util.PushErrMsg(msg, 0)
1244 return
1245 }
1246
1247 util.PushStatusBar(fmt.Sprintf(Conf.Language(149), elapsed.Seconds()))
1248 Conf.Sync.Synced = util.CurrentTimeMillis()
1249 msg := fmt.Sprintf(Conf.Language(150), trafficStat.UploadFileCount, trafficStat.DownloadFileCount, trafficStat.UploadChunkCount, trafficStat.DownloadChunkCount, humanize.BytesCustomCeil(uint64(trafficStat.UploadBytes), 2), humanize.BytesCustomCeil(uint64(trafficStat.DownloadBytes), 2))
1250 Conf.Sync.Stat = msg
1251 Conf.Save()
1252 autoSyncErrCount = 0
1253 BootSyncSucc = 0
1254
1255 processSyncMergeResult(false, true, &dejavu.MergeResult{}, trafficStat, "u", elapsed)
1256 return
1257}
1258
// isBootSyncing reports whether the boot-time sync is still in flight;
// the zero value of atomic.Bool (false) is the correct initial state.
var isBootSyncing atomic.Bool
1260
1261func bootSyncRepo() (err error) {
1262 if 1 > len(Conf.Repo.Key) {
1263 autoSyncErrCount++
1264 planSyncAfter(fixSyncInterval)
1265
1266 msg := Conf.Language(26)
1267 util.PushStatusBar(msg)
1268 util.PushErrMsg(msg, 0)
1269 err = errors.New(msg)
1270 return
1271 }
1272
1273 repo, err := newRepository()
1274 if err != nil {
1275 autoSyncErrCount++
1276 planSyncAfter(fixSyncInterval)
1277
1278 msg := fmt.Sprintf("sync repo failed: %s", html.EscapeString(err.Error()))
1279 logging.LogErrorf(msg)
1280 util.PushStatusBar(msg)
1281 util.PushErrMsg(msg, 0)
1282 return
1283 }
1284
1285 isBootSyncing.Store(true)
1286
1287 waitGroup := sync.WaitGroup{}
1288 var errs []error
1289 waitGroup.Add(1)
1290 go func() {
1291 defer waitGroup.Done()
1292
1293 start := time.Now()
1294 _, _, indexErr := indexRepoBeforeCloudSync(repo)
1295 if indexErr != nil {
1296 errs = append(errs, indexErr)
1297 autoSyncErrCount++
1298 planSyncAfter(fixSyncInterval)
1299
1300 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(indexErr))
1301 Conf.Sync.Stat = msg
1302 Conf.Save()
1303 util.PushStatusBar(msg)
1304 util.PushErrMsg(msg, 0)
1305 BootSyncSucc = 1
1306 isBootSyncing.Store(false)
1307 return
1308 }
1309
1310 logging.LogInfof("boot index repo elapsed [%.2fs]", time.Since(start).Seconds())
1311 }()
1312
1313 var fetchedFiles []*entity.File
1314 waitGroup.Add(1)
1315 go func() {
1316 defer waitGroup.Done()
1317
1318 start := time.Now()
1319 syncContext := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
1320 cloudLatest, getErr := repo.GetCloudLatest(syncContext)
1321 if nil != getErr {
1322 errs = append(errs, getErr)
1323 if !errors.Is(getErr, cloud.ErrCloudObjectNotFound) {
1324 logging.LogErrorf("download cloud latest failed: %s", getErr)
1325 return
1326 }
1327 }
1328 fetchedFiles, getErr = repo.GetSyncCloudFiles(cloudLatest, syncContext)
1329 if errors.Is(getErr, dejavu.ErrRepoFatal) {
1330 errs = append(errs, getErr)
1331 autoSyncErrCount++
1332 planSyncAfter(fixSyncInterval)
1333
1334 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(getErr))
1335 Conf.Sync.Stat = msg
1336 Conf.Save()
1337 util.PushStatusBar(msg)
1338 util.PushErrMsg(msg, 0)
1339 BootSyncSucc = 1
1340 isBootSyncing.Store(false)
1341 return
1342 }
1343
1344 logging.LogInfof("boot get sync cloud files elapsed [%.2fs]", time.Since(start).Seconds())
1345 }()
1346 waitGroup.Wait()
1347 if 0 < len(errs) {
1348 err = errs[0]
1349 }
1350
1351 if err != nil {
1352 autoSyncErrCount++
1353 planSyncAfter(fixSyncInterval)
1354
1355 logging.LogErrorf("sync data repo failed: %s", err)
1356 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err))
1357 if errors.Is(err, dejavu.ErrCloudStorageSizeExceeded) {
1358 u := Conf.GetUser()
1359 msg = fmt.Sprintf(Conf.Language(43), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2))
1360 if 2 == u.UserSiYuanSubscriptionPlan {
1361 msg = fmt.Sprintf(Conf.Language(68), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2))
1362 }
1363 }
1364 Conf.Sync.Stat = msg
1365 Conf.Save()
1366 util.PushStatusBar(msg)
1367 util.PushErrMsg(msg, 0)
1368 BootSyncSucc = 1
1369 isBootSyncing.Store(false)
1370 return
1371 }
1372
1373 syncingFiles = sync.Map{}
1374 syncingStorages.Store(false)
1375 for _, fetchedFile := range fetchedFiles {
1376 name := path.Base(fetchedFile.Path)
1377 if strings.HasSuffix(name, ".sy") {
1378 id := name[:len(name)-3]
1379 syncingFiles.Store(id, true)
1380 continue
1381 }
1382 if strings.HasPrefix(fetchedFile.Path, "/storage/") {
1383 syncingStorages.Store(true)
1384 }
1385 }
1386
1387 if 0 < len(fetchedFiles) {
1388 go func() {
1389 _, syncErr := syncRepo(false, false)
1390 isBootSyncing.Store(false)
1391 if err != nil {
1392 logging.LogErrorf("boot background sync repo failed: %s", syncErr)
1393 return
1394 }
1395 }()
1396 } else {
1397 isBootSyncing.Store(false)
1398 }
1399 return
1400}
1401
1402func syncRepo(exit, byHand bool) (dataChanged bool, err error) {
1403 if 1 > len(Conf.Repo.Key) {
1404 autoSyncErrCount++
1405 planSyncAfter(fixSyncInterval)
1406
1407 msg := Conf.Language(26)
1408 util.PushStatusBar(msg)
1409 util.PushErrMsg(msg, 0)
1410 err = errors.New(msg)
1411 return
1412 }
1413
1414 repo, err := newRepository()
1415 if err != nil {
1416 autoSyncErrCount++
1417 planSyncAfter(fixSyncInterval)
1418
1419 msg := fmt.Sprintf("sync repo failed: %s", err)
1420 logging.LogErrorf(msg)
1421 util.PushStatusBar(msg)
1422 util.PushErrMsg(msg, 0)
1423 return
1424 }
1425
1426 logging.LogInfof("syncing data repo [device=%s, kernel=%s, provider=%d, mode=%s/%t]", Conf.System.ID, KernelID, Conf.Sync.Provider, "a", byHand)
1427 start := time.Now()
1428 beforeIndex, afterIndex, err := indexRepoBeforeCloudSync(repo)
1429 if err != nil {
1430 autoSyncErrCount++
1431 planSyncAfter(fixSyncInterval)
1432
1433 logging.LogErrorf("sync data repo failed: %s", err)
1434 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err))
1435 Conf.Sync.Stat = msg
1436 Conf.Save()
1437 util.PushStatusBar(msg)
1438 if 1 > autoSyncErrCount || byHand {
1439 util.PushErrMsg(msg, 0)
1440 }
1441 if exit {
1442 ExitSyncSucc = 1
1443 }
1444 return
1445 }
1446
1447 beforeSyncPetals := getPetals()
1448
1449 syncContext := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
1450 mergeResult, trafficStat, err := repo.Sync(syncContext)
1451 elapsed := time.Since(start)
1452 if err != nil {
1453 autoSyncErrCount++
1454 planSyncAfter(fixSyncInterval)
1455
1456 logging.LogErrorf("sync data repo failed: %s", err)
1457 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err))
1458 if errors.Is(err, dejavu.ErrCloudStorageSizeExceeded) {
1459 u := Conf.GetUser()
1460 msg = fmt.Sprintf(Conf.Language(43), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2))
1461 if 2 == u.UserSiYuanSubscriptionPlan {
1462 msg = fmt.Sprintf(Conf.Language(68), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2))
1463 }
1464 }
1465 Conf.Sync.Stat = msg
1466 Conf.Save()
1467 util.PushStatusBar(msg)
1468 if 1 > autoSyncErrCount || byHand {
1469 util.PushErrMsg(msg, 0)
1470 }
1471 if exit {
1472 ExitSyncSucc = 1
1473 }
1474 return
1475 }
1476
1477 dataChanged = nil == beforeIndex || beforeIndex.ID != afterIndex.ID || mergeResult.DataChanged()
1478
1479 util.PushStatusBar(fmt.Sprintf(Conf.Language(149), elapsed.Seconds()))
1480 Conf.Sync.Synced = util.CurrentTimeMillis()
1481 msg := fmt.Sprintf(Conf.Language(150), trafficStat.UploadFileCount, trafficStat.DownloadFileCount, trafficStat.UploadChunkCount, trafficStat.DownloadChunkCount, humanize.BytesCustomCeil(uint64(trafficStat.UploadBytes), 2), humanize.BytesCustomCeil(uint64(trafficStat.DownloadBytes), 2))
1482 Conf.Sync.Stat = msg
1483 Conf.Save()
1484 autoSyncErrCount = 0
1485
1486 calcPetalDiff(beforeSyncPetals, mergeResult)
1487 processSyncMergeResult(exit, byHand, mergeResult, trafficStat, "a", elapsed)
1488
1489 if !exit {
1490 go func() {
1491 // 首次数据同步执行完成后再执行索引订正 Index fixing should not be performed before data synchronization https://github.com/siyuan-note/siyuan/issues/10761
1492 checkIndex()
1493 // 索引订正结束后执行数据仓库清理 Automatic purge for local data repo https://github.com/siyuan-note/siyuan/issues/13091
1494 autoPurgeRepo(false)
1495 }()
1496 }
1497 return
1498}
1499
1500func calcPetalDiff(beforeSyncPetals []*Petal, mergeResult *dejavu.MergeResult) {
1501 var upsertPetals, removePetals []string
1502 afterSyncPetals := getPetals()
1503 for _, afterSyncPetal := range afterSyncPetals {
1504 if beforeSyncPetal := getPetalByName(afterSyncPetal.Name, beforeSyncPetals); nil != beforeSyncPetal {
1505 a, _ := gulu.JSON.MarshalJSON(afterSyncPetal)
1506 b, _ := gulu.JSON.MarshalJSON(beforeSyncPetal)
1507 if !bytes.Equal(a, b) {
1508 upsertPetals = append(upsertPetals, afterSyncPetal.Name)
1509 }
1510 } else {
1511 upsertPetals = append(upsertPetals, afterSyncPetal.Name)
1512 }
1513 }
1514 for _, beforeSyncPetal := range beforeSyncPetals {
1515 if nil == getPetalByName(beforeSyncPetal.Name, afterSyncPetals) {
1516 removePetals = append(removePetals, beforeSyncPetal.Name)
1517 }
1518 }
1519
1520 mergeResult.UpsertPetals = gulu.Str.RemoveDuplicatedElem(upsertPetals)
1521 mergeResult.RemovePetals = gulu.Str.RemoveDuplicatedElem(removePetals)
1522}
1523
// processSyncMergeResult applies the local consequences of a completed sync:
// it materializes conflict copies, schedules the next automatic sync (with
// exponential back-off when nothing changed), reindexes changed documents
// and reloads whichever subsystems the merged files touched (flashcards,
// OCR texts, plugins, widgets, boxes, the file tree and the UI).
//
// exit marks a shutdown-time sync (UI events are suppressed), byHand marks a
// user-triggered sync (back-off scheduling is skipped), mode is a short log
// tag ("d"/"u"/"a"), elapsed is the total sync duration.
func processSyncMergeResult(exit, byHand bool, mergeResult *dejavu.MergeResult, trafficStat *dejavu.TrafficStat, mode string, elapsed time.Duration) {
	logging.LogInfof("synced data repo [device=%s, kernel=%s, provider=%d, mode=%s/%t, ufc=%d, dfc=%d, ucc=%d, dcc=%d, ub=%s, db=%s] in [%.2fs], merge result [conflicts=%d, upserts=%d, removes=%d]\n\n",
		Conf.System.ID, KernelID, Conf.Sync.Provider, mode, byHand,
		trafficStat.UploadFileCount, trafficStat.DownloadFileCount, trafficStat.UploadChunkCount, trafficStat.DownloadChunkCount, humanize.BytesCustomCeil(uint64(trafficStat.UploadBytes), 2), humanize.BytesCustomCeil(uint64(trafficStat.DownloadBytes), 2),
		elapsed.Seconds(),
		len(mergeResult.Conflicts), len(mergeResult.Upserts), len(mergeResult.Removes))

	//logSyncMergeResult(mergeResult)

	var needReloadFiletree bool
	if 0 < len(mergeResult.Conflicts) {
		luteEngine := util.NewLute()
		if Conf.Sync.GenerateConflictDoc {
			// Generate a copy when cloud sync runs into conflicts https://github.com/siyuan-note/siyuan/issues/5687

			for _, file := range mergeResult.Conflicts {
				// Only .sy documents get conflict copies.
				if !strings.HasSuffix(file.Path, ".sy") {
					continue
				}

				// file.Path looks like /<boxID>/.../<id>.sy; the first
				// segment is the notebook (box) ID.
				parts := strings.Split(file.Path[1:], "/")
				if 2 > len(parts) {
					continue
				}
				boxID := parts[0]

				// The conflicting version was checked out under the temp
				// conflicts directory, keyed by the merge timestamp.
				absPath := filepath.Join(util.TempDir, "repo", "sync", "conflicts", mergeResult.Time.Format("2006-01-02-150405"), file.Path)
				tree, loadTreeErr := loadTree(absPath, luteEngine)
				if nil != loadTreeErr {
					logging.LogErrorf("load conflicted file [%s] failed: %s", absPath, loadTreeErr)
					continue
				}
				tree.Box = boxID
				tree.Path = strings.TrimPrefix(file.Path, "/"+boxID)

				// Re-create the conflicting version as a new "Conflicted"
				// document and keep it next to the original in the box
				// sort order.
				previousPath := tree.Path
				resetTree(tree, "Conflicted", true)
				createTreeTx(tree)
				box := Conf.Box(boxID)
				if nil != box {
					box.addSort(previousPath, tree.ID)
				}
			}

			needReloadFiletree = true
		}

		// Index the sync history directory so conflicts stay inspectable.
		historyDir := filepath.Join(util.HistoryDir, mergeResult.Time.Format("2006-01-02-150405")+"-sync")
		indexHistoryDir(filepath.Base(historyDir), luteEngine)
	}

	if 1 > len(mergeResult.Upserts) && 1 > len(mergeResult.Removes) && 1 > len(mergeResult.Conflicts) { // No data changed
		// Back off: consecutive no-change syncs grow the delay
		// exponentially (counter capped so the delay stays bounded).
		syncSameCount.Add(1)
		if 10 < syncSameCount.Load() {
			syncSameCount.Store(5)
		}
		if !byHand {
			delay := time.Minute * time.Duration(int(math.Pow(2, float64(syncSameCount.Load()))))
			if fixSyncInterval.Minutes() > delay.Minutes() {
				delay = time.Minute * 8
			}
			planSyncAfter(delay)
		}
		return
	}

	// Data changed: the index needs to be rebuilt
	var upserts, removes []string
	var upsertTrees int
	// Some features may need to be reloaded
	var needReloadFlashcard, needReloadOcrTexts, needReloadPlugin bool
	upsertPluginSet := hashset.New()
	needUnindexBoxes, needIndexBoxes := map[string]bool{}, map[string]bool{}
	for _, file := range mergeResult.Upserts {
		upserts = append(upserts, file.Path)
		if strings.HasPrefix(file.Path, "/storage/riff/") {
			// Flashcard data changed.
			needReloadFlashcard = true
		}

		if strings.HasPrefix(file.Path, "/assets/ocr-texts.json") {
			// OCR text cache changed.
			needReloadOcrTexts = true
		}

		if strings.HasSuffix(file.Path, "/.siyuan/conf.json") {
			// A notebook config changed: reindex that box.
			needReloadFiletree = true
			boxID := strings.TrimSuffix(strings.TrimPrefix(file.Path, "/"), "/.siyuan/conf.json")
			needUnindexBoxes[boxID] = true
			needIndexBoxes[boxID] = true
		}

		if strings.HasPrefix(file.Path, "/storage/petal/") {
			// Plugin petal config changed; path segment 3 is the plugin
			// name (the aggregate petals.json is not a plugin).
			needReloadPlugin = true
			if parts := strings.Split(file.Path, "/"); 3 < len(parts) {
				if pluginName := parts[3]; "petals.json" != pluginName {
					upsertPluginSet.Add(pluginName)
				}
			}
		}

		if strings.HasPrefix(file.Path, "/plugins/") {
			// Plugin payload changed; path segment 2 is the plugin name.
			if parts := strings.Split(file.Path, "/"); 2 < len(parts) {
				needReloadPlugin = true
				upsertPluginSet.Add(parts[2])
			}
		}

		if strings.HasSuffix(file.Path, ".sy") {
			upsertTrees++
		}
	}

	removeWidgetDirSet, removePluginSet := hashset.New(), hashset.New()
	for _, file := range mergeResult.Removes {
		removes = append(removes, file.Path)
		if strings.HasPrefix(file.Path, "/storage/riff/") {
			needReloadFlashcard = true
		}

		if strings.HasPrefix(file.Path, "/assets/ocr-texts.json") {
			needReloadOcrTexts = true
		}

		if strings.HasSuffix(file.Path, "/.siyuan/conf.json") {
			// A notebook config was removed: unindex that box.
			needReloadFiletree = true
			boxID := strings.TrimSuffix(strings.TrimPrefix(file.Path, "/"), "/.siyuan/conf.json")
			needUnindexBoxes[boxID] = true
		}

		if strings.HasPrefix(file.Path, "/storage/petal/") {
			needReloadPlugin = true
			if parts := strings.Split(file.Path, "/"); 3 < len(parts) {
				if pluginName := parts[3]; "petals.json" != pluginName {
					removePluginSet.Add(pluginName)
				}
			}
		}

		if strings.HasPrefix(file.Path, "/plugins/") {
			if parts := strings.Split(file.Path, "/"); 2 < len(parts) {
				needReloadPlugin = true
				removePluginSet.Add(parts[2])
			}
		}

		if strings.HasPrefix(file.Path, "/widgets/") {
			if parts := strings.Split(file.Path, "/"); 2 < len(parts) {
				removeWidgetDirSet.Add(parts[2])
			}
		}
	}

	// Fold in petal-level diffs computed by calcPetalDiff.
	for _, upsertPetal := range mergeResult.UpsertPetals {
		needReloadPlugin = true
		upsertPluginSet.Add(upsertPetal)
	}
	for _, removePetal := range mergeResult.RemovePetals {
		needReloadPlugin = true
		removePluginSet.Add(removePetal)
	}

	if needReloadFlashcard {
		LoadFlashcards()
	}

	if needReloadOcrTexts {
		util.LoadAssetsTexts()
	}

	if needReloadPlugin {
		pushReloadPlugin(upsertPluginSet, removePluginSet, "")
	}

	// Clean up now-empty widget directories left behind by removals.
	for _, widgetDir := range removeWidgetDirSet.Values() {
		widgetDirPath := filepath.Join(util.DataDir, "widgets", widgetDir.(string))
		gulu.File.RemoveEmptyDirs(widgetDirPath)
	}

	// The sync is done; nothing is "still syncing" anymore.
	syncingFiles = sync.Map{}
	syncingStorages.Store(false)

	if needFullReindex(upsertTrees) { // Improve the full-reindex-after-sync decision https://github.com/siyuan-note/siyuan/issues/5764
		FullReindex()
		return
	}

	if exit { // An exit-time sync does not need to push events
		return
	}

	for boxID := range needUnindexBoxes {
		if box := Conf.GetBox(boxID); nil != box {
			box.Unindex()
		}
	}
	for boxID := range needIndexBoxes {
		if box := Conf.GetBox(boxID); nil != box {
			box.Index()
		}
	}

	needReloadUI := 0 < len(needUnindexBoxes) || 0 < len(needIndexBoxes)
	if needReloadUI {
		util.ReloadUI()
	}

	// Incrementally reindex the changed documents; a full UI reload already
	// covers the file tree, so skip the separate tree reload in that case.
	upsertRootIDs, removeRootIDs := incReindex(upserts, removes)
	needReloadFiletree = !needReloadUI && (needReloadFiletree || 0 < len(upsertRootIDs) || 0 < len(removeRootIDs))
	if needReloadFiletree {
		ReloadFiletree()
	}

	go func() {
		util.WaitForUILoaded()

		if 0 < len(upsertRootIDs) || 0 < len(removeRootIDs) {
			util.BroadcastByType("main", "syncMergeResult", 0, "",
				map[string]interface{}{"upsertRootIDs": upsertRootIDs, "removeRootIDs": removeRootIDs})
		}

		time.Sleep(2 * time.Second)
		util.PushStatusBar(fmt.Sprintf(Conf.Language(149), elapsed.Seconds()))

		if 0 < len(mergeResult.Conflicts) {
			syConflict := false
			for _, file := range mergeResult.Conflicts {
				if strings.HasSuffix(file.Path, ".sy") {
					syConflict = true
					break
				}
			}

			if syConflict {
				// Remind the user in the UI when data sync conflicts occur https://github.com/siyuan-note/siyuan/issues/7332
				util.PushMsg(Conf.Language(108), 7000)
			}
		}
	}()
}
1762
1763func logSyncMergeResult(mergeResult *dejavu.MergeResult) {
1764 if 1 > len(mergeResult.Conflicts) && 1 > len(mergeResult.Upserts) && 1 > len(mergeResult.Removes) {
1765 return
1766 }
1767
1768 if 0 < len(mergeResult.Conflicts) {
1769 logBuilder := bytes.Buffer{}
1770 for i, f := range mergeResult.Conflicts {
1771 logBuilder.WriteString(" ")
1772 logBuilder.WriteString(f.Path)
1773 if i < len(mergeResult.Conflicts)-1 {
1774 logBuilder.WriteString("\n")
1775 }
1776 }
1777 logging.LogInfof("sync conflicts:\n%s", logBuilder.String())
1778 }
1779 if 0 < len(mergeResult.Upserts) {
1780 logBuilder := bytes.Buffer{}
1781 for i, f := range mergeResult.Upserts {
1782 logBuilder.WriteString(" ")
1783 logBuilder.WriteString(f.Path)
1784 if i < len(mergeResult.Upserts)-1 {
1785 logBuilder.WriteString("\n")
1786 }
1787 }
1788 logging.LogInfof("sync merge upserts:\n%s", logBuilder.String())
1789 }
1790 if 0 < len(mergeResult.Removes) {
1791 logBuilder := bytes.Buffer{}
1792 for i, f := range mergeResult.Removes {
1793 logBuilder.WriteString(" ")
1794 logBuilder.WriteString(f.Path)
1795 if i < len(mergeResult.Removes)-1 {
1796 logBuilder.WriteString("\n")
1797 }
1798 }
1799 logging.LogInfof("sync merge removes:\n%s", logBuilder.String())
1800 }
1801}
1802
1803func needFullReindex(upsertTrees int) bool {
1804 return 0.2 < float64(upsertTrees)/float64(treenode.CountTrees())
1805}
1806
// promotedPurgeDataRepo remembers whether the "consider purging the data
// repo" hint has already been pushed, so it is shown at most once per run.
var promotedPurgeDataRepo bool
1808
// indexRepoBeforeCloudSync snapshots the local data repo right before a
// cloud sync. beforeIndex is the previous latest snapshot (nil on a fresh
// repo); afterIndex is the snapshot just created. Identical IDs mean no
// local data changed since the last snapshot.
func indexRepoBeforeCloudSync(repo *dejavu.Repo) (beforeIndex, afterIndex *entity.Index, err error) {
	start := time.Now()

	beforeIndex, _ = repo.Latest()
	// Flush pending transactions so the snapshot sees current data.
	FlushTxQueue()

	checkChunks := true
	if util.ContainerAndroid == util.Container || util.ContainerIOS == util.Container || util.ContainerHarmony == util.Container {
		// The mobile private data space cannot be corrupted by external file
		// operations, so skip chunk checking there to improve performance
		// https://github.com/siyuan-note/siyuan/issues/13216
		checkChunks = false
	}

	afterIndex, err = repo.Index("[Sync] Cloud sync", checkChunks,
		map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar})
	if err != nil {
		logging.LogErrorf("index data repo before cloud sync failed: %s", err)
		return
	}
	elapsed := time.Since(start)

	if nil == beforeIndex || beforeIndex.ID != afterIndex.ID {
		// A new snapshot was created; update its memo with the elapsed time
		afterIndex.Memo = fmt.Sprintf("[Sync] Cloud sync, completed in %.2fs", elapsed.Seconds())
		if err = repo.PutIndex(afterIndex); err != nil {
			util.PushStatusBar("Save data snapshot for cloud sync failed")
			logging.LogErrorf("put index into data repo before cloud sync failed: %s", err)
			return
		}
		util.PushStatusBar(fmt.Sprintf(Conf.Language(147), elapsed.Seconds()))
	} else {
		util.PushStatusBar(fmt.Sprintf(Conf.Language(148), elapsed.Seconds()))
	}

	if Conf.Repo.SyncIndexTiming < elapsed.Milliseconds() {
		// Indexing exceeded the configured time budget: log it and, at most
		// once per run, suggest purging the repo when many snapshots exist.
		logging.LogWarnf("index data repo before cloud sync elapsed [%dms]", elapsed.Milliseconds())
		if !promotedPurgeDataRepo {
			go func() {
				util.WaitForUILoaded()
				time.Sleep(3 * time.Second)

				if indexCount, _ := repo.CountIndexes(); 128 > indexCount {
					// Do not push the hint when there are only a few snapshots
					return
				}

				util.PushMsg(Conf.language(218), 24000)
				promotedPurgeDataRepo = true
			}()
		}
	}
	return
}
1861
1862func newRepository() (ret *dejavu.Repo, err error) {
1863 cloudConf, err := buildCloudConf()
1864 if err != nil {
1865 return
1866 }
1867
1868 var cloudRepo cloud.Cloud
1869 switch Conf.Sync.Provider {
1870 case conf.ProviderSiYuan:
1871 cloudRepo = cloud.NewSiYuan(&cloud.BaseCloud{Conf: cloudConf})
1872 case conf.ProviderS3:
1873 s3HTTPClient := &http.Client{Transport: httpclient.NewTransport(cloudConf.S3.SkipTlsVerify)}
1874 s3HTTPClient.Timeout = time.Duration(cloudConf.S3.Timeout) * time.Second
1875 cloudRepo = cloud.NewS3(&cloud.BaseCloud{Conf: cloudConf}, s3HTTPClient)
1876 case conf.ProviderWebDAV:
1877 webdavClient := gowebdav.NewClient(cloudConf.WebDAV.Endpoint, cloudConf.WebDAV.Username, cloudConf.WebDAV.Password)
1878 a := cloudConf.WebDAV.Username + ":" + cloudConf.WebDAV.Password
1879 auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(a))
1880 webdavClient.SetHeader("Authorization", auth)
1881 webdavClient.SetHeader("User-Agent", util.UserAgent)
1882 webdavClient.SetTimeout(time.Duration(cloudConf.WebDAV.Timeout) * time.Second)
1883 webdavClient.SetTransport(httpclient.NewTransport(cloudConf.WebDAV.SkipTlsVerify))
1884 cloudRepo = cloud.NewWebDAV(&cloud.BaseCloud{Conf: cloudConf}, webdavClient)
1885 case conf.ProviderLocal:
1886 cloudRepo = cloud.NewLocal(&cloud.BaseCloud{Conf: cloudConf})
1887 default:
1888 err = fmt.Errorf("unknown cloud provider [%d]", Conf.Sync.Provider)
1889 return
1890 }
1891
1892 ignoreLines := getSyncIgnoreLines()
1893 ignoreLines = append(ignoreLines, "/.siyuan/conf.json") // 忽略旧版同步配置
1894 ret, err = dejavu.NewRepo(util.DataDir, util.RepoDir, util.HistoryDir, util.TempDir, Conf.System.ID, Conf.System.Name, Conf.System.OS, Conf.Repo.Key, ignoreLines, cloudRepo)
1895 if err != nil {
1896 logging.LogErrorf("init data repo failed: %s", err)
1897 return
1898 }
1899 return
1900}
1901
// init wires up the repo event bus subscriptions as soon as the package loads.
func init() {
	subscribeRepoEvents()
}
1905
1906func subscribeRepoEvents() {
1907 eventbus.Subscribe(eventbus.EvtIndexBeforeWalkData, func(context map[string]interface{}, path string) {
1908 msg := fmt.Sprintf(Conf.Language(158), path)
1909 util.SetBootDetails(msg)
1910 util.ContextPushMsg(context, msg)
1911 })
1912
1913 indexWalkDataCount := 0
1914 eventbus.Subscribe(eventbus.EvtIndexWalkData, func(context map[string]interface{}, path string) {
1915 msg := fmt.Sprintf(Conf.Language(158), filepath.Base(path))
1916 if 0 == indexWalkDataCount%1024 {
1917 util.SetBootDetails(msg)
1918 util.ContextPushMsg(context, msg)
1919 }
1920 indexWalkDataCount++
1921 })
1922 eventbus.Subscribe(eventbus.EvtIndexBeforeGetLatestFiles, func(context map[string]interface{}, total int) {
1923 msg := fmt.Sprintf(Conf.Language(159), 0, total)
1924 util.SetBootDetails(msg)
1925 util.ContextPushMsg(context, msg)
1926 })
1927
1928 eventbus.Subscribe(eventbus.EvtIndexGetLatestFile, func(context map[string]interface{}, count int, total int) {
1929 msg := fmt.Sprintf(Conf.Language(159), count, total)
1930 if 0 == count%64 {
1931 util.SetBootDetails(msg)
1932 util.ContextPushMsg(context, msg)
1933 }
1934 })
1935 eventbus.Subscribe(eventbus.EvtIndexUpsertFiles, func(context map[string]interface{}, total int) {
1936 msg := fmt.Sprintf(Conf.Language(160), 0, total)
1937 util.SetBootDetails(msg)
1938 util.ContextPushMsg(context, msg)
1939 })
1940 eventbus.Subscribe(eventbus.EvtIndexUpsertFile, func(context map[string]interface{}, count int, total int) {
1941 msg := fmt.Sprintf(Conf.Language(160), count, total)
1942 if 0 == count%32 {
1943 util.SetBootDetails(msg)
1944 util.ContextPushMsg(context, msg)
1945 }
1946 })
1947
1948 eventbus.Subscribe(eventbus.EvtCheckoutBeforeWalkData, func(context map[string]interface{}, path string) {
1949 msg := fmt.Sprintf(Conf.Language(161), path)
1950 util.SetBootDetails(msg)
1951 util.ContextPushMsg(context, msg)
1952 })
1953 coWalkDataCount := 0
1954 eventbus.Subscribe(eventbus.EvtCheckoutWalkData, func(context map[string]interface{}, path string) {
1955 msg := fmt.Sprintf(Conf.Language(161), filepath.Base(path))
1956 if 0 == coWalkDataCount%512 {
1957 util.SetBootDetails(msg)
1958 util.ContextPushMsg(context, msg)
1959 }
1960 coWalkDataCount++
1961 })
1962 var bootProgressPart int32
1963 eventbus.Subscribe(eventbus.EvtCheckoutUpsertFiles, func(context map[string]interface{}, total int) {
1964 msg := fmt.Sprintf(Conf.Language(162), 0, total)
1965 util.SetBootDetails(msg)
1966 bootProgressPart = int32(10 / float64(total))
1967 util.ContextPushMsg(context, msg)
1968 })
1969 coUpsertFileCount := 0
1970 eventbus.Subscribe(eventbus.EvtCheckoutUpsertFile, func(context map[string]interface{}, count, total int) {
1971 msg := fmt.Sprintf(Conf.Language(162), count, total)
1972 util.IncBootProgress(bootProgressPart, msg)
1973 if 0 == coUpsertFileCount%32 {
1974 util.ContextPushMsg(context, msg)
1975 }
1976 coUpsertFileCount++
1977 })
1978 eventbus.Subscribe(eventbus.EvtCheckoutRemoveFiles, func(context map[string]interface{}, total int) {
1979 msg := fmt.Sprintf(Conf.Language(163), 0, total)
1980 util.SetBootDetails(msg)
1981 bootProgressPart = int32(10 / float64(total))
1982 util.ContextPushMsg(context, msg)
1983 })
1984
1985 eventbus.Subscribe(eventbus.EvtCheckoutRemoveFile, func(context map[string]interface{}, count, total int) {
1986 msg := fmt.Sprintf(Conf.Language(163), count, total)
1987 util.IncBootProgress(bootProgressPart, msg)
1988 if 0 == count%64 {
1989 util.ContextPushMsg(context, msg)
1990 }
1991 })
1992
1993 eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadIndex, func(context map[string]interface{}, id string) {
1994 msg := fmt.Sprintf(Conf.Language(164), id[:7])
1995 util.IncBootProgress(1, msg)
1996 util.ContextPushMsg(context, msg)
1997 })
1998
1999 eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadFiles, func(context map[string]interface{}, total int) {
2000 msg := fmt.Sprintf(Conf.Language(165), 0, total)
2001 util.SetBootDetails(msg)
2002 bootProgressPart = int32(10 / float64(total))
2003 util.ContextPushMsg(context, msg)
2004 })
2005
2006 eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadFile, func(context map[string]interface{}, count, total int) {
2007 msg := fmt.Sprintf(Conf.Language(165), count, total)
2008 util.IncBootProgress(bootProgressPart, msg)
2009 if 0 == count%8 {
2010 util.ContextPushMsg(context, msg)
2011 }
2012 })
2013 eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadChunks, func(context map[string]interface{}, total int) {
2014 msg := fmt.Sprintf(Conf.Language(166), 0, total)
2015 util.SetBootDetails(msg)
2016 bootProgressPart = int32(10 / float64(total))
2017 util.ContextPushMsg(context, msg)
2018 })
2019 eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadChunk, func(context map[string]interface{}, count, total int) {
2020 msg := fmt.Sprintf(Conf.Language(166), count, total)
2021 util.IncBootProgress(bootProgressPart, msg)
2022 if 0 == count%8 {
2023 util.ContextPushMsg(context, msg)
2024 }
2025 })
2026 eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadRef, func(context map[string]interface{}, ref string) {
2027 msg := fmt.Sprintf(Conf.Language(167), ref)
2028 util.IncBootProgress(1, msg)
2029 util.ContextPushMsg(context, msg)
2030 })
2031 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadIndex, func(context map[string]interface{}, id string) {
2032 msg := fmt.Sprintf(Conf.Language(168), id[:7])
2033 util.IncBootProgress(1, msg)
2034 util.ContextPushMsg(context, msg)
2035 })
2036 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadFiles, func(context map[string]interface{}, total int) {
2037 msg := fmt.Sprintf(Conf.Language(169), 0, total)
2038 util.SetBootDetails(msg)
2039 util.ContextPushMsg(context, msg)
2040 })
2041 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadFile, func(context map[string]interface{}, count, total int) {
2042 msg := fmt.Sprintf(Conf.Language(169), count, total)
2043 if 0 == count%8 {
2044 util.SetBootDetails(msg)
2045 util.ContextPushMsg(context, msg)
2046 }
2047 })
2048 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadChunks, func(context map[string]interface{}, total int) {
2049 msg := fmt.Sprintf(Conf.Language(170), 0, total)
2050 util.SetBootDetails(msg)
2051 util.ContextPushMsg(context, msg)
2052 })
2053 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadChunk, func(context map[string]interface{}, count, total int) {
2054 msg := fmt.Sprintf(Conf.Language(170), count, total)
2055 if 0 == count%8 {
2056 util.SetBootDetails(msg)
2057 util.ContextPushMsg(context, msg)
2058 }
2059 })
2060 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadRef, func(context map[string]interface{}, ref string) {
2061 msg := fmt.Sprintf(Conf.Language(171), ref)
2062 util.SetBootDetails(msg)
2063 util.ContextPushMsg(context, msg)
2064 })
2065 eventbus.Subscribe(eventbus.EvtCloudLock, func(context map[string]interface{}) {
2066 msg := fmt.Sprintf(Conf.Language(186))
2067 util.SetBootDetails(msg)
2068 util.ContextPushMsg(context, msg)
2069 })
2070 eventbus.Subscribe(eventbus.EvtCloudUnlock, func(context map[string]interface{}) {
2071 msg := fmt.Sprintf(Conf.Language(187))
2072 util.SetBootDetails(msg)
2073 util.ContextPushMsg(context, msg)
2074 })
2075 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadIndexes, func(context map[string]interface{}) {
2076 msg := fmt.Sprintf(Conf.Language(208))
2077 util.SetBootDetails(msg)
2078 util.ContextPushMsg(context, msg)
2079 })
2080 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadCheckIndex, func(context map[string]interface{}) {
2081 msg := fmt.Sprintf(Conf.Language(209))
2082 util.SetBootDetails(msg)
2083 util.ContextPushMsg(context, msg)
2084 })
2085 eventbus.Subscribe(eventbus.EvtCloudBeforeFixObjects, func(context map[string]interface{}, count, total int) {
2086 msg := fmt.Sprintf(Conf.Language(210), count, total)
2087 util.SetBootDetails(msg)
2088 util.ContextPushMsg(context, msg)
2089 })
2090 eventbus.Subscribe(eventbus.EvtCloudAfterFixObjects, func(context map[string]interface{}) {
2091 msg := fmt.Sprintf(Conf.Language(211))
2092 util.SetBootDetails(msg)
2093 util.ContextPushMsg(context, msg)
2094 })
2095 eventbus.Subscribe(eventbus.EvtCloudCorrupted, func() {
2096 util.PushErrMsg(Conf.language(220), 30000)
2097 })
2098 eventbus.Subscribe(eventbus.EvtCloudPurgeListObjects, func(context map[string]interface{}) {
2099 util.ContextPushMsg(context, Conf.language(224))
2100 })
2101 eventbus.Subscribe(eventbus.EvtCloudPurgeListIndexes, func(context map[string]interface{}) {
2102 util.ContextPushMsg(context, Conf.language(225))
2103 })
2104 eventbus.Subscribe(eventbus.EvtCloudPurgeListRefs, func(context map[string]interface{}) {
2105 util.ContextPushMsg(context, Conf.language(226))
2106 })
2107 eventbus.Subscribe(eventbus.EvtCloudPurgeDownloadIndexes, func(context map[string]interface{}) {
2108 util.ContextPushMsg(context, fmt.Sprintf(Conf.language(227)))
2109 })
2110 eventbus.Subscribe(eventbus.EvtCloudPurgeDownloadFiles, func(context map[string]interface{}) {
2111 util.ContextPushMsg(context, Conf.language(228))
2112 })
2113 eventbus.Subscribe(eventbus.EvtCloudPurgeRemoveIndexes, func(context map[string]interface{}) {
2114 util.ContextPushMsg(context, Conf.language(229))
2115 })
2116 eventbus.Subscribe(eventbus.EvtCloudPurgeRemoveIndexesV2, func(context map[string]interface{}) {
2117 util.ContextPushMsg(context, Conf.language(230))
2118 })
2119 eventbus.Subscribe(eventbus.EvtCloudPurgeRemoveObjects, func(context map[string]interface{}) {
2120 util.ContextPushMsg(context, Conf.language(231))
2121 })
2122}
2123
2124func buildCloudConf() (ret *cloud.Conf, err error) {
2125 if !cloud.IsValidCloudDirName(Conf.Sync.CloudName) {
2126 logging.LogWarnf("invalid cloud repo name, rename it to [main]")
2127 Conf.Sync.CloudName = "main"
2128 Conf.Save()
2129 }
2130
2131 userId, token, availableSize := "0", "", int64(1024*1024*1024*1024*2)
2132 if nil != Conf.User && conf.ProviderSiYuan == Conf.Sync.Provider {
2133 u := Conf.GetUser()
2134 userId = u.UserId
2135 token = u.UserToken
2136 availableSize = u.GetCloudRepoAvailableSize()
2137 }
2138
2139 ret = &cloud.Conf{
2140 Dir: Conf.Sync.CloudName,
2141 UserID: userId,
2142 Token: token,
2143 AvailableSize: availableSize,
2144 Server: util.GetCloudServer(),
2145 }
2146
2147 switch Conf.Sync.Provider {
2148 case conf.ProviderSiYuan:
2149 ret.Endpoint = util.GetCloudSyncServer()
2150 case conf.ProviderS3:
2151 ret.S3 = &cloud.ConfS3{
2152 Endpoint: Conf.Sync.S3.Endpoint,
2153 AccessKey: Conf.Sync.S3.AccessKey,
2154 SecretKey: Conf.Sync.S3.SecretKey,
2155 Bucket: Conf.Sync.S3.Bucket,
2156 Region: Conf.Sync.S3.Region,
2157 PathStyle: Conf.Sync.S3.PathStyle,
2158 SkipTlsVerify: Conf.Sync.S3.SkipTlsVerify,
2159 Timeout: Conf.Sync.S3.Timeout,
2160 ConcurrentReqs: Conf.Sync.S3.ConcurrentReqs,
2161 }
2162 case conf.ProviderWebDAV:
2163 ret.WebDAV = &cloud.ConfWebDAV{
2164 Endpoint: Conf.Sync.WebDAV.Endpoint,
2165 Username: Conf.Sync.WebDAV.Username,
2166 Password: Conf.Sync.WebDAV.Password,
2167 SkipTlsVerify: Conf.Sync.WebDAV.SkipTlsVerify,
2168 Timeout: Conf.Sync.WebDAV.Timeout,
2169 ConcurrentReqs: Conf.Sync.WebDAV.ConcurrentReqs,
2170 }
2171 case conf.ProviderLocal:
2172 ret.Local = &cloud.ConfLocal{
2173 Endpoint: Conf.Sync.Local.Endpoint,
2174 Timeout: Conf.Sync.Local.Timeout,
2175 ConcurrentReqs: Conf.Sync.Local.ConcurrentReqs,
2176 }
2177 default:
2178 err = fmt.Errorf("invalid provider [%d]", Conf.Sync.Provider)
2179 return
2180 }
2181 return
2182}
2183
// Backup describes cloud backup data for the current workspace, as
// reported to the UI (populated in GetCloudSpace).
type Backup struct {
	Size    int64  `json:"size"`    // backup data size in bytes
	HSize   string `json:"hSize"`   // human-readable size; "-" when not available for the provider
	Updated string `json:"updated"` // last-updated time as a formatted string
	SaveDir string `json:"saveDir"` // local directory path where backup data is stored
}
2190
// Sync describes cloud sync data for the current workspace, as reported
// to the UI (populated in GetCloudSpace).
type Sync struct {
	Size      int64  `json:"size"`      // sync data size in bytes
	HSize     string `json:"hSize"`     // human-readable size; "-" when not available for the provider
	Updated   string `json:"updated"`   // last-updated time as a formatted string
	CloudName string `json:"cloudName"` // cloud-side directory name holding the sync data
	SaveDir   string `json:"saveDir"`   // local directory path where sync data is stored
}
2198
2199func GetCloudSpace() (s *Sync, b *Backup, hSize, hAssetSize, hTotalSize, hExchangeSize, hTrafficUploadSize, hTrafficDownloadSize, hTrafficAPIGet, hTrafficAPIPut string, err error) {
2200 stat, err := getCloudSpace()
2201 if err != nil {
2202 err = errors.New(Conf.Language(30) + " " + err.Error())
2203 return
2204 }
2205
2206 syncSize := stat.Sync.Size
2207 syncUpdated := stat.Sync.Updated
2208 s = &Sync{
2209 Size: syncSize,
2210 HSize: "-",
2211 Updated: syncUpdated,
2212 }
2213
2214 backupSize := stat.Backup.Size
2215 backupUpdated := stat.Backup.Updated
2216 b = &Backup{
2217 Size: backupSize,
2218 HSize: "-",
2219 Updated: backupUpdated,
2220 }
2221
2222 assetSize := stat.AssetSize
2223 totalSize := syncSize + backupSize + assetSize
2224 hAssetSize = "-"
2225 hSize = "-"
2226 hTotalSize = "-"
2227 hExchangeSize = "-"
2228 hTrafficUploadSize = "-"
2229 hTrafficDownloadSize = "-"
2230 hTrafficAPIGet = "-"
2231 hTrafficAPIPut = "-"
2232 if conf.ProviderSiYuan == Conf.Sync.Provider {
2233 s.HSize = humanize.BytesCustomCeil(uint64(syncSize), 2)
2234 b.HSize = humanize.BytesCustomCeil(uint64(backupSize), 2)
2235 hAssetSize = humanize.BytesCustomCeil(uint64(assetSize), 2)
2236 hSize = humanize.BytesCustomCeil(uint64(totalSize), 2)
2237 u := Conf.GetUser()
2238 hTotalSize = humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2)
2239 hExchangeSize = humanize.BytesCustomCeil(uint64(u.UserSiYuanPointExchangeRepoSize), 2)
2240 hTrafficUploadSize = humanize.BytesCustomCeil(uint64(u.UserTrafficUpload), 2)
2241 hTrafficDownloadSize = humanize.BytesCustomCeil(uint64(u.UserTrafficDownload), 2)
2242 hTrafficAPIGet = humanize.SIWithDigits(u.UserTrafficAPIGet, 2, "")
2243 hTrafficAPIPut = humanize.SIWithDigits(u.UserTrafficAPIPut, 2, "")
2244 }
2245 return
2246}
2247
2248func getCloudSpace() (stat *cloud.Stat, err error) {
2249 repo, err := newRepository()
2250 if err != nil {
2251 return
2252 }
2253
2254 stat, err = repo.GetCloudRepoStat()
2255 if err != nil {
2256 logging.LogErrorf("get cloud repo stat failed: %s", err)
2257 return
2258 }
2259 return
2260}