This page looks best with JavaScript enabled

elasticsearch + logstash + kafka + filebeat 的集群搭建及相关优化。

 ·  ☕ 4 min read

基于elasticsearch + logstash + kafka + filebeat的日志收集及近实时处理的集群介绍及搭建,基于rancher1.6平台。

elasticsearch + logstash + kafka + filebeat

简介

graph TD
A[logFile] -->|filebeat| B(kafka)
B --> C{logstash}
C -->|index1| D
C -->|index2| D
C -->|index3| D[Elasticsearch]

Elasticsearch是一个基于Lucene搜索引擎。它提供了一个分布式,多租户 -能够全文搜索与发动机HTTP Web界面和无架构JSON文件。

Elasticsearch与名为Logstash的数据收集和日志解析引擎以及名为Kibana的分析和可视化平台一起开发。这三种产品设计用作集成解决方案,称为**“弹性堆栈”**(以前称为“ELK堆栈”)。

Elasticsearch可用于搜索各种文档。它提供可扩展的搜索,近实时搜索,并支持多租户。[2] “Elasticsearch是分布式的,这意味着索引可以分为多个分,每个分可以有零个或多个副本。每个节点托管一个或多个分片,并充当协调器,将操作委托给正确的分片。重新平衡和路由是自动完成的“。[2]相关数据通常存储在同一索引中,该索引由一个或多个主分片和零个或多个副本分片组成。创建索引后,无法更改主分片的数量

Elasticsearch

1
2
cluster status:
http://172.18.4.42:9200/_cluster/health

配置详解

data node

如果你想让节点从不选举为主节点,只用来存储数据,可作为负载器
node.master: false
node.data: true

master node

如果想让节点成为主节点,且不存储任何数据,并保有空闲资源,可作为协调器
node.master: true
node.data: false

client node

如果想让节点既不称为主节点,又不成为数据节点,那么可将他作为搜索器,从节点中获取数据,生成搜索结果等
node.master: false
node.data: false

filebeat

从文件读取日志,通过tag进行标记,用于区分不同的日志,最后输出到kafka。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
#输入
- type: log
  paths:
    - /var/log/amp/*.log
  tags: ["amp-log"]
  fields_under_root: true
  document_type: amp-log
  ignore_older: 24h
#输出
output.kafka:
  hosts: ["kafka-broker-1:9092","kafka-broker-2:9092","kafka-broker-3:9092"]
  topic: logstash
  required_acks: 1

kafka

用于临时存储filebeat收集来的日志。

key value
ADVERTISE_PUB_IP true
JVMFLAGS -Xmx2048m -Xms2048m
KAFKA_REPLICATION_FACTOR(副本数量) 3
KAFKA_LOG_RETENTION_HOURS(日志过期时间) 24
KAFKA_NUM_PARTITIONS(topic分区数量) 10
ZK_SERVICE ZK_SERVICE

logstash

从kafka读取日志,使用正则过去日志信息,之后结构化日志,存入elasticsearch。

graph LR
A[INPUT] -->|kafka| B(filter)
B -->|tags| C{output}
C -->|index1| D
C -->|index2| D[Elasticsearch]
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 从kafka读取日志
input {
  # beats {
  #    port => "5044"
  # }
  kafka {
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
    topics => "logstash"
    group_id => "logstash-group-01"
    consumer_threads => 5
    decorate_events => false
    auto_offset_reset => "latest" # 从最新的偏移量开始消费
    codec => "json"
  }
# 结构化日志并清洗日志
filter {
  if "amp-log" in [tags] {
     grok {
       match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:threadname}\] %{WORD:level}\s+%{DATA:topic}:%{GREEDYDATA:jsondata}" }
    }
     json {
       source => "jsondata"
       #target => "parsedJson"
       remove_field=>["jsondata"]
    }
     date {
       match => ["timestamp","YYYY-MM-dd HH:mm:ss.SSS"]
       target => [ "@timestamp" ]
    }
     ruby {
       code=>"event.set('index_day', event.get('@timestamp').time.localtime.strftime('%Y.%m.%d'))"
    }
     mutate {
       remove_field => ["timestamp"]
       remove_field => ["message"]
       remove_field => ["threadname"]
   }
     if "success" in [topic] {
      mutate { add_tag => [ "monitor-success" ] }
   }
     else if "repeat" in [topic] {
      mutate { add_tag => [ "monitor-repeat" ] }
   }
}
}
# 输出到elasticsearch
output {
   if "monitor-success" in [tags] {
        elasticsearch {
          hosts => "elasticsearch:9200"
          index => "amp-success-%{index_day}"
        }
    }
}

kibana

用于查询elasticsearch的数据,监控elasticsearch的监控状态

other

配置

1
2
3
4
5
6
7
8
9
// 索引写入缓存
indices.memory.index_buffer_size=20% 
indices.memory.min_index_buffer_size=96m
// 存活超时时间
discovery.zen.fd.ping_timeout=120s
// 存活超时重试次数
discovery.zen.fd.ping_retries=6
// 节点间存活检测间隔
discovery.zen.fd.ping_interval=30s

索引模板配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 刷新间隔,merge线程数
PUT /_template/adv_logs 
{
  "index_patterns": ["aap-*", "amp-*"], 
  "order":    1, 
  "settings": {
    "refresh_interval": "60s",
    "merge": {
        "scheduler": {
            "max_thread_count": "1"
        }
    }	
  }
}

kafka

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# alter partition
bin/kafka-topics.sh --zookeeper zk:2181 --alter --topic logstash --partitions 5

# list topics
bin/kafka-topics.sh --zookeeper zk:2181 --list

# describe topic 
bin/kafka-topics.sh --zookeeper zk:2181 --topic logstash --describe

# describe consumer 
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group logstash-group-01 --describe
Share on

tux
WRITTEN BY
tux
devops

What's on this Page