この章ではQ学習とその実装を説明します。

Q学習

Q学習の方法をここでは要点だけをまとめますが、じっくり意味を知りたいという方は、
ScratchでAIを学ぼう ゲームプログラミングで強化学習を体験
を参考にしてみてください。

まず、情報のやり取りの時系列について確認です。

タイムステップ [math]t[/math] において、環境が観測[math]x(t)[/math] をエージェントに渡し、エージェントは行動 [math]a(t)[/math] を選びます。その後、環境は報酬[math]r(t)[/math]をエージェントに渡します。これで一つのサイクルが終わり、環境は次のタイムステップの観測[math]x(t+1)[/math]をエージェントに渡します。

このサイクルを何度も繰り返します。まとめると下のようになります。


[math]
x(t) \rightarrow a(t) \rightarrow r(t) \rightarrow x(t + 1) \rightarrow \cdots
[/math]

強化学習アルゴリズムの目的は、全ての [math]x[/math]に対して、未来でもらえる報酬(の和)が最大になる行動 [math]a [/math]を選べるようにすること、です。

Q学習は、各観測 [math]x[/math] に対して、各行動 [math]a[/math]を選んだ時の未来の報酬の和(累積報酬和と呼びます)を学習で近似していきます。


[math]
\begin{eqnarray}
Q(x(t), a(t)) & \approx & r(t) \\
& & + \gamma r(t+1) \\
& & + \gamma^2 r(t+2) + \cdots
\end{eqnarray}
[/math]

[math]\gamma[/math] は報酬和の発散を抑えるために導入された0から1までの値をとるパラメータで、割引率と呼びます。[math]Q(x, a)[/math]を保存する変数は、
(取りうる [math]x[/math] の値の数 [math]\times[/math] 行動の種類の数)の表のようなもになるので、Q-table と呼ぶことにします。

もう少し正確に考えると、未来で得られる報酬は確率的な要素を含むので、その平均値を近似するということになります。


[math]
\begin{eqnarray}
Q(x(t), a(t)) & \approx & E[ r(t) \\
& & + \gamma r(t+1) \\
& & + \gamma^2 r(t+2) + \cdots ]
\end{eqnarray}
[/math]

[math]E[\cdot][/math]は平均をとるという意味です。

もし全ての [math]x[/math]と [math]a[/math]において正しい[math]Q(x, a)[/math]を知っていたら、目標は達成されたも同然です。今の観測 [math]x[/math] において、最も[math]Q(x, a)[/math] の値が大きくなる[math]a[/math]を選べばよいからです。


[math]
a = \text{argmax}_{a’} Q(x, a’)
[/math]

[math]\text{argmax}_i f(i)[/math] というのは、[math]f(i)[/math]を最大にする [math]i[/math]という意味になります。ちなみに、[math]\max_i f(i)[/math] は、[math]f(i)[/math]の最大値という違った意味になります。

ただ、学習の前半において[math]Q[/math]の値は正確ではありません。ですのでQが最大となる行動をいつも選んでしまうと、間違った行動を取り続けてしまう可能性が出てきてしまいます。

そこで、基本的には上記の方法で行動を選ぶけれども、[math]\epsilon[/math]の確率でランダムに行動を選ぶという方法を取ります。[math]\epsilon[/math]は、0から1の間の値をとるパラメータであり、乱雑度と呼ぶものとします。この行動選択の方法をepsilon-greedy法と呼びます。

[math]Q[/math]は以下の学習則に従って更新していきます([math]x(t)[/math]が最終状態から得られた観測値の時のみ、最後の[math]\gamma \max_{a’}Q(x(t), a’)[/math] の項を0 にします)。


[math]
\begin{eqnarray}
Q(x, a) & \leftarrow & (1-\alpha) Q(x, a) \\
& & + \alpha \{r + \gamma \max_{a’} Q(x’, a’)\}
\end{eqnarray}
[/math]

ここで、[math]x=x(t), a=a(t), r=r(t), x’=x(t+1)[/math]としています。
この式は、[math]Q(x, a)[/math]を[math]\alpha[/math]分だけ[math]\{r + \gamma \max_{a’}Q(x’, a’)\}[/math] に近づけるという操作を表しています。この更新を毎ステップで行うことで、Q値が累積報酬和の平均値に近づいていきます。

ここで注意しなくてはいけないのは、正しい収束が保証されているのは、観測から状態の情報が完全に分かるMDPの設定のときのみということです(ただし、この場合でも学習率をある規則を満たすように徐々に小さくするという仮定が必要になります)。

