ELK三件套

环境搭建

ElasticSearch

window

安装ElasticSearch

官网:https://www.elastic.co/cn/elasticsearch/

image-20211105005446974

点击下载,然后选择对应的系统版本

image-20211105005534974

最后解压即可

image-20211105005631110

文件说明:

bin        启动文件
config    配置文件
    log4js    日志配置文件
    jvm.options    java虚拟机相关配置
    elasticsearch.yml    elasticsearch的配置文件
lib        相关jar包
logs    日志
modules    功能模块
plugins    插件

双击elasticsearch.bat文件就可以启动了

报错:启动失败

"could not find java in JAVA_HOME at "C:\Program Files\Java\jdk1.8.0_131;\bin\java.exe"

image-20211105012825013

解决方案:

上方的错误原因是 jdk1.8.0_131;\bin\多出了一个分号,可以在配置文件中写死,或者配置好JAVA_HOME的路径

修改elasticsearch-env.bat文件,将 %JAVA_HOME% 直接改成 C:\Program Files\Java\jdk1.8.0_131

image-20211105015936121

安装 ElasticSearch-head 可视化

项目网站:https://github.com/mobz/elasticsearch-head

直接下载未压缩包,然后解压

image-20211105010318766

解压完成之后进入文件夹执行如下指令

# 安装依赖
npm install
# 启动
npm run start

image-20211105010742880

启动完成之后可以访问http://localhost:9100

但是发现点击连接会报错,这是跨域问题

image-20211105020318719

修改ElasticSearch配置文件elasticsearch.yml

http.cors.enabled: true
http.cors.allow-origin: "*"

image-20211105020603742

重新启动 ElasticSearch 即可

image-20211105021002526Linux

下载安装包

1、在/etc目录下创建es文件

2、下载安装包

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.16.3-linux-x86_64.tar.gz

image-20220123211402820

3、解压

tar -zxvf elasticsearch-7.16.3-linux-x86_64.tar.gz

修改配置文件

# 1、修改jvm占用
vim jvm.options
# 增加如下配置
-Xms128m
-Xmx128m

# 2、修改elasticsearch.yml 文件
vim elasticsearch.yml
# 配置es的集群名称,默认是elasticsearch
cluster.name: my-es
# 节点名称
node.name: node-1
# 设置当前的ip地址,通过指定相同网段的其他节点会加入该集群中
network.host: 0.0.0.0
# 设置对外服务的http端口
http.port: 9200
# 设置主节点
cluster.initial_master_nodes: ["node-1"]
# 设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点
discovery.zen.ping.unicast.hosts: ["127.0.0.1","10.10.10.34:9200"]
# 解决跨域问题
http.cors.enabled: true
http.cors.allow-origin: "*"

启动es

./elasticsearch

image-20220123212513042

报错:因为安全问题elasticsearch 不让用root用户直接运行,所以要创建新用户

配置子用户

# 创建用户appadmin
useradd appadmin
# 为用户appadmin设置密码
passwd appadmin

# 赋予权限
chown -R appadmin /etc/es/elasticsearch-7.16.3

修改本地配置

image-20220123214928274

如果直接切换用户就启动会报错,因为无法创建本地文件问题,用户最大可创建文件数太小。

vim /etc/security/limits.conf
# 添加如下内容
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096  
# 注:* 代表Linux所有用户名称(比如 hadoop)

vim /etc/sysctl.conf
# 添如下配置
vm.max_map_count = 262145
# 刷新配置
sysctl -p 

image-20220123215318006

配置安全组

登录阿里云控制台,增加9200端口

image-20220123221726591

在宝塔面板开放服务器端口

image-20220123221949229

启动ES

# 切换用户
su appadmin

# 启动es
bin/elasticsearch

image-20220123220506937

后台启动

# 后台启动
./elasticsearch -d

# 查看es进程
jps

ElasticSearch-head

下载安装包

Github地址:https://github.com/mobz/elasticsearch-head

# 下载安装包
wget https://github.com/mobz/elasticsearch-head/archive/refs/heads/master.zip

# 解压
unzip master.zip

# 安装
npm insatll

# 启动
npm run start

# 后台启动
nohup npm run start &

查看es数据

访问端口9100,输入es的主机和端口,并连接即可

image-20220124230211250

