Ractor - Ruby 的类似 Actor 的并发抽象

Ractor 设计用于在没有线程安全顾虑的情况下并行执行 Ruby 代码。

摘要

Ruby 进程中的多个 Ractor

您可以创建多个 Ractor,它们可以相互并行运行 Ruby 代码。

Ractor 之间的有限共享

Ractor 不像线程那样共享所有对象(线程可以访问任何对象,除了存储在另一个线程线程局部变量中的对象)。

使用 Ractor::Port 进行 Ractor 间的通信

Ractor 通过交换消息来进行相互通信和同步执行。 Ractor::Port 类提供了这种通信机制。

port = Ractor::Port.new

Ractor.new port do |port|
  # Other ractors can send to the port
  port << 42
end

port.receive # get a message from the port. Only the ractor that created the Port can receive from it.
#=> 42

所有 Ractor 都有一个默认端口,Ractor#sendRactor.receive(等)将使用该端口。

发送对象时的复制和移动语义

要将不可共享对象发送到另一个 Ractor,对象会被复制或移动。

线程安全

Ractor 有助于编写线程安全、并发的程序。它们仅允许通过显式消息传递来共享不可共享对象。可共享对象被保证能在跨 Ractor 正确工作,即使 Ractor 正在并行运行。然而,此保证仅适用于 Ractor 之间。如果您在一个 Ractor 中使用多个 Ruby Thread,您仍然需要在 Ractor 内部使用 Mutex 和其他线程安全工具。

创建和终止

Ractor.new

# Ractor.new with a block creates a new Ractor
r = Ractor.new do
  # This block can run in parallel with other ractors
end

# You can name a Ractor with a `name:` argument.
r = Ractor.new name: 'my-first-ractor' do
end

r.name #=> 'my-first-ractor'

块隔离

Ractor 在给定的块中执行 expr。给定的块将与其外部作用域隔离。为防止 Ractor 之间共享对象,外部变量、self 和其他信息将与块隔离。

这种隔离发生在 Ractor 创建时(调用 Ractor.new 时)。如果由于外部变量或 self 无法隔离给定的块,将引发错误。

begin
  a = true
  r = Ractor.new do
    a #=> ArgumentError because this block accesses outer variable `a`.
  end
  r.join # wait for ractor to finish
rescue ArgumentError
end
r = Ractor.new do
  p self.class #=> Ractor
  self.object_id
end
r.value == self.object_id #=> false

传递给 Ractor.new() 的参数成为给定块的块参数。但是,Ruby 不会传递对象本身,而是将它们作为消息发送(有关详细信息,请参见下文)。

r = Ractor.new 'ok' do |msg|
  msg #=> 'ok'
end
r.value #=> 'ok'
# similar to the last example
r = Ractor.new do
  msg = Ractor.receive
  msg
end
r.send 'ok'
r.value #=> 'ok'

给定块的执行结果

给定块的返回值成为传出消息(有关详细信息,请参见下文)。

r = Ractor.new do
  'ok'
end
r.value #=> `ok`

给定块中的错误将传播到传出消息的消费者。

r = Ractor.new do
  raise 'ok' # exception will be transferred to the consumer
end

begin
  r.value
rescue Ractor::RemoteError => e
  e.cause.class   #=> RuntimeError
  e.cause.message #=> 'ok'
  e.ractor        #=> r
end

Ractor 之间的通信

Ractor 之间的通信通过发送和接收消息来实现。有两种通信方式:

用户可以通过 (1) 控制程序执行时序,但应避免通过 (2) 控制(仅执行关键部分)。

对于发送和接收消息,以下是基本 API:

有 3 种方法可以将对象作为消息发送:

1) 发送引用:发送可共享对象仅发送对象的引用(速度快)。

2) 复制对象:通过深度复制来发送不可共享对象(可能很慢)。请注意,您不能通过此方式发送不支持深度复制的对象。某些 T_DATA 对象(类在 C 扩展中定义的,例如 StringIO)不受支持。

3) 移动对象:通过所有权转移将不可共享对象发送到另一个 Ractor。发送 Ractor 在移动对象后无法访问该移动的对象,否则将引发异常。实现说明:T_DATA 对象不受支持。

