Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/filters: fix TestPendingLogsSubscription #23619

Merged
merged 1 commit into from
Oct 4, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 74 additions & 26 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,58 +506,80 @@ func TestPendingLogsSubscription(t *testing.T) {
},
}

pendingBlockNumber = big.NewInt(rpc.PendingBlockNumber.Int64())

testCases = []struct {
crit ethereum.FilterQuery
expected []*types.Log
c chan []*types.Log
sub *Subscription
err chan error
}{
// match all
{
ethereum.FilterQuery{}, flattenLogs(allLogs),
nil, nil,
ethereum.FilterQuery{FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
flattenLogs(allLogs),
nil, nil, nil,
},
// match none due to no matching addresses
{
ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}},
ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
nil,
nil, nil,
nil, nil, nil,
},
// match logs based on addresses, ignore topics
{
ethereum.FilterQuery{Addresses: []common.Address{firstAddr}},
ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
append(flattenLogs(allLogs[:2]), allLogs[5][3]),
nil, nil,
nil, nil, nil,
},
// match none due to no matching topics (match with address)
{
ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}},
ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
nil,
nil, nil, nil,
},
// match logs based on addresses and topics
{
ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}},
ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
append(flattenLogs(allLogs[3:5]), allLogs[5][0]),
nil, nil,
nil, nil, nil,
},
// match logs based on multiple addresses and "or" topics
{
ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}},
ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
append(flattenLogs(allLogs[2:5]), allLogs[5][0]),
nil, nil, nil,
},
// multiple pending logs, should match only 2 topics from the logs in block 5
{
ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
[]*types.Log{allLogs[5][0], allLogs[5][2]},
nil, nil, nil,
},
// match none due to only matching new mined logs
{
ethereum.FilterQuery{},
nil,
nil, nil, nil,
},
// match none due to only matching mined logs within a specific block range
{
ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)},
nil,
nil, nil, nil,
},
// block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes
// match all due to matching mined and pending logs
{
ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)},
append(flattenLogs(allLogs[:2]), allLogs[5][3]),
nil, nil,
ethereum.FilterQuery{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())},
flattenLogs(allLogs),
nil, nil, nil,
},
// multiple pending logs, should match only 2 topics from the logs in block 5
// match none due to matching logs from a specific block number to new mined blocks
{
ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}},
[]*types.Log{allLogs[5][0], allLogs[5][2]},
nil, nil,
ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())},
nil,
nil, nil, nil,
},
}
)
Expand All @@ -567,43 +589,69 @@ func TestPendingLogsSubscription(t *testing.T) {
// (some) events are posted.
for i := range testCases {
testCases[i].c = make(chan []*types.Log)
testCases[i].sub, _ = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c)
testCases[i].err = make(chan error)

var err error
testCases[i].sub, err = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c)
if err != nil {
t.Fatalf("SubscribeLogs %d failed: %v\n", i, err)
}
}

for n, test := range testCases {
i := n
tt := test
go func() {
defer tt.sub.Unsubscribe()

var fetched []*types.Log

timeout := time.After(1 * time.Second)
fetchLoop:
for {
logs := <-tt.c
fetched = append(fetched, logs...)
if len(fetched) >= len(tt.expected) {
select {
case logs := <-tt.c:
// Do not break early if we've fetched greater, or equal,
// to the number of logs expected. This ensures we do not
// deadlock the filter system because it will do a blocking
// send on this channel if another log arrives.
fetched = append(fetched, logs...)
case <-timeout:
break fetchLoop
}
}

if len(fetched) != len(tt.expected) {
panic(fmt.Sprintf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)))
tt.err <- fmt.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
return
}

for l := range fetched {
if fetched[l].Removed {
panic(fmt.Sprintf("expected log not to be removed for log %d in case %d", l, i))
tt.err <- fmt.Errorf("expected log not to be removed for log %d in case %d", l, i)
return
}
if !reflect.DeepEqual(fetched[l], tt.expected[l]) {
panic(fmt.Sprintf("invalid log on index %d for case %d", l, i))
tt.err <- fmt.Errorf("invalid log on index %d for case %d\n", l, i)
return
}
}
tt.err <- nil
}()
}

// raise events
time.Sleep(1 * time.Second)
for _, ev := range allLogs {
backend.pendingLogsFeed.Send(ev)
}

for i := range testCases {
err := <-testCases[i].err
if err != nil {
t.Fatalf("test %d failed: %v", i, err)
}
<-testCases[i].sub.Err()
}
}

// TestPendingTxFilterDeadlock tests if the event loop hangs when pending
Expand Down