Added R wrappers for released dataset ops #40

Merged: 13 commits, Jan 7, 2019
6 changes: 6 additions & 0 deletions .gitignore
@@ -13,3 +13,9 @@ __pycache__
*.pbxproj
*.xcworkspacedata
.ipynb_checkpoints

# Auto-generated files by `R CMD check`
tfio.Rcheck/
tfio_*.tar.gz
.Rproj.user

1 change: 1 addition & 0 deletions R-package/.Rbuildignore
@@ -1,3 +1,4 @@
^.*\.Rproj$
^\.Rproj\.user$
^man-roxygen/
scripts
5 changes: 2 additions & 3 deletions R-package/DESCRIPTION
@@ -24,6 +24,7 @@ Imports:
     reticulate (>= 1.10),
     tensorflow (>= 1.9),
     tfdatasets (>= 1.9),
+    forge,
     magrittr,
     rlang,
     tidyselect,
@@ -32,7 +33,5 @@ Roxygen: list(markdown = TRUE)
 RoxygenNote: 6.1.0
 Suggests:
     testthat,
-    knitr,
-    tfestimators,
-    keras
+    knitr
 VignetteBuilder: knitr
49 changes: 49 additions & 0 deletions R-package/NAMESPACE
@@ -1,9 +1,36 @@
# Generated by roxygen2: do not edit by hand

export("%>%")
export(dataset_batch)
export(dataset_cache)
export(dataset_concatenate)
export(dataset_filter)
export(dataset_flat_map)
export(dataset_interleave)
export(dataset_map)
export(dataset_map_and_batch)
export(dataset_padded_batch)
export(dataset_prefetch)
export(dataset_prefetch_to_device)
export(dataset_prepare)
export(dataset_repeat)
export(dataset_shard)
export(dataset_shuffle)
export(dataset_shuffle_and_repeat)
export(dataset_skip)
export(dataset_take)
export(ignite_dataset)
export(install_tensorflow)
export(kafka_dataset)
export(kinesis_dataset)
export(next_batch)
export(sequence_file_dataset)
export(tf)
export(tf_config)
export(tf_version)
export(until_out_of_range)
export(with_dataset)
import(forge)
import(rlang)
import(tfdatasets)
import(tidyselect)
@@ -16,3 +43,25 @@ importFrom(reticulate,tuple)
importFrom(tensorflow,install_tensorflow)
importFrom(tensorflow,tf)
importFrom(tensorflow,tf_config)
importFrom(tensorflow,tf_version)
importFrom(tfdatasets,dataset_batch)
importFrom(tfdatasets,dataset_cache)
importFrom(tfdatasets,dataset_concatenate)
importFrom(tfdatasets,dataset_filter)
importFrom(tfdatasets,dataset_flat_map)
importFrom(tfdatasets,dataset_interleave)
importFrom(tfdatasets,dataset_map)
importFrom(tfdatasets,dataset_map_and_batch)
importFrom(tfdatasets,dataset_padded_batch)
importFrom(tfdatasets,dataset_prefetch)
importFrom(tfdatasets,dataset_prefetch_to_device)
importFrom(tfdatasets,dataset_prepare)
importFrom(tfdatasets,dataset_repeat)
importFrom(tfdatasets,dataset_shard)
importFrom(tfdatasets,dataset_shuffle)
importFrom(tfdatasets,dataset_shuffle_and_repeat)
importFrom(tfdatasets,dataset_skip)
importFrom(tfdatasets,dataset_take)
importFrom(tfdatasets,next_batch)
importFrom(tfdatasets,until_out_of_range)
importFrom(tfdatasets,with_dataset)
8 changes: 4 additions & 4 deletions R-package/R/dataset_utils.R
@@ -1,15 +1,15 @@
 as_tf_dataset <- function (dataset) {
-  if (!is_dataset(dataset)) 
+  if (!is_dataset(dataset))
     stop("Provided dataset is not a TensorFlow Dataset")
-  if (!inherits(dataset, "tf_dataset")) 
+  if (!inherits(dataset, "tf_dataset"))
     class(dataset) <- c("tf_dataset", class(dataset))
   dataset
 }
 
 is_dataset <- function (x) {
-  inherits(x, "tensorflow.python.data.ops.dataset_ops.Dataset") || is_tfio_dataset(X)
+  inherits(x, "tensorflow.python.data.ops.dataset_ops.Dataset") || is_tfio_dataset(x)
 }
 
 is_tfio_dataset <- function(x) {
-  "tensorflow_io" %in% class(x)
+  grepl("tensorflow_io", class(x))
 }
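
For context on the change above, `is_tfio_dataset()` now does a substring match over the object's class vector rather than an exact match; a hypothetical illustration (the class name below is a stand-in for whatever reticulate attaches to a wrapped tfio dataset):

# Hypothetical class name, for illustration only.
x <- structure(list(), class = "tensorflow_io.kafka.KafkaDataset")
grepl("tensorflow_io", class(x))
#> [1] TRUE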
14 changes: 14 additions & 0 deletions R-package/R/hadoop_dataset.R
@@ -0,0 +1,14 @@
#' Create a `SequenceFileDataset`.
#'
#' This function allows a user to read data from a Hadoop sequence
#' file, which consists of a sequence of (key, value) pairs. At the
#' moment, `org.apache.hadoop.io.Text` is the only supported
#' serialization type, and there is no compression support.
#'
#' @param filenames A `tf.string` tensor containing one or more filenames.
#'
#' @export
sequence_file_dataset <- function(filenames) {
dataset <- tfio_lib$hadoop$SequenceFileDataset(filenames = filenames)
as_tf_dataset(dataset)
}
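
A hypothetical usage sketch (the file path is a placeholder); the wrapper returns a dataset that composes with the tfdatasets verbs re-exported by this package:

library(tfio)

dataset <- sequence_file_dataset("training.seq") %>%
  dataset_repeat(2) %>%
  dataset_batch(16)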
56 changes: 56 additions & 0 deletions R-package/R/ignite_dataset.R
@@ -0,0 +1,56 @@
#' Create an `IgniteDataset`.
#'
#' Apache Ignite is a memory-centric distributed database, caching, and
#' processing platform for transactional, analytical, and streaming workloads,
#' delivering in-memory speeds at petabyte scale. This contrib package
#' contains an integration between Apache Ignite and TensorFlow. The
#' integration is based on tf.data on the TensorFlow side and on the Binary
#' Client Protocol on the Apache Ignite side, which allows Apache Ignite to
#' be used as a data source for neural network training, inference, and any
#' other computation supported by TensorFlow.
#'
#' @param cache_name Cache name to be used as a data source.
#' @param host Apache Ignite Thin Client host to connect to.
#' @param port Apache Ignite Thin Client port to connect to.
#' @param local Local flag that restricts the query to local data only.
#' @param part Number of partitions to be queried.
#' @param page_size Apache Ignite Thin Client page size.
#' @param username Apache Ignite Thin Client authentication username.
#' @param password Apache Ignite Thin Client authentication password.
#' @param certfile File in PEM format containing the certificate as well as any
#' number of CA certificates needed to establish the certificate's
#' authenticity.
#' @param keyfile File containing the private key (if omitted, the private key
#'   is taken from `certfile` as well).
#' @param cert_password Password to be used if the private key is encrypted and
#' a password is necessary.
#'
#' @export
ignite_dataset <- function(
cache_name,
host = "localhost",
port = 10800,
local = FALSE,
part = -1,
page_size = 100,
username = NULL,
password = NULL,
certfile = NULL,
keyfile = NULL,
cert_password = NULL) {
dataset <- tfio_lib$ignite$IgniteDataset(
cache_name = cache_name,
host = host,
port = cast_scalar_integer(port),
local = cast_logical(local),
part = cast_scalar_integer(part),
page_size = cast_scalar_integer(page_size),
username = cast_nullable_string(username),
password = cast_nullable_string(password),
certfile = cast_nullable_string(certfile),
keyfile = cast_nullable_string(keyfile),
cert_password = cast_nullable_string(cert_password)
)
as_tf_dataset(dataset)
}
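
A hypothetical usage sketch against a local Ignite node (the cache name and connection details are placeholders); the forge casts above validate and coerce the R arguments before they are handed to the Python side:

library(tfio)

dataset <- ignite_dataset(
  cache_name = "SQL_PUBLIC_TEST_CACHE",
  host = "localhost",
  port = 10800
)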
27 changes: 27 additions & 0 deletions R-package/R/kafka_dataset.R
@@ -0,0 +1,27 @@
#' Creates a `KafkaDataset`.
#'
#' @param topics A `tf.string` tensor containing one or more subscriptions, in
#'   the format of `[topic:partition:offset:length]`; by default, `length` is
#'   -1, meaning unlimited.
#' @param servers A list of bootstrap servers.
#' @param group The consumer group id.
#' @param eof If `TRUE`, the Kafka reader will stop on `EOF`.
#' @param timeout The timeout value for the Kafka Consumer to wait (in
#'   milliseconds).
#'
#' @export
kafka_dataset <- function(
topics,
servers = "localhost",
group = "",
eof = FALSE,
timeout = 1000) {
dataset <- tfio_lib$kafka$KafkaDataset(
topics = topics,
servers = servers,
group = group,
eof = cast_logical(eof),
timeout = cast_scalar_integer(timeout)
)
as_tf_dataset(dataset)
}
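
A hypothetical usage sketch: read four messages from partition 0 of topic `test`, starting at offset 0, and stop at `EOF` (broker, topic, and group are placeholders):

library(tfio)

dataset <- kafka_dataset(
  topics = list("test:0:0:4"),
  servers = "localhost",
  group = "testgroup",
  eof = TRUE
)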
28 changes: 28 additions & 0 deletions R-package/R/kinesis_dataset.R
@@ -0,0 +1,28 @@
#' Creates a `KinesisDataset`.
#'
#' Kinesis is a managed service provided by AWS for data streaming.
#' This dataset reads messages from Kinesis with each message presented
#' as a `tf.string`.
#'
#' @param stream A `tf.string` tensor containing the name of the stream.
#' @param shard A `tf.string` tensor containing the id of the shard.
#' @param read_indefinitely If `TRUE`, the Kinesis dataset will keep retrying
#'   on `EOF` after the `interval` period. If `FALSE`, the dataset will stop on
#'   `EOF`. The default value is `TRUE`.
#' @param interval The interval (in milliseconds) for the Kinesis Client to
#'   wait before it tries to get records again.
#'
#' @export
kinesis_dataset <- function(
stream,
shard = "",
read_indefinitely = TRUE,
interval = 100000) {
dataset <- tfio_lib$kinesis$KinesisDataset(
stream = stream,
shard = shard,
read_indefinitely = cast_logical(read_indefinitely),
interval = cast_scalar_integer(interval)
)
as_tf_dataset(dataset)
}
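
A hypothetical usage sketch (the stream name is a placeholder, and AWS credentials and region are assumed to be configured in the environment):

library(tfio)

dataset <- kinesis_dataset(
  stream = "my-kinesis-stream",
  read_indefinitely = FALSE
)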
26 changes: 2 additions & 24 deletions R-package/R/package.R
@@ -12,6 +12,7 @@ NULL
 #' @import tidyselect
 #' @import rlang
 #' @import tfdatasets
+#' @import forge
 NULL

tfio_lib <- NULL
@@ -35,8 +36,7 @@ tfio_lib <- NULL
     }
   )
 
-  # TODO: This is commented out for now until we add the wrappers.
-  # tfio_lib <<- import("tensorflow_io", delay_load = delay_load)
+  tfio_lib <<- import("tensorflow_io", delay_load = delay_load)
 
 }

@@ -63,25 +63,3 @@ check_tensorflow_version <- function(displayed_warning) {
 .onDetach <- function(libpath) {
 
 }
-
-# Reusable function for registering a set of methods with S3 manually. The
-# methods argument is a list of character vectors, each of which has the form
-# c(package, genname, class).
-registerMethods <- function(methods) {
-  lapply(methods, function(method) {
-    pkg <- method[[1]]
-    generic <- method[[2]]
-    class <- method[[3]]
-    func <- get(paste(generic, class, sep = "."))
-    if (pkg %in% loadedNamespaces()) {
-      registerS3method(generic, class, func, envir = asNamespace(pkg))
-    }
-    setHook(
-      packageEvent(pkg, "onLoad"),
-      function(...) {
-        registerS3method(generic, class, func, envir = asNamespace(pkg))
-      }
-    )
-  })
-}
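
For background on the import enabled above: `reticulate::import()` with a `delay_load` argument defers resolving the Python module until it is first accessed, so the R package can load even when the Python side is not yet installed. A minimal standalone sketch (numpy is only an illustrative module):

library(reticulate)

# Resolve the module lazily: loading happens on first access, not here.
np <- import("numpy", delay_load = TRUE)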

98 changes: 98 additions & 0 deletions R-package/R/reexports.R
@@ -21,3 +21,101 @@ tensorflow::install_tensorflow
#' @importFrom tensorflow tf_config
#' @export
tensorflow::tf_config

#' @importFrom tensorflow tf_version
#' @export
tensorflow::tf_version


# Re-exports from tfdatasets dataset_iterators

#' @importFrom tfdatasets next_batch
#' @export
tfdatasets::next_batch

#' @importFrom tfdatasets with_dataset
#' @export
tfdatasets::with_dataset

#' @importFrom tfdatasets until_out_of_range
#' @export
tfdatasets::until_out_of_range


# Re-exports from tfdatasets dataset_methods

#' @importFrom tfdatasets dataset_repeat
#' @export
tfdatasets::dataset_repeat

#' @importFrom tfdatasets dataset_shuffle
#' @export
tfdatasets::dataset_shuffle

#' @importFrom tfdatasets dataset_shuffle_and_repeat
#' @export
tfdatasets::dataset_shuffle_and_repeat

#' @importFrom tfdatasets dataset_batch
#' @export
tfdatasets::dataset_batch

#' @importFrom tfdatasets dataset_cache
#' @export
tfdatasets::dataset_cache

#' @importFrom tfdatasets dataset_concatenate
#' @export
tfdatasets::dataset_concatenate

#' @importFrom tfdatasets dataset_take
#' @export
tfdatasets::dataset_take

#' @importFrom tfdatasets dataset_map
#' @export
tfdatasets::dataset_map

#' @importFrom tfdatasets dataset_map_and_batch
#' @export
tfdatasets::dataset_map_and_batch

#' @importFrom tfdatasets dataset_flat_map
#' @export
tfdatasets::dataset_flat_map

#' @importFrom tfdatasets dataset_prefetch
#' @export
tfdatasets::dataset_prefetch

#' @importFrom tfdatasets dataset_prefetch_to_device
#' @export
tfdatasets::dataset_prefetch_to_device

#' @importFrom tfdatasets dataset_filter
#' @export
tfdatasets::dataset_filter

#' @importFrom tfdatasets dataset_skip
#' @export
tfdatasets::dataset_skip

#' @importFrom tfdatasets dataset_interleave
#' @export
tfdatasets::dataset_interleave

#' @importFrom tfdatasets dataset_prefetch
#' @export
tfdatasets::dataset_prefetch

#' @importFrom tfdatasets dataset_shard
#' @export
tfdatasets::dataset_shard

#' @importFrom tfdatasets dataset_padded_batch
#' @export
tfdatasets::dataset_padded_batch

#' @importFrom tfdatasets dataset_prepare
#' @export
tfdatasets::dataset_prepare
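
Taken together, the re-exports let a tfio dataset flow straight into the tfdatasets verbs without attaching tfdatasets explicitly. A hypothetical end-to-end sketch in TensorFlow 1.x style (the file path and sizes are placeholders):

library(tfio)

dataset <- sequence_file_dataset("training.seq") %>%
  dataset_shuffle(buffer_size = 100) %>%
  dataset_batch(32)

sess <- tf$Session()
batch <- next_batch(dataset)
until_out_of_range({
  value <- sess$run(batch)
  # ... consume the batch ...
})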