JBoss Community Archive (Read Only)

RHQ

Aggregation Schema Changes

Overview

This document explains schema changes that aim to reduce I/O overhead during aggregation and to handle past data, that is, data collected at some earlier time. The data could be two hours or two days old. There are a couple of scenarios in which we have to handle past data. The first involves the agent reporting late data. This could be a result of the agent falling behind with collecting and sending measurement reports, or it could be due to the agent having spooled data for a period of time while it was disconnected from the server. The second scenario in which we have to handle past data is with failed aggregations. Suppose that when aggregation runs for a particular time slice, it fails for some number of measurement schedules. We want to retry the failed aggregation of that data at some point in the future.

The latest RHQ release as of this writing is 4.10.0. The latest JON release is 3.2.0.

Terminology

This section establishes some terminology to help make some of the concepts discussed more understandable and to avoid potential confusion.

Row

Unless otherwise stated a row will refer to a CQL row. Remember though that CQL transposes rows and columns, making them look more similar to relational rows and columns.

Partition

A partition refers to one physical row in a table or column family. A partition may be comprised of multiple CQL rows.

Primary Key

Uniquely identifies a row. It can consist of one or more columns.

Partition Key

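The first column of the primary key, or the first parenthesized group of columns when the partition key is composite, as in PRIMARY KEY ((bucket, time), schedule_id). All rows having the same partition key are stored in the same partition and on the same node.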

Clustering Columns

Any additional columns in the primary key after the partition key. Clustering columns define grouping and sorting within a partition.

Bucket

One of the metric data tables: raw_metrics, one_hour_metrics, six_hour_metrics, or twenty_four_hour_metrics.

I/O Overhead

We currently have the metrics_index table with the following schema:

CREATE TABLE metrics_index (
  bucket text,
  time timestamp,
  schedule_id int,
  PRIMARY KEY ((bucket, time), schedule_id)
)

We perform two writes when inserting raw data: one to the raw_metrics table and one to metrics_index. Suppose during the 10:00 hour we insert raw data for schedule id 100 having a timestamp of 10:02:00. The insert into metrics_index will look like this:

INSERT INTO metrics_index (bucket, time, schedule_id)
VALUES ('one_hour_metrics', 10:00, 100)

The timestamp is rounded down to the start of the current time slice, 10:00. During aggregation, a single query is executed against metrics_index to obtain all of the schedule ids with raw data to be aggregated. Then for each schedule id, a query is executed against raw_metrics to fetch the data to be aggregated into 1 hour metrics.
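The rounding step can be sketched in a few lines of Python. This is only an illustration: the function name hour_time_slice and the calendar date used in the example are assumptions, not part of RHQ.

```python
from datetime import datetime


def hour_time_slice(timestamp: datetime) -> datetime:
    """Round a timestamp down to the start of its one hour time slice."""
    return timestamp.replace(minute=0, second=0, microsecond=0)


# Raw data collected at 10:02:00 is indexed under the 10:00 time slice.
# The date is arbitrary and only chosen for illustration.
collected = datetime(2014, 4, 26, 10, 2, 0)
print(hour_time_slice(collected))  # 2014-04-26 10:00:00
```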

If the query against metrics_index returns N measurement schedule ids, then N + 1 queries are executed to aggregate raw data. In the worst case scenario where we also have to aggregate 1 hour and 6 hour data, 3N + 3 queries are executed.

This does not scale well as the number of schedules increases as it leads to a lot of I/O overhead on the storage node(s).

Missed Aggregations

There are a few scenarios in which we could fail to aggregate metric data.

Server Outage

Suppose the server goes down at 08:46 and does not come back up until 09:45. We miss aggregating data for the 08:00 to 09:00 time slice. We already check at server startup if there has been a missed aggregation. If we detect that one has been missed, aggregation for the missed time slice will run during the next scheduled aggregation.

Failed Aggregation

Suppose that the Storage Cluster goes down during aggregation when only 10% of the schedules have been processed. Aggregation should terminate immediately. The remaining 90% of the schedules will not have their data aggregated.

Late Measurement Reports

Suppose an agent loses its connection to the server at 09:30. The agent will spool measurement reports to disk. The agent reconnects at 10:15 after aggregation has finished. The agent sends a measurement report with data from the 09:00 hour. That data does not get aggregated.

Schema Changes

Two tables are being added, metrics_cache and metrics_cache_index. The metrics_index table will be dropped. Here is the schema.

CREATE TABLE metrics_cache (
  bucket text,
  time_slice timestamp,
  start_schedule_id int,
  schedule_id int,
  time timestamp,
  value map<int, double>,
  PRIMARY KEY ((bucket, time_slice, start_schedule_id), schedule_id, time)
);

CREATE TABLE metrics_cache_index (
  bucket text,
  day timestamp,
  partition int,
  collection_time_slice timestamp,
  start_schedule_id int,
  insert_time_slice timestamp,
  schedule_ids set<int>,
  PRIMARY KEY ((bucket, day, partition), collection_time_slice, start_schedule_id, insert_time_slice)
);

When we insert metric data, we write to raw_metrics, metrics_cache, and metrics_cache_index. metrics_cache and metrics_cache_index are only queried during aggregation and during server startup.

We store data for multiple schedules per partition in metrics_cache. We will refer to the number of schedules per partition as the cache block size (CBS).

Changing the cache block size is discussed in a later section.

It is worth noting that with the metrics_index table, there can only ever be zero or one row for a given schedule id and time slice. With metrics_cache_index, a row is stored for a cache block which may contain multiple schedules. There may also exist more than one row in metrics_cache_index for a given cache block and time slice. This is discussed in later sections.

We will look at an example of storing raw data to explain the new tables. Suppose we have the following raw data and a cache block size of 10.

{scheduleId: 100, timestamp: 10:18:00, value: 3.14},
{scheduleId: 221, timestamp: 10:16:00, value: 84},
{scheduleId: 366, timestamp: 10:09:00, value: 2.17}

Let’s first look at the INSERT statements for metrics_cache.

INSERT INTO metrics_cache
  (bucket, time_slice, start_schedule_id, schedule_id, time, value)
VALUES ('raw_metrics', 10:00, 100, 100, 10:18, {3: 3.14});

INSERT INTO metrics_cache
  (bucket, time_slice, start_schedule_id, schedule_id, time, value)
VALUES ('raw_metrics', 10:00, 220, 221, 10:16, {3: 84});

INSERT INTO metrics_cache
  (bucket, time_slice, start_schedule_id, schedule_id, time, value)
VALUES ('raw_metrics', 10:00, 360, 366, 10:09, {3: 2.17});

bucket identifies the historical table in which the data is stored. It will have as its value one of raw_metrics, one_hour_metrics, or six_hour_metrics depending on the type of metric data.

time_slice is derived by rounding down the timestamp to the start of the current hour.

start_schedule_id is determined by the value of scheduleId and the cache block size. It identifies a block of schedule IDs; with a cache block size of 10, for example, the blocks are 100 to 109, 220 to 229, and 360 to 369.

time is the actual timestamp for the data being inserted.

value is stored as a map so that we can accommodate both raw and aggregate metrics.
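The document does not spell out how start_schedule_id is derived. The sketch below assumes it is the schedule id rounded down to a multiple of the cache block size, which is consistent with the three INSERT statements above (100 -> 100, 221 -> 220, 366 -> 360). The function name is hypothetical.

```python
def start_schedule_id(schedule_id: int, cache_block_size: int) -> int:
    """Return the first schedule id of the cache block containing schedule_id.

    Assumption: blocks are aligned to multiples of the cache block size.
    """
    return (schedule_id // cache_block_size) * cache_block_size


for sid in (100, 221, 366):
    print(sid, "->", start_schedule_id(sid, 10))
```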

Here are the INSERT statements for metrics_cache_index.

INSERT INTO metrics_cache_index
  (bucket, day, partition, collection_time_slice, start_schedule_id, insert_time_slice, schedule_ids)
VALUES ('raw_metrics', 2014-04-26 00:00:00, 0, 10:00, 100, 10:00, {100});

INSERT INTO metrics_cache_index
  (bucket, day, partition, collection_time_slice, start_schedule_id, insert_time_slice, schedule_ids)
VALUES ('raw_metrics', 2014-04-26 00:00:00, 0, 10:00, 220, 10:00, {221});

INSERT INTO metrics_cache_index
  (bucket, day, partition, collection_time_slice, start_schedule_id, insert_time_slice, schedule_ids)
VALUES ('raw_metrics', 2014-04-26 00:00:00, 0, 10:00, 360, 10:00, {366});

bucket is one of the historical tables and has the same value as metrics_cache.bucket.

day is the collection time rounded down to the start of the day. We only ingest late data that is not too old. What is too old? Certainly anything older than the retention period of raw data which is currently 7 days is too old. Filtering by day allows us to efficiently search for past data. We determine any and all of the measurement schedules having late data with only 7 queries.

partition is a number between 0 and n - 1, where n is the number of partitions that we want to split a given bucket and time slice across. Initially, and probably in many cases, n will be 1, so partition will always be zero. If and when a single partition gets very big, it can lead to a hot spot on a node. If we find that we have hit that point, we can increase the value of n. Clients will alternate the value of partition on writes in a round robin fashion so that partitions grow at a similar rate. Partitions will be merged client side during aggregation.

collection_time_slice is the reported timestamp rounded down to the start of the hour. This will typically be the same as metrics_cache.time_slice but may differ in the event of late measurement reports.

start_schedule_id is the same as metrics_cache.start_schedule_id.

insert_time_slice is the time slice in which the data was inserted. Under normal circumstances insert_time_slice and collection_time_slice will have the same value. They will differ though in the cases of late measurement reports. This column helps us determine whether data should be pulled from metrics_cache or from raw_metrics.

schedule_ids is a set of all the schedule ids (for the particular cache block) for which data has been stored during the collection time slice.
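To make the day and partition columns more concrete, here is a rough sketch of how a client might enumerate the index partitions to read and pick the partition to write to. The function names are hypothetical, and the sketch assumes that the 7 day window means today plus the previous six days.

```python
from datetime import datetime, timedelta
from itertools import cycle


def index_partition_keys(bucket, now, num_partitions=1, retention_days=7):
    """List the (bucket, day, partition) keys to query in metrics_cache_index.

    With a single partition and a 7 day window this yields 7 keys,
    that is, 7 queries.
    """
    today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return [
        (bucket, today - timedelta(days=days_back), partition)
        for days_back in range(retention_days)
        for partition in range(num_partitions)
    ]


def write_partitions(num_partitions):
    """Cycle through partition numbers 0..n-1 in round robin order for writes."""
    return cycle(range(num_partitions))


keys = index_partition_keys("raw_metrics", datetime(2014, 4, 26, 12, 0))
print(len(keys))  # 7
```

With num_partitions greater than 1, the results of the per-partition queries would be merged client side, as described above.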

Reducing I/O Overhead

During aggregation we only fetch data from metrics_cache and not from the historical tables (under normal circumstances). For a given bucket and time slice, we execute (N / CBS) + 1 queries. If N = 1,000,000 and CBS = 10, then we execute 100,001 queries. Compared with the N + 1 queries needed with metrics_index, this yields a 90% reduction in the number of queries and is made possible by storing data for multiple schedules within the same partition. The extra query is against metrics_cache_index, which tells us all of the partitions in metrics_cache that have data.
When we finish aggregating data for a partition block, we delete it from metrics_cache as well as the corresponding row from metrics_cache_index. Past data situations are discussed in the next sections.
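The query counts for the two schemas can be compared with a small calculation. The function names are hypothetical. The sketch assumes the worst case in which every cache block has data, and it rounds up when N is not a multiple of the cache block size.

```python
def queries_with_metrics_index(num_schedules: int) -> int:
    """One index query plus one raw_metrics query per schedule: N + 1."""
    return num_schedules + 1


def queries_with_metrics_cache(num_schedules: int, cache_block_size: int) -> int:
    """One index query plus one metrics_cache query per cache block."""
    blocks = -(-num_schedules // cache_block_size)  # ceiling division
    return blocks + 1


old = queries_with_metrics_index(1_000_000)
new = queries_with_metrics_cache(1_000_000, 10)
reduction = 1 - new / old
print(old, new, f"{reduction:.0%}")
```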

Aggregating Past Data

There are a couple different types of situations that we have to handle. Aggregation of some cache partitions fails; so, we need to retry the aggregation for those partitions at some point in the future. Then we have the scenario in which the agent reports data that was collected during a time slice for which aggregation has already run.

Handling Failed Aggregations

Suppose that there is an error inserting the computed aggregates for partition block 360. Neither the partition in metrics_cache nor the corresponding row in metrics_cache_index is deleted. This allows us to retry the cache partition during a subsequent aggregation run. Let's use the data from the previous examples. The aggregation job runs at 11:00 to aggregate data collected during the 10:00 - 11:00 time slice. There is an error for cache partition block 360.

Now it is 12:00 and the aggregation job runs again. We query metrics_cache_index and find that we have data from the 10:00 hour with a start schedule id of 360 that needs to be aggregated. Because collection_time_slice and insert_time_slice are the same, we can pull data from the metrics_cache table. When they are the same, it means that the cache partition for that time slice has not been deleted and therefore contains all of the data collected during that time slice.

Handling Late Measurement Reports

Now suppose the aggregation run at 11:00 completes successfully without error. This means that all of the cache partitions for the 10:00 hour have been deleted as well as all of the corresponding rows in the cache index. The agent reports some late data from the 10:00 hour that we store at 11:05. We perform the following inserts into metrics_cache and metrics_cache_index.

INSERT INTO metrics_cache
  (bucket, time_slice, start_schedule_id, schedule_id, time, value)
VALUES ('raw_metrics', 10:00, 360, 366, 10:09, {3: 5.11});

INSERT INTO metrics_cache_index
  (bucket, day, partition, collection_time_slice, start_schedule_id, insert_time_slice, schedule_ids)
VALUES ('raw_metrics', 2014-04-26 00:00:00, 0, 10:00, 360, 11:00, {366});

When the aggregation job runs again at 12:00, we find that there is data from 10:00 for cache block 360 that needs to be aggregated. insert_time_slice is later than collection_time_slice. Because the cache partition had been deleted and was effectively recreated when we executed the INSERTs at 11:05, it will not contain all of the data collected during the 10:00 time slice; therefore, we need to pull data from raw_metrics instead of metrics_cache.
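For a single index row, the rule from the last two sections reduces to a comparison of the two time slices. The following is a minimal sketch with a hypothetical function name, not the actual RHQ implementation.

```python
from datetime import datetime


def data_source(collection_time_slice: datetime, insert_time_slice: datetime) -> str:
    """Pick the table to read from when aggregating one cache index row.

    Equal time slices mean the cache partition was never deleted and is
    complete; otherwise fall back to the historical raw_metrics table.
    """
    if insert_time_slice == collection_time_slice:
        return "metrics_cache"
    return "raw_metrics"


ten = datetime(2014, 4, 26, 10, 0)
eleven = datetime(2014, 4, 26, 11, 0)
print(data_source(ten, ten))     # metrics_cache (failed aggregation retry)
print(data_source(ten, eleven))  # raw_metrics (late measurement report)
```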

Handling Failed Aggregations and Late Measurement Reports

Let's say that aggregation for cache partition 360 for the 10:00 time slice fails and that we also have late data reported at 11:05. We will find the following rows in metrics_cache_index when the aggregation job runs at 12:00:

 bucket           | day                      | partition | collection_time_slice    | start_schedule_id | insert_time_slice        | schedule_ids
------------------+--------------------------+-----------+--------------------------+-------------------+--------------------------+--------------
      raw_metrics | 2014-04-26 00:00:00-0400 |         0 | 2014-04-26 10:00:00-0400 |               360 | 2014-04-26 10:00:00-0400 |   {363, 364, 365, 366}
      raw_metrics | 2014-04-26 00:00:00-0400 |         0 | 2014-04-26 10:00:00-0400 |               360 | 2014-04-26 11:00:00-0400 |   {366}

There are two rows in the cache index for the 10:00 time slice for cache block 360. When there are multiple rows for a particular cache block and collection time slice, two things need to happen. First, we need to see whether there is one where collection_time_slice == insert_time_slice. If so, then we can pull data from metrics_cache. Second, because all of the rows correspond to the same collection time slice, we can combine them so that we wind up with:

 bucket           | day                      | partition | collection_time_slice    | start_schedule_id | insert_time_slice        | schedule_ids
------------------+--------------------------+-----------+--------------------------+-------------------+--------------------------+--------------
      raw_metrics | 2014-04-26 00:00:00-0400 |         0 | 2014-04-26 10:00:00-0400 |               360 | 2014-04-26 10:00:00-0400 |   {363, 364, 365, 366}

The values of schedule_ids are merged and insert_time_slice is set to 10:00 to denote that we can pull data from metrics_cache. Now suppose we have multiple rows in the cache index where none satisfy the condition collection_time_slice == insert_time_slice as in the following example.

 bucket           | day                      | partition | collection_time_slice    | start_schedule_id | insert_time_slice        | schedule_ids
------------------+--------------------------+-----------+--------------------------+-------------------+--------------------------+--------------
      raw_metrics | 2014-04-26 00:00:00-0400 |         0 | 2014-04-26 10:00:00-0400 |               360 | 2014-04-26 11:00:00-0400 |   {363, 364, 365, 366}
      raw_metrics | 2014-04-26 00:00:00-0400 |         0 | 2014-04-26 10:00:00-0400 |               360 | 2014-04-26 12:00:00-0400 |   {361, 362}
      raw_metrics | 2014-04-26 00:00:00-0400 |         0 | 2014-04-26 10:00:00-0400 |               360 | 2014-04-26 13:00:00-0400 |   {362, 367}

In this case we have to pull data from raw_metrics because the cache partition might not contain all of the data collected for the time slice. As for combining them, we have:

 bucket           | day                      | partition | collection_time_slice    | start_schedule_id | insert_time_slice        | schedule_ids
------------------+--------------------------+-----------+--------------------------+-------------------+--------------------------+--------------
      raw_metrics | 2014-04-26 00:00:00-0400 |         0 | 2014-04-26 10:00:00-0400 |               360 |                        0 |   {361, 362, 363, 364, 365, 366, 367}

Notice that we set insert_time_slice to zero. This denotes that data should be pulled from raw_metrics.
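The combining step for both examples can be sketched as follows. The IndexRow class and merge_rows function are hypothetical names, and None is used here in place of the zero value the document uses to mean "pull from raw_metrics".

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class IndexRow:
    collection_time_slice: datetime
    start_schedule_id: int
    insert_time_slice: Optional[datetime]  # None stands in for the document's 0
    schedule_ids: frozenset


def merge_rows(rows):
    """Combine index rows for one cache block and collection time slice."""
    first = rows[0]
    merged_ids = frozenset().union(*(row.schedule_ids for row in rows))
    # The cache partition is complete only if some row was inserted during
    # the same time slice in which its data was collected.
    cache_complete = any(
        row.insert_time_slice == row.collection_time_slice for row in rows
    )
    insert_slice = first.collection_time_slice if cache_complete else None
    return IndexRow(
        first.collection_time_slice, first.start_schedule_id, insert_slice, merged_ids
    )


def hour(h):
    return datetime(2014, 4, 26, h, 0)


merged = merge_rows([
    IndexRow(hour(10), 360, hour(11), frozenset({363, 364, 365, 366})),
    IndexRow(hour(10), 360, hour(12), frozenset({361, 362})),
    IndexRow(hour(10), 360, hour(13), frozenset({362, 367})),
])
print(sorted(merged.schedule_ids), merged.insert_time_slice)
```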

Recomputing 6 hour and 24 hour Data

If the 6 hour time slice has already completed, then we need to recompute the 6 hour data. Suppose we have the following row in metrics_cache_index:

 bucket           | day                      | partition | collection_time_slice    | start_schedule_id | insert_time_slice        | schedule_ids
------------------+--------------------------+-----------+--------------------------+-------------------+--------------------------+--------------
      raw_metrics | 2014-04-26 00:00:00-0400 |         0 | 2014-04-26 11:00:00-0400 |               360 | 2014-04-26 12:00:00-0400 |   {362, 367}

Data was collected during the 11:00 hour, which falls into the 06:00 - 12:00 six hour time slice. It was inserted during the 12:00 hour, which means that the six hour data for schedule ids 362 and 367 has already been computed. We therefore need to recompute the six hour data.

The same logic applies to 24 hour data. If the 24 hour time slice has completed, then we will have to recompute the 24 hour data as well.
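One way to express the recompute check is to compare insert_time_slice with the end of the enclosing 6 hour or 24 hour time slice. This sketch uses hypothetical function names and assumes that a time slice counts as completed as soon as insert_time_slice reaches its end.

```python
from datetime import datetime, timedelta


def six_hour_slice(ts: datetime) -> datetime:
    """Round down to the start of the 6 hour slice (00:00, 06:00, 12:00, 18:00)."""
    return ts.replace(hour=ts.hour - ts.hour % 6, minute=0, second=0, microsecond=0)


def day_slice(ts: datetime) -> datetime:
    """Round down to the start of the day."""
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


def needs_six_hour_recompute(collection_time_slice, insert_time_slice) -> bool:
    slice_end = six_hour_slice(collection_time_slice) + timedelta(hours=6)
    return insert_time_slice >= slice_end


def needs_24_hour_recompute(collection_time_slice, insert_time_slice) -> bool:
    slice_end = day_slice(collection_time_slice) + timedelta(days=1)
    return insert_time_slice >= slice_end


collected = datetime(2014, 4, 26, 11, 0)
inserted = datetime(2014, 4, 26, 12, 0)
print(needs_six_hour_recompute(collected, inserted))  # True
print(needs_24_hour_recompute(collected, inserted))   # False
```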

Ingesting Old Data

We will cap the age of data that we ingest. Suppose the cap is 24 hours. If the server receives measurement reports with data more than a day old, it simply drops the data. The cap will be made configurable. Without a cap, we would need to rely on using TTLs or some background job that ensures old data gets purged. Putting a cap in place effectively and safely allows us to avoid the overhead incurred from using TTLs.
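The age check itself is simple. Below is a minimal sketch with a hypothetical function name and the 24 hour cap from the example as the default; the real setting name and default are not specified in this document.

```python
from datetime import datetime, timedelta


def is_too_old(collection_time: datetime, now: datetime,
               max_age: timedelta = timedelta(hours=24)) -> bool:
    """Return True if the data is older than the configured cap and should be dropped."""
    return now - collection_time > max_age


now = datetime(2014, 4, 26, 12, 0)
print(is_too_old(datetime(2014, 4, 25, 11, 0), now))  # True, 25 hours old
print(is_too_old(datetime(2014, 4, 26, 10, 9), now))  # False
```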

Upgrades

Schema changes are applied at installation (or upgrade) time. There could potentially be a large amount of data to migrate over to metrics_cache. This could make for a long-running, complicated upgrade process. A better solution is to only migrate data from metrics_index over to metrics_cache_index. This should be very fast by comparison. We cannot pull data from metrics_cache during aggregation until the next day because it will have incomplete 1 hour and 6 hour data.

If there are any errors inserting data into metrics_cache_index, then the upgrade should fail so that the migration can be retried.

Cache Activation Time

We will introduce a global setting, cache activation time, which when set indicates that data should only be pulled from the historical tables until the specified time. Let's say we upgrade the server at 2014-04-26 14:25:00. The cache activation time will then be set to 2014-04-27 00:00:00 after data is migrated to metrics_cache_index.
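Computing the cache activation time amounts to rounding up to the start of the next day. The sketch below uses a hypothetical function name and reproduces the example above.

```python
from datetime import datetime, timedelta


def next_cache_activation_time(change_time: datetime) -> datetime:
    """Return the start of the day following change_time."""
    start_of_day = change_time.replace(hour=0, minute=0, second=0, microsecond=0)
    return start_of_day + timedelta(days=1)


upgraded_at = datetime(2014, 4, 26, 14, 25, 0)
print(next_cache_activation_time(upgraded_at))  # 2014-04-27 00:00:00
```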

Changing the Cache Block Size

Increasing the cache block size can potentially improve aggregation performance at the cost of higher memory consumption by both the storage node and the server. It will also reduce the number of queries executed. The setting will be exposed through resource configuration of the measurement subsystem resource type that is part of the RHQ Server agent plugin.

Changing the block size can result in incomplete data in metrics_cache; consequently, the cache activation time setting will be utilized. Whenever the block size is changed, the cache activation time will be set to the start of the next day.

JBoss.org Content Archive (Read Only), exported from JBoss Community Documentation Editor at 2020-03-11 12:57:07 UTC, last content change 2014-04-28 19:34:31 UTC.