Skip to content

Commit

Permalink
Fixed: memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
hisune committed Jun 1, 2022
1 parent 87d5db1 commit 6410653
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/ToolsTraits.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ protected function initConfig($configPath)
if(!isset($this->config['tails']) || !is_array($this->config['tails'])) throw new Exception('not a valid tails config');
}

protected function logger(string $name, string $message, $context = [])
public function logger(string $name, string $message, $context = [])
{
$line = date('Y-m-d H:i:s') . "\t" . $message . "\t" . (!is_scalar($context) ? json_encode($context, JSON_UNESCAPED_UNICODE) : $context) . PHP_EOL;
echo $line;
Expand Down
79 changes: 42 additions & 37 deletions src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,44 +81,34 @@ protected function initClickhouse()

public function run()
{
try{
$this->handelSignal();

$this->logger('worker', sprintf('start worker name: %s, path %s, config %s', $this->name, $this->path, $this->configPath));
$this->initClickhouse();
$this->logger('worker', sprintf('receive stdin: %s', $this->name));

$this->fileObject = new SplFileObject($this->path);
$this->fileObject->seek($this->getCurrentLines());
$this->sentLastAt = time();
$this->line = '';
while($this->onProcess){
$this->batchWrite();
$line = $this->fileObject->current();
if($this->fileObject->eof()){ // 没有新行
sleep(1);
if($line){ // 处理未写入完整的行
$this->line .= $line;
}
$this->fileObject->fseek(0, SEEK_CUR);
continue;
$this->handelSignal();

$this->logger('worker', sprintf('start worker name: %s, path %s, config %s', $this->name, $this->path, $this->configPath));
$this->initClickhouse();
$this->logger('worker', sprintf('receive stdin: %s', $this->name));

$this->fileObject = new SplFileObject($this->path);
$this->fileObject->seek($this->getCurrentLines());
$this->sentLastAt = time();
$this->line = '';
while($this->onProcess){
$this->batchWrite();
$line = $this->fileObject->current();
if($this->fileObject->eof()){ // 没有新行
sleep(1);
if($line){ // 处理未写入完整的行
$this->line .= $line;
}
$this->fileObject->next(); // 指针+1
$this->line .= $line;
$this->line = rtrim($this->line, PHP_EOL);
if($this->line){
$this->progressLine();
}
$this->line = '';
$this->fileObject->fseek(0, SEEK_CUR);
continue;
}
$this->fileObject->next(); // 指针+1
$this->line .= $line;
$this->line = rtrim($this->line, PHP_EOL);
if($this->line){
$this->progressLine();
}
}catch (Throwable $e){
$this->logger('worker', sprintf('%s worker_exception: %s', $this->name, $e->getMessage()), [
'file' => $e->getFile(),
'line' => $e->getLine(),
'trace' => $e->getTraceAsString(),
]);
sleep(10);
$this->run();
$this->line = '';
}
}

Expand Down Expand Up @@ -201,4 +191,19 @@ public function stopProcess()
}

$index = isset($argv[4]) ? intval($argv[4]) : null;
(new Worker($argv[1], $argv[2], $argv[3], $index))->run();
$worker = new Worker($argv[1], $argv[2], $argv[3], $index);
while (true){
try{
$worker->run();
exit();
}catch (Throwable $e){
$worker->logger('worker', sprintf('%s worker_exception: %s', $argv[1], $e->getMessage()), [
'file' => $e->getFile(),
'line' => $e->getLine(),
'trace' => $e->getTraceAsString(),
]);
sleep(10);
$worker->logger('worker', 'memory usage: ' . memory_get_usage());
}
}

0 comments on commit 6410653

Please sign in to comment.