忍者ブログ
[1] [2] [3] [4] [5] [6] [7] [8]

DATE : 2017/06/27 (Tue)
×

[PR]上記の広告は3ヶ月以上新規記事投稿のないブログに表示されています。新しい記事を書く事で広告が消えます。


DATE : 2011/05/06 (Fri)

アプリケーションを開発していると、メモリの消費量が心配になることがあります。消費量が少ないことに超したことはありませんが、計測もせずに最適化を行うのもよくありません。計測の際に、そもそもDalvikVMのアプリケーションあたりの最大ヒープサイズがどれだけなのか気になったので、それを取得する方法をメモしておきます。

DalvikVMのアプリケーションごとの最大ヒープサイズはデバイスによって異なり、システムプロパティdalvik.vm.heapsizeに定義されています。

次を実行することで取得できます。

adb shell getprop dalvik.vm.heapsize

アプリケーションからは、android.app.ActivityManager#getMemoryClass() でも同様の内容を取得できます。アプリケーションの求める動作条件をデバイスが満たしているか否か判断する際に使えます。

PR

DATE : 2011/04/17 (Sun)

メインアカウント以外のアカウントでSkypeを立ち上げるためのAppleScriptを書いたので、以下にその方法を示します。なお、以下の方法では、サブアカウント用のSkypeを立ち上げる際に、管理者権限が必要です。動作確認は、Mac OS X 10.6.7で行いました。

  1. サブアカウント用のSkypeを立ち上げるための、Mac OS X上のアカウントを作成する。
  2. Skypeを別のアカウントで立ち上げるAppleScriptを、アプリケーションとして保存する。
  3. サブアカウント用のSkypeを起動する。

サブアカウント用のSkypeを立ち上げるための、Mac OS X上のアカウントを作成する

「アップルメニュー → システム環境設定 → アカウント」から、サブアカウントで立ち上げるSkype用のアカウントを作成します。作成するアカウントに管理者権限は必要ありません。ここで、作成の際のアカウント名を覚えておいてください。

Skypeを別のアカウントで立ち上げるAppleScriptを、アプリケーションとして保存する

以下のAppleScriptを、「アプリケーション/ユーティリティ/Apple Scriptエディタ.app」でフォーマットをアプリケーションとして保存します。ただし、「<アカウント名>」の部分は、先ほどのアカウント名に置き換えてください。

-- Please set your account that launches another Skype.
set account to "<アカウント名>"


-- Path to Skype.
set skype to POSIX path of (path to application "Skype") & "Contents/MacOS/Skype"

do shell script "sudo -u " & quoted form of account & " " & quoted form of skype & " > /dev/null 2>&1 &" with administrator privileges

サブアカウント用のSkypeを起動する

先ほど保存したアプリケーションをダブルクリックし、管理者のパスワードを入力するとサブアカウント用のSkypeが立ち上がります。


DATE : 2010/11/24 (Wed)

2010年10月23日に、六本木ヒルズにあるGoogle日本オフィスで開催された「Google日本語入力Tech Talk 2010」に行ってきました。Google Developer Day 2010の基調講演で本Tech Talkの存在を知り、興味がわいたことと、ちょうど東京出張の間に開催されることもあって応募、参加しました。以下に、その際に自分がとったメモや、本Tech Talkに参加された方のレポート、その他参考文献から文章にまとめました。

文章をまとめる際に私がとった方針は次の通りです。まず、私自身はMozcの開発に興味があるというよりも、Mozcの仕組みや開発手法に興味がありました。そのため、前述の部分を中心にまとめ、Mozcを改良する部分についての記述は薄くなっています。またMozcの仕組みをまとめるに当たって、講演では触れられていなかった部分では参考文献から推測を行いました。具体的には、サンドボックスや仮名漢字変換の際に用いられるコストの計算には、参考文献からの私の推測が含まれています。次に、できるだけ内容をまとめたかったため、質疑応答は講演と区別せずに文章に組み込みました。表記については、「Mozc」と明言されなかった、もしくは明言されたか否か記憶があやふやな部分に関しては「Google日本語入力」としています。数式については、上付き文字や下付き文字についてTeXの表記法を採用しています。

Google日本語入力ができるまで(小松弘幸氏)

Google日本語入力は、予測入力IMEであるPRIMEを開発した小松弘幸氏と形態素解析エンジンMeCabの工藤拓氏との20%プロジェクトからスタートした。Anthy, sskimeなどのIME開発経験者が社内にいたため、まずは彼らと半年ほど、週に数回集まって議論を行った。特に、開発したIMEで成功した部分、失敗した部分を出し合い、これから開発するIMEのコンセプトを固める足場とした。Google日本語入力のマルチプラットフォームを意識した設計は、この時の議論によるものである。しかし議論を重ね続けたものの、動作するコードが書かれる気配はなかった。そこで実装する時間を週に一度設けて、libanthy互換のライブラリとして試作を行った。社内での試用の結果、20%ルールのプロジェクトから本プロジェクトへと昇格した。

社内での試用では、既存のIMEとの振る舞いの違いについて特に意見が出た。既存のIMEにある機能がないことや、細かい振る舞いが既存のIMEと異なるといった意見が多かった。

そこから、既存のIMEから移行する労力を最小にすることが重要だとわかった。IMEは特に手に馴染んだツールである。ほんのわずかな振る舞いの違い、機能の足りなさがユーザの不満を募らせるのだ。

リリース直前には、ユニットテスト、ストレステストを含めとにかくテストを繰り返した。テストコードがメインのコードの2倍、3倍となるモジュールも珍しくなかった。UI部分は、できるだけテストコードに落とし込めるように設計を行った。それでもできない部分は手作業でテストした。

Google日本語入力は、次のようなプロセスを経てリリースしている。まず、毎日更新される成果物を社内で試用する。継続的インテグレーションとして自動的にユニットテストやストレステストを走らせる。もしビルドや自動テストに失敗すると、最後にコミットした人とそのコミットをレビューした人とにメールが飛ぶ。そして常に成果物ができるように素早く修正する。Web上に公開されている「開発版」は、数週間おきに更新している。アップデートテストなど最小限のテストを行い、クラッシュレポートや、入力した文字数などの利用統計情報を利用する。入力した文字列は全く送信していない。これらの結果を反映して「ベータ版」を数ヶ月おきに更新している。リリースから外される機能もある。クラッシュレポートが多い上にIssue listからなかなか消えることがないモジュールや、修正が困難であると見なされたモジュールなどである。

Mozcは、Google日本語入力をオープンソース化したものである。社内の開発者も、Google日本語入力のことをMozcと呼ぶことがある。辞書など、Google日本語入力と違いのある部分もあるが、MozcはIMEの基本機能を網羅している。研究などにもぜひ活用して欲しい。

Google日本語入力の設計概要(工藤拓氏)

Google日本語入力は、使う人にIMEの存在を意識させず作業に集中しやすい「空気のようなIME」として以下の面を重視して設計している。

  • マルチプラットフォーム
  • スピード
  • セキュリティ
  • 安定性

まず初めに、日本のIMEはとても複雑であることを理解する必要がある。20年以上の歴史があり、その間さまざまな機能が積み重ねられてきた。おそらく、すべての機能を使いこなしている人はいないだろう。

そのように複雑なIMEであるが、IMEはDLLとして実装を行うため、そのDLLはすべてのアプリケーションのプロセスに読み込まれる。従来のIMEでは、そのDLLにIMEの機能すべてを実装している。つまり、すべてのアプリケーションに読み込まれたIMEのおのおののインスタンスが、システム辞書ひとつを共有する構造となる。その結果、以下の弱点が生まれる。

クラッシュに弱い
アプリケーションがクラッシュすると、IMEも巻き込まれてクラッシュする。逆にIMEがクラッシュすると、アプリケーションも巻き込んでクラッシュする。アプリケーション上で保存していないユーザの情報が失われる可能性や、IMEのシステム辞書が壊れる可能性がある。つまりIMEは決してクラッシュしてはならない。しかしクラッシュしないプログラムを書くのは不可能である。
セキュリティが弱い
すべてのアプリケーションのプロセスに読み込まれるということは、特権レベルで動作する、例えばログイン画面などのアプリケーションにもIMEが読み込まれることを指す。すると、IMEにセキュリティホールがあると、悪意のあるユーザが特権レベルでなんでも行えてしまう。つまりIMEにはセキュリティホールがあってはならない。セキュリティホールが全くないプログラムも、書くことは困難である。仮にリリース時にはなかったとしても、それは見つからなかっただけで、後々見つかる可能性もある。

