Skip to content

Commit

Permalink
Only enqueue non-terminal tasks (#164)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Sep 14, 2023
1 parent afecec7 commit fc8afaa
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 9 deletions.
23 changes: 16 additions & 7 deletions flytestdlib/cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ type metrics struct {
scope promutils.Scope
}

type Item interface{}
type Item interface {
IsTerminal() bool
}

// Items are wrapped inside an ItemWrapper to be stored in the cache.
type ItemWrapper interface {
Expand Down Expand Up @@ -164,7 +166,7 @@ func (w *autoRefresh) Start(ctx context.Context) error {
go wait.Until(func() {
err := w.enqueueBatches(enqueueCtx)
if err != nil {
logger.Errorf(enqueueCtx, "Failed to sync. Error: %v", err)
logger.Errorf(enqueueCtx, "Failed to enqueue. Error: %v", err)
}
}, w.syncPeriod, enqueueCtx.Done())

Expand Down Expand Up @@ -209,13 +211,20 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {

snapshot := make([]ItemWrapper, 0, len(keys))
for _, k := range keys {
if w.toDelete.Contains(k) {
w.lruMap.Remove(k)
w.toDelete.Remove(k)
continue
}
// If not ok, it means evicted between the item was evicted between getting the keys and this update loop
// which is fine, we can just ignore.
if value, ok := w.lruMap.Peek(k); ok && !w.toDelete.Contains(k) {
snapshot = append(snapshot, itemWrapper{
id: k.(ItemID),
item: value.(Item),
})
if value, ok := w.lruMap.Peek(k); ok {
if item, ok := value.(Item); !ok || (ok && !item.IsTerminal()) {
snapshot = append(snapshot, itemWrapper{
id: k.(ItemID),
item: value.(Item),
})
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions flytestdlib/cache/auto_refresh_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type ExampleCacheItem struct {
id string
}

func (e *ExampleCacheItem) IsTerminal() bool {
return e.status == ExampleStatusSucceeded
}

func (e *ExampleCacheItem) ID() string {
return e.id
}
Expand Down
42 changes: 40 additions & 2 deletions flytestdlib/cache/auto_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ type fakeCacheItem struct {
val int
}

func (f fakeCacheItem) IsTerminal() bool {
return false
}

type terminalCacheItem struct {
val int
}

func (t terminalCacheItem) IsTerminal() bool {
return true
}

func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) {
items := make([]ItemSyncResponse, 0, len(batch))
for _, obj := range batch {
Expand All @@ -46,7 +58,11 @@ func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) {
return items, nil
}

func TestCacheTwo(t *testing.T) {
func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) {
panic("This should never be called")
}

func TestCacheThree(t *testing.T) {
testResyncPeriod := time.Millisecond
rateLimiter := workqueue.DefaultControllerRateLimiter()

Expand Down Expand Up @@ -104,6 +120,28 @@ func TestCacheTwo(t *testing.T) {

cancel()
})

t.Run("Enqueue nothing", func(t *testing.T) {
cache, err := NewAutoRefreshCache("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope())
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
assert.NoError(t, cache.Start(ctx))

// Create ten items in the cache
for i := 1; i <= 10; i++ {
_, err := cache.GetOrCreate(fmt.Sprintf("%d", i), terminalCacheItem{
val: 0,
})
assert.NoError(t, err)
}

// Wait half a second for all resync periods to complete
// If the cache tries to enqueue the item, a panic will be thrown.
time.Sleep(500 * time.Millisecond)

cancel()
})
}

func TestQueueBuildUp(t *testing.T) {
Expand Down Expand Up @@ -134,7 +172,7 @@ func TestQueueBuildUp(t *testing.T) {
defer cancelNow()

for i := 0; i < size; i++ {
_, err := cache.GetOrCreate(strconv.Itoa(i), "test")
_, err := cache.GetOrCreate(strconv.Itoa(i), fakeCacheItem{val: 3})
assert.NoError(t, err)
}

Expand Down

0 comments on commit fc8afaa

Please sign in to comment.