全探索、動的計画法、分枝限定法(枝刈り)、ダイクストラを実装して比較してみた

はじめに

アルゴリズムパズル ―プログラマのための数学パズル入門』という本を読んでいたところ、問題20の山下りの最大和が以下のようなものだった。

正の整数が三角形状に配置されている。この三角形の頂点から初めて、それぞれの回想で直前に選んだ数字と隣接する1つの数字を選びながらふもとまで降下していくときに、たどった数字を合計したあたいの最大値を求めるアルゴリズムを設計せよ(もちろん、全探索よりも効率的であること)。

f:id:Aratamotsu:20201225134315p:plain
4階層の場合。青い経路がこの場合の解

答えとしては、動的計画法を使えば全探索より効率的に解を求めることができるわけであるが、問題設定が簡単で、かつ色々なアプローチを考えることができるので、勉強していたことをいくつか試してみた。

jupyter notebookはここに置いておいた。

対応するグラフを作る

階層の数を$N$とすれば、一番上の階層のノードが1つ、1つ下の階層のノードが2つ、・・・、$N$番目の階層にノードが$N$個となって、下に降りる際に、隣接している2つのノードにエッジが出ているグラフになる。また、それぞれのノードは、得点を持っており、一番下に降りるまでに、得た得点の合計値の最大の値を求める、という問題になる。

from collections import deque
import math
import pandas as pd
from heapq import heappop, heappush

関数を3つ用意する。

# i段目の左からj番目のnodeの全体での順序を返す関数
def to_index(i, j):
  return int(i * (i + 1) / 2 + j)

# n番目のノードが上から何番目の階層にいるのかを返す関数
def to_layer(N):
  return math.floor((-1 + math.sqrt(1 + 8 * N)) / 2 ) 

def my_graph(N):
  G = []
  for i in range(N):
  for j in range(i+1):
      if i < N -1:
        # i段目の左からj番目のnodeはi+1段目のj番目とj+1番目のnodeへのedgeを持つ
        G.append([to_index(i+1, j), to_index(i+1, j+1)])
      else:
        # 最後の階層にあるnodeは、行き先となるedgesを持たない
        G.append([]) 
        
  # nodeの総数
  total_node = to_index(N-1, N-1)

  # 各nodeで得られるスコアを設定
  node_score = [random.randint(1, 10)  for i in range(total_node + 1)]

  return G, node_score

グラフを作成する場面では、リストGi番目の要素は、i番目のノードが隣接している1つ下の階層のノードのindexのリストとなっている。要はグラフの隣接リスト表現1である。なお、リストにおけるノードの順番は、上の階層から数え、同じ階層では左から順に数えていくことを想定している。

したがって、$i$個目の階層の、左から$j$番目のノードは、リストでは、$0 + 1+2+3+...+i + j = i(i+1)/2 + j$となる。また、ここから、$k$番目のノードは

$$ \frac{i(i+1)}{2} \leq k < \frac{(i+1)(i+2)}{2} $$

を満たすので、左側の不等号を解くとto_layer(k)を得ることができる。

なお、各ノードの得点は、1から10までの整数からランダムに選ばれるものとした。割と問題の対称性が高いので、外れ値的なグラフが作られることが少なくなるように思う。こうしておけば、処理時間を出す際に平均値を出すためのループ回数が少なくてすみそう。また、ここの得点の設定は後で変更した場合も考えてみる。

全探索による解法

探索経路としては、一番上の階層から右下にいくか左下にいくかのどちらかで1つ階層を降りる。また、左下か右下の2択...を繰り返すのみなので、今回の形状のグラフにおいては、bit探索が有効になる。調べるべき経路は、2択を$N-1$回行うので、$2^{N-1}$個の経路を調べることになる。

# 全探索を全探索で実装してみる。
def all_search(G, N):
  start = time.time()
  result = -1
  for i in range(2**(N-1)):
    # ビット列に従い、左右をたどっていく
    current_node = 0 # 最初のnode
    score = node_score[current_node] # 最初のnodeのscoreを足しておく
    
    for j in range(N-1):
      current_node = G[current_node][(i >> j) & 1] # 次のnodeへ移動
      score = score + node_score[current_node] # 移動したらscoreを足す
    
    result = max(result, score) # 最終的なscoreがこれまでのものより大きければ更新する

  return result, time.time() - start

