Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aria2 下载增加做种状态 #1422

Merged
merged 3 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/aria2/aria2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package aria2
import (
"context"
"fmt"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"net/url"
"sync"
"time"
Expand All @@ -14,6 +12,8 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/monitor"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/balancer"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
)

// Instance 默认使用的Aria2处理实例
Expand All @@ -40,7 +40,7 @@ func Init(isReload bool, pool cluster.Pool, mqClient mq.MQ) {

if !isReload {
// 从数据库中读取未完成任务,创建监控
unfinished := model.GetDownloadsByStatus(common.Ready, common.Paused, common.Downloading)
unfinished := model.GetDownloadsByStatus(common.Ready, common.Paused, common.Downloading, common.Seeding)

for i := 0; i < len(unfinished); i++ {
// 创建任务监控
Expand Down
9 changes: 7 additions & 2 deletions pkg/aria2/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
Canceled
// Unknown 未知状态
Unknown
// Seeding 做种中
Seeding
)

var (
Expand Down Expand Up @@ -94,11 +96,14 @@ func (instance *DummyAria2) DeleteTempFile(src *model.Download) error {
}

// GetStatus 将给定的状态字符串转换为状态标识数字
func GetStatus(status string) int {
switch status {
func GetStatus(status rpc.StatusInfo) int {
switch status.Status {
case "complete":
return Complete
case "active":
if status.BitTorrent.Mode != "" && status.CompletedLength == status.TotalLength {
return Seeding
}
return Downloading
case "waiting":
return Ready
Expand Down
25 changes: 17 additions & 8 deletions pkg/aria2/common/common_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package common

import (
"testing"

model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/stretchr/testify/assert"
"testing"
)

func TestDummyAria2(t *testing.T) {
Expand Down Expand Up @@ -35,11 +37,18 @@ func TestDummyAria2(t *testing.T) {
func TestGetStatus(t *testing.T) {
a := assert.New(t)

a.Equal(GetStatus("complete"), Complete)
a.Equal(GetStatus("active"), Downloading)
a.Equal(GetStatus("waiting"), Ready)
a.Equal(GetStatus("paused"), Paused)
a.Equal(GetStatus("error"), Error)
a.Equal(GetStatus("removed"), Canceled)
a.Equal(GetStatus("unknown"), Unknown)
a.Equal(GetStatus(rpc.StatusInfo{Status: "complete"}), Complete)
a.Equal(GetStatus(rpc.StatusInfo{Status: "active",
BitTorrent: rpc.BitTorrentInfo{Mode: ""}}), Downloading)
a.Equal(GetStatus(rpc.StatusInfo{Status: "active",
BitTorrent: rpc.BitTorrentInfo{Mode: "single"},
TotalLength: "100", CompletedLength: "50"}), Downloading)
a.Equal(GetStatus(rpc.StatusInfo{Status: "active",
BitTorrent: rpc.BitTorrentInfo{Mode: "multi"},
TotalLength: "100", CompletedLength: "100"}), Seeding)
a.Equal(GetStatus(rpc.StatusInfo{Status: "waiting"}), Ready)
a.Equal(GetStatus(rpc.StatusInfo{Status: "paused"}), Paused)
a.Equal(GetStatus(rpc.StatusInfo{Status: "error"}), Error)
a.Equal(GetStatus(rpc.StatusInfo{Status: "removed"}), Canceled)
a.Equal(GetStatus(rpc.StatusInfo{Status: "unknown"}), Unknown)
}
48 changes: 41 additions & 7 deletions pkg/aria2/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ func (monitor *Monitor) Update() bool {

util.Log().Debug("离线下载[%s]更新状态[%s]", status.Gid, status.Status)

switch status.Status {
case "complete":
switch common.GetStatus(status) {
case common.Complete, common.Seeding:
return monitor.Complete(task.TaskPoll)
case "error":
case common.Error:
return monitor.Error(status)
case "active", "waiting", "paused":
case common.Downloading, common.Ready, common.Paused:
return false
case "removed":
case common.Canceled:
monitor.Task.Status = common.Canceled
monitor.Task.Save()
monitor.RemoveTempFolder()
Expand All @@ -132,7 +132,7 @@ func (monitor *Monitor) UpdateTaskInfo(status rpc.StatusInfo) error {
originSize := monitor.Task.TotalSize

monitor.Task.GID = status.Gid
monitor.Task.Status = common.GetStatus(status.Status)
monitor.Task.Status = common.GetStatus(status)

// 文件大小、已下载大小
total, err := strconv.ParseUint(status.TotalLength, 10, 64)
Expand Down Expand Up @@ -235,6 +235,40 @@ func (monitor *Monitor) RemoveTempFolder() {

// Complete 完成下载,返回是否中断监控
func (monitor *Monitor) Complete(pool task.Pool) bool {
// 未开始转存,提交转存任务
if monitor.Task.TaskID == 0 {
return monitor.transfer(pool)
}

// 做种完成
if common.GetStatus(monitor.Task.StatusInfo) == common.Complete {
transferTask, err := model.GetTasksByID(monitor.Task.TaskID)
if err != nil {
monitor.setErrorStatus(err)
monitor.RemoveTempFolder()
return true
}

// 转存完成,回收下载目录
if transferTask.Type == task.TransferTaskType && transferTask.Status >= task.Error {
job, err := task.NewRecycleTask(monitor.Task)
if err != nil {
monitor.setErrorStatus(err)
monitor.RemoveTempFolder()
return true
}

// 提交回收任务
pool.Submit(job)

return true
}
}

return false
}

func (monitor *Monitor) transfer(pool task.Pool) bool {
// 创建中转任务
file := make([]string, 0, len(monitor.Task.StatusInfo.Files))
sizes := make(map[string]uint64, len(monitor.Task.StatusInfo.Files))
Expand Down Expand Up @@ -269,7 +303,7 @@ func (monitor *Monitor) Complete(pool task.Pool) bool {
monitor.Task.TaskID = job.Model().ID
monitor.Task.Save()

return true
return false
}

func (monitor *Monitor) setErrorStatus(err error) {
Expand Down
11 changes: 10 additions & 1 deletion pkg/aria2/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package monitor
import (
"database/sql"
"errors"
"testing"

"github.com/DATA-DOG/go-sqlmock"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
Expand All @@ -13,7 +15,6 @@ import (
"github.com/jinzhu/gorm"
"github.com/stretchr/testify/assert"
testMock "github.com/stretchr/testify/mock"
"testing"
)

var mock sqlmock.Sqlmock
Expand Down Expand Up @@ -431,6 +432,14 @@ func TestMonitor_Complete(t *testing.T) {
mock.ExpectExec("UPDATE(.+)downloads").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

mock.ExpectQuery("SELECT(.+)tasks").WillReturnRows(sqlmock.NewRows([]string{"id", "type", "status"}).AddRow(1, 2, 4))
mock.ExpectQuery("SELECT(.+)users").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(9414))
mock.ExpectBegin()
mock.ExpectExec("INSERT(.+)tasks").WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectCommit()

a.False(m.Complete(mockPool))
m.Task.StatusInfo.Status = "complete"
a.True(m.Complete(mockPool))
a.NoError(mock.ExpectationsWereMet())
mockNode.AssertExpectations(t)
Expand Down
60 changes: 31 additions & 29 deletions pkg/aria2/rpc/resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,27 @@ package rpc

// StatusInfo represents response of aria2.tellStatus
type StatusInfo struct {
Gid string `json:"gid"` // GID of the download.
Status string `json:"status"` // active for currently downloading/seeding downloads. waiting for downloads in the queue; download is not started. paused for paused downloads. error for downloads that were stopped because of error. complete for stopped and completed downloads. removed for the downloads removed by user.
TotalLength string `json:"totalLength"` // Total length of the download in bytes.
CompletedLength string `json:"completedLength"` // Completed length of the download in bytes.
UploadLength string `json:"uploadLength"` // Uploaded length of the download in bytes.
BitField string `json:"bitfield"` // Hexadecimal representation of the download progress. The highest bit corresponds to the piece at index 0. Any set bits indicate loaded pieces, while unset bits indicate not yet loaded and/or missing pieces. Any overflow bits at the end are set to zero. When the download was not started yet, this key will not be included in the response.
DownloadSpeed string `json:"downloadSpeed"` // Download speed of this download measured in bytes/sec.
UploadSpeed string `json:"uploadSpeed"` // LocalUpload speed of this download measured in bytes/sec.
InfoHash string `json:"infoHash"` // InfoHash. BitTorrent only.
NumSeeders string `json:"numSeeders"` // The number of seeders aria2 has connected to. BitTorrent only.
Seeder string `json:"seeder"` // true if the local endpoint is a seeder. Otherwise false. BitTorrent only.
PieceLength string `json:"pieceLength"` // Piece length in bytes.
NumPieces string `json:"numPieces"` // The number of pieces.
Connections string `json:"connections"` // The number of peers/servers aria2 has connected to.
ErrorCode string `json:"errorCode"` // The code of the last error for this item, if any. The value is a string. The error codes are defined in the EXIT STATUS section. This value is only available for stopped/completed downloads.
ErrorMessage string `json:"errorMessage"` // The (hopefully) human readable error message associated to errorCode.
FollowedBy []string `json:"followedBy"` // List of GIDs which are generated as the result of this download. For example, when aria2 downloads a Metalink file, it generates downloads described in the Metalink (see the --follow-metalink option). This value is useful to track auto-generated downloads. If there are no such downloads, this key will not be included in the response.
BelongsTo string `json:"belongsTo"` // GID of a parent download. Some downloads are a part of another download. For example, if a file in a Metalink has BitTorrent resources, the downloads of ".torrent" files are parts of that parent. If this download has no parent, this key will not be included in the response.
Dir string `json:"dir"` // Directory to save files.
Files []FileInfo `json:"files"` // Returns the list of files. The elements of this list are the same structs used in aria2.getFiles() method.
BitTorrent struct {
AnnounceList [][]string `json:"announceList"` // List of lists of announce URIs. If the torrent contains announce and no announce-list, announce is converted to the announce-list format.
Comment string `json:"comment"` // The comment of the torrent. comment.utf-8 is used if available.
CreationDate int64 `json:"creationDate"` // The creation time of the torrent. The value is an integer since the epoch, measured in seconds.
Mode string `json:"mode"` // File mode of the torrent. The value is either single or multi.
Info struct {
Name string `json:"name"` // name in info dictionary. name.utf-8 is used if available.
} `json:"info"` // Struct which contains data from Info dictionary. It contains following keys.
} `json:"bittorrent"` // Struct which contains information retrieved from the .torrent (file). BitTorrent only. It contains following keys.
Gid string `json:"gid"` // GID of the download.
Status string `json:"status"` // active for currently downloading/seeding downloads. waiting for downloads in the queue; download is not started. paused for paused downloads. error for downloads that were stopped because of error. complete for stopped and completed downloads. removed for the downloads removed by user.
TotalLength string `json:"totalLength"` // Total length of the download in bytes.
CompletedLength string `json:"completedLength"` // Completed length of the download in bytes.
UploadLength string `json:"uploadLength"` // Uploaded length of the download in bytes.
BitField string `json:"bitfield"` // Hexadecimal representation of the download progress. The highest bit corresponds to the piece at index 0. Any set bits indicate loaded pieces, while unset bits indicate not yet loaded and/or missing pieces. Any overflow bits at the end are set to zero. When the download was not started yet, this key will not be included in the response.
DownloadSpeed string `json:"downloadSpeed"` // Download speed of this download measured in bytes/sec.
UploadSpeed string `json:"uploadSpeed"` // LocalUpload speed of this download measured in bytes/sec.
InfoHash string `json:"infoHash"` // InfoHash. BitTorrent only.
NumSeeders string `json:"numSeeders"` // The number of seeders aria2 has connected to. BitTorrent only.
Seeder string `json:"seeder"` // true if the local endpoint is a seeder. Otherwise false. BitTorrent only.
PieceLength string `json:"pieceLength"` // Piece length in bytes.
NumPieces string `json:"numPieces"` // The number of pieces.
Connections string `json:"connections"` // The number of peers/servers aria2 has connected to.
ErrorCode string `json:"errorCode"` // The code of the last error for this item, if any. The value is a string. The error codes are defined in the EXIT STATUS section. This value is only available for stopped/completed downloads.
ErrorMessage string `json:"errorMessage"` // The (hopefully) human readable error message associated to errorCode.
FollowedBy []string `json:"followedBy"` // List of GIDs which are generated as the result of this download. For example, when aria2 downloads a Metalink file, it generates downloads described in the Metalink (see the --follow-metalink option). This value is useful to track auto-generated downloads. If there are no such downloads, this key will not be included in the response.
BelongsTo string `json:"belongsTo"` // GID of a parent download. Some downloads are a part of another download. For example, if a file in a Metalink has BitTorrent resources, the downloads of ".torrent" files are parts of that parent. If this download has no parent, this key will not be included in the response.
Dir string `json:"dir"` // Directory to save files.
Files []FileInfo `json:"files"` // Returns the list of files. The elements of this list are the same structs used in aria2.getFiles() method.
BitTorrent BitTorrentInfo `json:"bittorrent"` // Struct which contains information retrieved from the .torrent (file). BitTorrent only. It contains following keys.
}

// URIInfo represents an element of response of aria2.getUris
Expand Down Expand Up @@ -100,3 +92,13 @@ type Method struct {
Name string `json:"methodName"` // Method name to call
Params []interface{} `json:"params"` // Array containing parameters to the method call
}

type BitTorrentInfo struct {
AnnounceList [][]string `json:"announceList"` // List of lists of announce URIs. If the torrent contains announce and no announce-list, announce is converted to the announce-list format.
Comment string `json:"comment"` // The comment of the torrent. comment.utf-8 is used if available.
CreationDate int64 `json:"creationDate"` // The creation time of the torrent. The value is an integer since the epoch, measured in seconds.
Mode string `json:"mode"` // File mode of the torrent. The value is either single or multi.
Info struct {
Name string `json:"name"` // name in info dictionary. name.utf-8 is used if available.
} `json:"info"` // Struct which contains data from Info dictionary. It contains following keys.
}
7 changes: 4 additions & 3 deletions pkg/filesystem/driver/shadow/slaveinmaster/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"context"
"encoding/json"
"errors"
"net/url"
"time"

model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
Expand All @@ -13,8 +16,6 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"net/url"
"time"
)

// Driver 影子存储策略,将上传任务指派给从机节点处理,并等待从机通知上传结果
Expand Down Expand Up @@ -118,6 +119,6 @@ func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]respo
}

// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
func (d *Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}
1 change: 1 addition & 0 deletions pkg/serializer/slave.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/sha1"
"encoding/gob"
"fmt"

model "github.com/cloudreve/Cloudreve/v3/models"
)

Expand Down
14 changes: 13 additions & 1 deletion pkg/serializer/slave_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package serializer

import (
"testing"

model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/stretchr/testify/assert"
"testing"
)

func TestSlaveTransferReq_Hash(t *testing.T) {
Expand All @@ -18,3 +19,14 @@ func TestSlaveTransferReq_Hash(t *testing.T) {
}
a.NotEqual(s1.Hash("1"), s2.Hash("1"))
}

func TestSlaveRecycleReq_Hash(t *testing.T) {
a := assert.New(t)
s1 := &SlaveRecycleReq{
Path: "1",
}
s2 := &SlaveRecycleReq{
Path: "2",
}
a.NotEqual(s1.Hash("1"), s2.Hash("1"))
}
4 changes: 4 additions & 0 deletions pkg/task/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
TransferTaskType
// ImportTaskType 导入任务
ImportTaskType
// RecycleTaskType 回收任务
RecycleTaskType
)

// 任务状态
Expand Down Expand Up @@ -113,6 +115,8 @@ func GetJobFromModel(task *model.Task) (Job, error) {
return NewTransferTaskFromModel(task)
case ImportTaskType:
return NewImportTaskFromModel(task)
case RecycleTaskType:
return NewRecycleTaskFromModel(task)
default:
return nil, ErrUnknownTaskType
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/task/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package task

import (
"errors"
testMock "github.com/stretchr/testify/mock"
"testing"

"github.com/DATA-DOG/go-sqlmock"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/stretchr/testify/assert"
testMock "github.com/stretchr/testify/mock"
)

func TestRecord(t *testing.T) {
Expand Down Expand Up @@ -103,4 +103,16 @@ func TestGetJobFromModel(t *testing.T) {
asserts.Nil(job)
asserts.Error(err)
}
// RecycleTaskType
{
task := &model.Task{
Status: 0,
Type: RecycleTaskType,
}
mock.ExpectQuery("SELECT(.+)users(.+)").WillReturnError(errors.New("error"))
job, err := GetJobFromModel(task)
asserts.NoError(mock.ExpectationsWereMet())
asserts.Nil(job)
asserts.Error(err)
}
}
Loading