Ruby 多线程

默认情况下,Ruby 程序一次只运行一个任务。这意味着如果代码的某个部分需要很长时间才能完成,其余部分就必须等待。

但是,如果我们想同时做多件事情,比如在显示加载动画的同时下载文件呢?

这时就需要用到**多线程**了。

什么是线程?

**线程**就像是我们 Ruby 程序内部运行的一个迷你程序。通过多线程,我们可以同时运行多个线程。

在 Ruby 中,我们可以使用 `Thread` 类来创建线程。


在 Ruby 中创建线程

我们使用 `Thread.new` 方法来创建一个新线程。它的语法是这样的:

Thread.new do
  # Code to run in a new thread
end

此语法在我们的主程序中创建一个**子线程**。

但在我们使用此语法创建实际线程之前,让我们先了解主线程和子线程的区别。

主线程与子线程

当我们运行一个 Ruby 程序时,它总是从一个**主线程**开始。我们使用 `Thread.new` 创建的任何线程都是**子线程**。例如:

# Create a new thread (child thread)
new_thread = Thread.new do
  puts "Welcome to a new thread!"
  puts "Exiting the thread..."
end

# Main thread starts here

# Wait for the child thread to finish
new_thread.join

# Print a message
puts "Welcome to the main program."

输出

Welcome to a new thread!
Exiting the thread...
Welcome to the main program.

在这里,我们创建了一个名为 `new_thread` 的子线程,它会打印一组消息。

`join` 方法告诉 Ruby 在继续执行主线程之前等待该线程完成。

**提示:** 尝试从这个程序中移除 `join` 方法。你会发现执行会立即继续到主程序。


使用 Sleep 的并发线程

我们可以使用 `sleep` 方法将我们的线程暂停指定的时间。让我们用它来使主线程和子线程并发执行:

# Child thread
thread = Thread.new do
  # Loop code 3 times
  3.times do
    puts "Child thread working..."

    # Pause the thread for 1 second
    sleep(1)
  end
end

# Main thread

# Loop code 3 times
3.times do
  puts "Main thread working..."

  # Pause the thread for 1 second
  sleep(1)
end

# Wait for the child thread to end
thread.join

示例输出

Main thread working...
Child thread working...
Main thread working...
Child thread working...
Main thread working...
Child thread working...

在这里,主线程和子线程都分别运行了三次 `times` 循环。在循环的每次迭代中,线程会暂停一秒钟。

1. 'join' 方法的作用

请注意,我们只在程序末尾使用了 `join` 方法。因此,**主线程**在等待**子线程**完成之前就开始了自己的循环。

2. 'sleep' 方法的作用

由于**主线程**循环中的 `sleep` 方法,**子线程**将在暂停之间执行。

同样,**子线程**中的 `sleep` 方法可确保**主线程**在暂停之间执行。

3. 并发线程

结果,主线程和子线程并发(交错)执行。这意味着来自主线程和子线程的输出会混合出现(不是先出现一个线程的所有输出,然后再出现另一个线程的)。

注意事项

  • Ruby 不保证精确的交错或时序。线程调度取决于 Ruby 实现和操作系统。
  • 在标准 Ruby (MRI) 中,全局解释器锁 (GIL) 会阻止 CPU 密集型任务的真正并行执行。但是,线程对于 I/O 密集型任务(即涉及输入/输出的任务)仍然有用。
  • 你可以在程序末尾删除 `thread.join`,它仍然可以工作。但是,不能保证主线程会等待子线程完全执行。因此,使用 `join` 更安全。

示例 1:基本的 Ruby 多线程

thread = Thread.new do
  puts "Thread starting..."
  sleep(2)
  puts "Thread done!"
end

puts "Waiting for thread..."

thread.join

puts "All done!"

输出

Waiting for thread...
Thread starting...
Thread done!
All done!

此程序的工作原理如下:

  1. **主线程**首先执行 `puts "Waiting for thread..."`。
  2. 然后它遇到 `thread.join`。因此,它会等待**子线程**(存储在 `thread` 变量中)完成。
  3. 在**子线程**中,首先执行 `puts "Thread starting..."`。
  4. **子线程**随后因 `sleep(2)` 而暂停两秒。
  5. 暂停后,执行 `puts "Thread done!"`。
  6. 由于**子线程**已完成执行,程序会返回到**主线程**。
  7. 最后,主线程执行 `puts "All done!"`。

