Skip to content

Commit

Permalink
add feed action to controller
Browse files Browse the repository at this point in the history
The feed action first sends the same response as the list action, only
over a websocket. Then, it polls the database periodically and sends any
changes that happen. Look for some examples soon!
  • Loading branch information
preaction committed Jul 30, 2020
1 parent 679ae52 commit f8df117
Show file tree
Hide file tree
Showing 6 changed files with 342 additions and 75 deletions.
360 changes: 289 additions & 71 deletions lib/Yancy/Controller/Yancy.pm
Original file line number Diff line number Diff line change
Expand Up @@ -354,75 +354,7 @@ number and setting the C<$offset> query parameter.

sub list {
my ( $c ) = @_;
if ( $c->stash( 'collection' ) ) {
derp '"collection" stash key is now "schema" in controller configuration';
}
my $schema_name = $c->stash( 'schema' ) || $c->stash( 'collection' )
|| die "Schema name not defined in stash";
my $limit = $c->param( '$limit' ) // $c->stash->{ limit } // 10;
my $offset = $c->param( '$page' ) ? ( $c->param( '$page' ) - 1 ) * $limit
: $c->param( '$offset' ) ? $c->param( '$offset' )
: ( ( $c->stash->{page} // 1 ) - 1 ) * $limit;
$c->stash( page => int( $offset / $limit ) + 1 );
my $opt = {
limit => $limit,
offset => $offset,
};

if ( my $order_by = $c->param( '$order_by' ) ) {
$opt->{order_by} = [
map +{ "-" . ( $_->[1] ? $_->[0] : 'asc' ) => $_->[1] // $_->[0] },
map +[ split /:/ ],
split /,/, $order_by
];
}
elsif ( $order_by = $c->stash( 'order_by' ) ) {
$opt->{order_by} = $order_by;
}

my $schema = $c->yancy->schema( $schema_name ) ;
my $props = $schema->{properties};
my %param_filter = ();
for my $key ( @{ $c->req->params->names } ) {
next unless exists $props->{ $key };
my $type = $props->{$key}{type} || 'string';
my $value = $c->param( $key );
if ( is_type( $type, 'string' ) ) {
if ( ( $value =~ tr/*/%/ ) <= 0 ) {
$value = "\%$value\%";
}
$param_filter{ $key } = { -like => $value };
}
elsif ( grep is_type( $type, $_ ), qw(number integer) ) {
$param_filter{ $key } = $value ;
}
elsif ( is_type( $type, 'boolean' ) ) {
$param_filter{ ($value && $value ne 'false')? '-bool' : '-not_bool' } = $key;
}
elsif ( is_type($type, 'array') ) {
$param_filter{ $key } = { '-has' => $value };
}
else {
die "Sorry type '" .
to_json( $type ) .
"' is not handled yet, only string|number|integer|boolean|array is supported."
}
}
my $filter = {
%param_filter,
# Stash filter always overrides param filter, for security
%{ $c->_resolve_filter },
};
if ( $c->param( '$match' ) && $c->param( '$match' ) eq 'any' ) {
$filter = [
map +{ $_ => $filter->{ $_ } }, keys %$filter
];
}

#; use Data::Dumper;
#; $c->app->log->info( Dumper $filter );
#; $c->app->log->info( Dumper $opt );

my ( $schema_name, $filter, $opt ) = $c->_get_list_args;
my $result = $c->yancy->backend->list( $schema_name, $filter, $opt );
for my $helper ( @{ $c->stash( 'before_render' ) // [] } ) {
$c->$helper( $_ ) for @{ $result->{items} };
Expand All @@ -432,7 +364,7 @@ sub list {
my $format = $c->stash( 'format' );
return $c->respond_to(
json => sub {
$c->stash( json => { %$result, offset => $offset } );
$c->stash( json => { %$result, offset => $opt->{offset} } );
},
any => sub {
if ( !$c->stash( 'template' ) ) {
Expand All @@ -441,7 +373,7 @@ sub list {
$c->stash(
( format => $format )x!!$format,
%$result,
total_pages => ceil( $result->{total} / $limit ),
total_pages => ceil( $result->{total} / $opt->{limit} ),
);
},
);
Expand Down Expand Up @@ -983,6 +915,292 @@ sub delete {
);
}

=method feed
$routes->websocket( '/' )->to(
'yancy#feed',
schema => $schema_name,
);
Subscribe to a feed of changes to the given schema. This first sends a list result
(like L</list> would). Then it sends change messages. Change messages are JSON objects
with different fields based on the method of change:
# An item in the list was changed
{
method => "set",
# The position of the changed item in the list, 0-based
index => 2,
item => {
# These are the fields that changed
name => 'Lars Fillmore',
},
}
# An item was added to the list
{
method => "create",
# The position of the new item in the list, 0-based
index => 0,
item => {
# The entire, newly-created item
# ...
},
}
# An item was removed from the list. This does not necessarily mean
# the item was removed from the database.
{
method => "delete",
# The position of the item removed from the list, 0-based
index => 0,
}
B<TODO:> Allow the client to send change messages to the server.
=head4 Input Stash
This method uses the following stash values for configuration:
=over
=item schema
The schema to use. Required.
=item limit
The number of items to show on the page. Defaults to C<10>.
=item page
The page number to show. Defaults to C<1>. The page number will
be used to calculate the C<offset> parameter to L<Yancy::Backend/list>.
=item filter
A hash reference of field/value pairs to filter the contents of the list
or a subref that generates this hash reference. The subref will be passed
the current controller object (C<$c>).
This overrides any query filters and so can be used to enforce
authorization / security.
=item order_by
Set the default order for the items. Supports any L<Yancy::Backend/list>
C<order_by> structure.
=item before_render
An array reference of hooks to call once for each item in the C<items> list
before they are sent as messages. See L</ACTION HOOKS> for usage.
=back
=head4 Query Params
The following URL query parameters are allowed for this method:
=over
=item $page
Instead of using the C<page> stash value, you can use the C<$page> query
parameter to set the page.
=item $offset
Instead of using the C<page> stash value, you can use the C<$offset>
query parameter to set the page offset. This is overridden by the
C<$page> query parameter.
=item $limit
Instead of using the C<limit> stash value, you can use the C<$limit>
query parameter to allow users to specify their own page size.
=item $order_by
One or more fields to order by. Can be specified as C<< <name> >> or
C<< asc:<name> >> to sort in ascending order or C<< desc:<field> >>
to sort in descending order.
=item $match
How to match multiple field filters. Can be C<any> or C<all> (default
C<all>). C<all> means all fields must match for a row to be returned.
C<any> means at least one field must match for a row to be returned.
=item Additional Field Filters
Any named query parameter that matches a field in the schema will be
used to further filter the results. The stash C<filter> will override
this filter, so that the stash C<filter> can be used for security.
=back
=cut

sub feed {
my ( $c ) = @_;
$c->inactivity_timeout( 3600 );

# First, send the message for the initial page
my ( $schema_name, $filter, $opt ) = $c->_get_list_args;
my $result = $c->yancy->backend->list( $schema_name, $filter, $opt );
for my $helper ( @{ $c->stash( 'before_render' ) // [] } ) {
$c->$helper( $_ ) for @{ $result->{items} };
}
my $x_id_field = $c->yancy->schema( $schema_name )->{'x-id-field'} // 'id';
my @id_fields = ref $x_id_field eq 'ARRAY' ? @$x_id_field : ( $x_id_field );
#; $c->log->debug( 'Original result: ' . $c->dumper( $result ) );
$c->send({ json => { %$result, method => 'list' } });

# Now, poll the database for updates every few seconds.
# XXX: Create Yancy::Plugin::PubSub to do push messaging instead of
# ugly polling...
my $id = Mojo::IOLoop->recurring( $c->stash( 'interval' ) // 10, sub {
my $new_result = $c->yancy->backend->list( $schema_name, $filter, $opt );
#; $c->log->debug( 'New result: ' . $c->dumper( $new_result ) );
my %seen_items;
my @created_items;
NEW_ITEM: for my $new_i ( 0..$#{ $new_result->{items} } ) {
my $new_item = $new_result->{items}[$new_i];
# Loop through the old result to find the existing items by
# their ID fields
for my $old_i ( 0..$#{ $result->{items} } ) {
my $old_item = $result->{items}[$old_i];
if ( @id_fields == grep { $new_item->{ $_ } eq $old_item->{ $_ } } @id_fields ) {
# Found it!
$seen_items{ $old_i }++;
my %diff =
map { $_ => $new_item->{ $_ } }
grep {; no warnings 'uninitialized'; $new_item->{ $_ } ne $old_item->{ $_ } }
keys %$new_item, keys %$old_item
;
if ( keys %diff ) {
my $message = {
method => 'set',
index => $old_i,
item => \%diff,
};
#$c->log->debug( $c->dumper( $message ) );
$c->send({ json => $message });
}
next NEW_ITEM;
}
}
# If we can't find the new item, it must have been added.
# Queue it up to send after deletes to maintain indexes.
push @created_items, {
method => 'create',
index => $new_i,
item => $new_item,
};
}
# Any items we did not see must have been removed from the list,
# or pushed out by newly-created items. Send these in reverse to
# maintain indexes.
for my $old_i ( reverse grep { !$seen_items{ $_ } } 0..$#{ $result->{items} } ) {
my $message = {
method => 'delete',
index => $old_i,
};
#$c->log->debug( $c->dumper( $message ) );
$c->send({ json => $message });
}
# Now we can send the created items, from lowest index to
# highest index
for my $item ( @created_items ) {
#$c->log->debug( $c->dumper( $item ) );
$c->send({ json => $item });
}

$result = $new_result;
} );
$c->on( finish => sub { Mojo::IOLoop->remove( $id ) } );
# XXX: Allow client to send "list" message to change the parameters
# of the list. Respond with an entirely new result (not a diff).
# XXX: Allow client to send "create", "set", and "delete" messages
# to create, set, and delete items
}

sub _get_list_args {
my ( $c ) = @_;

if ( $c->stash( 'collection' ) ) {
derp '"collection" stash key is now "schema" in controller configuration';
}
my $schema_name = $c->stash( 'schema' ) || $c->stash( 'collection' )
|| die "Schema name not defined in stash";
my $limit = $c->param( '$limit' ) // $c->stash->{ limit } // 10;
my $offset = $c->param( '$page' ) ? ( $c->param( '$page' ) - 1 ) * $limit
: $c->param( '$offset' ) ? $c->param( '$offset' )
: ( ( $c->stash->{page} // 1 ) - 1 ) * $limit;
$c->stash( page => int( $offset / $limit ) + 1 );
my $opt = {
limit => $limit,
offset => $offset,
};

if ( my $order_by = $c->param( '$order_by' ) ) {
$opt->{order_by} = [
map +{ "-" . ( $_->[1] ? $_->[0] : 'asc' ) => $_->[1] // $_->[0] },
map +[ split /:/ ],
split /,/, $order_by
];
}
elsif ( $order_by = $c->stash( 'order_by' ) ) {
$opt->{order_by} = $order_by;
}

my $schema = $c->yancy->schema( $schema_name ) ;
my $props = $schema->{properties};
my %param_filter = ();
for my $key ( @{ $c->req->params->names } ) {
next unless exists $props->{ $key };
my $type = $props->{$key}{type} || 'string';
my $value = $c->param( $key );
if ( is_type( $type, 'string' ) ) {
if ( ( $value =~ tr/*/%/ ) <= 0 ) {
$value = "\%$value\%";
}
$param_filter{ $key } = { -like => $value };
}
elsif ( grep is_type( $type, $_ ), qw(number integer) ) {
$param_filter{ $key } = $value ;
}
elsif ( is_type( $type, 'boolean' ) ) {
$param_filter{ ($value && $value ne 'false')? '-bool' : '-not_bool' } = $key;
}
elsif ( is_type($type, 'array') ) {
$param_filter{ $key } = { '-has' => $value };
}
else {
die "Sorry type '" .
to_json( $type ) .
"' is not handled yet, only string|number|integer|boolean|array is supported."
}
}
my $filter = {
%param_filter,
# Stash filter always overrides param filter, for security
%{ $c->_resolve_filter },
};
if ( $c->param( '$match' ) && $c->param( '$match' ) eq 'any' ) {
$filter = [
map +{ $_ => $filter->{ $_ } }, keys %$filter
];
}

#; use Data::Dumper;
#; $c->app->log->info( Dumper $filter );
#; $c->app->log->info( Dumper $opt );

return ( $schema_name, $filter, $opt );
}

sub _resolve_filter {
my ( $c ) = @_;
my $filter = $c->stash( 'filter' );
Expand Down
Loading

0 comments on commit f8df117

Please sign in to comment.