これらの弱点を克服するため、Google日本語入力では役割に応じて処理を複数のプロセスに分割した。

まずアプリケーションのプロセスに読み込まれるDLLに実装する機能は小さくし、クラッシュしにくくした。Google日本語入力では、アプリケーションのプロセスに読み込まれるDLL(Client)、変換を行うプロセス(Converter)、変換結果や変換候補などを描画するプロセス(Renderer)にプロセスを分離している。プロセスを分離することによって、ConverterやRendererがクラッシュしても、アプリケーションを巻き込んでしまうことはなくなった。つまり、クラッシュしない構造にしたと言うよりも、クラッシュしてもアプリケーションを巻き込んでしまうことのない構造としたのである。Clientは、Converterに発生したキーイベントを渡し、Converterから返ってくる、表示に必要な情報を受け取りRendererに渡すだけである。ローマ字から仮名への変換さえもConverterの役割である。Client自身は内部に状態を持たない。Statelessとすることで、クラッシュする可能性をできる限り減らした。

分離されたプロセス同士は、プロセス間通信(IPC)を用いて処理を行う。IPCとして、Windowsではnamed pipeを、MacOSではMach IPCを、LinuxではUnix Domain Socketを用いた。理由は、セキュリティのためである。例えばUnix Domain Socketを用いると、送信元のプロセスIDやユーザIDが分かり、送信元を確かめることができる。TCPソケットを用いればプラットフォームごとにわざわざIPC部分を実装し直す手間は省けるが、送信元が偽装されるセキュリティ上のリスクがある。

また分離したプロセスにサンドボックスを用いることで、仮にセキュリティホールがあったとしても、OSや他のアプリケーションに影響を与えないようにしている。サンドボックスについては講演中には触れられなかったが、Google Chromeと同じ仕組みであると仮定すると、分離したプロセスのおのおのに対してOS特有の機能を用いて最低限のアクセス権限を与えることでサンドボックスを実現しているのだろう。

CannaやWnnといった従来のIMEでも、クライアントとサーバとでプロセスが分離されているものがある。これらのIMEでは、ローマ字から仮名への変換はクライアントが担い、仮名漢字変換をサーバが担当する。そして変換結果をクライアントが描画する。しかし、クラッシュしやすいのはクライアント部分である。クライアント、サーバ方式では、これまでに述べたようなクラッシュのリスクは減らない。ちなみにGoogle日本語入力にも関わっている、Anthyを開発した田畑悠介氏は、講演後のLightning TalkでAnthyとGoogle日本語入力との設計を対比し、AnthyではできるだけOSへ依存しないように作られていることを示した。そのため、Anthyは数多くのプラットフォームへ移植できた実績がある。しかし、セキュリティ面ではGoogle日本語入力にはかなわないと語っていた。OSに依存しないように設計されているため、OSが持つセキュリティ機能を十分に利用できないためなのだろう。

Google日本語入力ではさらに、安定性とスピードを確保するため、システム辞書・言語モデル・設定ファイルはすべてバイナリ化して実行バイナリに組み込んでいる。テキストとして用意したそれらのファイルをコードジェネレータでビルド時にC++コードに変換する。こうすることで、システム辞書ファイルを外部化していた場合に発生しうる実行時のエラーを、ビルド時に発見することができる。システム辞書も壊れない。システム辞書が壊れていた場合はIMEの実行バイナリ自体が壊れるためだ。システム辞書についてのファイルI/Oもない。I/Oの部分はプラットフォームによってかなり差があり、実行バイナリに組み込むことで、プラットフォームによるI/O部分の違いを考慮しなくとも済む。同時に、I/Oがないため動作も高速化する。システム辞書のフォーマットの互換性を気にする必要もない。IMEを更新するとシステム辞書も入れ替わるためである。

Google日本語入力は、OSの再起動不要でアップデートが可能である。アプリケーションのプロセスにIMEのすべてが読み込まれる従来のIMEでアップデートを行った場合、アップデート完了後に起動したアプリケーションにのみアップデート後のIMEが読み込まれる。その時点ですでに起動済みのアプリケーションは、アップデート前のIMEを使い続ける。そして、古いIMEと新しいIMEとでシステム辞書などのリソースを共有することになる。もしも、アップデートの前後でシステム辞書などにフォーマットの違いが生じた場合は、複雑な処理が必要となる。しかし、Google日本語入力ではプロセスが分離されているため、ConverterやRendererのみのアップデートが可能である。アプリケーションに読み込まれたClientは、アップデート前に接続していたプロセスをkillしてアップデート後のプロセスに接続し直せばよい。システム辞書もバイナリに組み込まれているため、システム辞書の互換性の問題も発生しない。ユーザがキーの入力の途中でConverterがアップデートした場合でも問題ない。Clientは、セッションプレイバックという機能を備えているためである。これは、ユーザが入力したキーを蓄えておく仕組みで、Converterがクラッシュしたりアップデートによって新しいConverterに接続し直した場合に動作する。Clientと新しいConverterとがユーザの入力中に接続を確立すると、これまでにユーザの入力したキーをClientは新しいConverterに送り直す。

Google日本語入力の設計は以上である。しかし、IMEである以上、いかに設計を考慮していても仮名漢字変換がうまくできなければ意味がない。Google日本語入力は、どのように仮名漢字変換を行っているのだろうか。

Google日本語入力では、文節の区切りや単語の変換に最小コスト法を用いている。これは、文を文節で区切ったときに、単語間の連接コスト、つまり単語の繋がりの不自然さと単語ごとの単語生起コスト、つまりその単語の出現しにくくさとが全体で最小になるものを選ぶというものである。文全体でコストが最小になる文節の区切りは、ビダビアルゴリズムで探索を行って決める。

コストは、次のようにして求めている。

まず、xはユーザがキーボードから入力した読み、yはユーザが想定している仮名漢字交じりの日本語とする。すると、変換結果の日本語文y'は、「y' = argmax_y p(y|x)」と表せる。p(y|x)は、ユーザがキーボードから入力した読みがxであったときに、ユーザの想定する仮名漢字交じりの日本語がyである確率である。yについて、p(y|x)の一番高いものが、変換結果y'となる。「y' = argmax_y p(y|x)」をベイズの定理を用いて変形し、yについて最大化するという観点で順序関係が重要であるという点から分母を除くと、「y' = argmax_y p(y)p(x|y)」と書き直せる。p(y)は、仮名漢字交じりの日本語yが日本語らしい確率であり、p(x|y)は、仮名漢字交じりの日本語yをxという読みで読む確率である。

しかし、p(y)p(x|y)は文全体についての確率であり、文節を区切ったあとのコストを見る最小コスト法へ適用するには都合が悪い。そこで、p(y)p(x|y)を形態素単位で表現する。文献「確率的モデルによる仮名漢字変換」を参考に、mを形態素、hを文中の形態素の数とし、Google日本語入力で用いられている言語モデルが2-gramと仮定して「y' = argmax_y p(y)p(x|y)」を展開すると、「y' = argmax_m Π_{i=1}^{h+1} (i-1番目の品詞c_{i-1}に対して、i番目の品詞c_iが出現する確率) * (品詞c_iにおいて、形態素m_iが読みx_iで出現する確率)」となる。

ここで、前者の「(i-1番目の品詞c_{i-1}に対して、i番目の品詞c_iが出現する確率)」が最終的に連接コストとなり、後者の「(品詞c_iにおいて、形態素m_iが読みx_iで出現する確率)」が最終的に単語生起コストとなる。しかし、最小コスト法ではコストの小さい方を採用するため、確率が高いほどコストも高くなってしまうのは良くない。そこで、上記の確率の積に対して負の数を乗じておく。Google日本語入力では、この負の数は-500としている。また最小コスト法では、連接コストと単語生起コストは積ではなく和で計算する。そこで、上記の確率の積に対して対数をとる。対数をとるのは、対数内の積は対数同士の和で表現できるためである。このような処置を行うことで最小コスト法のモデルと「y' = argmax_y p(y|x)」とが一致することになる。結果、コストcは次の式で求まる。「c = Π_{i=1}^{h+1} -500 * (log(i-1番目の品詞c_{i-1}に対して、i番目の品詞c_iが出現する確率) + log(品詞c_iにおいて、形態素m_iが読みx_iで出現する確率))」。

