Compiler Tutorial, Step 2: The Distributed Flag, and Execution Tests

See the Introduction for this article here.

Previous chapter: Step 1: The Parser, The Expression Tree and the Activity.

The Compiler Part

Git commit: Add DISTRIBUTED flag to DATASET(N, trans(COUNTER))
https://github.com/hpcc-systems/HPCC-Platform/pull/1338/files

Now that we have a working version of the new syntax, we need to make it worth the effort. Our original intent was to make it distributed (instead of running only on the master node), so let’s do this.

Remember that the DISTRIBUTED flag will only work in Thor, since Roxie will, by design, only execute the 'inlinetable' on a single node.

The Parser

First, we need to add the flag to the Yacc file. We could add it directly, but it's better to prepare for the future, when more flags might be added to this syntax (and shared with other, similar constructs).

We add three new grammar items: 'optDatasetFlags', an optional list of flags; 'datasetFlags', which builds a comma-separated list of 'datasetFlag' items; and 'datasetFlag' itself, which can be one of many options. For now we have just the one, DISTRIBUTED, which creates a new attribute bound to the atom 'distributedAtom'. See the changes to 'hqlgram.y' in the commit above for more information.

Again, if you remember from previous steps, this is just a way to name a token, so later we can retrieve this information and change the back-end accordingly. On the original DATASET production, you just need to add 'optDatasetFlags' at the end of the list (before the closing parenthesis):

| DATASET '(' thorFilenameOrList ','
        beginCounterScope transform endCounterScope
        optDatasetFlags ')'

and now you have an attribute that might (or might not) be present on the DATASET() declaration.
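
In rough terms, the mechanics boil down to the following (a hedged sketch, not the literal action code in 'hqlgram.y' — the 'args' list and the exact calls are illustrative):

// Parser side: the DISTRIBUTED token becomes an attribute node bound to
// 'distributedAtom' and is appended to the operands the DATASET
// expression is built from.
args.append(*createAttribute(distributedAtom));

// Later stages can then simply ask whether the flag was present:
bool isDistributed = expr->hasProperty(distributedAtom);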

The Helper

An attribute is not always handled in the same place. Some attributes are used for compilation purposes, others for execution purposes. Since we don't want to create two activities, one distributed and one not (the number of activity variants would grow exponentially as you add more attributes), we need the activity itself to know what this flag means.

One way to do that would be to change the 'CThorTempTableArg' interface to pass this parameter to the activity, but here we reach a conundrum: the old temporary-table helper didn't have any flag other than 'isConstant'. We could add another method, 'isDistributed', but since we prepared for the future above (by allowing a list of flags to be passed), that would be short-sighted.

Other helpers use a much more flexible 'getFlags' mechanism, where you only need to extend the enum it returns (adding new values and deprecating old ones, as we did before with the AST nodes) and the underlying activity doesn't have to change.

This is important for old compiled code that would take ages to compile again on a new platform, since doing so would pull in a long list of other dependencies and possibly disrupt production. We tend to require recompilation only when it's strictly necessary, or when there is a new major version[3].

Git commit: Replacing TAKtemptable for TAKinlinetable
https://github.com/hpcc-systems/HPCC-Platform/pull/1987/files

For this reason, and to make it future-proof, we decided to create a new helper interface called 'IHThorTempTableExtraArg' (later renamed to 'CThorInlineTableArg', see the commit above), which provides 'getFlags' and allows further expansion without new changes to the interface.
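
In simplified form, the new helper interface looks something like this (a sketch only; the real declarations live in the platform's 'eclhelper.hpp' and may differ):

// The activity reads a single bitmask; adding a new flag later extends the
// enum below without touching any virtual method signatures.
struct IHThorInlineTableArg : public IHThorArg
{
    virtual unsigned getFlags() = 0;    // combination of TTF* flags (see below)
    virtual __uint64 numRows() = 0;     // total number of rows to generate
    virtual size32_t getRow(ARowBuilder & rowBuilder, __uint64 row) = 0;
};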

So, we merge the current flag (constant) with the new one (distributed):

enum
{
    // the default flags value is zero
    TTFnoconstant  = 0x0001,
    TTFdistributed = 0x0002,
};

This deserves some explanation. The default flags value is zero, as you can see in every other helper, and the default behaviour of this specific helper was that the dataset was constant. So, to match the old behaviour, we introduced a non-constant flag to state otherwise (calling it 'dynamic' would not have been an accurate description).

Lots of comments were added to indicate that the old behaviour is deprecated and that in future versions of the platform (4.0 in this case) those helpers should be removed.

The Back-End

Finally, we need to generate the function in our 'BuildActivity' function. This is not too complex, though, since we just need to OR all the flags into an unsigned int and emit an unsigned function using a helper. However, since the other helpers change (with the new interface), we need to change them as well.

Old versions of those helpers (compiled before this version goes out) will contain only the 'isConstant' function, and all new ones will contain both, with 'isConstant' calling 'getFlags' for compatibility.

So, for portability, we refactor the generation of 'getFlags' into a new function, called 'doBuildTempTableFlags', which simply joins the flags together:

StringBuffer flags;
if (expr->hasProperty(distributedAtom))
    flags.append("|TTFdistributed");
if (!isConstant)
    flags.append("|TTFnoconstant");
if (flags.length())
    doBuildUnsignedFunction(ctx, "getFlags", flags.str()+1); // +1 skips the leading '|'

This only overrides 'getFlags' when there is a flag that needs setting beyond the default behaviour. Again, this kind of flag emission is done elsewhere in the code and could be more automated, but that's for a different tutorial.
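
For instance, for a DISTRIBUTED dataset with a non-constant transform, the generated helper class would end up overriding the method along these lines (a hand-written illustration of the generator's output, not code from the commit):

virtual unsigned getFlags() { return TTFdistributed|TTFnoconstant; }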

Now, we need to make 'isConstant' use 'getFlags', in 'CThorTempTableArg':

virtual bool isConstant() { return (getFlags() & TTFnoconstant) == 0; }

So, old code that relies on this information can still work.

Finally, we need to change all the other helpers to export 'getFlags' instead of 'isConstant'. In 'hqlhtcpp.cpp' you need to search for 'isConstant' and make sure the matches relate to temp tables. One example:

From:

if (!isConstantTransform(transform))
    doBuildBoolFunction(instance->startctx, "isConstant", false);

To:

doBuildTempTableFlags(instance->startctx, expr, isConstantTransform(transform));

Tests, Tests, Tests

The previous compiler test case had a DISTRIBUTED case added, and its generated code was examined by hand and judged correct. Note that in this commit we did add an execution test, though it still doesn't execute distributed, because the engines have no knowledge of our new flag yet.

The Engines Part

Git commit: Implement DISTRIBUTED DATASET(COUNT) in Thor
https://github.com/hpcc-systems/HPCC-Platform/pull/1965/files

To add the new functionality to the engines, it's best to add it to them all at once, which makes for a big change. If you skim through the pull request, you'll see that our three engines (Roxie, Thor and HThor) have very similar inner workings. So, although big, this change is more mechanical than the others.

If you're still paying attention, you'll notice that we started by saying we could reuse an existing activity (TempTable), but ended up creating a new one (InlineTable), so the engines haven't got a clue what to do with that code. And since we want the new code to be independent from the previous form (so we can deprecate the latter more easily later), we need to create three different activities, one for each engine. This shouldn't be strictly necessary, but the engines have radically different designs, which might change in the future.

And, for the engines to acknowledge the existence of the new activity, we need to add a case to each engine's global activity switch, just like we did in the compiler:

In HThor ('ecl/eclagent/eclgraph.cpp'):

case TAKinlinetable:
    return createInlineTableActivity(agent, activityId, subgraphId,
                                     (IHThorInlineTableArg &)arg, kind);

In Roxie ('roxie/ccd/ccdquery.cpp'):

case TAKinlinetable:
    return createRoxieServerInlineTableActivityFactory(id, subgraphId, *this,
                                                       helperFactory, kind);

In Thor ('thorlcr/slave/slave.cpp'):

case TAKinlinetable:
    ret = createInlineTableSlave(this);
    break;

Again, the same issue of finding all the other places that need a case, which we saw in the compiler, applies here. See the discussion in step 1 and the pull request above for some hints.

Each of these functions will simply create a new Roxie/Thor/HThor activity instance and return it. They're used to create the class structure that implements the activities in the engines.
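
For a rough idea of their shape, an HThor-style factory might look like this (a hedged sketch; 'CHThorInlineTableActivity' and the exact signature are illustrative, see 'ecl/hthor/hthor.cpp' in the commit for the real code):

// Wrap the compiler-generated helper in a concrete activity instance.
extern HTHOR_API IHThorActivity * createInlineTableActivity(IAgentContext & agent,
        unsigned activityId, unsigned subgraphId, IHThorInlineTableArg & arg,
        ThorActivityKind kind)
{
    return new CHThorInlineTableActivity(agent, activityId, subgraphId, arg, kind);
}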

Non-Distributed Activity

Taking the HThor example ('ecl/hthor/hthor.cpp'), since it's the simplest of all, the two main functions you need to implement are 'ready' and 'nextInGroup'. The former prepares all the data prior to execution and the latter gives you the next item in the list, like an iterator.

As you can see in the commit, the start/ready phase prepares 'numRows'. That's because the activity can be constructed at a time when we still don't have full information about it, so the real initialisation is done when we're actually ready to start.

Also, if the DATASET is in a child query, there can be multiple calls to 'onStart()', but in any case there will be only one call to 'onCreate()'.
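
A hedged sketch of that initialisation (class and member names are illustrative, not the literal commit code):

// Read the row count from the helper only when the activity is about to
// run; the constructor is too early.
void CHThorInlineTableActivity::ready()
{
    CHThorSimpleActivityBase::ready();
    curRow = 0;
    numRows = helper.numRows();
}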

The 'nextInGroup' function is an iterator that drives the stream of rows through the activity's code. If you follow the same style as the other table activities, you'll see that a simple loop solves the problem:

while (curRow < numRows)
{
    // Allocate a builder for the next row and ask the helper to fill it in;
    // a zero size means the helper produced no row for this counter value.
    RtlDynamicRowBuilder rowBuilder(rowAllocator);
    size32_t size = helper.getRow(rowBuilder, curRow++);
    if (size)
    {
        processed++;
        return rowBuilder.finalizeRowClear(size);
    }
}

The 'RowBuilder' allocates enough space for the row, and if the row has any content, the activity increments its processed counter and returns the row.

The size had a specific purpose before this change: it let the helper control the counter by returning zero when there were no more rows to process. However, that could call the helper's 'getRow' more times than needed, since the activity had no way to know whether the stream had really stopped or the empty row was just a temporary fluke.

Distributed Activity

Unlike HThor (which is just a test engine), Thor and Roxie actually execute jobs in parallel. In this particular commit the Roxie activity still executes serially, but the Thor activity is parallel, so let's have a closer look at it.

In the serial algorithm, the start/ready phase sets the number of rows, but in a parallel version we need to restrict those boundaries to the block of the problem assigned to each node. In our case that's not too hard, since we always have a range of 1 to N elements in our DATASET.

So, our parallel 'start' has to account for the different blocks to process. We do that by simply taking the node ID (which logical node it's running on) and calculating the range:

startRow = (nodeid * numRows) / nodes;
maxRow = ((nodeid + 1) * numRows) / nodes;
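
To see the arithmetic in action, here's a tiny standalone C++ sketch (not platform code) that prints each node's slice; the integer division guarantees the slices tile the range [0, numRows) exactly, with sizes differing by at most one row:

#include <cstdio>

int main()
{
    const unsigned long long numRows = 10; // total rows in the DATASET
    const unsigned nodes = 3;              // number of slave nodes

    for (unsigned nodeid = 0; nodeid < nodes; nodeid++)
    {
        unsigned long long startRow = (nodeid * numRows) / nodes;
        unsigned long long maxRow = ((nodeid + 1ULL) * numRows) / nodes;
        // prints: node 0: [0, 3), node 1: [3, 6), node 2: [6, 10)
        printf("node %u: rows [%llu, %llu)\n", nodeid, startRow, maxRow);
    }
    return 0;
}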

If the DATASET is not distributed, then only the 'firstNode()' computes (its maxRow set to the maximum), while the others stay idle (maxRow set to 0). The 'nextInGroup' implementation is then virtually identical to the serial version (another benefit of this approach).
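
Putting the two cases together, the 'start' logic sketched from the description above (variable names illustrative):

// Distributed: every node takes its own slice of [0, numRows).
// Non-distributed: only the first node produces rows; the rest get an
// empty range and simply wait.
if (distributed)
{
    startRow = (nodeid * numRows) / nodes;
    maxRow = ((nodeid + 1) * numRows) / nodes;
}
else
{
    startRow = 0;
    maxRow = firstNode() ? numRows : 0;
}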

References

[3] See VERSION at the root of the git tree

Follow-up

The next step on the tutorial is Step 3: The Optimisation, and More Tests.