2013年5月24日金曜日

ピッグデータ問題の解答 @sys1yagi編

このエントリーをブックマークに追加 このエントリーを含むはてなブックマーク
 先日CodeIQで結城 浩さんが出題していた《ピッグデータ》に負けないで! に挑戦したのでその解答をポストします。以下の内容はほぼ提出した解答そのままです。4/30の夜中に挑戦を開始して、翌日5/1の夕方辺りに解答を提出しました。
 
 結果は正解!「評価5 ベスト・ピッグデータ賞(結果が正しく技術メモも十分な解答)」だそうです。わーい。結城さんの解答も同じ考え方だったので二度わーい。

はじめに


 本書は、「ピッグデータ問題」のドキュメントである「problem.txt」、「pigdata.pdf」を読み、ピッグデータの仕様について理解している事を前提とした技術メモです。「ピッグデータ問題」の詳細については「《ピッグデータ》に負けないで! https://codeiq.jp/ace/yuki_hiroshi/q303」を参照して下さい。

利用言語、環境について


 言語はjavaを利用する事にしました。標準ライブラリにSHA-1によるハッシュ計算を行う事が出来るMessageDigestクラスが存在する点と、マルチスレッド処理が容易な点、実行速度がスクリプト言語より優位と考えられる点から選択しました。
 実行の環境は以下の通りです。

  • OS: OS X 10.8.3(Mountain Lion)
  • CPU: 1.8 GHz Intel Core i7 (4コア)
  • メモリ: 4GB
  • java: 1.8.0-ea (1.6系、1.7系でも問題なく動作は可能のはず。たまたま入っていたのでそのまま利用した)


  • getsign(107374182400, 16777216)の答え


    209679232

    処理概要


     getdataでは、SHA-1によるハッシュ計算のインプットとなるqがindexを10で除算した値である事から、qが変化するまでハッシュ値を保存する事によってハッシュ値計算の回数が減少する様にしています。getsignではExecutorServiceを利用しマルチスレッドによって同時に計算処理をする構造になっています。getsignで必要な指定count数のピッグデータの取得とソートについては、65536(16bit)個のlong配列を使って、getdataで得られた値の個数をカウントする方式としました。この処理方式に至った経緯の詳細については"処理詳細と工夫について"を参照して下さい。

    処理詳細と工夫について


    オンメモリの処理ではメモリが足りない問題


     当初getsignでは、与えられたcountの個数だけgetdataを実行し、結果をArrayListに格納していました。count個の値が出揃った後ArrayListをソートし、skipsに従ってシグネチャを計算しました。getsign(100, 10)の場合は問題無く動作しましたが、count個のデータをArrayListに格納するので、getsign(107374182400, 16777216)では当然メモリが足りなくなります。この方法ではgetsign(107374182400, 16777216)は計算出来ない事が分かりました。

    データを保存する場合2600GB必要になる


     オンメモリで大規模なgetdataの値を保持する事は困難である事から、外部記憶装 置にindexから得られるピッグデータを保存する事を考えました。後ほどソートが必要となる為、データは以下の様なリンクドリストの性質を持つ構造にしました。

    index(64bit), ピッグデータ(16bit), 前のindex(64bit), 次のindex(64bit)

     1データ辺り64+16+64+64=208bit=26byteとなり、これを107374182400個保存する場合、107374182400*24=2600GBの外部記憶装置が必要となります。2600GB=2.6TBは今ではそこまで大きなサイズとは言えませんが、2.6TBの読み書きとなると膨大な時間がかかる事が予想されます。その他、リンクドリストという構造上skipsとkに従ったシグネチャ計算を行う場合全データにシーケンシャルにアクセスする必要があったり、データサイズの大きさから実行可能な環境に大きな制約を与える事になります。この方式も見送る事にしました。

    ピッグデータが16bitである点に着目


     ピッグデータはpigdata.pdfの仕様から、符号なしの16bitの整数です。つまり0-65535の範囲の計65536種類という事になります。そこで、65536個の配列を作成し、getdataから返されたピッグデータを添字にして以下の様にgetdataから得られたピッグデータの取得回数を数え上げる事で省メモリ化出来ると考えました。

    int[] counter = new int[65536];
    int data = getdata(i);
    counter[data]++;
    

     また、この方式によりgetdataで得た一連の値のsortが不要となります。counterにアクセスする為の添字はgetdataで得られるピッグデータで、その添字でアクセスするcounterの値はgetdataで得られたピッグデータの回数を表します。添字を0から65535まで数え上げながら個数を足し合わせる事でソート済みのgetdataの配列と同等の情報が得られます。以下に例を示します。

    例:
    得られた値
      20536 6446 24555 213 9583 33333 24555
    

    counterの内部
     counter[213] = 1
     counter[6446] = 1
     counter[9583] = 1
     counter[20536] = 1
     counter[24555] = 2
     counter[33333] = 1
    

    ソート済み配列として扱う
      213 6466 9583 20536 24555 24555 33333
    

     この構造により、シグネチャを計算する為のskipsとkがアクセスする添字の位置のピッグデータを取得する事が出来ます。上記の例のデータを用いると、k=4の場合はcounter内の値を足し合わせて4を越えた時の値(>4)である24555が得られる値となります。このデータ構造の場合、各ピッグデータの元のindexデータは消失します。しかしシグネチャ計算においてはソート済みのピッグデータだけが必要となるので問題ないと判断しました。

    マルチスレッド化


     実行環境のPCはCPUのコアが4つあったので、getsignの計算をマルチスレッド化する事によって効率化を図りました。与えられた範囲のgetdataで得られた個数をカウントするCounterクラスを作成し、ExecutorServiceで任意の個数のスレッドを実行する様にしました。計算する値は1スレッド当たりcount/スレッド総数個とし、全てのスレッド計算が終わったあと、カウント用配列(65536個の配列)をマージする事でcount個のカウントデータが得られる様にしました。

    SHA-1がボトルネックとなる


     getdataのソート情報は省メモリで得られる様になりましたが、107374182400回SHA-1のハッシュ計算をするのは大きな負荷です。処理時間の殆どがSHA-1のハッシュ計算に使われてるので、この部分を最適化する事で高速化を図れると考えられます。処理概要にも記述した通り、getdataではSHA-1によるハッシュ計算のインプットとなるqがindexを10で除算した値である事から、index〜index+9の範囲でqは同じになります。そこでqが変化するまでハッシュ値を一時的に保存する事によってハッシュ値計算の回数を減らす様にしました。これによりSHA-1のハッシュ計算の回数が1/10となりました。前述した実行環境においては従来10時間の実行時間だったものが1時間まで短縮する事が出来ました。


    ソース


    package jp.dip.sys1.pigdata;
    
    import java.security.MessageDigest;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class Main {
        public final static int SIZE_OF_16 = 65536;
        public static int THREAD_COUNT = 1;
        static class Count {
            long[] mCount = new long[SIZE_OF_16];
    
            public long[] getCount() {
                return mCount;
            }
    
            public void merge(long[] src) {
                for (int i = 0; i < src.length; i++) {
                    mCount[i] += src[i];
                }
            }
        }
    
        static class Counter implements Callable<Count> {
            protected long mBegin;
            protected long mEnd;
            protected byte[] mDigest;
            protected long mPrevQ;
            protected MessageDigest mMessageDigest;
    
            public Counter(long begin, long end) {
                try {
                    mBegin = begin;
                    mEnd = end;
                    mPrevQ = -1;
                    mMessageDigest = MessageDigest.getInstance("SHA-1");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            private byte[] toSHA1(String source) {
                mMessageDigest.update(source.getBytes());
                return mMessageDigest.digest();
            }
    
            public int getdata(long index) {
                long q = index / 10;
                long r = index % 10;
                if (mPrevQ != q) {
                    mDigest = toSHA1(String.valueOf(q));
                }
                mPrevQ = q;
                int upper16 = (0xff & mDigest[(int) r * 2]) << 8;
                int lower16 = 0xff & mDigest[(int) r * 2 + 1];
                return upper16 + lower16;
            }
    
            @Override
            public Count call() throws Exception {
                Count count = new Count();
                long[] countData = count.getCount();
                for (long i = this.mBegin; i < this.mEnd; i++) {
                    int data = getdata(i);
                    countData[data]++;
                }
                return count;
            }
        }
    
        public static long getsign(final long count, final long skips) {
            if (count < 1 || skips < 1) {
                return -1;
            }
            int threadCount = THREAD_COUNT;
            ExecutorService executor = Executors.newFixedThreadPool(threadCount);
            List<Future<Count>> futures = new ArrayList<Future<Count>>();
            for (int i = 0; i < threadCount; i++) {
                long begin = (count / threadCount) * i;
                long end = (count / threadCount) * (i + 1);
                if (i + 1 >= threadCount) {
                    end += count % threadCount;
                }
                futures.add(executor.submit(new Counter(begin, end)));
            }
            Count resultCount = new Count();
            for (Future<Count> future : futures) {
                try {
                    Count c = future.get();
                    resultCount.merge(c.getCount());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            long[] countData = resultCount.getCount();
            long result = 0;
            for (long k = 0; k < count; k += skips) {
                long index = 0;
                for (int l = 0; l < SIZE_OF_16; l++) {
                    index += countData[l];
                    if (index > k) {
                        result += l;
                        break;
                    }
                }
            }
            executor.shutdownNow();
            return result;
        }
    
        public static void main(String[] args) {
            long start = System.currentTimeMillis();
            System.out.println(getsign(107374182400L, 16777216L));
            System.out.println(System.currentTimeMillis() - start + "ms");
        }
    }
    

    おわりに


     おもろかった。

    0 件のコメント:

    コメントを投稿