示例 2:运行多个线程

thread1 = Thread.new do
  puts "Thread 1 starting"
  sleep(1)
  puts "Thread 1 finished"
end

thread2 = Thread.new do
  puts "Thread 2 starting"
  sleep(1)
  puts "Thread 2 finished"
end

thread3 = Thread.new do
  puts "Thread 3 starting"
  sleep(1)
  puts "Thread 3 finished"
end

# Wait for all threads to finish
thread1.join
thread2.join
thread3.join

puts "\nAll threads finished!"

示例输出

Thread 1 starting
Thread 2 starting
Thread 3 starting
Thread 1 finished
Thread 2 finished
Thread 3 finished

All threads finished!

在这里,我们创建了三个线程(`thread1`、`thread2` 和 `thread3`)并等待它们全部完成。

请注意,我们是单独创建线程的,这导致了长串的代码。

thread1 = Thread.new do
  # Code for thread1
end

thread2 = Thread.new do
  # Code for thread2
end

thread3 = Thread.new do
  # Code for thread3
end

我们可以在下一个示例中看到,通过在循环中创建线程来解决这个问题。


示例 3:使用循环和数组创建多个线程

让我们通过在循环中创建线程并将它们添加到数组中来写一个更简洁的版本。

# Create an array to store multiple threads
threads = []

# Run a loop 3 times to create 3 separate threads
3.times do |i|
    
  # Create a new thread and
  # add it to the threads array
  threads << Thread.new do
    puts "Thread #{i + 1} starting"
    sleep(1)
    puts "Thread #{i + 1} finished"
  end
end

# Iterate through the array
# Wait for each thread to execute
threads.each(&:join)

puts "\nAll threads finished!"

示例输出

Thread 1 starting
Thread 2 starting
Thread 3 starting
Thread 2 finished
Thread 1 finished
Thread 3 finished

All threads finished!

1. 使用循环创建线程

首先,我们运行了一个 `times` 循环三次来创建三个线程。在循环的每次迭代中,都会创建一个新线程并将其添加到 `threads` 数组中。

3.times do |i|
  threads << Thread.new do
    # Thread code
  end
end

2. 主线程

在主线程中,我们使用 `each` 循环遍历 `threads` 数组。

在循环的每次迭代中,我们使用 `join` 和相应的线程来确保线程被完全执行。

threads.each(&:join)

上面的代码是以下代码的简写:

threads.each { |thread| thread.join }

你可以用上面传统的 `each` 循环代码替换示例中的 `each` 循环简写。

**注意:** 此程序的输出每次执行可能会有所不同。


共享数据和竞态条件

所有线程共享相同的内存。因此,如果多个线程同时尝试修改同一个变量,可能会发生意外情况。例如:

# Create a counter variable
counter = 0

# Loop 3 times to create 3 threads
# And append them to the threads array
threads = 3.times.map do
  Thread.new do
    10.times do
      # Increment counter
      counter += 1
    end
  end
end

# Wait for the threads to execute
threads.each(&:join)

# Print the value of counter
puts "Counter: #{counter}"

预期输出

Counter: 30

可能的输出 1

Counter: 26

可能的输出 2

Counter: 29

在这里,我们在**主线程**中创建了 `counter` 变量,并将其初始化为 **0**。因此,此变量将被所有**子线程**共享。

1. 创建线程

然后,我们使用以下代码创建了三个**子线程**:

threads = 3.times.map do
  Thread.new do
    # Code
  end
end

我们使用 `map` 通过收集每个 `Thread.new` 调用的结果来创建一个线程数组。

每个线程都运行自己的 `times` 循环,包含**10**次迭代。在循环的每次迭代中,`counter` 变量的值增加 **1**。

Thread.new do
  10.times do
    # Increment counter
    counter += 1
  end
end

2. 理想情况

理想情况下,每个线程都在其他线程的干扰下执行,并以如下方式更新 `counter` 变量:

