Fiber

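Fibers provide a mechanism for cooperative concurrency.

Context Switching

Fibers execute a user-provided block. During execution, the block may call Fiber.yield or Fiber.transfer to switch to another fiber. Fiber#resume continues execution from the point where Fiber.yield was called.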

#!/usr/bin/env ruby

puts "1: Start program."

f = Fiber.new do
  puts "3: Entered fiber."
  Fiber.yield
  puts "5: Resumed fiber."
end

puts "2: Resume fiber first time."
f.resume

puts "4: Resume fiber second time."
f.resume

puts "6: Finished."

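This program demonstrates the flow control of fibers: the numbered lines print in order from 1 to 6 as control passes back and forth between the main program and the fiber.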
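Values can also travel across a context switch. The following is a minimal sketch rather than part of the original program; the names `counter` and `log` are made up for illustration. The argument given to Fiber.yield becomes the return value of resume, and the argument given to the next resume becomes the return value of Fiber.yield.

```ruby
log = []

# A running total kept inside a fiber.
counter = Fiber.new do |start|
  total = start
  loop do
    # Hand the current total to the caller and pause here.
    # `step` is whatever the caller passes to the next resume.
    step = Fiber.yield(total)
    total += step
  end
end

log << counter.resume(10) # the first resume supplies the block argument
log << counter.resume(5)  # adds 5 to the total
log << counter.resume(1)  # adds 1 to the total

p log # prints [10, 15, 16]
```

The fiber never finishes on its own here; it simply stays suspended at Fiber.yield once the caller stops resuming it.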

Scheduler

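The scheduler interface is used to intercept blocking operations. A typical implementation would be a wrapper for a gem like EventMachine or Async. This design separates the concerns of the event loop implementation from those of application code. It also allows for layered schedulers, which can perform instrumentation.

Set a scheduler for the current thread: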

Fiber.set_scheduler(MyScheduler.new)

When the thread exits, there is an implicit call to set_scheduler:

Fiber.set_scheduler(nil)
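The implicit call can be observed with a stub scheduler. This is only a sketch: `RecordingScheduler` is a hypothetical class whose hooks do nothing except record when `close` is invoked, and it would not be usable as a real event loop.

```ruby
# Hypothetical stub: exists only to show when Ruby calls #close.
class RecordingScheduler
  attr_reader :events

  def initialize
    @events = []
  end

  # Hooks that Fiber.set_scheduler expects to exist; unused in this sketch.
  def block(blocker, timeout = nil); end
  def unblock(blocker, fiber); end
  def kernel_sleep(duration = nil); end
  def io_wait(io, events, timeout); end

  # Invoked when the scheduler is replaced or the owning thread exits.
  def close
    @events << :close
  end
end

scheduler = RecordingScheduler.new

Thread.new do
  Fiber.set_scheduler(scheduler)
  scheduler.events << :installed
  # Note: no explicit Fiber.set_scheduler(nil) here.
end.join

p scheduler.events
```

After the thread has been joined, the recorded events should include `:close` even though the thread body never cleared the scheduler itself.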

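Design

The scheduler interface is designed to be an unopinionated, lightweight layer between user code and blocking operations. Scheduler hooks should avoid translating or converting arguments or return values. Ideally, the exact same arguments from user code are passed directly to the scheduler hook with no changes.

Interface

This is the interface you need to implement.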
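As an illustration of arguments arriving unchanged, the sketch below records what a `kernel_sleep` hook receives. `PassThroughScheduler` is a hypothetical stub, and its hook returns immediately instead of actually waiting, so it only demonstrates the calling convention.

```ruby
# Hypothetical stub: records hook arguments, performs no real scheduling.
class PassThroughScheduler
  attr_reader :received

  def initialize
    @received = []
  end

  # Receives exactly what the user passed to Kernel#sleep.
  # Assumption for this sketch: return at once rather than wait.
  def kernel_sleep(duration = nil)
    @received << duration
  end

  # Run each scheduled fiber straight away.
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  # Remaining hooks are unused here.
  def block(blocker, timeout = nil); end
  def unblock(blocker, fiber); end
  def io_wait(io, events, timeout); end
  def close; end
end

scheduler = PassThroughScheduler.new

Thread.new do
  Fiber.set_scheduler(scheduler)
  Fiber.schedule do
    sleep(0.25) # a Float
    sleep(2)    # an Integer, not converted to a Float
  end
end.join

p scheduler.received # prints [0.25, 2]
```

A wrapper written this way can forward the same arguments to an inner scheduler, which is one way the layered instrumentation mentioned earlier could be approached.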

class Scheduler
  # Wait for the specified process ID to exit.
  # This hook is optional.
  # @parameter pid [Integer] The process ID to wait for.
  # @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
  # @returns [Process::Status] A process status instance.
  def process_wait(pid, flags)
    Thread.new do
      Process::Status.wait(pid, flags)
    end.value
  end

  # Wait for the given io readiness to match the specified events within
  # the specified timeout.
  # @parameter io [IO] The io to wait on.
  # @parameter events [Integer] A bit mask of `IO::READABLE`,
  #   `IO::WRITABLE` and `IO::PRIORITY`.
  # @parameter timeout [Numeric] The amount of time to wait for the event in seconds.
  # @returns [Integer] The subset of events that are ready.
  def io_wait(io, events, timeout)
  end

  # Read from the given io into the specified buffer.
  # WARNING: Experimental hook! Do not use in production code!
  # @parameter io [IO] The io to read from.
  # @parameter buffer [IO::Buffer] The buffer to read into.
  # @parameter length [Integer] The minimum amount to read.
  def io_read(io, buffer, length)
  end

  # Write from the given buffer into the specified IO.
  # WARNING: Experimental hook! Do not use in production code!
  # @parameter io [IO] The io to write to.
  # @parameter buffer [IO::Buffer] The buffer to write from.
  # @parameter length [Integer] The minimum amount to write.
  def io_write(io, buffer, length)
  end

  # Sleep the current task for the specified duration, or forever if not
  # specified.
  # @parameter duration [Numeric] The amount of time to sleep in seconds.
  def kernel_sleep(duration = nil)
  end

  # Execute the given block. If the block execution exceeds the given timeout,
  # the specified exception `klass` will be raised. Typically, only non-blocking
  # methods which enter the scheduler will raise such exceptions.
  # @parameter duration [Numeric] The amount of time to wait in seconds, after which an exception will be raised.
  # @parameter klass [Class] The exception class to raise.
  # @parameter *arguments [Array] The arguments to send to the constructor of the exception.
  # @yields {...} The user code to execute.
  def timeout_after(duration, klass, *arguments, &block)
  end

  # Resolve hostname to an array of IP addresses.
  # This hook is optional.
  # @parameter hostname [String] Example: "www.ruby-lang.org".
  # @returns [Array] An array of IPv4 and/or IPv6 address strings that the hostname resolves to.
  def address_resolve(hostname)
  end

  # Block the calling fiber.
  # @parameter blocker [Object] What we are waiting on, informational only.
  # @parameter timeout [Numeric | Nil] The amount of time to wait for in seconds.
  # @returns [Boolean] Whether the blocking operation was successful or not.
  def block(blocker, timeout = nil)
  end

  # Unblock the specified fiber.
  # @parameter blocker [Object] What we are waiting on, informational only.
  # @parameter fiber [Fiber] The fiber to unblock.
  # @reentrant Thread safe.
  def unblock(blocker, fiber)
  end

  # Intercept the creation of a non-blocking fiber.
  # @returns [Fiber]
  def fiber(&block)
    Fiber.new(blocking: false, &block)
  end

  # Invoked when the thread exits.
  def close
    self.run
  end

  def run
    # Implement event loop here.
  end
end
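To make the shape of an implementation more concrete, here is a deliberately small sketch that supports only `sleep`. `SleepScheduler` and its internals are hypothetical names; the I/O and blocking hooks are left unimplemented, so this is not a usable general-purpose scheduler.

```ruby
# Illustrative only: handles Kernel#sleep and nothing else.
class SleepScheduler
  def initialize
    @sleeping = {} # fiber => monotonic time at which it should wake
  end

  # Start each scheduled fiber immediately.
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  # Record the wake-up time, then switch back to whoever resumed this fiber.
  def kernel_sleep(duration = nil)
    raise NotImplementedError, "sleeping forever is not handled in this sketch" if duration.nil?

    @sleeping[Fiber.current] = now + duration
    Fiber.yield
  end

  # Present so that Fiber.set_scheduler accepts the object; not supported here.
  def io_wait(io, events, timeout)
    raise NotImplementedError
  end

  def block(blocker, timeout = nil)
    raise NotImplementedError
  end

  def unblock(blocker, fiber)
    raise NotImplementedError
  end

  # Event loop: wake sleeping fibers in order of their deadlines.
  def run
    until @sleeping.empty?
      fiber, wake_at = @sleeping.min_by { |_fiber, time| time }
      delay = wake_at - now
      sleep(delay) if delay > 0 # a real sleep, because the root fiber is blocking
      @sleeping.delete(fiber)
      fiber.resume
    end
  end

  # Invoked when the thread exits; drain the remaining work.
  def close
    run
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

order = []

Thread.new do
  Fiber.set_scheduler(SleepScheduler.new)
  Fiber.schedule { sleep(0.03); order << :slow }
  Fiber.schedule { sleep(0.01); order << :fast }
end.join

p order # prints [:fast, :slow]
```

Both fibers sleep concurrently: the second one is scheduled while the first is still waiting, and the event loop in `run` resumes whichever deadline comes first.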

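Additional hooks may be introduced in the future; feature detection will be used to enable them.

Non-blocking Execution

Scheduler hooks are only used in special non-blocking execution contexts. Non-blocking execution contexts introduce non-determinism, because the execution of scheduler hooks may introduce context switching points into your program.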

Fibers

Fibers can be used to create non-blocking execution contexts.

Fiber.new do
  puts Fiber.current.blocking? # false

  # May invoke `Fiber.scheduler&.io_wait`.
  io.read(...)

  # May invoke `Fiber.scheduler&.io_wait`.
  io.write(...)

  # Will invoke `Fiber.scheduler&.kernel_sleep`.
  sleep(n)
end.resume

We also introduce a new method which simplifies the creation of these non-blocking fibers:

Fiber.schedule do
  puts Fiber.current.blocking? # false
end

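This method allows the scheduler to decide internally when to start the fiber, and whether to use symmetric or asymmetric fibers.

You can also create blocking execution contexts: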

Fiber.new(blocking: true) do
  # Won't use the scheduler:
  sleep(n)
end

However, you should generally avoid this unless you are implementing a scheduler.
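The difference between the two kinds of context can be inspected directly. This short sketch (the `states` hash is just an illustrative name) records Fiber#blocking? in each case; no scheduler is needed to observe the flag.

```ruby
states = {}

# The root fiber of a thread is a blocking context.
states[:root] = Fiber.current.blocking?

# Fibers are non-blocking unless requested otherwise.
Fiber.new { states[:default] = Fiber.current.blocking? }.resume

# Explicitly blocking fiber: scheduler hooks are bypassed inside it.
Fiber.new(blocking: true) { states[:blocking] = Fiber.current.blocking? }.resume

p states
```

Only the fiber created without the blocking option reports false, which is why scheduler hooks such as `kernel_sleep` are only reached from fibers of that kind.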

IO

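By default, I/O is non-blocking. Not all operating systems support non-blocking I/O. Windows is a notable example, where socket I/O can be non-blocking but pipe I/O is blocking. Provided that there is a scheduler and the current thread is non-blocking, the operation will invoke the scheduler.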

IO#close

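Closing an IO interrupts all blocking operations on that IO. When a thread calls IO#close, it first attempts to interrupt any threads or fibers that are blocked on that IO. The closing thread waits until all blocked threads and fibers have been properly interrupted and removed from the IO's blocking list. Each interrupted thread or fiber receives an IOError and is cleanly removed from the blocking operation. Only after all blocking operations have been interrupted and cleaned up is the actual file descriptor closed, which ensures proper resource cleanup and prevents potential race conditions.

For fibers managed by a scheduler, the interruption process involves calling rb_fiber_scheduler_fiber_interrupt on the scheduler. This allows the scheduler to handle the interruption in a way that suits its event loop implementation. The scheduler can then notify the fiber, which receives an IOError and is removed from the blocking operation. This mechanism ensures that fiber-based concurrency works correctly with IO operations, even when those operations are interrupted by IO#close.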
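The thread half of this behaviour can be seen without any scheduler. The sketch below covers only that simpler case: one thread blocks reading from a pipe that never receives data, and another thread closes the read end. The fixed 0.1 second pause is an assumption made to give the reader time to block; it is not a synchronization guarantee.

```ruby
reader, writer = IO.pipe

blocked = Thread.new do
  reader.read(1) # nothing is ever written, so this blocks
rescue IOError => error
  error          # return the exception as the thread's value
end

sleep 0.1        # give the thread time to block in read (assumption)
reader.close     # interrupts the pending read before closing the descriptor

result = blocked.value
writer.close

p result.class
```

The reading thread is released with an IOError instead of hanging forever, and `reader.close` returns only after that thread has left its blocking operation.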

sequenceDiagram
    participant ThreadB
    participant ThreadA
    participant Scheduler
    participant IO
    participant Fiber1
    participant Fiber2

    Note over ThreadA: Thread A has a fiber scheduler
    activate Scheduler
    ThreadA->>Fiber1: Schedule Fiber 1
    activate Fiber1
    Fiber1->>IO: IO.read
    IO->>Scheduler: rb_thread_io_blocking_region
    deactivate Fiber1

    ThreadA->>Fiber2: Schedule Fiber 2
    activate Fiber2
    Fiber2->>IO: IO.read
    IO->>Scheduler: rb_thread_io_blocking_region
    deactivate Fiber2

    Note over Fiber1,Fiber2: Both fibers blocked on same IO

    Note over ThreadB: IO.close
    activate ThreadB
    ThreadB->>IO: thread_io_close_notify_all
    Note over ThreadB: rb_mutex_sleep

    IO->>Scheduler: rb_fiber_scheduler_fiber_interrupt(Fiber1)
    Scheduler->>Fiber1: fiber_interrupt with IOError
    activate Fiber1
    Note over IO: fiber_interrupt causes removal from blocking list
    Fiber1->>IO: rb_io_blocking_operation_exit()
    IO-->>ThreadB: Wakeup thread
    deactivate Fiber1

    IO->>Scheduler: rb_fiber_scheduler_fiber_interrupt(Fiber2)
    Scheduler->>Fiber2: fiber_interrupt with IOError
    activate Fiber2
    Note over IO: fiber_interrupt causes removal from blocking list
    Fiber2->>IO: rb_io_blocking_operation_exit()
    IO-->>ThreadB: Wakeup thread
    deactivate Fiber2
    deactivate Scheduler

    Note over ThreadB: Blocking operations list empty
    ThreadB->>IO: close(fd)
    deactivate ThreadB

Mutex

The Mutex class can be used in a non-blocking context and is fiber-specific.

ConditionVariable

The ConditionVariable class can be used in a non-blocking context and is fiber-specific.

Queue / SizedQueue

The Queue and SizedQueue classes can be used in a non-blocking context and are fiber-specific.

Thread

The Thread#join operation can be used in a non-blocking context and is fiber-specific.
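The synchronization primitives above reach the scheduler through the `block` and `unblock` hooks. The following sketch shows this for a Queue; `QueueScheduler` is a hypothetical, minimal class that ignores timeouts and does not support sleeping or I/O waits. It assumes a thread-safe Thread::Queue is an acceptable way to collect fibers that are ready to run.

```ruby
# Illustrative only: supports block/unblock and nothing else.
class QueueScheduler
  def initialize
    @ready = Thread::Queue.new # fibers that may be resumed (thread safe)
    @blocked = 0               # number of fibers currently suspended in #block
  end

  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  # Suspend the calling fiber until #unblock is called for it.
  # Assumption for this sketch: the timeout is ignored.
  def block(blocker, timeout = nil)
    @blocked += 1
    Fiber.yield
    true
  ensure
    @blocked -= 1
  end

  # May be called from any thread; only enqueues the fiber.
  def unblock(blocker, fiber)
    @ready << fiber
  end

  def kernel_sleep(duration = nil)
    raise NotImplementedError
  end

  def io_wait(io, events, timeout)
    raise NotImplementedError
  end

  # Event loop: resume fibers as they become ready.
  def run
    @ready.pop.resume while @blocked > 0
  end

  def close
    run
  end
end

events = []

Thread.new do
  Fiber.set_scheduler(QueueScheduler.new)
  queue = Thread::Queue.new

  Fiber.schedule do
    events << :waiting
    events << queue.pop # empty queue: suspends this fiber via #block
  end

  Fiber.schedule do
    events << :pushing
    queue << :item      # wakes the waiting fiber via #unblock
  end
end.join

p events # prints [:waiting, :pushing, :item]
```

Because the waiting fiber is suspended rather than the whole thread, the second fiber gets to run and push the item that the first one is waiting for.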