resultという変数に、これまでたどった経路で得た得点の和の最大値を記録しておき、その値より大きい得点を得ることのできる経路が出てきた場合、その値を更新することを繰り返す。実際のところは、最終的に最大値が分かればよいので、いちいち、現状の得点の和の最大値を更新していく必要はないように思うけれど、後に枝刈りを考えるときに、この最大値が必要になるので、このように書いておいた。

グラフの形式に依存したプログラムになっており、Gが持っているエッジの情報をあまりうまく使えていない。ノードによって、エッジが3本とかになった場合、すぐ使えなくなってしまうので、あり得る経路のリストを、Gから作って、その経緯をひとつずつたどっていく、という方法の方が本来的には汎用的だと思う。(←今度書き直してみたい。)

あと、時間を計測したいので、求めた得点の最大値と処理時間を返す関数にしている。

動的計画法

全探索は何度も何度も同じエッジをたどることになり、無駄が多い。あるノードにたどり着くまでに得ることのできる得点の最大値をメモしていきながら下の階層へ降りていけば、同じエッジをたどる必要がなくなる。下の階層へ降りていく方法として、深さ優先探索幅優先探索があるので、関数の引数でどちらかを選択できるように実装した。

def DP(G, method = "DFS"):
# nodeの総数
  start = time.time()

  # 各nodeの特典結果の記録を初期化
  node_result = [-1 for i in range(len(G))]

  # stackに最初のnodeを追加
  stack = deque([0])
  node_result[0] = node_score[0]

  result = 0

  while(stack):
    # 深さ優先探索か幅優先探索かで、キューかスタックか分かれる。
    if method == "DFS":
      c_node = stack.pop()
    else:
      c_node = stack.popleft()
      
    for n_node in G[c_node]:
      node_result[n_node] = max(node_result[n_node], node_result[c_node] + node_score[n_node]) 
      stack.append(n_node)
      result = max(result, node_result[n_node])

  return result, time.time() - start

変数にstackという名前をつけてしまっているが、幅優先探索の場合は、que的な動作になる。stackに次のノードをひたすら入れていき、なくなるまでwhileでループをしている。ここも再帰関数で記述すると、もう少し見た目がよくなるのかもしれないので、別パターンも書いておきたい。

今回考えているグラフの場合、ある程度下の階層のノードにたどり着く経路はかなり多そうである。そういった場合、深さ優先探索で、早めに階層深くを探索してしまうと、上の方で他の経路からより高い得点を持つ経路によって緩和されることになり、最初に探索した計算が無駄になってしまう。実際、後に示すように幅優先探索の方が深さ優先探索より速い。

分枝限定法を使ってみる

探索している途中で、これまで得られている得点の最大値があるとする。あるノードを調べていて、そのノードから残り全ての各ノードで最高得点(ここでは10)を取ったとしても、これまで得られている得点の最大値を超えることができない場合、そのノード以降の探索をする意味はない。なので、これまで得られている得点の最大値を記録しておき、最下層までの階層の数と今現在のノードの得点とで、最大値を更新し得るか確認することができる。これにより、探索範囲を狭めることができる。

深くなればなるほど得点を得られやすくなるので、枝刈りは深さ優先探索において有効になりそうである。

実際、分枝限定法は、先ほどのコードを少し変えるだけで実装できる。

def DP_cut(G, method = "DFS"):
  start = time.time()
  
  # 各nodeの結果を記録する
  node_result = {i : -1 for i in range(len(G))}

  # stackに最初のnodeを追加
  stack = deque([0])
  node_result[0] = node_score[0]

  result = 0
  cut = 0
  result_l = []
  while(stack):
    if method == "DFS":
      c_node = stack.pop()
    else:
      c_node = stack.popleft()

    for n_node in G[c_node]:
      node_result[n_node] = max(node_result[n_node], node_result[c_node] + node_score[n_node])
      if (node_result[n_node] + (N - 1 - to_layer(n_node)) * 10) > result:
        stack.append(n_node)
        #result_l.append(node_result[n_node])
        result = max(result, node_result[n_node])
      else:
        cut = cut + 1
  #print(cut)
  return result, time.time() - start

