Skip to content

Commit

Permalink
Merge branch 'dev' into pythonicode/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
alvin-reyes authored Mar 30, 2023
2 parents 8fe97b5 + 852d7ec commit dcb5a74
Show file tree
Hide file tree
Showing 13 changed files with 167 additions and 139 deletions.
5 changes: 3 additions & 2 deletions api/api_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
echoSwagger "github.com/swaggo/echo-swagger"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

type IRegister interface {
Expand All @@ -25,9 +26,9 @@ type apiEngine struct {
cfg *config.Estuary
}

func NewEngine(cfg *config.Estuary, tcr trace.Tracer) *apiEngine {
func NewEngine(cfg *config.Estuary, tcr trace.Tracer, log *zap.SugaredLogger) *apiEngine {
e := echo.New()
e.Binder = new(util.Binder)
e.Binder = util.NewBinder(log)
e.Pre(middleware.RemoveTrailingSlash())

if cfg.Logging.ApiEndpointLogging {
Expand Down
1 change: 1 addition & 0 deletions api/v1/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ func (s *apiV1) handleAddCar(c echo.Context, u *util.User) error {
s.log.Warnf("failed to close request body: %s", err)
}
}()

header, err := s.loadCar(ctx, sbs, c.Request().Body)
if err != nil {
return err
Expand Down
16 changes: 14 additions & 2 deletions cmd/benchest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import (
"strings"
"time"

logging "github.com/ipfs/go-log/v2"

"github.com/application-research/estuary/util"
pgd "github.com/jinzhu/gorm/dialects/postgres"
"github.com/urfave/cli/v2"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)

var logger = logging.Logger("benchtest")

func main() {
app := getApp()

Expand Down Expand Up @@ -420,7 +424,11 @@ func benchFetch(c string) (*fetchStats, error) {

status := resp.StatusCode

defer resp.Body.Close()
defer func() {
if err := resp.Body.Close(); err != nil {
logger.Warnf("failed to close request body: %s", err)
}
}()

br := bufio.NewReader(resp.Body)

Expand Down Expand Up @@ -473,7 +481,11 @@ func ipfsCheck(c string, maddr string) *checkResp {
}
}

defer resp.Body.Close()
defer func() {
if err := resp.Body.Close(); err != nil {
logger.Warnf("failed to close request body: %s", err)
}
}()

var out checkResp
out.CheckTook = time.Since(start)
Expand Down
9 changes: 6 additions & 3 deletions cmd/estuary-shuttle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func main() {
MaxActivePerUser: 30,
QueueDataDir: cfg.DataDir,
}, log)
go s.PinMgr.Run(300)
go s.PinMgr.Run(5)

// only refresh pin queue if pin queue refresh and local adding are enabled
if !cfg.NoReloadPinQueue && !cfg.Content.DisableLocalAdding {
Expand Down Expand Up @@ -841,6 +841,8 @@ type Shuttle struct {
tcLk sync.Mutex
trackingChannels map[string]*util.ChanTrack

commpLk sync.Mutex

splitLk sync.Mutex
splitsInProgress map[uint64]bool

Expand Down Expand Up @@ -1108,7 +1110,7 @@ func withUser(f func(echo.Context, *User) error) func(echo.Context) error {

func (s *Shuttle) ServeAPI() error {
e := echo.New()
e.Binder = new(util.Binder)
e.Binder = util.NewBinder(log)
e.Pre(middleware.RemoveTrailingSlash())

if s.shuttleConfig.Logging.ApiEndpointLogging {
Expand Down Expand Up @@ -1434,7 +1436,7 @@ func (s *Shuttle) handleAddCarToShuttle(c echo.Context, u *User) error {
defer func() {
go func() {
if err := s.StagingMgr.CleanUp(bsid); err != nil {
log.Errorf("failed to clean up staging blockstore: %s", err)
log.Warnf("failed to clean up staging blockstore: %s", err)
}
}()
}()
Expand All @@ -1444,6 +1446,7 @@ func (s *Shuttle) handleAddCarToShuttle(c echo.Context, u *User) error {
log.Warnf("failed to close request body: %s", err)
}
}()

header, err := s.loadCar(ctx, bs, c.Request().Body)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions cmd/estuary-shuttle/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ type commpResult struct {
}

func (d *Shuttle) handleRpcComputeCommP(ctx context.Context, cmd *rpcevent.ComputeCommP) error {
d.commpLk.Lock()
defer d.commpLk.Unlock()

ctx, span := d.Tracer.Start(ctx, "handleComputeCommP", trace.WithAttributes(
attribute.String("data", cmd.Data.String()),
))
Expand Down
7 changes: 4 additions & 3 deletions deal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ func (m *manager) DealComplete(contID uint64, tx *gorm.DB) {
m.log.Debugf("deal-making complete for content: %d", contID)

if err := tx.Model(model.DealQueue{}).Where("cont_id = ?", contID).UpdateColumns(map[string]interface{}{
"can_deal": false,
"deal_count": 0,
"can_deal": false,
"deal_count": 0,
"deal_check_next_attempt_at": time.Now().Add(10 * time.Hour).UTC(),
}).Error; err != nil {
m.log.Errorf("failed to update deal queue (DealComplete) for cont %d - %s", contID, err)
return
Expand All @@ -105,7 +106,7 @@ func (m *manager) DealCheckComplete(contID uint64, dealsToBeMade int, tx *gorm.D
if err := tx.Model(model.DealQueue{}).Where("cont_id = ?", contID).UpdateColumns(map[string]interface{}{
"can_deal": canDeal,
"deal_count": dealsToBeMade,
"deal_check_next_attempt_at": time.Now().Add(48 * time.Hour).UTC(),
"deal_check_next_attempt_at": time.Now().Add(10 * time.Hour).UTC(),
}).Error; err != nil {
m.log.Errorf("failed to update deal queue (DealCheckComplete) for cont %d - %s", contID, err)
}
Expand Down
2 changes: 1 addition & 1 deletion deal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (m *manager) runDealBackFillWorker(ctx context.Context) {
m.log.Debugf("trying to start deal queue backfill, starting from content: %d", tracker.LastContID)

var contents []*util.Content
if err := m.db.Where("size >= ? and size <= ? and active", m.cfg.Content.MinSize, m.cfg.Content.MaxSize).Order("id asc").Limit(2000).Find(&contents).Error; err != nil {
if err := m.db.Where("size >= ? and size <= ? and active and id > ?", m.cfg.Content.MinSize, m.cfg.Content.MaxSize, tracker.LastContID).Order("id asc").Limit(2000).Find(&contents).Error; err != nil {
m.log.Warnf("failed to get contents for deal queue backfill - %s", err)
continue
}
Expand Down
46 changes: 23 additions & 23 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/application-research/filclient v0.5.0-rc1
github.com/application-research/goque v1.0.3-0.20221024210042-22e2e2c1a730
github.com/cenkalti/backoff/v4 v4.2.0
github.com/cheggaaa/pb/v3 v3.1.0
github.com/cheggaaa/pb/v3 v3.1.2
github.com/docker/go-units v0.5.0
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5
Expand All @@ -18,7 +18,7 @@ require (
github.com/filecoin-project/go-data-transfer v1.15.3
github.com/filecoin-project/go-fil-commcid v0.1.0 // indirect
github.com/filecoin-project/go-fil-markets v1.26.0
github.com/filecoin-project/go-jsonrpc v0.2.1
github.com/filecoin-project/go-jsonrpc v0.2.3
github.com/filecoin-project/go-legs v0.4.9 // indirect
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-state-types v0.10.0-alpha-2
Expand All @@ -40,55 +40,55 @@ require (
github.com/ipfs/go-fetcher v1.6.1
github.com/ipfs/go-graphsync v0.13.2
github.com/ipfs/go-ipfs v0.13.1
github.com/ipfs/go-ipfs-blockstore v1.2.0
github.com/ipfs/go-ipfs-blockstore v1.3.0
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-exchange-interface v0.2.0
github.com/ipfs/go-ipfs-exchange-offline v0.3.0
github.com/ipfs/go-ipld-cbor v0.0.6
github.com/ipfs/go-ipld-format v0.4.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-merkledag v0.9.0
github.com/ipfs/go-merkledag v0.10.0
github.com/ipfs/go-metrics-interface v0.0.1
github.com/ipfs/go-path v0.3.0
github.com/ipfs/go-unixfs v0.4.2
github.com/ipfs/go-unixfsnode v1.5.1
github.com/ipld/go-car v0.6.0
github.com/ipld/go-codec-dagpb v1.5.0
github.com/ipld/go-ipld-prime v0.19.0
github.com/ipld/go-codec-dagpb v1.6.0
github.com/ipld/go-ipld-prime v0.20.0
github.com/jinzhu/gorm v1.9.16
github.com/labstack/echo/v4 v4.10.0
github.com/labstack/gommon v0.4.0
github.com/libp2p/go-libp2p v0.23.4
github.com/libp2p/go-libp2p-kad-dht v0.20.0
github.com/libp2p/go-libp2p-pubsub v0.8.3 // indirect
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.6.0
github.com/libp2p/go-libp2p-routing-helpers v0.6.1
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.8.0
github.com/multiformats/go-multihash v0.2.1
github.com/paskal/golang-lru v0.6.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.14.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.2
github.com/swaggo/echo-swagger v1.3.5
github.com/swaggo/swag v1.8.10
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/urfave/cli/v2 v2.24.3
github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa
github.com/whyrusleeping/memo v0.0.0-20211124220851-3b94446416a3
go.opencensus.io v0.24.0
go.opentelemetry.io/otel v1.12.0
go.opentelemetry.io/otel/exporters/jaeger v1.12.0
go.opentelemetry.io/otel/sdk v1.12.0
go.opentelemetry.io/otel/trace v1.12.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/exporters/jaeger v1.14.0
go.opentelemetry.io/otel/sdk v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.5.0
golang.org/x/net v0.5.0
golang.org/x/sys v0.4.0
golang.org/x/crypto v0.6.0
golang.org/x/net v0.7.0
golang.org/x/sys v0.6.0
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
gorm.io/driver/postgres v1.4.6
gorm.io/driver/postgres v1.5.0
gorm.io/driver/sqlite v1.4.4
gorm.io/gorm v1.24.5
gorm.io/gorm v1.24.7-0.20230306060331-85eaf9eeda11
)

require (
Expand All @@ -113,7 +113,7 @@ require (
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/VividCortex/ewma v1.1.1 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/akavel/rsrc v0.8.0 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
Expand Down Expand Up @@ -142,7 +142,7 @@ require (
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f // indirect
github.com/filecoin-project/go-amt-ipld/v2 v2.1.0 // indirect
github.com/filecoin-project/go-amt-ipld/v3 v3.1.0 // indirect
Expand Down Expand Up @@ -229,7 +229,7 @@ require (
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.2.0 // indirect
github.com/jackc/pgx/v5 v5.3.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
Expand Down Expand Up @@ -285,7 +285,7 @@ require (
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.1.1 // indirect
github.com/multiformats/go-multicodec v0.7.0 // indirect
github.com/multiformats/go-multicodec v0.8.0 // indirect
github.com/multiformats/go-multistream v0.3.3 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/nkovacs/streamquote v1.0.0 // indirect
Expand Down Expand Up @@ -331,8 +331,8 @@ require (
golang.org/x/exp v0.0.0-20230124142953-7f5a42a36c7e // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/term v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.5.0 // indirect
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect
google.golang.org/grpc v1.47.0 // indirect
Expand Down
Loading

0 comments on commit dcb5a74

Please sign in to comment.