(본 게시물은 저작권의 문제 발생시 출판사의 요청에 의해 삭제될 수 있습니다.)
루비는 프로그램의 다른 부분을 '동시에' 실행하는 기본적인 두 가지 방법을 제공한다. 파이버를 사용하면 프로그램의 일부분을 정지하고 다른 부분을 실행할 수 있다. 파이버보다 작업간의 결합도가 적은 방법으로는 다수의 스레드를 사용하는 방법과 다수의 프로세스를 분리하는 방법이다.
파이버
루비 1.9에서 파이버가 도입되었다. 파이버는 단순한 코루틴 메커니즘이다. 이를 사용하면 스레드 고유의 복잡한 처리 없이도 스레드 사용하는 것처럼 프로그램을 작성할 수 있다. 다음 예제는 택스트 파일을 분석해 출현횟수를 계산한다. 먼저 파이버를 사용하지 않고 반복문으로 구현한다.
counts = Hash.new(0)
File.foreach("testfile") do | line |
line.scan(/\w+/) do | word |
word = word.downcase
counts[word] += 1
end
end
counts.keys.sort.each { | k | print "#{k}:#{counts[k]} "}
실행결과
and:1 is:3 line:3 on:1 one:1 so:1 this:3 three:1 two:1
위 예제는 단어를 찾는 코드와 빈도를 계산하는 코드가 얽혀 있다. 이 코드를 개선한 간단한 해결책은 아래와 같다.
words = Fiber.new do
File.foreach("testfile") do | line |
line.scan(/\w+/) do | word |
Fiber.yield word.downcase
end
end
nil
end
counts = Hash.new(0)
while word = words.resume
counts[word] += 1
end
counts.keys.sort.each { | k | print " #{k}:#{counts[k]} "}
실행결과
and:1 is:3 line:3 on:1 one:1 so:1 this:2 ....
Fiber 클래스의 생성자는 블록이 주어지면 파이버 객체를 반환한다. 생성되는 시점에서 블록을 실행되지 않는다.
이후에 파이버 객체에 대해 resume 메서드를 호출할 수 있다. 이를 통해 블록을 실행할 수 있다. 파일을 열고 scan 메서드가 각 단어를 추출한다. 하지만 이 지점에서 Fiber.yield가 호출된다. 이는 블록의 실행을 중지시킨다. 그리고 앞서 블록을 실행하도록 해 준 resume 메서드에 Fiber.yield 의 값을 반환한다.
메인 프로그램은 반복문으로 파이버에서 반환된 첫 단어의 출현 빈도를 계산한다. 그리고 while 반복문으로 돌아가 조건식을 평가할 때 words.resume 메서드를 호출한다. resume 메서드를 호출하면 다시 블록을 실행하고 이전에 정지한 지점의 바로 다음 부분부터 (Fiber.yield다음 줄) 이어서 실행한다.
파이버 블록에서 파일을 모두 읽어 들이면, foreach 블록을 빠져나가고 파이버는 종료된다. 파이버의 반환값은 마지막으로 평가된 값이 된다. 이후에 resume이 호출되면 nil을 반환하고 이후 다시 resume 메서드를 호출하면 FiberError 예외를 발생시킨다.
파이버는 무한한 수열의 값을 생성하는데도 사용된다. 아래의 예제는 2로 나눌 수 있지만 3으로 나눌 수 없는 수열을 반복한다.
twos = Fiber.new do
num = 2
loop do
fiber.yield(num) unless num % 3 == 0
num += 2
end
end
10.times { print twos.resume, " " }
실행 결과
2 4 8 10 14 16 20 22 26 28
파이버는 그저 객체이므로 다른 곳으로 념겨지거나 변수에 저장되거나 할 수 있다. 파이버는 생성된 스레드 에서만 resume 가능하다. 루비 2.0에서는 게으른 열거자(lazy enumerator)를 사용해서 무한한 목록을 구현할 수도 있다.
파이버, 코루틴, 계속
루비의 파이버는 기본적으로 제한적이다. 파이버는 단지 yield를 통해 제어권이 넘겨진 부분에 대해 resume 메서드로 다시 제어권을 넘겨받을수 있을 뿐이다. 루비에는 이러한 파이버의 기능을 확장하는 두 개의 표준 라이브러리가 포함되어 있다.
fiber 라이브러리는 완전한 코루틴을 지원한다. 이 라이브러리를 로드하면 파이버에는 transfer 메서드가 추가되며 다른 임의의 파이버로 제어권을 넘길 수 있게 된다.
이와 연관된 좀 더 일반적인 매커니즘은 계속(continuation)이다. 계속은 실행 중인 프로그램의 상태(실행 중인 위치, 현재의 바인딩)를 기억해 놓고, 이후에 이 상태를 그대로 재개할 수 있도록 해준다. 계속을 사용해서 코루틴은 물론 다른 제어 구조를 구현할 수도 있다. 실행 중인 웹 어플리케이션의 상태를 특정 요청으로부터 다른 요청의 사이에 보존할 때도 계속을 사용할 수 있다. 즉 애플리케이션은 브라우저에 응답을 반환할 때 계속 객체를 생성하고 브라우저에서 다음 요청을 받으면 계속을 호출해 바로 이전ㅇ네 멈춰둔 시점부터 처리를 재개할 수 있다. 루비에서 이 기능을 사용하려면 continuation 라이브러리를 require해야 한다.
멀티스레딩
루비 1.9에서는 운영 체제를 통해 스레드의 제어권을 변경할 수 있게 되었다. 운영 체제 네이티브 스레드를 사용해서 멀티프로세서에 의해 높은 처리 능력을 활용할 수 있게 되었지만 대부분의 루비 확장 라이브러리는 오래된 스레드 모델에 맞춰 개발되었기 때문에 스레드 안전이라고 할 수 없다. 루비는 동시에 다수의 스레드를 실행하지 않도록 되어 있다. 즉 같은 애플리케이션에서는 결코 두 개의 스레드가 병렬로 실행되는 일은 없다.
루비 스레드 만들기
루비로 스레드 생성후 병렬로 웹 페이지를 다운로드 한다. 아래 코드는 다운로드 하고자 하는 URL 마다 새로운 스레드를 생성해서 HTTP 트랜잭션을 처리한다.
require 'net/http'
threads = pages.map do | page_to_fetch |
Thread.new(page_to_getch) do | url |
http = Net::HTTP.net(url, 80)
print "fetching: #{url}\n"
resp = http.get(' / ')
print "Got #{url}: #{resp.message}\n"
end
end
threads.each { | thr | thr.join }
실행결과
fetchinf: ww.rubycentral.org
fetching: slashdot.org
Got slashdot.org: OK
위의 코드에 대해 살펴보자.
1) Threads.new 를 호출해 새로운 스레드를 생성한다. 이 생성자에는 새로운 스레드에서 실행할 코드를 담은 블록을 넘겨준다.
2) 첫 번째 스레드가 시작되면 page_to_fetch는 www.rubycentral.org로 설정된다. 한편 반복문에서는 다른 스레드를 만드는 작업이 이루어지고 있다. 두번째 스래드에서는 page_to_fetch 변수에 slashdot.org가 설정된다. 스레드의 블록 내에서 작성된 지역변수를 사용하면 이 변수는 해당 스레드 안에서만 사용되기 때문에 첫 번째 스레드 처리중에 page_to_fetch 값이 두번째 스레드가 생성되었다고 해서 변경되지 않는다.
3) 반복문 안에서 각 스레드는 puts 가 아닌 print를 사용해서 출력을 수행한다. 그 이유에 대해선 puts의 동작 방식을 이해해야 한다. puts의 동작은 두 단계로 나뉜다. 먼저 넘겨받은 인자를 출력하고 줄바꿈을 출력한다. 이 두 단계의 처리 사이에서 스레드의 제어권이 다른 스레드로 넘어갈 수 있기 때문에 print 문을 통해 줄 바꿈을 포함하는 문자열을 출력하도록 한다.
마지막 줄에는 이해하기 어려운 부분이 생성된 스레드 각각에 대해 join을 호출하는 이유가 뭘까?
루비 프로그램이 종료되면 모든 스레드는 자신의 상태와 무관하게 강제로 종료되어 버린다. 스레드의 Thread#join을 호출하면 특정 스레드가 일을 끝마칠 때까지 멈춰(block) 있을 것이다. 따라서 요청을 처리하는 스레드에 join을 호출함으로써 메인 프로그램이 종료되기 전에 세 개의 요청이 모두 완료됨을 확신할 수 있다.
join이 종료되지 않는 상황이 우려된다면, join에 제한 시간을 매개 변수로 넘겨줄 수 있다. 스레드 종료 전에 제한 시간이 모두 지나면 join 호출은 nil을 반환한다. 그 외에는 Thread#value, 즉 스레드에서 마지막으로 호출한 문장의 값을 반환한다.
join이외에 스레드는 언제나 Thread.current로 접근할 수 있고 Thread.list 로 스레드 상태 목록을 가져올 수 있다. 특정 스레드의 상태를 확인하기 위해서는 Thread#status 와 Thread#alive? 메서드를 사용할 수 있다. Thread#priority= 를 이용하면 특정 스레드의 우선순위를 조정할 수도 있다. 우선순위가 높은 스레드는 우선순위가 낮은 스레드보다 우선적으로 실행된다.
스레드 변수
스레드는 생성되는 시점에 유효범위 안에 모든 변수에 정상적으로 접근할 수 있다. 스레드 코드를 담고 있는 블록 안의 지역 변수는 해당 스레드에만 지역적이고 스레드간에 서로 공유되지 않는다. 스레드마다 존재하는 변수를 메인 스레드를 포함해 다른 스레드에서도 접근하기 위해 Thread 클래스는 스레드 지역 변수를 만들고 이를 이름으로 접근하는 특별한 기능을 지원한다.
마치 스레드 객체가 일종의 해시인 것처럼 [ ] = 를 이용하여 요소를 쓰고, [ ] 를 이용하여 요소를 읽을 수 있다. 다음 예제는 각 스레드에 mycount라는 키를 가진 스레드 지역 변수에 count 변수의 현재 값을 기록한다. 이를 위해 코드에서 스래드 객체를 색인할 때 :mycount 라는 심벌을 사용한다.
count = 0
threads = 10.times.map do | i |
Thread.new do
sleep(rand(0.1))
Thread.current[:mycount] = count
count += 1
end
end
threads.each { | t | t.join; print t[:mycount], ", " }
puts "count = #{count}"
실행결과
6, 0, 2, 7, 5, 4, 1, 9, 3, 8, count = 10
메인 스레드는 하위 스레드들이 끝나기를 기다린다. 그리고 각 스레드가 가진 count 값을 출력한다. 각 스레드가 값을 기록하기 전의 임의의 시간을 기다리도록 했다.
스레드와 예외
스레드가 처리하지 않은 예외를 일으키는 경우 abort_on_exception 이 false이고, $DEBUG 플래그도 설정되어 있지 않으면 (기본 상태), 처리되지 않은 예외는 현재 스레드를 종료시킬 것이다. 하지만 나머지 스레드들은 계속 실행될 것이다.
사실 예외를 발생시킨 스레드에 join하기 전까지는 예외가 발생했다는 사실조차 알 수 없다. 다음 예제에는 1번 스레드는 아무 내용도 출력하지 못하고 날아가 버릴 것이다. 하지만 다른 스레드들의 추적 내용은 여전히 볼 수 있다.
threads = 4.times.map do | number |
Thread.new(number) do | i |
raise "Boom!" if i == 1
print "#{i}\n"
end
end
puts "Waiting"
sleep 0.1
puts "Done"
실행결과
0
Waiting
2
3
Done
스레드 종료를 기다리기 위해서는 일반적으로 sleep이 아니라 join 메서드를 사용한다. 어떤 스레드에 join을 했고 이 스레드에서 예외가 발생했다면, 이 예외는 join을 실행한 원래의 스레드에서도 발생할 것이다.
threads = 4.times.map do | number |
Thread.new(number) do | i |
raise "Boom!" if i == 1
print "#{i}\n"
end
end
puts "Waiting"
threads.each do | t |
begin
t.join
rescue RuntimeError => e
puts "Failed: #{e.message}"
end
end
puts "Done"
Waiting
Failed: Boom!
Done
aort_on_exception 을 true로 설정했거나 디버그 플래그를 활성화하기 위해 -d 옵션을 사용해 루비 인터프리터를 실행했을 때 처리하지 않은 예외가 발생한다면 메인 스레드를 종료한다. 따라서 어떠한 메시지도 볼 수 없다.
Thread.abort_on_exception = true
threads = 4.times.map do | number |
Thread.new(number) do | i |
raise "Boom!" if i == 1
print "#{i}\n"
end
end
puts "Waiting"
thread.each { | t | t.join }
puts "Done"
실행결과
0
prog.rb:4 in 'block (2 levels) in <main>': Boom! (RuntimeError)
The End.