Remove deprecated arguments and deprecate redirect_output and redirect_worker_output.
robertnishihara committed Feb 14, 2019
1 parent 13e14c6 commit 98b1633
Showing 19 changed files with 131 additions and 169 deletions.
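
Taken together, the example-script hunks below all make the same call-site change. A minimal sketch of that migration (hedged: it assumes the deprecated keyword now only triggers a warning, which is what "deprecate" in the commit title suggests):

    import ray

    # Previously the examples opted in to log redirection explicitly:
    #     ray.init(redirect_output=True)
    # After this commit the argument is deprecated and the examples simply call:
    ray.init()
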
4 changes: 2 additions & 2 deletions doc/source/tempfile.rst
@@ -47,7 +47,7 @@ A typical layout of temporary files could look like this:
│   ├── monitor.out
│   ├── plasma_store_0.err # array of plasma stores' outputs
│   ├── plasma_store_0.out
│   ├── raylet_0.err # array of raylets' outputs. Control it with `--no-redirect-worker-output` (in Ray's command line) or `redirect_worker_output` (in ray.init())
│   ├── raylet_0.err
│   ├── raylet_0.out
│   ├── redis-shard_0.err # array of redis shards' outputs
│   ├── redis-shard_0.out
@@ -80,4 +80,4 @@ The path you specified will be given as it is without being affected any other p
Notes
-----

Temporary file policies are defined in ``python/ray/node.py``.
Temporary file policies are defined in ``python/ray/node.py``.
30 changes: 19 additions & 11 deletions examples/lbfgs/driver.py
@@ -35,6 +35,7 @@ class LinearModel(object):
variables (TensorFlowVariables): Extracted variables and methods to
manipulate them.
"""

def __init__(self, shape):
"""Creates a LinearModel object."""
x = tf.placeholder(tf.float32, [None, shape[0]])
@@ -46,26 +47,33 @@ def __init__(self, shape):
y = tf.nn.softmax(tf.matmul(x, w) + b)
y_ = tf.placeholder(tf.float32, [None, shape[1]])
self.y_ = y_
cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y),
reduction_indices=[1]))
cross_entropy = tf.reduce_mean(
-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
self.cross_entropy = cross_entropy
self.cross_entropy_grads = tf.gradients(cross_entropy, [w, b])
self.sess = tf.Session()
# In order to get and set the weights, we pass in the loss function to
# Ray's TensorFlowVariables to automatically create methods to modify
# the weights.
self.variables = ray.experimental.TensorFlowVariables(cross_entropy,
self.sess)
self.variables = ray.experimental.TensorFlowVariables(
cross_entropy, self.sess)

def loss(self, xs, ys):
"""Computes the loss of the network."""
return float(self.sess.run(self.cross_entropy,
feed_dict={self.x: xs, self.y_: ys}))
return float(
self.sess.run(
self.cross_entropy, feed_dict={
self.x: xs,
self.y_: ys
}))

def grad(self, xs, ys):
"""Computes the gradients of the network."""
return self.sess.run(self.cross_entropy_grads,
feed_dict={self.x: xs, self.y_: ys})
return self.sess.run(
self.cross_entropy_grads, feed_dict={
self.x: xs,
self.y_: ys
})


@ray.remote
@@ -110,7 +118,7 @@ def full_grad(theta):


if __name__ == "__main__":
ray.init(redirect_output=True)
ray.init()

# From the perspective of scipy.optimize.fmin_l_bfgs_b, full_loss is simply
# a function which takes some parameters theta, and computes a loss.
@@ -136,5 +144,5 @@ def full_grad(theta):

# Use L-BFGS to minimize the loss function.
print("Running L-BFGS.")
result = scipy.optimize.fmin_l_bfgs_b(full_loss, theta_init, maxiter=10,
fprime=full_grad, disp=True)
result = scipy.optimize.fmin_l_bfgs_b(
full_loss, theta_init, maxiter=10, fprime=full_grad, disp=True)
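
The comment in the hunk above carries the key idea: ray.experimental.TensorFlowVariables wraps a loss and a session so the model's weights can be read and written as plain dictionaries. A short usage sketch, assuming the LinearModel class defined above and the get_weights/set_weights helpers from Ray's experimental API of this era (neither appears in this diff):

    import ray

    ray.init()

    # MNIST-sized shapes, matching the rest of this example.
    model = LinearModel((784, 10))

    weights = model.variables.get_weights()  # {variable name: numpy array}
    model.variables.set_weights(weights)     # write them back, e.g. after averaging
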
92 changes: 55 additions & 37 deletions examples/resnet/resnet_main.py
@@ -23,22 +23,41 @@
"update Tensorflow to the latest version.")

parser = argparse.ArgumentParser(description="Run the ResNet example.")
parser.add_argument("--dataset", default="cifar10", type=str,
help="Dataset to use: cifar10 or cifar100.")
parser.add_argument("--train_data_path",
default="cifar-10-batches-bin/data_batch*", type=str,
help="Data path for the training data.")
parser.add_argument("--eval_data_path",
default="cifar-10-batches-bin/test_batch.bin", type=str,
help="Data path for the testing data.")
parser.add_argument("--eval_dir", default="/tmp/resnet-model/eval", type=str,
help="Data path for the tensorboard logs.")
parser.add_argument("--eval_batch_count", default=50, type=int,
help="Number of batches to evaluate over.")
parser.add_argument("--num_gpus", default=0, type=int,
help="Number of GPUs to use for training.")
parser.add_argument("--redis-address", default=None, type=str,
help="The Redis address of the cluster.")
parser.add_argument(
"--dataset",
default="cifar10",
type=str,
help="Dataset to use: cifar10 or cifar100.")
parser.add_argument(
"--train_data_path",
default="cifar-10-batches-bin/data_batch*",
type=str,
help="Data path for the training data.")
parser.add_argument(
"--eval_data_path",
default="cifar-10-batches-bin/test_batch.bin",
type=str,
help="Data path for the testing data.")
parser.add_argument(
"--eval_dir",
default="/tmp/resnet-model/eval",
type=str,
help="Data path for the tensorboard logs.")
parser.add_argument(
"--eval_batch_count",
default=50,
type=int,
help="Number of batches to evaluate over.")
parser.add_argument(
"--num_gpus",
default=0,
type=int,
help="Number of GPUs to use for training.")
parser.add_argument(
"--redis-address",
default=None,
type=str,
help="The Redis address of the cluster.")

FLAGS = parser.parse_args()

@@ -87,9 +106,8 @@ def __init__(self, data, dataset, num_gpus):

with tf.device("/gpu:0" if num_gpus > 0 else "/cpu:0"):
# Build the model.
images, labels = cifar_input.build_input(data,
hps.batch_size, dataset,
False)
images, labels = cifar_input.build_input(data, hps.batch_size,
dataset, False)
self.model = resnet_model.ResNet(hps, images, labels, "train")
self.model.build_graph()
config = tf.ConfigProto(allow_soft_placement=True)
@@ -131,9 +149,8 @@ def __init__(self, data, dataset, eval_batch_count, eval_dir):
num_gpus=0)
with tf.device("/cpu:0"):
# Builds the testing network.
images, labels = cifar_input.build_input(data,
hps.batch_size, dataset,
False)
images, labels = cifar_input.build_input(data, hps.batch_size,
dataset, False)
self.model = resnet_model.ResNet(hps, images, labels, "eval")
self.model.build_graph()
config = tf.ConfigProto(allow_soft_placement=True)
@@ -159,8 +176,7 @@ def accuracy(self, weights, train_step):
sess = self.model.variables.sess
for _ in range(self.eval_batch_count):
summaries, loss, predictions, truth = sess.run(
[model.summaries, model.cost, model.predictions,
model.labels])
[model.summaries, model.cost, model.predictions, model.labels])

truth = np.argmax(truth, axis=1)
predictions = np.argmax(predictions, axis=1)
@@ -170,8 +186,7 @@ def accuracy(self, weights, train_step):
precision = 1.0 * correct_prediction / total_prediction
self.best_precision = max(precision, self.best_precision)
precision_summ = tf.Summary()
precision_summ.value.add(
tag="Precision", simple_value=precision)
precision_summ.value.add(tag="Precision", simple_value=precision)
self.summary_writer.add_summary(precision_summ, train_step)
best_precision_summ = tf.Summary()
best_precision_summ.value.add(
@@ -192,23 +207,24 @@ def get_ip_addr(self):
def train():
num_gpus = FLAGS.num_gpus
if FLAGS.redis_address is None:
ray.init(num_gpus=num_gpus, redirect_output=True)
ray.init(num_gpus=num_gpus)
else:
ray.init(redis_address=FLAGS.redis_address)
train_data = get_data.remote(FLAGS.train_data_path, 50000, FLAGS.dataset)
test_data = get_data.remote(FLAGS.eval_data_path, 10000, FLAGS.dataset)
# Creates an actor for each gpu, or one if only using the cpu. Each actor
# has access to the dataset.
if FLAGS.num_gpus > 0:
train_actors = [ResNetTrainActor.remote(train_data, FLAGS.dataset,
num_gpus)
for _ in range(num_gpus)]
train_actors = [
ResNetTrainActor.remote(train_data, FLAGS.dataset, num_gpus)
for _ in range(num_gpus)
]
else:
train_actors = [ResNetTrainActor.remote(train_data, FLAGS.dataset, 0)]
test_actor = ResNetTestActor.remote(test_data, FLAGS.dataset,
FLAGS.eval_batch_count, FLAGS.eval_dir)
print("The log files for tensorboard are stored at ip {}."
.format(ray.get(test_actor.get_ip_addr.remote())))
print("The log files for tensorboard are stored at ip {}.".format(
ray.get(test_actor.get_ip_addr.remote())))
step = 0
weight_id = train_actors[0].get_weights.remote()
acc_id = test_actor.accuracy.remote(weight_id, step)
@@ -218,11 +234,13 @@ def train():
print("Starting training loop. Use Ctrl-C to exit.")
try:
while True:
all_weights = ray.get([actor.compute_steps.remote(weight_id)
for actor in train_actors])
mean_weights = {k: (sum(weights[k] for weights in all_weights) /
num_gpus)
for k in all_weights[0]}
all_weights = ray.get([
actor.compute_steps.remote(weight_id) for actor in train_actors
])
mean_weights = {
k: (sum(weights[k] for weights in all_weights) / num_gpus)
for k in all_weights[0]
}
weight_id = ray.put(mean_weights)
step += 10
if step % 200 == 0:
33 changes: 21 additions & 12 deletions examples/rl_pong/driver.py
@@ -146,17 +146,26 @@ def compute_gradient(self, model):

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Train an RL agent on Pong.")
parser.add_argument("--batch-size", default=10, type=int,
help="The number of rollouts to do per batch.")
parser.add_argument("--redis-address", default=None, type=str,
help="The Redis address of the cluster.")
parser.add_argument("--iterations", default=-1, type=int,
help="The number of model updates to perform. By "
"default, training will not terminate.")
parser.add_argument(
"--batch-size",
default=10,
type=int,
help="The number of rollouts to do per batch.")
parser.add_argument(
"--redis-address",
default=None,
type=str,
help="The Redis address of the cluster.")
parser.add_argument(
"--iterations",
default=-1,
type=int,
help="The number of model updates to perform. By "
"default, training will not terminate.")
args = parser.parse_args()
batch_size = args.batch_size

ray.init(redis_address=args.redis_address, redirect_output=True)
ray.init(redis_address=args.redis_address)

# Run the reinforcement learning.

@@ -187,17 +196,17 @@ def compute_gradient(self, model):
# Accumulate the gradient over batch.
for k in model:
grad_buffer[k] += grad[k]
running_reward = (reward_sum if running_reward is None
else running_reward * 0.99 + reward_sum * 0.01)
running_reward = (reward_sum if running_reward is None else
running_reward * 0.99 + reward_sum * 0.01)
end_time = time.time()
print("Batch {} computed {} rollouts in {} seconds, "
"running mean is {}".format(batch_num, batch_size,
end_time - start_time,
running_reward))
for k, v in model.items():
g = grad_buffer[k]
rmsprop_cache[k] = (decay_rate * rmsprop_cache[k] +
(1 - decay_rate) * g ** 2)
rmsprop_cache[k] = (
decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g**2)
model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
# Reset the batch gradient buffer.
grad_buffer[k] = np.zeros_like(v)
21 changes: 0 additions & 21 deletions python/ray/actor.py
@@ -125,12 +125,6 @@ def __call__(self, *args, **kwargs):
def remote(self, *args, **kwargs):
return self._remote(args, kwargs)

def _submit(self, args, kwargs, num_return_vals=None):
logger.warning(
"WARNING: _submit() is being deprecated. Please use _remote().")
return self._remote(
args=args, kwargs=kwargs, num_return_vals=num_return_vals)

def _remote(self, args, kwargs, num_return_vals=None):
if num_return_vals is None:
num_return_vals = self._num_return_vals
@@ -238,21 +232,6 @@ def remote(self, *args, **kwargs):
"""
return self._remote(args=args, kwargs=kwargs)

def _submit(self,
args,
kwargs,
num_cpus=None,
num_gpus=None,
resources=None):
logger.warning(
"WARNING: _submit() is being deprecated. Please use _remote().")
return self._remote(
args=args,
kwargs=kwargs,
num_cpus=num_cpus,
num_gpus=num_gpus,
resources=resources)

def _remote(self,
args,
kwargs,
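
With the _submit shims gone, callers that still used them have to go through _remote (or the public .remote()) directly. A hedged sketch of the remaining call paths, using only the signatures visible in this actor.py diff; the Counter actor itself is hypothetical:

    import ray

    ray.init()

    @ray.remote
    class Counter(object):
        def __init__(self):
            self.value = 0

        def increment(self, by):
            self.value += by
            return self.value

    counter = Counter.remote()

    # Public API, unchanged by this commit:
    obj_id = counter.increment.remote(1)

    # Advanced path that previously also existed as ._submit(...);
    # only ._remote() remains after this commit.
    obj_id = counter.increment._remote(args=[1], kwargs={}, num_return_vals=1)

    print(ray.get(obj_id))
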
8 changes: 4 additions & 4 deletions python/ray/node.py
@@ -197,7 +197,7 @@ def _make_inc_temp(self, suffix="", prefix="", directory_name="/tmp/ray"):
raise FileExistsError(errno.EEXIST,
"No usable temporary filename found")

def new_log_files(self, name, redirect_output=None):
def new_log_files(self, name, redirect_output=True):
"""Generate partially randomized filenames for log files.
Args:
@@ -262,7 +262,7 @@ def start_redis(self):
redis_shard_ports=self._ray_params.redis_shard_ports,
num_redis_shards=self._ray_params.num_redis_shards,
redis_max_clients=self._ray_params.redis_max_clients,
redirect_worker_output=self._ray_params.redirect_worker_output,
redirect_worker_output=True,
password=self._ray_params.redis_password,
redis_max_memory=self._ray_params.redis_max_memory)
assert (
@@ -272,7 +272,7 @@

def start_log_monitor(self):
"""Start the log monitor."""
stdout_file, stderr_file = self.new_log_files("log_monitor", True)
stdout_file, stderr_file = self.new_log_files("log_monitor")
process_info = ray.services.start_log_monitor(
self.redis_address,
self._logs_dir,
@@ -286,7 +286,7 @@ def start_log_monitor(self):

def start_ui(self):
"""Start the web UI."""
stdout_file, stderr_file = self.new_log_files("webui", True)
stdout_file, stderr_file = self.new_log_files("webui")
notebook_name = self._make_inc_temp(
suffix=".ipynb", prefix="ray_ui", directory_name=self._temp_dir)
self._webui_url, process_info = ray.services.start_ui(