class Fiber::Scheduler
这不是一个现有的类,而是 对象应该遵循的接口的文档,以便作为参数传递给 Scheduler 并处理非阻塞的 fiber。有关某些概念的解释,请参阅 Fiber.scheduler 类文档中的“非阻塞 fiber”部分。Fiber
Scheduler 的行为和用法应如下所示:
-
当非阻塞
的执行达到某个阻塞操作(如 sleep、等待进程或非就绪 I/O)时,它会调用下面列出的调度器的某些钩子方法。Fiber -
以某种方式注册当前 fiber 正在等待的内容,并通过Scheduler将控制权交给其他 fiber(因此 fiber 在等待其等待结束时将被挂起,而同一线程中的其他 fiber 可以执行)。Fiber.yield -
在当前线程执行结束时,将调用调度器的
scheduler_close方法。 -
调度器会进入一个等待循环,检查所有阻塞的 fiber(这些 fiber 已在钩子调用中注册),并在等待的资源就绪时(例如 I/O 就绪或 sleep 时间已过)恢复它们。
这样,并发执行将对每个独立的 Fiber 的代码实现透明。
实现由 gem 提供,例如 Async。Scheduler
钩子方法是:
-
io_wait,io_read,io_write,io_pread,io_pwriteio_select, andio_close -
(随着 Ruby 开发者制造更多具有非阻塞调用的方法,此列表将继续扩展)。
除非另有说明,否则钩子实现是强制性的:如果它们未实现,尝试调用钩子的方法将失败。为向后兼容起见,将来钩子将是可选的(如果它们未实现,由于调度器是为较旧的 Ruby 版本创建的,需要此钩子的代码不会失败,只会表现为阻塞方式)。
还强烈建议调度器实现 方法,该方法由 fiber 委托。Fiber.schedule
示例性的玩具调度器实现可以在 Ruby 代码的 test/fiber/scheduler.rb 中找到。
Public Instance Methods
Source
VALUE
rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
{
VALUE arguments[] = {
hostname
};
return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
}
由执行非反向 DNS 查找的任何方法调用。最值得注意的方法是 ,但还有许多其他方法。Addrinfo.getaddrinfo
该方法应返回一个字符串数组,对应于 hostname 解析到的 IP 地址,如果无法解析,则返回 nil。
所有可能调用点的详尽列表:
-
Addrinfo.marshal_load
Source
VALUE
rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
{
return rb_funcall(scheduler, id_block, 2, blocker, timeout);
}
由 等方法以及 Thread.join 调用,表示当前 Thread::Mutex 将被阻塞直到进一步通知(例如 Fiber)或直到 unblocktimeout 过去。
blocker 是我们正在等待的对象,仅用于信息目的(用于调试和日志记录)。对其值没有保证。
预计返回一个布尔值,指定阻塞操作是否成功。
Source
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
{
// Check if scheduler supports blocking_operation_wait before creating the object
if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
return Qundef;
}
// Create a new BlockingOperation with the blocking operation
VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);
VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);
// Get the operation data to check if it was executed
rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation);
rb_atomic_t current_status = RUBY_ATOMIC_LOAD(operation->status);
// Invalidate the operation now that we're done with it
operation->function = NULL;
operation->state = NULL;
operation->data = NULL;
operation->data2 = NULL;
operation->unblock_function = NULL;
// If the blocking operation was never executed, return Qundef to signal the caller to use rb_nogvl instead
if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
return Qundef;
}
return result;
}
由 Ruby 的核心方法调用,以非阻塞方式运行阻塞操作。blocking_operation 是一个封装阻塞操作的不透明对象,它响应不带任何参数的 call 方法。
如果调度器未实现此方法,或者调度器未执行阻塞操作,Ruby 将回退到非调度器实现。
建议的最小实现为:
def blocking_operation_wait(blocking_operation) Thread.new { blocking_operation.call }.join end
Source
VALUE
rb_fiber_scheduler_close(VALUE scheduler)
{
RUBY_ASSERT(ruby_thread_has_gvl_p());
VALUE result;
// The reason for calling `scheduler_close` before calling `close` is for
// legacy schedulers which implement `close` and expect the user to call
// it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
// which should call `scheduler_close`. If it were to call `close`, it
// would create an infinite loop.
result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
if (!UNDEF_P(result)) return result;
result = rb_check_funcall(scheduler, id_close, 0, NULL);
if (!UNDEF_P(result)) return result;
return Qnil;
}
当当前线程退出时调用。调度器应实现此方法,以便所有等待的 fiber 可以完成其执行。
建议的模式是在 方法中实现主事件循环。close
Source
VALUE
rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
{
return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
}
的实现。该方法预计会立即在一个独立的非阻塞 fiber 中运行给定的代码块,并返回该 Fiber.schedule。Fiber
建议的最小实现为:
def fiber(&block) fiber = Fiber.new(blocking: false, &block) fiber.resume fiber end
Source
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
{
VALUE arguments[] = {
fiber, exception
};
VALUE result;
enum ruby_tag_type state;
// We must prevent interrupts while invoking the fiber_interrupt method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_unblock`.
rb_execution_context_t *ec = GET_EC();
int saved_interrupt_mask = ec->interrupt_mask;
ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
result = rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
}
EC_POP_TAG();
ec->interrupt_mask = saved_interrupt_mask;
if (state) {
EC_JUMP_TAG(ec, state);
}
RUBY_VM_CHECK_INTS(ec);
return result;
}
Source
VALUE
rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
{
VALUE arguments[] = {io};
return rb_check_funcall(scheduler, id_io_close, 1, arguments);
}
由 Ruby 的核心方法调用,通知调度器 对象已关闭。请注意,该方法将接收关闭对象的整数文件描述符,而不是对象本身。IO
Source
VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
if (!rb_respond_to(scheduler, id_io_pread)) {
return RUBY_Qundef;
}
VALUE arguments[] = {
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
};
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
} else {
return fiber_scheduler_io_pread((VALUE)&arguments);
}
}
由 或 IO#pread 调用,从 IO::Buffer#preadio 中偏移量 from 处读取 length 字节到指定的 buffer(请参阅 )中的 IO::Bufferoffset。
此方法在语义上与 相同,但它允许指定读取的偏移量,并且通常更适合对同一文件的异步 io_read 操作。IO
此方法应被视为实验性的。
Source
VALUE
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
if (!rb_respond_to(scheduler, id_io_pwrite)) {
return RUBY_Qundef;
}
VALUE arguments[] = {
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
};
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
} else {
return fiber_scheduler_io_pwrite((VALUE)&arguments);
}
}
由 或 IO#pwrite 调用,从指定的 IO::Buffer#pwritebuffer(请参阅 )中的 IO::Bufferoffset 处向 io 写入 length 字节。
此方法在语义上与 相同,但它允许指定写入的偏移量,并且通常更适合对同一文件的异步 io_write 操作。IO
此方法应被视为实验性的。
Source
VALUE
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
if (!rb_respond_to(scheduler, id_io_read)) {
return RUBY_Qundef;
}
VALUE arguments[] = {
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
};
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
} else {
return fiber_scheduler_io_read((VALUE)&arguments);
}
}
由 或 IO#readIO#Buffer.read 调用,从 io 中读取 length 字节到指定的 buffer(请参阅 )中的 IO::Bufferoffset。
length 参数是“要读取的最小长度”。如果 缓冲区大小为 8KiB,但 IOlength 为 1024(1KiB),则最多可以读取 8KiB,但至少会读取 1KiB。通常,读取数据少于 length 的唯一情况是读取数据时发生错误。
指定 length 为 0 是有效的,表示尝试至少读取一次并返回任何可用数据。
建议的实现应尝试以非阻塞方式从 io 读取,如果 io 未就绪,则调用 (这将把控制权交给其他 fiber)。io_wait
有关可用于返回数据的接口,请参阅 。IO::Buffer
预计返回读取的字节数,或在发生错误时返回 -errno(系统错误代码的负数)。
此方法应被视为实验性的。
Source
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
{
VALUE arguments[] = {
readables, writables, exceptables, timeout
};
return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
}
Source
VALUE
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
VALUE arguments[] = {
scheduler, io, events, timeout
};
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
} else {
return fiber_scheduler_io_wait((VALUE)&arguments);
}
}
由 、IO#wait、IO#wait_readable 调用,以查询指定的描述符是否在指定的 IO#wait_writabletimeout 内已准备好进行指定事件。
events 是 IO::READABLE、IO::WRITABLE 和 IO::PRIORITY 的位掩码。
建议的实现应注册哪个 正在等待哪些资源,并立即调用 Fiber 将控制权传递给其他 fiber。然后,在 Fiber.yield 方法中,调度器可能会将所有 I/O 资源分派给等待它的 fiber。close
预计返回立即准备好的事件子集。
Source
VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
if (!rb_respond_to(scheduler, id_io_write)) {
return RUBY_Qundef;
}
VALUE arguments[] = {
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
};
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
} else {
return fiber_scheduler_io_write((VALUE)&arguments);
}
}
由 或 IO#write 调用,将 IO::Buffer#writelength 字节从指定的 buffer(请参阅 )中的 IO::Bufferoffset 处写入 io。
length 参数是“要写入的最小长度”。如果 缓冲区大小为 8KiB,但指定的 IOlength 为 1024(1KiB),则最多会写入 8KiB,但至少会写入 1KiB。通常,写入数据少于 length 的唯一情况是写入数据时发生错误。
指定 length 为 0 是有效的,表示尝试至少写入一次,尽可能多地写入数据。
建议的实现应尝试以非阻塞方式向 io 写入,如果 io 未就绪,则调用 (这将把控制权交给其他 fiber)。io_wait
有关可用于从缓冲区高效获取数据的接口,请参阅 。IO::Buffer
预计返回写入的字节数,或在发生错误时返回 -errno(系统错误代码的负数)。
此方法应被视为实验性的。
Source
VALUE
rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
{
return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
}
由 和 Kernel#sleep 调用,并期望提供一种非阻塞的 sleep 实现。实现可能会将当前 fiber 注册到一个“哪个 fiber 等待到哪个时刻”的列表中,调用 Thread::Mutex#sleep 将控制权传递出去,然后在 Fiber.yield 中恢复等待期已过的 fiber。close
Source
VALUE
rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
{
VALUE arguments[] = {
PIDT2NUM(pid), RB_INT2NUM(flags)
};
return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
}
由 调用,以等待指定的进程。有关参数说明,请参阅该方法说明。Process::Status.wait
建议的最小实现:
Thread.new do Process::Status.wait(pid, flags) end.value
此钩子是可选的:如果当前调度器中不存在, 将表现为阻塞方法。Process::Status.wait
预计返回一个 实例。Process::Status
Source
VALUE
rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
{
VALUE arguments[] = {
timeout, exception, message
};
return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
}
由 调用,在给定的 Timeout.timeoutduration 内执行给定的 block。它也可以由调度器或用户代码直接调用。
尝试将给定 block 的执行时间限制在给定的 duration 内(如果可能)。当非阻塞操作导致 block 的执行时间超过指定的 duration 时,应通过引发指定的 exception_class(使用给定的 exception_arguments 构建)来中断该非阻塞操作。
一般执行超时被认为是危险的。此实现只会中断非阻塞操作。这是设计使然,因为预计非阻塞操作可能因各种不可预测的原因而失败,因此应用程序应已能够稳健地处理这些情况,从而隐含地处理超时。
但是,由于此设计,如果 block 未调用任何非阻塞操作,则无法中断它。如果您希望提供可预测的超时点,可以考虑添加 sleep(0)。
如果 block 执行成功,将返回其结果。
异常通常会使用 引发。Fiber#raise
Source
VALUE
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
RUBY_ASSERT(rb_obj_is_fiber(fiber));
VALUE result;
enum ruby_tag_type state;
// `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`.
//
// If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
int saved_errno = errno;
// We must prevent interrupts while invoking the unblock method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_fiber_interrupt`.
rb_execution_context_t *ec = GET_EC();
int saved_interrupt_mask = ec->interrupt_mask;
ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
}
EC_POP_TAG();
ec->interrupt_mask = saved_interrupt_mask;
if (state) {
EC_JUMP_TAG(ec, state);
}
RUBY_VM_CHECK_INTS(ec);
errno = saved_errno;
return result;
}
调用以唤醒之前使用 阻塞的 block(例如,Fiber 调用 Thread::Mutex#lock,而 block 调用 Thread::Mutex#unlock)。调度器应使用 unblockfiber 参数来了解哪个 fiber 被解除了阻塞。
blocker 是我们曾等待的对象,但仅用于信息目的(用于调试和日志记录),并且不能保证它与 的 blockblocker 值相同。
Source
VALUE
rb_fiber_scheduler_yield(VALUE scheduler)
{
// First try to call the scheduler's yield method, if it exists:
VALUE result = rb_check_funcall(scheduler, id_yield, 0, NULL);
if (!UNDEF_P(result)) return result;
// Otherwise, we can emulate yield by sleeping for 0 seconds:
return rb_fiber_scheduler_kernel_sleep(scheduler, RB_INT2NUM(0));
}
将控制权交给调度器,将在下一个调度周期恢复。