DISTRIBUTE

DISTRIBUTE(recordset [, UNORDERED | ORDERED( bool ) ] [, STABLE | UNSTABLE ] [, PARALLEL [ ( numthreads ) ] ] [, ALGORITHM( name ) ] )

DISTRIBUTE(recordset, expression [, MERGE( sorts ) ] [, UNORDERED | ORDERED( bool ) ] [, STABLE | UNSTABLE ] [, PARALLEL [ ( numthreads ) ] ] [, ALGORITHM( name ) ] )

DISTRIBUTE(recordset, index [, joincondition ] [, UNORDERED | ORDERED( bool ) ] [, STABLE | UNSTABLE ] [, PARALLEL [ ( numthreads ) ] ] [, ALGORITHM( name ) ] )

DISTRIBUTE(recordset, SKEW( maxskew [, skewlimit ] ) [, UNORDERED | ORDERED( bool ) ] [, STABLE | UNSTABLE ] [, PARALLEL [ ( numthreads ) ] ] [, ALGORITHM( name ) ] )

recordset: The set of records to distribute.
expression: An integer expression that specifies how to distribute the recordset, usually using one of the HASH functions for efficiency.
MERGE: Specifies the data is redistributed maintaining the local sort order on each node.
sorts: The sort expressions by which the data has been locally sorted.
index: The name of an INDEX attribute definition, which provides the appropriate distribution.
joincondition: Optional. A logical expression that specifies how to link the records in the recordset and the index. The keywords LEFT and RIGHT may be used as dataset qualifiers for fields in the recordset and index.
SKEW: Specifies the allowable data skew values.
maxskew: A floating point number in the range of zero (0.0) to one (1.0) specifying the skew threshold (0.1=10%). If the recordset is skewed less than this value, it is not redistributed.
skewlimit: Optional. A floating point number in the range of zero (0.0) to one (1.0) specifying the maximum skew to allow (0.1=10%). If the skew on any node exceeds this value, the job fails.
UNORDERED: Optional. Specifies the output record order is not significant.
ORDERED: Specifies the significance of the output record order.
bool: When False, specifies the output record order is not significant. When True, specifies the default output record order.
STABLE: Optional. Specifies the input record order is significant.
UNSTABLE: Optional. Specifies the input record order is not significant.
PARALLEL: Optional. Try to evaluate this activity in parallel.
numthreads: Optional. Try to evaluate this activity using numthreads threads.
ALGORITHM: Optional. Override the algorithm used for this activity.
name: The algorithm to use for this activity. Must be from the list of supported algorithms for the SORT function's STABLE and UNSTABLE options.
Return: DISTRIBUTE returns a set of records.

The DISTRIBUTE function re-distributes records from the recordset across all the nodes of the cluster.

"Random" DISTRIBUTE

DISTRIBUTE(recordset )

This form redistributes the recordset "randomly" so there is no data skew across nodes, but without the disadvantages the RANDOM() function could introduce. This is functionally equivalent to distributing by a hash of the entire record.

Expression DISTRIBUTE

DISTRIBUTE(recordset, expression )

This form redistributes the recordset based on the specified expression, typically one of the HASH functions. Only the low 32 bits of the expression value are used, so HASH or HASH32 is the best choice. Records for which the expression evaluates to the same value end up on the same node. If an expression value falls outside the range of available node numbers, DISTRIBUTE implicitly applies a modulus operation to map it onto a node.
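
As a rough illustration of the mapping described above, the following Python sketch models how an expression value might be turned into a node number. It is not HPCC code. The names `target_node` and `stand_in_hash`, the use of `zlib.crc32` in place of HASH32, and the exact order of masking and modulus are all assumptions made for illustration.

```python
import zlib


def target_node(expr_value: int, num_nodes: int) -> int:
    """Model of the mapping: keep the low 32 bits, then wrap into the node range."""
    low32 = expr_value & 0xFFFFFFFF  # only the low 32 bits of the value are used
    return low32 % num_nodes         # implicit modulus over the number of nodes


def stand_in_hash(key: str) -> int:
    """Hypothetical stand-in for HASH32; zlib.crc32 is NOT the hash ECL uses."""
    return zlib.crc32(key.encode("utf-8"))


if __name__ == "__main__":
    nodes = 3
    for ssn in ["123-45-6789", "987-65-4321", "123-45-6789"]:
        # Equal keys hash to equal values, so they map to the same node.
        print(ssn, "-> node", target_node(stand_in_hash(ssn), nodes))
```

In this model, two records with the same key always map to the same node, which is the property that later LOCAL operations rely on.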

If the MERGE option is specified, the recordset must already be locally sorted by the sorts expressions. The redistribution then maintains the local sort order on each node, which avoids re-sorting.
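
One way to picture why MERGE can avoid a re-sort: if each receiving node gets one already-sorted stream from every sending node, it can merge those streams instead of sorting the combined data. The Python sketch below models only that idea. How the platform actually implements MERGE is an assumption here, and `merge_incoming` is a hypothetical name.

```python
import heapq
from typing import Iterable, List


def merge_incoming(streams: Iterable[List[int]]) -> List[int]:
    """Merge already-sorted streams into one sorted list without a full sort."""
    # heapq.merge requires each input to be sorted and yields items in order.
    return list(heapq.merge(*streams))


if __name__ == "__main__":
    # Each inner list models the locally sorted records sent by one node.
    from_node_1 = [1, 4, 9]
    from_node_2 = [2, 3, 10]
    from_node_3: List[int] = []
    print(merge_incoming([from_node_1, from_node_2, from_node_3]))
```

The merge only produces a sorted result if every input stream is itself sorted, which mirrors the requirement that the recordset be locally sorted by the sorts expressions.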

Index-based DISTRIBUTE

DISTRIBUTE(recordset, index [, joincondition ] )

This form redistributes the recordset based on the existing distribution of the specified index, where the linkage between the two is determined by the joincondition. Records for which the joincondition is true will end up on the same node.

Skew-based DISTRIBUTE

DISTRIBUTE(recordset, SKEW( maxskew [, skewlimit ] ) )

This form redistributes the recordset, but only if necessary. It is intended to replace DISTRIBUTE(recordset,RANDOM()) when the goal is simply a relatively even distribution of data across the nodes. This form always tries to minimize the amount of data redistributed between the nodes.

The skew of a dataset is calculated as:

MAX(ABS(AvgPartSize-PartSize[node])/AvgPartSize)

If the recordset is skewed less than maxskew, the DISTRIBUTE is a no-op. If skewlimit is specified and the skew on any node exceeds it, the job fails with an error message that identifies the first node number exceeding the limit. Otherwise, the data is redistributed so that its skew is less than maxskew.
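
The skew formula and the three possible outcomes can be worked through with a small Python model. This is a sketch of the rules as stated above, not the platform's implementation. The function names `skew` and `skew_action`, the outcome labels, and the handling of exact boundary values are assumptions.

```python
from typing import List, Optional


def skew(part_sizes: List[int]) -> float:
    """MAX(ABS(AvgPartSize - PartSize[node]) / AvgPartSize) over all nodes."""
    avg = sum(part_sizes) / len(part_sizes)
    if avg == 0:
        return 0.0  # assumption: an empty dataset is treated as unskewed
    return max(abs(avg - size) / avg for size in part_sizes)


def skew_action(part_sizes: List[int], maxskew: float,
                skewlimit: Optional[float] = None) -> str:
    """Return 'no-op', 'fail', or 'redistribute' following the rules in the text."""
    s = skew(part_sizes)
    if s < maxskew:
        return "no-op"          # already even enough, nothing is moved
    if skewlimit is not None and s > skewlimit:
        return "fail"           # skew exceeds the limit, the job fails
    return "redistribute"       # rebalance so skew drops below maxskew


if __name__ == "__main__":
    # Four nodes averaging 100 records each; the largest deviation is 50.
    sizes = [150, 50, 100, 100]
    print(skew(sizes))                    # 0.5
    print(skew_action(sizes, 0.1))        # redistribute
    print(skew_action(sizes, 0.1, 0.4))   # fail
```

For example, with part sizes of 150, 50, 100, and 100, the average is 100 and the largest deviation is 50, giving a skew of 0.5 (50%).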

Example:

MySet1 := DISTRIBUTE(Person); //"random" distribution - no skew
MySet2 := DISTRIBUTE(Person,HASH32(Person.per_ssn));
 //all people with the same SSN end up on the same node

//INDEX example:
mainRecord := RECORD
  INTEGER8 sequence;
  STRING20 forename; 
  STRING20 surname;
  UNSIGNED8 filepos{virtual(fileposition)};
END;
mainTable := DATASET('~keyed.d00',mainRecord,THOR);
nameKey := INDEX(mainTable, {surname,forename,filepos}, 'name.idx');
incTable := DATASET('~inc.d00',mainRecord,THOR);
x := DISTRIBUTE(incTable, nameKey,
                LEFT.surname = RIGHT.surname AND
                LEFT.forename = RIGHT.forename);
OUTPUT(x);

//SKEW example:
Jds := JOIN(somedata,otherdata,LEFT.sysid=RIGHT.sysid);
Jds_dist1 := DISTRIBUTE(Jds,SKEW(0.1));
 //ensures skew is less than 10%
Jds_dist2 := DISTRIBUTE(Jds,SKEW(0.1,0.5));
 //ensures skew is less than 10%
 //and fails if skew exceeds 50% on any node

See Also: HASH32, DISTRIBUTED, INDEX