class Thread
Thread 是 Ruby 实现并发编程模型的方式。
需要多线程执行的程序是 Ruby Thread 类的理想应用场景。
例如,我们可以创建一个独立于主线程执行的新线程,使用 ::new。
thr = Thread.new { puts "What's the big deal" }
然后,我们可以暂停主线程的执行,让新线程完成,使用 join。
thr.join #=> "What's the big deal"
如果在主线程终止前不调用 thr.join,那么包括 thr 在内的所有其他线程都会被终止。
或者,你可以使用数组来一次处理多个线程,如下例所示:
threads = [] threads << Thread.new { puts "What's the big deal" } threads << Thread.new { 3.times { puts "Threads are fun!" } }
创建几个线程后,我们等待它们全部依次完成。
threads.each { |thr| thr.join }
要检索线程的最后一个返回值,请使用 value。
thr = Thread.new { sleep 1; "Useful value" } thr.value #=> "Useful value"
Thread 初始化
为了创建新线程,Ruby 提供了 ::new、::start 和 ::fork。这些方法都必须提供一个块,否则会引发 ThreadError。
当子类化 Thread 类时,子类中的 initialize 方法将被 ::start 和 ::fork 忽略。否则,请确保在 initialize 方法中调用 super。
Thread 终止
对于终止线程,Ruby 提供了多种方式。
类方法 ::kill 用于退出给定的线程。
thr = Thread.new { sleep } Thread.kill(thr) # sends exit() to thr
或者,你可以使用实例方法 exit,或其任何别名 kill 或 terminate。
thr.exit
Thread 状态
Ruby 提供了一些实例方法来查询给定线程的状态。要获取当前线程状态的字符串,请使用 status。
thr = Thread.new { sleep } thr.status # => "sleep" thr.exit thr.status # => false
你还可以使用 alive? 来判断线程是否正在运行或睡眠,使用 stop? 来判断线程是否已死或已睡眠。
Thread 变量和作用域
由于线程是通过块创建的,因此变量作用域的规则与其他 Ruby 块相同。在此块内创建的任何局部变量仅对此线程可访问。
Fiber-local 与 Thread-local
每个 fiber 都有自己的 Thread#[] 存储桶。当你设置一个新的 fiber-local 时,它只在此 Fiber 内可访问。举例说明:
Thread.new { Thread.current[:foo] = "bar" Fiber.new { p Thread.current[:foo] # => nil }.resume }.join
此示例使用 [] 进行获取,使用 []= 进行设置 fiber-locals,你还可以使用 keys 列出给定线程的 fiber-locals,使用 key? 检查 fiber-local 是否存在。
至于 thread-locals,它们在整个线程的作用域内都可访问。给定以下示例:
Thread.new{ Thread.current.thread_variable_set(:foo, 1) p Thread.current.thread_variable_get(:foo) # => 1 Fiber.new{ Thread.current.thread_variable_set(:foo, 2) p Thread.current.thread_variable_get(:foo) # => 2 }.resume p Thread.current.thread_variable_get(:foo) # => 2 }.join
你可以看到,thread-local :foo 传递到了 fiber 中,并在线程结束时被更改为 2。
此示例使用 thread_variable_set 来创建新的 thread-locals,并使用 thread_variable_get 来引用它们。
还有 thread_variables 用于列出所有 thread-locals,以及 thread_variable? 用于检查给定的 thread-local 是否存在。
Exception 处理
当一个未捕获的异常在线程内部引发时,该线程将终止。默认情况下,此异常不会传播到其他线程。该异常会被存储,当另一个线程调用 value 或 join 时,异常将在该线程中重新引发。
t = Thread.new{ raise 'something went wrong' } t.value #=> RuntimeError: something went wrong
可以使用实例方法 Thread#raise 从线程外部引发异常,该方法接受与 Kernel#raise 相同的参数。
设置 Thread.abort_on_exception = true,Thread#abort_on_exception = true,或 $DEBUG = true 将导致后续在线程中引发的未捕获异常自动在主线程中重新引发。
通过添加类方法 ::handle_interrupt,你现在可以异步处理线程异常。
调度
Ruby 提供了几种方法来支持程序中的线程调度。
第一种方法是使用类方法 ::stop,将当前运行的线程置于睡眠状态并调度另一个线程执行。
一旦线程进入睡眠状态,你就可以使用实例方法 wakeup 将你的线程标记为可调度。
你还可以尝试 ::pass,它尝试将执行权传递给另一个线程,但这取决于操作系统是否会切换运行中的线程。priority 方法也是如此,它允许你向线程调度器提示你想让哪些线程在传递执行权时获得优先权。此方法也取决于操作系统,在某些平台上可能被忽略。
Public Class Methods
Source
static VALUE
rb_thread_s_abort_exc(VALUE _)
{
return RBOOL(GET_THREAD()->vm->thread_abort_on_exception);
}
返回全局“abort on exception”条件的当前状态。
默认为 false。
当设置为 true 时,如果任何线程因异常而中止,则引发的异常将在主线程中重新引发。
也可以通过全局 $DEBUG 标志或命令行选项 -d 来指定。
另请参阅 ::abort_on_exception=。
还有一个实例级别的函数可以为特定线程设置此项,请参阅 abort_on_exception。
Source
static VALUE
rb_thread_s_abort_exc_set(VALUE self, VALUE val)
{
GET_THREAD()->vm->thread_abort_on_exception = RTEST(val);
return val;
}
当设置为 true 时,如果任何线程因异常而中止,则引发的异常将在主线程中重新引发。返回新的状态。
Thread.abort_on_exception = true t1 = Thread.new do puts "In new thread" raise "Exception from thread" end sleep(1) puts "not reached"
这将产生:
In new thread prog.rb:4: Exception from thread (RuntimeError) from prog.rb:2:in `initialize' from prog.rb:2:in `new' from prog.rb:2
另请参阅 ::abort_on_exception。
还有一个实例级别的函数可以为特定线程设置此项,请参阅 abort_on_exception=。
Source
static VALUE
thread_s_current(VALUE klass)
{
return rb_thread_current();
}
返回当前正在执行的线程。
Thread.current #=> #<Thread:0x401bdf4c run>
Source
static VALUE
each_caller_location(int argc, VALUE *argv, VALUE _)
{
rb_execution_context_t *ec = GET_EC();
long n, lev = ec_backtrace_range(ec, argc, argv, 1, 1, &n);
if (lev >= 0 && n != 0) {
rb_ec_partial_backtrace_object(ec, lev, n, NULL, FALSE, TRUE);
}
return Qnil;
}
将当前执行堆栈的每一帧作为回溯位置对象进行迭代。
Source
static VALUE
rb_thread_exit(VALUE _)
{
rb_thread_t *th = GET_THREAD();
return rb_thread_kill(th->self);
}
Source
static VALUE
thread_start(VALUE klass, VALUE args)
{
struct thread_create_params params = {
.type = thread_invoke_type_proc,
.args = args,
.proc = rb_block_proc(),
};
return thread_create_core(rb_thread_alloc(klass), ¶ms);
}
Source
static VALUE
rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
{
VALUE mask = Qundef;
rb_execution_context_t * volatile ec = GET_EC();
rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
volatile VALUE r = Qnil;
enum ruby_tag_type state;
if (!rb_block_given_p()) {
rb_raise(rb_eArgError, "block is needed.");
}
mask_arg = rb_to_hash_type(mask_arg);
if (OBJ_FROZEN(mask_arg) && rb_hash_compare_by_id_p(mask_arg)) {
mask = Qnil;
}
rb_hash_foreach(mask_arg, handle_interrupt_arg_check_i, (VALUE)&mask);
if (UNDEF_P(mask)) {
return rb_yield(Qnil);
}
if (!RTEST(mask)) {
mask = mask_arg;
}
else if (RB_TYPE_P(mask, T_HASH)) {
OBJ_FREEZE(mask);
}
rb_ary_push(th->pending_interrupt_mask_stack, mask);
if (!rb_threadptr_pending_interrupt_empty_p(th)) {
th->pending_interrupt_queue_checked = 0;
RUBY_VM_SET_INTERRUPT(th->ec);
}
EC_PUSH_TAG(th->ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
r = rb_yield(Qnil);
}
EC_POP_TAG();
rb_ary_pop(th->pending_interrupt_mask_stack);
if (!rb_threadptr_pending_interrupt_empty_p(th)) {
th->pending_interrupt_queue_checked = 0;
RUBY_VM_SET_INTERRUPT(th->ec);
}
RUBY_VM_CHECK_INTS(th->ec);
if (state) {
EC_JUMP_TAG(th->ec, state);
}
return r;
}
更改异步中断的时机。
interrupt 指的是异步事件和对应的程序,通过 Thread#raise、Thread#kill、信号陷阱(尚不支持)和主线程终止(如果主线程终止,则所有其他线程都会被杀死)。
给定的 hash 包含形如 ExceptionClass => :TimingSymbol 的键值对。其中 ExceptionClass 是由给定块处理的异常。TimingSymbol 可以是以下符号之一:
:immediate-
立即调用中断。
:on_blocking-
在阻塞操作期间调用中断。
:never-
永远不调用所有中断。
BlockingOperation 指的是该操作会阻塞调用线程,例如读写操作。在 CRuby 实现中,BlockingOperation 是任何在没有 GVL 的情况下执行的操作。
被屏蔽的异步中断会被延迟,直到它们被启用。此方法类似于 sigprocmask(3)。
注意
异步中断很难使用。
如果你需要线程间通信,请考虑使用其他方式,如 Queue。
或者,在深入理解此方法的情况下使用它们。
用法
在此示例中,我们可以防范 Thread#raise 异常。
使用 :never TimingSymbol,RuntimeError 异常将在主线程的第一个块中始终被忽略。在第二个 ::handle_interrupt 块中,我们可以有意识地处理 RuntimeError 异常。
th = Thread.new do Thread.handle_interrupt(RuntimeError => :never) { begin # You can write resource allocation code safely. Thread.handle_interrupt(RuntimeError => :immediate) { # ... } ensure # You can write resource deallocation code safely. end } end Thread.pass # ... th.raise "stop"
当我们忽略 RuntimeError 异常时,可以安全地编写资源分配代码。然后,ensure 块是我们安全地释放资源的地方。
堆栈控制设置
可以堆叠多个级别的 ::handle_interrupt 块,以便一次控制多个 ExceptionClass 和 TimingSymbol。
Thread.handle_interrupt(FooError => :never) { Thread.handle_interrupt(BarError => :never) { # FooError and BarError are prohibited. } }
与 ExceptionClass 的继承
所有继承自 ExceptionClass 参数的异常都将被考虑。
Thread.handle_interrupt(Exception => :never) { # all exceptions inherited from Exception are prohibited. }
要处理所有中断,请使用 Object 而不是 Exception 作为 ExceptionClass,因为 kill/terminate 中断不被 Exception 处理。
Source
static VALUE
rb_thread_s_ignore_deadlock(VALUE _)
{
return RBOOL(GET_THREAD()->vm->thread_ignore_deadlock);
}
返回全局“ignore deadlock”条件的当前状态。默认为 false,因此死锁条件不会被忽略。
另请参阅 ::ignore_deadlock=。
Source
static VALUE
rb_thread_s_ignore_deadlock_set(VALUE self, VALUE val)
{
GET_THREAD()->vm->thread_ignore_deadlock = RTEST(val);
return val;
}
返回新的状态。当设置为 true 时,VM 将不会检查死锁条件。只有当应用程序可以通过其他方式(如信号)打破死锁条件时,设置此项才有用。
Thread.ignore_deadlock = true queue = Thread::Queue.new trap(:SIGUSR1){queue.push "Received signal"} # raises fatal error unless ignoring deadlock puts queue.pop
另请参阅 ::ignore_deadlock。
Source
static VALUE
rb_thread_s_kill(VALUE obj, VALUE th)
{
return rb_thread_kill(th);
}
导致给定的 thread 退出,另请参阅 Thread::exit。
count = 0 a = Thread.new { loop { count += 1 } } sleep(0.1) #=> 0 Thread.kill(a) #=> #<Thread:0x401b3d30 dead> count #=> 93947 a.alive? #=> false
Source
static VALUE
thread_list(VALUE _)
{
return rb_thread_list();
}
返回所有可运行或已停止的 Thread 对象的数组。
Thread.new { sleep(200) } Thread.new { 1000000.times {|i| i*i } } Thread.new { Thread.stop } Thread.list.each {|t| p t}
这将产生:
#<Thread:0x401b3e84 sleep> #<Thread:0x401b3f38 run> #<Thread:0x401b3fb0 sleep> #<Thread:0x401bdf4c run>
Source
static VALUE
thread_s_new(int argc, VALUE *argv, VALUE klass)
{
rb_thread_t *th;
VALUE thread = rb_thread_alloc(klass);
if (GET_RACTOR()->threads.main->status == THREAD_KILLED) {
rb_raise(rb_eThreadError, "can't alloc thread");
}
rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS);
th = rb_thread_ptr(thread);
if (!threadptr_initialized(th)) {
rb_raise(rb_eThreadError, "uninitialized thread - check '%"PRIsVALUE"#initialize'",
klass);
}
return thread;
}
创建一个执行给定块的新线程。
传递给 ::new 的任何 args 都将传递给块。
arr = [] a, b, c = 1, 2, 3 Thread.new(a,b,c) { |d,e,f| arr << d << e << f }.join arr #=> [1, 2, 3]
如果 ::new 在没有块的情况下调用,则会引发 ThreadError 异常。
如果你要子类化 Thread,请确保在 initialize 方法中调用 super,否则会引发 ThreadError。
Source
static VALUE
thread_s_pass(VALUE klass)
{
rb_thread_schedule();
return Qnil;
}
给线程调度器一个提示,以将执行权传递给另一个线程。运行中的线程可能会或可能不会切换,这取决于操作系统和处理器。
Source
static VALUE
rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self)
{
return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self);
}
返回异步队列是否为空。
由于 Thread::handle_interrupt 可用于延迟异步事件,因此此方法可用于确定是否存在任何延迟事件。
如果此方法返回 true,则你可以完成 :never 块。
例如,以下方法立即处理延迟的异步事件。
def Thread.kick_interrupt_immediately Thread.handle_interrupt(Object => :immediate) { Thread.pass } end
如果提供了 error,则只检查 error 类型的延迟事件。
用法
th = Thread.new{
Thread.handle_interrupt(RuntimeError => :on_blocking){
while true
...
# reach safe point to invoke interrupt
if Thread.pending_interrupt?
Thread.handle_interrupt(Object => :immediate){}
end
...
end
}
}
...
th.raise # stop thread
此示例也可以写成以下形式,你应该使用它来避免异步中断。
flag = true
th = Thread.new{
Thread.handle_interrupt(RuntimeError => :on_blocking){
while true
...
# reach safe point to invoke interrupt
break if flag == false
...
end
}
}
...
flag = false # stop thread
Source
static VALUE
rb_thread_s_report_exc(VALUE _)
{
return RBOOL(GET_THREAD()->vm->thread_report_on_exception);
}
返回全局“report on exception”条件的当前状态。
自 Ruby 2.5 起,默认为 true。
在此标志为 true 时创建的所有线程,如果一个异常导致线程中止,都会在 $stderr 上报告一条消息。
Thread.new { 1.times { raise } }
将在 $stderr 上产生此输出:
#<Thread:...> terminated with exception (report_on_exception is true):
Traceback (most recent call last):
2: from -e:1:in `block in <main>'
1: from -e:1:in `times'
这是为了及早捕获线程中的错误。在某些情况下,你可能不希望进行此输出。有多种方法可以避免额外的输出:
-
如果异常不是预期的,最好的方法是修复异常的根本原因,使其不再发生。
-
如果异常是预期的,最好在离它引发的地方更近的地方捕获它,而不是让它杀死
Thread。 -
如果可以保证该
Thread将使用Thread#join或Thread#value进行连接,那么在启动Thread时使用Thread.current.report_on_exception = false禁用此报告是安全的。然而,这可能会更晚地处理异常,或者根本不处理,如果父线程被阻塞等原因导致该Thread永远不会被连接。
另请参阅 ::report_on_exception=。
还有一个实例级别的函数可以为特定线程设置此项,请参阅 report_on_exception=。
Source
static VALUE
rb_thread_s_report_exc_set(VALUE self, VALUE val)
{
GET_THREAD()->vm->thread_report_on_exception = RTEST(val);
return val;
}
返回新的状态。当设置为 true 时,之后创建的所有线程将继承此条件,如果一个异常导致线程中止,将在 $stderr 上报告一条消息。
Thread.report_on_exception = true t1 = Thread.new do puts "In new thread" raise "Exception from thread" end sleep(1) puts "In the main thread"
这将产生:
In new thread #<Thread:...prog.rb:2> terminated with exception (report_on_exception is true): Traceback (most recent call last): prog.rb:4:in `block in <main>': Exception from thread (RuntimeError) In the main thread
另请参阅 ::report_on_exception。
还有一个实例级别的函数可以为特定线程设置此项,请参阅 report_on_exception=。
Source
static VALUE
thread_start(VALUE klass, VALUE args)
{
struct thread_create_params params = {
.type = thread_invoke_type_proc,
.args = args,
.proc = rb_block_proc(),
};
return thread_create_core(rb_thread_alloc(klass), ¶ms);
}
Source
static VALUE
thread_stop(VALUE _)
{
return rb_thread_stop();
}
停止当前线程的执行,将其置于“睡眠”状态,并调度另一个线程执行。
a = Thread.new { print "a"; Thread.stop; print "c" } sleep 0.1 while a.status!='sleep' print "b" a.run a.join #=> "abc"
Public Instance Methods
Source
static VALUE
rb_thread_aref(VALUE thread, VALUE key)
{
ID id = rb_check_id(&key);
if (!id) return Qnil;
return rb_thread_local_aref(thread, id);
}
属性引用 — 使用符号或字符串名称返回 fiber-local 变量的值(当前线程的根 fiber,如果未明确位于 Fiber 内部)。如果指定的变量不存在,则返回 nil。
[ Thread.new { Thread.current["name"] = "A" }, Thread.new { Thread.current[:name] = "B" }, Thread.new { Thread.current["name"] = "C" } ].each do |th| th.join puts "#{th.inspect}: #{th[:name]}" end
这将产生:
#<Thread:0x00000002a54220 dead>: A #<Thread:0x00000002a541a8 dead>: B #<Thread:0x00000002a54130 dead>: C
Thread#[] 和 Thread#[]= 不是 thread-local 而是 fiber-local。在 Ruby 1.8 中不存在这种混淆,因为 fiber 仅从 Ruby 1.9 开始可用。Ruby 1.9 选择让这些方法表现为 fiber-local,以保留动态作用域的以下习惯用法。
def meth(newvalue) begin oldvalue = Thread.current[:name] Thread.current[:name] = newvalue yield ensure Thread.current[:name] = oldvalue end end
如果方法是 thread-local 且给定的块切换了 fiber,则该习惯用法可能无法作为动态作用域正常工作。
f = Fiber.new { meth(1) { Fiber.yield } } meth(2) { f.resume } f.resume p Thread.current[:name] #=> nil if fiber-local #=> 2 if thread-local (The value 2 is leaked to outside of meth method.)
有关 thread-local 变量,请参阅 thread_variable_get 和 thread_variable_set。
Source
static VALUE
rb_thread_aset(VALUE self, VALUE id, VALUE val)
{
return rb_thread_local_aset(self, rb_to_id(id), val);
}
属性赋值 — 使用符号或字符串设置或创建 fiber-local 变量的值。
另请参阅 Thread#[]。
有关 thread-local 变量,请参阅 thread_variable_set 和 thread_variable_get。
Source
static VALUE
rb_thread_abort_exc(VALUE thread)
{
return RBOOL(rb_thread_ptr(thread)->abort_on_exception);
}
返回此 thr 的 thread-local “abort on exception”条件的当前状态。
默认为 false。
另请参阅 abort_on_exception=。
还有一个类级别的函数可以为所有线程设置此项,请参阅 ::abort_on_exception。
Source
static VALUE
rb_thread_abort_exc_set(VALUE thread, VALUE val)
{
rb_thread_ptr(thread)->abort_on_exception = RTEST(val);
return val;
}
当设置为 true 时,如果此 thr 因异常而中止,则引发的异常将在主线程中重新引发。
另请参阅 abort_on_exception。
还有一个类级别的函数可以为所有线程设置此项,请参阅 ::abort_on_exception=。
Source
static VALUE
thread_add_trace_func_m(VALUE obj, VALUE trace)
{
thread_add_trace_func(GET_EC(), rb_thread_ptr(obj), trace);
return trace;
}
将 proc 添加为跟踪的处理程序。
Source
static VALUE
rb_thread_alive_p(VALUE thread)
{
return RBOOL(!thread_finished(rb_thread_ptr(thread)));
}
Source
static VALUE
rb_thread_backtrace_m(int argc, VALUE *argv, VALUE thval)
{
return rb_vm_thread_backtrace(argc, argv, thval);
}
返回目标线程的当前回溯。
Source
static VALUE
rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval)
{
return rb_vm_thread_backtrace_locations(argc, argv, thval);
}
返回目标线程的执行堆栈 — 一个包含回溯位置对象的数组。
有关更多信息,请参阅 Thread::Backtrace::Location。
此方法行为类似于 Kernel#caller_locations,只是它适用于特定线程。
终止 thr 并调度另一个线程运行,返回已终止的 Thread。如果是主线程或最后一个线程,则退出进程。请注意,如果接收方与当前运行的线程不同,则调用者不必等待线程终止。终止是异步的,线程在退出前仍可能运行少量 Ruby 代码。
Source
static VALUE
rb_thread_fetch(int argc, VALUE *argv, VALUE self)
{
VALUE key, val;
ID id;
rb_thread_t *target_th = rb_thread_ptr(self);
int block_given;
rb_check_arity(argc, 1, 2);
key = argv[0];
block_given = rb_block_given_p();
if (block_given && argc == 2) {
rb_warn("block supersedes default value argument");
}
id = rb_check_id(&key);
if (id == recursive_key) {
return target_th->ec->local_storage_recursive_hash;
}
else if (id && target_th->ec->local_storage &&
rb_id_table_lookup(target_th->ec->local_storage, id, &val)) {
return val;
}
else if (block_given) {
return rb_yield(key);
}
else if (argc == 1) {
rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key);
}
else {
return argv[1];
}
}
返回给定键的 fiber-local。如果找不到该键,有几种选项:没有其他参数时,将引发 KeyError 异常;如果给定了 default,则将返回该值;如果指定了可选代码块,则将运行该代码块并返回其结果。请参阅 Thread#[] 和 Hash#fetch。
Source
VALUE
rb_thread_group(VALUE thread)
{
return rb_thread_ptr(thread)->thgroup;
}
返回包含给定线程的 ThreadGroup。
Thread.main.group #=> #<ThreadGroup:0x4029d914>
Source
static VALUE
thread_join_m(int argc, VALUE *argv, VALUE self)
{
VALUE timeout = Qnil;
rb_hrtime_t rel = 0, *limit = 0;
if (rb_check_arity(argc, 0, 1)) {
timeout = argv[0];
}
// Convert the timeout eagerly, so it's always converted and deterministic
/*
* This supports INFINITY and negative values, so we can't use
* rb_time_interval right now...
*/
if (NIL_P(timeout)) {
/* unlimited */
}
else if (FIXNUM_P(timeout)) {
rel = rb_sec2hrtime(NUM2TIMET(timeout));
limit = &rel;
}
else {
limit = double2hrtime(&rel, rb_num2dbl(timeout));
}
return thread_join(rb_thread_ptr(self), timeout, limit);
}
调用线程将暂停执行并运行此 thr。
直到 thr 退出或给定的 limit 秒已过才会返回。
如果时间限制到期,则返回 nil,否则返回 thr。
未连接的线程将在主程序退出时被杀死。
如果 thr 之前引发了异常,并且 ::abort_on_exception 或 $DEBUG 标志未设置(因此异常尚未被处理),则将在此时处理。
a = Thread.new { print "a"; sleep(10); print "b"; print "c" } x = Thread.new { print "x"; Thread.pass; print "y"; print "z" } x.join # Let thread x finish, thread a will be killed on exit. #=> "axyz"
以下示例说明了 limit 参数。
y = Thread.new { 4.times { sleep 0.1; puts 'tick... ' }} puts "Waiting" until y.join(0.15)
这将产生:
tick... Waiting tick... Waiting tick... tick...
Source
static VALUE
rb_thread_key_p(VALUE self, VALUE key)
{
VALUE val;
ID id = rb_check_id(&key);
struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage;
if (!id || local_storage == NULL) {
return Qfalse;
}
return RBOOL(rb_id_table_lookup(local_storage, id, &val));
}
如果给定的字符串(或符号)作为 fiber-local 变量存在,则返回 true。
me = Thread.current me[:oliver] = "a" me.key?(:oliver) #=> true me.key?(:stanley) #=> false
Source
static VALUE
rb_thread_keys(VALUE self)
{
struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage;
VALUE ary = rb_ary_new();
if (local_storage) {
rb_id_table_foreach(local_storage, thread_keys_i, (void *)ary);
}
return ary;
}
返回 fiber-local 变量名称的数组(作为 Symbols)。
thr = Thread.new do Thread.current[:cat] = 'meow' Thread.current["dog"] = 'woof' end thr.join #=> #<Thread:0x401b3f10 dead> thr.keys #=> [:dog, :cat]
Source
VALUE
rb_thread_kill(VALUE thread)
{
rb_thread_t *target_th = rb_thread_ptr(thread);
if (target_th->to_kill || target_th->status == THREAD_KILLED) {
return thread;
}
if (target_th == target_th->vm->ractor.main_thread) {
rb_exit(EXIT_SUCCESS);
}
RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(target_th));
if (target_th == GET_THREAD()) {
/* kill myself immediately */
rb_threadptr_to_kill(target_th);
}
else {
threadptr_check_pending_interrupt_queue(target_th);
rb_threadptr_pending_interrupt_enque(target_th, RUBY_FATAL_THREAD_KILLED);
rb_threadptr_interrupt(target_th);
}
return thread;
}
终止 thr 并调度另一个线程运行,返回已终止的 Thread。如果是主线程或最后一个线程,则退出进程。请注意,如果接收方与当前运行的线程不同,则调用者不必等待线程终止。终止是异步的,线程在退出前仍可能运行少量 Ruby 代码。
Source
static VALUE
rb_thread_getname(VALUE thread)
{
return rb_thread_ptr(thread)->name;
}
显示线程的名称。
Source
static VALUE
rb_thread_setname(VALUE thread, VALUE name)
{
rb_thread_t *target_th = rb_thread_ptr(thread);
if (!NIL_P(name)) {
rb_encoding *enc;
StringValueCStr(name);
enc = rb_enc_get(name);
if (!rb_enc_asciicompat(enc)) {
rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
rb_enc_name(enc));
}
name = rb_str_new_frozen(name);
}
target_th->name = name;
if (threadptr_initialized(target_th) && target_th->has_dedicated_nt) {
native_set_another_thread_name(target_th->nt->thread_id, name);
}
return name;
}
将给定的名称设置给 ruby 线程。在某些平台上,它可能会将名称设置给 pthread 和/或 kernel。
Source
static VALUE
rb_thread_native_thread_id(VALUE thread)
{
rb_thread_t *target_th = rb_thread_ptr(thread);
if (rb_threadptr_dead(target_th)) return Qnil;
return native_thread_native_thread_id(target_th);
}
返回 Ruby 线程使用的本机线程 ID。
ID 取决于操作系统。(不是 pthread_self(3) 返回的 POSIX 线程 ID)
-
在 Linux 上,它是 gettid(2) 返回的 TID。
-
在 macOS 上,它是 pthread_threadid_np(3) 返回的系统级唯一整数 ID。
-
在 FreeBSD 上,它是 pthread_getthreadid_np(3) 返回的线程唯一整数 ID。
-
在 Windows 上,它是 GetThreadId() 返回的线程标识符。
-
在其他平台上,它会引发
NotImplementedError。
注意:如果线程尚未与本机线程关联或已解除关联,则返回 nil。如果 Ruby 实现使用 M:N 线程模型,ID 可能会随时间变化。
Source
static VALUE
rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread)
{
rb_thread_t *target_th = rb_thread_ptr(target_thread);
if (!target_th->pending_interrupt_queue) {
return Qfalse;
}
if (rb_threadptr_pending_interrupt_empty_p(target_th)) {
return Qfalse;
}
if (rb_check_arity(argc, 0, 1)) {
VALUE err = argv[0];
if (!rb_obj_is_kind_of(err, rb_cModule)) {
rb_raise(rb_eTypeError, "class or module required for rescue clause");
}
return RBOOL(rb_threadptr_pending_interrupt_include_p(target_th, err));
}
else {
return Qtrue;
}
}
Source
static VALUE
rb_thread_priority(VALUE thread)
{
return INT2NUM(rb_thread_ptr(thread)->priority);
}
返回 thr 的优先级。默认值是从创建新线程的当前线程继承的,或者对于初始主线程为零;高优先级线程的运行频率高于低优先级线程(但低优先级线程也可以运行)。
这只是 Ruby 线程调度器的一个提示。在某些平台上可能被忽略。
Thread.current.priority #=> 0
Source
static VALUE
rb_thread_priority_set(VALUE thread, VALUE prio)
{
rb_thread_t *target_th = rb_thread_ptr(thread);
int priority;
#if USE_NATIVE_THREAD_PRIORITY
target_th->priority = NUM2INT(prio);
native_thread_apply_priority(th);
#else
priority = NUM2INT(prio);
if (priority > RUBY_THREAD_PRIORITY_MAX) {
priority = RUBY_THREAD_PRIORITY_MAX;
}
else if (priority < RUBY_THREAD_PRIORITY_MIN) {
priority = RUBY_THREAD_PRIORITY_MIN;
}
target_th->priority = (int8_t)priority;
#endif
return INT2NUM(target_th->priority);
}
将 thr 的优先级设置为 integer。高优先级线程的运行频率高于低优先级线程(但低优先级线程也可以运行)。
这只是 Ruby 线程调度器的一个提示。在某些平台上可能被忽略。
count1 = count2 = 0 a = Thread.new do loop { count1 += 1 } end a.priority = -1 b = Thread.new do loop { count2 += 1 } end b.priority = -2 sleep 1 #=> 1 count1 #=> 622504 count2 #=> 5832
Source
static VALUE
thread_raise_m(int argc, VALUE *argv, VALUE self)
{
rb_thread_t *target_th = rb_thread_ptr(self);
const rb_thread_t *current_th = GET_THREAD();
threadptr_check_pending_interrupt_queue(target_th);
rb_threadptr_raise(target_th, argc, argv);
/* To perform Thread.current.raise as Kernel.raise */
if (current_th == target_th) {
RUBY_VM_CHECK_INTS(target_th->ec);
}
return Qnil;
}
从给定线程引发异常。调用者不必是 thr。有关参数的更多信息,请参阅 Kernel#raise。
Thread.abort_on_exception = true a = Thread.new { sleep(200) } a.raise("Gotcha")
这将产生:
prog.rb:3: Gotcha (RuntimeError) from prog.rb:2:in `initialize' from prog.rb:2:in `new' from prog.rb:2
Source
static VALUE
rb_thread_report_exc(VALUE thread)
{
return RBOOL(rb_thread_ptr(thread)->report_on_exception);
}
返回此 thr 的 thread-local “report on exception”条件的当前状态。
创建 Thread 时的默认值为全局标志 Thread.report_on_exception 的值。
另请参阅 report_on_exception=。
还有一个类级别的函数可以为所有新线程设置此项,请参阅 ::report_on_exception=。
Source
static VALUE
rb_thread_report_exc_set(VALUE thread, VALUE val)
{
rb_thread_ptr(thread)->report_on_exception = RTEST(val);
return val;
}
当设置为 true 时,如果异常导致此 thr 中止,则会在 $stderr 上打印一条消息。有关详细信息,请参阅 ::report_on_exception。
另请参阅 report_on_exception。
还有一个类级别的函数可以为所有新线程设置此项,请参阅 ::report_on_exception=。
Source
VALUE
rb_thread_run(VALUE thread)
{
rb_thread_wakeup(thread);
rb_thread_schedule();
return thread;
}
唤醒 thr,使其可调度。
a = Thread.new { puts "a"; Thread.stop; puts "c" } sleep 0.1 while a.status!='sleep' puts "Got here" a.run a.join
这将产生:
a Got here c
另请参阅实例方法 wakeup。
Source
static VALUE
thread_set_trace_func_m(VALUE target_thread, VALUE trace)
{
rb_execution_context_t *ec = GET_EC();
rb_thread_t *target_th = rb_thread_ptr(target_thread);
rb_threadptr_remove_event_hook(ec, target_th, call_trace_func, Qundef);
if (NIL_P(trace)) {
return Qnil;
}
else {
thread_add_trace_func(ec, target_th, trace);
return trace;
}
}
在 thr 上将 proc 设置为跟踪的处理程序,或者在参数为 nil 时禁用跟踪。
Source
static VALUE
rb_thread_status(VALUE thread)
{
rb_thread_t *target_th = rb_thread_ptr(thread);
if (rb_threadptr_dead(target_th)) {
if (!NIL_P(target_th->ec->errinfo) &&
!FIXNUM_P(target_th->ec->errinfo)) {
return Qnil;
}
else {
return Qfalse;
}
}
else {
return rb_str_new2(thread_status_name(target_th, FALSE));
}
}
返回 thr 的状态。
"sleep"-
如果此线程正在睡眠或等待 I/O,则返回此值。
"run"-
当此线程正在执行时。
"aborting"-
如果此线程正在中止。
false-
当此线程正常终止时。
nil-
如果通过异常终止。
a = Thread.new { raise("die now") } b = Thread.new { Thread.stop } c = Thread.new { Thread.exit } d = Thread.new { sleep } d.kill #=> #<Thread:0x401b3678 aborting> a.status #=> nil b.status #=> "sleep" c.status #=> false d.status #=> "aborting" Thread.current.status #=> "run"
Source
static VALUE
rb_thread_stop_p(VALUE thread)
{
rb_thread_t *th = rb_thread_ptr(thread);
if (rb_threadptr_dead(th)) {
return Qtrue;
}
return RBOOL(th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER);
}
终止 thr 并调度另一个线程运行,返回已终止的 Thread。如果是主线程或最后一个线程,则退出进程。请注意,如果接收方与当前运行的线程不同,则调用者不必等待线程终止。终止是异步的,线程在退出前仍可能运行少量 Ruby 代码。
Source
static VALUE
rb_thread_variable_p(VALUE thread, VALUE key)
{
VALUE locals;
VALUE symbol = rb_to_symbol(key);
if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
return Qfalse;
}
locals = rb_thread_local_storage(thread);
return RBOOL(rb_hash_lookup(locals, symbol) != Qnil);
}
如果给定的字符串(或符号)作为 thread-local 变量存在,则返回 true。
me = Thread.current me.thread_variable_set(:oliver, "a") me.thread_variable?(:oliver) #=> true me.thread_variable?(:stanley) #=> false
请注意,这些不是 fiber local 变量。有关更多详细信息,请参阅 Thread#[] 和 Thread#thread_variable_get。
Source
static VALUE
rb_thread_variable_get(VALUE thread, VALUE key)
{
VALUE locals;
VALUE symbol = rb_to_symbol(key);
if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
return Qnil;
}
locals = rb_thread_local_storage(thread);
return rb_hash_aref(locals, symbol);
}
返回已设置的 thread-local 变量的值。请注意,这些与 fiber local 值不同。有关 fiber local 值,请参阅 Thread#[] 和 Thread#[]=。
Thread local 值会随线程一起传递,并且不尊重 fiber。例如:
Thread.new { Thread.current.thread_variable_set("foo", "bar") # set a thread local Thread.current["foo"] = "bar" # set a fiber local Fiber.new { Fiber.yield [ Thread.current.thread_variable_get("foo"), # get the thread local Thread.current["foo"], # get the fiber local ] }.resume }.join.value # => ['bar', nil]
thread local 值返回“bar”,而 fiber local 值返回 nil。Fiber 在同一个线程中执行,因此 thread local 值可用。
Source
static VALUE
rb_thread_variable_set(VALUE thread, VALUE key, VALUE val)
{
VALUE locals;
if (OBJ_FROZEN(thread)) {
rb_frozen_error_raise(thread, "can't modify frozen thread locals");
}
locals = rb_thread_local_storage(thread);
return rb_hash_aset(locals, rb_to_symbol(key), val);
}
将 key 的 thread local 设置为 value。请注意,这些是线程局部的,而不是 fiber 局部的。有关更多信息,请参阅 Thread#thread_variable_get 和 Thread#[]。
Source
static VALUE
rb_thread_variables(VALUE thread)
{
VALUE locals;
VALUE ary;
ary = rb_ary_new();
if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
return ary;
}
locals = rb_thread_local_storage(thread);
rb_hash_foreach(locals, keys_i, ary);
return ary;
}
返回 thread-local 变量名称的数组(作为 Symbols)。
thr = Thread.new do Thread.current.thread_variable_set(:cat, 'meow') Thread.current.thread_variable_set("dog", 'woof') end thr.join #=> #<Thread:0x401b3f10 dead> thr.thread_variables #=> [:dog, :cat]
请注意,这些不是 fiber local 变量。有关更多详细信息,请参阅 Thread#[] 和 Thread#thread_variable_get。
Source
static VALUE
rb_thread_to_s(VALUE thread)
{
VALUE cname = rb_class_path(rb_obj_class(thread));
rb_thread_t *target_th = rb_thread_ptr(thread);
const char *status;
VALUE str, loc;
status = thread_status_name(target_th, TRUE);
str = rb_sprintf("#<%"PRIsVALUE":%p", cname, (void *)thread);
if (!NIL_P(target_th->name)) {
rb_str_catf(str, "@%"PRIsVALUE, target_th->name);
}
if ((loc = threadptr_invoke_proc_location(target_th)) != Qnil) {
rb_str_catf(str, " %"PRIsVALUE":%"PRIsVALUE,
RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
}
rb_str_catf(str, " %s>", status);
return str;
}
将 thr 的名称、ID 和状态转储到字符串。
Source
static VALUE
thread_value(VALUE self)
{
rb_thread_t *th = rb_thread_ptr(self);
thread_join(th, Qnil, 0);
if (UNDEF_P(th->value)) {
// If the thread is dead because we forked th->value is still Qundef.
return Qnil;
}
return th->value;
}
等待 thr 完成(使用 join),并返回其值或引发终止该线程的异常。
a = Thread.new { 2 + 2 } a.value #=> 4 b = Thread.new { raise 'something went wrong' } b.value #=> RuntimeError: something went wrong
Source
VALUE
rb_thread_wakeup(VALUE thread)
{
if (!RTEST(rb_thread_wakeup_alive(thread))) {
rb_raise(rb_eThreadError, "killed thread");
}
return thread;
}
将给定线程标记为可调度,但是它可能仍然阻塞在 I/O 上。
注意:这不会调用调度器,有关更多信息,请参阅 run。
c = Thread.new { Thread.stop; puts "hey!" } sleep 0.1 while c.status!='sleep' c.wakeup c.join #=> "hey!"