Googleでは、コストを求めるために使用する「(i-1番目の品詞c_{i-1}に対して、i番目の品詞c_iが出現する確率)」や「(品詞c_iにおいて、形態素m_iが読みx_iで出現する確率)」をWeb上の文書から求めている。具体的には、Web上の文書からMeCabで形態素解析を行ったコーパスを作成し、上記の確率を求めている。しかし、「(品詞c_iにおいて、形態素m_iが読みx_iで出現する確率)」を求めるのは容易ではなかった。それは、熟語の読みを解析する一般的な方法がないためである。読みと文字とが1対1に対応している熟語(例えば、「熟語」は「じゅく」と「ご」と、読みと文字とが1対1に対応している)は、単漢字辞書があればそれなりに求めることができる。実際に、ipadicからEMアルゴリズムを用いて単漢字辞書を作成し、読みを推定した。しかし、読みと文字とが熟語では1対1に対応しない熟語(例えば「大人」は、どこの文字が「と」を含むのか分からない)や当て字(例えば「強敵」と書いて「とも」と読む)もある。そこで、単漢字辞書の他に、Google検索の「もしかして機能」などを総動員して読みを推定した。

このように収集した連接コストと単語生起コストから作成したシステム辞書は、Trie木で格納し、LOUDSアルゴリズムで圧縮している。Key-Value構造は使用していない。なぜなら、Key-Value構造では完全一致でしか検索を行えないためである。また次のような日本語変換特有の検索が必要となるためでもある。

  • Common Prefix Search(入力が長い、通常の変換に用いられる)
  • Predictive Search(入力が短い、予測変換に用いられる)

システム辞書は、語彙が多ければ多いほど良いと考えがちだ。しかし実際には、辞書の副作用として、語彙を多くすると一般的な変換結果よりも特殊な変換結果が優先されてしまい、一般的な文が変換できないということもある。例えば、「アイマス」と「会います」という2単語が辞書に含まれている場合、「明日彼と会います」が「明日彼とアイマス」に変換されてしまうことがある。つまり、語彙数が多ければよいと言うものではない。ユーザにとっては、辞書の語彙数よりも、一般的な文が変換が当たり前に変換できるかどうかが重要なのである。そのため一般的な文の変換が失敗すると、このIMEは使い物にならないとユーザに見なされてしまう。そこで、対立候補も考慮して単語のランク付けを行うようにしている。また、必ず変換できなければならない一般的な文の変換を自動回帰テストで行うようにしており、そのテストと機械的な評価とを用いることで変換精度を評価するようにしている。システム辞書のipadicレベルの単語に対しては、人手でチューニングすることもある。しかし単語のランキングについては人手では調整せず、すべて自動的に求めている。

なお、ユーザ辞書はヒープ上に置いている。ユーザがユーザ辞書に書き込むと、Protocol Buffersを使ってテキストファイルに書き出し、その後Converterがそのファイルを読み込むようになっている。

Mozcソースコードレビュー

Mozcのソースコードは、「mozc - Project Hosting on Google Code」で公開している。C++コンパイラがありSTLが利用可能であること、かつPOSIXをサポートする環境であれば動作するはずである。しかしビッグエンディアンは現在のところサポート外である。

ソースコードの文字エンコーディングにはUTF-8を用いている。コンパイラがUTF-8に対応していない場合もあるため、UTF-8のコードを16進エスケープシーケンスにして文字列リテラルを表現している。例えば「か」はUTF-8でE3 81 8Bなので、「\xE3\x81\x8B」と文字列リテラルに記述する。その文字列リテラルの上部に、コメントとして元の文字列を記入している。

Mozcのソースコードには、メインコードの他にテストコードも含まれている。具体的には、ユニットテストとストレステストである。例えばConverterのストレステストでは、起動したプロセスに対してキーイベントを絶え間なく送りつけるような処理を行いストレステストを行う。

ビルドシステム(向井淳氏)

ビルドツールには、GYPを使用している。GYPは、元々Google Chrome用に作られたビルドツールである。

以前はSConsを使用していた。しかし以下の問題があった。

  • SConsはPythonでビルドスクリプトを書くため、なんでもできる。しかしそれ故にビルドスクリプトを保守に手間がかかる場合がある。
  • マルチプラットフォームのサポートが弱い。アプリケーションやIMEのビルドには、そのOSやIDE付属のコマンドが必要なことがある。SConsでもできなくはないが、ビルドスクリプトが複雑になり保守が面倒になる。

GYPでは、ビルドツールでありながらビルド自体は行わない。その代わり、ビルドファイルを生成する。例えば、Windowsではvcbuildを、Linuxではmakeを、Mac OSではXcode用のビルドファイルを生成する。Xcodeではdistccを使った分散ビルドができるため、GYPは分散ビルドを行うビルドファイルを生成する。結果、GYPを使うと手でビルドファイルを作成するよりもビルドが高速化することもある。

GYPには、それ自身を起動するgypコマンドが存在する。しかし、コマンドがインストールされている場所がプラットフォームによって異なったりするため、Mozcではラッパースクリプトであるbuild_mozc.pyを用意している。

システム辞書はMozcのバイナリの中に統合されている。しかし、ソースツリー内にはテキストファイルとして存在しており、ビルド時にPythonスクリプトを使用してC++コードに変換している。

Mozcでは以下のようにビルドを2段階に分けて行う。

  1. システム辞書を生成するツールなどのMozcをビルドするのに必要なツール類のビルド。
  2. Mozc自身のビルド

すべてを一気にビルドすることもでき、2番目のみをビルドすることもできる。ビルドを2段階に分けることで、最終的な成果物に関係のない変更をシステム辞書を生成するツールなどに行った場合でも、Mozc自身のビルドのみを行うことができビルド時間を短縮できる。

Client(小松弘幸氏)

Clientは、キーイベントの受け取りや変換結果の表示を行う。つまり、OSに依存したInput Method Frameworkとやりとりする。ローマ字から仮名への変換を含めて、変換に関わる処理はすべてConverterに任せる。

Protocol Buffersを用いてシリアライズしたデータをConverterとやりとりする。Converterがアップデートすると、Clientの想定よりも新しいConverterとデータのやりとりをおこなうことになる。しかし、後方互換性はProtocol Buffersが保ってくれる。

Clientは、セッション単位でConverterやRendererとやりとりする。セッションは、入力ボックス1つにつき1つ作られる。セッションの作成方法は次の通りである。まず、ClientがCREATE_SESSIONメッセージをConverterに送る。するとConverterはSession IDを生成して返す。以降の処理では、セッションの識別に生成されたSession IDを用いる。Session IDはConverterが生成するためセッションの同一性はConverterが保証する。

Converterからの応答には、表示に必要な情報をすべて含んでいる。これは、入力中であることを示す、画面上では下線として現れる部分や、ある部分が変換済みか否かを表す情報をも含む。Clientはその情報をRendererに渡し、Rendererがそれに従って画面に表示する。

Converter(工藤拓氏)

変換結果は、key-value形式で表現している。keyが読みで、valueが表記を表す。

Converter内では、以下のデータ構造で文を表現する。

  • Segmentsオブジェクト : 文節の集合
  • Segmentオブジェクト : 文節ひとつ
  • Segment.Candiateオブジェクト : 変換候補
  • metadata : メタデータ

SegmentsインスタンスひとつはSegmentインスタンスを複数含み、SegmentインスタンスひとつはSegment.Candiateインスタンス複数とmetadataを含む。

Converterは、ConverterInterfaceを実装している各Converterオブジェクトの結果から変換結果を作る。ConverterInterfaceは、Segmentsインスタンスに対する操作を行う。ConverterInterfaceを実装しているクラスは以下のとおりである。

  • ImmutableConverter : 仮名漢字変換を行う。常に同じ変換結果を返すConverter。
  • Rewriter : 後処理を行うConverterの総称。ヒューリスティックに変換候補の書き換えや追加を行うConverter。
  • Predictor : 予測変換を行うConverter。

ImmutableConverterの結果は、品詞IDとコストとですべてが決まる。品詞は、ipadicに定義されているものを拡張して用いている。品詞から単語が生成される確率(生起確率)とひとつ前の単語の品詞と単語の所属する品詞同士が繋がる確率(連接確率)を各単語ごとに掛け合わせて、その単語の出現確率とする。コストは、-500 * log(出現確率)である。単語の生起確率は品詞ごとに求まるため、品詞間でコストを比較することはできない。つまり、名詞のある単語コスト500と助詞のある単語コスト500は同じ生起確率を表さない。また、各単語は、左品詞IDと右品詞IDを持つ。複合語の場合、左にある単語から見る場合と右にある単語から見る場合とで品詞が変わる場合があるためである。例えば、「山田太郎」という複合語は、その複合語の左にある単語から見れば「山田」の名字が見えるが、右にある単語から見れば「太郎」の名前が見える。