您可以通过 move: 关键字在“复制”和“移动”之间进行选择,Ractor#send(obj, move: true/false)。默认值为 false(“复制”)。但是,如果对象是可共享的,它将自动使用 move

使用 Ractor.select 等待多个 Ractor

您可以一次等待多个端口上的消息。 Ractor.select() 的返回值是 [port, msg],其中 port 是就绪的端口,msg 是接收到的消息。

为了方便起见,Ractor.select 也可以接受 Ractor。在这种情况下,它会等待它们的终止。 Ractor.select() 的返回值是 [r, msg],其中 r 是已终止的 Ractormsg 是 Ractor 块的值。

等待单个 Ractor(与 Ractor#value 相同)

r1 = Ractor.new{'r1'}

r, obj = Ractor.select(r1)
r == r1 and obj == 'r1' #=> true

等待两个 Ractor

r1 = Ractor.new{'r1'}
r2 = Ractor.new{'r2'}
rs = [r1, r2]
values = []

while rs.any?
  r, obj = Ractor.select(*rs)
  rs.delete(r)
  values << obj
end

values.sort == ['r1', 'r2'] #=> true

注意:在大量 Ractor 上使用 Ractor.select() 与当前的 select(2) 有相同的问题。

关闭端口

示例(尝试从已关闭的 Ractor 获取结果)

r = Ractor.new do
  'finish'
end
r.join # success (wait for the termination)
r.value # success (will return 'finish')

# The ractor's termination value has already been given to another ractor
Ractor.new r do |r|
  r.value #=> Ractor::Error
end.join

示例(尝试发送到已关闭的端口)

r = Ractor.new do
end

r.join # wait for termination, closes default port

begin
  r.send(1)
rescue Ractor::ClosedError
  'ok'
end

通过复制发送消息

Ractor::Port#send(obj) 会深度复制 obj(如果 obj 是不可共享对象)。

obj = 'str'.dup
r = Ractor.new obj do |msg|
  # return received msg's object_id
  msg.object_id
end

obj.object_id == r.value #=> false

某些对象不支持复制,会引发异常。

obj = Thread.new{}
begin
  Ractor.new obj do |msg|
    msg
  end
rescue TypeError => e
  e.message #=> #<TypeError: allocator undefined for Thread>
end

通过移动发送消息

Ractor::Port#send(obj, move: true)obj 移动到目标 Ractor。如果源 Ractor 使用了移动的对象(例如,调用了 obj.foo() 这样的方法),它将引发错误。

r = Ractor.new do
  obj = Ractor.receive
  obj << ' world'
end

str = 'hello'.dup
r.send str, move: true
# str is now moved, and accessing str from this ractor is prohibited
modified = r.value #=> 'hello world'


begin
  # Error because it uses moved str.
  str << ' exception' # raise Ractor::MovedError
rescue Ractor::MovedError
  modified #=> 'hello world'
end

某些对象不支持移动,会引发异常。

r = Ractor.new do
  Ractor.receive
end

r.send(Thread.new{}, move: true) #=> allocator undefined for Thread (TypeError)

一旦对象被移动,源对象的类将变为 Ractor::MovedObject

可共享对象

以下是可共享对象的非详尽列表:

为了使对象可共享,提供了 Ractor.make_shareable(obj)。它通过冻结 obj 并递归遍历其引用以全部冻结来尝试使对象可共享。此方法接受 copy: 关键字(默认值为 false)。Ractor.make_shareable(obj, copy: true) 尝试深度复制 obj 并使复制的对象可共享。Ractor.make_shareable(copy: false) 对已可共享的对象没有影响。如果对象无法变得可共享,将引发 Ractor::Error 异常。

限制 Ractor 之间共享的语言更改

为了在 Ractor 之间隔离不可共享对象,我们为多 Ractor 的 Ruby 程序引入了额外的语言语义。

请注意,在使用 Ractor 时,不需要这些额外的语义(与 Ruby 2 兼容 100%)。

全局变量

只有主 Ractor 可以访问全局变量。

$gv = 1
r = Ractor.new do
  $gv
end

begin
  r.join
rescue Ractor::RemoteError => e
  e.cause.message #=> 'can not access global variables from non-main Ractors'
end

请注意,一些特殊的全局变量,如 $stdin$stdout$stderr 是每个 Ractor 本地化的。有关更多详细信息,请参阅 [Bug #17268]

可共享对象的实例变量

仅当类/模块的实例变量的值是可共享对象时,非主 Ractor 才能访问它们。

class C
  @iv = 1
end

p Ractor.new do
  class C
     @iv
  end
end.value #=> 1

否则,只有主 Ractor 可以访问可共享对象的实例变量。

class C
  @iv = [] # unshareable object
end

Ractor.new do
  class C
    begin
      p @iv
    rescue Ractor::IsolationError
      p $!.message
      #=> "can not get unshareable values from instance variables of classes/modules from non-main Ractors"
    end

    begin
      @iv = 42
    rescue Ractor::IsolationError
      p $!.message
      #=> "can not set instance variables of classes/modules by non-main Ractors"
    end
  end
end.join
shared = Ractor.new{}
shared.instance_variable_set(:@iv, 'str')

r = Ractor.new shared do |shared|
  p shared.instance_variable_get(:@iv)
end

begin
  r.join
rescue Ractor::RemoteError => e
  e.cause.message #=> can not access instance variables of shareable objects from non-main Ractors (Ractor::IsolationError)
end

类变量

只有主 Ractor 可以访问类变量。

class C
  @@cv = 'str'
end

r = Ractor.new do
  class C
    p @@cv
  end
end


begin
  r.join
rescue => e
  e.class #=> Ractor::IsolationError
end

Constants

只有主 Ractor 可以读取引用不可共享对象的常量。

class C
  CONST = 'str'.dup
end
r = Ractor.new do
  C::CONST
end
begin
  r.join
rescue => e
  e.class #=> Ractor::IsolationError
end

只有主 Ractor 可以定义引用不可共享对象的常量。

class C
end
r = Ractor.new do
  C::CONST = 'str'.dup
end
begin
  r.join
rescue => e
  e.class #=> Ractor::IsolationError
end

在创建/更新库以支持 Ractor 时,如果常量要被非主 Ractor 使用,它们应该仅引用可共享对象。

TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'}

在这种情况下,TABLE 指向一个不可共享的 Hash 对象。为了让其他 Ractor 使用 TABLE,我们需要使其可共享。我们可以像这样使用 Ractor.make_shareable()

TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )

为了方便起见,Ruby 3.0 引入了一个新的 shareable_constant_value 文件指令。

# shareable_constant_value: literal

TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'}
#=> Same as: TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )

shareable_constant_value 指令接受以下模式(描述使用示例:CONST = expr):

除了 none 模式(默认)外,这些常量保证仅引用可共享对象。

有关更多详细信息,请参阅 syntax/comments.rdoc

可共享的 proc

Proc 和 lambda 是不可共享对象,即使它们被冻结。要创建一个不可共享的 Proc,您必须使用 Ractor.shareable_proc { expr }。与 Ractor 创建时类似,proc 的块与其外部环境隔离,因此它无法访问外部作用域中的变量。selfProc 中默认为 nil,但如果想将其自定义为其他可共享对象,可以提供 self: 关键字。

p = Ractor.shareable_proc { p self }
p.call #=> nil
begin
  a = 1
  pr = Ractor.shareable_proc { p a }
  pr.call # never gets here
rescue Ractor::IsolationError
end

为了使用 Module#define_method 动态定义一个可从不同 Ractor 使用的方法,您必须使用共享的 proc 来定义它。或者,您可以使用 Module#class_evalModule#module_eval 配合 String。尽管共享 proc 的 self 最初绑定到 nil,但 define_method 会在方法中将 self 绑定到正确的值。

class A
  define_method :testing, &Ractor.shareable_proc do
    p self
  end
end
Ractor.new do
  a = A.new
  a.testing #=> #<A:0x0000000101acfe10>
end.join

必须进行此隔离,以防止方法访问和分配捕获的外部变量到跨 Ractor。

Ractor 本地存储

您可以将任何对象(包括不可共享对象)存储在 Ractor 本地存储中。

r = Ractor.new do
  values = []
  Ractor[:threads] = []
  3.times do |i|
    Ractor[:threads] << Thread.new do
      values << [Ractor.receive, i+1] # Ractor.receive blocks the current thread in the current ractor until it receives a message
    end
  end
  Ractor[:threads].each(&:join)
  values
end

r << 1
r << 2
r << 3
r.value #=> [[1,1],[2,2],[3,3]] (the order can change with each run)

示例

Actor 模型中的传统环形示例

RN = 1_000
CR = Ractor.current

r = Ractor.new do
  p Ractor.receive
  CR << :fin
end

RN.times{
  r = Ractor.new r do |next_r|
    next_r << Ractor.receive
  end
}

p :setup_ok
r << 1
p Ractor.receive

Fork-join

def fib n
  if n < 2
    1
  else
    fib(n-2) + fib(n-1)
  end
end

RN = 10
rs = (1..RN).map do |i|
  Ractor.new i do |i|
    [i, fib(i)]
  end
end

until rs.empty?
  r, v = Ractor.select(*rs)
  rs.delete r
  p answer: v
end

Worker 池

(1)一个 Ractor 拥有一个池

require 'prime'

N = 1000
RN = 10

# make RN workers
workers = (1..RN).map do
  Ractor.new do |; result_port|
    loop do
      n, result_port = Ractor.receive
      result_port << [n, n.prime?, Ractor.current]
    end
  end
end

result_port = Ractor::Port.new
results = []

(1..N).each do |i|
  if workers.empty?
    # receive a result
    n, result, w = result_port.receive
    results << [n, result]
  else
    w = workers.pop
  end

  # send a task to the idle worker ractor
  w << [i, result_port]
end

# receive a result
while results.size != N
  n, result, _w = result_port.receive
  results << [n, result]
end

pp results.sort_by{|n, result| n}

流水线

# pipeline with send/receive

r3 = Ractor.new Ractor.current do |cr|
  cr.send Ractor.receive + 'r3'
end

r2 = Ractor.new r3 do |r3|
  r3.send Ractor.receive + 'r2'
end

r1 = Ractor.new r2 do |r2|
  r2.send Ractor.receive + 'r1'
end

r1 << 'r0'
p Ractor.receive #=> "r0r1r2r3"

监控

# ring example again

r = Ractor.current
(1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    r.send Ractor.receive + "r#{i}"
  end
}

r.send "r0"
p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
# ring example with an error

r = Ractor.current
rs = (1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
}

r.send "r0"
p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
r.send "r0"
p Ractor.select(*rs, Ractor.current) #=> [:receive, "r0r10r9r8r7r6r5r4r3r2r1"]
r.send "e0"
p Ractor.select(*rs, Ractor.current)
#=>
# <Thread:0x000056262de28bd8 run> terminated with exception (report_on_exception is true):
# Traceback (most recent call last):
#         2: from /home/ko1/src/ruby/trunk/test.rb:7:in `block (2 levels) in <main>'
#         1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop'
# /home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception
# Traceback (most recent call last):
#         2: from /home/ko1/src/ruby/trunk/test.rb:7:in `block (2 levels) in <main>'
#         1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop'
# /home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception
#         1: from /home/ko1/src/ruby/trunk/test.rb:21:in `<main>'
# <internal:ractor>:69:in `select': thrown by remote Ractor. (Ractor::RemoteError)
# resend non-error message

r = Ractor.current
rs = (1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
}

r.send "r0"
p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
r.send "r0"
p Ractor.select(*rs, Ractor.current)
[:receive, "r0r10r9r8r7r6r5r4r3r2r1"]
msg = 'e0'
begin
  r.send msg
  p Ractor.select(*rs, Ractor.current)
rescue Ractor::RemoteError
  msg = 'r0'
  retry
end

#=> <internal:ractor>:100:in `send': The incoming-port is already closed (Ractor::ClosedError)
# because r == r[-1] is terminated.
# ring example with supervisor and re-start

def make_ractor r, i
  Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
end

r = Ractor.current
rs = (1..10).map{|i|
  r = make_ractor(r, i)
}

msg = 'e0' # error causing message
begin
  r.send msg
  p Ractor.select(*rs, Ractor.current)
rescue Ractor::RemoteError
  r = rs[-1] = make_ractor(rs[-2], rs.size-1)
  msg = 'x0'
  retry
end

#=> [:receive, "x0r9r9r8r7r6r5r4r3r2r1"]