Advanced Python Embedding in ECL — A definitive guide

In developing the HPCC Systems Machine Learning libraries, we’ve gotten quite a bit of experience with embedding Python for complex projects.  This guide is an attempt to document our learning and best practices so that others can benefit from that experience.

This guide assumes a fundamental understanding of ECL and the HPCC Systems distributed processing architecture, as well as some familiarity with Python.

Introduction

ECL is a powerful declarative data-processing language with native multi-processing capabilities.  It is a complete language and an ideal solution for most data-intensive applications.  So why would we want to embed other languages, such as Python, within an ECL program?

There are three main reasons why we may want to do this:

  • There are Python libraries written for particular purposes that we would like to utilize.
  • Certain compute-intensive analytic processes can perform better when implemented procedurally.  This is particularly true when iterative optimizations are being done.  A data-oriented language requires repeatedly processing and refining a data set, whereas a procedural language can loop repeatedly on the same data without emitting and reprocessing the full dataset for each iteration.
  • Some complex algorithms are cleaner to express in procedural terms, and while they may be implementable in a declarative semantic, the logic may be awkward and hard to follow.

Fortunately, ECL makes it easy to transition between declarative and procedural worlds through use of embedding — placing a procedural sub-program inside an ECL program.  ECL supports embedding in a number of popular languages:

  • C++
  • Java
  • R
  • Python

Embedding Python in ECL

In this guide, we focus on embedding Python into ECL.  Python melds very naturally with ECL, and the ECL developers have done a great job making such embedding easy.  There is, however, some background knowledge necessary in order to understand how to embed Python in a way that aligns with ECL distributed processing and the needs of your application.

We will cover:

  • Basic Embedding
  • Activity Embedding
  • Python Subsystems
  • Working with Python add-on modules
  • Debugging Embedded Python

Basic Embedding

Suppose we want to use the Python math library to calculate factorials.  Doing this is as simple as:

IMPORT Python3 AS Python;  // This is required for any ECL file containing embeds.  We recommend
                           // the use of Python3, though Python2 is also available as IMPORT Python;

INTEGER doFactorial(UNSIGNED input) := EMBED(Python)
    import math
    return math.factorial(input)
ENDEMBED;

Now we can treat this embedded function just like any ECL function.  For example:

fac17 := doFactorial(17);

OUTPUT(fac17);

Notice how simple and seamless embedding can be!

But on a distributed HPCC cluster, how did that actually execute?  Recall that in ECL, DATASETs are the main distributed component, whereas other data types are local.  So, since only one data element (UNSIGNED) is produced, this function can only execute on a single node, leaving all other nodes idle.  ECL automatically takes care of that, but if distributed performance were expected, it would not be achieved.

If the same code were executed inside a TRANSFORM, however, the TRANSFORM would execute on every node, and unique results would be produced from the Python call on each HPCC node.  In this case, the same function would be fully parallelized.  For example:

MyRec := RECORD
    UNSIGNED num;
    UNSIGNED numFactorial := 0;
END;

// Generate a distributed dataset of MyRec, with just the num field populated with numbers 1-20:
MyDS := DATASET(20, TRANSFORM(MyRec, SELF.num := COUNTER), DISTRIBUTED);

// Now we do a PROJECT with a TRANSFORM that uses our python factorial function.
MyDSComplete := PROJECT(MyDS, TRANSFORM(MyRec, SELF.numFactorial := doFactorial(LEFT.num), SELF := LEFT));

OUTPUT(MyDSComplete);

Since the original dataset was evenly distributed among the nodes, the Python doFactorial within the TRANSFORM also ran independently on every node, as the dataset records resident on each node were processed.

The embedded function can take any number of any ECL data types as input arguments, and can return any ECL data types.  The argument and return signatures are exactly like any ECL function, and they map intuitively to and from basic Python data types.

ECL                  Python
STRING               string
UNSIGNED, INTEGER    integer
REAL                 float
SET OF <type>        list
RECORD, ROW          tuple
BOOLEAN              bool
DATASET              list
STREAMED DATASET     generator on input; list or generator on output

Let’s look at another example to get a feel for how the mappings occur:

Suppose we have an ECL DATASET as follows:

MyRecord := RECORD
    UNSIGNED replCount;
    STRING str;
END;

myDS := DATASET([{5,  'Hello'}, {2, 'Goodbye'}], MyRecord);

Now we want to create a Python function that will concatenate each string in that dataset replCount number of times, and return the results as a SET OF STRING.  We can write:

SET OF STRING ReplicateStrings(DATASET(MyRecord) my_data) := EMBED(Python)
    outList = []
    # Datasets come in as a list of tuples.
    for inputTuple in my_data:
        # The tuple has two elements, since the record defined two fields.
        count, inString = inputTuple
        replString = inString * count
        outList.append(replString)

    # We return the SET OF STRING as a list of strings
    return outList
ENDEMBED;

Notice that DATASETs are handled as a list of tuples.  This is true on the input as well as the output.  We could just as easily have returned a DATASET as a list of tuples, where each tuple has the same number of items as the fields in the RECORD layout, but we wanted to also show how SET OF <type> is handled.

Complex record types such as datasets with nested child datasets can also be easily handled.  Just keep in mind that any ECL Set is represented as a Python list, and each record is represented by a tuple.  So I can represent any nested hierarchy as lists of tuples which may contain other lists of tuples (for nested DATASETS) or lists of other fundamental types for SET OF <type> fields.

If you are inputting or outputting records with just a single field, don’t forget that even that single field is inside the tuple for a record.  Recall that in Python, a tuple with a single field is represented as “(field,)”.  A comma is required after the single item to distinguish it from mathematical parentheses.
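
As a minimal sketch of both points (the record layouts here are hypothetical), a parent record containing a child DATASET and a SET OF field arrives in Python as a tuple containing a list of tuples and a plain list:

ChildRec := RECORD
    UNSIGNED val;
END;

ParentRec := RECORD
    STRING name;
    DATASET(ChildRec) kids;
    SET OF UNSIGNED tags;
END;

// Each parent record arrives in Python as: (name, [(val,), (val,), ...], [tag, tag, ...])
SET OF STRING describeParents(DATASET(ParentRec) parents) := EMBED(Python)
    out = []
    for name, kids, tags in parents:
        # 'kids' is a list of single-field tuples; note the (val,) form.
        kidTotal = sum([k[0] for k in kids])
        out.append(name + ': ' + str(kidTotal) + ' total from children, ' + str(len(tags)) + ' tags')
    return out
ENDEMBED;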

Caveats

There are a few caveats to be aware of:

  1. Always use lower-case for embed arguments!  Since ECL is case-insensitive and Python is case-sensitive, arguments are always handed to Python in lower-case.  Python will not be able to find any mixed-case argument names (see the short sketch after this list).
  2. There is currently a minor issue with the Code Generator (Ticket# HPCC-27609) that occasionally causes confusion between Embed argument names and other attributes with the same name.  Until this is fixed in your version of the platform, it is best to use unique names for the Embed arguments.  That is, names that are not used for other attributes in the file.  I typically prefix the Embed argument names with ‘py’ to avoid this scenario.  If you see an error ‘Error: 0: pyembed name <your variable name> is not defined’, it is generally either because you used upper-case letters in your argument name (#1 above), or because there is a conflicting attribute name in the file. Of course, it could also be a typo.
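
As a minimal illustration of both caveats, declare the argument in all lower-case with a ‘py’ prefix, and refer to it exactly that way inside the Python body (the attribute below is purely illustrative):

IMPORT Python3 AS Python;

// Recommended: an all lower-case argument name, prefixed with 'py' to avoid clashes with other attributes.
INTEGER doubleIt(UNSIGNED pyinval) := EMBED(Python)
    # The argument arrives as 'pyinval'.  Referring to a mixed-case spelling such as 'pyInVal'
    # here would produce the 'pyembed ... is not defined' error described in #2 above.
    return pyinval * 2
ENDEMBED;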

Activity Embedding

Suppose I want to feed an entire dataset to a Python function, to be executed in parallel on all nodes.

I may want to feed the same data to all nodes, or I may want to distribute the data by some criteria, and feed the subset on each node to the python function.

To do this we use the “Activity” keyword in our embed statement.  This causes the Python function to behave like a distributed ECL command — the portion of the dataset on each node is passed to the Python function, and the returned dataset comprises all the records returned from the Python function on all nodes.

A special data type is used to transfer this data in and out of the Python function as a stream.  This is called STREAMED DATASET.  The beauty of the STREAMED DATASET is that the Python program can process one record at a time, and need not load the entire dataset into memory.  This is useful when the embedded function processes one record at a time.  But even if the function needs the full dataset before it can process, we will still want to use STREAMED DATASET for the output and for at least the first input argument to the Python embed.

Suppose I had a Python function that translates English sentences into Spanish.  In order to do this quickly, I’d like to take advantage of HPCC Systems parallelization capabilities.

MyRec := RECORD
    UNSIGNED id;
    STRING Sentence;
END;

// Make a dataset of 500 English Sentences (or input from a file).
Sentences := DATASET([{1, 'Hello, my name is Roger'},
                      {2, 'I live in Colorado'}, 
                      ...
                      {500, 'This is my last sentence'}],MyRec);

// Distribute the sentences round-robin by id.
Sentences_Dist := DISTRIBUTE(Sentences, id);

// Now call the Python embed function.  Because our pyEng2Span embedded function
// is an 'activity', this one line will cause the subset of records on each node to be passed to
// the python code, and all of the results to form a single distributed dataset.
Spanish := pyEng2Span(Sentences_Dist);

// At this point, the order of the sentences is scrambled.  We'll probably want to sort it.
Spanish_Sorted := SORT(Spanish, Id);

OUTPUT(Spanish_Sorted);

// Here's what the embedded function would look like.
// Note the 'activity' keyword in the EMBED statement, and STREAMED DATASET output and first input.
EXPORT STREAMED DATASET(MyRec) pyEng2Span(STREAMED DATASET(MyRec) eng_sentences) :=       
                                                            EMBED(Python: activity)
    import eng2span # Fictional python module
    # Iterate over the input records
    for item in eng_sentences:
        id, sentence = item # Remember, each input item is a record tuple with 2 fields
        # Do the translation.
        spanish = eng2span.Translate(sentence)
        # Since we are processing one record at a time, we use the Python generator semantic "yield"
        # (see Python documentation).  This allows us to emit records from deep inside a function.
        yield (id, spanish)  # We emit a single record as a tuple.
ENDEMBED;

That wasn’t too bad.  Even parallel processing with embedded functions is quite easy.

Note that if I had called the argument to the Python function “sentences” rather than “eng_sentences”, it would have been confused with the “Sentences” attribute defined above (see Basic Embedding/Caveats #2), and produced a confusing error saying that there was no Python variable ‘sentences’.  This can be hard to diagnose if you are not forewarned!

In this case, we processed one record at a time and used the generator syntax “yield” to emit the records.  If this function had needed to process all records before emitting results, we would write something like the following:

EXPORT STREAMED DATASET(MyRec) pyEng2Span(STREAMED DATASET(MyRec) eng_sentences) :=   
      EMBED(Python: activity)
    import eng2span # Fictional python module
    inRecs = []  # List to accumulate input records
    # Iterate over the input
    # Note that eng_sentences comes in as a generator since it was a streamed dataset
    # generators do not have the same capabilities as lists (see comments below).
    for item in eng_sentences:
        # Add to our accumulator list
        inRecs.append(item)

    # Now we can accumulate output records.
    outRecs = []
    for i in range(len(inRecs)):
        # Now that it's a list rather than generator, we can use len() and indexing.
        rec = inRecs[i]
        # Extract the fields from the record tuple
        id, sentence = rec
        # Translate
        spanish = eng2span.Translate(sentence)
        # Accumulate output records
        outRecs.append((id, spanish))
    # Note that we return the records as a list, whereas we previously 'yield'ed one tuple at a time.
    return outRecs
ENDEMBED;

It is important to understand the nature of the STREAMED DATASET that is the input to the Python function.  It shows up as a Python generator.  Generators behave like a list only in the sense that they can be iterated over (e.g. for item in dataset:).  They do not have the other capabilities of a list, such as:

  • length — e.g., len(dataset)
  • indexing — e.g., dataset[0]
  • repetitive iteration — The second iteration will return no contents as the stream is already exhausted.

To convert a stream to a list, we just iterate over it and append each item to a list.
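
For example (a sketch only, assuming an activity embed whose streamed input argument is named ‘recs’), the stream can be materialized into a list in either of these ways:

    # Either accumulate the records manually ...
    inRecs = []
    for rec in recs:
        inRecs.append(rec)
    # ... or convert in one step (but not both -- the stream can only be consumed once):
    # inRecs = list(recs)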

An embedded function can take in more than one STREAMED DATASET, and may also have arguments of other types.  When working with multiple STREAMED DATASETs, it is important to make sure that the datasets are distributed so that corresponding records land on the same node.  Keep in mind that arguments of other (non-dataset) types will be passed with the same values to the function on every node.

It is not necessary for an Activity to return the same record type or count that it received.  This makes it completely flexible as to the kinds of processing that can occur.  For example, I could input a dataset and a desired result, and train a machine learning model.  I could then output the model as a single record or multiple records of a completely different format.
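
As a sketch of that flexibility (SummaryRec is a hypothetical layout), the activity below consumes the sentence records defined earlier but emits a single record per node in a completely different format:

SummaryRec := RECORD
    UNSIGNED recCount;
    UNSIGNED totalChars;
END;

STREAMED DATASET(SummaryRec) pySummarize(STREAMED DATASET(MyRec) recs) := EMBED(Python: activity)
    recCount = 0
    totalChars = 0
    for id, sentence in recs:
        recCount += 1
        totalChars += len(sentence)
    # One output record per node, in a layout unrelated to the input records.
    yield (recCount, totalChars)
ENDEMBED;

The resulting dataset holds one summary record from each node, which a follow-on ECL aggregate (e.g. SUM) could combine into a cluster-wide total.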

Python Subsystems

We use the term Python Subsystem to refer to any use of Python that requires coordination between functions.  For example, one function may create and store a Python object, and other functions may make use of that stored object.  This gets a little tricky, but if you follow the pattern below, it should not be too daunting.

Recommended Structure

We recommend that two ECL modules be used when constructing Python Subsystems.  This is not strictly required, but it makes for cleaner code.

  • External Interface — A clean interface to your subsystem that is simple and easy to understand, without any embedding.
  • Internal Python Object Interface — The workhorse, containing all of the tricky embed code, typically wrapping a python object or module.

This two-layer approach provides a clean pattern, a simple user interface, and is easier to explain and understand.  We will come back to this.

Global Memory

The key to building Python Subsystems is the use of global memory.  The EMBED syntax makes this possible by maintaining the same Python global memory space across embed calls.  This is done as follows:

STREAMED DATASET(myRec) myFunc(STREAMED DATASET(myRec) input) :=
      EMBED(Python: globalscope('anyscopename'), persist('query'), activity)

The key feature here is the use of the ‘globalscope’ and ‘persist’ keywords.  The globalscope can be any string, though we recommend that you use a string unique to your module.  The persist(‘query’) says that the global memory will be preserved for the entire duration of your ECL program (i.e. query).

Initialization

The most common pattern for a Python Subsystem is to create an initialization function that creates a Python object, and attaches it to the global memory space.

Here is an example of that:

// Dummy Record for returning an object handle
dummyRec := RECORD
    UNSIGNED handle;
END;

STREAMED DATASET(dummyRec) pyInit(STREAMED DATASET(myRec) input) :=
        EMBED(Python: globalscope('anyscopename'), persist('query'), activity)
    import myObjModule # Any python module I want to use.
    global OBJECTS, OBJ_NUM

    if 'OBJECTS' not in globals():
        # This is your one-time initializer code.  It will only be executed once on each node.
        # All global initialization goes here.
        OBJECTS = {}  # Dictionary of objects.  This allows multiple active objects.
        OBJ_NUM = 1 # Object counter
    # Now instantiate the object that we want to use repeatedly.
    myObject = myObjModule.MyObject(input)  # MyObject is a hypothetical class in our fictional module

    # Add it to our OBJECTS cache
    handle = OBJ_NUM # The handle is the index into the OBJECTS dict.
    OBJECTS[handle] = myObject

    # Increment the OBJ_NUM for next time we're called.
    OBJ_NUM += 1

    # We return a single dummy record with the object handle inside.
    return [(handle,)]
ENDEMBED;

Before moving on, let’s analyze what we just did.  The first “if” checks to see if we’ve already initialized, or if this is the first time in.  If it’s the first time in, then we initialize a global dictionary of objects.  This allows for multiple active objects in the same query.  If only one is needed, then you can just create a global OBJECT variable and set your object to that.

We create our desired object, passing it any input data it needs to construct.  We return a STREAMED DATASET with a single record indicating the handle for that object.  Even if only a single object is supported, you will still return a dummyRec with an arbitrary handle.  The reason for this will become clear later.

External Interface

We’ll start with the external interface and work our way inward.  Let’s say we are building a Linear Regression machine learning subsystem.  We’ll want to construct a Linear Regression module, passing it the data it needs to learn the model, and then make queries against it to predict new results.  

IMPORT PyLR; // This imports our internal python object interface

EXPORT Regressor := MODULE(DATASET(myDsRec) independents, DATASET(myDsRec) dependents)
    // The following line contains the magic.  It causes the initialization of the module to occur.
    // Note that the module parameters should contain any data needed to perform the initialization.
    // It is SHARED so that every function in the module can pass it along.
    SHARED LR := PyLR.Init(independents, dependents);

    // Now we have one or more functions that operate against the initialized object(s).
    EXPORT DATASET(myDsRec) Predict(DATASET(myDsRec) independents) := FUNCTION
          // Distribute the data among the nodes.
          independents_Dist := DISTRIBUTE(independents, id);
          // Invoke the python activity on all nodes, each node with the subset that is on that node.
          predictions := PyLR.Predict(independents_Dist, LR);
          // The data will come back in mixed order because it was independently processed on
          // each node. Sort it so that it correlates to the input data.
          predictions_Sorted := SORT(predictions, id);
          RETURN predictions_Sorted;
    END; // Predict Function

    // Any other functions can go here.
    ...
END; // Regressor Module

The one piece of magic in the above is the assignment of LR, which causes the internal module’s initialization to be called with the module’s input data.  Recall that ECL code is declarative, and only attributes that are used are actually evaluated.  By passing LR as an argument to the internal Predict function, we ensure that the initialization takes place before Predict is called.  This sequencing guarantees that the Python object is constructed (presumably fitting the model) before the prediction is done.  Now Predict can be called any number of times on the same stored model.

We could have many other functions within this module that operate on the Linear Regression model.  We only need to ensure that each function passes LR to the internal code.
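
For instance (a sketch only; coefRec and PyLR.GetCoefficients are hypothetical), an additional method inside the Regressor module would follow the same pattern, passing LR through so that initialization is guaranteed to have happened first:

    EXPORT DATASET(coefRec) GetCoefficients() := FUNCTION
        // Passing LR to the internal code forces the model to be built before we read it.
        RETURN PyLR.GetCoefficients(LR);
    END; // GetCoefficients Function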

Now let’s look at the internal code.

Internal Python Interface Module

The internal module is typically named after the Python module that it wraps.  In this case, we’ll be wrapping an imaginary linear regression module called PyLR.

IMPORT Python3 AS Python;

dummyRec := RECORD
    UNSIGNED handle;
END;

// Module to wrap the PyLR module (imaginary)
EXPORT PyLR := MODULE
    // This is the Init function that gets called when LR is evaluated in the external module.
    // Note that Init is implemented in ECL, but contains an interior embed function 'pyInit'.
    EXPORT UNSIGNED Init(DATASET(myDsRec) independents, DATASET(myDsRec) dependents) := FUNCTION
        // The interior pyInit function
        STREAMED DATASET(dummyRec) PyInit(STREAMED DATASET(myDsRec) py_independents,
                 STREAMED DATASET(myDsRec) py_dependents) := 
                      EMBED(Python: globalscope('anyscopename'), persist('query'), activity)
            import PyLR
            global OBJECTS, OBJ_NUM
            if 'OBJECTS' not in globals():
                # Executes first time only.  Globals are assigned here.
                OBJECTS = {}
                OBJ_NUM = 1
            # Function to unpack the inputs and get them ready to pass to the python PyLR module for fitting.
            def unpack_input(ds):
                ...
                return unpacked
            final_dependents = unpack_input(py_dependents)
            final_independents = unpack_input(py_independents)
            # Construct regressor object
            regressor = PyLR.Regressor()
            # Fit it to the data
            regressor.fit(final_dependents, final_independents)
            # Store it in globals
            handle = OBJ_NUM
            OBJECTS[handle] = regressor
            # Increment the OBJ_NUM for next time
            OBJ_NUM += 1
            # Return the handle as a single dummy record
            return [(handle,)]
        ENDEMBED; // PyInit

        // Back to the outer Init function here.
        // We distribute the datasets to all nodes so that the same model gets created
        // on each node.  Other applications may want to distribute selectively and have different models
        // on each node.
        independents_Dist := DISTRIBUTE(independents, ALL);
        dependents_Dist := DISTRIBUTE(dependents, ALL);
        // We call the internal PyInit with the distributed data
        dummy := PyInit(independents_Dist, dependents_Dist);
        // Now we have a dataset of dummy records, one from each node.  Reduce to a single handle by
        // taking the MAX.
        // Note that all nodes will return the same handle (i.e. 1 for the first time through), so we could
        // use MIN, AVE, or d[1].
        outHandle := MAX(dummy, handle);
        // Now return the UNSIGNED handle
        RETURN outHandle;
    END;  // Init

    // Now we can implement the other methods as simple (non-nested) embeds.
    EXPORT STREAMED DATASET(myDsRec) Predict(STREAMED DATASET(myDsRec) predict_independents, UNSIGNED py_handle) := 
                                           EMBED(Python: globalscope('anyscopename'), persist('query'), activity)
         # Get the trained object from globals.
         myLR = OBJECTS[py_handle]
         # Convert inputs to form needed by PyLR
         def unpack_input(inds):
             ...
             return unpacked
         final_inds = unpack_input(predict_independents)
         # Do the predicts
         results = myLR.Predict(final_inds)
         # Convert to Dataset form
         def pack_output(res):
              ...
              return packed
         final_results = pack_output(results)
         return final_results
    ENDEMBED; // Predict
END; // PyLR Module

The trickiest part here is the Init function.  This ECL function wraps a Python embed function.  It does the technical work of distributing the data as appropriate, and collecting the dummy records returned into a single “handle”.  This is necessary so that this handle can be passed back into Predict (and any other functions), guaranteeing that the initialization will be complete on all nodes before predictions are done.

The Predict function, and any other functions we wish to provide, are now straightforward.  They all must take a handle as an argument.

We skipped over some details here for simplification, but I will mention them now.  Note that the external Predict function did the distribution.  It presumably distributed the independent and dependent records so that the same record ids were on the same nodes. So, our internal Predict function will not receive records with sequential ids.  Therefore, we must be careful during the unpack to store the order of ids, so that we can pack the results with the same ids.  Then when the external predict sorts the results by id, the input and the output can be correlated.
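
One way to handle that inside the internal Predict embed (a sketch, assuming the id is the first field of each record and that the hypothetical model returns one prediction per input record) is to carry the ids through the unpack and pack steps:

    # Unpack: remember the id of each incoming record alongside its data values.
    ids = []
    values = []
    for rec in predict_independents:
        ids.append(rec[0])       # first field is the id
        values.append(rec[1:])   # remaining fields are the data values
    results = myLR.Predict(values)
    # Pack: re-attach the original ids so the external SORT can line the results up with the inputs.
    return [(ids[i], results[i]) for i in range(len(ids))]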

Shared Functions and Classes

It is possible to implement shared Functions and Classes within an Embedded Python Subsystem.  Essentially, we define a class or function within the first-time clause of the PyInit function, and then assign it to a global variable.  In the following example, we re-write the Init function to provide a global unpack / pack mechanism to convert the ECL data to Python and vice versa.

STREAMED DATASET(dummyRec) PyInit(STREAMED DATASET(myDsRec) py_independents,
                 STREAMED DATASET(myDsRec) py_dependents) := EMBED(Python: globalscope('anyscopename'), persist('query'), activity)
            import PyLR
            global OBJECTS, OBJ_NUM
            if 'OBJECTS' not in globals():
                # Executes first time only.  Globals are assigned here.
                OBJECTS = {}
                OBJ_NUM = 1
                # Here's where we create the global functions
                global UNPACK_INPUT, PACK_OUTPUT
                # Unpack the inputs and get them ready to pass to the python PyLR module for fitting.
                def _unpack_input(ds):
                     ...
                     return unpacked
                # Assign the function to a global variable.
                UNPACK_INPUT = _unpack_input
                # Now do the same for pack.
                def _pack_output(res):
                    ...
                    return packed
                PACK_OUTPUT = _pack_output
            # Now we're past the one-time initialization, and we can use the global functions.
            # These are now also available for any other functions in the module.
            final_dependents = UNPACK_INPUT(py_dependents)
            final_independents = UNPACK_INPUT(py_independents)
            # Construct regressor object
            regressor = PyLR.Regressor()
            # Fit it to the data
            regressor.fit(final_dependents, final_independents)
            # Store it in globals
            handle = OBJ_NUM
            OBJECTS[handle] = regressor
            # Increment the OBJ_NUM
            OBJ_NUM += 1
            # Return the handle
            return [(handle,)]
        ENDEMBED; // PyInit
        ...

Now that we’ve created the global UNPACK_INPUT and PACK_OUTPUT functions, any follow-on EMBED with the same global space can use those functions.  We could have alternatively created a packer_unpacker python class and stored that as a global.

# In the one-time initialization clause...
global PACKER_UNPACKER
class _packer_unpacker:
    def unpack(self, ds):
        ...
        return unpacked

    def pack(self, res):
        ...
        return packed

# We can now instantiate the class and provide a global reference to it.
PACKER_UNPACKER = _packer_unpacker()  # This is a class instance
# Note: we could alternatively let each user create their own instance by doing
# PACKER_UNPACKER = _packer_unpacker # With no parens, this is the class itself
# Then users would do myPU = PACKER_UNPACKER();  myPU.pack(...)
# Now in any embedded function we can do
packed = PACKER_UNPACKER.pack(...)
# or
unpacked = PACKER_UNPACKER.unpack(...)
...

Processing Paradigms

There are a number of common processing paradigms for Python Subsystems.  Here are a few of the most common, but each application may have its own needs.

  • Common Data / Distributed Analysis — The main dataset(s) are replicated to all nodes, and identical python objects are created on each node independently from the data.  Then, queries can be distributed across nodes and each node will process 1 / nNodes portion of the query.  In this way, queries are fully independent and are fully parallelized.
  • Distributed Data / Averaged results — The dataset(s) are distributed among the nodes in an application-meaningful way, and each node constructs an object from its piece of the data.  Then queries are handled by all nodes, and the results produced by the different nodes are aggregated (typically averaged) to produce a final result (see the sketch after this list).
  • Distributed Heavy Lifting / Centralized Final Analysis — A known set of preliminary queries representing the most time-consuming activities is distributed among the nodes and executed in parallel.  Then the combined results are sent to another embedded function to assemble a final result.  In this case, the final step is executed as a basic embedding rather than an activity, since it needs to utilize all the initial results.  There is no point in running it on all nodes, as they would all produce the same result and no time would be saved.
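
As a sketch of the second paradigm (the attribute names and record fields here are hypothetical), the per-node answers can be averaged back together with ordinary ECL aggregation:

// Each node built its model from a different slice of the data and scored the full query set,
// so there is one prediction record per id per node.
nodePreds := PyLR.Predict(query_All, LR);
// Aggregate the per-node answers into a single averaged prediction per id.
avgPreds := TABLE(nodePreds, {nodePreds.id, REAL avgPred := AVE(GROUP, nodePreds.prediction)}, nodePreds.id);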

You will need to evaluate the nature of your application and choose one of the above models, or create a new one.

Working with Python Add-on Modules

In many cases, we want to use pre-existing Python add-on modules.  These may be publicly available modules (e.g. numpy, tensorflow, scikit-learn, networkx), or internally developed modules.  If we are developing complex Python Subsystems, we recommend that these are developed as installable Python modules.  These modules can be developed and tested independently and then wrapped by EMBED code as would be done for a publicly available module.  There are two reasons for this:

  1. It is easier to debug complex code in a stand-alone environment; and
  2. Embed functions are independent, so it is awkward to share common code.  Sharing can be done for simple common elements (see Python Subsystems/Shared Functions and Classes for details), but #1 above still argues for an installable add-on module for anything complex.

Python add-on modules are typically installed using pip (pip3 for python3).  This needs to be done on each server running the HPCC-Platform.

By default, pip installs the add-on for a single user.  This doesn’t usually work well for ECL embedding, because the HPCC Platform runs under its own userid ‘hpcc’, which typically does not have access to packages installed under another user’s account.  We get around this by installing the module for all users, by running pip with sudo.  For example:

sudo pip3 install tensorflow

After restarting the HPCC Platform, ECL should have access to the installed module.  You can test this by going to the “Playground” on ECLWatch and running e.g.:

IMPORT Python3 AS Python;

UNSIGNED testModule() := EMBED(Python)
    import tensorflow # or your module name
    return 1
ENDEMBED;

OUTPUT(testModule());

You may occasionally run into a problem with your first add-on module where even after running pip3 and restarting HPCC, the above test still fails.  This is usually because the PYTHONPATH environment variable for the hpcc user does not include the global install path (e.g. /usr/local/lib/python3.6/dist-packages).  To see where pip installed the package, you can run:

pip3 show <modulename>

Debugging Embedded Python

Because ECL embedded code is processed in-line from a Python point of view, the error messages produced can be hard to interpret.  For this reason, we like to take error-handling into our own hands.

We do this by using Python’s ‘assert’ statement.  We use it in several ways:

  1. We check for errors in the input arguments, or for bad state using assert in a standard way.
  2. We use a form of assert to communicate detailed error information within a Python except clause.

In the first case, we might do the following:

UNSIGNED myFunc(SET OF UNSIGNED input) := EMBED(Python)
    assert len(input) > 1, 'myMod.myFunc: input must contain at least two items.  We got ' + str(input)
    ...

ENDEMBED;

If the assert clause (e.g. len(input) > 1) is false, the function will abort, and the text message will be sent to the ECL Watch console.

Best practice is to include the name of the ECL file (myMod above) and the name of the embed function (myFunc above) in the message.  That way you can tell exactly where it comes from.  Also note that we concatenate the string version of the input to the message.  The more context you provide, the easier debugging will be!

The second method we use is to put a try … except around the meat of your function, and use assert to generate a structured output message in the event of an exception.

UNSIGNED myFunc(SET OF UNSIGNED input) := EMBED(Python)
    assert len(input) > 1, 'myMod.myFunc: input must contain at least two items.  We got ' + str(input)
    try:
        # Your implementation goes here
        ...
        return result
    except:
        # We got an error.  Let's format a meaningful message
        def format_exc(func=''):
            import traceback as tb
            exc = tb.format_exc(limit=2)
            if len(exc) < 100000:
                return func + ': ' + exc
            else:
                return func + ': ' + exc[:200] + ' ... ' + exc[-200:]

        # Call the above function to generate a clean error message with stack trace.
        # We pass in the name of our module and function to give maximal context.
        exc = format_exc('myMod.myFunc')
        # Now we use assert to deliver the message to ECL Watch.  We use a condition that
        # is always False, since we already know we had an error.
        assert False, exc
ENDEMBED;

If you have many embed functions, and you are using the Python Subsystem pattern, it is better practice to create a global function to do exception formatting to avoid messy duplicate code.  See the section Python Subsystems/Shared Functions and Classes above for details.  Your code may be something like this:

# In your one-time initialization section (per above)
            global FORMAT_EXC
            def _format_exc(func=''):
                import traceback as tb
                exc = tb.format_exc(limit=2)
                if len(exc) < 100000:
                    return func + ': ' + exc
                else:
                    return func + ': ' + exc[:200] + ' ... ' + exc[-200:]
            FORMAT_EXC = _format_exc
            ...

            # Now in any embed function, I can do:
            try:
                # My processing
                ...
                return result
            except:
                assert False, FORMAT_EXC('myMod.myFunc')

Even with the above diagnostics in place, you will sometimes get an unformatted exception.  These would be very hard to diagnose if you hadn’t put the above measures in place, but with them in place, you can be confident that the exception occurred while converting your return data back to ECL data types.  Here are some common types of mistakes that can cause this:

  • Returning a blatantly wrong data type (e.g. returning an integer when ECL is expecting a string).
  • Improperly formatting a dataset record (e.g. the record has 6 required fields and we only returned 5).
  • When using Python add-on modules, they sometimes return items that act like basic Python types but are really other custom types.  This frequently happens with numpy, for example, where it may return a numpy.float64 rather than a float, or a numpy array instead of a list.  Within Python you may not notice the difference, but when ECL tries to convert it, it may fail.  This is remedied by e.g., float(numpy_result) or list(numpy_array) to ensure conversion to basic Python types (see the short sketch after this list).  In the case of an alternate string type, you can always do str(funny_string).
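
Here is a small Python sketch of the kind of conversion that avoids the third problem (the variable names are hypothetical, and numpy is used purely as an illustration):

    # 'ids' is a plain list and 'preds' is a numpy array returned by some add-on module.
    # Convert the numpy values to plain Python types before handing the records back to ECL.
    return [(int(ids[i]), float(preds[i])) for i in range(len(preds))]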

Other Considerations

Channels versus Slaves

The HPCC Platform has two methods of allocating multiple cluster “nodes” to servers.  These are defined in the environment.xml file.

  • slavesPerNode — Create this many node processes on each server
  • channelsPerSlave — Create this many threads (i.e. channels) within each slave.

The total number of cluster nodes on each server will therefore be slavesPerNode * channelsPerSlave.  For example, slavesPerNode=2 with channelsPerSlave=4 yields 8 cluster nodes per server.

The choice of these methods has implications on the performance and functionality of Python Subsystems.

The first implication is performance.  Python has a global interpreter lock, which means that compute-intensive threads will not achieve full parallelism, even when a sufficient number of CPUs are available.  This means that your Python Subsystems will perform better with e.g., slavesPerNode=4, channelsPerSlave=1 than with slavesPerNode=1, channelsPerSlave=4.

The second implication is memory usage.  If channels are used, there is a potential to share object memory in applications where each node creates identical objects.  Consider the following initialization code (see the Python Subsystems section):

STREAMED DATASET(dummyRec) PyInit(STREAMED DATASET(myDsRec) py_independents,
                 STREAMED DATASET(myDsRec) py_dependents) := 
                      EMBED(Python: globalscope('anyscopename'), persist('query'), activity)
            import PyLR
            global OBJECTS, OBJ_NUM
            if 'OBJECTS' not in globals():
                # One-time initialization.  Globals are assigned here.
                OBJECTS = {}
                OBJ_NUM = 1
            # Function to unpack the inputs and get them ready to pass to the python PyLR module for fitting.
            def unpack_input(ds):
                ...
                return unpacked
            final_dependents = unpack_input(py_dependents)
            final_independents = unpack_input(py_independents)
            # Construct regressor object
            regressor = PyLR.Regressor()
            # Fit it to the data
            regressor.fit(final_dependents, final_independents)
            # Store it in globals
            handle = OBJ_NUM
            OBJECTS[handle] = regressor
            # Increment the OBJ_NUM for next time
            OBJ_NUM += 1
            # Return the handle as a single dummy record
            return [(handle,)]
        ENDEMBED; // PyInit

If I’m running multiple channels (threads), all of the channels will be sharing the same global memory space.  Therefore, the first thread in will do the one-time initialization, and subsequent threads will get handles 2, 3, and 4.  Since we take the MAX of the handles (in the ECL code — not shown here), the final handle will be 4.  Everything will still work, but now we have 3 copies of the object in memory that are serving no purpose.  An easy way to fix this is to sacrifice the ability to have multiple active objects within an ECL program, and just assign OBJECT in the one-time code.  Now the first thread in will assign OBJECT, and the remaining threads will bypass the one-time initialization.  It might look like this:

STREAMED DATASET(dummyRec) PyInit(STREAMED DATASET(myDsRec) py_independents,
                 STREAMED DATASET(myDsRec) py_dependents) := 
                      EMBED(Python: globalscope('anyscopename'), persist('query'), activity)
            import PyLR
            global OBJECT
            # Function to unpack the inputs and get them ready to pass to the python PyLR module for fitting.
            def unpack_input(ds):
                ...
                return unpacked
            final_dependents = unpack_input(py_dependents)
            final_independents = unpack_input(py_independents)
            if 'OBJECT' not in globals():
                # Executes first time only.  Globals are assigned here.
                # Construct regressor object.
                regressor = PyLR.Regressor()
                # Fit it to the data
                regressor.fit(final_dependents, final_independents)
                # Store it in a global
                OBJECT = regressor
            handle = 1 # Always return the same handle.
            # Return the handle as a single dummy record
            return [(handle,)]
        ENDEMBED; // PyInit

Now we will be sharing the same copy of the object among channels.

But what if we want each node to have different data?  Now our original code would not work at all with channels, since every channel would end up looking at the last object stored.  This can be handled by creating a separate globalscope name for each node.  This is, in fact, good practice whenever we are creating Python Subsystems with different initialization data for each node, since it eliminates problems when the code is run in a multi-channel environment.  It would look something like this:

IMPORT Std.System.Thorlib;

node := Thorlib.node(); // The node number of the executing node.

...

// Our pyInit function definition now uses a different globalspace for each node.
STREAMED DATASET(dummyRec) PyInit(STREAMED DATASET(myDsRec) py_independents,
                STREAMED DATASET(myDsRec) py_dependents) :=
                          EMBED(Python: globalscope('anyscopename' + (STRING)node), persist('query'), activity)

      ...

This causes each thread to run in its own global space, and prevents any interactions between channels.

Automatic Workload Allocation

It is common in Python Subsystems for the internal python object to have methods that don’t require any new data, but return a large number of items. Consider an example of an object that holds a dataset, and provides a method to calculate the correlation coefficient for variable pairs.  If we want to get all such pairings in parallel, we could create an input dataset that has all of the variable pairings, distribute that, and provide it as input to the activity.  But that is unnecessary.  There is another pattern that makes this easy.  In this pattern, we pass the HPCC node number and the total number of nodes to the function, and the python code within each node decides which piece of the total activity to handle.  It might look like this:

IMPORT Std.System.Thorlib;

nNodes := Thorlib.Nodes();

node := Thorlib.Node();

...

EXPORT STREAMED DATASET(corrCoefs) getAllCorrCoefs(UNSIGNED py_handle, UNSIGNED py_node, UNSIGNED py_nnodes) :=
             EMBED(Python: globalscope('anyscopename'), persist('query'), activity)
    # Get the shared object from global storage
    myObj = OBJECTS[py_handle]
    variables = myObj.vars() # Assume there's a way of getting all the variables.
    # We sort the variable list to ensure that we are generating the same order on all nodes.
    variables.sort()
    # Now we'll iterate through all variable pairs.
    pairNum = 0
    for i in range(len(variables)):
        var1 = variables[i]  # The first variable of the pair
        for j in range(i+1, len(variables)):
            # CorrCoef is symmetrical.  Therefore, we only need to compute in one direction.
            var2 = variables[j] # The second variable of the pair
            # Now we decide if we should handle this pair, or if we should leave it to another node.
            if pairNum % py_nnodes == py_node:
                # The modulo of pairNum matches our node number, so we handle it.
                # Otherwise we leave it for another node to compute.
                coef = myObj.corrCoef(var1, var2)
                yield (var1, var2, coef) # Yield the record
            pairNum += 1
ENDEMBED;

final := getAllCorrCoefs(handle, node, nNodes);
OUTPUT(final);

The final dataset will now contain the answers for each variable pairing, which ran fully in parallel without any explicit coordination among nodes.

Executable Code Examples

Basic Embedding Example

IMPORT Python3 AS Python;

// Embed method to calculate one factorial
INTEGER doFactorial(UNSIGNED input) := EMBED(Python)
    import math
    return math.factorial(input)
ENDEMBED;

// Use above function to generate a single factorial.  Note that this
// can only run on one node, because it receives and returns a basic (local)
// data type.
fac17 := doFactorial(17);
OUTPUT(fac17, NAMED('factorial17'));

// Now we'll call it from within a TRANSFORM of a distributed dataset.
// This runs in parallel on all nodes, because each record is transformed
// with a different input value, and results in dataset records distributed
// across nodes.

MyRec := RECORD
    UNSIGNED num;
    UNSIGNED numFactorial := 0;
END;

// Generate a distributed dataset of MyRec, with just the num field populated with numbers 1-20:
MyDS0 := DATASET(20, TRANSFORM(MyRec, SELF.num := COUNTER), DISTRIBUTED);
// Let's distribute it by num.  This will cause record 1 to be on node 1, record 2
// to be on node2, etc.  
MyDS := DISTRIBUTE(MyDS0, num);
// We'll output the original dataset for reference.
OUTPUT(MyDS, NAMED('InputDataset'));

// Now we do a PROJECT with a TRANSFORM that uses our python factorial function.
// The records on each node will be processed in parallel, resulting in the results
// being on each node.
MyDSComplete := PROJECT(MyDS, TRANSFORM(MyRec, 
                             SELF.numFactorial := doFactorial(LEFT.num),
                             SELF := LEFT));

// Output the results.  Note that, while the dataset has been completed,
// the results are not sorted (if run on a multi node thor).
OUTPUT(MyDSComplete, NAMED('Dataset_Complete'));

// You may want to sort the final results after doing distributed operations.
Complete_Sorted := SORT(MyDSComplete, num);
OUTPUT(Complete_Sorted, NAMED('Complete_Sorted'));

Activity Example

Now we implement the former example as a Python Activity.  We now process the whole dataset with a single Python invocation.

IMPORT Python3 AS Python;

MyRec := RECORD
    UNSIGNED num;
    UNSIGNED numFactorial := 0;
END;

// Generate a distributed dataset of MyRec, with just the num field populated with numbers 1-20:
MyDS0 := DATASET(20, TRANSFORM(MyRec, SELF.num := COUNTER), DISTRIBUTED);
// Let's distribute it by num.  This will cause record 1 to be on node 1, record 2
// to be on node2, etc.  
MyDS := DISTRIBUTE(MyDS0, num);
// We'll output the original dataset for reference.
OUTPUT(MyDS, NAMED('InputDataset'));

// Python Activity that takes a partially populated dataset
// in, and returns the dataset populated with factorials.
// It runs fully in parallel.
STREAMED DATASET(MyRec) doFactorials(STREAMED DATASET(MyRec) recs) := EMBED(Python: activity)
    import math
    for recTuple in recs:
        # Extract the fields from the record. In this case
        # we only care about the first field 'num'.
        num = recTuple[0]
        # Yield a new record with the factorial included.
        yield (num, math.factorial(num))
ENDEMBED;

// Call the python activity with our partially populated
// dataset.
MyDSComplete := doFactorials(MyDS);

// Output the results.  Note that, while the dataset has been completed,
// the results are not sorted (if run on a multi node thor).
OUTPUT(MyDSComplete, NAMED('Dataset_Complete'));

// You may want to sort the final results after doing distributed operations.
Complete_Sorted := SORT(MyDSComplete, num);
OUTPUT(Complete_Sorted, NAMED('Complete_Sorted'));

Python Subsystem Example including Best Practice Diagnostic Messaging

Now we do the same task as a Python Subsystem.  This example is somewhat contrived, since the common object that we’re storing really contains no significant context, but it does illustrate the mechanics.  We don’t illustrate the two-module approach, but do show the basic mechanisms needed for that.  We also show appropriate diagnostic messaging to ease debugging.

IMPORT Python3 AS Python;

MyRec := RECORD
    UNSIGNED num;
    UNSIGNED numFactorial := 0;
END;

// Generate a distributed dataset of MyRec, with just the num field populated with numbers 1-20:
MyDS0 := DATASET(20, TRANSFORM(MyRec, SELF.num := COUNTER), DISTRIBUTED);
// Let's distribute it by num.  This will cause record 1 to be on node 1, record 2
// to be on node2, etc.
MyDS := DISTRIBUTE(MyDS0, num);
// We'll output the original dataset for reference.
OUTPUT(MyDS, NAMED('InputDataset'));

// Python Subsystem.  We store the "factorialMgr" object in shared memory, and then access it
// with a separate method. This would only be useful if the factorialMgr had significant data
// or initialization cost which we wouldn't want to bear on each invocation.
// Note that there's no input data, but we need some kind of STREAMED DATASET for input or
// it won't run on all nodes, so we just pass a bogus handleRec dataset in.  Usually you would
// have real input data here.
handleRec := RECORD
  UNSIGNED handle;
END;
STREAMED DATASET(handleRec) fmInit(STREAMED DATASET(handleRec) recs) :=
           EMBED(Python: globalscope('facScope'), persist('query'), activity)
    import math
    global OBJECT
    # Let's create and store
    # an exception formatting function so we can use it
    # anywhere.
    global FORMAT_EXC
    def _format_exc(func=''):
       import traceback as tb
       exc = tb.format_exc(limit=2)
       if len(exc) < 100000:
           return func + ': ' + exc
       else:
           return func + ': ' + exc[:200] + ' ... ' + exc[-200:]
    FORMAT_EXC = _format_exc
    # We wrap everything we can in try ... except to make debugging easier.
    try:
        # We're only supporting one factorialMgr.  If called again will use the original.
        if 'OBJECT' not in globals():
            # Define the class.  Typically we would import another module and use a
            # class from that module.
            class factorialMgr:
                def calcFactorial(self, num):
                   return math.factorial(num)
            # Instantiate factorialMgr and store in global memory
            OBJECT = factorialMgr()
        # Now we just return an arbitrary handle record, since we're only handling one
        # instance.
        return [(1,)]
    except:
        # Use our stored exception formatting function
        exc = FORMAT_EXC('facModule.fmInit')
        assert False, exc

ENDEMBED;

// Here's a routine that uses the shared object from Init.
// Notice that it must receive handle even though it's not used.
// Otherwise, we can't guarantee that fmInit will be called first.
STREAMED DATASET(MyRec) doFactorials(STREAMED DATASET(MyRec) recs, UNSIGNED handle) :=
           EMBED(Python: globalscope('facScope'), persist('query'), activity)
    # Check your input data and stored state and use assert to indicate errors.
    assert 'OBJECT' in globals(), 'facModule.doFactorials -- ERROR: Expected OBJECT not defined.'
    try:
        for recTuple in recs:
            # Extract the fields from the record. In this case
            # we only care about the first field 'num'.
            num = recTuple[0]
            # Yield a new record with the factorial included.
            # We use the stored factorialMgr to do the work.
            yield (num, OBJECT.calcFactorial(num))
    except:
        exc = FORMAT_EXC('facModule.doFactorials')
        assert False, exc
ENDEMBED;

// Create a dummy dataset of handles, at least one record on each node.
dummy0 := DATASET([{0}], handleRec);
dummy := DISTRIBUTE(dummy0, ALL);
// Now we can call fmInit, and get back a handle from each node.
handles := fmInit(dummy);
// We output the handles just to show how they appear.
OUTPUT(handles, NAMED('handles'));
// Now we reduce to a single handle using MAX
handle := MAX(handles, handle);
// Output the single handle.
OUTPUT(handle, NAMED('handle'));
// And now we call the doFactorials method, using the handle.
MyDSComplete := doFactorials(MyDS, handle);

// Output the results.  Note that, while the dataset has been completed,
// the results are not sorted (if run on a multi node thor).

OUTPUT(MyDSComplete, NAMED('Dataset_Complete'));

// You may want to sort the final results after doing distributed operations.
Complete_Sorted := SORT(MyDSComplete, num);
OUTPUT(Complete_Sorted, NAMED('Complete_Sorted'));

Automatic Workload Allocation Example

IMPORT Python3 AS Python;
IMPORT Std.System.Thorlib;

node := Thorlib.node();
nNodes := Thorlib.nodes();

MyRec := RECORD
    UNSIGNED num;
    UNSIGNED numFactorial := 0;
END;

// Python Activity to produce factorials up to "upTo" argument.
// It executes in parallel and automatically allocates the workload among HPCC Nodes.
STREAMED DATASET(MyRec) doFactorials(UNSIGNED upTo, UNSIGNED pynode, UNSIGNED pynnodes) := EMBED(Python: activity)
    import math
    for i in range(upTo):
        # Only do 1/nNodes of the work.  Let other nodes do
        # the rest.
        if i % pynnodes == pynode:
            # Yield a new record with the factorial included.
            yield (i, math.factorial(i))
ENDEMBED;

// Call the python activity.  Note that we pass it the "upTo" parameter(20)
// as well as the current node number and total number of nodes in the cluster.
// This allows it to automatically balance the workload among nodes.
MyDSComplete := doFactorials(20, node, nNodes);

// Output the results.  Note that, while the dataset has been completed,
// the results are not sorted (if run on a multi node thor).
OUTPUT(MyDSComplete, NAMED('Dataset_Complete'));

// You may want to sort the final results after doing distributed operations.
Complete_Sorted := SORT(MyDSComplete, num);
OUTPUT(Complete_Sorted, NAMED('Complete_Sorted'));