システム辞書への単語の追加は推奨できない。それは、コストは自動生成しており、品詞IDとコストで結果が決まるためである。どうしても追加したい場合は、その単語が含まれている十分な量の文書を集め、システム辞書内にある単語の出現頻度と追加したい単語の出現頻度との比をとって、それを基に追加したい単語のコストを求める方法を推奨する。

またシステム辞書へサードパーティ製の辞書を追加する場合は、辞書のライセンスに注意する必要がある。システム辞書はバイナリ化されて実行バイナリに組み込まれるため、例えばGPLの辞書がシステム辞書に組み込まれたMozcはGPLとして配布しなければならない。

文節区切りルールはsegmenter.defに定義してある。品詞テーブルであるid.defと合わせて、ビルド時にif-thenルールを生成している。初めはC++コードに変換していたが、CPU使用率が高い部分であったため、すべて展開してビット配列にまとめて圧縮した。結果、ルックアップだけで済むようになり、高速化を実現できた。

Rewriter(向井淳氏)

Rewriterは、ImmutableConverterの後処理として特殊な変換や履歴の学習などを行う。例えば、電卓機能やおみくじ機能はRewriterで実装されている。

顔文字変換もRewriterで実装している。この顔文字辞書は人手で作成している。工藤氏によると、経験上、1000件までは人手でも辞書を保守できるそうである。

RewriterInterfaceを実装して、rewriter.ccやビルドスクリプトにその実装を行ったクラスを追加するとRewriterを追加できる。

Rewriterは変換キーが押されるたびに呼び出されるため、処理を行うべきかどうかを素早くチェックしなければならない。

Storage(花岡俊行氏)

前述の通り、システム辞書はビルド時にバイナリの中に統合している。それに対して、ユーザ辞書はProtocol Buffersでシリアライズしたファイルである。

システム辞書に含まれている単語の中には、予測変換には出したくない候補がある。そこで、その単語をビルド前にファイルへ列挙しておき、そこに含まれているか否かを予測変換の実行時に調べている。この処理はBloom filterで実装している。Bloom filterには、本当は含まれていないにもかかわらず含まれていると判断してしまう擬陽性があるが、予測変換なので問題ないとして採用している。

LRU Storageは、変換候補のユーザの選択履歴を保持する。

参考文献

本Tech Talkに参加された方のレポート

仮名漢字変換アルゴリズム

サンドボックス


DATE : 2010/08/30 (Mon)

Google Developer Day 2010のDevQuizで出題された「PAC-MAN」の入力2までを解いた際に使用したソースコードです。言語にはPythonを使用しました。標準出力からマップデータを入れると、標準出力に経路を出力するスクリプトです。

本スクリプトでは、A*アルゴリズムを使ってパックマンの動きを探索しました。しかしただ単に探索したのでは探索範囲が爆発的に膨れあがってしまうため、敵と衝突するような動きは避けるようにしました。また、次の交差点まで移動するような経路も隣接する経路にくわえ、次の交差点まで移動する経路はスコアが半分になるようにしました。本スクリプトでは、スコアが少ないほど良いスコアであるため、次の交差点まで一気に移動する経路が優先的に選ばれます。こうすることで、探索の高速化を図りました。

スコアは、「(残りのドット数)×1000 + (消費した時間)」とし、挟み撃ちに合うなどして敵と衝突すると、「(残りのドット数)×1000」の部分を残り時間と乗算しました。

本スクリプトに至るまでには、紆余曲折がかなりありました。まずはじめは、反復深化深さ優先探索を実装してみたのですが、答えがなかなか返ってこなかったため断念しました。次にA*アルゴリズムを実装したのですが、スコアにさまざまなヒューリスティクスを加えてみたものの、入力1のみしか解けず入力2は答えが返ってこない状況となりました。加えるヒューリスティクスのネタも尽きたところで、一発奮起して遺伝的アルゴリズムを実装してみました。ところが、あともう少しでドットを食べきれるところで探索が終了してしまいました。

しかし、A*アルゴリズム版から遺伝的アルゴリズム版へ至る間に、探索の効率化やスコア計算の簡略化を行っていました。遺伝的アルゴリズムを実装する前までは敵と衝突するような経路も探索候補に入れていました。ところが遺伝的アルゴリズムでは効率的な遺伝子の設計が求められます。そこで、敵と衝突するような経路は探索範囲から外すようにしました。スコア計算についても、処理に時間のかかるヒューリスティクスはやめて、単純な計算で済むような方法としました。

このような改良を、なかばやけくそでA*アルゴリズム版にも施してみました。すると、入力2も解けてしまったのです。同じようにして入力3も解けないものかとスクリプトを実行してみたのですが、残念ながらDevQuizの締め切りまでに回答は出ませんでした。

探索に関して新しいアイディアを出すには、別の方法で実装してみるというのも十分に有効だと感じました。

#!/usr/bin/env python
# coding: UTF-8

import bisect
import math
import sys

# 左上隅の座標
START_X = 1
START_Y = 1

MAX_SCORE = sys.maxint
MIN_SCORE = -sys.maxint - 1

class Wall:
    u"""
    壁を表す。
    """
    def __str__(self):
        return u'#'

class Dot:
    u"""
    ドットを表す。
    """
    def __str__(self):
        return u'.'

# 壁のインスタンス。マップ内で同一のインスタンスを使い回す。
WALL = Wall()
# ドットのインスタンス。マップ内で同一のインスタンスを使い回す。
DOT = Dot()

# マップの評価結果
# 何も起こらなかった。
NOTHING = 0
# 敵と衝突した。
CRASH = 1
# ドットをとった。
ATE_DOT = 2
# 時間内にすべてのドットをとった。
CLEAR = 3
# 時間切れになった。
TIME_OVER = 4

class Character:
    u"""
    敵やパックマンを表すベースクラス。
    """
    def __init__(self, x, y):
        self.position = (x, y)
        self.history = []

    def set_position(self, x, y):
        u"""
        現在の位置を設定する。
        """
        self.history.append(self.position)
        self.position = (x, y)

    def undo(self):
        u"""
        1つ前の状態に戻す。
        """
        self.position = self.history.pop()

class Packman(Character):
    u"""
    パックマンを表す。
    """
    def __init__(self, x, y):
        Character.__init__(self, x, y)

    def __str__(self):
        return u'@'

class Enemy(Character):
    u"""
    敵を表すベースクラス。
    """
    def __init__(self, x, y):
        Character.__init__(self, x, y)
        self.id = u'%s,%s' % (x, y)

    def get_first_next(self, neighborhoods):
        u"""
        時刻t = 0での移動先を返す。
        """
        x, y = self.position
        if (x, y + 1) in neighborhoods:
            return (x, y + 1)
        elif (x - 1, y) in neighborhoods:
            return (x - 1, y)
        elif (x, y - 1) in neighborhoods:
            return (x, y - 1)
        elif (x + 1, y) in neighborhoods:
            return (x + 1, y)

    def get_next(self, neighborhoods):
        u"""
        次の移動先を返す。
        """
        if len(self.history) == 0:
            return self.get_first_next(neighborhoods)
        else:
            if len(neighborhoods) == 1:
                return neighborhoods[0]
            elif len(neighborhoods) == 2:
                for a_neighborhood in neighborhoods:
                    if self.history[-1] != a_neighborhood:
                        return a_neighborhood
            else:
                return self.get_next_on_crossing(neighborhoods)

    def get_differ_to_packman(self):
        u"""
        敵から見たパックマンとの距離を返す。
        """
        return (self.packman.position[0] - self.position[0],
                self.packman.position[1] - self.position[1])

    def get_state(self):
        u"""
        現在の状態を文字列として表現する。
        """
        state = u''
        if 0 < len(self.history):
            state += u'(%s, %s)->' % (self.history[-1][0], self.history[-1][0])
        state +=  u'(%s, %s) ' % (self.position[0], self.position[1])
        return state