よって、今回考えているPOMDPの場合ではQ学習が動く保証はないのですが、簡単なPOMDPであればうまく動くようです(何が簡単か、Q学習でもできるPOMDPの種類ををうまく定義できるとありがたいですね)。

さて、実装のために上の式を少し変形して別な解釈を考えておきます。


[math]
\begin{eqnarray}
Q(x, a) & \leftarrow & Q(x, a) \\
& & – \alpha (\text{output} – \text{target})
\end{eqnarray}
[/math]

ここで、


[math]
\begin{eqnarray}
\text{output} &=& Q(x, a) \\
\text{target} &=& r + \gamma \max_{a’} Q(x’, a’)
\end{eqnarray}
[/math]

としています。observation に対するモデル(ここではQ-table)の出力がoutput、目標値はtarget、その誤差が output – target という解釈です。

output である [math]Q(x, a)[/math]が目標値であるtarget よりも多すぎたら、多すぎた分を減らしてoutput を合わせる、逆に[math]Q(x, a)[/math]がtarget よりも小さかったら、その分を増やす、という操作と考えることができます。

これは、qnet での考え方に合わせるための準備なのですが、qもこの式で実装します。

エージェントの抽象クラス coreAgt

ここからは実装について説明します。エージェントは、core.py で定義している以下の抽象クラス class coreAgt を継承しています。

class coreAgt(metaclass=ABCMeta):
    """
    Agtの抽象クラス
    """

    def build_model(self):
        """
        モデル構築(Tensorflow使用時)
        """

    @abstractmethod
    def select_action(self, observation):  # pylint:disable=no-self-use
        """
        observation に基づいてaction を出力

        Returns
        -------
        action: int
        """
        raise NotImplementedError()

    def get_Q(self, observation):  # pylint:disable=no-self-use
        """
        observationに対するQ値を出力
        """
        if observation == 0:
            Q = None
        else:
            Q = None
        return Q

    def learn(self, observation, action, reward, next_observation, done):
        """
        学習
        """

    def reset(self):
        """
        内部状態をリセット(lstmやgruで使用)
        """

    def save_state(self):
        """
        内部状態をメモリーに保存(lstmやgruで使用)
        """

    def load_state(self):
        """
        内部状態の復元(lstmやgruで使用)
        """

    def save_weights(self, filepath):
        """
        Qtableやweightパラメータの保存
        """

    def load_weights(self, filepath):
        """
        Qtableやweightパラメータの読み込み
        """

    

初めの build_model()は、設定されたパラメータ(中間ユニットの数など)を受けて、モデルを構築するためのものです。agt_q では使いません。

select_action() が、Agt で最も重要なメソッドになります。Agtの役割は行動を選択するということだからです。select_action() は、observation を受けて、行動action を出力します。

learn()は、環境からの一連の情報をもとに、select_action()の出力を最適化していく役割です。ここに、強化学習のアルゴリズムが書かれます。

get_Q()は、学習が想定通りに進んでいるかを見るためのメソッドで、必須ではありません。

残りのメソッドもコメントにある通りです。

パラメータ設定、__init__()

それでは、Q学習が実装されている agt_q.py の class Agt を見ていきます。

まず、__init__()では、Q学習で使用するパラメータをセットします。

class Agt(core.coreAgt):
    """
    Q-tableによるQ学習エージェント
    """
    def __init__(
        self,
        n_action=2,
        input_size=(7, ),
        init_val_Q=0,
        epsilon=0.1,
        alpha=0.1,
        gamma=0.9,
        max_memory=500,
        filepath=None,
    ):
        """
        Parameters
        ----------
        n_action: int
            行動の種類の数
        input_size: tuple of int 例 (7,), (5, 5)
            入力の次元
        init_val_Q: float
            Q値の初期値
        epsilon: float (0から1まで)
            Q学習のε、乱雑度
        gammma: float (0から1まで)
            Q学習のγ、割引率
        alpha: float (0から1まで)
            Q学習のα、学習率
        max_memory: int
            記憶する最大の観測数
        filepath: str
            セーブ用のファイル名
        """

        # パラメータ
        self.n_action = n_action
        self.input_size=input_size
        self.init_val_Q = init_val_Q
        self.epsilon = epsilon
        self.gamma = gamma
        self.alpha = alpha
        self.max_memory=max_memory
        self.filepath = filepath

        super().__init__()

        # 変数
        self.time = 0
        self.Q = {}
        self.len_Q = 0

このプログラムは、sim_swamptour.py を実行することで動きますが、その時のパラメータは、sim_swamptour.py の中に記載されています。

行動選択、select_action()

行動選択は、select_action()で実装されています。

行動を決めるのにQ値が必要になりますが、Q値の保持にはdict型を使っています。