线程 循环操作 counter (最终值)
1 将 `counter` 的值递增**10**次。 10
2 将 `counter` 的值递增**10**次。 20
3 将 `counter` 的值递增**10**次。 30

3. 实际情况

实际上,**多个线程可能同时尝试更新** `counter` 变量。

例如,如果第一个线程将 `counter` 的值更新为 **6**,则另一个线程也可能同时将其值更新为 **6**。

因此,第一个线程的更新会被覆盖,而不是第一个线程将 `counter` 更新为 **6**,然后另一个线程将其更新为 **7**。

在这种情况下,`counter` 将更新为 **6**,最终输出将小于 **30**。

**注意:** 上述情况称为**竞态条件**,即多个线程在没有协调的情况下尝试**读取、修改和写入**共享资源。

在 Ruby 中,我们可以使用 `Mutex` 来防止竞态条件。


使用 Mutex 防止竞态条件

为防止竞态条件,我们使用 `Mutex`(互斥),它会锁定一段代码,一次只有一个线程可以执行它。

你可以将 `Mutex` 想象成卫生间门上的锁。一次只有一个人(线程)可以进去。

通常,当多个线程并发运行时,我们使用 `Mutex` 来**控制对共享资源的访问**。

语法

mutex_object = Mutex.new

# Other code

mutex_object.synchronize do
  # Code that accesses shared resources
end

示例 4:Ruby Mutex

现在,让我们使用 `Mutex` 来防止我们在之前的程序中出现的竞态条件:

counter = 0

# Create a Mutex object
mutex = Mutex.new

threads = 3.times.map do
  Thread.new do
    10.times do

      # Update counter inside the Mutex block
      mutex.synchronize do
        counter += 1
      end

    end
  end
end

threads.each(&:join)
puts "Counter: #{counter}"

输出

Counter: 30

在这里,我们创建了一个名为 `mutex` 的 `Mutex` 对象。然后,我们使用此对象在线程访问 `counter` 变量(因为它是共享资源)时锁定该线程。

mutex.synchronize do
  counter += 1
end

因此,当一个线程到达代码的这一部分时,它会尝试锁定 `mutex.synchronize do` 中的代码块。

  • 如果 `mutex` 已经被另一个线程锁定,则当前线程将等待直到它被解锁。
  • 一旦获取(锁定)`mutex`,线程将执行该块并将 `counter` 增加 **1**。
  • 块完成后,`mutex` 会自动解锁,允许其他等待的线程获取它。

由于没有线程干扰另一个线程的执行,我们**始终**会得到以下输出:

Counter: 30

处理线程中的错误

如果在线程中发生错误,它不会导致主程序崩溃,但**线程会停止**。例如:

# A thread that raises an error
Thread.new do
  raise "Something went wrong"
end

# Pause main thread for 1 second
# So that the child thread can execute
sleep(1)

puts "Main program continues"

输出

#<Thread:0x00007c501ef63bd8 /tmp/ZXZLrHrmDX/main.rb:1 run> terminated with exception (report_on_exception is true):
/tmp/ZXZLrHrmDX/main.rb:2:in 'block in <main>': Something went wrong (RuntimeError)
Main program continues

在这里,我们创建了一个使用 `raise` 关键字手动引发错误的线程。这会停止线程的执行。

你可以使用 `begin...rescue` 来处理此类异常

Thread.new do

  # Put code that may cause error inside 'begin'
  begin
    raise "Error in thread"

  # Rescue the error using object 'e'
  rescue => e
    puts "Caught error: #{e.message}"

  end
end

sleep(1)

puts "Main program continues"

输出

ERROR!
Caught error: Error in thread
Main program continues

在这里,我们将可能导致错误的代码放在 `begin` 语句中。如果发生错误,`rescue` 块会通过打印错误消息来处理异常。

这样,即使线程导致了错误,它的执行也不会突然停止。

如需了解更多信息,请访问 Ruby 异常处理


更多关于 Ruby 多线程

为什么要使用线程,何时避免它们?

1. 何时使用线程

当你想要执行以下操作时,请使用线程:

  • 同时执行多个任务。
  • 避免阻塞主程序(例如,在长时间运行的任务中)。
  • 提高 I/O 密集型程序的性能。

