Skip to content

Commit

Permalink
Bug/fix long pushes end up in invalid state (#12)
Browse files Browse the repository at this point in the history
* Fix unit tests

* Add failing unit test for hg incoming timeout

* Introduce verify state and prevent invalid transaction state.

* fixup! Introduce verify state and prevent invalid transaction state.

* Make fake-polling use tiny chunk-size

* Rename synchronize and prevent late unexpected failures in tests

* Fix indentation and unused variable.
  • Loading branch information
myieye authored May 28, 2024
1 parent ae58e23 commit 69b57c3
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 88 deletions.
14 changes: 7 additions & 7 deletions api/src/AsyncRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,17 @@ public function cleanUp() {
}

/**
*
* @throws AsyncRunnerException
* Waits for up to 5s for the runner to complete
* @return boolean True if complete, otherwise not complete yet
*/
public function synchronize() {
for ($i = 0; $i < 200; $i++) {
public function waitForIsComplete() {
for ($i = 0; $i < 5; $i++) {
if ($this->isComplete()) {
return;
return true;
}
usleep(500000);
usleep(1000000);
}
throw new AsyncRunnerException("Error: Long running process exceeded 100 seconds while waiting to synchronize");
return false;
}
}

Expand Down
1 change: 1 addition & 0 deletions api/src/BundleHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class BundleHelper {
const State_Bundle = 'Bundle';
const State_Downloading = 'Downloading';
const State_Uploading = 'Uploading';
const State_Validating = 'Validating';
const State_Unbundle = 'Unbundle';

private $_transactionId;
Expand Down
110 changes: 58 additions & 52 deletions api/src/HgResumeApi.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,66 +91,72 @@ function pushBundleChunk($repoId, $bundleSize, $offset, $data, $transId) {
$bundle->setOffset($newSow);

// for the final chunk; assemble the bundle and apply the bundle
if ($newSow == $bundleSize) {
$bundle->setState(BundleHelper::State_Unbundle);
try { // REVIEW Would be nice if the try / catch logic was universal. ie one policy for the api function. CP 2012-06
$bundleFilePath = $bundle->getBundleFileName();
$asyncRunner = $hg->unbundle($bundleFilePath);
for ($i = 0; $i < 4; $i++) {
if ($asyncRunner->isComplete()) {
if (BundleHelper::bundleOutputHasErrors($asyncRunner->getOutput())) {
$responseValues = array('transId' => $transId);
return new HgResumeResponse(HgResumeResponse::RESET, $responseValues);
}
$bundle->cleanUp();
$asyncRunner->cleanUp();
$responseValues = array('transId' => $transId);
return new HgResumeResponse(HgResumeResponse::SUCCESS, $responseValues);
}
sleep(1);
}
$responseValues = array('transId' => $transId, 'sow' => $newSow);
// If the unbundle operation has not completed within 4 seconds (unlikely for a large repo) then we punt back to the client with a RECEIVED.
//The client can then initiate another request, and hopefully the unbundle will have finished by then
// FUTURE: we should return an INPROGRESS status and have the client display a message accordingly. - cjh 2014-07
return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues);
} catch (UnrelatedRepoException $e) {
$bundle->setOffset(0);
$responseValues = array('Error' => substr($e->getMessage(), 0, 1000));
$responseValues['transId'] = $transId;
return new HgResumeResponse(HgResumeResponse::FAIL, $responseValues);
} catch (Exception $e) {
// REVIEW The RESET response may not make sense in this context anymore. Why would we want to tell the client to resend a bundle if it failed the first time? My guess is never. cjh 2013-03
//echo $e->getMessage(); // FIXME
$bundle->setOffset(0);
$responseValues = array('Error' => substr($e->getMessage(), 0, 1000));
$responseValues['transId'] = $transId;
return new HgResumeResponse(HgResumeResponse::RESET, $responseValues);
}
} else {
if ($newSow != $bundleSize) {
// received the chunk, but it's not the last one; we expect more chunks
$responseValues = array('transId' => $transId, 'sow' => $newSow);
return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues);
}
break;
// We got everything, fall through to completePushBundle
case BundleHelper::State_Validating:
case BundleHelper::State_Unbundle:
$bundleFilePath = $bundle->getBundleFileName();
$asyncRunner = new AsyncRunner($bundleFilePath);
if ($asyncRunner->isComplete()) {
if (BundleHelper::bundleOutputHasErrors($asyncRunner->getOutput())) {
return $this->completePushBundle($bundle, $hg, $transId, $bundleSize);
}
}

/**
*
* @param BundleHelper $bundle
* @param HgRunner $hg
* @param string $transId
* @throws UnrelatedRepoException
* @throws Exception
* @return HgResumeResponse
*/
function completePushBundle($bundle, $hg, $transId, $bundleSize) {
try { // REVIEW Would be nice if the try / catch logic was universal. ie one policy for the api function. CP 2012-06
$bundleFilePath = $bundle->getBundleFileName();

switch ($bundle->getState()) {
case BundleHelper::State_Uploading:
$bundle->setState(BundleHelper::State_Validating);
$hg->startValidating($bundleFilePath);
// fall through to State_Validating
case BundleHelper::State_Validating:
if ($hg->finishValidating($bundleFilePath)) {
$bundle->setState(BundleHelper::State_Unbundle);
$hg->unbundle($bundleFilePath);
} else {
return HgResumeResponse::PendingResponse($transId, 'Verification in progress...', $bundleSize);
}
// fall through to State_Unbundle
case BundleHelper::State_Unbundle:
$asyncRunner = new AsyncRunner($bundleFilePath);
if ($asyncRunner->waitForIsComplete()) {
if (BundleHelper::bundleOutputHasErrors($asyncRunner->getOutput())) {
$responseValues = array('transId' => $transId);
// REVIEW The RESET response may not make sense in this context anymore. Why would we want to tell the client to resend a bundle if it failed the first time? My guess is never. cjh 2013-03
return new HgResumeResponse(HgResumeResponse::RESET, $responseValues);
}
$bundle->cleanUp();
$asyncRunner->cleanUp();
$responseValues = array('transId' => $transId);
// REVIEW The RESET response may not make sense in this context anymore. Why would we want to tell the client to resend a bundle if it failed the first time? My guess is never. cjh 2013-03
return new HgResumeResponse(HgResumeResponse::RESET, $responseValues);
return new HgResumeResponse(HgResumeResponse::SUCCESS, $responseValues);
} else {
return HgResumeResponse::PendingResponse($transId, 'Unpacking in progress...', $bundleSize);
}
$bundle->cleanUp();
$asyncRunner->cleanUp();
$responseValues = array('transId' => $transId);
return new HgResumeResponse(HgResumeResponse::SUCCESS, $responseValues);
} else {
$responseValues = array('transId' => $transId, 'sow' => $bundle->getOffset());
return new HgResumeResponse(HgResumeResponse::RECEIVED, $responseValues);
}
break;
} catch (UnrelatedRepoException $e) {
$bundle->setOffset(0);
$responseValues = array('Error' => substr($e->getMessage(), 0, 1000));
$responseValues['transId'] = $transId;
return new HgResumeResponse(HgResumeResponse::FAIL, $responseValues);
} catch (Exception $e) {
// REVIEW The RESET response may not make sense in this context anymore. Why would we want to tell the client to resend a bundle if it failed the first time? My guess is never. cjh 2013-03
//echo $e->getMessage(); // FIXME
$bundle->setOffset(0);
$responseValues = array('Error' => substr($e->getMessage(), 0, 1000));
$responseValues['transId'] = $transId;
return new HgResumeResponse(HgResumeResponse::RESET, $responseValues);
}
}

Expand Down
23 changes: 23 additions & 0 deletions api/src/HgResumeResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ class HgResumeResponse {
// HTTP 202 Accepted
const INPROGRESS = 9;

/* TIMEOUT
* The server is in the middle of an operation that was not completed during the request.
* The client should poll the server to ensure any additional operations are started.
* Used only by pushBundleChunk */
// HTTP 408 RequestTimeout
const TIMEOUT = 9;

public $Code;
public $Values;
public $Content;
Expand All @@ -72,6 +79,22 @@ function __construct($code, $values = array(), $content = "", $version = API_VER
$this->Content = $content;
$this->Version = $version;
}

/**
* There's no explicit polling mechanism in the v3 client.
* But we can make it retry by sending any status code it's not familiar with.
* We can also determine the chunk size the client will use, so we make it super tiny to minimize unnecessary data transfer.
* @param string $transId
* @param string $note
* @param int $bundleSize
* @return HgResumeResponse
*/
public static function PendingResponse($transId, $note, $bundleSize) {
return new HgResumeResponse(HgResumeResponse::TIMEOUT, array(
'transId' => $transId, 'Note' => $note,
// Make the client send only 10 bytes
'sow' => $bundleSize - 10));
}
}

?>
70 changes: 49 additions & 21 deletions api/src/HgRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,10 @@ private function logEvent($message) {
}

/**
*
* @param string $filepath
* @throws HgException
* @throws UnrelatedRepoException
* @throws Exception
* @return AsyncRunner
*/
function unbundle($filepath) {
function startValidating($filepath) {
if (!is_file($filepath)) {
throw new HgException("bundle file '$filepath' is not a file!");
}
Expand All @@ -58,30 +54,60 @@ function unbundle($filepath) {
// run hg incoming to make sure this bundle is related to the repo
$cmd = "hg incoming $filepath";
$this->logEvent("cmd: $cmd");
$asyncRunner = new AsyncRunner($filepath . ".incoming");
$asyncRunner = $this->getValidationRunner($filepath);
$asyncRunner->run($cmd);
$asyncRunner->synchronize();
$output = $asyncRunner->getOutput();
if (preg_match('/abort:.*unknown parent/', $output)) {
throw new UnrelatedRepoException("Project is unrelated! (unrelated bundle pushed to repo)");
}
if (preg_match('/parent:\s*-1:/', $output)) {
throw new UnrelatedRepoException("Project is unrelated! (unrelated bundle pushed to repo)");
}

/**
* @param string $filepath
* @throws UnrelatedRepoException
* @throws Exception
* @return boolean True if finished, otherwise not finished yet
*/
function finishValidating($filepath) {
$asyncRunner = $this->getValidationRunner($filepath);
if ($asyncRunner->waitForIsComplete()) {
$output = $asyncRunner->getOutput();
if (preg_match('/abort:.*unknown parent/', $output)) {
throw new UnrelatedRepoException("Project is unrelated! (unrelated bundle pushed to repo)");
}
if (preg_match('/parent:\s*-1:/', $output)) {
throw new UnrelatedRepoException("Project is unrelated! (unrelated bundle pushed to repo)");
}
if (preg_match('/abort:.*not a Mercurial bundle/', $output)) {
throw new Exception("Project cannot be updated! (corrupt bundle pushed to repo)");
}
return true;
}
if (preg_match('/abort:.*not a Mercurial bundle/', $output)) {
throw new Exception("Project cannot be updated! (corrupt bundle pushed to repo)");
return false;
}

/**
* @param string $filepath
* @return AsyncRunner
*/
function getValidationRunner($filepath) {
return new AsyncRunner($filepath . ".incoming");
}

/**
*
* @param string $filepath
* @throws HgException
* @return AsyncRunner
*/
function unbundle($filepath) {
if (!is_file($filepath)) {
throw new HgException("bundle file '$filepath' is not a file!");
}
chdir($this->repoPath); // NOTE: I tried with -R and it didn't work for me. CP 2012-06

$cmd = "hg unbundle $filepath";
$this->logEvent("cmd: $cmd");
$asyncRunner = new AsyncRunner($filepath);
$asyncRunner->run($cmd);
return $asyncRunner;
}

function assertIsRelatedRepo($bundleFilePath) {

}

/**
*
Expand Down Expand Up @@ -121,13 +147,15 @@ function makeBundle($baseHashes, $bundleFilePath) {
}

/**
* helper function, mostly for tests
* Helper function for tests
* @param string $baseHashes[]
* @param string $bundleFilePath
*/
function makeBundleAndWaitUntilFinished($baseHashes, $bundleFilePath) {
$asyncRunner = $this->makeBundle($baseHashes, $bundleFilePath);
$asyncRunner->synchronize();
if (!$asyncRunner->waitForIsComplete()) {
throw new Exception("Error: make bundle failed to complete");
}
return $asyncRunner;
}

Expand Down
6 changes: 6 additions & 0 deletions api/src/RestServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ static function mapHgResponse($hgrCode) {
$httpcode = "202 Accepted";
$codeString = 'INPROGRESS';
break;
case HgResumeResponse::TIMEOUT:
// 408 is not currently handled by the client, so it will be
// interpreted as a fail and result in a retry. That is the intentional behaviour for now (v03).
$httpcode = "408 RequestTimeout";
$codeString = 'INPROGRESS';
break;
default:
throw new Exception("Unknown response code {$response->Code}");
break;
Expand Down
5 changes: 3 additions & 2 deletions api/test/AsyncRunner_Test.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ function testRunIsComplete_FalseThenTrue() {
$runner = new AsyncRunner('/tmp/testFile');
$runner->run('echo foo');
$this->assertFalse($runner->isComplete());
$runner->synchronize();
$runner->waitForIsComplete();
$this->assertTrue($runner->isComplete());
}

Expand Down Expand Up @@ -47,7 +47,8 @@ function testGetOutput_NotComplete_Throws() {
function testGetOutput_Complete_ReturnsOutput() {
$runner = new AsyncRunner('/tmp/testFile');
$runner->run('echo abort');
$runner->synchronize();
$runner->waitForIsComplete();
$this->assertTrue($runner->isComplete());
$data = $runner->getOutput();
$this->assertPattern('/abort/', $data);
}
Expand Down
Loading

0 comments on commit 69b57c3

Please sign in to comment.