Skip to content
This repository has been archived by the owner on Nov 1, 2024. It is now read-only.

Regenerate worker protocol protos to add constructors and comments #78

Merged
merged 4 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 1.1.0

* Add constructors with named parameters to
the generated worker protocol messages.
* Include comments on the generated worker protocol API.

## 1.0.3

* Require `package:protobuf` >= 3.0.0.
Expand Down
7 changes: 4 additions & 3 deletions benchmark/benchmark.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ Future<void> main() async {
var path = 'blaze-bin/some/path/to/a/file/that/is/an/input/$i';
workRequest
..arguments.add('--input=$path')
..inputs.add(Input()
..path = ''
..digest.addAll(List.filled(70, 0x11)));
..inputs.add(Input(
path: '',
digest: List.filled(70, 0x11),
));
}

// Serialize it.
Expand Down
7 changes: 4 additions & 3 deletions e2e_test/lib/async_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ class ExampleAsyncWorker extends AsyncWorkerLoop {

@override
Future<WorkResponse> performRequest(WorkRequest request) async {
return WorkResponse()
..exitCode = 0
..output = request.arguments.join('\n');
return WorkResponse(
exitCode: 0,
output: request.arguments.join('\n'),
);
}
}
4 changes: 1 addition & 3 deletions e2e_test/lib/sync_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import 'package:bazel_worker/bazel_worker.dart';
class ExampleSyncWorker extends SyncWorkerLoop {
@override
WorkResponse performRequest(WorkRequest request) {
return WorkResponse()
..exitCode = 0
..output = request.arguments.join('\n');
return WorkResponse(exitCode: 0, output: request.arguments.join('\n'));
}
}
2 changes: 1 addition & 1 deletion e2e_test/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: e2e_test
publish_to: none

environment:
sdk: '>=2.19.0 <3.0.0'
sdk: '>=2.19.0 <4.0.0'

dependencies:
bazel_worker:
Expand Down
5 changes: 3 additions & 2 deletions e2e_test/test/e2e_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ void runE2eTestForWorker(String groupName, SpawnWorker spawnWorker) {
Future _doRequests(BazelWorkerDriver driver, {int? count}) async {
count ??= 100;
var requests = List.generate(count, (requestNum) {
var request = WorkRequest();
request.arguments.addAll(List.generate(requestNum, (argNum) => '$argNum'));
var request = WorkRequest(
arguments: List.generate(requestNum, (argNum) => '$argNum'),
);
return request;
});
var responses = await Future.wait(requests.map(driver.doWork));
Expand Down
2 changes: 1 addition & 1 deletion example/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ void main() async {
[Platform.script.resolve('worker.dart').toFilePath()],
workingDirectory: scratchSpace.path),
maxWorkers: 4);
var response = await driver.doWork(WorkRequest()..arguments.add('foo'));
var response = await driver.doWork(WorkRequest(arguments: ['foo']));
if (response.exitCode != EXIT_CODE_OK) {
print('Worker request failed');
} else {
Expand Down
2 changes: 1 addition & 1 deletion example/worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ class SyncSimpleWorker extends SyncWorkerLoop {
@override
WorkResponse performRequest(WorkRequest request) {
File('hello.txt').writeAsStringSync(request.arguments.first);
return WorkResponse()..exitCode = EXIT_CODE_OK;
return WorkResponse(exitCode: EXIT_CODE_OK);
}
}
16 changes: 9 additions & 7 deletions lib/src/driver/driver.dart
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,12 @@ class BazelWorkerDriver {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
stderr.writeln('Failed to run request ${attempt.request}');
response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output =
response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output:
'Invalid response from worker, this probably means it wrote '
'invalid output or died.';
'invalid output or died.',
);
}
attempt.responseCompleter.complete(response);
_cleanUp(worker);
Expand All @@ -166,9 +167,10 @@ class BazelWorkerDriver {
if (!attempt.responseCompleter.isCompleted) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
var response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = 'Error running worker:\n$e\n$s';
var response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output: 'Error running worker:\n$e\n$s',
);
attempt.responseCompleter.complete(response);
_cleanUp(worker);
}
Expand Down
21 changes: 12 additions & 9 deletions lib/src/driver/driver_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ class StdDriverConnection implements DriverConnection {
Future<WorkResponse> readResponse() async {
var buffer = await _messageGrouper.next;
if (buffer == null) {
return WorkResponse()
..exitCode = EXIT_CODE_BROKEN_PIPE
..output = 'Connection to worker closed';
return WorkResponse(
exitCode: EXIT_CODE_BROKEN_PIPE,
output: 'Connection to worker closed',
);
}

WorkResponse response;
Expand All @@ -63,9 +64,10 @@ class StdDriverConnection implements DriverConnection {
try {
// Try parsing the message as a string and set that as the output.
var output = utf8.decode(buffer);
var response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = 'Worker sent an invalid response:\n$output';
var response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output: 'Worker sent an invalid response:\n$output',
);
return response;
} catch (_) {
// Fall back to original exception and rethrow if we fail to parse as
Expand Down Expand Up @@ -108,9 +110,10 @@ class IsolateDriverConnection implements DriverConnection {
@override
Future<WorkResponse> readResponse() async {
if (!await _receivePortIterator.moveNext()) {
return WorkResponse()
..exitCode = EXIT_CODE_BROKEN_PIPE
..output = 'Connection to worker closed.';
return WorkResponse(
exitCode: EXIT_CODE_BROKEN_PIPE,
output: 'Connection to worker closed.',
);
}
return WorkResponse.fromBuffer(_receivePortIterator.current as List<int>);
}
Expand Down
7 changes: 4 additions & 3 deletions lib/src/worker/async_worker_loop.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ abstract class AsyncWorkerLoop implements WorkerLoop {
response.output = '${response.output}$printMessages';
}
} catch (e, s) {
response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = '$e\n$s';
response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output: '$e\n$s',
);
}

connection.writeResponse(response);
Expand Down
7 changes: 4 additions & 3 deletions lib/src/worker/sync_worker_loop.dart
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ abstract class SyncWorkerLoop implements WorkerLoop {
response.output = '${response.output}$printMessages';
}
} catch (e, s) {
response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = '$e\n$s';
response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output: '$e\n$s',
);
}