まず、下のコードの(A) で numpy の array 型であるobservationをstr型に変換し、それを obs とします。

(B)では、そのobs が登録済みかどうかを確認し、登録していなかったら新規で登録します。この時、登録数が self.max_memoryを超えたら、警告を出して終了します。

(C)と(D)で epsilon-greedy方が実装されています。(C)は、Qが最大となる行動を出力とするコードであり、(D)は、ランダムで行動を決めているコードです。

class Agt(core.coreAgt):
    --- 省略
    def select_action(self, observation):
        obs = self._trans_code(observation) # (A)

        self._check_and_add_observation(obs) # (B)

        if self.epsilon < np.random.rand():
            action = np.argmax(self.Q[obs]) # (C)
        else:
            action = np.random.randint(0, self.n_action) # (D)
        return action

    def _trans_code(self, observation):
        obs = str(observation)
        return obs

    def _check_and_add_observation(self, observation):
        if observation not in self.Q:
            self.Q[observation] = [self.init_val_Q] * self.n_action
            self.len_Q += 1
            if self.len_Q > self.max_memory:
                print('Qの大きさが上限 %d に達しました。' % self.max_memory)
                sys.exit()
            if (self.len_Q < 100 and self.len_Q % 10 == 0) or (self.len_Q % 100 == 0):
                print('used memory for Q-table --- %d' % self.len_Q)

Q学習、learn()

学習の部分の実装です。

前に記述した学習則は以下の式でした。


[math]
\begin{eqnarray}
Q(x, a) & \leftarrow & Q(x, a) \\
& & – \alpha (\text{output} – \text{target})
\end{eqnarray}
[/math]


[math]
\begin{eqnarray}
\text{output} &=& Q(x, a) \\
\text{target} &=& r + \gamma \max_{a’} Q(x’, a’)
\end{eqnarray}
[/math]

これが learn()で実装されています。[math]x[/math], [math]a[/math], [math]r[/math], [math]x'[/math] が、それぞれ、observation, action, reward, next_observation としています。doneは、next_observation が最終状態からの観測であるときにTrue となる変数です。Trueの時には、target は reward のみとなります。

class Agt(core.coreAgt):
    --- 省略
    def learn(self, observation, action, reward, next_observation, done):
        """
        Q(obs, act)
            <- (1-alpha) Q(obs, act)
                + alpha ( rwd + gammma * max_a Q(next_obs))

        input : (obs, act)
        output: Q(obs, act)
        target: rwd * gamma * max_a Q(next_obs, a)
        """
        obs = self._trans_code(observation)
        next_obs = self._trans_code(next_observation)

        self._check_and_add_observation(next_obs)

        output = self.Q[obs][action]
        if done is False:
            target = reward + self.gamma * max(self.Q[next_obs])
        else:
            target = reward

        self.Q[obs][action] -= self.alpha * (output - target)

保存と読み出し, save_weights(), load_weights()

最後のコードです。Q値のの保存と読み出しの実装です。

class Agt(core.coreAgt):
    --- 省略
    def save_weights(self, filepath=None):
        if filepath is None:
            filepath = self.filepath
        with open(filepath, mode='wb') as f:
            pickle.dump(self.Q, f)

    def load_weights(self, filepath=None):
        if filepath is None:
            filepath = self.filepath
        with open(filepath, mode='rb') as f:
            self.Q = pickle.load(f)

agt_q.py の説明は以上です。

agt_q で出来るタスクとできないタスク

agt_q は新しい観察パターンが来ると、その観察パターンでのQ値をQ-table に足していきます。そのため、観察パターンが増えるとメモリーが圧迫されます。また、メモリーが足りたとしても、Q-table が巨大になることでQ値の収束が非常に遅くなります。

そのような状態を回避するために、プログラムでは、観察パターンが500個よりも多くなるとエラーを出して止まるようにしています。

silent_ruin は、地形が固定されているために観測パターンが限られるために、agt_qで解くことが可能です。

また、open_field は、ゴールの位置はランダムに変わりますが、壁がないために観測パターンがすくなくて済み、解くことが可能です。

many_swamp, ruin_1swamp, ruin_2swamp は、ゴールも壁もランダムに変わるので、観測パターンがすぐに上限に達してしまい、解くことはできません。

一方、Tmaze_both, Tmaze_either はマップが固定ですので、観測パターンの問題は起きません。しかし、短期記憶を必要とするタスクですので、もともとその機能のないagt_qは、正しい解に収束することはできません。

それでは、次章で観測パターンの問題をニューラルネットを使って解決していきましょう。

[top] [01] [02] [03] [04] [05] [06] 次は[07] [08] [09] [10]