Time Series Data

What is Time Series Data?

Time series data is a sequence of data points indexed in time order, commonly used for metrics, logs, IoT sensor data, and financial data.

MongoDB Time Series Collections

Create Time Series Collection

// MongoDB 5.0+ native time series support.
// timeField is mandatory; metaField identifies the data source so the
// engine can bucket documents from the same sensor together on disk.
db.createCollection("weather", {
  timeseries: {
    timeField: "timestamp",
    metaField: "metadata",
    granularity: "hours"
  }
});

// Insert a batch of readings for a single sensor.
const weatherReadings = [
  {
    timestamp: new Date("2024-01-01T10:00:00Z"),
    metadata: { sensorId: "sensor-1", location: "NYC" },
    temperature: 25.5,
    humidity: 60
  },
  {
    timestamp: new Date("2024-01-01T11:00:00Z"),
    metadata: { sensorId: "sensor-1", location: "NYC" },
    temperature: 26.0,
    humidity: 58
  }
];
await db.collection('weather').insertMany(weatherReadings);

Query Time Series Data

// Query by time range: all of 2024-01-01 (UTC) for one sensor.
const dayStart = new Date("2024-01-01T00:00:00Z");
const dayEnd = new Date("2024-01-02T00:00:00Z");

const data = await db.collection('weather')
  .find({
    timestamp: { $gte: dayStart, $lt: dayEnd },
    "metadata.sensorId": "sensor-1"
  })
  .toArray();

// Aggregation on time series: per-hour averages over the same day.
const hourlyAvg = await db.collection('weather').aggregate([
  // Restrict to the day first so $group sees as little data as possible.
  { $match: { timestamp: { $gte: dayStart, $lt: dayEnd } } },
  {
    $group: {
      // $dateTrunc floors each timestamp to the start of its hour.
      _id: { $dateTrunc: { date: "$timestamp", unit: "hour" } },
      avgTemp: { $avg: "$temperature" },
      avgHumidity: { $avg: "$humidity" }
    }
  },
  // Chronological order by truncated hour.
  { $sort: { _id: 1 } }
]).toArray();

Cassandra Time Series

Table Design

