Ractor - Ruby 的类似 Actor 的并发抽象
Ractor 设计用于在没有线程安全顾虑的情况下并行执行 Ruby 代码。
摘要
Ruby 进程中的多个 Ractor
您可以创建多个 Ractor,它们可以相互并行运行 Ruby 代码。
-
Ractor.new{ expr }会创建一个新的Ractor,并且expr可以在多核计算机上与其他 Ractor 并行运行。 -
Ruby 进程以一个 Ractor(称为*主 Ractor*)开始。
-
如果主 Ractor 终止,所有其他 Ractor 都会收到终止请求,这与线程的行为类似。
-
同一 Ractor 内的线程共享一个 Ractor 范围内的全局锁(MRI 术语中的 GVL),因此它们无法相互并行运行(除非在 C 扩展中显式释放 GVL)。不同 Ractor 中的线程可以并行运行。
-
创建 Ractor 的开销略高于创建线程的开销。
Ractor 之间的有限共享
Ractor 不像线程那样共享所有对象(线程可以访问任何对象,除了存储在另一个线程线程局部变量中的对象)。
-
大多数对象是*不可共享对象*。不可共享对象只能由创建它们的 Ractor 使用,因此您无需担心因跨 Ractor 并发使用对象而导致的线程安全问题。
-
有些对象是*可共享对象*。以下是不完整的列表,供您参考:
-
i = 123:所有Integer都是可共享的。 -
s = "str".freeze:如果冻结的字符串不包含指向不可共享对象的实例变量,则它是可共享的。 -
a = [1, [2], 3].freeze:a不是可共享对象,因为a指向不可共享对象[2](此Array未冻结)。 -
h = {c: Object}.freeze:h是可共享的,因为Symbol和Class是可共享的,并且Hash是冻结的。 -
类/模块对象始终是可共享的,即使它们指向不可共享对象。
-
特殊可共享对象
-
Ractor对象本身是可共享的。 -
还有更多…
-
使用 Ractor::Port 进行 Ractor 间的通信
Ractor 通过交换消息来进行相互通信和同步执行。 Ractor::Port 类提供了这种通信机制。
port = Ractor::Port.new Ractor.new port do |port| # Other ractors can send to the port port << 42 end port.receive # get a message from the port. Only the ractor that created the Port can receive from it. #=> 42
所有 Ractor 都有一个默认端口,Ractor#send、Ractor.receive(等)将使用该端口。
发送对象时的复制和移动语义
要将不可共享对象发送到另一个 Ractor,对象会被复制或移动。
-
复制:将对象深度复制到另一个 Ractor。所有不可共享对象都将被
Kernel#clone。 -
移动:将所有权转移到另一个 Ractor。
-
发送 Ractor 在移动对象后无法访问该移动的对象。
-
保证一次只有一个 Ractor 可以访问一个不可共享对象。
线程安全
Ractor 有助于编写线程安全、并发的程序。它们仅允许通过显式消息传递来共享不可共享对象。可共享对象被保证能在跨 Ractor 正确工作,即使 Ractor 正在并行运行。然而,此保证仅适用于 Ractor 之间。如果您在一个 Ractor 中使用多个 Ruby Thread,您仍然需要在 Ractor 内部使用 Mutex 和其他线程安全工具。
-
大多数对象是不可共享的。由于无法在 Ractor 之间使用这些对象,因此您无法创建跨 Ractor 的数据竞争。
-
可共享对象受到锁(或不需要)的保护,因此可以被多个 Ractor 同时使用。
创建和终止
Ractor.new
-
Ractor.new { expr }创建一个Ractor。
# Ractor.new with a block creates a new Ractor r = Ractor.new do # This block can run in parallel with other ractors end # You can name a Ractor with a `name:` argument. r = Ractor.new name: 'my-first-ractor' do end r.name #=> 'my-first-ractor'
块隔离
Ractor 在给定的块中执行 expr。给定的块将与其外部作用域隔离。为防止 Ractor 之间共享对象,外部变量、self 和其他信息将与块隔离。
这种隔离发生在 Ractor 创建时(调用 Ractor.new 时)。如果由于外部变量或 self 无法隔离给定的块,将引发错误。
begin a = true r = Ractor.new do a #=> ArgumentError because this block accesses outer variable `a`. end r.join # wait for ractor to finish rescue ArgumentError end
-
给定块的
self是Ractor对象本身。
r = Ractor.new do p self.class #=> Ractor self.object_id end r.value == self.object_id #=> false
传递给 Ractor.new() 的参数成为给定块的块参数。但是,Ruby 不会传递对象本身,而是将它们作为消息发送(有关详细信息,请参见下文)。
r = Ractor.new 'ok' do |msg| msg #=> 'ok' end r.value #=> 'ok'
# similar to the last example r = Ractor.new do msg = Ractor.receive msg end r.send 'ok' r.value #=> 'ok'
给定块的执行结果
给定块的返回值成为传出消息(有关详细信息,请参见下文)。
r = Ractor.new do 'ok' end r.value #=> `ok`
给定块中的错误将传播到传出消息的消费者。
r = Ractor.new do raise 'ok' # exception will be transferred to the consumer end begin r.value rescue Ractor::RemoteError => e e.cause.class #=> RuntimeError e.cause.message #=> 'ok' e.ractor #=> r end
Ractor 之间的通信
Ractor 之间的通信通过发送和接收消息来实现。有两种通信方式:
-
(1)通过
Ractor::Port发送和接收消息 -
(2)使用可共享容器对象。例如,Ractor::TVar gem(ko1/ractor-tvar)
用户可以通过 (1) 控制程序执行时序,但应避免通过 (2) 控制(仅执行关键部分)。
对于发送和接收消息,以下是基本 API:
-
通过
Ractor::Port进行发送/接收。-
Ractor::Port#send(obj)(Ractor::Port#<<(obj)是别名)将消息发送到端口。端口连接到一个无限大小的入队,因此发送永远不会阻塞调用者。 -
Ractor::Port#receive从其自己的入队中出队一个消息。如果入队为空,Ractor::Port#receive将阻塞当前Thread的执行,直到发送消息。 -
Ractor#send和Ractor.receive在内部使用端口(它们的默认端口),因此在概念上与上述类似。
-
-
您可以通过
Ractor::Port#close关闭Ractor::Port。端口只能由创建它的 Ractor 关闭。-
如果端口已关闭,您将无法向其发送消息。这样做会引发异常。
-
当 Ractor 终止时,该 Ractor 的端口会自动关闭。
-
-
您可以使用
Ractor#value等待 Ractor 终止并接收其返回值。这与Thread#value类似。
有 3 种方法可以将对象作为消息发送:
1) 发送引用:发送可共享对象仅发送对象的引用(速度快)。
2) 复制对象:通过深度复制来发送不可共享对象(可能很慢)。请注意,您不能通过此方式发送不支持深度复制的对象。某些 T_DATA 对象(类在 C 扩展中定义的,例如 StringIO)不受支持。
3) 移动对象:通过所有权转移将不可共享对象发送到另一个 Ractor。发送 Ractor 在移动对象后无法访问该移动的对象,否则将引发异常。实现说明:T_DATA 对象不受支持。
您可以通过 move: 关键字在“复制”和“移动”之间进行选择,Ractor#send(obj, move: true/false)。默认值为 false(“复制”)。但是,如果对象是可共享的,它将自动使用 move。
使用 Ractor.select 等待多个 Ractor
您可以一次等待多个端口上的消息。 Ractor.select() 的返回值是 [port, msg],其中 port 是就绪的端口,msg 是接收到的消息。
为了方便起见,Ractor.select 也可以接受 Ractor。在这种情况下,它会等待它们的终止。 Ractor.select() 的返回值是 [r, msg],其中 r 是已终止的 Ractor,msg 是 Ractor 块的值。
等待单个 Ractor(与 Ractor#value 相同)
r1 = Ractor.new{'r1'} r, obj = Ractor.select(r1) r == r1 and obj == 'r1' #=> true
等待两个 Ractor
r1 = Ractor.new{'r1'} r2 = Ractor.new{'r2'} rs = [r1, r2] values = [] while rs.any? r, obj = Ractor.select(*rs) rs.delete(r) values << obj end values.sort == ['r1', 'r2'] #=> true
注意:在大量 Ractor 上使用 Ractor.select() 与当前的 select(2) 有相同的问题。
关闭端口
-
Ractor::Port#close关闭端口(类似于Queue#close)。 -
当端口关闭时,
port.send(obj)会引发异常。 -
当与端口连接的队列为空且端口已关闭时,
Ractor::Port#receive会引发异常。如果队列不为空,它会出队一个对象而不引发异常。 -
当
Ractor终止时,端口会自动关闭。
示例(尝试从已关闭的 Ractor 获取结果)
r = Ractor.new do 'finish' end r.join # success (wait for the termination) r.value # success (will return 'finish') # The ractor's termination value has already been given to another ractor Ractor.new r do |r| r.value #=> Ractor::Error end.join
示例(尝试发送到已关闭的端口)
r = Ractor.new do end r.join # wait for termination, closes default port begin r.send(1) rescue Ractor::ClosedError 'ok' end
通过复制发送消息
Ractor::Port#send(obj) 会深度复制 obj(如果 obj 是不可共享对象)。
obj = 'str'.dup r = Ractor.new obj do |msg| # return received msg's object_id msg.object_id end obj.object_id == r.value #=> false
某些对象不支持复制,会引发异常。
obj = Thread.new{} begin Ractor.new obj do |msg| msg end rescue TypeError => e e.message #=> #<TypeError: allocator undefined for Thread> end
通过移动发送消息
Ractor::Port#send(obj, move: true) 将 obj 移动到目标 Ractor。如果源 Ractor 使用了移动的对象(例如,调用了 obj.foo() 这样的方法),它将引发错误。
r = Ractor.new do obj = Ractor.receive obj << ' world' end str = 'hello'.dup r.send str, move: true # str is now moved, and accessing str from this ractor is prohibited modified = r.value #=> 'hello world' begin # Error because it uses moved str. str << ' exception' # raise Ractor::MovedError rescue Ractor::MovedError modified #=> 'hello world' end
某些对象不支持移动,会引发异常。
r = Ractor.new do Ractor.receive end r.send(Thread.new{}, move: true) #=> allocator undefined for Thread (TypeError)
一旦对象被移动,源对象的类将变为 Ractor::MovedObject。
可共享对象
以下是可共享对象的非详尽列表:
为了使对象可共享,提供了 Ractor.make_shareable(obj)。它通过冻结 obj 并递归遍历其引用以全部冻结来尝试使对象可共享。此方法接受 copy: 关键字(默认值为 false)。Ractor.make_shareable(obj, copy: true) 尝试深度复制 obj 并使复制的对象可共享。Ractor.make_shareable(copy: false) 对已可共享的对象没有影响。如果对象无法变得可共享,将引发 Ractor::Error 异常。
限制 Ractor 之间共享的语言更改
为了在 Ractor 之间隔离不可共享对象,我们为多 Ractor 的 Ruby 程序引入了额外的语言语义。
请注意,在使用 Ractor 时,不需要这些额外的语义(与 Ruby 2 兼容 100%)。
全局变量
只有主 Ractor 可以访问全局变量。
$gv = 1 r = Ractor.new do $gv end begin r.join rescue Ractor::RemoteError => e e.cause.message #=> 'can not access global variables from non-main Ractors' end
请注意,一些特殊的全局变量,如 $stdin、$stdout 和 $stderr 是每个 Ractor 本地化的。有关更多详细信息,请参阅 [Bug #17268]。
可共享对象的实例变量
仅当类/模块的实例变量的值是可共享对象时,非主 Ractor 才能访问它们。
class C @iv = 1 end p Ractor.new do class C @iv end end.value #=> 1
否则,只有主 Ractor 可以访问可共享对象的实例变量。
class C @iv = [] # unshareable object end Ractor.new do class C begin p @iv rescue Ractor::IsolationError p $!.message #=> "can not get unshareable values from instance variables of classes/modules from non-main Ractors" end begin @iv = 42 rescue Ractor::IsolationError p $!.message #=> "can not set instance variables of classes/modules by non-main Ractors" end end end.join
shared = Ractor.new{} shared.instance_variable_set(:@iv, 'str') r = Ractor.new shared do |shared| p shared.instance_variable_get(:@iv) end begin r.join rescue Ractor::RemoteError => e e.cause.message #=> can not access instance variables of shareable objects from non-main Ractors (Ractor::IsolationError) end
类变量
只有主 Ractor 可以访问类变量。
class C @@cv = 'str' end r = Ractor.new do class C p @@cv end end begin r.join rescue => e e.class #=> Ractor::IsolationError end
Constants
只有主 Ractor 可以读取引用不可共享对象的常量。
class C CONST = 'str'.dup end r = Ractor.new do C::CONST end begin r.join rescue => e e.class #=> Ractor::IsolationError end
只有主 Ractor 可以定义引用不可共享对象的常量。
class C end r = Ractor.new do C::CONST = 'str'.dup end begin r.join rescue => e e.class #=> Ractor::IsolationError end
在创建/更新库以支持 Ractor 时,如果常量要被非主 Ractor 使用,它们应该仅引用可共享对象。
TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'}
在这种情况下,TABLE 指向一个不可共享的 Hash 对象。为了让其他 Ractor 使用 TABLE,我们需要使其可共享。我们可以像这样使用 Ractor.make_shareable():
TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )
为了方便起见,Ruby 3.0 引入了一个新的 shareable_constant_value 文件指令。
# shareable_constant_value: literal TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'} #=> Same as: TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )
shareable_constant_value 指令接受以下模式(描述使用示例:CONST = expr):
-
none:什么都不做。等同于:
CONST = expr -
literal
-
如果
expr由字面量组成,则替换为CONST = Ractor.make_shareable(expr)。 -
否则:替换为
CONST = expr.tap{|o| raise unless Ractor.shareable?(o)}。 -
experimental_everything:替换为
CONST = Ractor.make_shareable(expr)。 -
experimental_copy:替换为
CONST = Ractor.make_shareable(expr, copy: true)。
除了 none 模式(默认)外,这些常量保证仅引用可共享对象。
有关更多详细信息,请参阅 syntax/comments.rdoc。
可共享的 proc
Proc 和 lambda 是不可共享对象,即使它们被冻结。要创建一个不可共享的 Proc,您必须使用 Ractor.shareable_proc { expr }。与 Ractor 创建时类似,proc 的块与其外部环境隔离,因此它无法访问外部作用域中的变量。self 在 Proc 中默认为 nil,但如果想将其自定义为其他可共享对象,可以提供 self: 关键字。
p = Ractor.shareable_proc { p self } p.call #=> nil
begin a = 1 pr = Ractor.shareable_proc { p a } pr.call # never gets here rescue Ractor::IsolationError end
为了使用 Module#define_method 动态定义一个可从不同 Ractor 使用的方法,您必须使用共享的 proc 来定义它。或者,您可以使用 Module#class_eval 或 Module#module_eval 配合 String。尽管共享 proc 的 self 最初绑定到 nil,但 define_method 会在方法中将 self 绑定到正确的值。
class A define_method :testing, &Ractor.shareable_proc do p self end end Ractor.new do a = A.new a.testing #=> #<A:0x0000000101acfe10> end.join
必须进行此隔离,以防止方法访问和分配捕获的外部变量到跨 Ractor。
Ractor 本地存储
您可以将任何对象(包括不可共享对象)存储在 Ractor 本地存储中。
r = Ractor.new do values = [] Ractor[:threads] = [] 3.times do |i| Ractor[:threads] << Thread.new do values << [Ractor.receive, i+1] # Ractor.receive blocks the current thread in the current ractor until it receives a message end end Ractor[:threads].each(&:join) values end r << 1 r << 2 r << 3 r.value #=> [[1,1],[2,2],[3,3]] (the order can change with each run)
示例
Actor 模型中的传统环形示例
RN = 1_000 CR = Ractor.current r = Ractor.new do p Ractor.receive CR << :fin end RN.times{ r = Ractor.new r do |next_r| next_r << Ractor.receive end } p :setup_ok r << 1 p Ractor.receive
Fork-join
def fib n if n < 2 1 else fib(n-2) + fib(n-1) end end RN = 10 rs = (1..RN).map do |i| Ractor.new i do |i| [i, fib(i)] end end until rs.empty? r, v = Ractor.select(*rs) rs.delete r p answer: v end
Worker 池
(1)一个 Ractor 拥有一个池
require 'prime' N = 1000 RN = 10 # make RN workers workers = (1..RN).map do Ractor.new do |; result_port| loop do n, result_port = Ractor.receive result_port << [n, n.prime?, Ractor.current] end end end result_port = Ractor::Port.new results = [] (1..N).each do |i| if workers.empty? # receive a result n, result, w = result_port.receive results << [n, result] else w = workers.pop end # send a task to the idle worker ractor w << [i, result_port] end # receive a result while results.size != N n, result, _w = result_port.receive results << [n, result] end pp results.sort_by{|n, result| n}
流水线
# pipeline with send/receive r3 = Ractor.new Ractor.current do |cr| cr.send Ractor.receive + 'r3' end r2 = Ractor.new r3 do |r3| r3.send Ractor.receive + 'r2' end r1 = Ractor.new r2 do |r2| r2.send Ractor.receive + 'r1' end r1 << 'r0' p Ractor.receive #=> "r0r1r2r3"
监控
# ring example again r = Ractor.current (1..10).map{|i| r = Ractor.new r, i do |r, i| r.send Ractor.receive + "r#{i}" end } r.send "r0" p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
# ring example with an error r = Ractor.current rs = (1..10).map{|i| r = Ractor.new r, i do |r, i| loop do msg = Ractor.receive raise if /e/ =~ msg r.send msg + "r#{i}" end end } r.send "r0" p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1" r.send "r0" p Ractor.select(*rs, Ractor.current) #=> [:receive, "r0r10r9r8r7r6r5r4r3r2r1"] r.send "e0" p Ractor.select(*rs, Ractor.current) #=> # <Thread:0x000056262de28bd8 run> terminated with exception (report_on_exception is true): # Traceback (most recent call last): # 2: from /home/ko1/src/ruby/trunk/test.rb:7:in `block (2 levels) in <main>' # 1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop' # /home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception # Traceback (most recent call last): # 2: from /home/ko1/src/ruby/trunk/test.rb:7:in `block (2 levels) in <main>' # 1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop' # /home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception # 1: from /home/ko1/src/ruby/trunk/test.rb:21:in `<main>' # <internal:ractor>:69:in `select': thrown by remote Ractor. (Ractor::RemoteError)
# resend non-error message r = Ractor.current rs = (1..10).map{|i| r = Ractor.new r, i do |r, i| loop do msg = Ractor.receive raise if /e/ =~ msg r.send msg + "r#{i}" end end } r.send "r0" p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1" r.send "r0" p Ractor.select(*rs, Ractor.current) [:receive, "r0r10r9r8r7r6r5r4r3r2r1"] msg = 'e0' begin r.send msg p Ractor.select(*rs, Ractor.current) rescue Ractor::RemoteError msg = 'r0' retry end #=> <internal:ractor>:100:in `send': The incoming-port is already closed (Ractor::ClosedError) # because r == r[-1] is terminated.
# ring example with supervisor and re-start def make_ractor r, i Ractor.new r, i do |r, i| loop do msg = Ractor.receive raise if /e/ =~ msg r.send msg + "r#{i}" end end end r = Ractor.current rs = (1..10).map{|i| r = make_ractor(r, i) } msg = 'e0' # error causing message begin r.send msg p Ractor.select(*rs, Ractor.current) rescue Ractor::RemoteError r = rs[-1] = make_ractor(rs[-2], rs.size-1) msg = 'x0' retry end #=> [:receive, "x0r9r9r8r7r6r5r4r3r2r1"]