ELK Stack

What is ELK Stack?

ELK Stack consists of Elasticsearch (search/storage), Logstash (processing), and Kibana (visualization) for centralized logging and analysis.

Architecture

Applications → Logstash → Elasticsearch → Kibana

              Filtering
              Parsing
              Enrichment

Docker Compose Setup

# docker-compose.yml — single-node ELK stack for local development.
# Security (xpack) is disabled and the ES heap is capped at 512 MB: dev only.
version: '3.8'

services:
  elasticsearch:
    image: elasticsearch:8.11.0
    environment:
      # Single-node discovery: skips multi-node cluster bootstrap checks.
      - discovery.type=single-node
      # Disables authentication/TLS — do not use in production.
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      # Persist indices across container restarts.
      - elasticsearch-data:/usr/share/elasticsearch/data
    networks:
      - elk
  
  logstash:
    image: logstash:8.11.0
    ports:
      - "5000:5000"   # TCP/JSON log input (see pipeline config below)
      - "9600:9600"   # Logstash monitoring API
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml
    depends_on:
      - elasticsearch
    networks:
      - elk
  
  kibana:
    image: kibana:8.11.0
    ports:
      - "5601:5601"
    environment:
      # Service name resolves through the shared "elk" bridge network.
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch
    networks:
      - elk

networks:
  elk:
    driver: bridge

volumes:
  elasticsearch-data:

Logstash Configuration

# logstash/pipeline/logstash.conf
# Pipeline: receive logs over TCP / file / beats, normalize them, ship to Elasticsearch.
input {
  # TCP input for structured (JSON-encoded) logs, e.g. from winston-logstash.
  tcp {
    port => 5000
    codec => json
  }

  # File input: tails application log files. "beginning" replays existing
  # content on first run; the read position is then tracked in the sincedb.
  file {
    path => "/var/log/app/*.log"
    start_position => "beginning"
  }

  # Beats input (Filebeat and friends).
  beats {
    port => 5044
  }
}

filter {
  if [message] =~ /^\{/ {
    # Structured log line: parse the JSON payload into top-level fields.
    json {
      source => "message"
    }
  } else {
    # Unstructured log line: extract timestamp/level/message with grok.
    # Grok only runs on non-JSON lines — running it unconditionally would
    # tag every structured event with a spurious _grokparsefailure.
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}"
      }
    }
  }

  # Use the event's own timestamp (if present) as @timestamp instead of ingest time.
  date {
    match => ["timestamp", "ISO8601"]
    target => "@timestamp"
  }

  # Normalize log-level casing so queries like level:ERROR match consistently.
  if [level] {
    mutate {
      uppercase => ["level"]
    }
  }

  # Tag errors for easy filtering and alerting downstream.
  if [level] == "ERROR" {
    mutate {
      add_tag => ["error"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"   # daily indices, matched by the logs-* template
  }

  # Debug output: prints every event to Logstash's stdout; disable in production.
  stdout {
    codec => rubydebug
  }
}

Application Integration

Node.js

// Ship structured logs from Node.js to Logstash over TCP (port 5000, JSON codec).
const winston = require('winston');
const LogstashTransport = require('winston-logstash/lib/winston-logstash-latest');

const logstashTransport = new LogstashTransport({
  port: 5000,
  host: 'logstash', // Docker Compose / Kubernetes service name
  node_name: 'api'
});

// Without an 'error' listener, a failed TCP connection to Logstash raises an
// unhandled 'error' event on the transport and can crash the process.
logstashTransport.on('error', (err) => {
  console.error('Logstash transport error:', err.message);
});

const logger = winston.createLogger({
  transports: [logstashTransport]
});

// Metadata fields become top-level, queryable fields in Elasticsearch.
logger.info('User created', {
  userId: '123',
  email: 'user@example.com',
  service: 'api'
});

.NET

// Serilog writing directly to Elasticsearch (no Logstash hop).
using Serilog;
using Serilog.Sinks.Elasticsearch;

Log.Logger = new LoggerConfiguration()
    .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri("http://elasticsearch:9200"))
    {
        // Registers an index template on startup so field mappings stay consistent.
        AutoRegisterTemplate = true,
        // Daily indices matching the logs-* pattern used elsewhere in this setup.
        IndexFormat = "logs-{0:yyyy.MM.dd}",
        NumberOfShards = 2,
        NumberOfReplicas = 1
    })
    .CreateLogger();

// Message-template properties (OrderId, UserId) are indexed as structured fields.
Log.Information("Order {OrderId} created by {UserId}", orderId, userId);

Kibana Dashboards

{
  "dashboard": {
    "title": "Application Logs",
    "panels": [
      {
        "title": "Log Volume",
        "type": "line",
        "query": {
          "query": "*",
          "language": "lucene"
        }
      },
      {
        "title": "Error Logs",
        "type": "table",
        "query": {
          "query": "level:ERROR",
          "language": "lucene"
        }
      },
      {
        "title": "Top Services",
        "type": "pie",
        "aggregation": {
          "field": "service.keyword"
        }
      },
      {
        "title": "Response Times",
        "type": "histogram",
        "field": "duration"
      }
    ]
  }
}

Index Templates

