ELK栈集成
    ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
    │  Logstash   │────▶│Elasticsearch│────▶│   Kibana    │
    │  (数据收集)  │     │  (数据存储)  │     │  (数据展示)  │
    └─────────────┘     └─────────────┘     └─────────────┘
           ▲
           │
    ┌──────┴──────┐
    │  Beats      │  Filebeat / Metricbeat / Packetbeat
    │  (数据收集)  │
    └─────────────┘
                
1 Filebeat 配置
轻量级日志收集器
# filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/*.log
    - /var/log/app/*.log
  fields:
    service: nginx
    environment: production
  fields_under_root: true
  multiline.pattern: '^\['
  multiline.negate: true
  multiline.match: after

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "filebeat-%{[agent.version]}-%{+yyyy.MM.dd}"
  username: "elastic"
  password: "your_password"

# 启用模块
filebeat modules enable nginx
filebeat modules enable system

# 启动
./filebeat -e -c filebeat.yml
2 Logstash 配置
数据处理和转换管道
# logstash.conf
input {
  beats {
    port => 5044
  }
  
  tcp {
    port => 5000
    codec => json
  }
}

filter {
  # Grok解析日志
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  
  # 日期解析
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
  }
  
  # 添加字段
  mutate {
    add_field => { "service" => "web-server" }
    remove_field => [ "message", "timestamp" ]
  }
  
  # IP地理位置
  geoip {
    source => "clientip"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "your_password"
  }
  
  stdout { codec => rubydebug }
}
3 Kibana 配置
数据可视化和仪表盘
# kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://localhost:9200"]
elasticsearch.username: "kibana_system"
elasticsearch.password: "your_password"

# 常用功能
# 1. Discover - 搜索和过滤日志
# 2. Visualize - 创建图表
# 3. Dashboard - 组合仪表盘
# 4. Stack Monitoring - 监控集群
# 5. Index Patterns - 创建索引模式

# 创建索引模式
# Management → Stack Management → Index Patterns
# 输入: filebeat-* 或 logs-*
4 Docker Compose 部署
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    ports:
      - "9200:9200"
    volumes:
      - es-data:/usr/share/elasticsearch/data

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - ./logs:/logs
    ports:
      - "5044:5044"
      - "5000:5000"
    depends_on:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch

  filebeat:
    image: docker.elastic.co/beats/filebeat:8.11.0
    container_name: filebeat
    volumes:
      - ./filebeat.yml:/usr/share/filebeat/filebeat.yml
      - /var/log:/var/log:ro
    depends_on:
      - logstash

volumes:
  es-data:
5 常用场景
# 场景1: Nginx日志分析
# Filebeat收集 → Logstash解析 → Elasticsearch存储 → Kibana展示

# 场景2: 应用日志聚合
# 多服务器Filebeat → Logstash过滤 → Elasticsearch → Kibana告警

# 场景3: 系统监控
# Metricbeat收集系统指标 → Elasticsearch → Kibana监控仪表盘

# 场景4: 安全审计
# Auditbeat收集安全事件 → Elasticsearch → Kibana SIEM

# Kibana常用查询
# 状态码分布: response:[500 TO 599]
# 慢查询: duration:>1000
# 特定IP: clientip:192.168.1.1
# 错误日志: level:error OR level:ERROR
ELK最佳实践
  • 使用Filebeat代替Logstash直接收集,减轻资源消耗
  • 合理设置索引生命周期(ILM),自动删除旧数据
  • 使用索引模板统一Mapping配置
  • 配置适当的分片和副本数
  • 启用监控和告警,及时发现问题