マルチスレッドと排他制御

threaded-map という関数を用いて、高階なマルチスレッド・プログラミングをしています。

(threaded-map f l)

f は1引数関数で、リスト l の各要素に適用されるんですが、要素ごとに独立したスレッド下で処理される点が普通の map と異なります。

しかも、各スレッドの処理内容を一つのリストに集めて返してくれる優れものです。

そのことを実現するために、channel というオブジェクトが使われています。

MzScheme の channel オブジェクトは、基本的にはスレッド間のコミュニケーションのためのものだと思うんですが、ここでは スレッドと一対一対応で担当の channel を作り、f の結果を受け取る、という使い方がされています。

最終的に channel が得たデータを集める時にカレント・スレッドがブロックされるので、シーケンシャルな map と同様の結果が得られるわけです (順序も保たれます)。


さて、そんな threaded-map とは直接は関係無いんですが、マルチスレッド処理一般の問題として、共有リソースへのアクセスの問題というのがありますよね。

具体的には、データベースに関して、読み込みがまだ終わっていないのに、threaded-map で一斉にアクセスを掛けられてしまうという状況を経験しました。

その際、実際にはデータが有るにも関わらず無いと判断され、新規データを一斉に取得しに行かれてしまったんです。

そこで、読み込みが終わるまで各スレッドに待機してもらう方法が無いか考えてみた結果、次のようなごく単純な方法にたどり着きました:

;; Wrong
(define-values (load-db save-db init-db)
  (let ((db #f))
    (define (load-db)
      (cond ((not db)
             (set! db 'loading)
             (set! db (do-load-db))
             db)
            ((eq? db 'loading)
             (sleep 3)
             (load-db))
            (else db)))
    ...
    (values load-db save-db init-db)))

この例では利用者に完全な db オブジェクト (ハッシュ表) を提供することが目的なんですが、db オブジェクトをアトミックに変数に割り当てないと、途中の状態の db にアクセスされてしまうことになります。

set! 自体はアトミックであることが保証されているので、割り当てを2段階で行っています。最初に読み込み中のフラグをセットした後、別に括り出した関数で読み込み処理を行います。これにより、途中の状態は見えなくなるわけです。

利用例です:

(define (iter-db-if p? f)
  (hash-for-each (load-db)
                 (lambda (k v)
                   (if (p? v) (f k v)))))

(define (find-db p?)
  (let/ec return
    (iter-db-if p? return)
    (values #f #f)))

ご覧のように、間接的な利用者はもちろん、直接の利用者も「待ち」の状態を意識する必要が全くありません。


追記:

失礼しました。並列処理の根本的な問題を理解していなかったようです。

引用させていただきます:
shiro 『dbが#fの状態で、
スレッドAが(eq? db 'loading)をチェック→スレッドBが(eq? db 'loading)をチェック→スレッドAがelse節実行開始(ここでdbが'loadingになる)→スレッドBもelse節を開始
という競合状態が起きそうに見えますが、何か別の制約があるのでしょうか。
一般には、どこかで「ある状態の検査と変更」をアトミックに行わないとどうやっても競合は避けられないのでは。』


set! がアトミックであれば大丈夫、と安易に考えてしまったんですが、確かにおっしゃる通りだと思います。

個人的に殆ど未知な分野ですので、じゃぁどうするか、という事はこれからじっくり考えたいと思います。


追記2:

問題の解決ではないんですが、手元で上記コードが一見問題なく動作していた理由が分かりました。

threaded-map による同時アクセスに対処する、ということが今回の具体的な目的だったんですが、threaded-map ではスレッドが map 関数によってシーケンシャルに作成されるという事情により、たまたま同時アクセスの問題が生じなかった、ということじゃないかと思います。

それを、マルチスレッドの処理一般が解決できたかのように勘違いしたという、残念なミスでした。


追記3:

上の load-db の定義部分を以下のように変更しました。これで良くなったんじゃないでしょうか。

(define load-db
  (let ((sema (make-semaphore 1)))
    (lambda ()
      (call-with-semaphore
       sema
       (lambda ()
         (or db
             (begin
               (set! db (do-load-db))
               db)))))))

実は最初に semaphore について調べたんですが使い方が良く分からず、さらに、一旦ロードされたら後は自由にアクセスしてもらって構わないという特殊な事情から、当初のような形で妥協したという経緯があったのでした。

でもこれで一つ勉強できました。改めてコメント感謝いたします。