class EnemyV(Enemy):
    u"""
    敵Vを表す。
    """
    def __init__(self, x, y):
        Enemy.__init__(self, x, y)
        self.packman = None

    def get_next_on_crossing(self, neighborhoods):
        u"""
        交差点での次の移動先を返す。
        """
        dx, dy = self.get_differ_to_packman()
        if dy != 0:
            candidate = (self.position[0], self.position[1] + (dy / abs(dy)))
            if candidate in neighborhoods:
                return candidate
        if dx != 0:
            candidate = (self.position[0] + (dx / abs(dx)), self.position[1])
            if candidate in neighborhoods:
                return candidate
        return self.get_first_next(neighborhoods)

    def __str__(self):
        return u'V'

class EnemyH(Enemy):
    u"""
    敵Hを表す。
    """
    def __init__(self, x, y):
        Enemy.__init__(self, x, y)
        self.packman = None

    def get_next_on_crossing(self, neighborhoods):
        u"""
        交差点での次の移動先を返す。
        """
        dx, dy = self.get_differ_to_packman()
        if dx != 0:
            candidate = (self.position[0] + (dx / abs(dx)), self.position[1])
            if candidate in neighborhoods:
                return candidate
        if dy != 0:
            candidate = (self.position[0], self.position[1] + (dy / abs(dy)))
            if candidate in neighborhoods:
                return candidate
        return self.get_first_next(neighborhoods)

    def __str__(self):
        return u'H'

class EnemyLRActionBase:
    u"""
    敵L,Rの動作を共通化したベースクラス。
    """
    def __init__(self, history, search):
        self.history = history
        self.search = search

    def get_next_on_crossing(self, position, neighborhoods):
        u"""
        交差点での次の移動先を返す。
        """
        previous_position = self.history[-1]
        previous_diff = (previous_position[0] - position[0],
                previous_position[1] - position[1])
        start = self.search.index(previous_diff) + 1
        for index in range(start, start + len(self.search)):
            next_diff = self.search[index % len(self.search)]
            candidate = (position[0] + next_diff[0], position[1] + next_diff[1])
            if candidate in neighborhoods:
                return candidate

class EnemyLAction(EnemyLRActionBase):
    u"""
    敵Lの動作を表す。
    """
    SEARCH = [(0, -1), (1, 0), (0, 1), (-1, 0)]

    def __init__(self, history):
        EnemyLRActionBase.__init__(self, history, EnemyLAction.SEARCH)

class EnemyL(Enemy):
    u"""
    敵Lを表す。
    """
    def __init__(self, x, y):
        Enemy.__init__(self, x, y)
        self.action = EnemyLAction(self.history)

    def get_next_on_crossing(self, neighborhoods):
        u"""
        交差点での次の移動先を返す。
        """
        return self.action.get_next_on_crossing(self.position, neighborhoods)

    def __str__(self):
        return u'L'

class EnemyRAction(EnemyLRActionBase):
    u"""
    敵Rの動作を表す。
    """
    SEARCH = [(0, 1), (1, 0), (0, -1), (-1, 0)]

    def __init__(self, history):
        EnemyLRActionBase.__init__(self, history, EnemyRAction.SEARCH)

class EnemyR(Enemy):
    u"""
    敵Rを表す。
    """
    def __init__(self, x, y):
        Enemy.__init__(self, x, y)
        self.action = EnemyRAction(self.history)

    def get_next_on_crossing(self, neighborhoods):
        u"""
        交差点での次の移動先を返す。
        """
        return self.action.get_next_on_crossing(self.position, neighborhoods)

    def __str__(self):
        return u'R'

class EnemyJ(Enemy):
    u"""
    敵Jを表す。
    """
    def __init__(self, x, y, field):
        Enemy.__init__(self, x, y)
        self.action_r = EnemyRAction(self.history)
        self.action_l = EnemyLAction(self.history)
        self.current_action = self.action_l
        self.action_history = []
        self.field = field

    def set_position(self, x, y):
        Enemy.set_position(self, x, y)
        self.action_history.append(self.current_action)
        previous_position = self.history[-1]
        if 3 <= self.field.get_neighborhoods_count(
                previous_position[0], previous_position[1]):
            self.change_action()

    def change_action(self):
        if self.current_action == self.action_r:
            self.current_action = self.action_l
        else:
            self.current_action = self.action_r

    def get_next(self, neighborhoods):
        self.on_crossing = False
        return Enemy.get_next(self, neighborhoods)

    def get_next_on_crossing(self, neighborhoods):
        u"""
        交差点での次の移動先を返す。
        """
        return self.current_action.get_next_on_crossing(self.position, neighborhoods)

    def undo(self):
        Enemy.undo(self)
        self.current_action = self.action_history.pop()

    def get_state(self):
        return Enemy.get_state(self) + str(self.current_action)

    def __str__(self):
        return u'J'