如果连接不上可以考虑两个问题:

  1. 没有配置es的跨域问题
  2. es的当前节点没有配置在主节点中

LogStash

安装logstash

# 下载
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.16.3-linux-x86_64.tar.gz

# 解压
tar -zxvf logstash-7.16.3-linux-x86_64.tar.gz

修改配置文件

vim jvm.options

# 配置内存
-Xms128m
-Xmx128m

创建配置文件

cd conf.d
vim logstash.conf

# 输入如下内容
input { 
     file { 
          path => ["/usr/local/jweb/jweb_tl_pms4/logs/*.log"]
          type => syslog
          start_position => "end" # 第一次从头开始读取文件  beginning or end
          stat_interval => "3"  # 定时检查文件是否更新,默认1s
   } 
} 

output { 
    # 保存到es中
    elasticsearch { 
        hosts => "106.53.119.27:9200"
        index => "logstash-%{ +YYYY.MM.dd} "
    } 
    stdout {  
        codec => rubydebug
    } 
} 

启动

./logstash -f ../config/logstash.conf

# 后台启动
nohup ./logstash -f ../config/logstash.conf &

Kibana

安装

# 下载安装包
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.16.3-linux-x86_64.tar.gz

# 解压
tar -zxvf kibana-7.16.3-linux-x86_64.tar.gz

修改配置文件

vim kibana.yml

# 修改如下内容
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.url: "http://[elasticsearch主机]:9200"
kibana.index: ".kibana"
# 设置为中文内容
i18n.locale: "zh-CN"

启动

bin/kibana

# 后台启动
nohup bin/kibana &

访问5601端口即可

image-20220124225501207

查看数据

点击侧边栏的discover

image-20220124225559161

第一次进入可能会让你创建索引模式,可以匹配以有的索引,用于展示内容

image-20220124225902157

配置好之后再次点击侧边栏的discover,查看数据,目前展示的便是刚才闯将的索引模式

image-20220124230053964

查询日志

日志内容存放在message字段中

image-20220124230756416

可以使用上方的KQL(Kibana Query Language)语句进行查询

image-20220124230909002

LogStash

logstash是一个数据分析软件,主要目的是分析log日志。整一套软件可以当作一个MVC模型,logstash是controller层,Elasticsearch是一个model层,kibana是view层。

在这里插入图片描述

input

input 及输入是指日志数据传输到Logstash中

  • file:从文件系统中读取一个文件,很像UNIX命令 “tail -0a”
  • syslog:监听514端口,按照RFC3164标准解析日志数据
  • redis:从redis服务器读取数据,支持channel(发布订阅)和list模式。redis一般在Logstash消费集群中作为”broker”角色,保存events队列共Logstash消费。
  • lumberjack:使用lumberjack协议来接收数据,目前已经改为 logstash-forwarder。
stdin {  }  # 从控制台中输入来源

file {  
    # 从文件中来
    path => "E:/software/logstash-1.5.4/logstash-1.5.4/data/*" #单一文件
    # 监听文件的多个路径
    path => ["E:/software/logstash-1.5.4/logstash-1.5.4/data/*.log","F:/*.log"]
    # 排除不想监听的文件
    exclude => "1.log"

    # 添加自定义的字段
    add_field => { "test"=>"test"} 
    # 增加标签
    tags => "tag1"

    # 设置新事件的标志
    delimiter => "\n"

    # 设置多长时间扫描目录,发现新文件
    discover_interval => 15
    # 设置多长时间检测文件是否修改
    stat_interval => 1

    # 监听文件的起始位置,默认是end
    start_position => beginning

    # 监听文件读取信息记录的位置
    sincedb_path => "E:/software/logstash-1.5.4/logstash-1.5.4/test.txt"
    # 设置多长时间会写入读取的位置信息
    sincedb_write_interval => 15
} 

# 系统日志方式
syslog {  
    # 定义类型
    type => "system-syslog"  
    # 定义监听端口
    port => 10514    
} 

# filebeats方式
beats {  
    port => 5044
} 

以上文件来源file,syslog,beats 只能选择其中一种

注意:

  • 文件的路径名需要使用绝对路径
  • 支持globs写法
  • 如果想要监听多个目标文件可以改成数组

filter

