1   Getting Started

1.1   What You Need To Know

Amberstone is written in Java, but no Java programming skills are needed to run it. Amberstone provides a concise set of commands and functions for performing each task, which are described in this document. Proficiency in a programming language will be helpful in using Amberstone's functions and will be needed to create the load units.

Amberstone is not a SQL database, but it is important to have a basic understanding of SQL databases and SQL aggregate functions. A basic understanding of data warehousing concepts such as fact tables, star schemas and data cubes will also be very helpful in understanding Amberstone's storage and aggregation models.

Amberstone works on both Windows and Unix platforms. Unless stated otherwise all conventions in this document are for the Unix platform. Operating Amberstone will require basic system administration skills for the chosen platform including the ability to install a Java VM, set environment variables and run the Amberstone commands.

1.2   Java Virtual Machine

Amberstone is written in Java and has been tested with versions 6 and 7 of the Sun JRE. The latest version of Sun's JRE 7 will provide the best performance. The latest version of the Java VM can be downloaded from Oracle's Java download page.

1.3   Downloading the Build

Download the latest version of Amberstone from www.amberstonelabs.com. There are Windows and Unix builds available.

Unpack the build in the directory of your choice. This will be the working directory for Amberstone.

1.4   Setting Environment Variables

There are three environment variables that need to be set to run Amberstone.
  1. JAVA_HOME: set this to the home directory of the Java VM.
  2. AMBERSTONE_HOME: set this to the directory that the build resides in.
  3. AMBERSTONE_NODE: a unique id for the node/server. This needs to be set even if Amberstone is run on a single node.
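
For example, on a Unix system the variables might be set in the shell profile as follows; the paths and node id shown here are illustrative:

export JAVA_HOME=/usr/java/jre7
export AMBERSTONE_HOME=/opt/amberstone
export AMBERSTONE_NODE=node1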

1.5   Setting the Path

Amberstone commands are located in the $AMBERSTONE_HOME/bin directory. Optionally, add this directory to the system PATH environment variable; otherwise, call the commands by their full path.
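
For example, on a Unix system:

export PATH=$PATH:$AMBERSTONE_HOME/bin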

1.6   Sample Data

A sample data partition is provided with Amberstone, along with a small sample data set and configurations. The sample partition can be found under the $AMBERSTONE_HOME/storage/sample directory.

The sample data and configurations include examples of many of Amberstone's core features.

All of the examples in the User Guide refer to the sample data and configurations.

1.7   Amberstone Functions

Amberstone has a function library which provides the logic and algorithms needed to load, read and aggregate data.

Amberstone's functions take parameters, return values, and follow a conventional C-style syntax.

Amberstone's functions can take other functions as parameters. This is known as function compounding. Function compounding can be used to build small programs without having to do anything but call functions.
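
For example, the sample loader configuration shown later in this guide compounds the hash and data functions to convert the text in the fourth column of a load unit into a long as it is loaded:

hash(data(3))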

This User Guide provides examples for many of Amberstone's core functions and the Functions Reference provides a description of each function.

2   Loading Data

2.1   Overview

Amberstone provides compressed storage for un-aggregated, machine-generated data. Amberstone's storage system is broken into partitions. Each partition holds one fact table and its supporting configurations and dictionaries.

2.2   Creating a New Partition

The first step in loading data is to create a partition using the part command. The part command is issued as follows from the command prompt:

$AMBERSTONE_HOME/bin/part.sh [name] <prototype>

The part command takes two parameters. The first parameter is the name of the partition to create. The second optional parameter is the name of an already existing partition to use as a prototype. If a prototype is provided, the configuration from the prototype partition will be copied to the new partition.

Partitions are created under the $AMBERSTONE_HOME/storage directory.

A sample partition comes with Amberstone, located in the directory $AMBERSTONE_HOME/storage/sample.

2.3   Preparing Load Units

Before data can be loaded into Amberstone it must be organized into load units.

Load units are delimited text files with one record per line. The default delimiter is the vertical pipe symbol '|'. The default delimiter is set in the $AMBERSTONE_HOME/conf/system.ini file. In this file the property load-delim can be changed to any single byte character.

Load units are organized by date and must be named with the following naming convention:

YYYYMMDD.part_number.txt

The sample partition comes with sample load units located in the directory: $AMBERSTONE_HOME/storage/sample/load-queue.

One of the sample load units is named: 20120623.1.txt

The above load unit contains data from 06/23/2012. It has a part number of 1 designating that it is the first file to be loaded for this day. The part number is incremented to represent the next part from that day: 20120623.2.txt.

Records within the load units should be sorted in ascending order by timestamp.

Each partition has its own load-queue located in $AMBERSTONE_HOME/storage/[partition name]/load-queue. Place load units in this directory to be loaded. After loading, the load units can either be removed from the server or archived using the archive command.

2.4   Configuring the Partition

The partition is created under the $AMBERSTONE_HOME/storage/[partition name] directory.

The configuration for each partition is stored in the file $AMBERSTONE_HOME/storage/[partition name]/conf/partition.xml.

The sample configurations shown in the sections below come from the sample partition and are located in the file $AMBERSTONE_HOME/storage/sample/conf/partition.xml.

2.4.1   Working With Dictionaries

Machine-generated data often has a significant amount of repeating text elements. In a typical data warehousing environment, repeating text elements are replaced in the fact table with a surrogate integer key and the text data is stored in supporting dimension tables. This is known as a star schema. This design keeps the fact table compact and provides fast grouping keys for aggregation.

Amberstone provides 3 different types of dictionaries for creating on-the-fly star schemas during the loading process. The dictionary types are described below:

Dynamic Dictionaries

Dynamic Dictionaries provide persistent storage for text values and their integer keys. Dynamic dictionaries are built on-the-fly as data is loaded into columns in the fact table. When a new text entry is stored, the dictionary automatically creates an integer key for it by incrementing its counter. The integer key is then returned and stored in the fact table instead of the text. The text and its integer key are stored in the dictionary so the text can be restored when necessary.

Dynamic Dictionaries are created automatically during the load, and are stored in the $AMBERSTONE_HOME/storage/[partition name]/dictionaries directory.

Resource Dictionaries

Resource Dictionaries are read-only, pre-built dictionaries. They are used for columns where the text values are known in advance.

Resource Dictionaries are loaded from name/value pair files placed in the $AMBERSTONE_HOME/resources directory. When Amberstone starts up, files with the .txt extension in this directory are loaded into on-disk indexes and are available to each partition. The format of the resource files is one name/value pair per line, separated by a : character. The name is the text and the value is the integer key to replace it with.

name : int
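
For example, a resource file backing the sample partition's actions resource (referenced later as resource('actions', 10)) might contain entries like the following; the names and keys here are illustrative:

search : 1
download : 2
view : 3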
		

Algorithmic Dictionaries

Algorithmic Dictionaries use an algorithm to transform text into an integer and back again. They do not require any storage.

2.4.2   Cardinality Limits of Dynamic & Resource Dictionaries

Dynamic Dictionaries and Resource Dictionaries are stored in on-disk indexes. In order to maintain high load performance, Amberstone provides caches for dictionaries and resources that keep the most recently used key pairs in memory as the load is being performed. As the number of unique keys in the dictionary or resource rises, so do the memory requirements for the caches.

Accordingly, the number of unique keys in the dictionary or resource needs to be limited to a manageable number. A good rule of thumb is that dictionaries and resources should only be used for columns that have fewer than 500,000 unique values.

2.4.3   Configuring the Fact Table

Each partition contains one column-oriented fact table. Fact tables consist of columns, each with a specific data type. Amberstone's supported data types are described below.

2.4.4   Data Types

Numeric Data Types
Type   Size   Description                                                                                   Dictionary Support
int    1      1 byte unsigned integer ranging from 0 to 255                                                  Yes
int    2      2 byte unsigned integer ranging from 0 to 65,535                                               Yes
int    3      3 byte unsigned integer ranging from 0 to 16,777,215                                           Yes
int    4      4 byte signed integer ranging from -2,147,483,647 to 2,147,483,647                             Yes
long   8      8 byte signed integer ranging from -9,223,372,036,854,775,807 to 9,223,372,036,854,775,807     No

Time Data Type
Type   Size   Description
time   4      4 byte integer representing Unix Time in seconds.

Unix Time is automatically parsed from a text timestamp based on a template. For example, the template 'MM/DD/YYYY:hh:mm:ss' would be used to parse a text time stamp formatted as '12/31/2011:04:02:05'.

The supported template formats are:

  • YYYY: four digit year.
  • MM: two digit month (1-12).
  • Mon: three letter month (Jan, Feb, Mar, Apr, May, Jun, Jul, Aug, Sep, Oct, Nov, Dec).
  • DD: two digit day (1-31).
  • hh: two digit hour (0-23).
  • mm: two digit minute (0-59).
  • ss: two digit second (0-59).
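
For example, the sample partition's rec_time column (shown in the fact table configuration below) declares the template used to parse timestamps such as 23/Jun/2012:00:12:45:

<column name="rec_time" type="time" template="DD/Mon/YYYY:hh:mm:ss" />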

Text Data Type
Type    Size        Description
bytes   Unlimited   An array of bytes representing text.

2.4.5   Dictionary Support

All of the int data types support dictionaries. When a dictionary is attached to an int column, the text data loaded into that column will automatically be swapped with an integer key.

2.4.6   Handling Floating Points

Floating point data types are not included by design. This is because Amberstone specializes in aggregation and floating point errors can become very significant during aggregation.

All floating point numbers must be scaled at load time to either an int or a long using the scale function.

The scale function moves the decimal point a specified number of places to the right and drops the remaining precision. For example the float 4.567 could be scaled 3 places to make the integer 4567. Aggregations are then performed on the scaled integers to produce scaled aggregates. The scaled aggregates can then be scaled back during reporting.
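
For example, the costs 4.567 and 1.233 scaled by 3 places are stored as the integers 4567 and 1233. Their aggregated sum, 5800, can be scaled back during reporting by dividing by 1,000 to recover the exact total of 5.8.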

2.4.7   Sample Fact Table

Below is the XML configuration for the sample partition's fact table located in the file $AMBERSTONE_HOME/storage/sample/conf/partition.xml.
<columns>
	<column name="cust_id"      type="int"   size="2"/>
	<column name="cost"         type="int"   size="4"/>
	<column name="ip"           type="int"   size="4"  dictionary="ip()"/>
	<column name="session_id"   type="long"/>
	<column name="rec_time"     type="time"  template="DD/Mon/YYYY:hh:mm:ss" />
	<column name="hour"         type="int"   size="1"/>
	<column name="url"          type="int"   size="3"  dictionary="dictionary('urls', 30)"/>
	<column name="action"       type="int"   size="1"  dictionary="resource('actions', 10)"/>
	<column name="search_terms" type="bytes" size="30"/>
 </columns>

The XML above defines a table with nine columns. Each column has a name attribute, which is used to refer to the column, and a type attribute, which refers to one of the supported Amberstone data types. Each column is described in the load directive walkthrough below.

2.4.8   Sample Loader Configuration

After the fact table has been configured, the loader needs to be configured. The loader configuration below was taken from the sample partition's partition.xml.
 <loader>
	<load name="cust_id"      value="data(0)" />
	<load name="cost"         value="scale(data(1), 2)" />
	<load name="ip"           value="data(2)" />
	<load name="session_id"   value="hash(data(3))" />
	<load name="rec_time"     value="data(4)" />
	<load name="hour"         value="slice(data(5), 12, 2)" />
	<load name="action"       value="data(5)" />
	<load name="url"          value="data(6)" />
	<load name="search_terms" value="data(7)"/>
</loader>

In the XML above, each column in the fact table has a load directive. The name attribute matches up with the column name in the table definition. The value attribute points to a function that provides the logic to move data from the load units into the column.

Sample Record

Below is a record taken from a sample load unit in the sample partition's load-queue.

56790|.25|145.251.2.11|we23412e24422-rerfde323254-0032|23/Jun/2012:00:12:45|search|/home/search/search.html|object oriented progamming

The delimited schema for this record is:

<customer id> | <cost> | <ip> | <session id> | <timestamp> | <action> | <url> | <search terms>

The data Function

Each load directive includes a call to the data function.

The data function is used to retrieve data from a specific column in the load units. The data function takes a single parameter, an integer literal, which designates which column from the load unit to retrieve. Columns are indexed at 0. The data function can return data in three different formats depending on the context that it is called:

  1. bytes: the raw bytes found in the column.
  2. int: an int parsed from the ascii bytes in the column.
  3. long: a long parsed from the ascii bytes in the column.

The context is inferred automatically at compile time so the function syntax does not include any context information.

The load directive below calls the data function to map data into a column of type int:

<load name="cust_id" value="data(0)" />

The load directive below calls the data function to map data into a column of type bytes.

<load name="search_terms" value="data(7)"/>

Notice that the syntax is exactly the same because Amberstone automatically infers the context.

Decorating the data Function

The data function loads data directly into a column. Often it's useful to transform the data as it is loading. This is accomplished by decorating the data function with a wrapper function. A decorating function takes the result of an inner function, transforms it, and then returns the transformed data.

Load Directives

Below is a brief description of each of the sample load directives.

<load name="cust_id" value="data(0)" />

The directive above loads the cust_id column, which is an int of size 2. The data(0) function returns the data from the first column of the load unit. Because cust_id is an int, the data function parses an int and returns it.

<load name="cost" value="scale(data(1), 2)" />

The directive above loads the cost column, which is an int of size 4. In this example the scale function is used to decorate the data function. The data(1) call returns the data from the second column of the load unit. The decorating scale function transforms a float into a whole number by moving the decimal a specified number of places to the right and dropping any remaining precision.

<load name="ip" value="data(2)" />

The directive above loads the ip column, which is an int size 4.

The data(2) function call returns data from the third column of the load units.

The data in this column are IP addresses. During the load, the ip dictionary attached to the ip column transforms this data into a 4 byte int which is stored in the fact table.

<load name="session_id" value="hash(data(3))" />

The directive above loads the session_id column which is a long.

In this case the data(3) function call is decorated by the hash function. The hash function takes an array of bytes as input and returns a 64 bit long using the murmurhash3 algorithm.

Why hash session id's?

Often session IDs are long strings created by a GUID algorithm. Stored as a bytes data type, GUIDs take up significant space and make poorly performing grouping keys. Dictionaries cannot be used to store session IDs because of their inherently high cardinality (see Cardinality Limits of Dynamic & Resource Dictionaries).

Caution!
Hashing the text session ID creates a compact long from the string of text. This provides compact storage and a faster grouping key for aggregation. But hashing does pose a collision risk that needs to be considered. The murmurhash3 algorithm, while not considered cryptographically strong, does provide strong collision protection while still delivering high performance. But there is a small chance that two different session IDs will hash to the same value. Because only active sessions are kept in memory, only sessions that occur concurrently have this collision risk. Accordingly, the collision risk rises with the number of concurrent sessions. The Birthday Paradox describes the probability of a collision given a 64 bit address space.

<load name="rec_time" value="data(4)" />

The directive above loads the rec_time column, which is a time data type. The data(4) function call returns the data from the 5th column of the load units.

Amberstone automatically uses the template provided in the column definition to parse the Unix Time integer from the data.

<load name="hour" value="slice(data(5), 12, 2)" />

The directive above loads the hour column which is an int size 1. In this case the call to the data function is wrapped by the slice function.

The slice function returns a slice from a byte array. The slice function can return an int, long or bytes depending on the context.

In this example, slice takes the bytes returned by the call to data(4) and returns a subset of the data starting from position 12 with a length of 2 bytes. You can see in the sample record that this is the hour position of the text timestamp.
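
To illustrate, the two bytes at 0-indexed positions 12 and 13 of the sample timestamp hold the hour:

23/Jun/2012:00:12:45
            ^^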

Because the hour column is an int, the slice function automatically parses an int from the subarray of data and returns it.

<load name="action" value="data(5)" />

The directive above loads the action column, which is an int size 1. The call to data(5) returns the data from the sixth column of the load units. During the load, the text data from this column is automatically transformed to an integer by the dictionary assigned to the action column in the columns definition.

<load name="url" value="data(6)" />

The directive above loads the url column, which is an int size 3. The call to data(6) returns the data from the seventh column of the load units. During the load, the text data from this column is automatically transformed to an integer by the dictionary assigned to the url column in the columns definition.

<load name="search_terms" value="data(7)"/>

The directive above loads the search_terms column, which is a bytes data type with a size of 30. The call to data(7) returns the data from the eighth column of the load units. If the data contains more than 30 bytes it is silently truncated during the load.

2.4.9   Creating Efficient Fact Tables

Creating efficient fact tables will help to maximize compression and aggregation performance. Below are some tips for creating efficient fact tables.

2.5   Running the load Command

After the load units are in place and the fact table and loader have been configured, the data is ready to be loaded. To load the data from the load queue, issue the load command:

$AMBERSTONE_HOME/bin/load.sh [partition]

The load command's first parameter is the partition to load.

When this command is issued, data will begin to load from the partition's load queue into the fact table. The fact table data is located in the directory $AMBERSTONE_HOME/storage/[partition name]/table. The table directory will contain a hierarchical directory structure, partitioning the data by year/month/day/block_number. The actual columns are written to files inside of the block number directories.
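
For example, a given day's columns are written under a path of the form:

$AMBERSTONE_HOME/storage/[partition name]/table/[year]/[month]/[day]/[block_number]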

2.6   Archiving the Load Files

After the load has completed, the load units in the load queue can optionally be moved to the load archive using the archive command. The archive command is run as follows:

$AMBERSTONE_HOME/bin/archive.sh [partition]

2.7   Reloading the Data

If there is a problem with the data load, the data will need to be reloaded. If a load unit is already in the fact table, it will need to be deleted before it is reloaded. To delete data from the fact table, issue the delete command. The delete command has the following structure:

$AMBERSTONE_HOME/bin/delete.sh [partition] [startYYYYMMDD] [endYYYYMMDD]

The delete command will remove a specific range of days from the table specified by the startYYYYMMDD and endYYYYMMDD parameters. If the entire table needs to be reloaded, the trunc command can be used:

$AMBERSTONE_HOME/bin/trunc.sh [partition]

Important!

When reloading any part of a load unit, the entire load unit must be deleted and reloaded. If the load unit is broken into multiple parts, each part must be reloaded.

2.8   Reviewing the Load Logs

A log of the load is written to the $AMBERSTONE_HOME/logs/loader.log file. This log file contains useful information about the load including the files loaded, load times, and any errors that occurred. The log is overwritten on each run, so it only contains information from the most recent run.

3   Reading Data

After the data is loaded, the read command can be used to read data from the partition's fact table.

3.1   Running the read Command

The structure of the read command is as follows:

read [partition] [select function] [startYYYYMMDD] [endYYYYMMDD]

To read the data in the sample partition's fact table, issue the read command as follows:

$AMBERSTONE_HOME/bin/read.sh sample 'select($ip)' 20120601 20120630

The above command tells Amberstone to read the ip column from all the records in the sample partition from 20120601 to 20120630.

Important!

The example above uses single quotes around the select statement. This is the Unix syntax. On Windows the select statement must be surrounded by double quotes.

3.2   Referencing Fact Table Columns With the $ Symbol

Within Amberstone functions, fact table columns are referenced by prefixing $ to the name of the column. For example, the cust_id column would be referenced like this: $cust_id.

3.3   The select Function

The select function is used to tell the reader what columns to read from the fact table. The select function takes one or more fact table columns as parameters.

For example:

select($cust_id, $ip)

Will select the cust_id and ip columns.
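
Putting the select function together with the read command, the following displays both columns for June 2012 (using the Unix single-quote convention described earlier):

$AMBERSTONE_HOME/bin/read.sh sample 'select($cust_id, $ip)' 20120601 20120630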

3.4   Automatic Dictionary Expansion

The reader automatically replaces surrogate integer keys, created by dictionaries, with their text values for display.

3.5   The where Function

The select function can be wrapped in a where function to filter the records that are displayed. Below is an example of this:

where(select($url), gt($cost, 50))

The above example returns only urls for records where the cost column is greater than 50. This is explained below:

The where function takes two parameters: the first parameter is a select function, the second parameter is a function that evaluates to true or false. The where function limits the result set to records where the second parameter returns true.

In the example above, the second parameter of the where function is: gt($cost, 50).

The gt function returns true if its first parameter is greater than its second parameter.

The first parameter of the gt is: $cost, which returns the data in the cost column.

Accordingly the gt function call will return true when $cost returns a value greater than 50.

3.6   Boolean Logic

The Amberstone function library provides a set of functions that perform boolean logic and can be used to filter result sets:

lt, gt, eq, streq, contains, and, or, not

Compound boolean logic can be expressed by nesting boolean logic functions inside of the and/or functions.
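
As a sketch, assuming the lt function takes the same parameter form as gt, a compound filter that keeps records with a cost between 50 and 500 could be written as:

where(select($url), and(gt($cost, 50), lt($cost, 500)))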

3.7   Running the export Command

The export command is identical to the read command except it exports the data to delimited files rather than displaying the results. The export files are written to the $AMBERSTONE_HOME/storage/[partition name]/export directory and follow the same naming conventions as the load units.

The structure of the export command is as follows:

export [partition] [select function] [startYYYYMMDD] [endYYYYMMDD]

A sample export command looks like this:

$AMBERSTONE_HOME/bin/export.sh sample "select($cust_id, $ip)" 20120101 20121231

The example above exports the cust_id and ip columns from the sample fact table.

4   Aggregating Data

4.1   Overview

Amberstone builds multi-dimensional, time series aggregations from the data stored in its fact tables. These aggregations are designed to be loaded into a database for time series reporting and pivot-table functionality.

Once the data has been loaded, it takes two steps to build aggregations:

  1. Create an aggregators config file
  2. Run the aggregate command

4.2   Configuring Aggregators

Aggregators are configured in XML files stored in the $AMBERSTONE_HOME/aggregates/conf directory and must conform to the aggregators.dtd, which is also found in this directory. Specific configurations are pointed to directly by the aggregate command so there can be many different aggregator configurations in this directory.

There are several sample aggregator configurations provided with Amberstone that work with the sample fact table. These are described below.

4.2.1   Configuring a Basic Aggregation

Below is a basic aggregators configuration, which can be found in the file $AMBERSTONE_HOME/aggregates/conf/basic.xml:

<?xml version="1.0"?>
<!DOCTYPE aggregators SYSTEM "aggregators.dtd">

<aggregators>
   <aggregator grain="YYYYMMDD()">
      <cube name="basic" groupby="$cust_id" aggregate="sum(1)"/>
   </aggregator>
</aggregators>

The XML above shows the configuration of a single aggregator with a single cube.

Aggregators and Cubes

An aggregators config file defines a set of aggregators and cubes. The aggregators element can contain one or more aggregator elements. Each aggregator element defines a separate aggregation engine that runs in its own thread. Each aggregator element can contain one or more cube elements. Each cube element defines a specific data cube aggregation. Data cubes are multi-dimensional aggregations that are used to support pivot-table functionality.

The Aggregator Grain

The aggregator element has a grain attribute, which defines the time grouping for all the cubes contained within.

The grain attribute points to a function that returns a time grain key. There are two functions that return day level keys: YYYYMMDD() and YYYYWWD().

The grain affects two important aspects of the aggregation. First, all of the cubes automatically include the time grain dimensions. This allows for time series reporting when the aggregations are loaded into a database.

Second, aggregate output files are split by time grain and written to directories with the time grain naming convention. This is explained in detail in the section on understanding aggregator output.

Configuring a Cube

Aggregator elements contain one or more cube elements. A cube, or data cube, is a multi-dimensional aggregation designed to support pivot table functionality. A basic cube has three attributes: name, groupby and aggregate.

The SQL equivalent of the basic cube would be:

select cust_id, count(cust_id) from fact_table group by cust_id;

4.2.2   Summing a Column in the Fact Table

The sum function can also sum a column in the fact table. This is shown in the sample XML file: $AMBERSTONE_HOME/aggregates/conf/sum_column.xml.

<?xml version="1.0"?>
<!DOCTYPE aggregators SYSTEM "aggregators.dtd">

<aggregators>
   <aggregator grain="YYYYMMDD()">
      <cube name="sum_col" groupby="$cust_id" aggregate="sum($cost)"/>
   </aggregator>
</aggregators>

The cube above groups on the cust_id column and sums the cost column.

The SQL equivalent of this is:

select cust_id, sum(cost) from fact_table group by cust_id;

4.2.3   Multiple Summations

The sum function can perform up to three summations at once. This is demonstrated in the sample XML file: $AMBERSTONE_HOME/aggregates/conf/sum_multi.xml.

<?xml version="1.0"?>
<!DOCTYPE aggregators SYSTEM "aggregators.dtd">

<aggregators>
   <aggregator grain="YYYYMMDD()">
      <cube name="sum_multi" groupby="$cust_id" aggregate="sum($cost, 1)"/>
   </aggregator>
</aggregators>

The example above groups by cust_id and keeps summations of the cost column and the number literal 1.

The SQL equivalent of this is:

select cust_id, sum(cost), count(cust_id) from fact_table group by cust_id;

4.2.4   Multiple Dimensions

The duo, trio and quad functions can be used to create multi-dimensional grouping keys. This is demonstrated in the sample XML file: $AMBERSTONE_HOME/aggregates/conf/multi_dims.xml.

<?xml version="1.0"?>
<!DOCTYPE aggregators SYSTEM "aggregators.dtd">

<aggregators>
   <aggregator grain="YYYYMMDD()">
      <cube name="multi_dims" groupby="duo($cust_id, $action)" aggregate="sum($cost)"/>
   </aggregator>
</aggregators>

The example above uses the duo function to create a two dimensional grouping key from the cust_id and action columns.

The SQL equivalent of this is:

select cust_id, action, sum(cost) from fact_table group by cust_id, action;

The functions trio and quad can be used to create three and four dimensional grouping keys. For example:

<cube name="multi_dims" groupby="trio($cust_id, $action, $ip)" aggregate="sum($cost)"/>

The SQL equivalent of this is: select cust_id, action, ip, sum(cost) from fact_table group by cust_id, action, ip;

4.2.5   Multiple Threads

Multiple aggregators can be configured in the same config file. Each aggregator runs in its own thread. This allows multiple aggregations to be run in parallel on the same server.

Data is read from the fact table by a separate background thread and passed in large blocks to each aggregator thread for processing. With this design there is no disk contention and all of the aggregations are performed with a single pass over the data.

Each cube holds its aggregates in a separate memory space so there is no memory contention.

Due to this design, Amberstone can run parallel aggregations at nearly the same speed as a single aggregation. On a 6 or 8 core server the limiting factor for Amberstone's parallel performance will be memory bandwidth. How many threads a server can run without degrading performance will depend on the server's CPU and memory architecture.

Parallel aggregator threads are configured in the sample XML file: $AMBERSTONE_HOME/aggregates/conf/thread_multi.xml.

<?xml version="1.0"?>
<!DOCTYPE aggregators SYSTEM "aggregators.dtd">

<aggregators>

   <aggregator grain="YYYYMMDD()">
      <cube name="thread1" groupby="$cust_id" aggregate="sum(1)"/>
   </aggregator>

   <aggregator grain="YYYYMMDD()">
       <aggregate name="thread2" groupby="$cust_id" aggregate="sum($cost)"/>
   </aggregator>

</aggregators>

4.2.6   Adding a where Clause

The cube has a where attribute that filters the records that are aggregated.

A sample where clause is configured in the sample XML file: $AMBERSTONE_HOME/aggregates/conf/where.xml.

<?xml version="1.0"?>
<!DOCTYPE aggregators SYSTEM "aggregators.dtd">

<aggregators>

	<aggregator grain="YYYYMMDD()">
	    <cube name="cust_agg" groupby="$cust_id" aggregate="sum(1)" where="gt($cost, 0)"/>
	</aggregator>

</aggregators>

The where attribute points to a function that evaluates to true or false. Only records where the where function returns true will be added to the aggregate. In the example above, the function gt($cost, 0) returns true when the cost column is greater than 0.

4.2.7   Hourly Time Grains

Thus far, all of the sample cubes have been calculated within daily time grains. There are times when it's useful to create cubes within an hourly time grain. This can be done using the hourly function.

A sample hourly time grain is configured in the sample XML file: $AMBERSTONE_HOME/aggregates/conf/hourly.xml.

<?xml version="1.0"?>
<!DOCTYPE aggregators SYSTEM "aggregators.dtd">

<aggregators>

	<aggregator grain="hourly(YYYYMMDD(), $hour)">
	    <cube name="cust_agg" groupby="$cust_id" aggregate="sum(1)"/>
	</aggregator>

</aggregators>

In the example above, the hourly function is called to return an hourly time grain grouping key. The first parameter is the daily time grain function and the second parameter returns the hour. In the example above the $hour column in the fact table is used to return the hour for each record.

4.2.8   Sessions & Transactions

Amberstone can tie together two or more records into a logical transaction for aggregation. A common use case for this is session tracking.

In order for Amberstone to perform transaction/session tracking two elements must be present in the fact table:

  1. A transaction or session id must be present for each record in the fact table.
  2. A column of the data type time, that contains the time of the record, must be present for each record in the fact table.

Configuring a Transaction

The sample transaction configuration shown below can be found in the sample XML file: $AMBERSTONE_HOME/aggregates/conf/transaction.xml.

<aggregators>

      <aggregator grain="YYYYMMDD()">

         <transaction  key="$session_id"  time="$rec_time" timeout="1200">

             <properties>
                <property name="cust_id" value="var($cust_id, false)"/>
                <property name="path"    value="list($url, 4, true, true)"/>
                <property name="actions" value="set($action, 4)"/>
                <property name="cost"    value="counter($cost, true)"/>
             </properties>

             <cube name="session_agg" groupby="@path" aggregate="sum(1)" where="gt(@cost, 50)"/>

         </transaction>

      </aggregator>

</aggregators>

The transaction Element

The transaction element, which defines a transaction, is placed inside an aggregator element. The transaction element has three attributes:

  1. key: evaluates to a key that is used to tie together the records in the transaction
  2. time: evaluates to a time data type, representing the time of the record
  3. timeout: an integer literal describing the time in seconds that the transaction can be idle before timing out

Transaction Properties

A transaction can be configured to hold properties which are used to maintain state for each individual transaction. These properties are defined within the properties element inside of the transaction.

Defining a Property

Inside of the properties element there are one or more property elements. The property elements have a name attribute, which is used to access the property during aggregation. They also have a value attribute, which defines how the property maintains its state.

There are four different ways that a property can maintain state:

  1. Variable tracking: saves one specific variable value. The var function is used for this type of tracking. Below is an example of this:

    <property name="cust_id" value="var($cust_id, false)"/>

    The var function takes two parameters:

    • The first parameter evaluates to a key that will be stored in the property.
    • The second parameter is a boolean literal that states whether the value is overwritten when the transaction is updated with a new record.
  2. Counter Tracking: keeps a counter which can be incremented conditionally. The counter function is used for this type of tracking. Below is an example of this:

    <property name="cost" value="counter($cost, true)"/>

    The counter function takes two parameters:

    • The first parameter evaluates to a long, that is added to the count when the transaction is updated.
    • The second parameter evaluates to a boolean that determines if the counter should be updated.
  3. List Tracking: builds up a list of values that can be tracked as a single key for aggregation purposes. This can be used for tracking paths through a website. The list function is used for this type of tracking. Below is an example of this:

    <property name="path" value="list($url, 4, true, true)"/>

    The list function takes four parameters:

    • The first parameter evaluates to a key that is added to the list conditionally when the transaction is updated.
    • The second parameter is an integer literal that caps the size of the list.
    • The third parameter evaluates to true or false and determines if the key is added to the list.
    • The fourth parameter is a boolean literal that determines if the list is to be kept in insertion order. If this parameter is true, the list will be kept in insertion order. If this parameter is false then the list items will be sorted by their hashCodes. Ordering the list by hashCode works for situations where a consistently ordered list is needed.
  4. Set Tracking: has the same parameters and behavior as list tracking, except that it maintains a unique set of values. The set function is used for this type of tracking.

    <property name="set" value="set($url, 4, true, true)"/>

Updating Transactions

As records flow through the aggregator they are grouped into transactions based on the key attribute of the transaction element. Each time a transaction is updated with a new record, the transaction properties are updated with the new record's data.

Completing Transactions

When a transaction has completed, the cubes within the transaction are updated with the completed transaction's data. A transaction is considered complete when it has been idle for the timeout period specified by the timeout attribute of the transaction element.

Transaction Cubes

Cube elements are placed inside of transactions to aggregate transactions as they are completed. Cubes within a transaction don't have direct access to columns in the fact table. Instead, grouping keys and summing values are read from the transaction properties using special variables prefixed with the @ symbol.

Referencing Transaction Properties With the @ Symbol

The @ symbol is used to access the properties of completed transactions. It is followed by a string literal that refers to the name of a transaction property. Transaction properties referenced in this manner can be used as grouping keys, longs or booleans depending on the context.

Below the sample cube is explained:

<cube name="session_agg" groupby="@path"  aggregate="sum(1)" where="gt(@cost, 50)"/>

In the example above, the cube's groupby key is the path transaction property. This is an example of how a transaction property can be used as a grouping key.

A where clause is applied that returns true if the cost transaction property is greater than 50. In this context, the transaction property returns a long.

The sample cube builds an aggregation that shows the count for each path where the total cost of the transaction is greater than 50.

Time Grain Boundaries

Transactions may span time grain boundaries. In these cases the transaction is counted only in the time period that it completes in.

Incremental Transaction Tracking

Because transactions may span time grain boundaries, care must be taken to ensure that transactions are not lost at the boundary when doing incremental aggregation. For example, if daily incremental aggregations are being performed, there may be transactions from the prior day that span into the current day. In order to capture those transactions, part of the prior day must be aggregated with the current day. Amberstone supports a block level aggregation parameter which starts the aggregation at a specific block of a day. For example, if session aggregation for 11/11/2012 is being performed, Amberstone can begin the aggregation with the last few blocks of 11/10/2012. The mechanics of this are described in the next section.

4.3   Running the aggregate Command

After an aggregators configuration is prepared, it can be used as a parameter for the aggregate command, which builds the aggregates.

The structure of the aggregate command is as follows:

aggregate [partition] [aggregators config file] [startYYYYMMDD] [endYYYYMMDD] <block>

A sample aggregate command looks like this:

$AMBERSTONE_HOME/bin/aggregate.sh sample multi_dims.xml 20120101 20121231

The above command tells Amberstone to read all the records in the sample partition from 20120101 to 20121231, and create the aggregates that are configured in the multi_dims.xml file.

Optional block Parameter

Optionally a block parameter can be added to the aggregate command which tells Amberstone which block of the start day to begin aggregating from. This parameter must be an integer and can be negative. If it is negative, it counts back from the last block of the start day. For example, if there are ten blocks in the start day and the block parameter is -2, aggregation will begin from the 9th block.

This feature is used to ensure that transactions that cross time grain boundaries can be properly counted. The following example demonstrates the optional block parameter:

$AMBERSTONE_HOME/bin/aggregate.sh sample multi_dims.xml 20121230 20121231 -2

The example above will begin aggregating on the last two blocks of 12/30/2012 and aggregate all the blocks in 12/31/2012. This ensures that any transactions that begin on 12/30/2012 and complete in 12/31/2012 will be fully accounted for.

4.4   Understanding Aggregator Output

Cubes are output to directories under the $AMBERSTONE_HOME/aggregates/cubes directory. The files are written to directories according to the aggregator time grain.

Daily grains, such as YYYYMMDD(), create output directories in the format YYYYMMDD.

Hourly grains created with the hourly function, such as hourly(YYYYMMDD(), $hour), have a sub-directory for the hour under the daily directory.

The cube output files are named with the following convention: [CUBE_NAME].[PARTITION_NAME].[NODE_NAME].tsv.

The cube output files contain the aggregated data in a format designed for merging.

Each line in the file contains a unique dimensions key and the summations for those dimensions.

The dimensions appear at the beginning of each record and are separated by "::".

The first set of dimensions is the time dimensions, which have been separated into individual parts. For the YYYYMMDD() time grain, the quarter has been automatically added.

The format of the YYYYMMDD() time dimensions are:

year::quarter::month::day_of_month.

If the hour has been fused onto the time grain then it will appear as:

year::quarter::month::day::hour

The format of the YYYYWWD() time grain is:

year::week::day_of_week.

Following the time dimensions are the cube dimensions, which are defined by the cube's groupby attribute. These are joined together with the time dimensions and separated by a "::". The cube dimensions appear in the order that they are listed in the groupby attribute of the cube. Dimensions that were surrogate integer keys, created by dictionaries, are automatically expanded to their text form.

Following the dimensions are the summations from the sum function, separated by a tab. There will be a column for each summation.

For example:

<?xml version="1.0"?>
<!DOCTYPE aggregators SYSTEM "aggregators.dtd">

<aggregators>
   <aggregator grain="YYYYMMDD()">
      <cube name="duo" groupby="duo($cust_id, $action)" aggregate="sum($cost)"/>
   </aggregator>
</aggregators>

The configuration defined above would produce cube output data in the following format:

year::quarter::month::day::cust_id::action cost

The first dimensions, year::quarter::month::day, come from the time grain. The cust_id and action dimensions come from the cube's two dimensional grouping key. The cost is the summation of the cost fact table column.
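
For example, a line in this cube's output file would look like the record used in the merge examples later in this guide:

2012::2::06::23::45678::download 2700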

The format of these files is specifically designed for merging and cannot be loaded directly into a database. The merge command must be run first to prepare them. Merging is described in the next section.

5   Rolling Up Hourly Aggregates

Hourly aggregates can be used as a strategy for handling high cardinality data cubes. If the reporting system that the aggregates are being loaded into only needs daily aggregates, then the hourly aggregates will need to be rolled up to daily aggregates. Amberstone's rollup command performs this task. This can also be done inside of the reporting system using SQL, but in cases where the hourly aggregates have millions of records, it's more efficient to do this before loading the data into the database.

Important!

The rollup command must be run before the merge is run.

Running the rollup Command

The rollup command is run by issuing the rollup command as follows:

$AMBERSTONE_HOME/bin/rollup.sh

The rollup command walks the cubes output directory. When it encounters an hourly aggregate, it writes a copy of the file to the daily aggregate directory above it. As it writes this file, it removes the hour dimension from each record. The file names written to the daily aggregate directory contain the hour that the records belong to.

When the merge command is run, the hourly aggregates will be merged to create a daily aggregate.

6   Merging Aggregates

The merging of cubes is central to Amberstone's scalable design. Amberstone can merge cubes created from a partitioned fact table after the aggregation is completed. The reason this is possible is that aggregates created using the sum function are commutative. This means that, given the same set of records, a merged cube built from a partitioned table will be identical to a cube created from a single table.

Important!

The merge command must be run, even if there is no file merging to be done, to prepare the data to be loaded into a database.

Running the merge Command

The merge command is run by issuing the merge command as follows:

$AMBERSTONE_HOME/bin/merge.sh

The merge command traverses the directories under the $AMBERSTONE_HOME/aggregates/cubes directory and merges files with the same CUBE_NAME. When it finds matching dimension keys, it merges the sums to produce a single merged record. For example, the record:

2012::2::06::23::45678::download 2700

could be merged with the following record in a different cube file:

2012::2::06::23::45678::download 2000

The merge would create a merged record of:

2012::2::06::23::45678::download 4700

Merged files are written to directories in the $AMBERSTONE_HOME/aggregates/merged directory. The directory structure will be the same as the cubes directory but the file names will be shortened to be only the CUBE_NAME.tsv.

The records in the merged file will be formatted for loading into a database.

7   Loading Merged Cubes Into A Database

The merged cubes are in a format designed to be loaded into a relational database or column store.

Below is a sample record from a merged aggregation and a corresponding sample table schema:

2012 2 06 23 45678 download 2700

Table schema:

	create table CUSTOMER_ACTION
	(
		year int,
		quarter int,
		month int,
		day int,
		cust_id int,
		action varchar(10),
		cost bigint
	);

The primary key of this table would be the concatenation of the columns: year, quarter, month, day, cust_id, action.

Individual indexes should be created on each of these columns as well to facilitate fast querying.
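
As a minimal sketch of this step, assuming the merged CUBE_NAME.tsv file is tab-delimited and the target database is MySQL (other databases have equivalent bulk loaders), the index creation and load might look like the following; the index and file names are illustrative:

create index idx_ca_year    on CUSTOMER_ACTION (year);
create index idx_ca_quarter on CUSTOMER_ACTION (quarter);
create index idx_ca_month   on CUSTOMER_ACTION (month);
create index idx_ca_day     on CUSTOMER_ACTION (day);
create index idx_ca_cust_id on CUSTOMER_ACTION (cust_id);
create index idx_ca_action  on CUSTOMER_ACTION (action);

load data local infile 'CUSTOMER_ACTION.tsv'
into table CUSTOMER_ACTION
fields terminated by '\t';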

Once the aggregated data is loaded, and the indexes are created, the table can be queried.

Below is a sample query:

select year, quarter, cust_id, sum(cost) as cost from CUSTOMER_ACTION where year = 2012 group by year, quarter, cust_id order by quarter, cost;

This query returns the quarterly cost for each cust_id in 2012, ordered by quarter and cost. Reporting applications can then display this in a table for viewing.

8   Partitioning Data

Up until this point, the documentation has focused on working with a single partition. When working with large data sets, it's often necessary to spread data across multiple partitions. Amberstone supports two approaches to this: Horizontal Partitioning and Sharding.

8.1   Horizontal Partitioning

Horizontal partitioning occurs when a table is split into separate partitions based on the data in the table. Below are two examples of horizontal partitioning.

Partitioning to Support Partial Day Aggregation

Amberstone fact tables are automatically partitioned by day, and the aggregate command takes parameters to specify the range of days to perform the aggregation on. Sometimes it's beneficial to perform aggregations on only part of a day's data. Horizontal partitioning can be used to split the table to support partial day aggregation.

For example, two partitions called AM and PM could be created with the same fact table definition. Data that is loaded in the morning would go into the AM partition and data that is loaded in the afternoon would be loaded into the PM partition.

To aggregate the morning data, the aggregate command would be issued against the AM partition. If a cube named hits is created from the AM partition, it would produce a file named:

hits.AM.node1.tsv

After running the merge, the formatted AM aggregates can be loaded into the database.

In the afternoon the data is loaded into the PM partition. The same hits cube is built, this time on the PM partition, creating an output file named:

hits.PM.node1.tsv

This time when the merge is run, the file hits.AM.node1.tsv will be merged with the file hits.PM.node1.tsv to create a full day aggregate.

The full day aggregate can then be loaded into the database by updating the records that were loaded in the morning.

This approach allows the PM data to be aggregated without having to re-aggregate the AM data. In situations where billions of records a day are flowing into Amberstone, this technique can save valuable time.

Partitioning to Deal with High Cardinality

Consider this use case:

A large social video website has over 50,000,000 videos online and needs to provide aggregated monthly reporting numbers for each video. They are receiving 5 billion page views a day, accessing roughly 10 million different videos.

Amberstone can easily handle the 5 billion page views per day. But the high cardinality, stemming from having to produce aggregates on 10,000,000 videos a day, cannot be handled in a single partition.

To handle this situation, 50 partitions are created in Amberstone. The load units are split, by video identifier, into 50 separate files. The split files are then loaded into the separate partitions and each partition is individually aggregated.

Each aggregate is named differently so when the merge is run, the files are not merged together. This results in 50 separate load files per day that can be loaded into the database.

Splitting the data in this manner results in manageable cardinality.

8.2   Sharding

There are situations when a single Amberstone node will not have enough throughput to meet load or aggregation demands. In these situations, sharding can be used to distribute work across a cluster of servers.

Sharding occurs when a fact table is split across multiple servers.

To implement sharding, create the same partition on two or more servers. Then split the load units equally across the shards and load in parallel. Load throughput will increase linearly with the number of shards. After the data is loaded, perform the identical aggregations on each shard in parallel. Aggregation throughput will also increase linearly with the number of shards. After aggregation completes there will be aggregate output files with the same cube name and partition name but with a different node name on each shard.

To illustrate this, consider again the cube named hits.

Shard 1 would have the following output file:

hits.part1.node1.tsv

Shard 2 would have the output file:

hits.part1.node2.tsv

Each file contains a part of the aggregation.

After the aggregates have been created, rsync can be used to collect all of the aggregate files onto one of the nodes. Then the merge can be run on the files to produce the combined aggregates.
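
For example, assuming both nodes use the same AMBERSTONE_HOME path and the second shard is reachable as node2, the collection step on the first node might look like this:

rsync -av node2:$AMBERSTONE_HOME/aggregates/cubes/ $AMBERSTONE_HOME/aggregates/cubes/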