Skip to content

Commit

Permalink
feat: improve workflow and refactor rocketmq (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
sysulq authored Sep 30, 2022
1 parent 59ae1ea commit 7199981
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 149 deletions.
48 changes: 4 additions & 44 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,6 @@ jobs:
matrix:
go: ["1.16.x", "1.17.x", "1.18.x", "1.19.x"]
runs-on: ubuntu-latest
services:
mysql:
image: mysql:5.7
env:
MYSQL_ROOT_PASSWORD: 123456
ports:
- 3306:3306
options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
redis:
image: redis
ports:
- 6379:6379
etcd:
image: "quay.io/coreos/etcd:v3.3"
env:
ETCD_ADVERTISE_CLIENT_URLS: "http://0.0.0.0:2379"
ETCD_LISTEN_CLIENT_URLS: "http://0.0.0.0:2379"
ETCDCTL_API: "3"
ALLOW_NONE_AUTHENTICATION: "yes"
ports:
- 2379:2379
- 2380:2380
- 4001:4001

steps:
- name: Set up Go ${{ matrix.go }}
Expand All @@ -88,6 +65,9 @@ jobs:
- name: Check out code into the Go module directory
uses: actions/[email protected]

- name: docker-compose
run: docker-compose -f test/docker-compose.yaml up -d

- uses: actions/cache@v3
with:
path: |
Expand All @@ -102,29 +82,9 @@ jobs:
go env
go get -v -t -d ./...
- uses: actions/[email protected]
- uses: actions/setup-java@v3
with:
distribution: "temurin" # See 'Supported distributions' for available options
java-version: "8"
- name: setup rocketmq
run: |
export PROJECT_PATH=`pwd`
cd $HOME
wget https://archive.apache.org/dist/rocketmq/4.6.0/rocketmq-all-4.6.0-bin-release.zip
unzip rocketmq-all-4.6.0-bin-release.zip
cd rocketmq-all-4.6.0-bin-release
perl -i -pe's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh
echo "brokerIP1=127.0.0.1" > broker.properties
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n localhost:9876 -c broker.properties &
- name: Test
run: |
go test `go list ./... | grep -v github.com/douyu/jupiter/pkg/client/rocketmq` -race
# test rocketmq
sh $HOME/rocketmq-all-4.6.0-bin-release/bin/mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t test
go test -coverprofile=coverage.txt -covermode=atomic ./...
go test -race -coverprofile=coverage.txt -covermode=atomic ./...
- name: Codecov
uses: codecov/codecov-action@v3
Expand Down
5 changes: 0 additions & 5 deletions pkg/client/rocketmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ type PushConsumer struct {

func (conf *ConsumerConfig) Build() *PushConsumer {
name := conf.Name
if _, ok := _consumers.Load(name); ok {
xlog.Jupiter().Panic("duplicated load", xlog.String("name", name))
}

xlog.Jupiter().Debug("rocketmq's config: ", xlog.String("name", name), xlog.Any("conf", conf))

Expand Down Expand Up @@ -82,7 +79,6 @@ func (conf *ConsumerConfig) Build() *PushConsumer {
}
})

_consumers.Store(name, cc)
return cc
}

Expand All @@ -91,7 +87,6 @@ func (cc *PushConsumer) Close() {
if err != nil {
xlog.Jupiter().Warn("consumer close fail", zap.Error(err))
}
_consumers.Delete(cc.name)
}