class Field:
    u"""
    現在のマップの状態を表す。
    """
    def __init__(self, field_file = None):
        if field_file:
            self.make_field(field_file)
        else:
            pass

    def get_neighborhoods_count(self, x, y):
        u"""
        進入可能な通路の数を返す。
        """
        neighborhood_count = 0
        if WALL not in self.field[y - 1][x]:
            neighborhood_count += 1
        if WALL not in self.field[y + 1][x]:
            neighborhood_count += 1
        if WALL not in self.field[y][x + 1]:
            neighborhood_count += 1
        if WALL not in self.field[y][x - 1]:
            neighborhood_count += 1
        return neighborhood_count

    def get_fast_way_one(self, candidate, neighborhood):
        u"""
        隣接する交差点までの1経路を返す。
        """
        forward_x = neighborhood[0] - candidate[0]
        forward_y = neighborhood[1] - candidate[1]
        previous_x = candidate[0]
        previous_y = candidate[1]
        current_x = candidate[0] + forward_x
        current_y = candidate[1] + forward_y
        fast_way = [(previous_x, previous_y), (current_x, current_y)]
        while True:
            neighborhoods = self.get_neighborhoods_on_position(current_x, current_y)
            if 3 <= len(neighborhoods):
                return fast_way
            elif (current_x + forward_x, current_y + forward_y) not in neighborhoods:
                list = [a_neighborhood for a_neighborhood in neighborhoods
                        if a_neighborhood != (previous_x, previous_y)]
                if len(list) == 0:
                    return None
                next = list[0]
                if len(next) == 0:
                    return fast_way
                forward_x = next[0] - current_x
                forward_y = next[1] - current_y
            previous_x = current_x
            previous_y = current_y
            current_x = current_x + forward_x
            current_y = current_y + forward_y
            fast_way.append((current_x, current_y))

    def get_fast_ways(self, candidate):
        u"""
        隣接する交差点までの全経路を返す。
        """
        neighborhoods = [a_neighborhood for a_neighborhood
                in self.get_neighborhoods_on_position(candidate[0], candidate[1])
                if a_neighborhood != self.packman.position]
        fast_ways = []
        for a_neighborhood in neighborhoods:
            fast_way = (self.get_fast_way_one(candidate, a_neighborhood))
            if fast_way:
                fast_ways.append(fast_way)
        return fast_ways

    def get_neighborhoods_on_position(self, x, y):
        u"""
        隣接する通路の座標を返す。
        """
        neighborhoods = []
        if WALL not in self.field[y - 1][x]:
            neighborhoods.append((x, y - 1))
        if WALL not in self.field[y + 1][x]:
            neighborhoods.append((x, y + 1))
        if WALL not in self.field[y][x + 1]:
            neighborhoods.append((x + 1, y))
        if WALL not in self.field[y][x - 1]:
            neighborhoods.append((x - 1, y))
        return neighborhoods

    def get_neighborhoods(self, character):
        u"""
        指定のキャラクターから見た、隣接する通路の座標を返す。
        """
        return self.get_neighborhoods_on_position(
            character.position[0], character.position[1])

    def get_next_candidates(self):
        u"""
        次に移動可能な座標を返す。
        """
        candidates = self.get_neighborhoods(self.packman)
        candidates.append(self.packman.position)
        for an_enemy in self.enemies:
            enemy_next = an_enemy.get_next(self.get_neighborhoods(an_enemy))
            for a_candidate in candidates[:]:
                if an_enemy.position == a_candidate or enemy_next == a_candidate:
                    candidates.remove(a_candidate)
        return candidates

    def move_character(self, character, x, y):
        u"""
        キャラクターを指定の座標に動かす。
        """
        self.field[character.position[1]][character.position[0]].remove(character)
        self.field[y][x].add(character)
        character.set_position(x, y)

    def undo_character(self, character):
        u"""
        指定のキャラクターを1つ前の時刻の状態に戻す。
        """
        self.field[character.position[1]][character.position[0]].remove(character)
        character.undo()
        self.field[character.position[1]][character.position[0]].add(character)

    def is_crashed(self):
        u"""
        パックマンが敵と衝突したか否かを返す。
        """
        if 0 < len(self.field[self.packman.position[1]][self.packman.position[0]] &
                self.enemies_on_field):
            return True
        if 2 <= len(self.packman.history):
            packman_previous_position = self.packman.history[-1]
            candidate_enemies = \
                    self.field[packman_previous_position[1]][packman_previous_position[0]] & \
                        self.enemies_on_field
            for a_candidate_enemy in candidate_enemies:
                if a_candidate_enemy.history[-1] == self.packman.position:
                    return True
        return False

    def evalute(self):
        u"""
        現在のフィールドの状態について評価を行う。
        """
        if self.time < 0:
            self.dot_ate_history.append(None)
            return TIME_OVER
        if self.is_crashed():
            self.dot_ate_history.append(None)
            return CRASH
        if DOT in self.field[self.packman.position[1]][self.packman.position[0]]:
            self.field[self.packman.position[1]][self.packman.position[0]].remove(DOT)
            self.dot_count -= 1
            self.dot_ate_history.append(self.packman.position)
            if self.dot_count == 0:
                return CLEAR
            else:
                return ATE_DOT
        else:
            self.dot_ate_history.append(None)
            return NOTHING

    def next(self, x, y):
        u"""
        指定の座標にパックマンを移動させる。
        """
        for an_enemy in self.enemies:
            next_x, next_y = an_enemy.get_next(self.get_neighborhoods(an_enemy))
            self.move_character(an_enemy, next_x, next_y)
        self.move_character(self.packman, x, y)
        self.time -= 1
        return self.evalute()

    def undo(self):
        u"""
        1つ前の時刻の状態にフィールドを戻す。
        """
        for an_enemy in self.enemies:
            self.undo_character(an_enemy)
        self.undo_character(self.packman)
        previous_dot = self.dot_ate_history.pop()
        if previous_dot:
            self.field[previous_dot[1]][previous_dot[0]].add(DOT)
            self.dot_count += 1
        self.time += 1

    def undo_all(self):
        u"""
        初期状態に戻す。
        """
        for i in range(0, len(self.packman.history)):
            self.undo()

    def set_packman_to_enemies(self):
        u"""
        各EnemyオブジェクトにPackmanオブジェクトを設定する。
        """
        for an_enemy in self.enemies:
            an_enemy.packman = self.packman

    def make_field(self, file):
        u"""
        フィールドを作成する。
        """
        lines = file.readlines()
        # 1行目: 時間
        self.time = int(lines[0])
        # 2行目: <幅> <高さ>
        self.width, self.height = [int(a_number) + 1 for a_number in lines[1].split()]
        # マスク処理での境界値処理を省略するために、一回り大きなフィールドを作成する。
        self.field = []
        for row_index in range(0, self.height + 1):
            current_row = []
            self.field.append(current_row)
            for column_index in range(0, self.width + 1):
                current_row.append(set())
        # フィールドの作成
        self.dot_count = 0
        self.enemies = []
        field_source = lines[2:]
        y = START_Y
        for a_row in field_source:
            x = START_X
            for a_column in a_row:
                self.field[y][x].clear()
                element = self.get_element(a_column, x, y)
                if element:
                    self.field[y][x].add(element)
                    if isinstance(element, Dot):
                        self.dot_count += 1
                    elif isinstance(element, Packman):
                        self.packman = element
                    elif isinstance(element, Enemy):
                        self.enemies.append(element)
                x += 1
            y += 1
        self.set_packman_to_enemies()
        self.packman_on_field = frozenset([self.packman])
        self.enemies_on_field = frozenset(self.enemies)
        self.dot_on_field = frozenset([DOT])
        self.dot_ate_history = []
        self.initial_time = self.time
        self.initial_dot_count = self.dot_count

    def get_element(self, element_character, x, y):
        u"""
        テキスト表現のマップの指定の要素からオブジェクトを生成する。
        """
        if element_character == u'#':
            return WALL
        elif element_character == u' ':
            return None
        elif element_character == u'.':
            return DOT
        elif element_character == u'@':
            return Packman(x, y)
        elif element_character == u'V':
            return EnemyV(x, y)
        elif element_character == u'H':
            return EnemyH(x, y)
        elif element_character == u'L':
            return EnemyL(x, y)
        elif element_character == u'R':
            return EnemyR(x, y)
        elif element_character == u'J':
            return EnemyJ(x, y, self)

    def __str__(self):
        map = u''
        for y in range(START_Y, self.height):
            for x in range(START_X, self.width):
                elements = self.field[y][x]
                if WALL in elements:
                    map += str(WALL)
                elif self.packman in elements:
                    if 0 < len(elements - self.packman_on_field):
                        map += u'x'
                    else:
                        map += str(self.packman)
                elif 0 < len(elements & self.enemies_on_field):
                    enemies = list(elements & self.enemies_on_field)
                    map += str(enemies[0])
                elif DOT in elements:
                    map += str(DOT)
                else:
                    map += u' '
            map += u'\n'
        return map

class Node:
    u"""
    フィールド上のある場面を表す。
    """
    def __init__(self, current_field, result):
        if 2 <= len(current_field.packman.history):
            self.route = current_field.packman.history[1:]
            self.route.append(current_field.packman.position)
        elif 1 == len(current_field.packman.history):
            self.route = [current_field.packman.position]
        else:
            self.route = []
        self.is_fast = False
        self.position = current_field.packman.position
        self.score = self.calculate_score(current_field, result)
        self.original_score = self.score
        self.enemy_state = u''
        for an_enemy in current_field.enemies:
            self.enemy_state += an_enemy.get_state()
        self.string = u'%s,%s,%s' % (
                str(len(self.route)), str(self.position), self.enemy_state)

    def __str__(self):
        return self.string

    def calculate_score(self, current_field, result):
        u"""
        スコアを計算する。
        """
        if result == TIME_OVER:
            return MAX_SCORE
        if current_field.time <= current_field.dot_count:
            return MAX_SCORE
        dot_score = (current_field.dot_count * 1000)
        if result == CRASH:
            dot_score *= current_field.time
        score = len(self.route) + dot_score
        return score 

    def set_fast(self):
        u"""
        高速移動(次の交差点まで移動)するノードか否かを設定する。
        """
        self.is_fast = True
        self.score /= 2

    def setup_field(self, field):
        u"""
        Fieldオブジェクトを、本ノードが表す状態にする。
        """
        if self.route == field.packman.history[1:]:
            return
        field.undo_all()
        for a_next in self.route:
            field.next(a_next[0], a_next[1])

    def __cmp__(x, y):
        return x.score - y.score

class ClosedNode():
    u"""
    調査が終わったノードを表す。
    bisectモジュールは、__cmp__関数を用いるため、
    クローズリストでbisectモジュールを使用するには本クラスが必要。
    """
    def __init__(self, node):
        self.node = node
        self.score = node.score
        self.original_score = node.original_score

    def __cmp__(x, y):
        return cmp(str(x.node), str(y.node))

def find_node_index(node, list):
    u"""
    指定のリストからノードを探し、そのインデックスを返す。
    """
    i = bisect.bisect_left(list, node)
    if i < len(list) and list[i] == node:
        return i
    else:
        return -1

def insert_node(node, list):
    u"""
    指定のリストにノードを挿入する。
    """
    bisect.insort(list, node)

def pop_node(node, list):
    u"""
    指定のリストから指定のノードを取り出す。
    """
    index = find_node_index(node, list)
    if 0 <= index:
        list.pop(index)
    else:
        return None

def find_index_from_close_list(node, close_list):
    u"""
    クローズリストからノードを探し、そのインデックスを返す。
    """
    return find_node_index(ClosedNode(node), close_list)

def find_index_from_open_list(node, open_list):
    u"""
    オープンリストからノードを探し、そのインデックスを返す。
    """
    for index in range(0,len(open_list)):
        if str(node) == str(open_list[index]):
            return index
    else:
        return -1

def decode_start_to_next(start, next):
    u"""
    1回分の移動を表現する文字を返す。
    """
    diff_x = next[0] - start[0]
    diff_y = next[1] - start[1]
    if diff_x == -1:
        return u'h'
    elif diff_y == 1:
        return u'j'
    elif diff_y == -1:
        return u'k'
    elif diff_x == 1:
        return u'l'
    elif diff_x == 0 and diff_y == 0:
        return u'.'
    else:
        return u'?'

