With the advancement of machine learning algorithms and the explosion of data, parallel and distributed solutions have become an important topic. In this post, we introduce Ray, a Python library for building distributed applications in machine learning systems.
Ray provides simple primitives for building distributed applications. Users can parallelize single-machine code with Ray, and a large ecosystem of machine learning tools is built on top of the Ray core.
pip install -U ray
On Windows, additional dependencies need to be installed first.
There are two ways to initialize Ray.
Via Python script
You can start Ray on a single machine with a Python script.
import ray

ray.init()
Without a specific configuration, Ray uses all cores of the single machine.
Via the command line interface
$ ray start --head --port=6379
This starts a one-node Ray runtime, and this machine becomes the head node.
After you start a head node for the cluster, you can create worker nodes on other machines and call
ray.init(address='auto') in a Python script to connect to this Ray runtime.
A single-machine configuration is an exception: on a single machine,
ray start is enough to both start the Ray cluster service and connect to it.
To launch a Ray cluster on the cloud, use the ray up command.
$ ray up some_cluster_configuration_file
To check the status of the Ray cluster:
$ ray status
To stop the Ray cluster:
$ ray stop
@ray.remote(num_cpus=0.5)
def my_function():
    return 1

ob = my_function.remote()
You can invoke a remote function with the
.remote() method. It returns an object ref: a unique ID that refers to a remote object.
Another way to generate an object ref is ray.put:
y = 1
ob_y = ray.put(y)
Remote objects are immutable.
To retrieve (fetch) the result of a remote object from an object ref, you can use ray.get:
for _ in range(3):
    ray.get(ob)
    print("wut")
An object ref can be passed as an argument to another remote function.
@ray.remote
def second_function(a):
    return a + 1

ob2 = second_function.remote(ob)
print(ray.get(ob2))
A remote function can have multiple return values.
@ray.remote(num_returns=2)
def return_multiple():
    return 1, 2

a, b = return_multiple.remote()
To cancel a task, use ray.cancel:
import time

@ray.remote
def long_running():
    time.sleep(10e4)

ob_long = long_running.remote()
ray.cancel(ob_long)
To check which tasks have finished, you can use ray.wait:
ready_refs, remaining_refs = ray.wait(object_refs=[ob, ob_long], num_returns=1, timeout=None)
print(remaining_refs)
Remote classes (actors)
You can use the
@ray.remote decorator to indicate that instances of a class are actors.
Each actor runs in its own worker process, so to create a new worker, we instantiate a new actor.
@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

counter = Counter.remote()
ob_c = counter.increment.remote()
print(ray.get(ob_c) == 1)
Another equivalent way is
class Counter():
    ...

Counter = ray.remote(Counter)
counter = Counter.remote()
counter is an actor handle. When an actor is instantiated, a node (machine) in the cluster is chosen and a worker process is created on that node for the actor. In that worker process, a
Counter object is created.
Methods called on different actors execute in parallel. Methods called on the same actor execute in sequence.
Actor methods are also invoked remotely, and can be configured with @ray.method:
@ray.remote
class Foo(object):
    # specify the number of returned object refs
    @ray.method(num_returns=2)
    def bar(self):
        return 1, 2

f = Foo.remote()
ob_1, ob_2 = f.bar.remote()
You can assign resources to an actor in
@ray.remote(num_cpus=...). If you do so, when an instance is created it will be placed on a node that has the requested resources, and those resources will be reserved for the actor during its lifetime.
If you make no resource assignment, the actor acquires no resources for its lifetime, but acquires 1 CPU each time it executes a method.
Another way to configure resources is per instance, via .options():

@ray.remote(num_cpus=2)
class Counter:
    ...

a1 = Counter.options(num_cpus=1).remote()
Dealing with pipelines
ray.get() is powerful but has a weakness: if we call
ray.get() on the results of multiple tasks, we have to wait until all of the tasks finish. This can hurt performance when the tasks take widely different amounts of time.
ray.wait() helps solve this by returning as soon as any object in the list is ready. It has two return values:
- the IDs of ready (finished) objects
- the IDs of unfinished objects
The last finished object will be the first item in the first return list.
You can use the Ray dashboard to understand its performance.
When you run
ray.init(), the dashboard address is printed, for example:
View the Ray dashboard at localhost:8265
Run Ray on a cluster of machines!
A Ray cluster consists of a head node (must have) and a set of worker nodes (optional). The head node must be started first.
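On each worker machine, you join the cluster by pointing ray start at the head node's address. A sketch of the command, where the IP below is a placeholder for your head node's address and 6379 is the port it was started with:

```shell
ray start --address='192.168.0.1:6379'
```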