class Fiber
Fiber 是 Ruby 中实现轻量级协作并发的原始对象。基本上,它们是一种创建可以暂停和恢复的代码块的方式,非常类似于线程。主要区别在于它们永远不会被抢占,并且调度必须由程序员而不是虚拟机来完成。
与其他无堆栈的轻量级并发模型不同,每个 fiber 都自带一个堆栈。这使得 fiber 可以从 fiber 块内深层嵌套的函数调用中暂停。请参阅 ruby(1) 手册页以配置 fiber 堆栈的大小。
当一个 fiber 被创建时,它不会自动运行。相反,必须通过 Fiber#resume 方法显式要求它运行。运行在 fiber 内的代码可以通过调用 Fiber.yield 来放弃控制,在这种情况下,它会将控制权交还给调用者(Fiber#resume 的调用者)。
在 yield 或终止时,Fiber 会返回最后执行的表达式的值。
例如
fiber = Fiber.new do Fiber.yield 1 2 end puts fiber.resume puts fiber.resume puts fiber.resume
产生
1 2 FiberError: dead fiber called
Fiber#resume 方法接受任意数量的参数,如果这是第一次调用 resume,那么它们将被作为块参数传递。否则,它们将是调用 Fiber.yield 的返回值。
示例
fiber = Fiber.new do |first| second = Fiber.yield first + 2 end puts fiber.resume 10 puts fiber.resume 1_000_000 puts fiber.resume "The fiber will be dead before I can cause trouble"
产生
12 1000000 FiberError: dead fiber called
非阻塞 Fiber
非阻塞 fiber 的概念在 Ruby 3.0 中引入。一个非阻塞 fiber,在遇到一个通常会阻塞该 fiber 的操作(如 sleep,或等待另一个进程或 I/O)时,它会放弃控制权给其他 fiber,并允许调度程序处理阻塞和唤醒(恢复)该 fiber,当它可以继续时。
为了使 Fiber 表现为非阻塞,它需要在 Fiber.new 中创建,并设置 blocking: false(这是默认值),并且 Fiber.scheduler 应该通过 Fiber.set_scheduler 进行设置。如果在当前线程中未设置 Fiber.scheduler,则阻塞和非阻塞 fiber 的行为是相同的。
Ruby 不提供调度程序类:它期望由用户实现,并对应于 Fiber::Scheduler。
还有一个 Fiber.schedule 方法,它期望以非阻塞的方式立即执行给定的块。其具体实现取决于调度程序。
Public Class Methods
Source
static VALUE
rb_fiber_storage_aref(VALUE class, VALUE key)
{
key = rb_to_symbol(key);
VALUE storage = fiber_storage_get(fiber_current(), FALSE);
if (storage == Qnil) return Qnil;
return rb_hash_aref(storage, key);
}
Source
static VALUE
rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value)
{
key = rb_to_symbol(key);
VALUE storage = fiber_storage_get(fiber_current(), value != Qnil);
if (storage == Qnil) return Qnil;
if (value == Qnil) {
return rb_hash_delete(storage, key);
}
else {
return rb_hash_aset(storage, key, value);
}
}
Source
VALUE
rb_fiber_blocking(VALUE class)
{
VALUE fiber_value = rb_fiber_current();
rb_fiber_t *fiber = fiber_ptr(fiber_value);
// If we are already blocking, this is essentially a no-op:
if (fiber->blocking) {
return rb_yield(fiber_value);
}
else {
return rb_ensure(fiber_blocking_yield, fiber_value, fiber_blocking_ensure, fiber_value);
}
}
在块的持续时间内强制 fiber 处于阻塞状态。返回块的结果。
有关详细信息,请参阅类文档中的“非阻塞 Fiber”部分。
Source
static VALUE
rb_fiber_s_blocking_p(VALUE klass)
{
rb_thread_t *thread = GET_THREAD();
unsigned blocking = thread->blocking;
if (blocking == 0)
return Qfalse;
return INT2NUM(blocking);
}
如果当前 fiber 是非阻塞的,则返回 false。如果 fiber 是通过将 blocking: false 传递给 Fiber.new,或通过 Fiber.schedule 创建的,则 Fiber 是非阻塞的。
如果当前 Fiber 是阻塞的,则该方法返回 1。未来的发展可能会允许返回更大的整数。
请注意,即使该方法返回 false,只有在当前线程中设置了 Fiber.scheduler 时,Fiber 的行为才会不同。
有关详细信息,请参阅类文档中的“非阻塞 Fiber”部分。
Source
static VALUE
rb_fiber_s_current(VALUE klass)
{
return rb_fiber_current();
}
返回当前 fiber。如果您不在 fiber 的上下文中运行,此方法将返回根 fiber。
Source
static VALUE
rb_fiber_current_scheduler(VALUE klass)
{
return rb_fiber_scheduler_current();
}
返回通过 Fiber.set_scheduler 最后为当前线程设置的 Fiber 调度程序,当且仅当当前 fiber 是非阻塞的时。
Source
static VALUE
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
{
return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p());
}
创建新的 Fiber。最初,fiber 未运行,可以通过 resume 恢复。第一次 resume 调用时传递的参数将被传递给块。
f = Fiber.new do |initial| current = initial loop do puts "current: #{current.inspect}" current = Fiber.yield end end f.resume(100) # prints: current: 100 f.resume(1, 2, 3) # prints: current: [1, 2, 3] f.resume # prints: current: nil # ... and so on ...
如果将 blocking: false 传递给 Fiber.new,并且当前线程定义了 Fiber.scheduler,则 Fiber 将变为非阻塞(请参阅类文档中的“非阻塞 Fiber”部分)。
如果 storage 未指定,则默认会继承当前 fiber 的存储副本。这与指定 storage: true 相同。
Fiber[:x] = 1 Fiber.new do Fiber[:x] # => 1 Fiber[:x] = 2 end.resume Fiber[:x] # => 1
如果给定的 storage 是 nil,此函数将惰性初始化内部存储,该存储最初是一个空哈希。
Fiber[:x] = "Hello World" Fiber.new(storage: nil) do Fiber[:x] # nil end
否则,给定的 storage 将用作新 fiber 的存储,并且它必须是 Hash 的实例。
显式使用 storage: true 目前是实验性的,将来可能会发生变化。
Source
static VALUE
rb_fiber_s_schedule(int argc, VALUE *argv, VALUE obj)
{
return rb_fiber_s_schedule_kw(argc, argv, rb_keyword_given_p());
}
该方法期望以单独的非阻塞 fiber 立即运行提供的代码块。
puts "Go to sleep!" Fiber.set_scheduler(MyScheduler.new) Fiber.schedule do puts "Going to sleep" sleep(1) puts "I slept well" end puts "Wakey-wakey, sleepyhead"
假设 MyScheduler 已正确实现,此程序将生成
Go to sleep! Going to sleep Wakey-wakey, sleepyhead ...1 sec pause here... I slept well
...例如,在 fiber 内的第一个阻塞操作(sleep(1))处,控制权被交给外部代码(主 fiber),并在该执行结束时,调度程序负责正确地恢复所有阻塞的 fiber。
请注意,上述行为是该方法期望的行为,实际行为取决于当前调度程序对 Fiber::Scheduler#fiber 方法的实现。Ruby 不强制该方法以任何特定方式行为。
如果未设置调度程序,则该方法会引发 RuntimeError (No scheduler is available!)。
Source
static VALUE
rb_fiber_s_scheduler(VALUE klass)
{
return rb_fiber_scheduler_get();
}
返回通过 Fiber.set_scheduler 最后为当前线程设置的 Fiber 调度程序。如果未设置调度程序(这是默认值),则返回 nil,并且非阻塞 fiber 的行为与阻塞 fiber 相同。(有关调度程序概念的详细信息,请参阅类文档中的“非阻塞 Fiber”部分)。
Source
static VALUE
rb_fiber_set_scheduler(VALUE klass, VALUE scheduler)
{
return rb_fiber_scheduler_set(scheduler);
}
为当前线程设置 Fiber 调度程序。如果设置了调度程序,则非阻塞 fiber(通过 blocking: false 使用 Fiber.new 创建,或通过 Fiber.schedule 创建)将在潜在的阻塞操作上调用该调度程序的钩子方法,并且当前线程将在最终化时调用调度程序的 close 方法(允许调度程序正确管理所有未完成的 fiber)。
scheduler 可以是任何符合 Fiber::Scheduler 的类的对象。其实现由用户决定。
另请参阅类文档中的“非阻塞 Fiber”部分。
Source
static VALUE
rb_fiber_s_yield(int argc, VALUE *argv, VALUE klass)
{
return rb_fiber_yield_kw(argc, argv, rb_keyword_given_p());
}
将控制权交还给恢复 fiber 的上下文,并传递传递给它的任何参数。当下次调用 resume 时,fiber 将在此处恢复处理。传递给下一个 resume 的任何参数将是此 Fiber.yield 表达式计算的值。
Public Instance Methods
Source
VALUE
rb_fiber_alive_p(VALUE fiber_value)
{
return RBOOL(!FIBER_TERMINATED_P(fiber_ptr(fiber_value)));
}
如果 fiber 仍然可以被恢复(或转移)则返回 true。在 fiber 块执行完成后,此方法将始终返回 false。
Source
static VALUE
rb_fiber_backtrace(int argc, VALUE *argv, VALUE fiber)
{
return rb_vm_backtrace(argc, argv, &fiber_ptr(fiber)->cont.saved_ec);
}
返回 fiber 的当前执行堆栈。start、count 和 end 允许选择 backtrace 的部分。
def level3 Fiber.yield end def level2 level3 end def level1 level2 end f = Fiber.new { level1 } # It is empty before the fiber started f.backtrace #=> [] f.resume f.backtrace #=> ["test.rb:2:in `yield'", "test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"] p f.backtrace(1) # start from the item 1 #=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"] p f.backtrace(2, 2) # start from item 2, take 2 #=> ["test.rb:6:in `level2'", "test.rb:10:in `level1'"] p f.backtrace(1..3) # take items from 1 to 3 #=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'"] f.resume # It is nil after the fiber is finished f.backtrace #=> nil
Source
static VALUE
rb_fiber_backtrace_locations(int argc, VALUE *argv, VALUE fiber)
{
return rb_vm_backtrace_locations(argc, argv, &fiber_ptr(fiber)->cont.saved_ec);
}
类似于 backtrace,但将执行堆栈的每一行返回为 Thread::Backtrace::Location。接受与 backtrace 相同的参数。
f = Fiber.new { Fiber.yield } f.resume loc = f.backtrace_locations.first loc.label #=> "yield" loc.path #=> "test.rb" loc.lineno #=> 1
Source
VALUE
rb_fiber_blocking_p(VALUE fiber)
{
return RBOOL(fiber_ptr(fiber)->blocking);
}
如果 fiber 是阻塞的则返回 true,否则返回 false。如果 fiber 是通过将 blocking: false 传递给 Fiber.new,或通过 Fiber.schedule 创建的,则 Fiber 是非阻塞的。
请注意,即使该方法返回 false,只有在当前线程中设置了 Fiber.scheduler 时,该 fiber 的行为才会不同。
有关详细信息,请参阅类文档中的“非阻塞 Fiber”部分。
Source
static VALUE
rb_fiber_m_kill(VALUE self)
{
rb_fiber_t *fiber = fiber_ptr(self);
if (fiber->killed) return Qfalse;
fiber->killed = 1;
if (fiber->status == FIBER_CREATED) {
fiber->status = FIBER_TERMINATED;
}
else if (fiber->status != FIBER_TERMINATED) {
if (fiber_current() == fiber) {
fiber_check_killed(fiber);
}
else {
fiber_raise(fiber_ptr(self), Qnil);
}
}
return self;
}
通过引发一个不可捕获的异常终止 fiber。它只终止给定的 fiber 而不终止任何其他 fiber,如果另一个 fiber 调用了 resume 或 transfer,则向其返回 nil。
Fiber#kill 仅在 fiber 处于 Fiber.yield 状态时中断另一个 fiber。如果调用当前 fiber,则在该 Fiber#kill 调用点引发该异常。
如果 fiber 尚未开始,则直接过渡到终止状态。
如果 fiber 已经终止,则不执行任何操作。
如果调用的是属于另一个线程的 fiber,则引发 FiberError。
Source
static VALUE
rb_fiber_m_raise(int argc, VALUE *argv, VALUE self)
{
return rb_fiber_raise(self, argc, argv);
}
在上次调用 Fiber.yield 的点在 fiber 中引发异常。
f = Fiber.new { puts "Before the yield" Fiber.yield 1 # -- exception will be raised here puts "After the yield" } p f.resume f.raise "Gotcha"
输出
Before the first yield 1 t.rb:8:in 'Fiber.yield': Gotcha (RuntimeError) from t.rb:8:in 'block in <main>'
如果 fiber 尚未开始或已完成运行,则引发 FiberError。如果 fiber 正在 yield,则它会被恢复。如果它正在 transfer,则它会被 transfer 进入。但如果它正在 resume,则会引发 FiberError。
如果调用的是属于另一个 Thread 的 Fiber,则引发 FiberError。
有关参数的更多信息,请参阅 Kernel#raise。
Source
static VALUE
rb_fiber_m_resume(int argc, VALUE *argv, VALUE fiber)
{
return rb_fiber_resume_kw(fiber, argc, argv, rb_keyword_given_p());
}
从上次调用 Fiber.yield 的点恢复 fiber,或者如果这是第一次调用 resume,则开始运行它。传递给 resume 的参数将是 Fiber.yield 表达式的值,或者如果是第一次 resume,则将作为块参数传递给 fiber 的块。
或者,当调用 resume 时,它会计算为传递给 fiber 块中下一个 Fiber.yield 语句的参数,或者如果它在没有任何 Fiber.yield 的情况下运行完成,则为块值。
Source
static VALUE
rb_fiber_storage_get(VALUE self)
{
storage_access_must_be_from_same_fiber(self);
VALUE storage = fiber_storage_get(fiber_ptr(self), FALSE);
if (storage == Qnil) {
return Qnil;
}
else {
return rb_obj_dup(storage);
}
}
返回 fiber 的存储哈希的副本。此方法只能在 Fiber.current 上调用。
Source
static VALUE
rb_fiber_storage_set(VALUE self, VALUE value)
{
if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) {
rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL,
"Fiber#storage= is experimental and may be removed in the future!");
}
storage_access_must_be_from_same_fiber(self);
fiber_storage_validate(value);
fiber_ptr(self)->cont.saved_ec.storage = rb_obj_dup(value);
return value;
}
设置 fiber 的存储哈希。此功能是实验性的,将来可能会发生变化。此方法只能在 Fiber.current 上调用。
您应该小心使用此方法,因为您可能会无意中清除重要的 fiber 存储状态。您应该主要通过 Fiber::[]= 来为存储分配特定键。
您也可以使用 Fiber.new(storage: nil) 来创建一个带有空存储的 fiber。
示例
while request = request_queue.pop # Reset the per-request state: Fiber.current.storage = nil handle_request(request) end
Source
static VALUE
fiber_to_s(VALUE fiber_value)
{
const rb_fiber_t *fiber = fiber_ptr(fiber_value);
const rb_proc_t *proc;
char status_info[0x20];
if (fiber->resuming_fiber) {
snprintf(status_info, 0x20, " (%s by resuming)", fiber_status_name(fiber->status));
}
else {
snprintf(status_info, 0x20, " (%s)", fiber_status_name(fiber->status));
}
if (!rb_obj_is_proc(fiber->first_proc)) {
VALUE str = rb_any_to_s(fiber_value);
strlcat(status_info, ">", sizeof(status_info));
rb_str_set_len(str, RSTRING_LEN(str)-1);
rb_str_cat_cstr(str, status_info);
return str;
}
GetProcPtr(fiber->first_proc, proc);
return rb_block_to_s(fiber_value, &proc->block, status_info);
}
Source
static VALUE
rb_fiber_m_transfer(int argc, VALUE *argv, VALUE self)
{
return rb_fiber_transfer_kw(self, argc, argv, rb_keyword_given_p());
}
将控制权转移到另一个 fiber,从它上次停止的地方恢复它,或者在它之前未被恢复的情况下启动它。调用 fiber 将被挂起,就像调用 Fiber.yield 一样。
接收 transfer 调用的 fiber 将它视为 resume 调用。传递给 transfer 的参数被视为传递给 resume 的参数。
两种 fiber 之间的控制传递方式(一种是 resume 和 Fiber::yield,另一种是 fiber 之间的 transfer)不能随意混合。
-
如果 Fiber 的生命周期是以 transfer 开始的,那么它将永远无法 yield 或被恢复控制传递,只能完成或 transfer 回去。(它仍然可以恢复其他允许被恢复的 fiber。)
-
如果 Fiber 的生命周期是以 resume 开始的,它可以 yield 或 transfer 到另一个
Fiber,但只能以与其放弃方式兼容的方式接收控制:如果它 transfer 了,它只能被 transfer 回去;如果它 yield 了,它只能被 resume 回去。之后,它可以再次 transfer 或 yield。
如果违反了这些规则,将引发 FiberError。
对于单个 Fiber 设计,yield/resume 更容易使用(Fiber 只是放弃控制,它不需要考虑控制权交给谁),而 transfer 对于复杂情况更灵活,允许构建相互依赖的 Fiber 的任意图。
示例
manager = nil # For local var to be visible inside worker block # This fiber would be started with transfer # It can't yield, and can't be resumed worker = Fiber.new { |work| puts "Worker: starts" puts "Worker: Performed #{work.inspect}, transferring back" # Fiber.yield # this would raise FiberError: attempt to yield on a not resumed fiber # manager.resume # this would raise FiberError: attempt to resume a resumed fiber (double resume) manager.transfer(work.capitalize) } # This fiber would be started with resume # It can yield or transfer, and can be transferred # back or resumed manager = Fiber.new { puts "Manager: starts" puts "Manager: transferring 'something' to worker" result = worker.transfer('something') puts "Manager: worker returned #{result.inspect}" # worker.resume # this would raise FiberError: attempt to resume a transferring fiber Fiber.yield # this is OK, the fiber transferred from and to, now it can yield puts "Manager: finished" } puts "Starting the manager" manager.resume puts "Resuming the manager" # manager.transfer # this would raise FiberError: attempt to transfer to a yielding fiber manager.resume
产生
Starting the manager Manager: starts Manager: transferring 'something' to worker Worker: starts Worker: Performed "something", transferring back Manager: worker returned "Something" Resuming the manager Manager: finished