connection.writeResponse(response);
Expand Down
117 changes: 113 additions & 4 deletions lib/src/worker_protocol.pb.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//
// @dart = 2.12

// ignore_for_file: annotate_overrides, camel_case_types
// ignore_for_file: annotate_overrides, camel_case_types, comment_references
// ignore_for_file: constant_identifier_names, library_prefixes
// ignore_for_file: non_constant_identifier_names, prefer_final_fields
// ignore_for_file: unnecessary_import, unnecessary_this, unused_import
Expand All @@ -13,8 +13,21 @@ import 'dart:core' as $core;

import 'package:protobuf/protobuf.dart' as $pb;

/// An input file.
class Input extends $pb.GeneratedMessage {
factory Input() => create();
factory Input({
$core.String? path,
$core.List<$core.int>? digest,
}) {
final $result = create();
if (path != null) {
$result.path = path;
}
if (digest != null) {
$result.digest = digest;
}
return $result;
}
Input._() : super();
factory Input.fromBuffer($core.List<$core.int> i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) =>
Expand Down Expand Up @@ -53,6 +66,10 @@ class Input extends $pb.GeneratedMessage {
_defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<Input>(create);
static Input? _defaultInstance;

/// The path in the file system where to read this input artifact from. This is
/// either a path relative to the execution root (the worker process is
/// launched with the working directory set to the execution root), or an
/// absolute path.
@$pb.TagNumber(1)
$core.String get path => $_getSZ(0);
@$pb.TagNumber(1)
Expand All @@ -65,6 +82,9 @@ class Input extends $pb.GeneratedMessage {
@$pb.TagNumber(1)
void clearPath() => clearField(1);

/// A hash-value of the contents. The format of the contents is unspecified and
/// the digest should be treated as an opaque token. This can be empty in some
/// cases.
@$pb.TagNumber(2)
$core.List<$core.int> get digest => $_getN(1);
@$pb.TagNumber(2)
Expand All @@ -78,8 +98,37 @@ class Input extends $pb.GeneratedMessage {
void clearDigest() => clearField(2);
}

/// This represents a single work unit that Blaze sends to the worker.
class WorkRequest extends $pb.GeneratedMessage {
factory WorkRequest() => create();
factory WorkRequest({
$core.Iterable<$core.String>? arguments,
$core.Iterable<Input>? inputs,
$core.int? requestId,
$core.bool? cancel,
$core.int? verbosity,
$core.String? sandboxDir,
}) {
final $result = create();
if (arguments != null) {
$result.arguments.addAll(arguments);
}
if (inputs != null) {
$result.inputs.addAll(inputs);
}
if (requestId != null) {
$result.requestId = requestId;
}
if (cancel != null) {
$result.cancel = cancel;
}
if (verbosity != null) {
$result.verbosity = verbosity;
}
if (sandboxDir != null) {
$result.sandboxDir = sandboxDir;
}
return $result;
}
WorkRequest._() : super();
factory WorkRequest.fromBuffer($core.List<$core.int> i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) =>
Expand Down Expand Up @@ -126,9 +175,19 @@ class WorkRequest extends $pb.GeneratedMessage {
@$pb.TagNumber(1)
$core.List<$core.String> get arguments => $_getList(0);

/// The inputs that the worker is allowed to read during execution of this
/// request.
@$pb.TagNumber(2)
$core.List<Input> get inputs => $_getList(1);

/// Each WorkRequest must have either a unique
/// request_id or request_id = 0. If request_id is 0, this WorkRequest must be
/// processed alone (singleplex), otherwise the worker may process multiple
/// WorkRequests in parallel (multiplexing). As an exception to the above, if
/// the cancel field is true, the request_id must be the same as a previously
/// sent WorkRequest. The request_id must be attached unchanged to the
/// corresponding WorkResponse. Only one singleplex request may be sent to a
/// worker at a time.
@$pb.TagNumber(3)
$core.int get requestId => $_getIZ(2);
@$pb.TagNumber(3)
Expand All @@ -141,6 +200,9 @@ class WorkRequest extends $pb.GeneratedMessage {
@$pb.TagNumber(3)
void clearRequestId() => clearField(3);

/// EXPERIMENTAL: When true, this is a cancel request, indicating that a
/// previously sent WorkRequest with the same request_id should be cancelled.
/// The arguments and inputs fields must be empty and should be ignored.
@$pb.TagNumber(4)
$core.bool get cancel => $_getBF(3);
@$pb.TagNumber(4)
Expand All @@ -153,6 +215,9 @@ class WorkRequest extends $pb.GeneratedMessage {
@$pb.TagNumber(4)
void clearCancel() => clearField(4);

/// Values greater than 0 indicate that the worker may output extra debug
/// information to stderr (which will go into the worker log). Setting the
/// --worker_verbose flag for Bazel makes this flag default to 10.
@$pb.TagNumber(5)
$core.int get verbosity => $_getIZ(4);
@$pb.TagNumber(5)
Expand All @@ -165,6 +230,15 @@ class WorkRequest extends $pb.GeneratedMessage {
@$pb.TagNumber(5)
void clearVerbosity() => clearField(5);

/// The relative directory inside the workers working directory where the
/// inputs and outputs are placed, for sandboxing purposes. For singleplex
/// workers, this is unset, as they can use their working directory as sandbox.
/// For multiplex workers, this will be set when the
/// --experimental_worker_multiplex_sandbox flag is set _and_ the execution
/// requirements for the worker includes 'supports-multiplex-sandbox'.
/// The paths in `inputs` will not contain this prefix, but the actual files
/// will be placed/must be written relative to this directory. The worker
/// implementation is responsible for resolving the file paths.
@$pb.TagNumber(6)
$core.String get sandboxDir => $_getSZ(5);
@$pb.TagNumber(6)
Expand All @@ -178,8 +252,30 @@ class WorkRequest extends $pb.GeneratedMessage {
void clearSandboxDir() => clearField(6);
}

/// The worker sends this message to Blaze when it finished its work on the
/// WorkRequest message.
class WorkResponse extends $pb.GeneratedMessage {
factory WorkResponse() => create();
factory WorkResponse({
$core.int? exitCode,
$core.String? output,
$core.int? requestId,
$core.bool? wasCancelled,
}) {
final $result = create();
if (exitCode != null) {
$result.exitCode = exitCode;
}
if (output != null) {
$result.output = output;
}
if (requestId != null) {
$result.requestId = requestId;
}
if (wasCancelled != null) {
$result.wasCancelled = wasCancelled;
}
return $result;
}
WorkResponse._() : super();
factory WorkResponse.fromBuffer($core.List<$core.int> i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) =>
Expand Down Expand Up @@ -233,6 +329,9 @@ class WorkResponse extends $pb.GeneratedMessage {
@$pb.TagNumber(1)
void clearExitCode() => clearField(1);

/// This is printed to the user after the WorkResponse has been received and is
/// supposed to contain compiler warnings / errors etc. - thus we'll use a
/// string type here, which gives us UTF-8 encoding.
@$pb.TagNumber(2)
$core.String get output => $_getSZ(1);
@$pb.TagNumber(2)
Expand All @@ -245,6 +344,10 @@ class WorkResponse extends $pb.GeneratedMessage {
@$pb.TagNumber(2)
void clearOutput() => clearField(2);

/// This field must be set to the same request_id as the WorkRequest it is a
/// response to. Since worker processes which support multiplex worker will
/// handle multiple WorkRequests in parallel, this ID will be used to
/// determined which WorkerProxy does this WorkResponse belong to.
@$pb.TagNumber(3)
$core.int get requestId => $_getIZ(2);
@$pb.TagNumber(3)
Expand All @@ -257,6 +360,12 @@ class WorkResponse extends $pb.GeneratedMessage {
@$pb.TagNumber(3)
void clearRequestId() => clearField(3);

/// EXPERIMENTAL When true, indicates that this response was sent due to
/// receiving a cancel request. The exit_code and output fields should be empty
/// and will be ignored. Exactly one WorkResponse must be sent for each
/// non-cancelling WorkRequest received by the worker, but if the worker
/// received a cancel request, it doesn't matter if it replies with a regular
/// WorkResponse or with one where was_cancelled = true.
@$pb.TagNumber(4)
$core.bool get wasCancelled => $_getBF(3);
@$pb.TagNumber(4)
Expand Down
Loading