post Image
TensorFlow の複数プロセスで queue を共有


はじめに

タイトル通りの内容です。

TensorFlowで分散処理をする場合queueだとかVariableだとか状態を持つような特殊なoperationを複数のプロセス間で共有する必要があります。

そのあたりの情報が少ないのでメモを残しておきます。

まずはqueueの場合です。


多分最小に近いサンプルコード

2つプロセスを立ち上げるので、2つコードを用意します。

enqueue.pyはqueueにひたすらデータを詰めていくコード、dequeue.pyはqueueからひたすらデータを取り出していくコードです。

重要なのはRandomShuffleQueueを作る時に両方のコードで同じshared_nameを指定しているところです。

enqueue.py

import tensorflow as tf

server = tf.train.Server(
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}),
job_name="local",
task_index=0
)

q = tf.RandomShuffleQueue(
capacity=10,
dtypes=[tf.float32, tf.string],
name="q",
shared_name="shared_queue",
min_after_dequeue=0
)
enqueue_op = q.enqueue_many(vals=[[0, 1, 2], ["a", "b", "c"]])

with tf.Session(server.target) as sess:
for _ in range(10):
sess.run(enqueue_op)

dequeue.py

import tensorflow as tf

server = tf.train.Server(
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}),
job_name="local",
task_index=1
)

q = tf.RandomShuffleQueue(
capacity=10,
dtypes=[tf.float32, tf.string],
name="q",
shared_name="shared_queue",
min_after_dequeue=0
)
dequeue_op = q.dequeue()

with tf.Session(server.target) as sess:
for _ in range(10):
print(sess.run([dequeue_op]))

それぞれ以下のように動かしてみましょう。

python enqueue.py

python dequeue.py

dequeue.pyを実行すると以下のようにenqueue.pyでqueueに詰めたデータを取り出すことができました。

[[2.0, 'c']]

[[0.0, 'a']]
[[2.0, 'c']]
[[2.0, 'c']]
[[0.0, 'a']]
[[1.0, 'b']]
[[2.0, 'c']]
[[1.0, 'b']]
[[1.0, 'b']]
[[0.0, 'a']]

やったね。


参考


『 Python 』Article List