Ray
Ray
并行任务
要将
import ray
import time
# Start Ray.
ray.init()
@ray.remote
def f(x):
time.sleep(1)
return x
# Start 4 tasks in parallel.
result_ids = []
for i in range(4):
result_ids.append(f.remote(i))
# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_ids) # [0, 1, 2, 3]
因为对
任务依赖
一个任务还可以依赖于其他任务。在下面的代码中,
import numpy as np
@ray.remote
def create_matrix(size):
return np.random.normal(size=size)
@ray.remote
def multiply_matrices(x, y):
return np.dot(x, y)
x_id = create_matrix.remote([1000, 1000])
y_id = create_matrix.remote([1000, 1000])
z_id = multiply_matrices.remote(x_id, y_id)
# Get the results.
z = ray.get(z_id)
值聚合
就像经典的
import time
@ray.remote
def add(x, y):
time.sleep(1)
return x + y
# Aggregate the values slowly. This approach takes O(n) where n is the
# number of values being aggregated. In this case, 7 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)
# Aggregate the values in a tree-structured pattern. This approach
# takes O(log(n)). In this case, 3 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)

代码还可以进一步简化:
# Slow approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
values = [add.remote(values[0], values[1])] + values[2:]
result = ray.get(values[0])
# Fast approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
values = values[2:] + [add.remote(values[0], values[1])]
result = ray.get(values[0])
Actor
可以使用
各个
@ray.remote
class Counter(object):
def __init__(self):
self.x = 0
def inc(self):
self.x += 1
def get_value(self):
return self.x
# Create an actor process.
c = Counter.remote()
# Check the actor's counter value.
print(ray.get(c.get_value.remote()))# 0
# Increment the counter twice and check the value again.
c.inc.remote()
c.inc.remote()
print(ray.get(c.get_value.remote()))# 2
上面的例子是
句柄
import time
@ray.remote
class MessageActor(object):
def __init__(self):
self.messages = []
def add_message(self, message):
self.messages.append(message)
def get_and_clear_messages(self):
messages = self.messages
self.messages = []
return messages
# Define a remote function which loops around and pushes
# messages to the actor.
@ray.remote
def worker(message_actor, j):
for i in range(100):
time.sleep(1)
message_actor.add_message.remote(
"Message {} from actor {}.".format(i, j))
# Create a message actor.
message_actor = MessageActor.remote()
# Start 3 tasks that push messages to the actor.
[worker.remote(message_actor, j) for j in range(3)]
# Periodically get the messages and print them.
for _ in range(100):
new_messages = ray.get(message_actor.get_and_clear_messages.remote())
print("New messages:", new_messages)
time.sleep(1)
# This script prints something like the following:
# New messages: []
# New messages: ['Message 0 from actor 1.', 'Message 0 from actor 0.']
# New messages: ['Message 0 from actor 2.', 'Message 1 from actor 1.', 'Message 1 from actor 0.', 'Message 1 from actor 2.']
# New messages: ['Message 2 from actor 1.', 'Message 2 from actor 0.', 'Message 2 from actor 2.']
# New messages: ['Message 3 from actor 2.', 'Message 3 from actor 1.', 'Message 3 from actor 0.']
# New messages: ['Message 4 from actor 2.', 'Message 4 from actor 0.', 'Message 4 from actor 1.']
# New messages: ['Message 5 from actor 2.', 'Message 5 from actor 0.', 'Message 5 from actor 1.']