Skip to content

Commit

Permalink
refactor: port to Gio._promisify()
Browse files Browse the repository at this point in the history
This tech-preview isn't going anywhere, and in fact will become a part
of GJS soon. Port to the more modern async-await patterns throughout
the service.
  • Loading branch information
andyholmes committed Jul 14, 2023
1 parent afa70d6 commit 91046cc
Show file tree
Hide file tree
Showing 17 changed files with 423 additions and 1,107 deletions.
169 changes: 43 additions & 126 deletions installed-tests/fixtures/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ var ChannelService = GObject.registerClass({
_initUdpListener() {
// Default broadcast address
this._udp_address = Gio.InetSocketAddress.new_from_string(
'255.255.255.255',
this.port
);
'255.255.255.255', this.port);

try {
this._udp6 = Gio.Socket.new(
Expand Down Expand Up @@ -246,21 +244,14 @@ var ChannelService = GObject.registerClass({
this._channels.set(channel.address, channel);

// Open a TCP connection
const connection = await new Promise((resolve, reject) => {
const address = Gio.InetSocketAddress.new_from_string(
packet.body.tcpHost,
packet.body.tcpPort
);
const client = new Gio.SocketClient({enable_proxy: false});

client.connect_async(address, null, (client, res) => {
try {
resolve(client.connect_finish(res));
} catch (e) {
reject(e);
}
});
});
const address = Gio.InetSocketAddress.new_from_string(
packet.body.tcpHost,
packet.body.tcpPort
);

const client = new Gio.SocketClient({enable_proxy: false});
const connection = await client.connect_async(address,
this.cancellable);

// Connect the channel and attach it to the device on success
await channel.open(connection);
Expand Down Expand Up @@ -406,45 +397,6 @@ var Channel = GObject.registerClass({
this._port = port;
}

_receiveIdent(connection) {
return new Promise((resolve, reject) => {
this.input_stream.read_line_async(
GLib.PRIORITY_DEFAULT,
this.cancellable,
(stream, res) => {
try {
const data = stream.read_line_finish_utf8(res)[0];
this.identity = new Core.Packet(data);

if (!this.identity.body.deviceId)
throw new Error('missing deviceId');

resolve();
} catch (e) {
reject(e);
}
}
);
});
}

_sendIdent(connection) {
return new Promise((resolve, reject) => {
connection.get_output_stream().write_all_async(
this.backend.identity.serialize(),
GLib.PRIORITY_DEFAULT,
this.cancellable,
(stream, res) => {
try {
resolve(stream.write_all_finish(res));
} catch (e) {
reject(e);
}
}
);
});
}

async accept(connection) {
try {
this._connection = connection;
Expand All @@ -454,7 +406,13 @@ var Channel = GObject.registerClass({
base_stream: this._connection.get_input_stream(),
});

await this._receiveIdent(this._connection);
const data = await this.input_stream.read_line_async(
GLib.PRIORITY_DEFAULT, this.cancellable);

this.identity = new Core.Packet(data);

if (!this.identity.body.deviceId)
throw new Error('missing deviceId');
} catch (e) {
this.close();
return e;
Expand All @@ -470,7 +428,10 @@ var Channel = GObject.registerClass({
base_stream: this._connection.get_input_stream(),
});

await this._sendIdent(this._connection);
await connection.get_output_stream().write_all_async(
this.backend.identity.serialize(),
GLib.PRIORITY_DEFAULT,
this.cancellable);
} catch (e) {
this.close();
return e;
Expand All @@ -487,6 +448,7 @@ var Channel = GObject.registerClass({
this.backend.channels.delete(this.address);
this.cancellable.cancel();

// These calls are not Promisified, so they can finish themselves
if (this._connection)
this._connection.close_async(GLib.PRIORITY_DEFAULT, null, null);

Expand All @@ -498,27 +460,18 @@ var Channel = GObject.registerClass({
}

async download(packet, target, cancellable = null) {
const connection = await new Promise((resolve, reject) => {
const client = new Gio.SocketClient({enable_proxy: false});
const address = Gio.InetSocketAddress.new_from_string(this.host,
packet.payloadTransferInfo.port);

const address = Gio.InetSocketAddress.new_from_string(
this.host,
packet.payloadTransferInfo.port
);

client.connect_async(address, cancellable, (client, res) => {
try {
resolve(client.connect_finish(res));
} catch (e) {
reject(e);
}
});
});

const source = connection.get_input_stream();
const client = new Gio.SocketClient({enable_proxy: false});
const connection = await client.connect_async(address, cancellable);

// Start the transfer
const transferredSize = await this._transfer(source, target, cancellable);
const transferredSize = await connection.output_stream.splice_async(
target, connection.input_stream,
(Gio.OutputStreamSpliceFlags.CLOSE_SOURCE |
Gio.OutputStreamSpliceFlags.CLOSE_TARGET),
GLib.PRIORITY_DEFAULT, cancellable);

if (transferredSize !== packet.payloadSize) {
throw new Gio.IOErrorEnum({
Expand Down Expand Up @@ -548,31 +501,24 @@ var Channel = GObject.registerClass({
}

// Listen for the incoming connection
const acceptConnection = new Promise((resolve, reject) => {
listener.accept_async(
cancellable,
(listener, res, source_object) => {
try {
resolve(listener.accept_finish(res)[0]);
} catch (e) {
reject(e);
}
}
);
});
const acceptConnection = listener.accept_async(cancellable);

// Notify the device we're ready
// FIXME: this may fail, leaving a dangling stream until timeout
packet.body.payloadHash = this.checksum;
packet.payloadSize = size;
packet.payloadTransferInfo = {port: port};
this.sendPacket(new Core.Packet(packet));

// Accept the connection and configure the channel
const connection = await acceptConnection;
const target = connection.get_output_stream();

// Start the transfer
const transferredSize = await this._transfer(source, target, cancellable);
const transferredSize = await connection.output_stream.splice_async(
source,
(Gio.OutputStreamSpliceFlags.CLOSE_SOURCE |
Gio.OutputStreamSpliceFlags.CLOSE_TARGET),
GLib.PRIORITY_DEFAULT, cancellable);

if (transferredSize !== size) {
throw new Gio.IOErrorEnum({
Expand All @@ -582,25 +528,6 @@ var Channel = GObject.registerClass({
}
}

_transfer(source, target, cancellable) {
return new Promise((resolve, reject) => {
target.splice_async(
source,
(Gio.OutputStreamSpliceFlags.CLOSE_SOURCE |
Gio.OutputStreamSpliceFlags.CLOSE_TARGET),
GLib.PRIORITY_DEFAULT,
cancellable,
(target, res) => {
try {
resolve(target.splice_finish(res));
} catch (e) {
reject(e);
}
}
);
});
}

async rejectTransfer(packet) {
try {
if (!packet || !packet.hasPayload())
Expand All @@ -609,22 +536,12 @@ var Channel = GObject.registerClass({
if (packet.payloadTransferInfo.port === undefined)
return;

const connection = await new Promise((resolve, reject) => {
const client = new Gio.SocketClient({enable_proxy: false});

const address = Gio.InetSocketAddress.new_from_string(
this.host,
packet.payloadTransferInfo.port
);

client.connect_async(address, null, (client, res) => {
try {
resolve(client.connect_finish(res));
} catch (e) {
resolve();
}
});
});
const address = Gio.InetSocketAddress.new_from_string(this.host,
packet.payloadTransferInfo.port);

const client = new Gio.SocketClient({enable_proxy: false});
const connection = await client.connect_async(address,
this.cancellable);

connection.close_async(GLib.PRIORITY_DEFAULT, null, null);
} catch (e) {
Expand Down
40 changes: 40 additions & 0 deletions src/service/__init__.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,46 @@ const GLib = imports.gi.GLib;
const Config = imports.config;


// Promise Wrappers
try {
const {EBook, EDataServer} = imports.gi;

Gio._promisify(EBook.BookClient, 'connect', 'connect_finish');
Gio._promisify(EBook.BookClient.prototype, 'get_view');
Gio._promisify(EBook.BookClient.prototype, 'get_contacts');
Gio._promisify(EDataServer.SourceRegistry, 'new', 'new_finish');
} catch {
// Silence import errors
}

Gio._promisify(Gio.AsyncInitable.prototype, 'init_async');
Gio._promisify(Gio.DBusConnection.prototype, 'call');
Gio._promisify(Gio.DBusProxy.prototype, 'call');
Gio._promisify(Gio.DataInputStream.prototype, 'read_line_async',
'read_line_finish_utf8');
Gio._promisify(Gio.File.prototype, 'delete_async');
Gio._promisify(Gio.File.prototype, 'enumerate_children_async');
Gio._promisify(Gio.File.prototype, 'load_contents_async');
Gio._promisify(Gio.File.prototype, 'mount_enclosing_volume');
Gio._promisify(Gio.File.prototype, 'query_info_async');
Gio._promisify(Gio.File.prototype, 'read_async');
Gio._promisify(Gio.File.prototype, 'replace_async');
Gio._promisify(Gio.File.prototype, 'replace_contents_bytes_async',
'replace_contents_finish');
Gio._promisify(Gio.FileEnumerator.prototype, 'next_files_async');
Gio._promisify(Gio.Mount.prototype, 'unmount_with_operation');
Gio._promisify(Gio.InputStream.prototype, 'close_async');
Gio._promisify(Gio.OutputStream.prototype, 'close_async');
Gio._promisify(Gio.OutputStream.prototype, 'splice_async');
Gio._promisify(Gio.OutputStream.prototype, 'write_all_async');
Gio._promisify(Gio.SocketClient.prototype, 'connect_async');
Gio._promisify(Gio.SocketListener.prototype, 'accept_async');
Gio._promisify(Gio.Subprocess.prototype, 'communicate_utf8_async');
Gio._promisify(Gio.Subprocess.prototype, 'wait_check_async');
Gio._promisify(Gio.TlsConnection.prototype, 'handshake_async');
Gio._promisify(Gio.DtlsConnection.prototype, 'handshake_async');


// User Directories
Config.CACHEDIR = GLib.build_filenamev([GLib.get_user_cache_dir(), 'gsconnect']);
Config.CONFIGDIR = GLib.build_filenamev([GLib.get_user_config_dir(), 'gsconnect']);
Expand Down
Loading

0 comments on commit 91046cc

Please sign in to comment.