Embedding TensorFlow Operations in ECL

TensorFlow™ (see https://www.tensorflow.org) is a new open-source library from Google for performing linear algebra operations on tensors (multi-dimensional arrays) and connecting multiple such operations together. It is particularly suited for machine learning applications, and supports operations on GPUs as well as cluster-based operations across multiple machines when dealing with data that is too large for a single machine to handle. TensorFlow uses a dataflow paradigm that should feel familiar to ECL users – they even have a pretty visualization tool for examining the resulting execution graphs.

HPCC Systems has its own ML library for machine learning operations at scale, but TensorFlow’s approach is a little different, and it provides a number of algorithms that are not yet implemented in the ECL library, so I thought it would be interesting to see how easy it would be to use TensorFlow from within ECL. It might also help identify (and allow me to fix) any shortcomings in the embedded Python code support.

TensorFlow is most commonly accessed using a Python API – there is a C++ API but it’s not as complete, and it also requires you to build the TensorFlow libraries from source before you can use it, so I decided to focus on Python. ECL allows Python code to be embedded into an ECL program, so it should be simple enough to embed one of the TensorFlow example programs into an ECL program and see what happens.
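
(As an aside, for anyone who has not seen ECL’s embedded language support before, a trivial embed looks something like this – a minimal illustration only, with hello() just an arbitrary name I made up:)

IMPORT python;

// A minimal Python embed: the body is ordinary Python, and the value
// returned is converted to the declared ECL return type (here a STRING).
STRING hello() := EMBED(Python)
  return 'Hello from Python, 2 + 2 = ' + str(2 + 2)
ENDEMBED;

OUTPUT(hello());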

The first thing that happened was that the “import tensorflow as tf” line failed, with a slightly obscure error message:

EXCEPTION: pyembed: 'module' object has no attribute 'argv'

I’m not a Python expert and I don’t really know why an import would want to look at the contents of argv, but it seems that it was required in this case. A quick fix in the Python embed plugin (see https://github.com/hpcc-systems/HPCC-Platform/pull/9253) ensured that argv is set up before executing the embedded code, and I was getting further. Please note that all the examples shown below require HPCC Systems version 6.2.0 or later in order to include this fix. Fortunately, 6.2.0 should be released by the time this blog is published!
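
(If you find yourself on an earlier platform version, a possible user-level workaround – only a sketch of the idea, not the actual plugin fix, and tfVersion is just an illustrative name – is to give the embedded interpreter a dummy sys.argv before the import, since it is the missing argv that triggers the error:)

IMPORT python;

STRING tfVersion() := EMBED(Python)
  # Workaround sketch: set a dummy sys.argv before importing tensorflow so
  # that import-time code which inspects it does not fail.
  import sys
  if not hasattr(sys, 'argv'):
    sys.argv = ['']
  import tensorflow as tf
  return tf.__version__
ENDEMBED;

OUTPUT(tfVersion());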

The simple example created some dummy data and then used a linear regression algorithm to deduce a best fit line for it.

IMPORT python;
// A simple example of using tensorflow and numpy from within ECL

tftest := EMBED(Python)
    import tensorflow as tf
    import numpy as np

    # Create 100 phony x, y data points in NumPy, y = x * 0.1 + 0.3

    x_data = np.random.rand(100).astype(np.float32)
    y_data = x_data * 0.1 + 0.3

    # Try to find values for W and b that compute y_data = W * x_data + b
    # (We know that W should be 0.1 and b 0.3, but TensorFlow will
    # figure that out for us.)

    W = tf.Variable(tf.random_uniform([1], -1.0, 1.0))
    b = tf.Variable(tf.zeros([1]))
    y = W * x_data + b

    # Minimize the mean squared errors.

    loss = tf.reduce_mean(tf.square(y - y_data))
    optimizer = tf.train.GradientDescentOptimizer(0.5)
    train = optimizer.minimize(loss)

    # Before starting, initialize the variables.  We will 'run' this first.
    init = tf.initialize_all_variables()

    # Launch the graph.

    sess = tf.Session()
    sess.run(init)

    # Fit the line.

    for step in range(201):
        sess.run(train)
        if step % 20 == 0:
            print(step, sess.run(W).tolist()[0], sess.run(b).tolist()[0])

    # Learns best fit is W: [0.1], b: [0.3]

    print(sess.run(W).tolist()[0], sess.run(b).tolist()[0])
ENDEMBED;

// And here is the ECL code that evaluates the embed:

tftest;

Since the data was calculated using a known formula, the expected best fit line is also known, and sure enough, when we run this example we get the results we expect.

(0, 0.43180590867996216, 0.16596344113349915)
(20, 0.2142491191625595, 0.2390165627002716)
(40, 0.1415812224149704, 0.2778049409389496)
(60, 0.11513355374336243, 0.29192206263542175)
(80, 0.10550791025161743, 0.2970600128173828)
(100, 0.10200461000204086, 0.29892998933792114)
(120, 0.10072959214448929, 0.29961058497428894)
(140, 0.10026553273200989, 0.2998582720756531)
(160, 0.10009665042161942, 0.29994842410087585)
(180, 0.100035160779953, 0.2999812364578247)
(200, 0.10001278668642044, 0.2999931871891022)
(0.10001278668642044, 0.2999931871891022)

So far so good, but this is not really using the HPCC Systems platform except as a means of executing embedded Python. We might just as well have run the Python code directly. I wanted something a little more ‘realistic’ in terms of how you might actually use TensorFlow within an HPCC workflow, so I set out to create an example consisting of two separate programs: one that trained a model and then saved it to a file, and a second that loaded the trained model to classify input data.

I found an example online at https://github.com/jonbruner/tensorflow-basics that used the MNIST handwriting sample data to classify digits and was conveniently split into the two phases I was after. Simply pasting each of these examples into an ECL EMBED function was straightforward enough (and worked fine), but that was not really a realistic example of how TensorFlow and ECL might be combined. What I wanted was to be able to take input data that was coming from ECL and pass it through the previously trained model, then use the resulting classification in subsequent ECL code. In other words, I wanted to show how a TensorFlow model could be used inside an ECL workflow.

To be efficient, the program would need to load the saved model just once, then pass the input data to it repeatedly. This brought me to my first real roadblock: how can I return a model (stored in a Python object) back to ECL code, then pass it to a subsequent Python function? There is no ECL datatype corresponding to a Python object, and trying to pass it using some other datatype would not work because of the way Python object lifetimes are managed using reference counts.

After a little head-scratching I came up with an idea. I could leave the model in a Python object, kept somewhere that would persist from one embed call to the next, and instead return a ‘handle’ by which any other embed call that needed it could retrieve it. Global variables in Python are not persisted from one ECL embed to the next – each embed is completely independent – but the builtin scope (called __builtins__) IS persisted. I could create a dictionary in __builtins__ and store my TensorFlow model in there, looked up by a handle (an integer would do).
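
In outline, the pattern looks like this (just a sketch to illustrate the mechanism – storeObject and fetchObject are illustrative names of mine – relying on __builtins__ being exposed as a dictionary in the embedded interpreter, as it is in the full example below):

IMPORT python;

INTEGER8 storeObject(INTEGER8 handle) := EMBED(Python)
  # Keep a Python object in a dictionary stored in __builtins__, keyed by an
  # integer handle. __builtins__ persists from one embed call to the next,
  # so a later embed can retrieve the object using the same handle.
  obj = {'greeting': 'hello'}   # in the real example this would be a TensorFlow session
  thedict = __builtins__.get("thedict", None)
  if thedict is None:
    thedict = {}
    __builtins__["thedict"] = thedict
  thedict[handle] = obj
  return handle
ENDEMBED;

STRING fetchObject(INTEGER8 handle) := EMBED(Python)
  # Python name lookup falls back to __builtins__, so 'thedict' resolves here too
  obj = thedict[handle]
  return obj['greeting']
ENDEMBED;

OUTPUT(fetchObject(storeObject(42)));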

My first step was to see if this idea worked on the simple linear regression example above. I refactored it to split the Python code up into 4 functions.

The first two functions create the test data – I still create the data using Python (just because I was too lazy to rewrite this into ECL), but I return it to ECL before passing it from ECL back to the model-training function. By separating them out, I made sure that it was possible to train the model using data supplied from ECL.

The third function creates the model, and the fourth trains the model using the test data and returns the resulting best fit parameters.

IMPORT python;
// A simple example of using tensorflow and numpy from within ECL
// We create two input arrays using numpy, and then use tensorflow to
// calculate values of W and b to fit y = W*x + b

SET OF REAL8 generateXData() := EMBED(python)
  import numpy as np
  # Create 100 random x values
  x_data = np.random.rand(100).astype(np.float32)
  return x_data.tolist()
ENDEMBED;

SET OF REAL8 generateYData(SET OF REAL8 x_data) := EMBED(python)
  import numpy as np
  # create 100 y values where y = x*0.1 + 0.3
  y_data = np.asarray(x_data) * 0.1 + 0.3
  return y_data.tolist()
ENDEMBED;

integer8 createLearningGraph(integer8 handle) := EMBED(python)
  import tensorflow as tf
  import numpy as np
  sess = tf.Session()

  # Try to find values for W and b that compute y_data = W * x_data + b
  # (We know that W should be 0.1 and b 0.3, but TensorFlow will
  # figure that out for us.)

  x_data = tf.placeholder(tf.float32, shape=[100], name="x_data")
  y_data = tf.placeholder(tf.float32, shape=[100], name="y_data")
  W = tf.Variable(tf.random_uniform([1], -1.0, 1.0),name='W')
  b = tf.Variable(tf.zeros([1]),name='b')
  y = W * x_data + b

  # Minimize the mean squared errors.

  loss = tf.reduce_mean(tf.square(y - y_data), name='loss')
  optimizer = tf.train.GradientDescentOptimizer(0.5)
  train = optimizer.minimize(loss, name='train')

  # Before starting, initialize the variables.  We will 'run' this first.

  init = tf.initialize_all_variables()
  thedict = __builtins__.get("thedict", None)
  if thedict is None:
    thedict = {}
    __builtins__["thedict"] = thedict
  thedict[handle] = sess
  return handle
ENDEMBED;

SET OF REAL8 learnW(integer8 sessHandle, SET OF REAL8 _x, SET OF REAL8 _y) :=
    EMBED(python)
  import tensorflow as tf
  import numpy as np
  sess = thedict[sessHandle]   # thedict lives in __builtins__, so normal name lookup finds it here
  sess.run('init')
  _x_data = np.asarray(_x)
  _y_data = np.asarray(_y)

  # Retrieve any Tensorflow session variables that I need

  W = sess.graph.get_tensor_by_name('W:0')
  b = sess.graph.get_tensor_by_name('b:0')
  x_data = sess.graph.get_tensor_by_name("x_data:0")
  y_data = sess.graph.get_tensor_by_name("y_data:0")
  train = sess.graph.get_operation_by_name('train')

  # Fit the line.

  for step in range(201):
    sess.run(train, feed_dict={x_data:_x_data, y_data:_y_data})
    if step % 20 == 0:
      print(step, sess.run(W).tolist()[0], sess.run(b).tolist()[0])

  # Learns best fit is W: [0.1], b: [0.3]

  return [sess.run(W).tolist()[0], sess.run(b).tolist()[0]]
ENDEMBED;

// Now the ECL code - generate x and y, then call tensorflow

handle := 35;  // Needs to be unique
x_data := generateXData();
y_data := generateYData(x_data);
sess := createLearningGraph(handle);
OUTPUT(learnW(sess, x_data, y_data));

This worked, but I wasn’t especially happy with the __builtins__ method for passing information from one embed to the next. One issue was the need for a unique handle – left as an exercise for the reader in the code above. If other code executing at the same time were to use the same handle, chaos would ensue, and while in Thor you may be able to predict what other code is executing at the same time, it’s a lot less clear in Roxie. Another issue was ensuring that the objects were in fact cleaned up when no longer required – as coded above they would persist forever (until the platform restarted). Finally, the Python code to use them was a little ugly and potentially error-prone.
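
(The cleanup issue, at least, could have been mitigated with an explicit release call along these lines – again only a sketch, and not what I ended up doing:)

releaseObject(INTEGER8 handle) := EMBED(Python)
  # Remove the stored object so its reference count can drop and Python can
  # free it; without something like this it lives until the process restarts.
  thedict = __builtins__.get("thedict", None)
  if thedict is not None and handle in thedict:
    del thedict[handle]
ENDEMBED;

Even so, every caller would have to remember to call it at the right time, and the other two issues would remain.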

I was thinking about ways to provide a builtin Python library to improve the first of the above issues, when it dawned on me that the easiest way to solve it (which could also solve the other issues) was to provide a mechanism for controlling the lifetime of Python global variables and build it into the embedded Python plugin.

I added two new options to the embedded Python plugin: GLOBALSCOPE and PERSIST. The GLOBALSCOPE option allows independent EMBED attributes to share Python globals with each other if they specify the same name for the GLOBALSCOPE parameter. The PERSIST option controls how long such a shared global scope will persist and exactly how far it will be shared. The value passed to GLOBALSCOPE can be any string you like, allowing you to share globals between related EMBED sections while keeping them distinct from unrelated ones. PERSIST can take one of the following values:

  • ‘global’ – The values persist indefinitely (until the process terminates) and are shared with any other embeds using the same GLOBALSCOPE value, even in other workunits.
  • ‘query’ – The values persist until the query is unloaded, and are shared with other instances of the query that might be running at the same time in Roxie, but not with other queries.
  • ‘workunit’ – The values persist until the end of the current workunit or the current instance of a Roxie deployed query, and are not shared with other instances.

With this new option, the example above can be simplified as follows:

IMPORT python;

// A simple example of using tensorflow and numpy from within ECL
// We create two input arrays using numpy, and then use tensorflow to
// calculate values of W and b to fit y = W*x + b

SET OF REAL8 generateXData() := EMBED(python)
  import numpy as np

  # Create 100 random x values

  x_data = np.random.rand(100).astype(np.float32)
  return x_data.tolist()
ENDEMBED;

SET OF REAL8 generateYData(SET OF REAL8 x_data) := EMBED(python)
  import numpy as np

  # create 100 y values where y = x*0.1 + 0.3

  y_data = np.asarray(x_data) * 0.1 + 0.3
  return y_data.tolist()
ENDEMBED;

createLearningGraph() := 
    EMBED(python: globalscope('blog3'),persist('workunit'))
  import tensorflow as tf
  import numpy as np
  global sess
  global x_data
  global y_data
  global W
  global b
  global train
  sess = tf.Session()

  # Try to find values for W and b that compute y_data = W * x_data + b
  # (We know that W should be 0.1 and b 0.3, but TensorFlow will
  # figure that out for us.)

  x_data = tf.placeholder(tf.float32, shape=[100], name="x_data")
  y_data = tf.placeholder(tf.float32, shape=[100], name="y_data")
  W = tf.Variable(tf.random_uniform([1], -1.0, 1.0),name='W')
  b = tf.Variable(tf.zeros([1]),name='b')
  y = W * x_data + b

  # Minimize the mean squared errors.

  loss = tf.reduce_mean(tf.square(y - y_data), name='loss')
  optimizer = tf.train.GradientDescentOptimizer(0.5)
  train = optimizer.minimize(loss, name='train')

  # Before starting, initialize the variables.  We will 'run' this first.

  init = tf.initialize_all_variables()
ENDEMBED;

SET OF REAL8 learnW(SET OF REAL8 _x, SET OF REAL8 _y) :=
    EMBED(python: globalscope('blog3'),persist('workunit'))
  import tensorflow as tf
  import numpy as np
  global sess
  global x_data
  global y_data
  global W
  global b
  global train
  sess.run('init')
  _x_data = np.asarray(_x)
  _y_data = np.asarray(_y)

  # Fit the line.

  for step in range(201):
    sess.run(train, feed_dict={x_data:_x_data, y_data:_y_data})
    if step % 20 == 0:
      print(step, sess.run(W).tolist()[0], sess.run(b).tolist()[0])

  # Learns best fit is W: [0.1], b: [0.3]

  return [sess.run(W).tolist()[0], sess.run(b).tolist()[0]]
ENDEMBED;

// Now the ECL code - generate x and y, then call TensorFlow

x_data := generateXData();
y_data := generateYData(x_data);
sequential(
  createLearningGraph(),
  learnW(x_data, y_data)
);

Now that it is easy to pass values from one EMBED to the next via globals, I have simplified the code by passing all the TensorFlow variables this way rather than just passing sess and using it to retrieve the other variables by name.

Having proved the concept, I switched back to my ‘more realistic’ example, which trained a model once and then used it in a separate program for classification. I reproduce the Python example code I started from here (see the link above for an annotated version of it):

save.py:

from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
import tensorflow as tf
sess = tf.InteractiveSession()
def weight_variable(shape):
  initial = tf.truncated_normal(shape, stddev=0.1)
  return tf.Variable(initial)

def bias_variable(shape):
  initial = tf.constant(0.1, shape=shape)
  return tf.Variable(initial)

def conv2d(x, W):
  return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

def max_pool_2x2(x):
  return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                        strides=[1, 2, 2, 1], padding='SAME')

x = tf.placeholder(tf.float32, shape=[None, 784], name="input")
y_ = tf.placeholder(tf.float32, shape=[None, 10])
W = tf.Variable(tf.zeros([784,10]))
b = tf.Variable(tf.zeros([10]))
W_conv1 = weight_variable([5, 5, 1, 32])
b_conv1 = bias_variable([32])
x_image = tf.reshape(x, [-1,28,28,1])
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
h_pool1 = max_pool_2x2(h_conv1)
W_conv2 = weight_variable([5, 5, 32, 64])
b_conv2 = bias_variable([64])
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
h_pool2 = max_pool_2x2(h_conv2)
W_fc1 = weight_variable([7 * 7 * 64, 1024])
b_fc1 = bias_variable([1024])
h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
keep_prob = tf.placeholder(tf.float32, name="keep_prob")
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)
W_fc2 = weight_variable([1024, 10])
b_fc2 = bias_variable([10])
y_conv = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2, name="output")

sess.run(tf.initialize_all_variables())
cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y_conv), 
                           reduction_indices=[1]))
train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
correct_prediction = tf.equal(tf.argmax(y_conv,1), tf.argmax(y_,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
sess.run(tf.initialize_all_variables())

# To make this run faster, we'll only run 1,000 iterations of 
# the training process.

for i in range(1000):
  batch = mnist.train.next_batch(50)
  if i%100 == 0:
    train_accuracy = accuracy.eval(feed_dict={
        x:batch[0], y_: batch[1], keep_prob: 1.0})
    print("step %d, training accuracy %g"%(i, train_accuracy))
  train_step.run(feed_dict={x: batch[0], y_: batch[1], keep_prob: 0.5})
print("test accuracy %g"%accuracy.eval(feed_dict={
    x: mnist.test.images, y_: mnist.test.labels, keep_prob: 1.0}))

image_a = mnist.validation.images[154]
image_a = image_a.reshape([1, 784])
result = sess.run(y_conv, feed_dict={x:image_a, keep_prob:1})
print(result)
print(sess.run(tf.argmax(result, 1)))
saver = tf.train.Saver()
save_path = saver.save(sess, "saved_mnist_cnn.ckpt")
print("Model saved to %s" % save_path)

load.py:

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
sess = tf.InteractiveSession()
new_saver = tf.train.import_meta_graph('saved_mnist_cnn.ckpt.meta')
new_saver.restore(sess, 'saved_mnist_cnn.ckpt')
tf.get_default_graph().as_graph_def()
x = sess.graph.get_tensor_by_name("input:0")
y_conv = sess.graph.get_tensor_by_name("output:0")
keep_prob = sess.graph.get_tensor_by_name("keep_prob:0")

for i in range(0,10000):
  image_b = mnist.validation.images[i]
  image_b = image_b.reshape([1, 784])
  result = sess.run(y_conv, feed_dict={x:image_b, keep_prob:1})
  print(result)
  print(sess.run(tf.argmax(result, 1)))

In ECL I wanted to keep the same separation of save and load in separate ECL queries. The save case could be very simple – just wrap the existing Python into a single EMBED function – but I wanted to make the example a little more realistic, so I split it up to allow the training data to be passed in from ECL. The sample data from MNIST was not in a form easily read by ECL, so the ECL code calls back to Python to read the data.

IMPORT Python;

// We create the model into the Python global variable "sess".
// Using the 'persist' and 'globalscope' options on the EMBED statement
// allow us to share that global value with subsequent embedded 
// Python code

createModel() := EMBED(Python: globalscope('save.ecl'),persist('query'))
    import tensorflow as tf

    def weight_variable(shape):
      initial = tf.truncated_normal(shape, stddev=0.1)
      return tf.Variable(initial)

    def bias_variable(shape):
      initial = tf.constant(0.1, shape=shape)
      return tf.Variable(initial)

    def conv2d(x, W):
      return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

    def max_pool_2x2(x):
      return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                            strides=[1, 2, 2, 1], padding='SAME')

    global sess
    global numBatchesTrained

    sess = tf.Session()
    numBatchesTrained = 0
    x = tf.placeholder(tf.float32, shape=[None, 784], name="input")
    y_ = tf.placeholder(tf.float32, shape=[None, 10], name='y_')

    W = tf.Variable(tf.zeros([784,10]))
    b = tf.Variable(tf.zeros([10]))

    W_conv1 = weight_variable([5, 5, 1, 32])
    b_conv1 = bias_variable([32])

    x_image = tf.reshape(x, [-1,28,28,1])
    h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
    h_pool1 = max_pool_2x2(h_conv1)

    W_conv2 = weight_variable([5, 5, 32, 64])
    b_conv2 = bias_variable([64])
    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
    h_pool2 = max_pool_2x2(h_conv2)

    W_fc1 = weight_variable([7 * 7 * 64, 1024])
    b_fc1 = bias_variable([1024])
    h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
    h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)

    keep_prob = tf.placeholder(tf.float32, name="keep_prob")
    h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    W_fc2 = weight_variable([1024, 10])
    b_fc2 = bias_variable([10])
    y_conv = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2, 
                                 name="output")
    label = tf.argmax(y_conv, 1, name='label')

    cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y_conv), 
                                 reduction_indices=[1]),name='cross_entropy')
    train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy,
                                 name='train_step')
    correct_prediction = tf.equal(tf.argmax(y_conv,1), tf.argmax(y_,1),
                                 name='correct_prediction')
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32),
                                 name='accuracy')
ENDEMBED;

// Code to read the MNIST image files - use the TensorFlow helpers
// to stream them using a Python generator
// The example code that comes with TensorFlow takes care of
// downloading them if not found, unzipping them on the fly,
// and converting them into a Numpy array

mnist_image := RECORD
  SET OF real8 pixels;
END;

mnist_onehot := RECORD
  SET OF real8 scores;
END;

mnist_training_batch := RECORD
  DATASET(mnist_image) images;
  DATASET(mnist_onehot) scores;
END;

// Every row returned from this will be a batch of batchsize images
// from the training data, along with the corresponding classification
// in "one-hot' form

DATASET(mnist_training_batch) readBatch(const varstring name, 
       unsigned batchsize) := EMBED(Python)
  from tensorflow.examples.tutorials.mnist import input_data
  mnist = input_data.read_data_sets(name, one_hot=True)
  while 1:
    batch = mnist.train.next_batch(batchsize)
    if batch is None:
      break
    yield (batch[0].tolist(), batch[1].tolist())
ENDEMBED;

// We pass the training data in batches - each record in the incoming dataset
// represents one batch

trainModel(DATASET(mnist_training_batch) training_data, UNSIGNED batchsize) :=
      EMBED(Python: globalscope('save.ecl'),persist('query'))
  import tensorflow as tf
  import numpy

  global sess
  global numBatchesTrained
  with sess.as_default():
    keep_prob = sess.graph.get_tensor_by_name("keep_prob:0")
    x = sess.graph.get_tensor_by_name("input:0")
    y_ = sess.graph.get_tensor_by_name("y_:0")
    train_step = sess.graph.get_operation_by_name("train_step")
    accuracy = sess.graph.get_tensor_by_name("accuracy:0")

    sess.run(tf.initialize_all_variables())
    for batch in training_data:
      x_in = numpy.asarray(batch.images).reshape([batchsize, 784])
      y_in = numpy.asarray(batch.scores).reshape([batchsize, 10])
      train_step.run(feed_dict={x: x_in, y_: y_in, keep_prob: 0.5})
      numBatchesTrained += 1
      if numBatchesTrained%100 == 0:
        train_accuracy = accuracy.eval(feed_dict={
             x:x_in,
             y_: y_in,
             keep_prob: 1.0})
        print("step %d, training accuracy %g" % 
                (numBatchesTrained, train_accuracy))
ENDEMBED;

// Evaluate the quality of the model using the mnist test data

evaluateModel(STRING name) := 
        EMBED(Python: globalscope('save.ecl'),persist('query'))
  import tensorflow as tf
  from tensorflow.examples.tutorials.mnist import input_data
  mnist = input_data.read_data_sets(name, one_hot=True)

  global sess

  with sess.as_default():
    x = sess.graph.get_tensor_by_name("input:0")
    y_ = sess.graph.get_tensor_by_name("y_:0")
    accuracy = sess.graph.get_tensor_by_name("accuracy:0")
    keep_prob = sess.graph.get_tensor_by_name("keep_prob:0")

    print("test accuracy %g"%accuracy.eval(feed_dict={
                x: mnist.test.images,
                y_: mnist.test.labels, 
                keep_prob: 1.0}))
ENDEMBED;

// Save the trained model to a disk file - it can be loaded
// and reused (see load.ecl)

string saveModel(string save_path) :=
      EMBED(Python: globalscope('save.ecl'),persist('query'))
  import tensorflow as tf
  global sess
  saver = tf.train.Saver()
  return saver.save(sess, save_path)
ENDEMBED;

// Train using 1000 batches of 50 rows from the training data

batchsize := 50;
mydata := choosen(readBatch('MNIST_data', batchsize), 1000);

SEQUENTIAL(
  createModel(),
  trainModel(mydata, batchsize),
  evaluateModel('MNIST_data'),
  OUTPUT('Model saved to ' + saveModel('saved_mnist_cnn.ckpt'))
);

I saved the above code in the file save.ecl, compiled it using eclcc save.ecl -o save, then ran it as ./save, giving me the following output:

Successfully downloaded train-images-idx3-ubyte.gz 9912422 bytes.
Extracting MNIST_data/train-images-idx3-ubyte.gz
Successfully downloaded train-labels-idx1-ubyte.gz 28881 bytes.
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Successfully downloaded t10k-images-idx3-ubyte.gz 1648877 bytes.
Extracting MNIST_data/t10k-images-idx3-ubyte.gz

Successfully downloaded t10k-labels-idx1-ubyte.gz 4542 bytes.
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
step 100, training accuracy 0.88
step 200, training accuracy 0.94
step 300, training accuracy 0.94
step 400, training accuracy 0.96
step 500, training accuracy 0.9
step 600, training accuracy 0.98
step 700, training accuracy 0.98
step 800, training accuracy 0.92
step 900, training accuracy 0.96
step 1000, training accuracy 0.94
Extracting MNIST_data/train-images-idx3-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
test accuracy 0.9698
Model saved to saved_mnist_cnn.ckpt

Note: The downloads will only happen the first time the example is run, and the exact accuracy reported may vary from run to run due to some randomness in the training algorithm.

For the loading side of the problem, I took a similar approach, separating out the load functionality (done once) from the classification (done repeatedly). I also wrote some code to make sure that the MNIST data was only unpacked once, even though I was reading it for both the images and the labels.

IMPORT Python;

TFModel(CONST VARSTRING modelname) := MODULE

  // Load a previously-trained TensorFlow model
  // We pass values between this function and others in this module
  // using Python global variables
  // 
  // We return a ‘handle’ to simplify the process of ensuring that the
  // model is loaded if score is called.

  INTEGER4 loadModel(CONST VARSTRING model) := 
      EMBED(Python: globalscope('model.'+modelname),persist('global'))
    import tensorflow as tf

    global sess
    global input
    global keep_prob
    global label

    sess = tf.Session()
    new_saver = tf.train.import_meta_graph(model + '.meta')
    new_saver.restore(sess, model)
    tf.get_default_graph().as_graph_def()

    input = sess.graph.get_tensor_by_name("input:0")
    keep_prob = sess.graph.get_tensor_by_name("keep_prob:0")
    label = sess.graph.get_tensor_by_name("label:0")
    print ('Model %s loaded' % model)
    return 0
  ENDEMBED;

  // Using ONCE ensures that the data is loaded only once even if
  // multiple functions (or even multiple queries) need it

  mymodel := loadModel(modelname) : ONCE;

  // Use the previously-loaded model to categorize an image

  INTEGER8 _score(SET OF REAL8 image, INTEGER4 theModel) := 
      EMBED(Python: globalscope('model.'+modelname),persist('global'))
    import tensorflow as tf
    import numpy

    global sess
    global input
    global keep_prob
    global label

    image_b = numpy.asarray(image)
    image_b = image_b.reshape([1, 784])
    result = sess.run(label, feed_dict={input:image_b, keep_prob:1})
    return result[0]
  ENDEMBED;

  EXPORT INTEGER8 score(SET OF REAL8 image) := _score(image, mymodel);
END;

MNIST(CONST VARSTRING save_location) := MODULE

  // Code to read the MNIST image files - use the TensorFlow helpers to stream
  // them using Python generators
  // The example code that comes with TensorFlow takes care of downloading
  // them if not found, unzipping them on the fly,  and converting them into
  // a Numpy array.
  // We return a ‘handle’ to simplify the process of ensuring that the
  // data is loaded if any of the functions requiring it are called.

  UNSIGNED4 loadMNist(const varstring name) := 
      EMBED(Python: GLOBALSCOPE('mnistData.'+save_location),PERSIST('global'))
    from tensorflow.examples.tutorials.mnist import input_data
    global mnist
    mnist = input_data.read_data_sets(name, one_hot=True)
    return 0
  ENDEMBED;

  // Using ONCE ensures that the data is loaded only once even if
  // multiple functions (or even multiple queries) need it

  SHARED loadHandle := loadMNist(save_location) : ONCE;

  EXPORT mnist_image := RECORD
    SET OF real8 pixels;
  END;

  EXPORT mnist_label := RECORD
    INTEGER8 label;
  END;

  SHARED DATASET(mnist_image) _readImages(UNSIGNED4 handle) := 
      EMBED(Python: GLOBALSCOPE('mnistData.'+save_location),PERSIST('global'))
    from tensorflow.examples.tutorials.mnist import input_data
    global mnist
    for f in mnist.validation.images:
      yield ( f.tolist() )
  ENDEMBED;

  SHARED DATASET(mnist_label) _readLabels(UNSIGNED4 handle) := 
      EMBED(Python: GLOBALSCOPE('mnistData.'+save_location),PERSIST('global'))
    import numpy
    from tensorflow.examples.tutorials.mnist import input_data
    global mnist
    for f in mnist.validation.labels:
      yield ( numpy.argmax(f, 0) )
  ENDEMBED;

  // We pass the ‘handle’ from the loadMNist function to ensure that
  // there is a dependency on that attribute, and therefore that the
  // loadMNist code gets included.

  EXPORT DATASET(mnist_image) readImages := _readImages(loadHandle);
  EXPORT DATASET(mnist_label) readLabels := _readLabels(loadHandle);
END;

// And now the ‘real’ ECL code - read each image as a set, pass it to
// the classifier, and return the classification
// First the code to read the MNIST images and the verification labels

imagedata := MNIST('MNIST_DATA');
images := imagedata.readImages;
labels := imagedata.readLabels;

// Then the code to load the model

mymodel := TFModel('saved_mnist_cnn.ckpt');

// We call mymodel.score for every image in the images dataset. Note the
// use of PARALLEL(8) on the PROJECT statement to execute on 8 threads
// at once. You can experiment with different values - the optimal value
// will depend upon the hardware as well as on the complexity of the
// model and what else is being run at the same time.

outrec := RECORD
  integer categorized;
  integer label;
END;

outrec doscore(imagedata.mnist_image L) := TRANSFORM
  self.categorized := mymodel.score(L.pixels);
  self.label := 0;  // filled in later
END;

scored := PROJECT(images, doscore(left),PARALLEL(8));

// Now we compare our classifier against the labels from MNIST

outrec dolabel(outrec L, imagedata.mnist_label R) := TRANSFORM
  self.categorized := L.categorized;
  self.label := R.label;
END;

p := COMBINE(scored, labels, dolabel(LEFT, RIGHT));

// Accuracy should approximately match that reported by save.ecl

output('Accuracy '+(1-count(p(categorized != label))/count(p)));

I saved the above code in the file load.ecl, compiled it using eclcc load.ecl -o load, then ran it as ./load, giving me the following output:

Extracting MNIST_DATA/train-images-idx3-ubyte.gz
Extracting MNIST_DATA/train-labels-idx1-ubyte.gz
Extracting MNIST_DATA/t10k-images-idx3-ubyte.gz
Extracting MNIST_DATA/t10k-labels-idx1-ubyte.gz
Model saved_mnist_cnn.ckpt loaded
Accuracy 0.9694

The reported accuracy is pretty close to that reported when the model was trained.

There are a few problems still to be solved if this is to be used in a real Thor or Roxie scenario rather than the standalone test case I used here. For example, the model file will need to be distributed from the location where it was created onto every node in a Roxie or Thor cluster that is to use it. Larger models that require more than the resources of a single node will present a more complex problem – TensorFlow can in theory handle such cases, but the complexity of passing in the data needs some thought. These problems are left as an exercise for the reader (or perhaps a future blog…).