def decode_route(start, route):
    u"""
    指定の経路を表現する文字列を返す。
    """
    result = u''
    for next in route:
        result += decode_start_to_next(start, next)
        start = next
    return result

def a_search(field):
    u"""
    経路を探索する。
    """
    first_node = Node(field, NOTHING)
    start_position = field.packman.position
    open_list = [first_node]
    open_list_strs = [str(first_node)]
    close_list = []
    min_dot_count = field.initial_dot_count
    while 0 < len(open_list):
        current_node = open_list.pop(0)
        insert_node(ClosedNode(current_node), close_list)
        current_node.setup_field(field)
        if field.dot_count < min_dot_count:
            min_dot_count = field.dot_count
            print field
            print u'%s/%s' % ((field.initial_dot_count - field.dot_count),
                    field.initial_dot_count)
            print decode_route(
                    start_position, field.packman.history[1:] + [field.packman.position])
        if field.dot_count == 0:
            return current_node.route
        candidates_1x = field.get_next_candidates()
        candidates_nx = []
        for a_candidate_1x in candidates_1x:
            candidates_nx.extend(field.get_fast_ways(a_candidate_1x))
        for a_candidate_nx in candidates_nx:
            next_count = 0
            result = NOTHING
            created_nodes = []
            for a_next in a_candidate_nx:
                result = field.next(a_next[0], a_next[1])
                next_count += 1
                if result == CRASH or result == TIME_OVER:
                    break
                else:
                    node = Node(field, result)
                    if node.score != MAX_SCORE:
                        created_nodes.append(node)
                    if result == CLEAR:
                        break
            if 0 < len(created_nodes):
                created_nodes[-1].set_fast()
                for a_node in created_nodes:
                    if pop_node(str(a_node), open_list_strs):
                        open_list_index = find_index_from_open_list(a_node, open_list)
                        if 0 <= open_list_index:
                            if a_node.original_score < \
                                    open_list[open_list_index].original_score:
                                insert_node(ClosedNode(open_list[open_list_index]),
                                        close_list)
                                del open_list[open_list_index]
                                insert_node(a_node, open_list)
                                insert_node(str(a_node), open_list_strs)
                            continue
                    close_list_index = find_index_from_close_list(a_node, close_list)
                    if 0 <= close_list_index:
                        if a_node.original_score < \
                                close_list[close_list_index].original_score:
                            del close_list[close_list_index]
                            insert_node(a_node, open_list)
                            insert_node(str(a_node), open_list_strs)
                        continue
                    insert_node(a_node, open_list)
                    insert_node(str(a_node), open_list_strs)
            for i in range(0, next_count):
                field.undo()
    else:
        print u'失敗'

def print_result(route, field):
    u"""
    結果を標準出力に出力する。
    """
    decoded_route = decode_route(field.packman.position, route)
    print decoded_route
    print u'-- initial --'
    print str(field)
    index = 0
    for a_next in route:
        field.next(a_next[0], a_next[1])
        print u'-- %s --' % (decoded_route[index])
        print str(field)
        index += 1
    print u'%s steps.' % (len(decoded_route))
    print decoded_route

def main():
    initial_field = Field(sys.stdin)
    route = a_search(initial_field)
    initial_field.undo_all()
    print_result(route, initial_field)

if __name__ == '__main__':
    main()

DATE : 2010/08/28 (Sat)

Google Developer Day 2010のDevQuizで出題された「Shiritori」レベル3を解いた際に使用したソースコードです。言語にはPythonを使用しました。サーバの回答を入力すると、自分が答えるべき単語を出力するというスクリプトです。出力までには1~3分ほど時間がかかります。辞書は、スクリプトと同じディレクトリに以下の形式でdata.txtを置くことで設定します。

    * (単語1)
    * (単語2)
    * (単語3)
    * (以下同様)

本ソースコードでは、アルファベータ法を実装してみました。しかし、初めに作成したコードでは、レベル1やレベル2は突破できたものの、レベル3は単語の候補が多くなりすぎて、プログラムから全く回答が返ってこないという状態になってしまいました。

そこで先読みの深さを制限するほかに、枝刈りを行いました。例えば、axxb, ayyb, bxxa, byya

という辞書があったとし、サーバがaxxbを回答したとします。プレイヤーの候補はbxxaもしくはbyyaとなります。ところがそのどちらを選んでも、辞書の中にayybという、プレイヤーに再びbのつく単語を選ばせることのできる単語が存在するため、サーバはその単語を回答してくると予測できます。すると、プレイヤーの候補としてbxxa, byyaのそれぞれを候補としてその先の手を読むのではなく、どちらか一方、例えばbxxaだけの先の手を読むだけで十分です。このように、プレイヤーが前回と同じ文字を先頭に持つ単語を回答しなければならない状況になりうる単語が複数ある場合は、そのうちの1つのみを先読みの候補とし、それ以外は無視することで枝刈りを行いました。特にレベル3はこのような状況になる単語が大量にあるため、この枝刈りを行うことで、答えが全く返ってこなくなる状況から、1~3分待てば回答が返ってくるほどまでに進歩しました。

#!/usr/bin/env python
# coding: UTF-8

import re
import sys
import bisect

# 辞書のパス
DICTIONARY_FILE = u'data.txt'

# 辞書から単語を取り出すための正規表現。
WORD_PATTERN = re.compile(ur'\s*\*\s*(\w+)\s*')

# 探索を行う深さ
FIRST_DEPTH = 10

MAX_SCORE = sys.maxint
MIN_SCORE = -sys.maxint - 1

# 自分が勝ったことを表す
MY_WIN = MAX_SCORE - 1
# サーバが勝ったことを表す
SERVER_WIN = MIN_SCORE + 1

# ターンを表す定数
MY_TURN = 0
SERVER_TURN = 1

def compress_word(word):
    u"""
    単語を2文字に圧縮する。
    """
    if 2 < len(word):
        return word[0] + word[len(word) - 1]
    else:
        return word

class CompressedDictionary(list):
    u"""
    圧縮された単語からなる辞書。
    """
    def __init__(self, source_dictionary):
        self.extend([compress_word(a_word) for a_word in source_dictionary])
        self.sort()

    def __contains__(self, item):
        index = bisect.bisect_left(self, item)
        return index < len(self) and self[index] == item

    def clone(self):
        u"""
        本辞書を複製する。
        """
        return CompressedDictionary(self)

    def get_word(self, compressed_word, source_dictionary):
        u"""
        圧縮された辞書から元の単語を取り出す。
        """
        if compressed_word in self:
            for a_word in source_dictionary:
                if a_word[0] == compressed_word[0] and \
                        a_word[len(a_word) - 1] == compressed_word[1]:
                    return a_word
        else:
            raise ValueError()

    def remove(self, x):
        u"""
        辞書から単語を削除する。
        """
        index = bisect.bisect_left(self, x)
        if index < len(self) and self[index] == x:
            del self[index]
        else:
            raise ValueError()

    def remove_word(self, word):
        u"""
        辞書から単語を、未圧縮の単語を指定して削除する。
        """
        first_character = word[0]
        last_character = word[len(word) - 1]
        index = bisect.bisect_left(self, word)
        if index < len(self):
            candidate_word = self[index]
            if candidate_word[0] == first_character and \
                    candidate_word[1] == last_character:
                self.remove(candidate_word)
            else:
                raise ValueError()

def load_dictionary(dictionary_file):
    u"""
    辞書をファイルから読み込む。
    """
    dictionary = []
    with open(dictionary_file, 'r') as f:
        for a_line in f:
            match = WORD_PATTERN.match(a_line)
            if match:
                dictionary.append(match.group(1))
    return dictionary

def input_server_word(previous_word, dictionary):
    u"""
    サーバからの回答を入力する。
    """
    while True:
        print "Please input server's word."
        source_word = raw_input('---> ')
        word = compress_word(source_word)
        if word in dictionary:
            if previous_word:
                if previous_word[1] == word[0]:
                    return word
                else:
                    print '%s is invalid word. Please retry.' % (source_word)
            else:
                return word
        else:
            print '%s is not in dictioary. Please retry.' % (source_word)

def get_candidate_words(previous_word, current_dictionary):
    u"""
    回答の候補となる単語を返す。
    """
    previous_last_character = previous_word[1]
    return [a_word for a_word in current_dictionary
            if a_word[0] == previous_last_character]