-- Time series table: one partition per sensor, rows clustered by time.
-- CLUSTERING ORDER BY (timestamp DESC) stores the newest rows at the head
-- of the partition, so "latest N readings" queries are cheap.
-- NOTE(review): a long-lived sensor makes this partition grow without
-- bound — prefer the bucketed table below for production workloads.
CREATE TABLE sensor_data (
  sensor_id UUID,
  timestamp TIMESTAMP,
  temperature DOUBLE,
  humidity DOUBLE,
  PRIMARY KEY (sensor_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);

-- Partition by (sensor, day) time bucket: caps partition size at one day
-- of readings per sensor, at the cost of one query per day when reading
-- across multiple days.
CREATE TABLE sensor_data_by_day (
  sensor_id UUID,
  day DATE,
  timestamp TIMESTAMP,
  temperature DOUBLE,
  PRIMARY KEY ((sensor_id, day), timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);

Insert and Query

const cassandra = require('cassandra-driver');

// Single shared client; prepared statements are cached per client.
const client = new cassandra.Client({
  contactPoints: ['localhost'],
  localDataCenter: 'datacenter1',
  keyspace: 'iot'
});

// Insert time series data ({ prepare: true } gives correct CQL type
// handling and avoids re-parsing the statement on every call).
const insertCql =
  'INSERT INTO sensor_data (sensor_id, timestamp, temperature, humidity) VALUES (?, ?, ?, ?)';
await client.execute(insertCql, [sensorId, new Date(), 25.5, 60], { prepare: true });

// Query the last 24 hours for one sensor, capped at 100 rows
// (newest first, thanks to the table's DESC clustering order).
const oneDayAgo = new Date(Date.now() - 86400000);
const result = await client.execute(
  'SELECT * FROM sensor_data WHERE sensor_id = ? AND timestamp > ? LIMIT 100',
  [sensorId, oneDayAgo],
  { prepare: true }
);

// Query a whole (sensor, day) partition from the bucketed table.
const dayResult = await client.execute(
  'SELECT * FROM sensor_data_by_day WHERE sensor_id = ? AND day = ?',
  [sensorId, '2024-01-01'],
  { prepare: true }
);

InfluxDB (Purpose-Built Time Series)

Write Data

const { InfluxDB, Point } = require('@influxdata/influxdb-client');

const influxDB = new InfluxDB({
  url: 'http://localhost:8086',
  token: 'my-token'
});

const writeApi = influxDB.getWriteApi('my-org', 'my-bucket');

// Write a single point.
const point = new Point('temperature')
  .tag('sensor', 'sensor-1')
  .tag('location', 'NYC')
  .floatField('value', 25.5)
  .timestamp(new Date());

writeApi.writePoint(point);
// BUG FIX: the original called writeApi.close() here and then kept writing
// to the disposed API below. flush() pushes buffered points to the server
// without closing the API, so the batch write that follows still works.
await writeApi.flush();

// Batch write (points without an explicit timestamp are stamped server-side).
const points = [
  new Point('temperature').tag('sensor', 'sensor-1').floatField('value', 25.5),
  new Point('humidity').tag('sensor', 'sensor-1').floatField('value', 60)
];

writeApi.writePoints(points);
// close() flushes anything still buffered and releases resources — call once,
// when no more writes will be issued.
await writeApi.close();

Query Data

const queryApi = influxDB.getQueryApi('my-org');

// Flux query: last hour of sensor-1 temperatures, averaged in 5-minute windows.
const query = `
  from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "temperature")
    |> filter(fn: (r) => r.sensor == "sensor-1")
    |> aggregateWindow(every: 5m, fn: mean)
`;

// Rows stream through the observer; collect them as plain objects.
const data = [];
const rowObserver = {
  next(row, tableMeta) {
    data.push(tableMeta.toObject(row));
  },
  error(error) {
    console.error(error);
  },
  complete() {
    console.log('Query complete');
  }
};
await queryApi.queryRows(query, rowObserver);

Redis Time Series

RedisTimeSeries Module

const redis = require('redis');
const client = redis.createClient();
await client.connect();

const SERIES_KEY = 'temperature:sensor-1';
const ONE_HOUR_MS = 3600000;
const ONE_DAY_MS = 86400000;

// Create the series: retain 24h of samples; labels enable cross-key queries.
await client.ts.create(SERIES_KEY, {
  RETENTION: ONE_DAY_MS,
  LABELS: { sensor: 'sensor-1', location: 'NYC' }
});

// Append samples ('*' asks the server to timestamp the sample itself).
await client.ts.add(SERIES_KEY, '*', 25.5);
await client.ts.add(SERIES_KEY, Date.now(), 26.0);

// Raw samples from the last hour.
const data = await client.ts.range(SERIES_KEY, Date.now() - ONE_HOUR_MS, Date.now());

// Server-side aggregation: one AVG sample per hour over the last day.
const hourlyAvg = await client.ts.range(SERIES_KEY, Date.now() - ONE_DAY_MS, Date.now(), {
  AGGREGATION: {
    type: 'AVG',
    timeBucket: ONE_HOUR_MS
  }
});

DynamoDB Time Series

Table Design

// DynamoDB design: the composite partition key "deviceId#dayBucket" keeps
// each day's readings for a device in its own partition; the numeric
// timestamp sort key supports range queries within a bucket.
const PARTITION_KEY = 'deviceId_bucket';
const SORT_KEY = 'timestamp';

const tableSchema = {
  TableName: 'SensorData',
  KeySchema: [
    { AttributeName: PARTITION_KEY, KeyType: 'HASH' },
    { AttributeName: SORT_KEY, KeyType: 'RANGE' }
  ],
  AttributeDefinitions: [
    { AttributeName: PARTITION_KEY, AttributeType: 'S' },
    { AttributeName: SORT_KEY, AttributeType: 'N' }
  ]
};

// Insert one reading, routed to the device's current daily partition.
async function insertReading(deviceId, reading) {
  const now = Date.now();
  // Daily bucket index: whole days elapsed since the Unix epoch.
  const dayBucket = Math.floor(now / (24 * 60 * 60 * 1000));

  const item = {
    deviceId_bucket: `${deviceId}#${dayBucket}`,
    timestamp: now,
    temperature: reading.temperature,
    humidity: reading.humidity
  };

  await docClient.send(new PutCommand({ TableName: 'SensorData', Item: item }));
}

// Query a time range spanning one or more daily partitions.
// One Query per day bucket is issued; within each bucket we follow
// LastEvaluatedKey because DynamoDB caps a single Query response at 1 MB
// (the original returned only the first page, silently truncating results).
// Returns the concatenated Items from every bucket, in bucket order.
async function queryRange(deviceId, startTime, endTime) {
  const DAY_MS = 24 * 60 * 60 * 1000;
  const startBucket = Math.floor(startTime / DAY_MS);
  const endBucket = Math.floor(endTime / DAY_MS);

  const results = [];
  for (let bucket = startBucket; bucket <= endBucket; bucket++) {
    let lastKey;
    do {
      const command = new QueryCommand({
        TableName: 'SensorData',
        // "timestamp" is a DynamoDB reserved word, hence the #ts alias.
        KeyConditionExpression: 'deviceId_bucket = :pk AND #ts BETWEEN :start AND :end',
        ExpressionAttributeNames: {
          '#ts': 'timestamp'
        },
        ExpressionAttributeValues: {
          ':pk': `${deviceId}#${bucket}`,
          ':start': startTime,
          ':end': endTime
        },
        ExclusiveStartKey: lastKey
      });

      const response = await docClient.send(command);
      // Items can be absent on an empty page; don't crash on spread.
      results.push(...(response.Items ?? []));
      lastKey = response.LastEvaluatedKey;
    } while (lastKey);
  }

  return results;
}

Downsampling and Aggregation

// MongoDB aggregation for downsampling raw readings into hourly rollups.
// $out atomically replaces the target collection with the pipeline output,
// so readers never observe a half-built collection. The original's
// deleteMany({}) pre-clear was therefore redundant AND opened a window in
// which sensor_data_hourly was empty; it has been removed.
async function downsampleToHourly() {
  await db.collection('sensor_data').aggregate([
    {
      $group: {
        _id: {
          sensorId: "$metadata.sensorId",
          // Floor each timestamp to the start of its hour.
          hour: {
            $dateTrunc: {
              date: "$timestamp",
              unit: "hour"
            }
          }
        },
        avgTemp: { $avg: "$temperature" },
        minTemp: { $min: "$temperature" },
        maxTemp: { $max: "$temperature" },
        count: { $sum: 1 }
      }
    },
    {
      $out: "sensor_data_hourly"
    }
  ]).toArray();  // toArray() drives the cursor so the pipeline actually runs
}

// Scheduled downsampling every hour. The wrapper catches rejections so one
// transient failure doesn't become an unhandled promise rejection.
setInterval(() => {
  downsampleToHourly().catch((err) => console.error('downsample failed', err));
}, 3600000);

Data Retention

// MongoDB TTL index: a background task deletes documents once their
// timestamp is older than expireAfterSeconds.
db.sensor_data.createIndex(
  { timestamp: 1 },
  { expireAfterSeconds: 2592000 }  // 30 days
);

// Manual cleanup alternative: drop everything older than 30 days.
async function cleanupOldData() {
  const THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000;
  const cutoffDate = new Date(Date.now() - THIRTY_DAYS_MS);

  await db.collection('sensor_data').deleteMany({ timestamp: { $lt: cutoffDate } });
}

// Cassandra per-write TTL: the row expires 2592000 s (30 days) after insert.
await client.execute(
  'INSERT INTO sensor_data (sensor_id, timestamp, temperature) VALUES (?, ?, ?) USING TTL 2592000',
  [sensorId, new Date(), 25.5],
  { prepare: true }
);

Rollup Strategy

// Multi-resolution storage: raw readings are periodically rolled up into
// hourly and daily aggregate collections so historical queries stay cheap
// while the raw collection is kept small.
// NOTE(review): the stated retention periods (7 days / 30 days / 1 year)
// are not enforced here — they must be implemented elsewhere, e.g. via TTL
// indexes on each collection.
class TimeSeriesRollup {
  // Store one raw reading, stamped with the current time.
  // `data` is spread into the document (e.g. { temperature, humidity }).
  async insertRawData(sensorId, data) {
    // Raw data (1 minute resolution, 7 days retention)
    await db.collection('sensor_data_raw').insertOne({
      sensorId,
      timestamp: new Date(),
      ...data
    });
  }
  
  // Aggregate the trailing hour of raw data into per-(sensor, hour) docs
  // and upsert them into sensor_data_hourly via $merge.
  // NOTE(review): the window is the trailing hour at call time, not an
  // aligned hour boundary — partial buckets are replaced on the next run
  // because $merge uses whenMatched: "replace".
  async rollupToHourly() {
    // Hourly aggregates (30 days retention)
    await db.collection('sensor_data_raw').aggregate([
      {
        $match: {
          timestamp: {
            $gte: new Date(Date.now() - 3600000),
            $lt: new Date()
          }
        }
      },
      {
        $group: {
          // Compound _id doubles as the $merge match key.
          _id: {
            sensorId: "$sensorId",
            hour: { $dateTrunc: { date: "$timestamp", unit: "hour" } }
          },
          avg: { $avg: "$temperature" },
          min: { $min: "$temperature" },
          max: { $max: "$temperature" }
        }
      },
      {
        $merge: {
          into: "sensor_data_hourly",
          whenMatched: "replace"
        }
      }
    ]).toArray();  // toArray() drives the cursor so the pipeline executes
  }
  
  // Aggregate the trailing day of hourly rollups into per-(sensor, day)
  // docs in sensor_data_daily. Reads the hourly docs' compound _id fields.
  // NOTE(review): the daily avg is an average of hourly averages — exact
  // only if every hourly bucket holds the same number of raw samples.
  async rollupToDaily() {
    // Daily aggregates (1 year retention)
    await db.collection('sensor_data_hourly').aggregate([
      {
        $match: {
          "_id.hour": {
            $gte: new Date(Date.now() - 86400000),
            $lt: new Date()
          }
        }
      },
      {
        $group: {
          _id: {
            sensorId: "$_id.sensorId",
            day: { $dateTrunc: { date: "$_id.hour", unit: "day" } }
          },
          avg: { $avg: "$avg" },
          min: { $min: "$min" },
          max: { $max: "$max" }
        }
      },
      {
        $merge: {
          into: "sensor_data_daily",
          whenMatched: "replace"
        }
      }
    ]).toArray();
  }
}

.NET Time Series

using MongoDB.Driver;

/// <summary>
/// Data-access service over a MongoDB collection of sensor readings.
/// </summary>
public class TimeSeriesService
{
    private readonly IMongoCollection<SensorReading> _readings;
    
    /// <summary>
    /// Stamps the reading with the current UTC time and inserts it.
    /// Note: any caller-supplied Timestamp is overwritten.
    /// </summary>
    public async Task InsertReading(SensorReading reading)
    {
        reading.Timestamp = DateTime.UtcNow;
        await _readings.InsertOneAsync(reading);
    }
    
    /// <summary>
    /// Returns all readings for one sensor in the half-open range
    /// [start, end) — end itself is excluded.
    /// </summary>
    public async Task<List<SensorReading>> GetRange(
        string sensorId, 
        DateTime start, 
        DateTime end)
    {
        return await _readings.Find(r =>
            r.Metadata.SensorId == sensorId &&
            r.Timestamp >= start &&
            r.Timestamp < end
        ).ToListAsync();
    }
    
    /// <summary>
    /// Groups one sensor's readings in [start, end) by hour and returns
    /// avg/min/max temperature per hour, sorted chronologically.
    /// </summary>
    public async Task<List<HourlyAggregate>> GetHourlyAggregates(
        string sensorId,
        DateTime start,
        DateTime end)
    {
        var pipeline = _readings.Aggregate()
            .Match(r => 
                r.Metadata.SensorId == sensorId &&
                r.Timestamp >= start &&
                r.Timestamp < end)
            .Group(
                r => new
                {
                    SensorId = r.Metadata.SensorId,
                    // Truncate to the hour: midnight of the day + hour offset.
                    // NOTE(review): relies on the driver translating
                    // Date/AddHours into the aggregation pipeline — confirm
                    // against the driver version in use.
                    Hour = r.Timestamp.Date.AddHours(r.Timestamp.Hour)
                },
                g => new HourlyAggregate
                {
                    SensorId = g.Key.SensorId,
                    Hour = g.Key.Hour,
                    AvgTemperature = g.Average(r => r.Temperature),
                    MinTemperature = g.Min(r => r.Temperature),
                    MaxTemperature = g.Max(r => r.Temperature)
                }
            )
            .SortBy(a => a.Hour);
        
        return await pipeline.ToListAsync();
    }
}

Best Practices

// Operational checklist for time series storage; referenced by the
// surrounding tutorial text rather than by code.
const timeSeriesBestPractices = [
  'Use time-bucketed partitions',
  'Create indexes on timestamp',
  'Implement data retention policies',
  'Use downsampling for old data',
  'Store multiple resolutions',
  'Batch writes for efficiency',
  'Use appropriate granularity',
  'Monitor storage growth',
  'Compress old data',
  'Use purpose-built databases for scale'
];

Interview Tips

  • Explain time series: Sequential data indexed by time
  • Show MongoDB: Time series collections, aggregations
  • Demonstrate Cassandra: Clustering order, time buckets
  • Discuss retention: TTL, cleanup strategies
  • Mention rollups: Multi-resolution storage
  • Show examples: IoT, metrics, financial data

Summary

Time series data is sequential data indexed by time. MongoDB 5.0+ offers native time series collections with optimized storage. Cassandra uses clustering order for time-based queries. InfluxDB and RedisTimeSeries are purpose-built for time series. DynamoDB uses time-bucketed partitions. Implement data retention with TTL indexes. Use downsampling and rollups for historical data. Store multiple resolutions (raw, hourly, daily). Essential for IoT, monitoring, and analytics applications.

Test Your Knowledge

Take a quick quiz to test your understanding of this topic.

Test Your NoSQL Knowledge

Ready to put your skills to the test? Take our interactive NoSQL quiz and get instant feedback on your answers.