Quick Tutorial on Ray: Parallel and Distributed Python

2020.10.09 · Python

With the advancement of machine learning algorithms and the explosion of data, parallel and distributed computing have become important topics. In this post, we introduce Ray, a Python library for building distributed applications for machine learning systems.

Ray provides simple primitives for building distributed applications. With Ray, users can parallelize single-machine code with few changes. A large ecosystem of machine learning tools is built on top of Ray core.


Installation

pip install -U ray

Windows machines need to install extra dependencies first; see the Ray installation documentation for details.


Initialization

There are two ways to initialize Ray.

Via Python script

You can start Ray on a single machine from a Python script.

import ray
ray.init()

Without a specific configuration, Ray uses all cores of the machine.
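
If you want to limit the resources Ray claims, you can pass them to ray.init(); for example:

import ray
ray.init(num_cpus=4)  # claim only 4 CPU cores on this machine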

Via the command line interface

$ ray start --head --port=6379

This starts a single-node Ray runtime, and the machine becomes the head node.

After you start the head node of the cluster, you can add worker nodes on other machines and call ray.init(address='auto') in a Python script to connect to this Ray runtime.
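
For example, on each additional machine you can start a worker node by pointing it at the head node (the address below is a placeholder):

$ ray start --address='<head-node-ip>:6379'

Then, in a Python script on any node of the cluster:

import ray
ray.init(address='auto')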

A single machine is an exception: ray.init() without ray start is enough to both start the Ray cluster services and connect to them.

To launch a Ray cluster on the cloud, use the ray up command with a YAML cluster configuration file that describes the cloud provider and node setup.

$ ray up some_cluster_configuration_file

To check whether Ray has been initialized:

print(ray.is_initialized())

To disconnect and shut down the Ray runtime started by ray.init():

ray.shutdown()
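
To stop a Ray runtime started from the command line, use the CLI instead:

$ ray stop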

Remote Function

@ray.remote(num_cpus=0.5)  # this task requests half a CPU
def my_function():
    return 1

ob = my_function.remote()  # runs asynchronously and returns an object ref

You can invoke a remote function with the .remote() method. It returns an object ref, a unique ID that refers to a remote object.

Another way to generate an object ref is ray.put.

y = 1
ob_y = ray.put(y)

Remote objects are immutable.

To retrieve (fetch) the result behind an object ref, you can use ray.get.

for _ in range(3):
    # ray.get blocks until the result is ready; it can be called repeatedly
    print(ray.get(ob))  # 1

An object ref can also be passed as an argument to another remote function; Ray fetches its value before running the task.

@ray.remote
def second_function(a):
    return a+1

ob2 = second_function.remote(ob) 
print(ray.get(ob2))

A remote function can also return multiple object refs.

@ray.remote(num_returns=2)
def return_multiple():
    return 1,2

a, b = return_multiple.remote()  # a and b are two separate object refs

To cancel a task, call ray.cancel on its object ref:

import time

@ray.remote
def long_task():
    time.sleep(10e4)  # simulate a very long-running task

ob_long = long_task.remote()
ray.cancel(ob_long)

To check which tasks have finished, you can use ray.wait.

ready_refs, remaining_refs = ray.wait([ob, ob_long], num_returns=1, timeout=None)
print(remaining_refs)

Remote Classes (Actors)

Create

You can use @ray.remote on a class to indicate that its instances are actors.

Instantiating an actor creates a new worker process for it.

@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0
    
    def increment(self):
        self.value += 1
        return self.value

counter = Counter.remote()
ob_c = counter.increment.remote()
print(ray.get(ob_c) == 1)

An equivalent way is to wrap the class with ray.remote directly:

class Counter():
  ...

Counter = ray.remote(Counter)
counter = Counter.remote()

Here counter is an instantiated actor. When an actor is instantiated, a node (machine) in the cluster is chosen and a worker process is created on that node for the actor. A Counter object is then created in that worker process.

Methods called on different actors execute in parallel. Methods called on the same actor execute in sequence.
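
For example, with the Counter actor defined above, calls on two different actors can overlap, while calls on the same actor run one after another:

c1, c2 = Counter.remote(), Counter.remote()

refs = [
    c1.increment.remote(),  # these two run sequentially on c1's worker
    c1.increment.remote(),
    c2.increment.remote(),  # this one can run in parallel on c2's worker
]
print(ray.get(refs))  # [1, 2, 1]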

Methods

You can also call the methods of an actor remotely.

@ray.remote
class Foo(object):

    # specify the number of returned object refs
    @ray.method(num_returns=2)
    def bar(self):
        return 1,2

f = Foo.remote()
ob_1, ob_2 = f.bar.remote()

Assign Resources

You can assign resources to the actor with @ray.remote(num_cpus=...). If you do, when an instance is created it will be placed on a node that has the requested resources, and those resources will be reserved for the actor during its lifetime.

If you do not assign any resources, the actor acquires no resources for its lifetime, but it acquires 1 CPU each time it executes a method.

Another way to set resources is to override the decorator's values with .options() at instantiation time:

@ray.remote(num_cpus=2)
class Counter:
    ...

# override the decorator's resource request for this particular actor
a1 = Counter.options(num_cpus=1).remote()

Dealing with Pipelines

ray.get() is powerful but has a weakness: if we call ray.get() on the results of multiple tasks, we have to wait until all of those tasks finish. This can hurt performance when the tasks take widely different amounts of time.

ray.wait() helps solve this by returning results as soon as objects in the list become ready. It returns two lists:

  1. the refs of ready (finished) objects
  2. the refs of unfinished objects

The last finished object will be the first item in the first return list.
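
A minimal sketch of this pattern, processing each result as soon as it is ready (the task below is illustrative, and ray.init() is assumed to have been called as above):

import random
import time
import ray

@ray.remote
def task(i):
    time.sleep(random.random())  # simulate work of varying duration
    return i

refs = [task.remote(i) for i in range(5)]
while refs:
    # block until at least one task finishes, then handle it immediately
    ready, refs = ray.wait(refs, num_returns=1)
    print("finished:", ray.get(ready[0]))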


Ray Dashboard

You can use the Ray dashboard to monitor the cluster and understand its performance.

When you run ray.init(), the dashboard address is printed, e.g. "View the Ray dashboard at localhost:8265".


Distributed Ray

Run Ray on a cluster of machines!

A Ray cluster consists of a head node (required) and a set of worker nodes (optional). The head node must be started first.


Further Reading

We have only introduced the basics of Ray; see the Ray paper for more detail. Ray can be used on cloud clusters, and several libraries built on top of Ray solve common machine learning problems:

  1. Tune: scalable hyperparameter tuning
  2. RLlib: scalable reinforcement learning
  3. RaySGD: distributed training wrappers
  4. Ray Serve: scalable model serving