部分継続とストリームで作るジェネレータ関数

最近、とある目的で Jpeg 画像を読むプログラムを書きました。

scheme-jpeg

(はてなのロゴを Jpeg で保存し、青い部分をキャラクタで表示したところです)

難しい部分は Common Lisp のコードなどを参考にしたんですが、いくつか Scheme ならではと思えるテクニックの発見もありました。

今回は、ビットストリームやピクセル毎の色データを生成するのに利用したテクニックをご紹介します。


Jpeg のデコードで難しい工程の一つに、ハフマン符号の読み取り、というのがあります。

Jpeg ファイルの頭の方には、量子化テーブルとかハフマン・テーブルとか、色んなややこしい情報が詰まったヘッダーがあるんですが、それが終わると 0 と 1 の並び (ビットストリーム) が始まります。

実際のデータを、出現頻度の高いものは短く、めったに現れないものは長い符号に対応させる、という方式で圧縮したものです。

このビットの並びを、

(bit)

のように評価するだけで一つずつ得られるようにしたいと考えました。

一般に、IO ポートでのデータの読み書きはバイト単位で行われます:

(read-byte input-port) ; => 8-bit integer

これを、1 ビットずつに分割して出力するようにしたいわけです。直接ビット演算をしても良いんですが、SRFI 60 (Integers as Bits) というライブラリの integer->list 関数を使うのが便利そうです。

(define int 255)
(integer->list int)   ; => (#t #t #t #t #t #t #t #t)

(set! int 1)
(integer->list int)   ; => (#t)
(integer->list int 8) ; => (#f #f #f #f #f #f #f #t)

例のように、8 ビット未満の数値でも 8 つのビットを得るためには 第 2 引数に 8 を渡してやる必要があります。

さて、ここから 1 ビットずつ得たいわけですが、いきなりジェネレータを書く前に、とりあえずストリーム (遅延リスト) を作ってみました:

(define (port->bitstream in)
  (let/ec return
    (define int (read-byte in))
    (cond ((eof-object? int) (return stream-null))
          ...)
    (reset (for-each (lambda (bit)
                       (shift k
                              (stream-cons (if bit 1 0)
                                           (k #f))))
                     (integer->list int 8))
           (port->bitstream in))))

(途中の本質的でない部分は略しています)

比較のため、部分継続 (reset/shift) を使わないバージョンも示します:

(define (port->bitstream in)
  (let/ec return
    (define int (read-byte in))
    (cond ((eof-object? int) (return stream-null))
          ...)
    (let loop ((lst (integer->list int 8)))
      (if (null? lst)
          (port->bitstream in)
          (stream-cons (if (car lst) 1 0)
                       (loop (cdr lst)))))))

こちらはリスト構造を直接 car や cdr で触ったりしていますが、上の方では for-each という抽象化された構文がそのまま利用できるというメリットがあります。これは、for-each の部分を差し替えれば色んなデータ構造にそのまま対応できることを意味します。

reset と shift というのが部分継続を作るフォームです。PLT Scheme では "control.ss" というライブラリに含まれています。

reset で部分継続の開始位置を設定し、shift で reset との間の部分を切り取ります。shift の後の変数にはその切り取った部分が関数としてバインドされます。

プログラムの部分、という抽象的なはずのものを自由に切り取って実体化することができるわけです。

なお、let/ec は脱出のための継続 (escape continuation) を生成するフォームです。reset 以降に突入する前に脱出したい場合があるので使っています。


ここで、reset/shift の動作を普通のリストで確認してみましょう。

(reset
  (for-each (lambda (x)
              (shift k x))
            '(1 2 3)))
; => 1

k を使わずに捨てるとどうなるか、という例です。for-each そのものは副作用のための構文なので値を返さないんですが、1 が返ってますね。for-each を飛び越えて reset の位置に x が返されている、ということになります。

似た例ですが

(reset
  (for-each (lambda (x)
              (shift k x))
            '(1 2 3))
  (display "You won't see this."))
; => 1

メッセージは表示されません。for-each ループが終わらないうちに制御が reset の部分に移るため、以降の式は評価されないんです。

では k を使いましょう。

(reset
  (for-each (lambda (x)
              (shift k
                     (cons x (k #f))))
            '(1 2 3))
  '())
; => (1 2 3)

cons の第 2 引数が部分継続の呼び出しになっています。consing の流れを書き出してみると:

; within for-each
(cons 1 (k #f))
(cons 1 (cons 2 (k #f)))
(cons 1 (cons 2 (cons 3 (k #f))))
; for-each terminates
(cons 1 (cons 2 (cons 3 '())))

という感じです。#f が気になるかもしれませんが、PLT Scheme の部分継続は一引数関数なので、適当な引数を渡しているだけです。

reset/shift とは別に、prompt/control というペアもあります:

(prompt
  (for-each (lambda (x)
              (control k
                       (cons x (k #f))))
            '(1 2 3))
  '())
; => (3 2 1)

要素順が逆になっちゃいました。理由はまだ勉強中です。すいません。

さらに脱線しますが Python 等のジェネレータ風なこともできます:

(define (generate iterate collection)
  (shift yield
         (iterate yield collection)))

(reset
  (display (generate for-each '(1 2 3)))
  (newline))
; =>
1
2
3

shift の位置が for-each (変数 iterate) よりも外側にあることに注意してください。一度保存した部分継続 (yield) は何度でも呼べる、ということを利用した例です。


話をビットストリームに戻します。いちおう再掲:

(define (port->bitstream in)
  ...
    (reset (for-each (lambda (bit)
                       (shift k
                              (stream-cons (if bit 1 0)
                                           (k #f))))
                     (integer->list int 8))
           (port->bitstream in))))

これでビット (0 or 1) の遅延リストが作られるわけですが、そのままではちょっと使いにくいので、ジェネレータ関数によって遅延リストの操作を隠蔽することにします。

(define (stream->generator s)
  (lambda ()
    (if (stream-null? s)
        #f
        (begin0 (stream-car s)
          (set! s (stream-cdr s))))))

あるいは、ここでも部分継続を使って

(define (stream->generator s)
  (define g #f)
  (lambda ()
    (if g
        (g #f)
        (reset
          (stream-for-each (lambda (x)
                             (shift k
                                    (set! g k)
                                    x))
                           s)))))

のようにしても良いわけです。

以上の結果、

(define bit (stream->generator (port->bitstream in)))
(list (bit) (bit) (bit) (bit))
; => (1 0 0 1)

みたいな感じで、簡単な関数呼び出しでビットを得られるようになりました。


さて、ここまで考えてきたところで、何でわざわざ一度遅延リストにするんだ、という疑問も湧いてきますよね。ということで、直接ジェネレータを作るバージョンも考えてみました:

(define (port->bitgen in)
  (define g #f)
  (define (loop)
    (let/ec return
      (define int (read-byte in))
      (cond ((eof-object? int) (return #f))
            ...)
      (reset (for-each (lambda (bit)
                         (shift k
                                (set! g k)
                                (if bit 1 0)))
                       (integer->list int 8))
             (loop))))
  (lambda ()
    (if g (g #f) (loop))))

いかがでしょうか。割りとスマートに書けている感じもしますが、個人的には遅延リスト化するやり方の方が好みです。


参考文献:
composable-continuations-tutorial
SHIFT/RESET 'generator' example