Ruby - 多线程

Ruby - 多线程 首页 / Ruby入门教程 / Ruby - 多线程

传统程序只有一个执行线程,构成该程序的语句或指令将顺序执行,直到程序终止。

多线程程序具有多个执行线程。在每个线程中,语句是按顺序执行的,但是线程本身可以在如多核CPU上并行执行。通常在单个CPU计算机上,实际上不是并行执行多个线程,而是通过交错执行线程来模拟并行性。

Ruby使使用 Thread 类编写多线程程序变得容易。 Ruby线程是在代码中实现并发的轻量级高效方法。

创建线程

要启动新线程,只需将一个块与对 Thread.new的调用相关联。将创建一个新线程来执行块中的代码,原始线程将立即从 Thread.new 返回并使用下一条语句恢复执行-

# Thread #1 is running here
Thread.new {
   # Thread #2 runs this code
}
# Thread #1 runs this code

这是一个示例,显示了如何使用多线程Ruby程序。

#!/usr/bin/ruby

def func1
   i=0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i=i+1
   end
end

def func2
   j=0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j=j+1
   end
end

puts "Started At #{Time.now}"
t1=Thread.new{func1()}
t2=Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"

这将产生以下输出-

Started At 2021-03-09 00:15:08 +0800
func1 at: 2021-03-09 00:15:08 +0800
func2 at: 2021-03-09 00:15:08 +0800
func2 at: 2021-03-09 00:15:09 +0800
func1 at: 2021-03-09 00:15:10 +0800
func2 at: 2021-03-09 00:15:10 +0800
func1 at: 2021-03-09 00:15:12 +0800
End at 2021-03-09 00:15:14 +0800

线程生命周期

使用 Thread.new 创建一个新线程。您还可以使用同义词 Thread.start 和 Thread.fork 。

创建线程后无需启动线程,它在CPU资源可用时自动开始运行。

Thread类定义了许多在线程运行时查询和操作线程的方法。线程在与对 Thread.new的调用相关联的块中运行代码,然后停止运行。

您可以通过调用特定线程的 Thread.join 方法来等待该线程完成。调用线程将阻塞,直到给定线程完成。

线程和异常

如果在主线程中引发了异常,并且没有在任何地方处理异常,则Ruby解释器将显示一条消息并退出。在除主线程之外的其他线程中,未处理的异常会导致线程停止运行。

如果线程 t 由于未处理的异常而退出,而另一个线程 s 调用 t.join或t.value ,则发生的异常在 t 中在线程 s 中引发。

如果 Thread.abort_on_exception是 false (默认条件),则未处理的异常只会杀死当前线程,其余所有线程继续运行。

如果希望任何线程中任何未处理的异常导致解释器退出,请将类方法 Thread.abort_on_exception 设置为 true 。

t=Thread.new { ... }
t.abort_on_exception=true

线程变量

创建线程时,线程通常可以访问范围内的任何变量。线程块本地的变量是线程本地的,并且不共享。

线程类具有特殊的函数,该函数允许按名称创建和访问线程局部变量。您只需将线程对象视为哈希对象即可,使用[] =写入元素,然后使用[]读回它们。

在此示例中,每个线程都使用键 mycount 将变量计数的当前值记录在一个线程局部变量中。

#!/usr/bin/ruby

count=0
arr=[]

10.times do |i|
   arr[i]=Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"]=count
      count += 1
   }
end

arr.each {|t| t.join; print t["mycount"], ", " }
puts "count=#{count}"

这产生以下输出-

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count=10

主线程等待子线程完成,然后打印出每个子线程捕获的 count 值。

线程优先级

影响线程调度的第一个因素是线程优先级:高优先级线程先于低优先级线程进行调度。更准确地说,只有在没有更高优先级的线程等待运行时,线程才会获得CPU时间。

您可以使用 priority=priority 设置和查询Ruby Thread对象的优先级。新创建的线程与创建它的线程具有相同的优先级。主线程从优先级0开始。

线程锁

如果两个线程共享对同一数据的访问,并且至少有一个线程修改了该数据,则必须格外小心,以确保没有一个线程能够看到处于不一致状态的数据。这称为线程排除。

Mutex 是一个实现简单信号灯锁的类,用于互斥访问某些共享资源。也就是说,在给定的时间只有一个线程可以持有该锁。其他线程可能选择排队等待该锁可用,或者可能只是选择立即获得一个指示该锁不可用的错误。

通过将对共享数据的所有访问置于 mutex 的控制之下,无涯教程确保了一致性和原子操作。让无涯教程尝试示例,第一个不带mutax,第二个带mutax-

无Mutax锁

#!/usr/bin/ruby
require 'thread'

count1=count2=0
difference=0
counter=Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy=Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

这将产生以下输出-

count1 :  1583766
count2 :  1583766
difference : 0

有Mutax锁

#!/usr/bin/ruby
require 'thread'
mutex=Mutex.new

count1=count2=0
difference=0
counter=Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
   end
end
spy=Thread.new do
   loop do
      mutex.synchronize do
         difference += (count1 - count2).abs
      end
   end
end
sleep 1
mutex.lock
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

这将产生以下输出-

count1 :  696591
count2 :  696591
difference : 0

处理死锁