func (cc *PushConsumer) WithInterceptor(fs ...primitive.Interceptor) *PushConsumer {
Expand Down
71 changes: 0 additions & 71 deletions pkg/client/rocketmq/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,15 @@ package rocketmq

import (
"fmt"
"net/http"
"os"
"runtime"
"sync"

"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
"github.com/douyu/jupiter/pkg/application"
"github.com/douyu/jupiter/pkg/governor"
"github.com/douyu/jupiter/pkg/xlog"
jsoniter "github.com/json-iterator/go"
"go.uber.org/zap"
)

var _producers = &sync.Map{}
var _consumers = &sync.Map{}

func init() {
rlog.SetLogLevel("debug")
rlog.SetLogger(&mqLogger{xlog.Jupiter()})
Expand All @@ -43,67 +35,4 @@ func init() {
fmt.Fprint(os.Stderr, "[rocketmq panic recovery]\n", string(stack[:length]))
xlog.Jupiter().Error("rocketmq panic recovery", zap.Any("error", i))
}

governor.HandleFunc("/debug/rocketmq/stats", func(w http.ResponseWriter, r *http.Request) {
type rocketmqStatus struct {
application.RuntimeStats
RocketMQs map[string]interface{} `json:"rocketmqs"`
FlowInfo map[string]FlowInfo `json:"flowInfo"`
}

var rets = rocketmqStatus{
RuntimeStats: application.NewRuntimeStats(),
RocketMQs: make(map[string]interface{}),
FlowInfo: make(map[string]FlowInfo),
}

_producers.Range(func(key interface{}, val interface{}) bool {
name := key.(string)
cc := val.(*Producer)
rets.RocketMQs[name] = map[string]interface{}{
"role": "producer",
"config": cc.ProducerConfig,
}
rets.FlowInfo[fmt.Sprintf("%s_%s", name, cc.fInfo.GroupType)] = cc.fInfo
return true
})

_consumers.Range(func(key interface{}, val interface{}) bool {
name := key.(string)
cc := val.(*PushConsumer)
rets.RocketMQs[name] = map[string]interface{}{
"config": cc.ConsumerConfig,
"role": "consumer",
}
rets.FlowInfo[name+"_"+cc.fInfo.GroupType] = cc.fInfo
return true
})

_ = jsoniter.NewEncoder(w).Encode(rets)
})
}

func GetProducer(name string) *Producer {
if ins, ok := _producers.Load(name); ok {
return ins.(*Producer)
}
return nil
}

// Get ...
func GetConsumer(name string) *PushConsumer {
if ins, ok := _consumers.Load(name); ok {
return ins.(*PushConsumer)
}

return nil
}

// Invoker ...
func InvokerProducer(name string) *Producer {
if client := GetProducer(name); client != nil {
return client
}

return StdNewProducer(name)
}
6 changes: 0 additions & 6 deletions pkg/client/rocketmq/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ func StdNewProducer(name string) *Producer {
func (conf *ProducerConfig) Build() *Producer {
name := conf.Name

if _, ok := _producers.Load(name); ok {
xlog.Jupiter().Panic("duplicated load", xlog.String("name", name))
}

if xdebug.IsDevelopmentMode() {
xdebug.PrettyJsonPrint("rocketmq's config: "+name, conf)
}
Expand All @@ -72,7 +68,6 @@ func (conf *ProducerConfig) Build() *Producer {
_ = cc.Start()
})

_producers.Store(name, cc)
return cc
}

Expand Down Expand Up @@ -126,7 +121,6 @@ func (pc *Producer) Close() error {
xlog.Jupiter().Warn("consumer close fail", xlog.Any("error", err.Error()))
return err
}
_producers.Delete(pc.name)
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions test/data/broker/conf/broker.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
autoCreateTopicEnable = true
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
#set `brokerIP1` if you want to set physical IP as broker IP.
brokerIP1=127.0.0.1
79 changes: 79 additions & 0 deletions test/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
version: '3.3'
services:
cache:
image: redis:6.2-alpine
restart: always
ports:
- 6379:6379
command: redis-server
# volumes:
# - ./data/cache:/data
db:
image: mysql:5.7
# volumes:
# - ./data/db:/var/run/mysqld/
command: --default-authentication-plugin=mysql_native_password
environment:
MYSQL_ROOT_PASSWORD: 123456
MYSQL_DATABASE: jupiter
ports:
- 3306:3306
adminer:
image: adminer
restart: always
ports:
- 8080:8080
etcd:
image: "quay.io/coreos/etcd:v3.3"
environment:
ETCD_ADVERTISE_CLIENT_URLS: "http://0.0.0.0:2379"
ETCD_LISTEN_CLIENT_URLS: "http://0.0.0.0:2379"
ETCDCTL_API: "3"
ALLOW_NONE_AUTHENTICATION: "yes"
ports:
- 2379:2379
- 2380:2380
- 4001:4001

namesrv:
image: apacherocketmq/rocketmq:4.5.0
ports:
- 9876:9876
# volumes:
# - ./data/namesrv/logs:/home/rocketmq/logs
command: sh mqnamesrv
broker:
image: apacherocketmq/rocketmq:4.5.0
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
# - ./data/broker/logs:/home/rocketmq/logs
# - ./data/broker/store:/home/rocketmq/store
- ./data/broker/conf/broker.conf:/home/rocketmq/rocketmq-4.5.0/conf/broker.conf
command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
depends_on:
- namesrv
mqconsole:
image: styletang/rocketmq-console-ng
ports:
- 19876:8080
environment:
JAVA_OPTS: -Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=falses
depends_on:
- namesrv
jaeger:
image: jaegertracing/all-in-one:1.32
ports:
- 16686:16686
- 14268:14268
- 14269:14269
- 14250:14250
- 9411:9411
- 5778:5778
- 5775:5775/udp
- 6831:6831/udp
- 6832:6832/udp
environment:
COLLECTOR_ZIPKIN_HOST_PORT: :9411
46 changes: 23 additions & 23 deletions test/testdata/rocketmq/conf/rocketmq.toml
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
[jupiter.rocketmq.example]
[jupiter.rocketmq.example.consumer]
enable = true
addr = ["127.0.0.1:9876"]
group = "testGroup"
dialTimeout = "3s"
rwTimeout = "3s"
topic = "test"
subExpression = "TagA || TagB || TagC "
rate = 100
enableTrace = true
[jupiter.rocketmq.example.producer]
group = "testGroup"
addr = ["127.0.0.1:9876"]
dialTimeout = "3s"
rwTimeout = "3s"
topic = "test"
enableTrace = true
[jupiter.rocketmq.example.consumer]
enable = true
addr = ["127.0.0.1:9876"]
group = "testGroup"
dialTimeout = "3s"
rwTimeout = "3s"
topic = "DefaultCluster"
subExpression = "TagA || TagB || TagC "
rate = 100
enableTrace = true
[jupiter.rocketmq.example.producer]
group = "testGroup"
addr = ["127.0.0.1:9876"]
dialTimeout = "3s"
rwTimeout = "3s"
topic = "DefaultCluster"
enableTrace = true

# - NAME_SERVER_ADDRESS=127.0.0.1:9876
# - BROKER_ADDRESS=127.0.0.1:10911
# - NAME_SERVER_ADDRESS=127.0.0.1:9876
# - BROKER_ADDRESS=127.0.0.1:10911
[jupiter.trace.jaeger]
EnableRPCMetrics= true
[jupiter.trace.jaeger.Reporter]
LocalAgentHostPort = "127.0.0.1:6831"
LogSpans = true
EnableRPCMetrics = true
[jupiter.trace.jaeger.Reporter]
LocalAgentHostPort = "127.0.0.1:6831"
LogSpans = true

0 comments on commit 7199981

Please sign in to comment.