def has_win_word(candidate_words, current_dictionary):
    u"""
    勝利できる単語が候補中にあるか否かを返す。
    勝利できる単語とは、末尾の文字を先頭に持つ単語が
    辞書の中にないような単語のことである。
    """
    for a_candidate in candidate_words:
        last_candidate_character = a_candidate[1]
        for next_candidate in current_dictionary:
            if next_candidate[0] == last_candidate_character:
                break
        else:
            return True
    else:
        return False

def get_dead_rate_in_bad_loop(candidate_words, current_dictionary):
    u"""
    ループになる可能性を返す。
    """
    dead_count = 0

    for my_candidate_word in candidate_words:
        alive = True
        my_candidate_first_character = my_candidate_word[0]
        for opponent_word in get_candidate_words(
                my_candidate_word, current_dictionary):
            if opponent_word[1] == my_candidate_first_character:
                dead_count += 1
    return float(dead_count) / len(candidate_words)

def get_frame_rate(previous_word, candidate_words):
    u"""
    ループにはめられている可能性を返す。
    """
    frame_count = 0

    target_character = previous_word[0]
    for my_candidate_word in candidate_words:
        if my_candidate_word[1] == target_character:
            frame_count += 1

    return float(frame_count) / len(candidate_words)

def calculate_score(turn, previous_word, current_dictionary):
    u"""
    スコアを計算する。
    """
    candidate_words = get_candidate_words(previous_word, current_dictionary)
    if 0 < len(candidate_words):
        if has_win_word(candidate_words, current_dictionary):
            if turn == MY_TURN:
                return MY_WIN
            elif turn == SERVER_TURN:
                return SERVER_WIN
            else:
                print u'失敗'
        else:
            score = 0
            dead_score = 50
            dead_rate = get_dead_rate_in_bad_loop(
                    candidate_words, current_dictionary)
            if dead_rate == 1:
                if turn == MY_TURN:
                    return SERVER_WIN
                elif turn == SERVER_TURN:
                    return MY_WIN
                else:
                    print u'失敗'
            score -= int(dead_score * dead_rate)
            frame_score = 100
            frame_rate = get_frame_rate(previous_word, candidate_words)
            score += int(frame_score * frame_rate)
            score *= len(candidate_words)
            if turn == MY_TURN:
                return score
            elif turn == SERVER_TURN:
                return -score
            else:
                print u'失敗'
    else:
        if turn == MY_TURN:
            return SERVER_WIN
        elif turn == SERVER_TURN:
            return MY_WIN
        else:
            print u'失敗'

def get_server_candidate_list(my_word, current_dictionary):
    u"""
    サーバが回答する可能性のある単語のリストを返す。
    """
    candidate_words = list(set(get_candidate_words(my_word, current_dictionary)))
    candidate_words.sort(lambda x, y: cmp(
            calculate_score(MY_TURN, x, current_dictionary),
            calculate_score(MY_TURN, y, current_dictionary)))
    return candidate_words

def is_loop(opponent_word, candidate_word, current_dictionary):
    u"""
    指定の単語でループに入るかどうかを返す。
    """
    last_loop_character = opponent_word[1]
    opponent_candidates = get_candidate_words(candidate_word, current_dictionary)
    for a_opponent_candidate in opponent_candidates:
        if a_opponent_candidate[1] == last_loop_character:
            return True
    else:
        return False

def select_server_word(history, dictionary, limit, depth):
    u"""
    サーバ側の回答を予測する。
    """
    my_word = history[len(history) - 1]
    current_dictionary = dictionary.clone()
    current_dictionary.remove(my_word)
    if depth == 0:
        return calculate_score(SERVER_TURN, my_word, current_dictionary), None
    score = MAX_SCORE
    word = None
    candidate_words = get_server_candidate_list(my_word, current_dictionary)
    if 0 < len(candidate_words):
        loop_candidates = [a_candidate for a_candidate in candidate_words
                if is_loop(my_word, a_candidate, current_dictionary)]
        for a_loop_candidate in loop_candidates:
            candidate_words.remove(a_loop_candidate)
        if 0 < len(loop_candidates):
            a_loop_candidate = loop_candidates[0]
            current_history = history[:] + [a_loop_candidate]
            candidate_score, _ = select_my_word(
                    current_history, current_dictionary, score, depth - 1)
            score = candidate_score
            word = a_loop_candidate
        if score != SERVER_WIN and limit <= score:
            for a_candidate_word in candidate_words:
                current_history = history[:] + [a_candidate_word]
                candidate_score, _ = select_my_word(
                        current_history, current_dictionary, score, depth - 1)
                if candidate_score < score:
                    score = candidate_score
                    word = a_candidate_word
                if score == SERVER_WIN or score <= limit:
                    break
        return score, word
    else:
        return MY_WIN, None

def get_my_candidate_list(server_word, current_dictionary):
    u"""
    自分が回答できる単語のリストを返す。
    """
    candidate_words = list(set(get_candidate_words(server_word, current_dictionary)))
    candidate_words.sort(lambda x, y: cmp(
            calculate_score(SERVER_TURN, x, current_dictionary),
            calculate_score(SERVER_TURN, y, current_dictionary)))
    candidate_words.reverse()
    return candidate_words

def select_my_word(history, dictionary, limit, depth):
    u"""
    自分の回答を予測する。
    """
    server_word = history[len(history) - 1]
    current_dictionary = dictionary.clone()
    current_dictionary.remove(server_word)
    if depth == 0:
        return calculate_score(MY_TURN, server_word, current_dictionary), None
    score = MIN_SCORE
    word = None
    candidate_words = get_my_candidate_list(server_word, current_dictionary)
    if 0 < len(candidate_words):
        loop_candidates = [a_candidate for a_candidate in candidate_words
                if is_loop(server_word, a_candidate, current_dictionary)]
        for a_loop_candidate in loop_candidates:
            candidate_words.remove(a_loop_candidate)
        if 0 < len(loop_candidates):
            a_loop_candidate = loop_candidates[0]
            current_history = history[:] + [a_loop_candidate]
            candidate_score, _ = select_server_word(
                    current_history, current_dictionary, score, depth - 1)
            score = candidate_score
            word = a_loop_candidate
        if score != MY_WIN and score <= limit:
            for a_candidate_word in candidate_words:
                current_history = history[:] + [a_candidate_word]
                candidate_score, _ = select_server_word(
                        current_history, current_dictionary, score, depth - 1)
                if score < candidate_score:
                    score = candidate_score
                    word = a_candidate_word
                if candidate_score == MY_WIN or limit <= score:
                    break
        return score, word
    else:
        return SERVER_WIN, None

def calculate_my_word(server_word, dictionary, max_depth):
    u"""
    次に出すべき単語を求める。
    """
    score, my_word = select_my_word([server_word], dictionary, MAX_SCORE, max_depth)
    return my_word

def main():
    source_dictionary = load_dictionary(DICTIONARY_FILE)
    dictionary = CompressedDictionary(source_dictionary)

    server_word = input_server_word(None,dictionary)
    current_dictionary = dictionary.clone()
    depth = FIRST_DEPTH
    turn = 1

    while True:
        my_word = calculate_my_word(server_word, current_dictionary, depth)
        current_dictionary.remove_word(server_word)
        select_word = current_dictionary.get_word(my_word, source_dictionary)
        print 'Select %s' % (select_word)
        print '\n'
        current_dictionary.remove(my_word)
        server_word = input_server_word(my_word, current_dictionary)
        turn += 1

if __name__ == '__main__':
    main()
忍者ブログ [PR]
ブログ内検索
最近の状況
リンク
カレンダー
05 2017/06 07
S M T W T F S
1 2 3
4 5 6 7 8 9 10
11 12 13 14 15 16 17
18 19 20 21 22 23 24
25 26 27 28 29 30
使用許諾
最新コメント
(08/15)
(05/04)
(03/06)
(03/04)
(09/25)
最新トラックバック
T/O
(11/05)
ブログ内検索
最近の状況
リンク
カレンダー
05 2017/06 07
S M T W T F S
1 2 3
4 5 6 7 8 9 10
11 12 13 14 15 16 17
18 19 20 21 22 23 24
25 26 27 28 29 30
使用許諾
最新コメント
(08/15)
(05/04)
(03/06)
(03/04)
(09/25)
最新トラックバック
T/O
(11/05)