// Index template applied to every logs-* index: fixed shard/replica counts,
// explicit field mappings, and an ILM policy hook.
// NOTE(review): requires an ILM policy named "logs_policy" to exist — confirm.
PUT _index_template/logs_template
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 2,
      "number_of_replicas": 1,
      "index.lifecycle.name": "logs_policy"
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "level": { "type": "keyword" },
        "message": { "type": "text" },
        "service": { "type": "keyword" },
        "userId": { "type": "keyword" },
        "correlationId": { "type": "keyword" },
        "duration": { "type": "long" }
      }
    }
  }
}

Search Queries

// Search for errors.
// "level" is mapped as keyword (see index template), so this match query
// performs an exact, case-sensitive comparison.
GET logs-*/_search
{
  "query": {
    "match": {
      "level": "ERROR"
    }
  }
}

// Search by correlation ID — reconstructs one request's log trail across
// services, ordered chronologically.
GET logs-*/_search
{
  "query": {
    "term": {
      "correlationId": "abc-123"
    }
  },
  "sort": [
    { "@timestamp": "asc" }
  ]
}

// Aggregate by service: "size": 0 skips returning documents, only buckets.
GET logs-*/_search
{
  "size": 0,
  "aggs": {
    "services": {
      "terms": {
        "field": "service.keyword"
      }
    }
  }
}

// Time-based query: everything ingested in the last hour.
GET logs-*/_search
{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "now-1h"
      }
    }
  }
}

Alerting

// Watcher for high error rate
PUT _watcher/watch/high_error_rate
{
  "trigger": {
    "schedule": {
      "interval": "5m"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": ["logs-*"],
        "body": {
          "query": {
            "bool": {
              "must": [
                { "match": { "level": "ERROR" } },
                { "range": { "@timestamp": { "gte": "now-5m" } } }
              ]
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gt": 100
      }
    }
  },
  "actions": {
    "send_email": {
      "email": {
        "to": "alerts@example.com",
        "subject": "High Error Rate Alert",
        "body": "Error count: {{ctx.payload.hits.total}}"
      }
    }
  }
}

Filebeat Integration

# filebeat.yml — tail app log files and forward them to Logstash (beats input, port 5044).
filebeat.inputs:
  # NOTE(review): the "log" input type is deprecated in Filebeat 8.x in favor
  # of "filestream" — confirm the target Filebeat version before deploying.
  - type: log
    enabled: true
    paths:
      - /var/log/app/*.log
    # Static fields attached to every event, used for filtering in Kibana.
    fields:
      service: api
      environment: production

output.logstash:
  hosts: ["logstash:5044"]

Performance Tuning

# elasticsearch.yml — node-level settings only.
cluster.name: elk-cluster
node.name: node-1

# Memory: lock the JVM heap to prevent swapping (requires the memlock ulimit).
bootstrap.memory_lock: true

# Indexing: per-index settings may NOT be set in elasticsearch.yml — since
# Elasticsearch 5.x the node refuses to start if they are present here.
# Configure them through an index template (or the index settings API) instead:
#
#   PUT _index_template/logs_template
#   { "index_patterns": ["logs-*"],
#     "template": { "settings": {
#       "refresh_interval": "30s",
#       "number_of_shards": 2,
#       "number_of_replicas": 1 } } }

# Query cache (node-level setting; valid here).
indices.queries.cache.size: 10%

Kubernetes Deployment

# StatefulSet: gives each pod a stable identity (elasticsearch-0..2) and its
# own persistent volume, which Elasticsearch needs for cluster membership.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: elasticsearch
spec:
  serviceName: elasticsearch   # requires a matching headless Service (not shown)
  replicas: 3
  selector:
    matchLabels:
      app: elasticsearch
  template:
    metadata:
      labels:
        app: elasticsearch
    spec:
      containers:
        - name: elasticsearch
          image: elasticsearch:8.11.0
          ports:
            - containerPort: 9200
          env:
            - name: cluster.name
              value: "elk-cluster"
            # Pod DNS names resolve through the headless Service.
            - name: discovery.seed_hosts
              value: "elasticsearch-0,elasticsearch-1,elasticsearch-2"
            # NOTE(review): first bootstrap of an 8.x multi-node cluster also
            # needs cluster.initial_master_nodes — confirm before deploying.
          volumeMounts:
            - name: data
              mountPath: /usr/share/elasticsearch/data
  # One PersistentVolumeClaim per pod, retained across pod restarts.
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi

Interview Tips

  • Explain ELK: Elasticsearch, Logstash, Kibana
  • Show configuration: Logstash pipelines
  • Demonstrate integration: Node.js, .NET
  • Discuss queries: Search and aggregation
  • Mention indexing: Templates and mappings
  • Show visualization: Kibana dashboards

Summary

ELK Stack provides centralized logging with Elasticsearch for storage, Logstash for processing, and Kibana for visualization. Configure Logstash pipelines for parsing and filtering. Integrate applications with structured logging. Create Kibana dashboards for monitoring. Implement alerting for critical events. Essential for log management in DevOps.

Test Your Knowledge

Take a quick quiz to test your understanding of this topic.

Test Your CI/CD Knowledge

Ready to put your skills to the test? Take our interactive CI/CD quiz and get instant feedback on your answers.