Dijkstraダイクストラ)に変更する

ざっくり言うと、stackだったりqueだったりの探索に優先順序をつけるとダイクストラになる。ヒープを使って優先度付きキューを利用してダイクストラを実装してみる。

def Dijkstra(G):
    start = time.time()

    # 各nodeの結果を記録する
    node_result = {i : -1 for i in range(len(G))}

    # heapに最初のnodeを追加
    hp = []
    node_result[0] = node_score[0]
    heappush(hp, (-node_result[0], 0))
    
    result = 0
    while(hp):
        # ヒープからノードを取り出し、
        c_node = heappop(hp)[1]
        
        for n_node in G[c_node]:
            # 取り出したノードが向かう先のノードについて、緩和を行う
            if node_result[c_node] + node_score[n_node] > node_result[n_node]:
                node_result[n_node] = node_result[c_node] + node_score[n_node]
                result = max(result, node_result[n_node])
                heappush(hp, (-node_result[n_node], n_node))

    return result, time.time() - start

実際、コードはほとんど変わらない。ヒープを使う際は、タプルで入れている。また、最小値から取り出すしようなので、経路の得点にマイナスを付けてプッシュする。ヒープの使い方は、こんな感じ。

from heapq import heappop, heappush
# ヒープに格納
hp = []
heappush(hp, (1, 5))
heappush(hp, (2, 4))
heappush(hp, (3, 3))
heappush(hp, (4, 2))
heappush(hp, (5, 1))
# 取り出してみる
if hp:
    y = heappop(hp)
    print(y)
else:
    print("hp is empty")

ダイクストラ+分枝限定法

当然枝刈りを入れることもできる。

def Dijkstra_cut(G):
    start = time.time()

    # 各nodeの結果を記録する
    node_result = {i : -1 for i in range(len(G))}

    # heapに最初のnodeを追加
    hp = []
    node_result[0] = node_score[0]
    heappush(hp, (-node_result[0], 0))
    
    result = 0
    cut = 0 # 枝狩りの回数カウント用
    while(hp):
        # ヒープからノードを取り出し、
        c_node = heappop(hp)[1]
        
        for n_node in G[c_node]:
            if node_result[c_node] + node_score[n_node] > node_result[n_node]:
                # 取り出したノードが向かう先のノードについて、緩和を行う
                node_result[n_node] = node_result[c_node] + node_score[n_node]
                
                # 枝狩りできるかどうか確認して、枝狩できなければ、ヒープに入れる
                if (node_result[n_node] + (N - 1 - to_layer(n_node)) * 10) > result:
                    result = max(result, node_result[n_node])
                    heappush(hp, (-node_result[n_node], n_node))
                else:
                    cut = cut + 1
    #print(cut)
    return result, time.time() - start

処理時間を測ってみる

とうとう、ここまでこれた。これまで実装したもので処理時間がどのような挙動を示しているのかを確かめる。

# 階層数
stairs = 50

# それぞれの階層数で、何個のグラフを試してみるか、その個数
roup = 1000

elapsed_time = {}

#algorithms = ["all_search", "DP_DFS", "DP_BFS", "DP_DFS_cut", "DP_BFS_cut", "Dijkstra", "Dijkstra_cut"]
algorithms = ["DP_DFS", "DP_BFS", "DP_DFS_cut", "DP_BFS_cut", "Dijkstra", "Dijkstra_cut"]

for algorithm in algorithms:
    elapsed_time[algorithm] = []

