Skip to content

Commit

Permalink
add log
Browse files Browse the repository at this point in the history
  • Loading branch information
veezhang committed Nov 20, 2022
1 parent 6d54497 commit 6161d32
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 1 deletion.
2 changes: 2 additions & 0 deletions pkg/client/clientmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func NewNebulaClientMgr(settings *config.NebulaClientSettings, statsCh chan<- ba
}

func (m *NebulaClientMgr) Close() {
m.runnerLogger.Infof("Client manager closing")
m.pool.Close()
m.runnerLogger.Infof("Client manager closed")
}

func (m *NebulaClientMgr) GetRequestChans() []chan base.ClientRequest {
Expand Down
9 changes: 8 additions & 1 deletion pkg/cmd/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,13 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
statsMgr.StatsCh <- base.NewFileDoneStats(*file.Path)
continue
} else {
runnerLogger.Infof("Start to read %s", *file.Path)
wgReaders.Add(1)
go func(fr *reader.FileReader, filename string) {
defer wgReaders.Done()
defer func() {
runnerLogger.Infof("Finish to read %s", *file.Path)
wgReaders.Done()
}()
numReadFailed, err := fr.Read()
statsMgr.Stats.NumReadFailed += numReadFailed
if err != nil {
Expand All @@ -89,7 +93,9 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
r.Readers = freaders
r.stataMgr = statsMgr

runnerLogger.Infof("Waiting for stats manager done")
<-statsMgr.DoneCh
runnerLogger.Infof("Waiting for all readers exit")
for _, r := range freaders {
if r != nil {
r.Stop()
Expand All @@ -100,6 +106,7 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
// then <-statsMgr.DoneCh return, but not all readers have exited.
// So, it's need to wait for it exit.
wgReaders.Wait()
runnerLogger.Infof("All readers exited")

r.stataMgr.CountFileBytes(r.Readers)
r.Readers = nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/stats/statsmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ func NewStatsMgr(files []*config.File, runnerLogger *logger.RunnerLogger) *Stats
}

func (s *StatsMgr) Close() {
s.runnerLogger.Infof("Stats manager closing")
close(s.StatsCh)
close(s.DoneCh)
close(s.OutputStatsCh)
s.Done = true
s.runnerLogger.Infof("Stats manager closed")
}

func (s *StatsMgr) updateStat(stat base.Stats) {
Expand Down Expand Up @@ -129,6 +131,7 @@ func (s *StatsMgr) startWorker(numReadingFiles int) {
case base.FILEDONE:
s.print(fmt.Sprintf("Done(%s)", stat.Filename), now)
numReadingFiles--
s.runnerLogger.Infof("Remaining read files %d", numReadingFiles)
if numReadingFiles == 0 {
s.DoneCh <- true
}
Expand Down

0 comments on commit 6161d32

Please sign in to comment.