Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cosmos event manager like standard cosmos modules are doing #1617

Merged
merged 4 commits into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions cosmos/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c *Client) Stream(ctx context.Context, query string) (chan hash.Hash, chan
if err != nil {
return nil, nil, err
}
eventStream, err := c.EventBus.SubscribeUnbuffered(ctx, subscriber, q)
msgStream, err := c.EventBus.SubscribeUnbuffered(ctx, subscriber, q)
if err != nil {
return nil, nil, err
}
Expand All @@ -144,19 +144,22 @@ func (c *Client) Stream(ctx context.Context, query string) (chan hash.Hash, chan
loop:
for {
select {
case event := <-eventStream.Out():
tags := event.Events()[EventHashType]
if len(tags) != 1 {
errC <- fmt.Errorf("event %s has %d tag(s), but only 1 is expected", EventHashType, len(tags))
case msg := <-msgStream.Out():
attrs := msg.Events()[EventHashType]
// The following error might be too much as MAYBE if one transaction contains many messages, the events will be merged across the whole transaction
if len(attrs) != 1 {
errC <- fmt.Errorf("event %s has %d tag(s), but only 1 is expected", EventHashType, len(attrs))
}
hash, err := hash.Decode(tags[0])
if err != nil {
errC <- err
} else {
hashC <- hash
for _, attr := range attrs {
hash, err := hash.Decode(attr)
if err != nil {
errC <- err
} else {
hashC <- hash
}
}
case <-eventStream.Cancelled():
errC <- eventStream.Err()
case <-msgStream.Cancelled():
errC <- msgStream.Err()
break loop
case <-ctx.Done():
break loop
Expand Down
24 changes: 10 additions & 14 deletions cosmos/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (m AppModule) Route() string {
// NewHandler returns the handler used to apply transactions.
func (m AppModule) NewHandler() cosmostypes.Handler {
return func(request cosmostypes.Request, msg cosmostypes.Msg) cosmostypes.Result {
request = request.WithEventManager(cosmostypes.NewEventManager())
hash, err := m.handler(request, msg)
if err != nil {
if errsdk, ok := err.(cosmostypes.Error); ok {
Expand All @@ -113,25 +114,20 @@ func (m AppModule) NewHandler() cosmostypes.Handler {
return cosmostypes.ErrInternal(err.Error()).Result()
}

events := request.EventManager().Events()
events = events.AppendEvent(
cosmostypes.NewEvent(
cosmostypes.EventTypeMessage,
cosmostypes.NewAttribute(cosmostypes.AttributeKeyModule, m.name),
),
)
request.EventManager().EmitEvent(cosmostypes.NewEvent(
cosmostypes.EventTypeMessage,
cosmostypes.NewAttribute(cosmostypes.AttributeKeyModule, m.name),
))

if hash != nil {
events = events.AppendEvent(
cosmostypes.NewEvent(
cosmostypes.EventTypeMessage,
cosmostypes.NewAttribute(AttributeKeyHash, hash.String()),
),
)
request.EventManager().EmitEvent(cosmostypes.NewEvent(
cosmostypes.EventTypeMessage,
cosmostypes.NewAttribute(AttributeKeyHash, hash.String()),
))
}
return cosmostypes.Result{
Data: hash,
Events: events,
Events: request.EventManager().Events(),
}
}
}
Expand Down