Thu Sep 20, 2018 9:59 pm
Login Register Lost Password? Contact Us


Parallel Embed - R, Python, etc..

Questions around writing code and queries

Thu Feb 08, 2018 6:35 pm Change Time Zone

Is there a way, and how would one go about coding, to have an embed structure parallelized, assuming the plugins are on each and every node? Specifically, I am interested in have python embeds to be parallelized. i.e. have my python code run separately on each node.

https://wiki.hpccsystems.com/display/hp ... ntegration
https://hpccsystems.com/training/docume ... cture.html

I know PIPE can be used to parallelize some tasks, but I was hoping to use embed. I have tried using LOCAL on the dataset that is being passed into my embed to no avail.

Thanks in advance
rken
 
Posts: 7
Joined: Thu Feb 08, 2018 6:30 pm

Thu Feb 08, 2018 7:42 pm Change Time Zone

rken,

Since every Thor node runs exactly the same .so file, and each node simply operates on whatever data is on that node (for operations that don't need to swap data between the nodes for correct execution), I would assume that your embedded Python code would do the same. IOW, parallel operation is the default mode on a multi-node Thor.

What happens when you try running a test job?

HTH,

Richard
rtaylor
Community Advisory Board Member
Community Advisory Board Member
 
Posts: 1376
Joined: Wed Oct 26, 2011 7:40 pm

Thu Mar 15, 2018 10:37 pm Change Time Zone

I ran a simple test where the python embeded code just writes to a file it's mac address, it only writes it to the file on node 1, not on each file on each node in my THOR. Perhaps this isn't the best test case?
rken
 
Posts: 7
Joined: Thu Feb 08, 2018 6:30 pm

Mon Mar 19, 2018 4:56 pm Change Time Zone

rken,

That sounds like the job is so simple that hThor is "hijacking it." You can ensure it runs in Thor by adding in some code something like this:
Code: Select all
IMPORT Std;
ds := DATASET([{'A'},{'B'},{'C'},{'D'},{'E'},
               {'F'},{'G'},{'H'},{'I'},{'J'},
               {'K'},{'L'},{'M'} ,{'N'},{'O'},
               {'P'},{'Q'},{'R'},{'S'},{'T'},
               {'U'},{'V'},{'W'},{'X'},{'Y'},{'Z'}],
              {STRING1 Letter});
dsd := DISTRIBUTE(ds);
pds := PROJECT(dsd,TRANSFORM({STRING1 Letter,UNSIGNED1 node},
                             SELF.node := Std.system.Thorlib.Node()+1, SELF := LEFT));
OUTPUT(pds,NAMED('Node_Numbered_DS'));
I put in the node numbering to absolutely demonstrate that this code is running on each separate node.

HTH,

Richard
rtaylor
Community Advisory Board Member
Community Advisory Board Member
 
Posts: 1376
Joined: Wed Oct 26, 2011 7:40 pm

Mon Mar 19, 2018 11:32 pm Change Time Zone

I tried the below code and get a somewhat confusing result. I get the dataset value along with the node.id for each row. The id goes from 1-4 for my 4 nodes, equally distributed as expected. However my python code is returning a mac address of just the first node (on each row).

Record inputed into pyembed
Code: Select all
paramOutput := RECORD
   INTEGER1 imageLabel;
   //other values...
END;


Record returned from pyembed
Code: Select all
paramOutput := RECORD
   INTEGER imageLabel;
   INTEGER mac;
END;


ECL Code:
Code: Select all
DATASET(paramOutput) pyscript(DATASET($.my_data_type) mydata) := EMBED(Python)
from time import strftime, gmtime
from uuid import getnode as get_mac

out = []
address = get_mac()

for i, image in enumerate(mydata):
   out.append((image[0], get_mac()))
return out
ENDEMBED;

subset := CHOOSEN($.my_data, 80);
ds := pyscript(subset);
dds := DISTRIBUTE(ds);
pds := PROJECT(dds,TRANSFORM({INTEGER imageLabel, INTEGER mac, UNSIGNED1 node}, SELF.node := Std.system.Thorlib.Node()+1, SELF := LEFT));
OUTPUT(pds);


Returns 80 rows, each with a different imageLabel, mac address(all the same), and node number.

I'm essentially trying to distribute the 80 records to my nodes, have the pyembed process it, and return the results. I feel I am close, but can't seem to see it...
rken
 
Posts: 7
Joined: Thu Feb 08, 2018 6:30 pm

Tue Mar 20, 2018 1:23 pm Change Time Zone

rken,

It looks to me like your Python code is still just running in node 1 and then you're distributing that to the other nodes to get the node numbers. BTW, your CHOOSEN will get you the first 80 recs from the first node that reports back (usually node 1 :) ). That may explain it.

So try it something like this instead:
Code: Select all
//create a nested child dataset with 1 rec on each node
ds1 := DATASET(1,TRANSFORM({UNSIGNED1 node, DATASET(paramOutput) macs},
                           SELF.node := Std.system.Thorlib.Node()+1, SELF := []),LOCAL);
subset := DISTRIBUTE(CHOOSEN($.my_data, 80)); //distribute the input recs
  //then do a PROJECT LOCAL to call your Python code
pds := PROJECT(ds1,TRANSFORM(RECORDOF(ds1),
                             SELF.macs := pyscript(subset), SELF := LEFT),LOCAL);
OUTPUT(pds);
I would expect this to end up with one record on each node with ~20 child recs in each. Let me know what you get. :)

