A privacy-first, self-hosted, fully open-source personal knowledge management system, written in TypeScript and Go. (PERSONAL FORK)
at lambda-fork/main 2260 lines 64 kB view raw
1// SiYuan - Refactor your thinking 2// Copyright (c) 2020-present, b3log.org 3// 4// This program is free software: you can redistribute it and/or modify 5// it under the terms of the GNU Affero General Public License as published by 6// the Free Software Foundation, either version 3 of the License, or 7// (at your option) any later version. 8// 9// This program is distributed in the hope that it will be useful, 10// but WITHOUT ANY WARRANTY; without even the implied warranty of 11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12// GNU Affero General Public License for more details. 13// 14// You should have received a copy of the GNU Affero General Public License 15// along with this program. If not, see <https://www.gnu.org/licenses/>. 16 17package model 18 19import ( 20 "bytes" 21 "crypto/rand" 22 "crypto/sha1" 23 "crypto/sha256" 24 "encoding/base64" 25 "errors" 26 "fmt" 27 "math" 28 mathRand "math/rand" 29 "mime" 30 "net/http" 31 "os" 32 "path" 33 "path/filepath" 34 "sort" 35 "strings" 36 "sync" 37 "sync/atomic" 38 "time" 39 40 "github.com/88250/go-humanize" 41 "github.com/88250/gulu" 42 "github.com/88250/lute" 43 "github.com/88250/lute/ast" 44 "github.com/88250/lute/html" 45 "github.com/88250/lute/parse" 46 "github.com/88250/lute/render" 47 "github.com/emirpasic/gods/sets/hashset" 48 "github.com/siyuan-note/dataparser" 49 "github.com/siyuan-note/dejavu" 50 "github.com/siyuan-note/dejavu/cloud" 51 "github.com/siyuan-note/dejavu/entity" 52 "github.com/siyuan-note/encryption" 53 "github.com/siyuan-note/eventbus" 54 "github.com/siyuan-note/httpclient" 55 "github.com/siyuan-note/logging" 56 "github.com/siyuan-note/siyuan/kernel/conf" 57 "github.com/siyuan-note/siyuan/kernel/task" 58 "github.com/siyuan-note/siyuan/kernel/treenode" 59 "github.com/siyuan-note/siyuan/kernel/util" 60 "github.com/studio-b12/gowebdav" 61) 62 63// AutoPurgeRepoJob 自动清理数据仓库 https://github.com/siyuan-note/siyuan/issues/13091 64func AutoPurgeRepoJob() { 65 
task.AppendTaskWithTimeout(task.RepoAutoPurge, 12*time.Hour, autoPurgeRepo, true) 66} 67 68var ( 69 autoPurgeRepoAfterFirstSync = false 70 lastAutoPurgeRepo = time.Time{} 71) 72 73func autoPurgeRepo(cron bool) { 74 if cron && !autoPurgeRepoAfterFirstSync { 75 return 76 } 77 if time.Since(lastAutoPurgeRepo) < 6*time.Hour { 78 return 79 } 80 81 autoPurgeRepoAfterFirstSync = true 82 defer func() { 83 lastAutoPurgeRepo = time.Now() 84 }() 85 86 if 1 > len(Conf.Repo.Key) { 87 return 88 } 89 90 repo, err := newRepository() 91 if err != nil { 92 return 93 } 94 95 now := time.Now() 96 97 dateGroupedIndexes := map[string][]*entity.Index{} // 按照日期分组 98 // 收集指定日期内需要保留的索引 99 var date string 100 page := 1 101 for { 102 indexes, pageCount, _, err := repo.GetIndexes(page, 512) 103 if nil != err { 104 logging.LogErrorf("get data repo index logs failed: %s", err) 105 return 106 } 107 if 1 > len(indexes) { 108 break 109 } 110 111 tooOld := false 112 for _, index := range indexes { 113 if now.UnixMilli()-index.Created <= int64(Conf.Repo.IndexRetentionDays)*24*60*60*1000 { 114 date = time.UnixMilli(index.Created).Format("2006-01-02") 115 if _, ok := dateGroupedIndexes[date]; !ok { 116 dateGroupedIndexes[date] = []*entity.Index{} 117 } 118 dateGroupedIndexes[date] = append(dateGroupedIndexes[date], index) 119 } else { 120 tooOld = true 121 break 122 } 123 } 124 if tooOld { 125 break 126 } 127 page++ 128 if page > pageCount { 129 break 130 } 131 } 132 133 todayDate := now.Format("2006-01-02") 134 // 筛选出每日需要保留的索引 135 var retentionIndexIDs []string 136 for date, indexes := range dateGroupedIndexes { 137 if len(indexes) <= Conf.Repo.RetentionIndexesDaily || todayDate == date { 138 for _, index := range indexes { 139 retentionIndexIDs = append(retentionIndexIDs, index.ID) 140 } 141 continue 142 } 143 144 keepIndexes := hashset.New() 145 keepIndexes.Add(indexes[0]) // 每天最后一个固定保留 146 // 随机保留指定数量的索引 147 for i := 0; i < Conf.Repo.RetentionIndexesDaily*7; i++ { 148 
keepIndexes.Add(indexes[mathRand.Intn(len(indexes)-1)]) 149 if keepIndexes.Size() >= Conf.Repo.RetentionIndexesDaily { 150 break 151 } 152 } 153 154 for _, keepIndex := range keepIndexes.Values() { 155 retentionIndexIDs = append(retentionIndexIDs, keepIndex.(*entity.Index).ID) 156 } 157 } 158 159 retentionIndexIDs = gulu.Str.RemoveDuplicatedElem(retentionIndexIDs) 160 if 3 > len(retentionIndexIDs) { 161 logging.LogInfof("no index to purge [ellapsed=%.2fs]", time.Since(now).Seconds()) 162 return 163 } 164 165 _, err = repo.Purge(retentionIndexIDs...) 166} 167 168func GetRepoFile(fileID string) (ret []byte, p string, err error) { 169 if 1 > len(Conf.Repo.Key) { 170 err = errors.New(Conf.Language(26)) 171 return 172 } 173 174 repo, err := newRepository() 175 if err != nil { 176 return 177 } 178 179 file, err := repo.GetFile(fileID) 180 if err != nil { 181 return 182 } 183 184 ret, err = repo.OpenFile(file) 185 p = file.Path 186 return 187} 188 189func OpenRepoSnapshotDoc(fileID string) (title, content string, displayInText bool, updated int64, err error) { 190 if 1 > len(Conf.Repo.Key) { 191 err = errors.New(Conf.Language(26)) 192 return 193 } 194 195 repo, err := newRepository() 196 if err != nil { 197 return 198 } 199 200 file, err := repo.GetFile(fileID) 201 if err != nil { 202 return 203 } 204 205 data, err := repo.OpenFile(file) 206 if err != nil { 207 return 208 } 209 210 updated = file.Updated 211 212 if strings.HasSuffix(file.Path, ".sy") { 213 luteEngine := NewLute() 214 var snapshotTree *parse.Tree 215 displayInText, snapshotTree, err = parseTreeInSnapshot(data, luteEngine) 216 if err != nil { 217 logging.LogErrorf("parse tree from snapshot file [%s] failed", fileID) 218 return 219 } 220 title = snapshotTree.Root.IALAttr("title") 221 222 if !displayInText { 223 renderTree := &parse.Tree{Root: &ast.Node{Type: ast.NodeDocument}} 224 var unlinks []*ast.Node 225 ast.Walk(snapshotTree.Root, func(n *ast.Node, entering bool) ast.WalkStatus { 226 if !entering { 227 
return ast.WalkContinue 228 } 229 230 n.RemoveIALAttr("heading-fold") 231 n.RemoveIALAttr("fold") 232 return ast.WalkContinue 233 }) 234 235 for _, unlink := range unlinks { 236 unlink.Unlink() 237 } 238 239 var appends []*ast.Node 240 for n := snapshotTree.Root.FirstChild; nil != n; n = n.Next { 241 appends = append(appends, n) 242 } 243 for _, n := range appends { 244 renderTree.Root.AppendChild(n) 245 } 246 247 snapshotTree = renderTree 248 } 249 250 luteEngine.RenderOptions.ProtyleContenteditable = false 251 if displayInText { 252 util.PushMsg(Conf.Language(36), 5000) 253 formatRenderer := render.NewFormatRenderer(snapshotTree, luteEngine.RenderOptions) 254 content = gulu.Str.FromBytes(formatRenderer.Render()) 255 } else { 256 content = luteEngine.Tree2BlockDOM(snapshotTree, luteEngine.RenderOptions) 257 } 258 } else { 259 displayInText = true 260 title = file.Path 261 if mimeType := mime.TypeByExtension(filepath.Ext(file.Path)); strings.HasPrefix(mimeType, "text/") || strings.Contains(mimeType, "json") { 262 // 如果是文本文件,直接返回文本内容 263 // All plain text formats are supported when comparing data snapshots https://github.com/siyuan-note/siyuan/issues/12975 264 content = gulu.Str.FromBytes(data) 265 } else { 266 if strings.Contains(file.Path, "assets/") { // 剔除笔记本级或者文档级资源文件路径前缀 267 file.Path = file.Path[strings.Index(file.Path, "assets/"):] 268 if util.IsDisplayableAsset(file.Path) { 269 dir, f := filepath.Split(file.Path) 270 tempRepoDiffDir := filepath.Join(util.TempDir, "repo", "diff", dir) 271 if mkErr := os.MkdirAll(tempRepoDiffDir, 0755); nil != mkErr { 272 logging.LogErrorf("mkdir [%s] failed: %v", tempRepoDiffDir, mkErr) 273 } else { 274 if wrErr := os.WriteFile(filepath.Join(tempRepoDiffDir, f), data, 0644); nil != wrErr { 275 logging.LogErrorf("write file [%s] failed: %v", filepath.Join(tempRepoDiffDir, file.Path), wrErr) 276 } 277 } 278 content = path.Join("repo", "diff", file.Path) 279 } 280 } else { 281 content = file.Path 282 } 283 } 284 } 285 return 
286} 287 288type LeftRightDiff struct { 289 LeftIndex *DiffIndex `json:"leftIndex"` 290 RightIndex *DiffIndex `json:"rightIndex"` 291 AddsLeft []*DiffFile `json:"addsLeft"` 292 UpdatesLeft []*DiffFile `json:"updatesLeft"` 293 UpdatesRight []*DiffFile `json:"updatesRight"` 294 RemovesRight []*DiffFile `json:"removesRight"` 295} 296 297type DiffFile struct { 298 FileID string `json:"fileID"` 299 Title string `json:"title"` 300 Path string `json:"path"` 301 HSize string `json:"hSize"` 302 Updated int64 `json:"updated"` 303} 304 305type DiffIndex struct { 306 ID string `json:"id"` 307 Created int64 `json:"created"` 308} 309 310func DiffRepoSnapshots(left, right string) (ret *LeftRightDiff, err error) { 311 if 1 > len(Conf.Repo.Key) { 312 err = errors.New(Conf.Language(26)) 313 return 314 } 315 316 repo, err := newRepository() 317 if err != nil { 318 return 319 } 320 321 diff, err := repo.DiffIndex(left, right) 322 if err != nil { 323 return 324 } 325 326 ret = &LeftRightDiff{ 327 LeftIndex: &DiffIndex{ 328 ID: diff.LeftIndex.ID, 329 Created: diff.LeftIndex.Created, 330 }, 331 RightIndex: &DiffIndex{ 332 ID: diff.RightIndex.ID, 333 Created: diff.RightIndex.Created, 334 }, 335 } 336 luteEngine := NewLute() 337 for _, removeRight := range diff.RemovesRight { 338 title, parseErr := parseTitleInSnapshot(removeRight.ID, repo, luteEngine) 339 if "" == title || nil != parseErr { 340 continue 341 } 342 343 ret.AddsLeft = append(ret.AddsLeft, &DiffFile{ 344 FileID: removeRight.ID, 345 Title: title, 346 Path: removeRight.Path, 347 HSize: humanize.BytesCustomCeil(uint64(removeRight.Size), 2), 348 Updated: removeRight.Updated, 349 }) 350 } 351 if 1 > len(ret.AddsLeft) { 352 ret.AddsLeft = []*DiffFile{} 353 } 354 355 for _, addLeft := range diff.AddsLeft { 356 title, parseErr := parseTitleInSnapshot(addLeft.ID, repo, luteEngine) 357 if "" == title || nil != parseErr { 358 continue 359 } 360 361 ret.RemovesRight = append(ret.RemovesRight, &DiffFile{ 362 FileID: addLeft.ID, 363 Title: 
title, 364 Path: addLeft.Path, 365 HSize: humanize.BytesCustomCeil(uint64(addLeft.Size), 2), 366 Updated: addLeft.Updated, 367 }) 368 } 369 if 1 > len(ret.RemovesRight) { 370 ret.RemovesRight = []*DiffFile{} 371 } 372 373 for _, updateLeft := range diff.UpdatesLeft { 374 title, parseErr := parseTitleInSnapshot(updateLeft.ID, repo, luteEngine) 375 if "" == title || nil != parseErr { 376 continue 377 } 378 379 ret.UpdatesLeft = append(ret.UpdatesLeft, &DiffFile{ 380 FileID: updateLeft.ID, 381 Title: title, 382 Path: updateLeft.Path, 383 HSize: humanize.BytesCustomCeil(uint64(updateLeft.Size), 2), 384 Updated: updateLeft.Updated, 385 }) 386 } 387 if 1 > len(ret.UpdatesLeft) { 388 ret.UpdatesLeft = []*DiffFile{} 389 } 390 391 for _, updateRight := range diff.UpdatesRight { 392 title, parseErr := parseTitleInSnapshot(updateRight.ID, repo, luteEngine) 393 if "" == title || nil != parseErr { 394 continue 395 } 396 397 ret.UpdatesRight = append(ret.UpdatesRight, &DiffFile{ 398 FileID: updateRight.ID, 399 Title: title, 400 Path: updateRight.Path, 401 HSize: humanize.BytesCustomCeil(uint64(updateRight.Size), 2), 402 Updated: updateRight.Updated, 403 }) 404 } 405 if 1 > len(ret.UpdatesRight) { 406 ret.UpdatesRight = []*DiffFile{} 407 } 408 return 409} 410 411func parseTitleInSnapshot(fileID string, repo *dejavu.Repo, luteEngine *lute.Lute) (title string, err error) { 412 file, err := repo.GetFile(fileID) 413 if err != nil { 414 logging.LogErrorf("get file [%s] failed: %s", fileID, err) 415 return 416 } 417 418 title = path.Base(file.Path) 419 if strings.HasSuffix(file.Path, ".sy") { 420 var data []byte 421 data, err = repo.OpenFile(file) 422 if err != nil { 423 logging.LogErrorf("open file [%s] failed: %s", fileID, err) 424 return 425 } 426 427 var tree *parse.Tree 428 tree, err = dataparser.ParseJSONWithoutFix(data, luteEngine.ParseOptions) 429 if err != nil { 430 logging.LogErrorf("parse file [%s] failed: %s", fileID, err) 431 return 432 } 433 434 title = 
tree.Root.IALAttr("title") 435 } 436 return 437} 438 439func parseTreeInSnapshot(data []byte, luteEngine *lute.Lute) (isLargeDoc bool, tree *parse.Tree, err error) { 440 isLargeDoc = 1024*1024*1 <= len(data) 441 tree, err = dataparser.ParseJSONWithoutFix(data, luteEngine.ParseOptions) 442 if err != nil { 443 return 444 } 445 return 446} 447 448type Snapshot struct { 449 *dejavu.Log 450 TypesCount []*TypeCount `json:"typesCount"` 451} 452 453type TypeCount struct { 454 Type string `json:"type"` 455 Count int `json:"count"` 456} 457 458func GetRepoSnapshots(page int) (ret []*Snapshot, pageCount, totalCount int, err error) { 459 ret = []*Snapshot{} 460 if 1 > len(Conf.Repo.Key) { 461 err = errors.New(Conf.Language(26)) 462 return 463 } 464 465 repo, err := newRepository() 466 if err != nil { 467 return 468 } 469 470 logs, pageCount, totalCount, err := repo.GetIndexLogs(page, 32) 471 if err != nil { 472 if dejavu.ErrNotFoundIndex == err { 473 logs = []*dejavu.Log{} 474 err = nil 475 return 476 } 477 478 logging.LogErrorf("get data repo index logs failed: %s", err) 479 return 480 } 481 482 ret = buildSnapshots(logs) 483 if 1 > len(ret) { 484 ret = []*Snapshot{} 485 } 486 return 487} 488 489func buildSnapshots(logs []*dejavu.Log) (ret []*Snapshot) { 490 for _, l := range logs { 491 typesCount := statTypesByPath(l.Files) 492 l.Files = nil // 置空,否则返回前端数据量太大 493 ret = append(ret, &Snapshot{ 494 Log: l, 495 TypesCount: typesCount, 496 }) 497 } 498 return 499} 500 501func statTypesByPath(files []*entity.File) (ret []*TypeCount) { 502 for _, f := range files { 503 ext := util.Ext(f.Path) 504 if "" == ext { 505 ext = "NoExt" 506 } 507 508 found := false 509 for _, tc := range ret { 510 511 if tc.Type == ext { 512 tc.Count++ 513 found = true 514 break 515 } 516 } 517 if !found { 518 ret = append(ret, &TypeCount{Type: ext, Count: 1}) 519 } 520 } 521 522 sort.Slice(ret, func(i, j int) bool { return ret[i].Count > ret[j].Count }) 523 if 10 < len(ret) { 524 otherCount := 0 525 for 
_, tc := range ret[10:] { 526 tc.Count += otherCount 527 } 528 other := &TypeCount{ 529 Type: "Other", 530 Count: otherCount, 531 } 532 ret = append(ret[:10], other) 533 } 534 return 535} 536 537func ImportRepoKey(base64Key string) (retKey string, err error) { 538 util.PushMsg(Conf.Language(136), 3000) 539 540 retKey = strings.TrimSpace(base64Key) 541 retKey = gulu.Str.RemoveInvisible(retKey) 542 if 1 > len(retKey) { 543 err = errors.New(Conf.Language(142)) 544 return 545 } 546 547 key, err := base64.StdEncoding.DecodeString(retKey) 548 if err != nil { 549 logging.LogErrorf("import data repo key failed: %s", err) 550 return "", errors.New(Conf.Language(157)) 551 } 552 if 32 != len(key) { 553 return "", errors.New(Conf.Language(157)) 554 } 555 556 Conf.Repo.Key = key 557 Conf.Save() 558 logging.LogInfof("imported repo key [%x]", sha1.Sum(Conf.Repo.Key)) 559 560 if err = os.RemoveAll(Conf.Repo.GetSaveDir()); err != nil { 561 return 562 } 563 if err = os.MkdirAll(Conf.Repo.GetSaveDir(), 0755); err != nil { 564 return 565 } 566 567 initDataRepo() 568 return 569} 570 571func ResetRepo() (err error) { 572 logging.LogInfof("resetting data repo...") 573 msgId := util.PushMsg(Conf.Language(144), 1000*60) 574 575 repo, err := newRepository() 576 if err != nil { 577 return 578 } 579 580 if err = repo.Reset(); err != nil { 581 logging.LogErrorf("reset data repo failed: %s", err) 582 return 583 } 584 logging.LogInfof("reset data repo completed") 585 586 Conf.Repo.Key = nil 587 Conf.Sync.Enabled = false 588 Conf.Save() 589 590 util.PushUpdateMsg(msgId, Conf.Language(145), 3000) 591 task.AppendAsyncTaskWithDelay(task.ReloadUI, 2*time.Second, util.ReloadUI) 592 return 593} 594 595func PurgeCloud() (err error) { 596 msg := Conf.Language(223) 597 util.PushEndlessProgress(msg) 598 defer util.PushClearProgress() 599 600 repo, err := newRepository() 601 if err != nil { 602 return 603 } 604 605 stat, err := repo.PurgeCloud() 606 if err != nil { 607 return 608 } 609 610 deletedIndexes := 
stat.Indexes 611 deletedObjects := stat.Objects 612 deletedSize := humanize.BytesCustomCeil(uint64(stat.Size), 2) 613 msg = fmt.Sprintf(Conf.Language(232), deletedIndexes, deletedObjects, deletedSize) 614 util.PushMsg(msg, 7000) 615 return 616} 617 618func PurgeRepo() (err error) { 619 msg := Conf.Language(202) 620 util.PushEndlessProgress(msg) 621 defer util.PushClearProgress() 622 623 repo, err := newRepository() 624 if err != nil { 625 return 626 } 627 628 stat, err := repo.Purge() 629 if err != nil { 630 return 631 } 632 633 deletedIndexes := stat.Indexes 634 deletedObjects := stat.Objects 635 deletedSize := humanize.BytesCustomCeil(uint64(stat.Size), 2) 636 msg = fmt.Sprintf(Conf.Language(203), deletedIndexes, deletedObjects, deletedSize) 637 util.PushMsg(msg, 7000) 638 return 639} 640 641func InitRepoKeyFromPassphrase(passphrase string) (err error) { 642 passphrase = gulu.Str.RemoveInvisible(passphrase) 643 passphrase = strings.TrimSpace(passphrase) 644 if "" == passphrase { 645 return errors.New(Conf.Language(142)) 646 } 647 648 util.PushMsg(Conf.Language(136), 3000) 649 if err = os.RemoveAll(Conf.Repo.GetSaveDir()); err != nil { 650 return 651 } 652 if err = os.MkdirAll(Conf.Repo.GetSaveDir(), 0755); err != nil { 653 return 654 } 655 656 var key []byte 657 base64Data, base64Err := base64.StdEncoding.DecodeString(passphrase) 658 if nil == base64Err && 32 == len(base64Data) { 659 // 改进数据仓库 `通过密码生成密钥` https://github.com/siyuan-note/siyuan/issues/6782 660 logging.LogInfof("passphrase is base64 encoded, use it as key directly") 661 key = base64Data 662 } else { 663 salt := fmt.Sprintf("%x", sha256.Sum256([]byte(passphrase)))[:16] 664 key, err = encryption.KDF(passphrase, salt) 665 if err != nil { 666 logging.LogErrorf("init data repo key failed: %s", err) 667 return 668 } 669 } 670 671 Conf.Repo.Key = key 672 Conf.Save() 673 logging.LogInfof("inited repo key [%x]", sha1.Sum(Conf.Repo.Key)) 674 675 initDataRepo() 676 return 677} 678 679func InitRepoKey() (err 
error) { 680 util.PushMsg(Conf.Language(136), 3000) 681 682 if err = os.RemoveAll(Conf.Repo.GetSaveDir()); err != nil { 683 return 684 } 685 if err = os.MkdirAll(Conf.Repo.GetSaveDir(), 0755); err != nil { 686 return 687 } 688 689 randomBytes := make([]byte, 16) 690 _, err = rand.Read(randomBytes) 691 if err != nil { 692 return 693 } 694 password := string(randomBytes) 695 randomBytes = make([]byte, 16) 696 _, err = rand.Read(randomBytes) 697 if err != nil { 698 logging.LogErrorf("init data repo key failed: %s", err) 699 return 700 } 701 salt := string(randomBytes) 702 703 key, err := encryption.KDF(password, salt) 704 if err != nil { 705 logging.LogErrorf("init data repo key failed: %s", err) 706 return 707 } 708 Conf.Repo.Key = key 709 Conf.Save() 710 logging.LogInfof("inited repo key [%x]", sha1.Sum(Conf.Repo.Key)) 711 712 initDataRepo() 713 return 714} 715 716func initDataRepo() { 717 time.Sleep(1 * time.Second) 718 util.PushMsg(Conf.Language(138), 3000) 719 time.Sleep(1 * time.Second) 720 if initErr := IndexRepo("[Init] Init local data repo"); nil != initErr { 721 util.PushErrMsg(fmt.Sprintf(Conf.Language(140), initErr), 0) 722 } 723} 724 725func CheckoutRepo(id string) { 726 task.AppendTask(task.RepoCheckout, checkoutRepo, id) 727} 728 729func checkoutRepo(id string) { 730 var err error 731 if 1 > len(Conf.Repo.Key) { 732 util.PushErrMsg(Conf.Language(26), 7000) 733 return 734 } 735 736 repo, err := newRepository() 737 if err != nil { 738 logging.LogErrorf("new repository failed: %s", err) 739 util.PushErrMsg(Conf.Language(141), 7000) 740 return 741 } 742 743 util.PushEndlessProgress(Conf.Language(63)) 744 FlushTxQueue() 745 CloseWatchAssets() 746 defer WatchAssets() 747 CloseWatchEmojis() 748 defer WatchEmojis() 749 750 // 恢复快照时自动暂停同步,避免刚刚恢复后的数据又被同步覆盖 751 syncEnabled := Conf.Sync.Enabled 752 Conf.Sync.Enabled = false 753 Conf.Save() 754 755 // 回滚快照时默认为当前数据创建一个快照 756 // When rolling back a snapshot, a snapshot is created for the current data by default 
https://github.com/siyuan-note/siyuan/issues/12470 757 FlushTxQueue() 758 _, err = repo.Index("Backup before checkout", false, map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress}) 759 if err != nil { 760 logging.LogErrorf("index repository failed: %s", err) 761 util.PushClearProgress() 762 util.PushErrMsg(fmt.Sprintf(Conf.Language(140), err), 0) 763 return 764 } 765 766 _, _, err = repo.Checkout(id, map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress}) 767 if err != nil { 768 logging.LogErrorf("checkout repository failed: %s", err) 769 util.PushClearProgress() 770 util.PushErrMsg(Conf.Language(141), 7000) 771 return 772 } 773 774 FullReindex() 775 776 if syncEnabled { 777 task.AppendAsyncTaskWithDelay(task.PushMsg, 7*time.Second, util.PushMsg, Conf.Language(134), 0) 778 } 779 return 780} 781 782func DownloadCloudSnapshot(tag, id string) (err error) { 783 if 1 > len(Conf.Repo.Key) { 784 err = errors.New(Conf.Language(26)) 785 return 786 } 787 788 repo, err := newRepository() 789 if err != nil { 790 return 791 } 792 793 switch Conf.Sync.Provider { 794 case conf.ProviderSiYuan: 795 if !IsSubscriber() { 796 util.PushErrMsg(Conf.Language(29), 5000) 797 return 798 } 799 case conf.ProviderWebDAV, conf.ProviderS3, conf.ProviderLocal: 800 if !IsPaidUser() { 801 util.PushErrMsg(Conf.Language(214), 5000) 802 return 803 } 804 } 805 806 defer util.PushClearProgress() 807 808 var downloadFileCount, downloadChunkCount int 809 var downloadBytes int64 810 if "" == tag { 811 downloadFileCount, downloadChunkCount, downloadBytes, err = repo.DownloadIndex(id, map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress}) 812 } else { 813 downloadFileCount, downloadChunkCount, downloadBytes, err = repo.DownloadTagIndex(tag, id, map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress}) 814 } 815 if err != nil { 816 return 817 } 818 msg := 
fmt.Sprintf(Conf.Language(153), downloadFileCount, downloadChunkCount, humanize.BytesCustomCeil(uint64(downloadBytes), 2)) 819 util.PushMsg(msg, 5000) 820 util.PushStatusBar(msg) 821 return 822} 823 824func UploadCloudSnapshot(tag, id string) (err error) { 825 if 1 > len(Conf.Repo.Key) { 826 err = errors.New(Conf.Language(26)) 827 return 828 } 829 830 repo, err := newRepository() 831 if err != nil { 832 return 833 } 834 835 switch Conf.Sync.Provider { 836 case conf.ProviderSiYuan: 837 if !IsSubscriber() { 838 util.PushErrMsg(Conf.Language(29), 5000) 839 return 840 } 841 case conf.ProviderWebDAV, conf.ProviderS3, conf.ProviderLocal: 842 if !IsPaidUser() { 843 util.PushErrMsg(Conf.Language(214), 5000) 844 return 845 } 846 } 847 848 util.PushEndlessProgress(Conf.Language(116)) 849 defer util.PushClearProgress() 850 uploadFileCount, uploadChunkCount, uploadBytes, err := repo.UploadTagIndex(tag, id, map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress}) 851 if err != nil { 852 if errors.Is(err, dejavu.ErrCloudBackupCountExceeded) { 853 err = fmt.Errorf(Conf.Language(84), Conf.Language(154)) 854 return 855 } 856 err = errors.New(fmt.Sprintf(Conf.Language(84), formatRepoErrorMsg(err))) 857 return 858 } 859 msg := fmt.Sprintf(Conf.Language(152), uploadFileCount, uploadChunkCount, humanize.BytesCustomCeil(uint64(uploadBytes), 2)) 860 util.PushMsg(msg, 5000) 861 util.PushStatusBar(msg) 862 return 863} 864 865func RemoveCloudRepoTag(tag string) (err error) { 866 if 1 > len(Conf.Repo.Key) { 867 err = errors.New(Conf.Language(26)) 868 return 869 } 870 871 if "" == tag { 872 err = errors.New("tag is empty") 873 return 874 } 875 876 repo, err := newRepository() 877 if err != nil { 878 return 879 } 880 881 switch Conf.Sync.Provider { 882 case conf.ProviderSiYuan: 883 if !IsSubscriber() { 884 util.PushErrMsg(Conf.Language(29), 5000) 885 return 886 } 887 case conf.ProviderWebDAV, conf.ProviderS3, conf.ProviderLocal: 888 if !IsPaidUser() { 889 
util.PushErrMsg(Conf.Language(214), 5000) 890 return 891 } 892 } 893 894 err = repo.RemoveCloudRepoTag(tag) 895 if err != nil { 896 return 897 } 898 return 899} 900 901func GetCloudRepoTagSnapshots() (ret []*dejavu.Log, err error) { 902 ret = []*dejavu.Log{} 903 if 1 > len(Conf.Repo.Key) { 904 err = errors.New(Conf.Language(26)) 905 return 906 } 907 908 repo, err := newRepository() 909 if err != nil { 910 return 911 } 912 913 switch Conf.Sync.Provider { 914 case conf.ProviderSiYuan: 915 if !IsSubscriber() { 916 util.PushErrMsg(Conf.Language(29), 5000) 917 return 918 } 919 case conf.ProviderWebDAV, conf.ProviderS3, conf.ProviderLocal: 920 if !IsPaidUser() { 921 util.PushErrMsg(Conf.Language(214), 5000) 922 return 923 } 924 } 925 926 logs, err := repo.GetCloudRepoTagLogs(map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}) 927 if err != nil { 928 return 929 } 930 ret = logs 931 if 1 > len(ret) { 932 ret = []*dejavu.Log{} 933 } 934 return 935} 936 937func GetCloudRepoSnapshots(page int) (ret []*dejavu.Log, pageCount, totalCount int, err error) { 938 ret = []*dejavu.Log{} 939 if 1 > len(Conf.Repo.Key) { 940 err = errors.New(Conf.Language(26)) 941 return 942 } 943 944 repo, err := newRepository() 945 if err != nil { 946 return 947 } 948 949 switch Conf.Sync.Provider { 950 case conf.ProviderSiYuan: 951 if !IsSubscriber() { 952 util.PushErrMsg(Conf.Language(29), 5000) 953 return 954 } 955 case conf.ProviderWebDAV, conf.ProviderS3, conf.ProviderLocal: 956 if !IsPaidUser() { 957 util.PushErrMsg(Conf.Language(214), 5000) 958 return 959 } 960 } 961 962 if 1 > page { 963 page = 1 964 } 965 966 logs, pageCount, totalCount, err := repo.GetCloudRepoLogs(page) 967 if err != nil { 968 return 969 } 970 ret = logs 971 if 1 > len(ret) { 972 ret = []*dejavu.Log{} 973 } 974 return 975} 976 977func GetTagSnapshots() (ret []*Snapshot, err error) { 978 ret = []*Snapshot{} 979 if 1 > len(Conf.Repo.Key) { 980 err = errors.New(Conf.Language(26)) 981 return 982 } 983 
984 repo, err := newRepository() 985 if err != nil { 986 return 987 } 988 989 logs, err := repo.GetTagLogs() 990 if err != nil { 991 return 992 } 993 ret = buildSnapshots(logs) 994 if 1 > len(ret) { 995 ret = []*Snapshot{} 996 } 997 return 998} 999 1000func RemoveTagSnapshot(tag string) (err error) { 1001 if 1 > len(Conf.Repo.Key) { 1002 err = errors.New(Conf.Language(26)) 1003 return 1004 } 1005 1006 repo, err := newRepository() 1007 if err != nil { 1008 return 1009 } 1010 1011 err = repo.RemoveTag(tag) 1012 return 1013} 1014 1015func TagSnapshot(id, name string) (err error) { 1016 if 1 > len(Conf.Repo.Key) { 1017 err = errors.New(Conf.Language(26)) 1018 return 1019 } 1020 1021 name = strings.TrimSpace(name) 1022 name = util.RemoveInvalid(name) 1023 if "" == name { 1024 err = errors.New(Conf.Language(142)) 1025 return 1026 } 1027 1028 if !gulu.File.IsValidFilename(name) { 1029 err = errors.New(Conf.Language(151)) 1030 return 1031 } 1032 1033 repo, err := newRepository() 1034 if err != nil { 1035 return 1036 } 1037 1038 index, err := repo.GetIndex(id) 1039 if err != nil { 1040 return 1041 } 1042 1043 if err = repo.AddTag(index.ID, name); err != nil { 1044 msg := fmt.Sprintf("Add tag to data snapshot [%s] failed: %s", index.ID, err) 1045 util.PushStatusBar(msg) 1046 return 1047 } 1048 return 1049} 1050 1051func IndexRepo(memo string) (err error) { 1052 if 1 > len(Conf.Repo.Key) { 1053 err = errors.New(Conf.Language(26)) 1054 return 1055 } 1056 1057 memo = strings.TrimSpace(memo) 1058 memo = gulu.Str.RemoveInvisible(memo) 1059 if "" == memo { 1060 err = errors.New(Conf.Language(142)) 1061 return 1062 } 1063 1064 repo, err := newRepository() 1065 if err != nil { 1066 return 1067 } 1068 1069 util.PushEndlessProgress(Conf.Language(143)) 1070 1071 start := time.Now() 1072 latest, _ := repo.Latest() 1073 FlushTxQueue() 1074 index, err := repo.Index(memo, true, map[string]interface{}{ 1075 eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress, 1076 }) 1077 if err 
!= nil { 1078 util.PushStatusBar("Index data repo failed: " + html.EscapeString(err.Error())) 1079 return 1080 } 1081 elapsed := time.Since(start) 1082 1083 if nil == latest || latest.ID != index.ID { 1084 msg := fmt.Sprintf(Conf.Language(147), elapsed.Seconds()) 1085 util.PushStatusBar(msg) 1086 util.PushMsg(msg, 5000) 1087 } else { 1088 msg := fmt.Sprintf(Conf.Language(148), elapsed.Seconds()) 1089 util.PushStatusBar(msg) 1090 util.PushMsg(msg, 5000) 1091 } 1092 util.PushClearProgress() 1093 return 1094} 1095 1096var syncingFiles = sync.Map{} 1097var syncingStorages = atomic.Bool{} 1098 1099func waitForSyncingStorages() { 1100 for isSyncingStorages() { 1101 time.Sleep(time.Second) 1102 } 1103} 1104 1105func isSyncingStorages() bool { 1106 return syncingStorages.Load() || isBootSyncing.Load() 1107} 1108 1109func IsSyncingFile(rootID string) (ret bool) { 1110 _, ret = syncingFiles.Load(rootID) 1111 return 1112} 1113 1114func syncRepoDownload() (err error) { 1115 if 1 > len(Conf.Repo.Key) { 1116 planSyncAfter(fixSyncInterval) 1117 1118 msg := Conf.Language(26) 1119 util.PushStatusBar(msg) 1120 util.PushErrMsg(msg, 0) 1121 err = errors.New(msg) 1122 return 1123 } 1124 1125 repo, err := newRepository() 1126 if err != nil { 1127 planSyncAfter(fixSyncInterval) 1128 1129 msg := fmt.Sprintf("sync repo failed: %s", err) 1130 logging.LogErrorf(msg) 1131 util.PushStatusBar(msg) 1132 util.PushErrMsg(msg, 0) 1133 return 1134 } 1135 1136 logging.LogInfof("downloading data repo [device=%s, kernel=%s, provider=%d, mode=%s/%t]", Conf.System.ID, KernelID, Conf.Sync.Provider, "d", true) 1137 start := time.Now() 1138 _, _, err = indexRepoBeforeCloudSync(repo) 1139 if err != nil { 1140 planSyncAfter(fixSyncInterval) 1141 1142 logging.LogErrorf("sync data repo download failed: %s", err) 1143 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err)) 1144 Conf.Sync.Stat = msg 1145 Conf.Save() 1146 util.PushStatusBar(msg) 1147 util.PushErrMsg(msg, 0) 1148 return 1149 } 1150 1151 
beforeSyncPetals := getPetals() 1152 1153 syncContext := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar} 1154 mergeResult, trafficStat, err := repo.SyncDownload(syncContext) 1155 elapsed := time.Since(start) 1156 if err != nil { 1157 planSyncAfter(fixSyncInterval) 1158 1159 logging.LogErrorf("sync data repo download failed: %s", err) 1160 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err)) 1161 if errors.Is(err, dejavu.ErrCloudStorageSizeExceeded) { 1162 u := Conf.GetUser() 1163 msg = fmt.Sprintf(Conf.Language(43), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2)) 1164 if 2 == u.UserSiYuanSubscriptionPlan { 1165 msg = fmt.Sprintf(Conf.Language(68), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2)) 1166 } 1167 } 1168 Conf.Sync.Stat = msg 1169 Conf.Save() 1170 util.PushStatusBar(msg) 1171 util.PushErrMsg(msg, 0) 1172 return 1173 } 1174 1175 util.PushStatusBar(fmt.Sprintf(Conf.Language(149), elapsed.Seconds())) 1176 Conf.Sync.Synced = util.CurrentTimeMillis() 1177 msg := fmt.Sprintf(Conf.Language(150), trafficStat.UploadFileCount, trafficStat.DownloadFileCount, trafficStat.UploadChunkCount, trafficStat.DownloadChunkCount, humanize.BytesCustomCeil(uint64(trafficStat.UploadBytes), 2), humanize.BytesCustomFloor(uint64(trafficStat.DownloadBytes), 2)) 1178 Conf.Sync.Stat = msg 1179 Conf.Save() 1180 autoSyncErrCount = 0 1181 BootSyncSucc = 0 1182 1183 calcPetalDiff(beforeSyncPetals, mergeResult) 1184 processSyncMergeResult(false, true, mergeResult, trafficStat, "d", elapsed) 1185 return 1186} 1187 1188func syncRepoUpload() (err error) { 1189 if 1 > len(Conf.Repo.Key) { 1190 planSyncAfter(fixSyncInterval) 1191 1192 msg := Conf.Language(26) 1193 util.PushStatusBar(msg) 1194 util.PushErrMsg(msg, 0) 1195 err = errors.New(msg) 1196 return 1197 } 1198 1199 repo, err := newRepository() 1200 if err != nil { 1201 planSyncAfter(fixSyncInterval) 1202 1203 msg := fmt.Sprintf("sync repo failed: %s", err) 1204 
logging.LogErrorf(msg) 1205 util.PushStatusBar(msg) 1206 util.PushErrMsg(msg, 0) 1207 return 1208 } 1209 1210 logging.LogInfof("uploading data repo [device=%s, kernel=%s, provider=%d, mode=%s/%t]", Conf.System.ID, KernelID, Conf.Sync.Provider, "u", true) 1211 start := time.Now() 1212 _, _, err = indexRepoBeforeCloudSync(repo) 1213 if err != nil { 1214 planSyncAfter(fixSyncInterval) 1215 1216 logging.LogErrorf("sync data repo upload failed: %s", err) 1217 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err)) 1218 Conf.Sync.Stat = msg 1219 Conf.Save() 1220 util.PushStatusBar(msg) 1221 util.PushErrMsg(msg, 0) 1222 return 1223 } 1224 1225 syncContext := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar} 1226 trafficStat, err := repo.SyncUpload(syncContext) 1227 elapsed := time.Since(start) 1228 if err != nil { 1229 planSyncAfter(fixSyncInterval) 1230 1231 logging.LogErrorf("sync data repo upload failed: %s", err) 1232 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err)) 1233 if errors.Is(err, dejavu.ErrCloudStorageSizeExceeded) { 1234 u := Conf.GetUser() 1235 msg = fmt.Sprintf(Conf.Language(43), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2)) 1236 if 2 == u.UserSiYuanSubscriptionPlan { 1237 msg = fmt.Sprintf(Conf.Language(68), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2)) 1238 } 1239 } 1240 Conf.Sync.Stat = msg 1241 Conf.Save() 1242 util.PushStatusBar(msg) 1243 util.PushErrMsg(msg, 0) 1244 return 1245 } 1246 1247 util.PushStatusBar(fmt.Sprintf(Conf.Language(149), elapsed.Seconds())) 1248 Conf.Sync.Synced = util.CurrentTimeMillis() 1249 msg := fmt.Sprintf(Conf.Language(150), trafficStat.UploadFileCount, trafficStat.DownloadFileCount, trafficStat.UploadChunkCount, trafficStat.DownloadChunkCount, humanize.BytesCustomCeil(uint64(trafficStat.UploadBytes), 2), humanize.BytesCustomCeil(uint64(trafficStat.DownloadBytes), 2)) 1250 Conf.Sync.Stat = msg 1251 Conf.Save() 1252 autoSyncErrCount = 0 1253 BootSyncSucc 
= 0 1254 1255 processSyncMergeResult(false, true, &dejavu.MergeResult{}, trafficStat, "u", elapsed) 1256 return 1257} 1258 1259var isBootSyncing = atomic.Bool{} 1260 1261func bootSyncRepo() (err error) { 1262 if 1 > len(Conf.Repo.Key) { 1263 autoSyncErrCount++ 1264 planSyncAfter(fixSyncInterval) 1265 1266 msg := Conf.Language(26) 1267 util.PushStatusBar(msg) 1268 util.PushErrMsg(msg, 0) 1269 err = errors.New(msg) 1270 return 1271 } 1272 1273 repo, err := newRepository() 1274 if err != nil { 1275 autoSyncErrCount++ 1276 planSyncAfter(fixSyncInterval) 1277 1278 msg := fmt.Sprintf("sync repo failed: %s", html.EscapeString(err.Error())) 1279 logging.LogErrorf(msg) 1280 util.PushStatusBar(msg) 1281 util.PushErrMsg(msg, 0) 1282 return 1283 } 1284 1285 isBootSyncing.Store(true) 1286 1287 waitGroup := sync.WaitGroup{} 1288 var errs []error 1289 waitGroup.Add(1) 1290 go func() { 1291 defer waitGroup.Done() 1292 1293 start := time.Now() 1294 _, _, indexErr := indexRepoBeforeCloudSync(repo) 1295 if indexErr != nil { 1296 errs = append(errs, indexErr) 1297 autoSyncErrCount++ 1298 planSyncAfter(fixSyncInterval) 1299 1300 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(indexErr)) 1301 Conf.Sync.Stat = msg 1302 Conf.Save() 1303 util.PushStatusBar(msg) 1304 util.PushErrMsg(msg, 0) 1305 BootSyncSucc = 1 1306 isBootSyncing.Store(false) 1307 return 1308 } 1309 1310 logging.LogInfof("boot index repo elapsed [%.2fs]", time.Since(start).Seconds()) 1311 }() 1312 1313 var fetchedFiles []*entity.File 1314 waitGroup.Add(1) 1315 go func() { 1316 defer waitGroup.Done() 1317 1318 start := time.Now() 1319 syncContext := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar} 1320 cloudLatest, getErr := repo.GetCloudLatest(syncContext) 1321 if nil != getErr { 1322 errs = append(errs, getErr) 1323 if !errors.Is(getErr, cloud.ErrCloudObjectNotFound) { 1324 logging.LogErrorf("download cloud latest failed: %s", getErr) 1325 return 1326 } 1327 } 1328 fetchedFiles, getErr 
= repo.GetSyncCloudFiles(cloudLatest, syncContext) 1329 if errors.Is(getErr, dejavu.ErrRepoFatal) { 1330 errs = append(errs, getErr) 1331 autoSyncErrCount++ 1332 planSyncAfter(fixSyncInterval) 1333 1334 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(getErr)) 1335 Conf.Sync.Stat = msg 1336 Conf.Save() 1337 util.PushStatusBar(msg) 1338 util.PushErrMsg(msg, 0) 1339 BootSyncSucc = 1 1340 isBootSyncing.Store(false) 1341 return 1342 } 1343 1344 logging.LogInfof("boot get sync cloud files elapsed [%.2fs]", time.Since(start).Seconds()) 1345 }() 1346 waitGroup.Wait() 1347 if 0 < len(errs) { 1348 err = errs[0] 1349 } 1350 1351 if err != nil { 1352 autoSyncErrCount++ 1353 planSyncAfter(fixSyncInterval) 1354 1355 logging.LogErrorf("sync data repo failed: %s", err) 1356 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err)) 1357 if errors.Is(err, dejavu.ErrCloudStorageSizeExceeded) { 1358 u := Conf.GetUser() 1359 msg = fmt.Sprintf(Conf.Language(43), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2)) 1360 if 2 == u.UserSiYuanSubscriptionPlan { 1361 msg = fmt.Sprintf(Conf.Language(68), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2)) 1362 } 1363 } 1364 Conf.Sync.Stat = msg 1365 Conf.Save() 1366 util.PushStatusBar(msg) 1367 util.PushErrMsg(msg, 0) 1368 BootSyncSucc = 1 1369 isBootSyncing.Store(false) 1370 return 1371 } 1372 1373 syncingFiles = sync.Map{} 1374 syncingStorages.Store(false) 1375 for _, fetchedFile := range fetchedFiles { 1376 name := path.Base(fetchedFile.Path) 1377 if strings.HasSuffix(name, ".sy") { 1378 id := name[:len(name)-3] 1379 syncingFiles.Store(id, true) 1380 continue 1381 } 1382 if strings.HasPrefix(fetchedFile.Path, "/storage/") { 1383 syncingStorages.Store(true) 1384 } 1385 } 1386 1387 if 0 < len(fetchedFiles) { 1388 go func() { 1389 _, syncErr := syncRepo(false, false) 1390 isBootSyncing.Store(false) 1391 if err != nil { 1392 logging.LogErrorf("boot background sync repo failed: %s", syncErr) 1393 return 1394 } 1395 
}() 1396 } else { 1397 isBootSyncing.Store(false) 1398 } 1399 return 1400} 1401 1402func syncRepo(exit, byHand bool) (dataChanged bool, err error) { 1403 if 1 > len(Conf.Repo.Key) { 1404 autoSyncErrCount++ 1405 planSyncAfter(fixSyncInterval) 1406 1407 msg := Conf.Language(26) 1408 util.PushStatusBar(msg) 1409 util.PushErrMsg(msg, 0) 1410 err = errors.New(msg) 1411 return 1412 } 1413 1414 repo, err := newRepository() 1415 if err != nil { 1416 autoSyncErrCount++ 1417 planSyncAfter(fixSyncInterval) 1418 1419 msg := fmt.Sprintf("sync repo failed: %s", err) 1420 logging.LogErrorf(msg) 1421 util.PushStatusBar(msg) 1422 util.PushErrMsg(msg, 0) 1423 return 1424 } 1425 1426 logging.LogInfof("syncing data repo [device=%s, kernel=%s, provider=%d, mode=%s/%t]", Conf.System.ID, KernelID, Conf.Sync.Provider, "a", byHand) 1427 start := time.Now() 1428 beforeIndex, afterIndex, err := indexRepoBeforeCloudSync(repo) 1429 if err != nil { 1430 autoSyncErrCount++ 1431 planSyncAfter(fixSyncInterval) 1432 1433 logging.LogErrorf("sync data repo failed: %s", err) 1434 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err)) 1435 Conf.Sync.Stat = msg 1436 Conf.Save() 1437 util.PushStatusBar(msg) 1438 if 1 > autoSyncErrCount || byHand { 1439 util.PushErrMsg(msg, 0) 1440 } 1441 if exit { 1442 ExitSyncSucc = 1 1443 } 1444 return 1445 } 1446 1447 beforeSyncPetals := getPetals() 1448 1449 syncContext := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar} 1450 mergeResult, trafficStat, err := repo.Sync(syncContext) 1451 elapsed := time.Since(start) 1452 if err != nil { 1453 autoSyncErrCount++ 1454 planSyncAfter(fixSyncInterval) 1455 1456 logging.LogErrorf("sync data repo failed: %s", err) 1457 msg := fmt.Sprintf(Conf.Language(80), formatRepoErrorMsg(err)) 1458 if errors.Is(err, dejavu.ErrCloudStorageSizeExceeded) { 1459 u := Conf.GetUser() 1460 msg = fmt.Sprintf(Conf.Language(43), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2)) 1461 if 2 == 
u.UserSiYuanSubscriptionPlan { 1462 msg = fmt.Sprintf(Conf.Language(68), humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2)) 1463 } 1464 } 1465 Conf.Sync.Stat = msg 1466 Conf.Save() 1467 util.PushStatusBar(msg) 1468 if 1 > autoSyncErrCount || byHand { 1469 util.PushErrMsg(msg, 0) 1470 } 1471 if exit { 1472 ExitSyncSucc = 1 1473 } 1474 return 1475 } 1476 1477 dataChanged = nil == beforeIndex || beforeIndex.ID != afterIndex.ID || mergeResult.DataChanged() 1478 1479 util.PushStatusBar(fmt.Sprintf(Conf.Language(149), elapsed.Seconds())) 1480 Conf.Sync.Synced = util.CurrentTimeMillis() 1481 msg := fmt.Sprintf(Conf.Language(150), trafficStat.UploadFileCount, trafficStat.DownloadFileCount, trafficStat.UploadChunkCount, trafficStat.DownloadChunkCount, humanize.BytesCustomCeil(uint64(trafficStat.UploadBytes), 2), humanize.BytesCustomCeil(uint64(trafficStat.DownloadBytes), 2)) 1482 Conf.Sync.Stat = msg 1483 Conf.Save() 1484 autoSyncErrCount = 0 1485 1486 calcPetalDiff(beforeSyncPetals, mergeResult) 1487 processSyncMergeResult(exit, byHand, mergeResult, trafficStat, "a", elapsed) 1488 1489 if !exit { 1490 go func() { 1491 // 首次数据同步执行完成后再执行索引订正 Index fixing should not be performed before data synchronization https://github.com/siyuan-note/siyuan/issues/10761 1492 checkIndex() 1493 // 索引订正结束后执行数据仓库清理 Automatic purge for local data repo https://github.com/siyuan-note/siyuan/issues/13091 1494 autoPurgeRepo(false) 1495 }() 1496 } 1497 return 1498} 1499 1500func calcPetalDiff(beforeSyncPetals []*Petal, mergeResult *dejavu.MergeResult) { 1501 var upsertPetals, removePetals []string 1502 afterSyncPetals := getPetals() 1503 for _, afterSyncPetal := range afterSyncPetals { 1504 if beforeSyncPetal := getPetalByName(afterSyncPetal.Name, beforeSyncPetals); nil != beforeSyncPetal { 1505 a, _ := gulu.JSON.MarshalJSON(afterSyncPetal) 1506 b, _ := gulu.JSON.MarshalJSON(beforeSyncPetal) 1507 if !bytes.Equal(a, b) { 1508 upsertPetals = append(upsertPetals, afterSyncPetal.Name) 1509 } 
1510 } else { 1511 upsertPetals = append(upsertPetals, afterSyncPetal.Name) 1512 } 1513 } 1514 for _, beforeSyncPetal := range beforeSyncPetals { 1515 if nil == getPetalByName(beforeSyncPetal.Name, afterSyncPetals) { 1516 removePetals = append(removePetals, beforeSyncPetal.Name) 1517 } 1518 } 1519 1520 mergeResult.UpsertPetals = gulu.Str.RemoveDuplicatedElem(upsertPetals) 1521 mergeResult.RemovePetals = gulu.Str.RemoveDuplicatedElem(removePetals) 1522} 1523 1524func processSyncMergeResult(exit, byHand bool, mergeResult *dejavu.MergeResult, trafficStat *dejavu.TrafficStat, mode string, elapsed time.Duration) { 1525 logging.LogInfof("synced data repo [device=%s, kernel=%s, provider=%d, mode=%s/%t, ufc=%d, dfc=%d, ucc=%d, dcc=%d, ub=%s, db=%s] in [%.2fs], merge result [conflicts=%d, upserts=%d, removes=%d]\n\n", 1526 Conf.System.ID, KernelID, Conf.Sync.Provider, mode, byHand, 1527 trafficStat.UploadFileCount, trafficStat.DownloadFileCount, trafficStat.UploadChunkCount, trafficStat.DownloadChunkCount, humanize.BytesCustomCeil(uint64(trafficStat.UploadBytes), 2), humanize.BytesCustomCeil(uint64(trafficStat.DownloadBytes), 2), 1528 elapsed.Seconds(), 1529 len(mergeResult.Conflicts), len(mergeResult.Upserts), len(mergeResult.Removes)) 1530 1531 //logSyncMergeResult(mergeResult) 1532 1533 var needReloadFiletree bool 1534 if 0 < len(mergeResult.Conflicts) { 1535 luteEngine := util.NewLute() 1536 if Conf.Sync.GenerateConflictDoc { 1537 // 云端同步发生冲突时生成副本 https://github.com/siyuan-note/siyuan/issues/5687 1538 1539 for _, file := range mergeResult.Conflicts { 1540 if !strings.HasSuffix(file.Path, ".sy") { 1541 continue 1542 } 1543 1544 parts := strings.Split(file.Path[1:], "/") 1545 if 2 > len(parts) { 1546 continue 1547 } 1548 boxID := parts[0] 1549 1550 absPath := filepath.Join(util.TempDir, "repo", "sync", "conflicts", mergeResult.Time.Format("2006-01-02-150405"), file.Path) 1551 tree, loadTreeErr := loadTree(absPath, luteEngine) 1552 if nil != loadTreeErr { 1553 
logging.LogErrorf("load conflicted file [%s] failed: %s", absPath, loadTreeErr) 1554 continue 1555 } 1556 tree.Box = boxID 1557 tree.Path = strings.TrimPrefix(file.Path, "/"+boxID) 1558 1559 previousPath := tree.Path 1560 resetTree(tree, "Conflicted", true) 1561 createTreeTx(tree) 1562 box := Conf.Box(boxID) 1563 if nil != box { 1564 box.addSort(previousPath, tree.ID) 1565 } 1566 } 1567 1568 needReloadFiletree = true 1569 } 1570 1571 historyDir := filepath.Join(util.HistoryDir, mergeResult.Time.Format("2006-01-02-150405")+"-sync") 1572 indexHistoryDir(filepath.Base(historyDir), luteEngine) 1573 } 1574 1575 if 1 > len(mergeResult.Upserts) && 1 > len(mergeResult.Removes) && 1 > len(mergeResult.Conflicts) { // 没有数据变更 1576 syncSameCount.Add(1) 1577 if 10 < syncSameCount.Load() { 1578 syncSameCount.Store(5) 1579 } 1580 if !byHand { 1581 delay := time.Minute * time.Duration(int(math.Pow(2, float64(syncSameCount.Load())))) 1582 if fixSyncInterval.Minutes() > delay.Minutes() { 1583 delay = time.Minute * 8 1584 } 1585 planSyncAfter(delay) 1586 } 1587 return 1588 } 1589 1590 // 有数据变更,需要重建索引 1591 var upserts, removes []string 1592 var upsertTrees int 1593 // 可能需要重新加载部分功能 1594 var needReloadFlashcard, needReloadOcrTexts, needReloadPlugin bool 1595 upsertPluginSet := hashset.New() 1596 needUnindexBoxes, needIndexBoxes := map[string]bool{}, map[string]bool{} 1597 for _, file := range mergeResult.Upserts { 1598 upserts = append(upserts, file.Path) 1599 if strings.HasPrefix(file.Path, "/storage/riff/") { 1600 needReloadFlashcard = true 1601 } 1602 1603 if strings.HasPrefix(file.Path, "/assets/ocr-texts.json") { 1604 needReloadOcrTexts = true 1605 } 1606 1607 if strings.HasSuffix(file.Path, "/.siyuan/conf.json") { 1608 needReloadFiletree = true 1609 boxID := strings.TrimSuffix(strings.TrimPrefix(file.Path, "/"), "/.siyuan/conf.json") 1610 needUnindexBoxes[boxID] = true 1611 needIndexBoxes[boxID] = true 1612 } 1613 1614 if strings.HasPrefix(file.Path, "/storage/petal/") { 1615 
needReloadPlugin = true 1616 if parts := strings.Split(file.Path, "/"); 3 < len(parts) { 1617 if pluginName := parts[3]; "petals.json" != pluginName { 1618 upsertPluginSet.Add(pluginName) 1619 } 1620 } 1621 } 1622 1623 if strings.HasPrefix(file.Path, "/plugins/") { 1624 if parts := strings.Split(file.Path, "/"); 2 < len(parts) { 1625 needReloadPlugin = true 1626 upsertPluginSet.Add(parts[2]) 1627 } 1628 } 1629 1630 if strings.HasSuffix(file.Path, ".sy") { 1631 upsertTrees++ 1632 } 1633 } 1634 1635 removeWidgetDirSet, removePluginSet := hashset.New(), hashset.New() 1636 for _, file := range mergeResult.Removes { 1637 removes = append(removes, file.Path) 1638 if strings.HasPrefix(file.Path, "/storage/riff/") { 1639 needReloadFlashcard = true 1640 } 1641 1642 if strings.HasPrefix(file.Path, "/assets/ocr-texts.json") { 1643 needReloadOcrTexts = true 1644 } 1645 1646 if strings.HasSuffix(file.Path, "/.siyuan/conf.json") { 1647 needReloadFiletree = true 1648 boxID := strings.TrimSuffix(strings.TrimPrefix(file.Path, "/"), "/.siyuan/conf.json") 1649 needUnindexBoxes[boxID] = true 1650 } 1651 1652 if strings.HasPrefix(file.Path, "/storage/petal/") { 1653 needReloadPlugin = true 1654 if parts := strings.Split(file.Path, "/"); 3 < len(parts) { 1655 if pluginName := parts[3]; "petals.json" != pluginName { 1656 removePluginSet.Add(pluginName) 1657 } 1658 } 1659 } 1660 1661 if strings.HasPrefix(file.Path, "/plugins/") { 1662 if parts := strings.Split(file.Path, "/"); 2 < len(parts) { 1663 needReloadPlugin = true 1664 removePluginSet.Add(parts[2]) 1665 } 1666 } 1667 1668 if strings.HasPrefix(file.Path, "/widgets/") { 1669 if parts := strings.Split(file.Path, "/"); 2 < len(parts) { 1670 removeWidgetDirSet.Add(parts[2]) 1671 } 1672 } 1673 } 1674 1675 for _, upsertPetal := range mergeResult.UpsertPetals { 1676 needReloadPlugin = true 1677 upsertPluginSet.Add(upsertPetal) 1678 } 1679 for _, removePetal := range mergeResult.RemovePetals { 1680 needReloadPlugin = true 1681 
removePluginSet.Add(removePetal) 1682 } 1683 1684 if needReloadFlashcard { 1685 LoadFlashcards() 1686 } 1687 1688 if needReloadOcrTexts { 1689 util.LoadAssetsTexts() 1690 } 1691 1692 if needReloadPlugin { 1693 pushReloadPlugin(upsertPluginSet, removePluginSet, "") 1694 } 1695 1696 for _, widgetDir := range removeWidgetDirSet.Values() { 1697 widgetDirPath := filepath.Join(util.DataDir, "widgets", widgetDir.(string)) 1698 gulu.File.RemoveEmptyDirs(widgetDirPath) 1699 } 1700 1701 syncingFiles = sync.Map{} 1702 syncingStorages.Store(false) 1703 1704 if needFullReindex(upsertTrees) { // 改进同步后全量重建索引判断 https://github.com/siyuan-note/siyuan/issues/5764 1705 FullReindex() 1706 return 1707 } 1708 1709 if exit { // 退出时同步不用推送事件 1710 return 1711 } 1712 1713 for boxID := range needUnindexBoxes { 1714 if box := Conf.GetBox(boxID); nil != box { 1715 box.Unindex() 1716 } 1717 } 1718 for boxID := range needIndexBoxes { 1719 if box := Conf.GetBox(boxID); nil != box { 1720 box.Index() 1721 } 1722 } 1723 1724 needReloadUI := 0 < len(needUnindexBoxes) || 0 < len(needIndexBoxes) 1725 if needReloadUI { 1726 util.ReloadUI() 1727 } 1728 1729 upsertRootIDs, removeRootIDs := incReindex(upserts, removes) 1730 needReloadFiletree = !needReloadUI && (needReloadFiletree || 0 < len(upsertRootIDs) || 0 < len(removeRootIDs)) 1731 if needReloadFiletree { 1732 ReloadFiletree() 1733 } 1734 1735 go func() { 1736 util.WaitForUILoaded() 1737 1738 if 0 < len(upsertRootIDs) || 0 < len(removeRootIDs) { 1739 util.BroadcastByType("main", "syncMergeResult", 0, "", 1740 map[string]interface{}{"upsertRootIDs": upsertRootIDs, "removeRootIDs": removeRootIDs}) 1741 } 1742 1743 time.Sleep(2 * time.Second) 1744 util.PushStatusBar(fmt.Sprintf(Conf.Language(149), elapsed.Seconds())) 1745 1746 if 0 < len(mergeResult.Conflicts) { 1747 syConflict := false 1748 for _, file := range mergeResult.Conflicts { 1749 if strings.HasSuffix(file.Path, ".sy") { 1750 syConflict = true 1751 break 1752 } 1753 } 1754 1755 if syConflict { 
1756 // 数据同步发生冲突时在界面上进行提醒 https://github.com/siyuan-note/siyuan/issues/7332 1757 util.PushMsg(Conf.Language(108), 7000) 1758 } 1759 } 1760 }() 1761} 1762 1763func logSyncMergeResult(mergeResult *dejavu.MergeResult) { 1764 if 1 > len(mergeResult.Conflicts) && 1 > len(mergeResult.Upserts) && 1 > len(mergeResult.Removes) { 1765 return 1766 } 1767 1768 if 0 < len(mergeResult.Conflicts) { 1769 logBuilder := bytes.Buffer{} 1770 for i, f := range mergeResult.Conflicts { 1771 logBuilder.WriteString(" ") 1772 logBuilder.WriteString(f.Path) 1773 if i < len(mergeResult.Conflicts)-1 { 1774 logBuilder.WriteString("\n") 1775 } 1776 } 1777 logging.LogInfof("sync conflicts:\n%s", logBuilder.String()) 1778 } 1779 if 0 < len(mergeResult.Upserts) { 1780 logBuilder := bytes.Buffer{} 1781 for i, f := range mergeResult.Upserts { 1782 logBuilder.WriteString(" ") 1783 logBuilder.WriteString(f.Path) 1784 if i < len(mergeResult.Upserts)-1 { 1785 logBuilder.WriteString("\n") 1786 } 1787 } 1788 logging.LogInfof("sync merge upserts:\n%s", logBuilder.String()) 1789 } 1790 if 0 < len(mergeResult.Removes) { 1791 logBuilder := bytes.Buffer{} 1792 for i, f := range mergeResult.Removes { 1793 logBuilder.WriteString(" ") 1794 logBuilder.WriteString(f.Path) 1795 if i < len(mergeResult.Removes)-1 { 1796 logBuilder.WriteString("\n") 1797 } 1798 } 1799 logging.LogInfof("sync merge removes:\n%s", logBuilder.String()) 1800 } 1801} 1802 1803func needFullReindex(upsertTrees int) bool { 1804 return 0.2 < float64(upsertTrees)/float64(treenode.CountTrees()) 1805} 1806 1807var promotedPurgeDataRepo bool 1808 1809func indexRepoBeforeCloudSync(repo *dejavu.Repo) (beforeIndex, afterIndex *entity.Index, err error) { 1810 start := time.Now() 1811 1812 beforeIndex, _ = repo.Latest() 1813 FlushTxQueue() 1814 1815 checkChunks := true 1816 if util.ContainerAndroid == util.Container || util.ContainerIOS == util.Container || util.ContainerHarmony == util.Container { 1817 // 因为移动端私有数据空间不会存在外部操作导致分块损坏的情况,所以不需要检查分块以提升性能 
https://github.com/siyuan-note/siyuan/issues/13216 1818 checkChunks = false 1819 } 1820 1821 afterIndex, err = repo.Index("[Sync] Cloud sync", checkChunks, 1822 map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}) 1823 if err != nil { 1824 logging.LogErrorf("index data repo before cloud sync failed: %s", err) 1825 return 1826 } 1827 elapsed := time.Since(start) 1828 1829 if nil == beforeIndex || beforeIndex.ID != afterIndex.ID { 1830 // 对新创建的快照需要更新备注,加入耗时统计 1831 afterIndex.Memo = fmt.Sprintf("[Sync] Cloud sync, completed in %.2fs", elapsed.Seconds()) 1832 if err = repo.PutIndex(afterIndex); err != nil { 1833 util.PushStatusBar("Save data snapshot for cloud sync failed") 1834 logging.LogErrorf("put index into data repo before cloud sync failed: %s", err) 1835 return 1836 } 1837 util.PushStatusBar(fmt.Sprintf(Conf.Language(147), elapsed.Seconds())) 1838 } else { 1839 util.PushStatusBar(fmt.Sprintf(Conf.Language(148), elapsed.Seconds())) 1840 } 1841 1842 if Conf.Repo.SyncIndexTiming < elapsed.Milliseconds() { 1843 logging.LogWarnf("index data repo before cloud sync elapsed [%dms]", elapsed.Milliseconds()) 1844 if !promotedPurgeDataRepo { 1845 go func() { 1846 util.WaitForUILoaded() 1847 time.Sleep(3 * time.Second) 1848 1849 if indexCount, _ := repo.CountIndexes(); 128 > indexCount { 1850 // 快照数量较少时不推送提示 1851 return 1852 } 1853 1854 util.PushMsg(Conf.language(218), 24000) 1855 promotedPurgeDataRepo = true 1856 }() 1857 } 1858 } 1859 return 1860} 1861 1862func newRepository() (ret *dejavu.Repo, err error) { 1863 cloudConf, err := buildCloudConf() 1864 if err != nil { 1865 return 1866 } 1867 1868 var cloudRepo cloud.Cloud 1869 switch Conf.Sync.Provider { 1870 case conf.ProviderSiYuan: 1871 cloudRepo = cloud.NewSiYuan(&cloud.BaseCloud{Conf: cloudConf}) 1872 case conf.ProviderS3: 1873 s3HTTPClient := &http.Client{Transport: httpclient.NewTransport(cloudConf.S3.SkipTlsVerify)} 1874 s3HTTPClient.Timeout = time.Duration(cloudConf.S3.Timeout) * 
time.Second 1875 cloudRepo = cloud.NewS3(&cloud.BaseCloud{Conf: cloudConf}, s3HTTPClient) 1876 case conf.ProviderWebDAV: 1877 webdavClient := gowebdav.NewClient(cloudConf.WebDAV.Endpoint, cloudConf.WebDAV.Username, cloudConf.WebDAV.Password) 1878 a := cloudConf.WebDAV.Username + ":" + cloudConf.WebDAV.Password 1879 auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(a)) 1880 webdavClient.SetHeader("Authorization", auth) 1881 webdavClient.SetHeader("User-Agent", util.UserAgent) 1882 webdavClient.SetTimeout(time.Duration(cloudConf.WebDAV.Timeout) * time.Second) 1883 webdavClient.SetTransport(httpclient.NewTransport(cloudConf.WebDAV.SkipTlsVerify)) 1884 cloudRepo = cloud.NewWebDAV(&cloud.BaseCloud{Conf: cloudConf}, webdavClient) 1885 case conf.ProviderLocal: 1886 cloudRepo = cloud.NewLocal(&cloud.BaseCloud{Conf: cloudConf}) 1887 default: 1888 err = fmt.Errorf("unknown cloud provider [%d]", Conf.Sync.Provider) 1889 return 1890 } 1891 1892 ignoreLines := getSyncIgnoreLines() 1893 ignoreLines = append(ignoreLines, "/.siyuan/conf.json") // 忽略旧版同步配置 1894 ret, err = dejavu.NewRepo(util.DataDir, util.RepoDir, util.HistoryDir, util.TempDir, Conf.System.ID, Conf.System.Name, Conf.System.OS, Conf.Repo.Key, ignoreLines, cloudRepo) 1895 if err != nil { 1896 logging.LogErrorf("init data repo failed: %s", err) 1897 return 1898 } 1899 return 1900} 1901 1902func init() { 1903 subscribeRepoEvents() 1904} 1905 1906func subscribeRepoEvents() { 1907 eventbus.Subscribe(eventbus.EvtIndexBeforeWalkData, func(context map[string]interface{}, path string) { 1908 msg := fmt.Sprintf(Conf.Language(158), path) 1909 util.SetBootDetails(msg) 1910 util.ContextPushMsg(context, msg) 1911 }) 1912 1913 indexWalkDataCount := 0 1914 eventbus.Subscribe(eventbus.EvtIndexWalkData, func(context map[string]interface{}, path string) { 1915 msg := fmt.Sprintf(Conf.Language(158), filepath.Base(path)) 1916 if 0 == indexWalkDataCount%1024 { 1917 util.SetBootDetails(msg) 1918 util.ContextPushMsg(context, msg) 
1919 } 1920 indexWalkDataCount++ 1921 }) 1922 eventbus.Subscribe(eventbus.EvtIndexBeforeGetLatestFiles, func(context map[string]interface{}, total int) { 1923 msg := fmt.Sprintf(Conf.Language(159), 0, total) 1924 util.SetBootDetails(msg) 1925 util.ContextPushMsg(context, msg) 1926 }) 1927 1928 eventbus.Subscribe(eventbus.EvtIndexGetLatestFile, func(context map[string]interface{}, count int, total int) { 1929 msg := fmt.Sprintf(Conf.Language(159), count, total) 1930 if 0 == count%64 { 1931 util.SetBootDetails(msg) 1932 util.ContextPushMsg(context, msg) 1933 } 1934 }) 1935 eventbus.Subscribe(eventbus.EvtIndexUpsertFiles, func(context map[string]interface{}, total int) { 1936 msg := fmt.Sprintf(Conf.Language(160), 0, total) 1937 util.SetBootDetails(msg) 1938 util.ContextPushMsg(context, msg) 1939 }) 1940 eventbus.Subscribe(eventbus.EvtIndexUpsertFile, func(context map[string]interface{}, count int, total int) { 1941 msg := fmt.Sprintf(Conf.Language(160), count, total) 1942 if 0 == count%32 { 1943 util.SetBootDetails(msg) 1944 util.ContextPushMsg(context, msg) 1945 } 1946 }) 1947 1948 eventbus.Subscribe(eventbus.EvtCheckoutBeforeWalkData, func(context map[string]interface{}, path string) { 1949 msg := fmt.Sprintf(Conf.Language(161), path) 1950 util.SetBootDetails(msg) 1951 util.ContextPushMsg(context, msg) 1952 }) 1953 coWalkDataCount := 0 1954 eventbus.Subscribe(eventbus.EvtCheckoutWalkData, func(context map[string]interface{}, path string) { 1955 msg := fmt.Sprintf(Conf.Language(161), filepath.Base(path)) 1956 if 0 == coWalkDataCount%512 { 1957 util.SetBootDetails(msg) 1958 util.ContextPushMsg(context, msg) 1959 } 1960 coWalkDataCount++ 1961 }) 1962 var bootProgressPart int32 1963 eventbus.Subscribe(eventbus.EvtCheckoutUpsertFiles, func(context map[string]interface{}, total int) { 1964 msg := fmt.Sprintf(Conf.Language(162), 0, total) 1965 util.SetBootDetails(msg) 1966 bootProgressPart = int32(10 / float64(total)) 1967 util.ContextPushMsg(context, msg) 1968 }) 1969 
coUpsertFileCount := 0 1970 eventbus.Subscribe(eventbus.EvtCheckoutUpsertFile, func(context map[string]interface{}, count, total int) { 1971 msg := fmt.Sprintf(Conf.Language(162), count, total) 1972 util.IncBootProgress(bootProgressPart, msg) 1973 if 0 == coUpsertFileCount%32 { 1974 util.ContextPushMsg(context, msg) 1975 } 1976 coUpsertFileCount++ 1977 }) 1978 eventbus.Subscribe(eventbus.EvtCheckoutRemoveFiles, func(context map[string]interface{}, total int) { 1979 msg := fmt.Sprintf(Conf.Language(163), 0, total) 1980 util.SetBootDetails(msg) 1981 bootProgressPart = int32(10 / float64(total)) 1982 util.ContextPushMsg(context, msg) 1983 }) 1984 1985 eventbus.Subscribe(eventbus.EvtCheckoutRemoveFile, func(context map[string]interface{}, count, total int) { 1986 msg := fmt.Sprintf(Conf.Language(163), count, total) 1987 util.IncBootProgress(bootProgressPart, msg) 1988 if 0 == count%64 { 1989 util.ContextPushMsg(context, msg) 1990 } 1991 }) 1992 1993 eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadIndex, func(context map[string]interface{}, id string) { 1994 msg := fmt.Sprintf(Conf.Language(164), id[:7]) 1995 util.IncBootProgress(1, msg) 1996 util.ContextPushMsg(context, msg) 1997 }) 1998 1999 eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadFiles, func(context map[string]interface{}, total int) { 2000 msg := fmt.Sprintf(Conf.Language(165), 0, total) 2001 util.SetBootDetails(msg) 2002 bootProgressPart = int32(10 / float64(total)) 2003 util.ContextPushMsg(context, msg) 2004 }) 2005 2006 eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadFile, func(context map[string]interface{}, count, total int) { 2007 msg := fmt.Sprintf(Conf.Language(165), count, total) 2008 util.IncBootProgress(bootProgressPart, msg) 2009 if 0 == count%8 { 2010 util.ContextPushMsg(context, msg) 2011 } 2012 }) 2013 eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadChunks, func(context map[string]interface{}, total int) { 2014 msg := fmt.Sprintf(Conf.Language(166), 0, total) 2015 
util.SetBootDetails(msg) 2016 bootProgressPart = int32(10 / float64(total)) 2017 util.ContextPushMsg(context, msg) 2018 }) 2019 eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadChunk, func(context map[string]interface{}, count, total int) { 2020 msg := fmt.Sprintf(Conf.Language(166), count, total) 2021 util.IncBootProgress(bootProgressPart, msg) 2022 if 0 == count%8 { 2023 util.ContextPushMsg(context, msg) 2024 } 2025 }) 2026 eventbus.Subscribe(eventbus.EvtCloudBeforeDownloadRef, func(context map[string]interface{}, ref string) { 2027 msg := fmt.Sprintf(Conf.Language(167), ref) 2028 util.IncBootProgress(1, msg) 2029 util.ContextPushMsg(context, msg) 2030 }) 2031 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadIndex, func(context map[string]interface{}, id string) { 2032 msg := fmt.Sprintf(Conf.Language(168), id[:7]) 2033 util.IncBootProgress(1, msg) 2034 util.ContextPushMsg(context, msg) 2035 }) 2036 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadFiles, func(context map[string]interface{}, total int) { 2037 msg := fmt.Sprintf(Conf.Language(169), 0, total) 2038 util.SetBootDetails(msg) 2039 util.ContextPushMsg(context, msg) 2040 }) 2041 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadFile, func(context map[string]interface{}, count, total int) { 2042 msg := fmt.Sprintf(Conf.Language(169), count, total) 2043 if 0 == count%8 { 2044 util.SetBootDetails(msg) 2045 util.ContextPushMsg(context, msg) 2046 } 2047 }) 2048 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadChunks, func(context map[string]interface{}, total int) { 2049 msg := fmt.Sprintf(Conf.Language(170), 0, total) 2050 util.SetBootDetails(msg) 2051 util.ContextPushMsg(context, msg) 2052 }) 2053 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadChunk, func(context map[string]interface{}, count, total int) { 2054 msg := fmt.Sprintf(Conf.Language(170), count, total) 2055 if 0 == count%8 { 2056 util.SetBootDetails(msg) 2057 util.ContextPushMsg(context, msg) 2058 } 2059 }) 2060 
eventbus.Subscribe(eventbus.EvtCloudBeforeUploadRef, func(context map[string]interface{}, ref string) { 2061 msg := fmt.Sprintf(Conf.Language(171), ref) 2062 util.SetBootDetails(msg) 2063 util.ContextPushMsg(context, msg) 2064 }) 2065 eventbus.Subscribe(eventbus.EvtCloudLock, func(context map[string]interface{}) { 2066 msg := fmt.Sprintf(Conf.Language(186)) 2067 util.SetBootDetails(msg) 2068 util.ContextPushMsg(context, msg) 2069 }) 2070 eventbus.Subscribe(eventbus.EvtCloudUnlock, func(context map[string]interface{}) { 2071 msg := fmt.Sprintf(Conf.Language(187)) 2072 util.SetBootDetails(msg) 2073 util.ContextPushMsg(context, msg) 2074 }) 2075 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadIndexes, func(context map[string]interface{}) { 2076 msg := fmt.Sprintf(Conf.Language(208)) 2077 util.SetBootDetails(msg) 2078 util.ContextPushMsg(context, msg) 2079 }) 2080 eventbus.Subscribe(eventbus.EvtCloudBeforeUploadCheckIndex, func(context map[string]interface{}) { 2081 msg := fmt.Sprintf(Conf.Language(209)) 2082 util.SetBootDetails(msg) 2083 util.ContextPushMsg(context, msg) 2084 }) 2085 eventbus.Subscribe(eventbus.EvtCloudBeforeFixObjects, func(context map[string]interface{}, count, total int) { 2086 msg := fmt.Sprintf(Conf.Language(210), count, total) 2087 util.SetBootDetails(msg) 2088 util.ContextPushMsg(context, msg) 2089 }) 2090 eventbus.Subscribe(eventbus.EvtCloudAfterFixObjects, func(context map[string]interface{}) { 2091 msg := fmt.Sprintf(Conf.Language(211)) 2092 util.SetBootDetails(msg) 2093 util.ContextPushMsg(context, msg) 2094 }) 2095 eventbus.Subscribe(eventbus.EvtCloudCorrupted, func() { 2096 util.PushErrMsg(Conf.language(220), 30000) 2097 }) 2098 eventbus.Subscribe(eventbus.EvtCloudPurgeListObjects, func(context map[string]interface{}) { 2099 util.ContextPushMsg(context, Conf.language(224)) 2100 }) 2101 eventbus.Subscribe(eventbus.EvtCloudPurgeListIndexes, func(context map[string]interface{}) { 2102 util.ContextPushMsg(context, Conf.language(225)) 2103 }) 
2104 eventbus.Subscribe(eventbus.EvtCloudPurgeListRefs, func(context map[string]interface{}) { 2105 util.ContextPushMsg(context, Conf.language(226)) 2106 }) 2107 eventbus.Subscribe(eventbus.EvtCloudPurgeDownloadIndexes, func(context map[string]interface{}) { 2108 util.ContextPushMsg(context, fmt.Sprintf(Conf.language(227))) 2109 }) 2110 eventbus.Subscribe(eventbus.EvtCloudPurgeDownloadFiles, func(context map[string]interface{}) { 2111 util.ContextPushMsg(context, Conf.language(228)) 2112 }) 2113 eventbus.Subscribe(eventbus.EvtCloudPurgeRemoveIndexes, func(context map[string]interface{}) { 2114 util.ContextPushMsg(context, Conf.language(229)) 2115 }) 2116 eventbus.Subscribe(eventbus.EvtCloudPurgeRemoveIndexesV2, func(context map[string]interface{}) { 2117 util.ContextPushMsg(context, Conf.language(230)) 2118 }) 2119 eventbus.Subscribe(eventbus.EvtCloudPurgeRemoveObjects, func(context map[string]interface{}) { 2120 util.ContextPushMsg(context, Conf.language(231)) 2121 }) 2122} 2123 2124func buildCloudConf() (ret *cloud.Conf, err error) { 2125 if !cloud.IsValidCloudDirName(Conf.Sync.CloudName) { 2126 logging.LogWarnf("invalid cloud repo name, rename it to [main]") 2127 Conf.Sync.CloudName = "main" 2128 Conf.Save() 2129 } 2130 2131 userId, token, availableSize := "0", "", int64(1024*1024*1024*1024*2) 2132 if nil != Conf.User && conf.ProviderSiYuan == Conf.Sync.Provider { 2133 u := Conf.GetUser() 2134 userId = u.UserId 2135 token = u.UserToken 2136 availableSize = u.GetCloudRepoAvailableSize() 2137 } 2138 2139 ret = &cloud.Conf{ 2140 Dir: Conf.Sync.CloudName, 2141 UserID: userId, 2142 Token: token, 2143 AvailableSize: availableSize, 2144 Server: util.GetCloudServer(), 2145 } 2146 2147 switch Conf.Sync.Provider { 2148 case conf.ProviderSiYuan: 2149 ret.Endpoint = util.GetCloudSyncServer() 2150 case conf.ProviderS3: 2151 ret.S3 = &cloud.ConfS3{ 2152 Endpoint: Conf.Sync.S3.Endpoint, 2153 AccessKey: Conf.Sync.S3.AccessKey, 2154 SecretKey: Conf.Sync.S3.SecretKey, 2155 
Bucket: Conf.Sync.S3.Bucket, 2156 Region: Conf.Sync.S3.Region, 2157 PathStyle: Conf.Sync.S3.PathStyle, 2158 SkipTlsVerify: Conf.Sync.S3.SkipTlsVerify, 2159 Timeout: Conf.Sync.S3.Timeout, 2160 ConcurrentReqs: Conf.Sync.S3.ConcurrentReqs, 2161 } 2162 case conf.ProviderWebDAV: 2163 ret.WebDAV = &cloud.ConfWebDAV{ 2164 Endpoint: Conf.Sync.WebDAV.Endpoint, 2165 Username: Conf.Sync.WebDAV.Username, 2166 Password: Conf.Sync.WebDAV.Password, 2167 SkipTlsVerify: Conf.Sync.WebDAV.SkipTlsVerify, 2168 Timeout: Conf.Sync.WebDAV.Timeout, 2169 ConcurrentReqs: Conf.Sync.WebDAV.ConcurrentReqs, 2170 } 2171 case conf.ProviderLocal: 2172 ret.Local = &cloud.ConfLocal{ 2173 Endpoint: Conf.Sync.Local.Endpoint, 2174 Timeout: Conf.Sync.Local.Timeout, 2175 ConcurrentReqs: Conf.Sync.Local.ConcurrentReqs, 2176 } 2177 default: 2178 err = fmt.Errorf("invalid provider [%d]", Conf.Sync.Provider) 2179 return 2180 } 2181 return 2182} 2183 2184type Backup struct { 2185 Size int64 `json:"size"` 2186 HSize string `json:"hSize"` 2187 Updated string `json:"updated"` 2188 SaveDir string `json:"saveDir"` // 本地备份数据存放目录路径 2189} 2190 2191type Sync struct { 2192 Size int64 `json:"size"` 2193 HSize string `json:"hSize"` 2194 Updated string `json:"updated"` 2195 CloudName string `json:"cloudName"` // 云端同步数据存放目录名 2196 SaveDir string `json:"saveDir"` // 本地同步数据存放目录路径 2197} 2198 2199func GetCloudSpace() (s *Sync, b *Backup, hSize, hAssetSize, hTotalSize, hExchangeSize, hTrafficUploadSize, hTrafficDownloadSize, hTrafficAPIGet, hTrafficAPIPut string, err error) { 2200 stat, err := getCloudSpace() 2201 if err != nil { 2202 err = errors.New(Conf.Language(30) + " " + err.Error()) 2203 return 2204 } 2205 2206 syncSize := stat.Sync.Size 2207 syncUpdated := stat.Sync.Updated 2208 s = &Sync{ 2209 Size: syncSize, 2210 HSize: "-", 2211 Updated: syncUpdated, 2212 } 2213 2214 backupSize := stat.Backup.Size 2215 backupUpdated := stat.Backup.Updated 2216 b = &Backup{ 2217 Size: backupSize, 2218 HSize: "-", 2219 Updated: 
backupUpdated, 2220 } 2221 2222 assetSize := stat.AssetSize 2223 totalSize := syncSize + backupSize + assetSize 2224 hAssetSize = "-" 2225 hSize = "-" 2226 hTotalSize = "-" 2227 hExchangeSize = "-" 2228 hTrafficUploadSize = "-" 2229 hTrafficDownloadSize = "-" 2230 hTrafficAPIGet = "-" 2231 hTrafficAPIPut = "-" 2232 if conf.ProviderSiYuan == Conf.Sync.Provider { 2233 s.HSize = humanize.BytesCustomCeil(uint64(syncSize), 2) 2234 b.HSize = humanize.BytesCustomCeil(uint64(backupSize), 2) 2235 hAssetSize = humanize.BytesCustomCeil(uint64(assetSize), 2) 2236 hSize = humanize.BytesCustomCeil(uint64(totalSize), 2) 2237 u := Conf.GetUser() 2238 hTotalSize = humanize.BytesCustomCeil(uint64(u.UserSiYuanRepoSize), 2) 2239 hExchangeSize = humanize.BytesCustomCeil(uint64(u.UserSiYuanPointExchangeRepoSize), 2) 2240 hTrafficUploadSize = humanize.BytesCustomCeil(uint64(u.UserTrafficUpload), 2) 2241 hTrafficDownloadSize = humanize.BytesCustomCeil(uint64(u.UserTrafficDownload), 2) 2242 hTrafficAPIGet = humanize.SIWithDigits(u.UserTrafficAPIGet, 2, "") 2243 hTrafficAPIPut = humanize.SIWithDigits(u.UserTrafficAPIPut, 2, "") 2244 } 2245 return 2246} 2247 2248func getCloudSpace() (stat *cloud.Stat, err error) { 2249 repo, err := newRepository() 2250 if err != nil { 2251 return 2252 } 2253 2254 stat, err = repo.GetCloudRepoStat() 2255 if err != nil { 2256 logging.LogErrorf("get cloud repo stat failed: %s", err) 2257 return 2258 } 2259 return 2260}