for N in range(1, stairs):
    #s_all_search = 0
    s_DP_DFS = 0
    s_DP_BFS = 0
    s_DP_DFS_cut = 0
    s_DP_BFS_cut = 0
    s_Dijkstra = 0
    s_Dijkstra_cut = 0
    
    start = time.time()
    for _ in range(roup):
        G, node_score = my_graph(N)
        
        # 所要時間を足していく
        #s_all_search = s_all_search + all_search(G, N)[1]
        s_DP_DFS = s_DP_DFS + DP(G)[1]
        s_DP_BFS = s_DP_BFS + DP(G, method="BFS")[1]
        s_DP_DFS_cut = s_DP_DFS_cut + DP_cut(G)[1]
        s_DP_BFS_cut = s_DP_BFS_cut + DP_cut(G, method="BFS")[1]
        s_Dijkstra = s_Dijkstra + Dijkstra(G)[1]
        s_Dijkstra_cut = s_Dijkstra_cut + Dijkstra_cut(G)[1]

    
    # 平均処理時間を算出し、log10を取る
    #elapsed_time["all_search"].append(s_all_search / roup)
    elapsed_time["DP_DFS"].append(s_DP_DFS / roup)
    elapsed_time["DP_BFS"].append(s_DP_BFS / roup)
    elapsed_time["DP_DFS_cut"].append(s_DP_DFS_cut / roup)
    elapsed_time["DP_BFS_cut"].append(s_DP_BFS_cut / roup)
    elapsed_time["Dijkstra"].append(s_Dijkstra / roup)
    elapsed_time["Dijkstra_cut"].append(s_Dijkstra_cut / roup)

    # print(N, time.time() - start)

ループ1000回はちょっと回しすぎかもしれないが、ここが少ないと結構ジグザグになってしまう。計測結果を図示してみよう。

# 計算時間の表示
df = pd.DataFrame(elapsed_time)
df.applymap(math.sqrt).plot()

なお、図示する際に計測時間のルートをとっている。これは、動的計画法幅優先探索で実装すると、処理回数がグラフのエッジの数に比例し、エッジの数はノードの数の2倍、ノードの数は階層$N$に対し、$N(N+1)/2$となるので、$O(N^{2})$で実装できているからである。実際、直線になっている(ように見える。)

実際のところ図示してみる。

# 計算時間の表示
import matplotlib.pyplot as plt
%matplotlib inline

df = pd.DataFrame(elapsed_time)

plt.figure()
plt.style.use(['default']) 

df.applymap(math.sqrt).plot()
plt.savefig('./plot.png', dpi=300)
plt.close('all')

f:id:Aratamotsu:20201225174359p:plain
処理時間の比較

予想通り - 幅優先探索が一番速い - 枝刈りは深さ優先探索ダイクストラでは有効 - 幅優先探索では、枝刈りは意味がない - 幅優先探索は、$O(N^{2})$

ということになった。ただ、問題設定次第では、実効的に枝刈りが効果を発揮することもあると考えられる。

今、ノードの得点は1から10までの整数をランダムに選択しているが、この分散をもう少し大きくしてやれば、経路ごとに得点の差が出てきて枝刈りが有効になりそうである。そこで、my_graph()

node_score = [random.randint(1, 10)  for i in range(total_node + 1)]

node_score = [random.choice([0, 10]) for i in range(total_node + 1)]

に変更し、得点が1又は10しか取り得ないようにしてみる。その結果がこちら。

f:id:Aratamotsu:20201225174402p:plain
得点の分散を大きくした場合の処理時間の比較

途中までは、ダイクストラ+枝刈りがリードしているものの、計算量的に改善されているわけではないこともあって、$N=40$ほどで追いつかれてしまっている。深さ優先探索でも、枝刈りで大きく改善されていることがわかる。

おまけ:貪欲法

貪欲法では当然、最適解は見つからないけれど、$O(N)$で実装できる。

def Greeding(G):
# nodeの総数
    start = time.time()

    # stackに最初のnodeを追加
    stack = deque([0])
    score = 0

    while(stack):
        c_node = stack.pop()
        score = score + node_score[c_node]
        
        connected_node = G[c_node]
        if connected_node:
            # まどろっこしいが、一般の有向グラフに適用できるように書いておく
            next_scores = [node_score[x] for x in connected_node]
            max_score = max(next_scores)
            n_node = G[c_node][next_scores.index(max_score)]
            stack.append(n_node)
        
    result = score
    return result, time.time() - start

まとめ

ひとつの問題を色々なアルゴリズムで実装して比較してみた。問題の性質を色々と考えて、自分で実装し分析してみると勉強になる。ソートとかでも同じことをやってみたい。

読んでいる本

アルゴリズムパズル ―プログラマのための数学パズル入門

アルゴリズムパズル ―プログラマのための数学パズル入門


  1. 『問題解決力を鍛える!アルゴリズムとデータ構造』p.164でこのように表現されていた。