HTH,

Richard
rtaylor
Community Advisory Board Member
Community Advisory Board Member
 
Posts: 1376
Joined: Wed Oct 26, 2011 7:40 pm

Tue Apr 03, 2018 10:07 pm Change Time Zone

I removed the python bit to try and simplify, but I am still struggling to get the expected output... I tried with and without "LOCAL" on the PROJECT() without any difference, which seems like LOCAL is not doing anything and/or the dsData is not being distributed correctly.

This is what I think the code is doing:
-Create a dataset on each node that has node id and an arbitrary integer
-Distribute the alphabet to all the nodes resulting in 26/4 letters per node
-Locally transform the two, now distributed, datasets to have a node id, integer, and child records of 26/4 letters.

The result is 4 rows with node id, integer, and 26 letters.

Code: Select all
dsModel := DATASET(1,TRANSFORM({UNSIGNED1 node, INTEGER i},
                           SELF.node := Std.system.Thorlib.Node()+1, SELF.i := 9, SELF := []),LOCAL);
dsData := DATASET([{'A'},{'B'},{'C'},{'D'},{'E'},
               {'F'},{'G'},{'H'},{'I'},{'J'},
               {'K'},{'L'},{'M'} ,{'N'},{'O'},
               {'P'},{'Q'},{'R'},{'S'},{'T'},
               {'U'},{'V'},{'W'},{'X'},{'Y'},{'Z'}],
              {STRING1 Letter});
ddsData := DISTRIBUTE(dsData); //distributes the letters to the nodes
pds := PROJECT(dsModel,TRANSFORM({UNSIGNED1 node, TYPEOF(dsModel) model, TYPEOF(dsData) letters},
                             SELF.node := Std.system.Thorlib.Node()+1, SELF.model := dsModel, SELF.letters := ddsData, SELF := LEFT), LOCAL);
OUTPUT(pds,NAMED('Node_Numbered_DS'));
rken
 
Posts: 7
Joined: Thu Feb 08, 2018 6:30 pm

Wed Apr 04, 2018 9:34 am Change Time Zone

rken,

OK, your problem is limiting the set of records you're putting into the letters field in your PROJECT TRANSFORM function. The LOCAL option doesn't change the fact that you're asking for all the records in the recordset. So to do what you want you need to add a filter to limit to just the recs on the same node. That means adding a node number to the letters, also. Here's how I do it:
Code: Select all
IMPORT Std;

dsData := DATASET([{'A'},{'B'},{'C'},{'D'},{'E'},
               {'F'},{'G'},{'H'},{'I'},{'J'},
               {'K'},{'L'},{'M'} ,{'N'},{'O'},
               {'P'},{'Q'},{'R'},{'S'},{'T'},
               {'U'},{'V'},{'W'},{'X'},{'Y'},{'Z'}],
              {STRING1 Letter});
ddsData := DISTRIBUTE(dsData); //distributes the letters to the nodes

NodeLtrRec := {STRING1 Letter,UNSIGNED1 node};
ndsData := PROJECT(ddsData,TRANSFORM(NodeLtrRec,
                                     SELF.node := Std.system.Thorlib.Node()+1,
                                     SELF := LEFT));
ndsData; //node-numbered distributed letters

ModelRec := {UNSIGNED1 node, INTEGER i};
dsModel  := DATASET(1,TRANSFORM(ModelRec,
                                SELF.node := Std.system.Thorlib.Node()+1,
                                SELF.i := 9),LOCAL);
dsModel;  //one rec per node                                    

ResultRec := {ModelRec, DATASET(ModelRec) model, DATASET(NodeLtrRec) letters};

ResultRec XF(dsmodel L) := TRANSFORM
  SELF.model := L;
  SELF.letters := ndsData(node=L.node); //note the filter here   
  SELF := L;
END;
pds := PROJECT(dsmodel,XF(LEFT),LOCAL);

OUTPUT(pds,NAMED('Node_Numbered_DS'));

HTH,

Richard
rtaylor
Community Advisory Board Member
Community Advisory Board Member
 
Posts: 1376
Joined: Wed Oct 26, 2011 7:40 pm


Return to Programming

Who is online

Users browsing this forum: Bing [Bot] and 1 guest