**注意:** Ruby 使用**全局解释器锁 (GIL)**,这限制了 CPU 密集型任务的真正并行性。但是,线程仍然有助于处理等待输入/输出的任务(如文件操作、网络请求或数据库访问)。

2. 何时不要使用线程

  • 对于 Ruby 中的 CPU 密集型任务(GIL 限制了性能)。
  • 如果你需要真正的并行性。在这种情况下,可以考虑使用**进程**或外部库,如 `parallel` 或 `concurrent-ruby`。
线程的最佳实践。
  • 始终使用 `join` 等待线程完成。
  • 除非使用线程安全的`方法,否则避免共享变量。
  • 使用 `Mutex` 来安全地访问共享数据。
  • 处理线程中的异常。
  • 保持线程逻辑简单且专注。
线程和进程有什么区别?

**进程**有自己的内存,而**线程**在同一进程内共享内存。

虽然线程更轻量且创建速度更快,但它们不像进程那样提供**真正的并行性**。

在线程内声明的变量仅限于该线程。

虽然线程共享在主线程中声明的变量和资源,但它们不共享在线程内部声明的变量。

换句话说,在线程内部声明的变量是**仅限于该线程的**。例如:

thread1 = Thread.new do
  # Create a local variable 'num'
  num = 5
  puts num
end

# Invalid: Attempt to access 'num' in another thread
thread2 = Thread.new do
  puts num
end

thread1.join
thread2.join

# Invalid: Attempt to access 'num' in main thread
puts num

输出

5
#<Thread:0x00007ddbefa53978 /tmp/XjZ6AVlftj/main.rb:8 run> terminated with exception (report_on_exception is true):
/tmp/XjZ6AVlftj/main.rb:9:in 'block in <main>': undefined local variable or method 'num' for main (NameError)

在这里,`num` 是 `thread1` 作用域内的局部变量。因此,我们无法从该线程外部访问它。

在线程末尾使用 join 方法

你可以将 `join` 方法附加到线程的 `end` 语句。例如:

thread1 = Thread.new do
  puts "First thread"
end.join

thread2 = Thread.new do
  puts "Second thread"
end.join

puts "Main thread"

输出

First thread
Second thread
Main thread

如你所见,我们使用了 `end.join` 来让程序等待线程执行。这种语法在短的线程块中很常见,有助于减少混乱。

上述程序等同于

thread1 = Thread.new do
  puts "First thread"
end

thread2 = Thread.new do
  puts "Second thread"
end

thread1.join
thread2.join

puts "Main thread"
使用 alive? 方法检查线程状态

我们可以使用 `alive?` 方法检查线程的状态。例如:

thread = Thread.new do
  sleep(2)
end

puts "Thread alive? #{thread.alive?}"  # Output: true
sleep(3)
puts "Thread alive? #{thread.alive?}"  # Output: false

在这里,`thread` 在结束前会暂停两秒。因此,它将在两秒内“存活”(运行或睡眠)。

当子线程处于睡眠状态时,我们在主线程中检查它是否存活。检查方式如下:

1. 第一次检查

当我们第一次检查 `thread` 时,它仍在运行,因为两秒尚未过去。因此,`thread.alive?` 返回 `true`。

2. 第二次检查

第一次检查后,主线程睡眠三秒。在此期间,子线程已完成执行。

因此,第二次 `thread.alive?` 返回 `false`。

使用 value 方法获取线程的返回值

我们可以使用 `value` 方法来获取线程的最后求值表达式。例如:

thread = Thread.new do
  5 * 2
end

puts thread.value

# Output: 10

在这里,`thread.value` 给了我们线程操作 `5 * 2` 的结果,即 **10**。

**注意:** 使用 `thread.value` 时,线程会阻塞直到结果可用。这类似于调用 `join` 然后检索结果。

你觉得这篇文章有帮助吗?

我们的高级学习平台,凭借十多年的经验和数千条反馈创建。

以前所未有的方式学习和提高您的编程技能。

试用 Programiz PRO
  • 交互式课程
  • 证书
  • AI 帮助
  • 2000+ 挑战