Fillters 在Logstash处理链中担任中间处理组件。经常被组合起来实现一些特定的行为来,处理匹配特定规则的事件流。

  • grok:解析无规则的文字并转化为有结构的格式。Grok 是目前最好的方式来将无结构的数据转换为有结构可查询的数据。有120多种匹配规则,会有一种满足你的需要。
  • mutate:mutate filter 允许改变输入的文档,你可以从命名,删除,移动或者修改字段在处理事件的过程中。
  • drop:丢弃一部分events不进行处理,例如:debug events。
  • clone:拷贝 event,这个过程中也可以添加或移除字段。
  • geoip:添加地理信息(为前台kibana图形化展示使用)
filter { 
    # 定义数据的格式
    grok { 
        match => {  "message" => "%{ DATA:timestamp} \|%{ IP:serverIp} \|%{ IP:clientIp} \|%{ DATA:logSource} \|%{ DATA:userId} \|%{ DATA:reqUrl} \|%{ DATA:reqUri} \|%{ DATA:refer} \|%{ DATA:device} \|%{ DATA:textDuring} \|%{ DATA:duringTime:int} \|\|"} 
    } 

    #定义时间戳的格式
    date { 
        match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
        # match => ["timeLocal": "dd/MMM/yyyy:HH:mm:ss Z"]
        locale => "cn"
    } 

    # 定义客户端的IP是哪个字段(上面定义的数据格式)
    geoip { 
        source => "clientIp"
    } 

    # 需要进行转换的字段,这里是将访问的时间转成int,再传给Elasticsearch
    mutate { 
        convert => ["duringTime", "integer"]
    } 
} 

常见的日志样例

内容: - 2015-04-29 13:04:23,733 [main] INFO (api.batch.ThreadPoolWorker) Command-line options for this run:
正则:- %{ TIMESTAMP_ISO8601:time}  \[%{ WORD:main} \] %{ LOGLEVEL:loglevel}  \(%{ JAVACLASS:class} \) %{ GREEDYDATA:mydata} 

output

outputs是logstash处理管道的最末端组件。一个event可以在处理过程中经过多重输出,但是一旦所有的outputs都执行结束,这个event也就完成生命周期。

  • elasticsearch:如果你计划将高效的保存数据,并且能够方便和简单的进行查询

  • file:将event数据保存到文件中

  • graphite:将event数据发送到图形化组件中,一个很流行的开源存储图形化展示的组件。http://graphite.wikidot.com/

  • statsd:statsd是一个统计服务,比如技术和时间统,通过udp通讯,聚合一个或者多个后台服务,如果你已经开始使用statsd,该选项对你应该很有用

output { 
    #将输出保存到elasticsearch,如果没有匹配到时间就不保存,因为日志里的网址参数有些带有换行
    elasticsearch { 
        host => "127.0.0.1:9200"
        index => "logstash-%{ +YYYY.MM.dd} "
    } 

    #输出到stdout
    #  stdout {  codec => rubydebug } 

    #定义访问数据的用户名和密码
    #  user => webService
    #  password => 1q2w3e4r
} 

Codecs

codecs 是基于数据流的过滤器,它可以作为input,output的一部分配置。Codecs可以帮助你轻松的分割发送过来已经被序列化的数据。流行的codecs包括 json,msgpack,plain(text)。

  • json:使用json格式对数据进行编码/解码
  • multiline:将汇多个事件中数据汇总为一个单一的行。比如:java异常信息和堆栈信息
    获取完整的配置信息,请参考 Logstash文档中 “plugin configuration”部分

监听多个类型的文件

input { 
    file { 
        path => ["/usr/local/jweb/jweb_tl_pms4/logs/run.*.log"]
        type => "run"
        start_position => "end"
        stat_interval => "3"
    } 
    file { 
        path => ["/usr/local/jweb/jweb_tl_pms4/logs/error.*.log"]
        type => "error"
        start_position => "end"
        stat_interval => "3"
    } 
} 

output { 
    if [type] == "run" { 
        elasticsearch { 
            hosts => "106.53.119.27:9200"
            index => "run.%{ +YYYY.MM.dd} .log"
        } 
    } 

    if [type] == "error" { 
        elasticsearch { 
            hosts => "106.53.119.27:9200"
            index => "error.%{ +YYYY.MM.dd} .log"
        } 
    } 
    stdout {  
        codec => rubydebug
    } 
}