ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Programming Ruby (24) 파이버, 스레드, 프로세스 - 2
    Ruby 2016. 11. 8. 23:26
    [출처] Programming Ruby 
    (본 게시물은 저작권의 문제 발생시 출판사의 요청에 의해 삭제될 수 있습니다.)



    스레드 스케줄러 제어하기


    Thread 클래스는 스레드 스케줄러 제어 메서드를 제공한다. Thread.stop은 현재 스레드를 멈추고, Thread#run은 특정한 스레드가 실행되도록 한다. Thread.pass는 다른 스레드가 실행되도록 현재 스레드의 제어권을 넘겨주며, Thread#join과 Thread#value는 주어진 스레드가 끝날 때까지 호출한 스레드의 실행을 연기한다. join과 method가 일반적인 프로그램에서 사용하는 저수준의 스레드 메서드라고 할 수 있다. 나머지 수준의 스레드 제어 메서드들은 제대로 사용하기에 위험이 크다. 다행히도 루비에서는 고수준 스레드 동기 기능이 준비되어 있다.




    상호 배제


    경쟁 상태의 간단한 예를 보자 다음 예제는 다수의 스레드가 공유 변수를 갱신하려고 한다.


    sum = 0

    threads = 10.times.map do 

         Thread.new do

              100_000.times do

                   new_value = sum + 1

                   print "#{new_value} " if new_value % 250_000 == 0

                   sum = new_value

              end

         end

    end

    threads.each(&:join)

    puts "\nsum = #{sum} "


    실행결과

    250000 250000 250000 2500000 250000 250000 250000 250000

    sum = 300000


    열 개의 스레드를 생성하고 공유 변수인 sum의 값을 10만 번씩 증가시키고 있다. 그럼에도 모든 스레드가 종료된 시점에서 sum은 1,000,000 보다 작은 값을 나타낸다. 이를 통해 경쟁 상태가 발생했음을 알 수 있다. 그 이유는 새로운 값을 계산하는 코드와 이를 sum 변수에 대입하는 사이에 print가 호출되었기 때문이다.


    어떤 스레드에서 업데이트될 값을 가져온다고 가정하면, 이 값은 99,999이고 새로 계산될 값은 100,000 이다. 새로운 값을 sum에 저장하기 전에 print를 호출하고, 이는 예약되어 있는 다른 스레드를 실행한다. 따라서 두 번째 스레드에서는 99,999 값을 가져오고 이를 증가시킨다. 이는 10,000을 sum 에 저장하고 다시 반복으로 돌아가 100,001, 100,002로 계속 증가시켜 나간다. 결과적으로 자신의 메시지를 기록하는 것을 마쳤으므로 원래 스레드는 계속 실행된다. 이는 바로 100,000 이라는 값을 sum에 저장하게 되고 다른 스레드에 의해 저장된 값을 덮어쓴다. 따라서 데이터를 잃게 된다. 


    Mutex 내장 클래스를 사용해 동기화 영역을 만든다. 동기화 영역이란 어떤 시점에 하나의 스레드만이 사용할 수 있는 영역을 말한다. 아래의 예제는 뮤텍스를 사용해 동시에 두 개 이상의 스레드가 카운트를 업데이트 하지 못하도록 만든 예제이다.


    sum = 0

    mutex = Mutex.new

    threads = 10.times.map do 
         Thread.new do

              100_000.times do

                   mutex.lock                    #### 한번에 하나의 스레드만 접근 가능

                   new_value = sum + 1
                   print "#{new_value} " if new_value % 250_000 == 0

                   sum = new_value

                   mutex.unlock

              end
         end

    end

    threads.each(&:join)

    puts "\nsum = #{sum} "


    실행결과

    250000 500000 750000 100000

    sum = 100000



    위와 같은 패턴은 매우 자주 사용되므로 Mutex 클래스에는 Mutex#synchronize 메서드가 준비되어 있다. 이 메서드는 뮤텍스를 잠그고, 블록 내의 코드를 실행한 후 뮤텍스의 잠금을 해제한다. 이 메서드는 뮤텍스가 잠겨 있을 때 예외가 발생하면 뮤텍스의 잠금을 확실하게 해제해 준다.



    sum = 0
    mutex = Mutex.new
    threads = 10.times.map do 
         Thread.new do

              100_000.times do

                   mutex.synchronize do

                        new_value = sum + 1

                        print "#{new_value} " if new_value % 250_000 == 0

                        sum = new_value

                   end

              end
         end
    end
    threads.each(&:join)
    puts "\nsum = #{sum} "

    실행결과
    250000 500000 750000 100000

    sum = 100000


    뮤텍스가 잠겨 있지 않을 때는 뮤텍스의 잠금을 요청하겠지만, 뮤텍스가 잠겨있더라도 스레드 실행을 멈추고 싶지 않은 경우가 있다. Mutex#try_lock 메서드는 잠금을 할 수 있을 때는 잠금을 하고, 이미 잠겨 있을 때는 false를 반환한다. 



    아래의 코드는 가상 통화 변환 프로그램이다. 환율  객체를 보호하는 뮤텍스를 얻어올수 없다면 단순히 처리를 종료하지 않고 try_lock 메서드를 사용해서 환율이 갱신 중이라는 사실을 알리는 상태 메시지를 출력한다.


    rate_mutex = Mutex.new

    exchange_rates = ExchangeRates.new

    exchange_rates.update_from_online_feed


    Thread.new do

         loop do

              sleep 3600

              rate_mutex.synchronize do

                   exchange_rates.update_from_online_feed

              end

         end

    end


    loop do

         print "Enter currency code and amount: "

         line = gets

         if rate_mutex.try_lock

              puts(exchange_rates.convert(line)) ensure rate_mutex.unlock

         else

              puts "Sorry, rates being updated. Try agin in a minute"

         end

    end



    뮤텍스에 대해 취득한 잠금을 일시적으로 플고 다른 사람이 잠금을 가져갈 수 있게 하려면 Mutex#sleep을 사용한다. 


    rate_mutex = Mutex.new
    exchange_rates = ExchangeRates.new
    exchange_rates.update_from_online_feed

    Thread.new do

         loop do

              rate_mutex.sleep 3600

              exchange_rates.update_from_online_feed 

         end
    end

    loop do
         print "Enter currency code and amount: "
         line = gets
         if rate_mutex.try_lock
              puts(exchange_rates.convert(line)) ensure rate_mutex.unlock
         else
              puts "Sorry, rates being updated. Try agin in a minute"
         end

    end




    다중 프로세스 실행




    새로운 프로세스 생성

    독립적인 프로세스를 만드는 방법중 가장 쉬운 방법은 명령을 실행하고 끝나기를 기다리는 것이다. 루비의에서 system 메서드나 역따옴표(또는 백틱) 메서드로 몇몇 명령어를 실행하거나 호스트 시스템에서 데이터를 읽어오는 프로세스를 실행할 수 있다.


    system("tar xzf test.tgz")      # => true

    `date`               # => "Thu Nov 14 16:31~~blabla \n"


    Object#system 메서드는 주어진 명령을 하위 프로세스에서 실행한다. 명령어가 실제로 존재하고 적절하게 실행되었다면 true를 반환한다. 예외를 찾지 못한경우 예외를 발생시킨다. 명렁어가 에러를 발생시키며 종료되었다면 false를 반환한다. 


    프로세스가 실패했을 때 서브 프로세스의 종료 코드는 전역 변수 $?를 통해 확인할 수 있다. 



    하위 프로세스와 대화를 하고 데이터를 주고받는 경우 IO.popen 메서드를 사용할 수 있다. popen 메서드는 하위 프로세스로 명령을 실행하고, 하위 프로세스의 표준 입력과 출력을 루비의 IO 객체와 연결한다. IO객체에 쓰면 하위 프로세스에서는 표준 입력으로 그것을 읽을 수 있다. 


    pig는 표준 입력에서 단어를 읽어서 돼지어(igpay atnlay)로 출력한다. 


    pig = IO.popen("local/util/pig". "w+")

    pig.puts "ice cream after they go to bed"

    pig.close_write

    puts pig.gets


    실행결과

    iceway eamcray afterway eythay ogay otay edbay


    위 예제는 단순하면서 파이프를 통한 하위 프로세스 제어와 관련된 실제 세계의 복잡한 문제도 보여준다. 코드는 확실히 아주 간단한다. 파이프를 열고 문장을 쓰고 응답을 읽는다. 하지만 pig 프로그램에서는 자신이 출력한 내용을 플러시(flush)하지 않은 것으로 확인되었다. 위 예제에서 원래 의도는 pig.puts 다음 줄에 오는 pig.gets가 출력이 끝날 때까지 기다리는 것이다. pig 프로그램은 우리의 입력은 처리했지만 그 결과가 파이프에 쓰이지는 않았다. 따라서 pig.close_write를 추가해야 한다. 


    popen에 마이너스 기호(-)를 명령으로 전달하면 popen은 새로운 루비 인터프리터를 포크(fork)한다. popen에서 반환된 이후, 새로 포크된 인터프리터와 원래 인터프리터 둘 다 실행을 계속한다.


    원래 프로세스는 IO 객체를 돌려받을 것이고, 자식 프로세스는 nil을 받을 것이다. 이러한 동작은  fork(2)를 가진 운영 체제에서만 일어난다.(윈도는 해당되지 않음)


    pipe = IO.popen("-", "w+")

    if pipe

         pipe.puts "Get a job!"

         STDERR.puts "Child says '#{pipe.gets.chomp}'"

    else

         STDERR.puts "Day says '#{gets.chomp}'"

         puts "OK"

    end


    실행결과

    Dad says 'Get a job!'

    Child syas 'OK'




    독립적인 자손


    자식 프로세스에 일을 할당하고 메인 프로세스는 자신의 프로세스를 진행할 수 있다. 아래의 예제는 sort 명령을 외부에서 수행한다


    exec("sort testfile > output.txt") if fork.nil?

    # sort 는 자식 프로세스로 실행되고 있다.

    # 메인 프로그램은 수행이 계속도니다.


    # 그리고 sort 수행이 마치길 기다린다.

    Process.wait


    Object#form 은 부모에게는 프로세스 ID를 반환하고 자식에게는 nil을 반환한다. 그러므로 자식 프로세스는 Object#exec 호출을 수행하고 sort를 실행한다. 그리고 sort 명령이 끝나기를 기다리는 Process.wait 호출을 수행한다.


    자식 프로세스 종료를 통지받는 경우 Object#trap을 이용해 시그널 핸들러를 설정할 수 있다. '자식 프로세스 종료'시 나타나는 SIGCLD 에 대한 트랩을 설정한다.


    trap("CLD") do

         pid = Process.wait

         puts "Child pid #{pid}: terminated"

    end


    fork { exec("sort testfile > output.txt") }


    # 다른 작업을 수행한다.


    실행결과

    Child pid 51225: terminated




    블록과 하위 프로세스


    IO.popen은 File.open과 매우 비슷한 방식으로 블록을 활용한다. date 같은 명령과 함께 블록을 건네주면, 블록은 IO객체를 매개 변수로 해서 실행된다.


    IO.popen("date") { | f | puts "Date is #{f.gets}" }


    실행결과

    Date is Thu Nov 14 16:31:26 CST 2016


    IO 객체는 File.open 과 같이 코드 블럭이 종료될 때 자동으로 닫힌다. 

    fork와 블록을 결합하면, 블록 안의 코드는 루비 하위 프로세스에서 실행되고, 부모는 블록 이후에 수행될 것이다. 


    fork do

         puts "In child, pid = #$$"

         exit 99

    end

    pid = Process.wait

    puts "Child terminated, pid = #{pid}, status = #{$?.exitstatus}"


    실행결과

    In child, pid = 51232

    Child terminated, pid = 51232, status = 99


    $?는 하위 프로세스의 종료에 대한 정보를 가진 전역 변수다. 자세한 정보는 Process::Status 레퍼런스를 참조.


    The end.



Designed by Tistory.