当开始使用 Mutex 对象进行线程排除时,必须小心避免 deadlock 。死锁是所有线程都在等待获取另一个线程拥有的资源时发生的情况。由于所有线程均被阻止,因此它们无法释放所持有的锁。并且由于它们无法释放锁,因此其他线程也无法获取这些锁。

这是条件变量出现的地方。 条件变量只是与资源相关联的信号量,并用于保护特定 mutex 。当您需要不可用的资源时,请等待条件变量。该操作将释放对相应 mutex 的锁定。当其他一些线程发出信号指示资源可用时,原始线程将退出等待状态,并同时重新获得对关键区域的锁定。

#!/usr/bin/ruby
require 'thread'
mutex=Mutex.new

cv=ConditionVariable.new
a=Thread.new {
   mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
   }
}

puts "(Later, back at the ranch...)"

b=Thread.new {
   mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
   }
}
a.join
b.join

这将产生以下输出-

A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!

线程状态

下表显示了与五个可能状态相对应的五个可能返回值。 status 方法返回线程的状态。

线程状态返回值
Runnable运行中
Sleeping休眠中
Aborting已停止
Terminated normally false
Terminated with exceptino nil

线程类方法

Thread 类提供了以下方法,它们适用于程序中所有可用的线程。这些方法将被称为使用 Thread 类名,如下所示-

Thread.abort_on_exception=true

线程方法

这些方法适用于线程的。这些方法将被称为使用 Thread 的,如下所示-

#!/usr/bin/ruby

thr=Thread.new do   # Calling a class method new
   puts "In second thread"
   raise "Raise exception"
end
thr.join   # Calling an instance method join

公开方法

MethodDescription
abort_on_exception它返回全局“abort on exception”条件的状态。默认值为true。设置为true时,如果任何线程中引发异常,则所有线程将中止。
abort_on_exception=设置为true时,如果引发异常,则所有线程将中止。它返回新状态。
current它返回当前正在执行的线程。
exclusive{block}它将块包装为单个,返回块的值。
exit它终止当前正在运行的线程并安排另一个线程运行。
kill(thread)它导致给定线程退出。
fork([args]*){|args| block}它基本上与::new方法相同。
handle_interrupt(hash){...}更改异步中断时序。
list返回可运行或已停止的所有线程的线程对象数组。
main返回主线程。
new{...}/ new(*args, &proc)/ new(*args){|args|...}它创建一个执行给定块的新线程。
pass它为线程调度程序提供了将执行传递给另一个线程的提示。正在运行的线程可能会切换,也可能不会切换,这取决于操作系统。
pending_interrupt?(error = nil)它返回异步队列是否为空。
start([args]*){|args|block}它基本上与::new方法相同。
stop它停止当前线程的执行,使其进入“sleep”睡眠状态并计划另一个线程的执行。

公共实例方法

MethodDescription
thr[sym]它使用字符串或符号名称返回线程局部变量的值。
thr[sym]=它使用字符串或符号名称创建线程局部变量的值。
abort_on_exception它返回线程的“异常终止”状态。
abort_on_exception=设置为true时,如果此线程中引发异常,则所有线程将中止。
add_trace_func(proc)将proc添加为跟踪处理程序。
alive?如果线程正在运行或正在睡眠,则返回true。
backtrace它返回当前目标的回溯。
backtrace_locations(*args)它返回前面目标的执行堆栈。
exit/kill/terminate它终止线程并执行另一个线程来运行。
group它返回包含给定线程的ThreadGroup或返回nil。
inspect它将thr的名称,id和状态转储到字符串中。
join调用线程将暂停执行并运行此程序。
key?(sym)如果给定的字符串作为线程局部变量存在,则返回true。
keys它返回一个线程局部变量名称的数组。
pending_interrupt?(error=nil)返回异步队列对于目标线程是否为空。
priority它返回线程的优先级。
priority=它将线程的优先级设置为整数。
kill它的作用与退出相同。
raise它从给定线程中引发异常。
run它唤醒线程,使它有资格进行调度。
safe_level它返回线程有效的安全级别。
set_trace_func(proc)它在线程上建立proc作为处理程序。
status它返回线程的状态。
stop?如果线程正在休眠或已死,则返回true。
terminate它终止线程并安排另一个线程运行。
thread_variable?(key)如果给定的字符串作为线程局部变量存在,则返回true。
thread_variable_get(key)t返回已设置的线程局部变量的值。
thread_variable_set(key, value)将本地线程的键设置为value。
thread_variable它返回线程局部变量的数组。
value它使用连接等待线程完成,并返回其值。
wakeup使给定线程有资格进行调度,尽管该线程可能仍在I/O上处于阻塞状态。

祝学习愉快!(内容编辑有误?请选中要编辑内容 -> 右键 -> 修改 -> 提交!)

技术教程推荐

从0开始学架构 -〔李运华〕

深入浅出云计算 -〔何恺铎〕

Spring编程常见错误50例 -〔傅健〕

大数据经典论文解读 -〔徐文浩〕

编程高手必学的内存知识 -〔海纳〕

攻克视频技术 -〔李江〕

搞定音频技术 -〔冯建元 〕

徐昊 · TDD项目实战70讲 -〔徐昊〕

结构思考力 · 透过结构看问题解决 -〔李忠秋〕

好记忆不如烂笔头。留下您的足迹吧 :)