Logstash environment deployment

Logstash architecture:
– A single server node can run multiple Logstash instances.

– A Logstash instance can run multiple pipelines; if no pipeline id is defined, the pipeline defaults to the main pipeline.
– Each pipeline is composed of three components, of which the filter plugin is optional:
– input:
Where the data comes from.
– filter:
Which plugins process the data; this component is optional.
– output:
Where the data goes.
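The three-stage flow above can be sketched as a toy model in Python (this is only an illustration of the event flow, not Logstash's actual implementation):

```python
# Toy model of a Logstash pipeline: input -> (optional) filter stage -> output.
def run_pipeline(input_events, filters, output):
    results = []
    for event in input_events:
        for f in filters:          # the filter stage is optional: pass an empty list
            event = f(event)
        results.append(output(event))
    return results

# input: two raw lines wrapped as events
events = [{"message": "hello"}, {"message": "world"}]
# filter: tag every event, like `type => stdin` does
add_type = lambda e: {**e, "type": "stdin"}
# output: hand the finished event back (stdout/elasticsearch in real Logstash)
out = run_pipeline(events, [add_type], output=lambda e: e)
```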

  • 1. Download Logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.17.28-amd64.deb
  • 2. Install Logstash
[root@elk93 ~]# ll -h logstash-7.17.28-amd64.deb 
-rw-r--r-- 1 root root 359M Mar 13 14:41 logstash-7.17.28-amd64.deb
[root@elk93 ~]# 
[root@elk93 ~]# dpkg -i logstash-7.17.28-amd64.deb 
  • 3. Create a symbolic link to add the logstash command to the PATH
[root@elk93 ~]# ln -svf /usr/share/logstash/bin/logstash /usr/local/bin/
'/usr/local/bin/logstash' -> '/usr/share/logstash/bin/logstash'
[root@elk93 ~]# 
[root@elk93 ~]# logstash --help
  • 4. Start an instance from the command line, specifying the configuration with the -e option (not recommended)
[root@elk93 ~]# logstash -e "input { stdin { type => stdin } } output { stdout { codec => rubydebug } }"  --log.level warn
...
The stdin plugin is now waiting for input:
111111111111111111111111111
{
    "@timestamp" => 2025-03-13T06:51:32.821Z,
          "type" => "stdin",
       "message" => "111111111111111111111111111",
          "host" => "elk93",
      "@version" => "1"
}

  • 5. Start Logstash from a configuration file
[root@elk93 ~]# cat /etc/logstash/conf.d/01-stdin-to-stdout.conf
input { 
  stdin { 
    type => stdin 
  } 
} 


output { 
  stdout { 
    codec => rubydebug 
  } 
}
[root@elk93 ~]# 
[root@elk93 ~]# logstash -f /etc/logstash/conf.d/01-stdin-to-stdout.conf
...
333333333333333333333333333333
{
          "type" => "stdin",
       "message" => "333333333333333333333333333333",
          "host" => "elk93",
    "@timestamp" => 2025-03-13T06:54:20.223Z,
      "@version" => "1"
}

Logstash text log collection strategy

  • 1. Write the configuration file
[root@elk93 ~]# cat /etc/logstash/conf.d/02-file-to-stdout.conf
input { 
  file { 
    # Path of the file to collect
    path => "/tmp/student.txt"
    # Where to start collecting the first time; valid values: beginning, end. Default is "end"
    start_position => "beginning"
  } 
} 


filter {
  mutate {
    remove_field => [ "@version","host","path" ]
  }
}

output { 
  stdout { 
    codec => rubydebug 
  } 
}
[root@elk93 ~]# 
  • 2. Start with configuration hot reload
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/02-file-to-stdout.conf 

Logstash multi-instance case

	Start instance 1:
[root@elk93 ~]# logstash -f /etc/logstash/conf.d/01-stdin-to-stdout.conf 


	Start instance 2:
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/02-file-to-stdout.conf  --path.data /tmp/logstash-multiple
Essentially this just gives each instance its own data directory; if path.data is not specified, Logstash uses the default path, which only one instance can hold at a time.

Hands-on case: collecting nginx logs with Logstash

  • 1. Install nginx
[root@elk93 ~]# apt -y install nginx
  • 2. Access test
http://10.0.0.93/
  • 3. Collecting nginx logs with Logstash: grok case
[root@elk93 ~]# cat /etc/logstash/conf.d/03-nginx-grok-stdout.conf 
input { 
  file { 
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  } 
} 


filter {
  # Extract arbitrary text with regular expressions and wrap the matches in named fields.
  grok {
    match => { "message" => "%{HTTPD_COMMONLOG}" }
  }

  mutate {
    remove_field => [ "@version","host","path" ]
  }
}

output { 
  stdout { 
    codec => rubydebug 
  } 
}
[root@elk93 ~]# 
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/03-nginx-grok-stdout.conf 
...
{
        "message" => "10.0.0.1 - - [13/Mar/2025:15:56:28 +0800] \"GET / HTTP/1.1\" 200 396 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1\"",
          "bytes" => "396",
           "verb" => "GET",
        "request" => "/",
           "auth" => "-",
    "httpversion" => "1.1",
       "response" => "200",
          "ident" => "-",
       "clientip" => "10.0.0.1",
     "@timestamp" => 2025-03-13T07:56:28.801Z,
      "timestamp" => "13/Mar/2025:15:56:28 +0800"
}
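The %{HTTPD_COMMONLOG} grok pattern is roughly equivalent to the regular expression below (a simplified approximation; the real pattern is composed from many grok sub-patterns), applied here to the access-log line from the output above:

```python
import re

# Simplified approximation of grok's %{HTTPD_COMMONLOG} pattern.
COMMONLOG = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] "(?P<verb>\S+) (?P<request>\S+) HTTP/(?P<httpversion>\S+)" '
    r'(?P<response>\d{3}) (?P<bytes>\d+|-)'
)

line = ('10.0.0.1 - - [13/Mar/2025:15:56:28 +0800] '
        '"GET / HTTP/1.1" 200 396 "-" "Mozilla/5.0"')
fields = COMMONLOG.match(line).groupdict()
```

Each named group becomes a field on the event, which is exactly how grok produces the clientip, verb, response, etc. fields seen above.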
  • 4. Collecting nginx logs with Logstash: useragent case
[root@elk93 ~]# cat /etc/logstash/conf.d/04-nginx-useragent-stdout.conf
input { 
  file { 
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  } 
} 


filter {
  grok {
    match => { "message" => "%{HTTPD_COMMONLOG}" }
  }

  # Extract the user's device information
  useragent {
    # Field to parse the user agent information from
    source => "message"
    # Store the parsed result in a specific field; if not specified, the result is placed in top-level fields.
    target => "custom_user_agent"
  }


  mutate {
    remove_field => [ "@version","host","path" ]
  }
}

output { 
  stdout { 
    codec => rubydebug 
  } 
}
[root@elk93 ~]# 
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/04-nginx-useragent-stdout.conf
...
{
                 "ident" => "-",
           "httpversion" => "1.1",
               "message" => "10.0.0.1 - - [13/Mar/2025:16:09:22 +0800] \"GET / HTTP/1.1\" 200 396 \"-\" \"Mozilla/5.0 (Linux; Android 8.0.0; SM-G955U Build/R16NW) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Mobile Safari/537.36\"",
               "request" => "/",
             "timestamp" => "13/Mar/2025:16:09:22 +0800",
                  "verb" => "GET",
            "@timestamp" => 2025-03-13T08:09:23.476Z,
              "response" => "200",
                  "auth" => "-",
                 "bytes" => "396",
              "clientip" => "10.0.0.1",
     "custom_user_agent" => {
          "os_patch" => "0",
              "name" => "Chrome Mobile",
           "version" => "134.0.0.0",
          "os_minor" => "0",
             "major" => "134",
          "os_major" => "8",
            "device" => "Samsung SM-G955U",
        "os_version" => "8.0.0",
                "os" => "Android",
             "minor" => "0",
           "os_name" => "Android",
             "patch" => "0",
           "os_full" => "Android 8.0.0"
    }
}
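Under the hood the useragent filter matches the UA string against a large pattern database (uap-core). A minimal hand-rolled sketch of the same idea (the patterns here are illustrative and far less complete than the real plugin's):

```python
import re

# Tiny sketch of user-agent parsing: try browser patterns in order, first match wins.
UA_PATTERNS = [
    ("Chrome Mobile", re.compile(r"Chrome/(?P<version>[\d.]+) Mobile")),
    ("Chrome",        re.compile(r"Chrome/(?P<version>[\d.]+)")),
    ("Mobile Safari", re.compile(r"Version/(?P<version>[\d.]+) Mobile.*Safari")),
]

def parse_ua(ua):
    for name, pattern in UA_PATTERNS:
        m = pattern.search(ua)
        if m:
            return {"name": name, "version": m.group("version"),
                    "major": m.group("version").split(".")[0]}
    return {"name": "Other"}

ua = ("Mozilla/5.0 (Linux; Android 8.0.0; SM-G955U Build/R16NW) AppleWebKit/537.36 "
      "(KHTML, like Gecko) Chrome/134.0.0.0 Mobile Safari/537.36")
```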
  • 5. Collecting nginx logs with Logstash: geoip case
[root@elk93 ~]# cat /etc/logstash/conf.d/05-nginx-geoip-stdout.conf
input { 
  file { 
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  } 
} 


filter {
  grok {
    match => { "message" => "%{HTTPD_COMMONLOG}" }
  }

  useragent {
    source => "message"
    target => "custom_user_agent"
  }

  # Derive latitude/longitude coordinates from a public IP address
  geoip {
    # Field containing the public IP address to analyze
    source => "clientip"
  }

  mutate {
    remove_field => [ "@version","host","path" ]
  }
}

output { 
  stdout { 
    codec => rubydebug 
  } 
}
[root@elk93 ~]# 
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/05-nginx-geoip-stdout.conf
...
{
               "message" => "221.218.213.9 - - [13/Mar/2025:16:15:25 +0800] \"GET / HTTP/1.1\" 200 75161927680 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1\"",
                  "verb" => "GET",
                  "auth" => "-",
           "httpversion" => "1.1",
            "@timestamp" => 2025-03-13T08:19:29.582Z,
              "clientip" => "221.218.213.9",
              "response" => "200",
                 "bytes" => "75161927680",
      "custom_user_agent" => {
                "os" => "iOS",
              "name" => "Mobile Safari",
           "version" => "16.6",
           "os_name" => "iOS",
        "os_version" => "16.6",
           "os_full" => "iOS 16.6",
             "minor" => "6",
          "os_major" => "16",
            "device" => "iPhone",
             "major" => "16",
          "os_minor" => "6"
    },
                 "geoip" => {
          "country_name" => "China",
              "timezone" => "Asia/Shanghai",
         "country_code2" => "CN",
             "city_name" => "Beijing",
           "region_code" => "BJ",
             "longitude" => 116.395,
              "latitude" => 39.911,
         "country_code3" => "CN",
           "region_name" => "Beijing",
              "location" => {
            "lat" => 39.911,
            "lon" => 116.395
        },
        "continent_code" => "AS",
                    "ip" => "221.218.213.9"
    },
               "request" => "/",
             "timestamp" => "13/Mar/2025:16:15:25 +0800",
                 "ident" => "-"
}
  • 6. Collecting nginx logs with Logstash: date case
[root@elk93 ~]# cat /etc/logstash/conf.d/06-nginx-date-stdout.conf
input { 
  file { 
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  } 
} 


filter {
  grok {
    match => { "message" => "%{HTTPD_COMMONLOG}" }
  }

  useragent {
    source => "message"
    target => "custom_user_agent"
  }

  geoip {
    source => "clientip"
  }

  # Convert the date field
  date {
    # Match the date field and convert it to a date type for storage in ES; pick the corresponding format from the official examples.
    # https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-date.html#plugins-filters-date-match
    # "timestamp" => "23/Oct/2024:16:25:25 +0800"
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    # Write the converted value into the specified field; if not defined, "@timestamp" is overwritten by default.
    target => "custom-timestamp"
  }

  mutate {
    remove_field => [ "@version","host","path" ]
  }
}

output { 
  stdout { 
    codec => rubydebug 
  } 
}
[root@elk93 ~]# 
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/06-nginx-date-stdout.conf
...
{
                "message" => "218.213.0.9 - - [23/Oct/2024:16:25:25 +0800] \"GET / HTTP/1.1\" 200 75161927680 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1\"",
            "httpversion" => "1.1",
       "custom_user_agent" => {
          "os_major" => "16",
            "device" => "iPhone",
          "os_minor" => "6",
              "name" => "Mobile Safari",
           "version" => "16.6",
             "minor" => "6",
           "os_name" => "iOS",
        "os_version" => "16.6",
                "os" => "iOS",
             "major" => "16",
           "os_full" => "iOS 16.6"
    },
                  "bytes" => "75161927680",
       "custom-timestamp" => 2024-10-23T08:25:25.000Z,
                  "ident" => "-",
                "request" => "/",
             "@timestamp" => 2025-03-13T08:37:08.622Z,
               "response" => "200",
                "clientip" => "218.213.0.9",
                   "verb" => "GET",
                   "auth" => "-",
                  "geoip" => {
             "longitude" => 114.1657,
              "location" => {
            "lon" => 114.1657,
            "lat" => 22.2578
        },
          "country_name" => "Hong Kong",
              "timezone" => "Asia/Hong_Kong",
         "country_code3" => "HK",
              "latitude" => 22.2578,
         "country_code2" => "HK",
                    "ip" => "218.213.0.9",
        "continent_code" => "AS"
    },
              "timestamp" => "23/Oct/2024:16:25:25 +0800"
}
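The match pattern "dd/MMM/yyyy:HH:mm:ss Z" is a Joda-style format. The equivalent conversion in Python (strptime codes instead of Joda letters) shows why 16:25:25 +0800 becomes 08:25:25 UTC in the stored field:

```python
from datetime import datetime, timezone

# Python equivalent of the Joda pattern "dd/MMM/yyyy:HH:mm:ss Z".
ts = datetime.strptime("23/Oct/2024:16:25:25 +0800", "%d/%b/%Y:%H:%M:%S %z")
# Logstash stores date fields normalized to UTC.
utc = ts.astimezone(timezone.utc)
```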
  • 7. Collecting nginx logs with Logstash: mutate case
[root@elk93 ~]# cat /etc/logstash/conf.d/07-nginx-mutate-stdout.conf
input { 
  file { 
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  } 
} 


filter {
  grok {
    match => { "message" => "%{HTTPD_COMMONLOG}" }
  }

  useragent {
    source => "message"
    target => "custom_user_agent"
  }

  geoip {
    source => "clientip"
  }

  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "custom-timestamp"
  }
 
  # Transform the specified fields
  mutate {
    # Convert the field to the required type
    convert => {
      "bytes" => "integer"
    }
    
    remove_field => [ "@version","host","message" ]
  }
}

output { 
  stdout { 
    codec => rubydebug 
  } 
}
[root@elk93 ~]# 
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/07-nginx-mutate-stdout.conf
...
{
               "clientip" => "50.218.213.9",
                  "bytes" => 75161927680,
                  "geoip" => {
        "continent_code" => "NA",
              "latitude" => 38.1228,
              "timezone" => "America/Los_Angeles",
          "country_name" => "United States",
             "longitude" => -121.2543,
              "dma_code" => 862,
           "region_code" => "CA",
                    "ip" => "50.218.213.9",
              "location" => {
            "lon" => -121.2543,
            "lat" => 38.1228
        },
           "postal_code" => "95240",
           "region_name" => "California",
             "city_name" => "Lodi",
         "country_code3" => "US",
         "country_code2" => "US"
    },
      "custom_user_agent" => {
            "device" => "iPhone",
          "os_major" => "16",
             "major" => "16",
              "name" => "Mobile Safari",
           "version" => "16.6",
           "os_name" => "iOS",
                "os" => "iOS",
           "os_full" => "iOS 16.6",
        "os_version" => "16.6",
             "minor" => "6",
          "os_minor" => "6"
    },
               "response" => "200",
                  "ident" => "-",
              "timestamp" => "23/Oct/2024:16:25:25 +0800",
             "@timestamp" => 2025-03-13T08:43:38.846Z,
                   "verb" => "GET",
                   "auth" => "-",
                "request" => "/",
        "custom-timestamp" => 2024-10-23T08:25:25.000Z,
                   "path" => "/var/log/nginx/access.log",
            "httpversion" => "1.1"
}
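What mutate does to the event is a plain type cast plus field removal; leaving bytes as a string would break numeric aggregations in ES. A minimal sketch, treating the event as a dict:

```python
# Sketch of mutate { convert => {...} remove_field => [...] } on an event dict.
def mutate(event, convert=None, remove_field=()):
    casters = {"integer": int, "float": float, "string": str}
    for field, target_type in (convert or {}).items():
        event[field] = casters[target_type](event[field])
    for field in remove_field:
        event.pop(field, None)      # removing a missing field is not an error
    return event

event = {"bytes": "75161927680", "@version": "1", "message": "raw line"}
event = mutate(event, convert={"bytes": "integer"},
               remove_field=["@version", "message"])
```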

Collecting nginx logs into the ES cluster with Logstash and charting them in Kibana

  • 1. Write the configuration file
[root@elk93 ~]# cat /etc/logstash/conf.d/08-nginx-to-es.conf
input { 
  file { 
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  } 
} 


filter {
  grok {
    match => { "message" => "%{HTTPD_COMMONLOG}" }
  }

  useragent {
    source => "message"
    target => "custom_user_agent"
  }

  geoip {
    source => "clientip"
  }

  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "custom-timestamp"
  }
 
  # Transform the specified fields
  mutate {
    # Convert the field to the required type
    convert => {
      "bytes" => "integer"
    }
    
    remove_field => [ "@version","host","message" ]
  }
}

output { 
  stdout { 
    codec => rubydebug 
  }

  elasticsearch {
      # Host list of the target ES cluster
      hosts => ["10.0.0.91:9200","10.0.0.92:9200","10.0.0.93:9200"]
      # Index name in the ES cluster
      index => "oldboyedu-linux96-elk-nginx"
  }
}
[root@elk93 ~]# 
  • 2. Start the Logstash instance
[root@elk93 ~]# rm -f /usr/share/logstash/data/plugins/inputs/file/.sincedb*
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/08-nginx-to-es.conf
  • 3. View the data in Kibana
Known issue:
	Failed (timed out waiting for connection to open). Sleeping for 0.02

Problem description:
	In Elastic Stack 7.17.28, Logstash may fail to write to ES.
	
TODO:
	Investigate whether an upstream change broke writes and extra configuration parameters are now required.

Temporary workaround:
	- Roll back to version 7.17.23.

Advanced ELK architecture (Elasticsearch -> Logstash -> Kibana)

Deploying geoip separately

Reference:
https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html#plugins-filters-geoip-manage_update

  • 1. Download the packages
[root@elk92 ~]# wget http://192.168.16.253/Resources/ElasticStack/softwares/7.17.28/geoip/GeoLite2-ASN_20250313.tar.gz
https://github.com/logstash-plugins/logstash-filter-geoip.git

[root@elk92 ~]# wget http://192.168.16.253/Resources/ElasticStack/softwares/7.17.28/geoip/GeoLite2-City_20250311.tar.gz
  • 2. Extract the geoip databases
[root@elk92 ~]# mkdir -pv /lgshgp/geoip
[root@elk92 ~]# tar xf GeoLite2-ASN_20250313.tar.gz -C /lgshgp/geoip
[root@elk92 ~]# 
[root@elk92 ~]# tar xf GeoLite2-City_20250311.tar.gz -C /lgshgp/geoip
[root@elk92 ~]# 
[root@elk92 ~]# tree /lgshgp/geoip
/lgshgp/geoip/
├── GeoLite2-ASN_20250313
│   ├── COPYRIGHT.txt
│   ├── GeoLite2-ASN.mmdb
│   └── LICENSE.txt
└── GeoLite2-City_20250311
    ├── COPYRIGHT.txt
    ├── GeoLite2-City.mmdb
    ├── LICENSE.txt
    └── README.txt

2 directories, 7 files
[root@elk92 ~]# 
  • 3. Create the overview.json file
[root@elk92 ~]# /usr/share/elasticsearch/bin/elasticsearch-geoip -s /lgshgp/geoip/
overview.json created
[root@elk92 ~]# 
[root@elk92 ~]# tree /lgshgp/geoip/
/lgshgp/geoip/
├── GeoLite2-ASN_20250313
│   ├── COPYRIGHT.txt
│   ├── GeoLite2-ASN.mmdb
│   └── LICENSE.txt
├── GeoLite2-City_20250311
│   ├── COPYRIGHT.txt
│   ├── GeoLite2-City.mmdb
│   ├── LICENSE.txt
│   └── README.txt
└── overview.json

2 directories, 8 files
[root@elk92 ~]# 
  • 4. Prepare the nginx site directory
[root@elk92 ~]# cat /etc/nginx/nginx.conf 
worker_processes  4;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;

    sendfile        on;

    keepalive_timeout  65;

    charset utf-8;
	
    server {
        listen       80;
        server_name  localhost;
        
        charset utf-8;

        location / {
            root  /lgshgp/geoip/;
            autoindex on;
            autoindex_exact_size on;
            autoindex_localtime on;
        }

        error_page  404              /404.html;
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }

}

[root@elk92 ~]# 
[root@elk92 ~]# rm -rf /usr/share/nginx/html/*
[root@elk92 ~]# 
[root@elk92 ~]# cp -r /lgshgp/geoip/ /usr/share/nginx/html/
[root@elk92 ~]# 
  • 5. Test access to the nginx site directory
[root@elk91 ~]# curl http://10.0.0.92/
<html>
<head><title>Index of /</title></head>
<body>
<h1>Index of /</h1><hr><pre><a href="../">../</a>
<a href="GeoLite2-ASN_20250313/">GeoLite2-ASN_20250313/</a>                             13-Mar-2025 16:15                   -
<a href="GeoLite2-City_20250311/">GeoLite2-City_20250311/</a>                            11-Mar-2025 17:20                   -
<a href="overview.json">overview.json</a>                                      14-Mar-2025 09:30                   2
</pre><hr></body>
</html>
[root@elk91 ~]# 
  • 6. Modify the "logstash.yml" file
[root@elk93 ~]# vim /etc/logstash/logstash.yml 
...
xpack.geoip.download.endpoint: "http://10.0.0.92/overview.json"
  • 7. Verify
Still fails: "Failed (Connection reset). Sleeping for 0.04"
  • Even after deploying geoip locally per the official docs, the error persisted; in the end the geoip filter had to be commented out to work around the problem.
[root@elk93 logstash]# cat 08-nginx-to-es.conf
input { 
  file { 
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  } 
} 


filter {
  grok {
    match => { "message" => "%{HTTPD_COMMONLOG}" }
  }

  useragent {
    source => "message"
    target => "custom_user_agent"
  }

 # geoip {
 #   source => "clientip"
 # }

  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "custom-timestamp"
  }
 
  mutate {
    convert => {
      "bytes" => "integer"
    }
    
    remove_field => [ "@version","host","message" ]
  }
}

output { 
  stdout { 
    codec => rubydebug 
  }

  elasticsearch {
     index => "custom-xixi"
     hosts => ["http://10.0.0.91:9200"]
  }

}
[root@elk93 logstash]# 
[root@elk93 logstash]# logstash -rf 08-nginx-to-es.conf

ELFK architecture walkthrough: json plugin case

  • 1. Start Logstash
[root@elk93 logstash]# cat 09-beats-to-es.conf
input { 
  # Receive data from a beats component, e.g. Filebeat.
  beats {
    port => 5044
  }
} 


filter {
 
   json {
     source => "message"
   }


  mutate {
    remove_field => [ "@version","host","agent","ecs","tags","input","log" ]
  }
}

output { 
  #stdout { 
  #  codec => rubydebug 
  #}

  elasticsearch {
     index => "custom-logstash-json"
     hosts => ["http://10.0.0.91:9200","http://10.0.0.92:9200","http://10.0.0.93:9200"]
  }
}
[root@elk93 logstash]# 
[root@elk93 logstash]# logstash -rf 09-beats-to-es.conf
  • 2. Send data from Filebeat to Logstash
2.1 Start the Filebeat instance
[root@elk91 ~]# cat /etc/filebeat/config/13-filestream-to-logstash.yml
filebeat.inputs:
- type: filestream
  paths:
    - /tmp/students.json
  parsers:
    - multiline:
        type: count
        count_lines: 4

output.logstash:
  hosts: ["10.0.0.93:5044"]
[root@elk91 ~]# 
[root@elk91 ~]# rm -rf /var/lib/filebeat/
[root@elk91 ~]# 
[root@elk91 ~]# filebeat -e -c /etc/filebeat/config/13-filestream-to-logstash.yml


2.2 Prepare test data
[root@elk91 ~]# cat /tmp/students.json 
{
  "name":"邓建森",
  "hobby":["小说","Rap"]
}
{
  "name":"田明雷",
  "hobby":["健身","台球","洗脚"]
}
{
  "name":"李航",
  "hobby":["面膜","手串","游戏"]
}
{
   "name": "胡桁",
   "hobby": ["学习","吃盖饭","按摩"]
}
[root@elk91 ~]# 
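The count multiline parser groups every 4 physical lines into one event before the json filter decodes it. The combination behaves roughly like this sketch (the sample data here is simplified):

```python
import json

# Group every `count_lines` physical lines into one message, then json-decode it,
# mimicking Filebeat's count multiline parser + Logstash's json filter.
def parse_multiline_json(text, count_lines=4):
    lines = text.splitlines()
    events = []
    for i in range(0, len(lines), count_lines):
        message = "\n".join(lines[i:i + count_lines])
        events.append(json.loads(message))
    return events

sample = '{\n  "name": "A",\n  "hobby": ["x"]\n}\n{\n  "name": "B",\n  "hobby": ["y", "z"]\n}'
docs = parse_multiline_json(sample)
```

Note that this only works because each record in students.json spans exactly 4 lines; a record of any other length would break the count-based grouping.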
  • 3. View the data in Kibana

ELFK architecture walkthrough: e-commerce metrics project case

  • 1. Prepare test data with Python
[root@elk91 ~]# cat gen-log.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author : violet

import datetime
import random
import logging
import time
import sys

LOG_FORMAT = "%(levelname)s %(asctime)s [com.oldboyedu.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Basic configuration of the root logging.Logger instance
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT,
                    filename=sys.argv[1], filemode='a')
actions = ["浏览页面", "评论商品", "加入收藏", "加入购物车", "提交订单", "使用优惠券", "领取优惠券",
 "搜索", "查看订单", "付款", "清空购物车"]

while True:
    time.sleep(random.randint(1, 5))
    user_id = random.randint(1, 10000)
    # Round the generated float to 2 decimal places.
    price = round(random.uniform(15000, 30000),2)
    action = random.choice(actions)
    svip = random.choice([0,1,2])
    logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id, action,svip,price))
[root@elk91 ~]# 
[root@elk91 ~]# 
[root@elk91 ~]# python3 gen-log.py /tmp/apps.log
  • 2. Start the Logstash instance
[root@elk93 logstash]# cat 10-beats_apps-to-es.conf 
input { 
  beats {
    port => 9999
  }
} 


filter {

  mutate {
    # Split the message field on "|"
    split => { "message" => "|" }

    # Add fields
    add_field => { 
      "other" => "%{[message][0]}" 
      "userId" => "%{[message][1]}" 
      "action" => "%{[message][2]}" 
      "svip" => "%{[message][3]}" 
      "price" => "%{[message][4]}" 
    }

  }

  mutate {

    split => { "other" => " " }

    add_field => {
       datetime => "%{[other][1]} %{[other][2]}"
    }
    
    convert => {
       "price" => "float"
     }

    remove_field => [ "@version","host","agent","ecs","tags","input","log","message","other"]
  }


  date {
    # "2025-03-14 11:32:58"
    match => [ "datetime", "yyyy-MM-dd HH:mm:ss" ]
  }
}

output { 
 # stdout { 
 #   codec => rubydebug 
 # }

  elasticsearch {
     index => "custom-logstash-elfk-apps"
     hosts => ["http://10.0.0.91:9200","http://10.0.0.92:9200","http://10.0.0.93:9200"]
  }
}
[root@elk93 logstash]# 
[root@elk93 logstash]# logstash -rf 10-beats_apps-to-es.conf
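The two mutate blocks above amount to: split the message on "|", name the pieces, then split the log prefix on spaces to recover the timestamp. In Python terms (a sketch using a sample line in the gen-log.py format; the exact prefix depends on the logging module name):

```python
# Sketch of the two mutate splits applied to one gen-log.py line.
line = "INFO 2025-03-14 11:32:58 [com.oldboyedu.gen-log] - DAU|6688|付款|2|19999.99 "
parts = line.split("|")                     # split => { "message" => "|" }
other, user_id, action, svip, price = parts
tokens = other.split(" ")                   # split => { "other" => " " }
event = {
    "userId": user_id,
    "action": action,
    "svip": svip,
    "price": float(price),                  # convert => { "price" => "float" }
    "datetime": tokens[1] + " " + tokens[2],
}
```

The datetime field is then handed to the date filter, which is why the match pattern below is "yyyy-MM-dd HH:mm:ss".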
  • 3. Collect data with Filebeat
[root@elk91 ~]# cat /etc/filebeat/config/14-filestream-to-logstash.yml
filebeat.inputs:
- type: filestream
  paths:
    - /tmp/apps.log

output.logstash:
  hosts: ["10.0.0.93:9999"]

[root@elk91 ~]# 
[root@elk91 ~]# rm -rf /var/lib/filebeat/
[root@elk91 ~]# 
[root@elk91 ~]# filebeat -e -c /etc/filebeat/config/14-filestream-to-logstash.yml
  • 4. Visualize in Kibana

ELK architecture: index templates, data mappings, and map display case

  • 1. Inspect Logstash's default local geoip databases
[root@elk93 logstash]# tree /usr/share/logstash/data/plugins/filters/geoip/
/usr/share/logstash/data/plugins/filters/geoip/
├── 1741858169
├── 1741858251
├── 1741858548
├── 1741858648
├── 1741858823
│   ├── COPYRIGHT.txt
│   ├── elastic-geoip-database-service-agreement-LICENSE.txt
│   ├── GeoLite2-ASN.mmdb
│   ├── GeoLite2-ASN.tgz
│   └── LICENSE.txt
├── 1741859410
│   ├── COPYRIGHT.txt
│   ├── elastic-geoip-database-service-agreement-LICENSE.txt
│   ├── GeoLite2-ASN.mmdb
│   ├── GeoLite2-ASN.tgz
│   └── LICENSE.txt
├── 1741916677
├── 1741916965
├── CC
│   ├── GeoLite2-ASN.mmdb
│   └── GeoLite2-City.mmdb
└── metadata.csv

9 directories, 13 files
[root@elk93 logstash]# 
  • 2. Configure an index template in Kibana
Map the "geoip.location" field's data type to geo_point (a geographic coordinate point).
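For reference, an index template along these lines maps geoip.location to geo_point (the index pattern name is illustrative and should match the indices you write to; in 7.x the template can be created in Kibana's Stack Management or via the PUT _index_template API):

```json
{
  "index_patterns": ["custom-geoip-*"],
  "template": {
    "mappings": {
      "properties": {
        "geoip": {
          "properties": {
            "location": { "type": "geo_point" }
          }
        }
      }
    }
  }
}
```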
  • 3. Write the Logstash configuration
[root@elk93 logstash]# cat 08-nginx-to-es.conf 
input { 
  file { 
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  } 
} 


filter {
  grok {
    match => { "message" => "%{HTTPD_COMMONLOG}" }
  }

  useragent {
    source => "message"
    target => "custom_user_agent"
  }

  geoip {
    source => "clientip"
    # In 7.17.28 you need to point at a local database file, otherwise resolution can stall for as long as 8-10 minutes.
    # Path of the local database file; the docs say the default is "GeoLite2-City", but leaving it unset does cause the delay!
    database => "/usr/share/logstash/data/plugins/filters/geoip/CC/GeoLite2-City.mmdb"
    # database => "/usr/share/logstash/data/plugins/filters/geoip/CC/GeoLite2-ASN.mmdb"
    # If not configured, the type defaults to "City"; when "database" points at "GeoLite2-City.mmdb", this option is unnecessary.
    # default_database_type => "ASN"
  }

  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "custom-timestamp"
  }
 
  mutate {
    convert => {
      "bytes" => "integer"
    }
    
    remove_field => [ "@version","host","message" ]
  }
}

output { 
  stdout { 
    codec => rubydebug 
  }

  elasticsearch {
     # index => "custom-geoip-001"
     # index => "custom-geoip-002"
     index => "custom-geoip-003"
     hosts => ["http://10.0.0.91:9200","http://10.0.0.92:9200","http://10.0.0.93:9200"]
  }

}
[root@elk93 logstash]# 
[root@elk93 logstash]# rm -f /usr/share/logstash/data/plugins/inputs/file/.sincedb*
[root@elk93 logstash]# 
[root@elk93 logstash]# logstash -rf 08-nginx-to-es.conf 

Logstash multi-branch conditionals

  • 1. Hands-on case
[root@elk93 logstash]# cat 11-multiple-input-to-es.conf 
input { 
  beats {
    port => 9999
    type => "linux96-filebeat"
  }

  file { 
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    type => "linux96-file"
  } 

  tcp {
    port => 8888
    type => "linux96-tcp"
  }

} 


filter {
   if [type] == "linux96-tcp" {
     mutate {
       add_field => {
          school => "qinghua"
          class => "qinghua5"
       }
       remove_field => [ "@version","port"]
     }
   } else if [type] == "linux96-filebeat" {
      mutate {
        split => { "message" => "|" }

        add_field => { 
          "other" => "%{[message][0]}" 
          "userId" => "%{[message][1]}" 
          "action" => "%{[message][2]}" 
          "svip" => "%{[message][3]}" 
          "price" => "%{[message][4]}" 
          "address" => "上海"
        }

      }

      mutate {

        split => { "other" => " " }

        add_field => {
           datetime => "%{[other][1]} %{[other][2]}"
        }
        
        convert => {
           "price" => "float"
         }

        remove_field => [ "@version","host","agent","ecs","tags","input","log","message","other"]
      }

      date {
        match => [ "datetime", "yyyy-MM-dd HH:mm:ss" ]
      }
  } else {
      grok {
        match => { "message" => "%{HTTPD_COMMONLOG}" }
      }
    
      useragent {
        source => "message"
        target => "custom_user_agent"
      }
    
      geoip {
        source => "clientip"
        database => "/usr/share/logstash/data/plugins/filters/geoip/CC/GeoLite2-City.mmdb"
      }
    
      date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
        target => "custom-timestamp"
      }
     
      mutate {
        convert => {
          "bytes" => "integer"
        }
        
        add_field => {
           office => "https://www.baidu.com"
        }

        remove_field => [ "@version","host","message" ]
      }

  }
}

output { 
 # stdout { 
 #   codec => rubydebug 
 # }

  if [type] == "linux96-filebeat" {
      elasticsearch {
         index => "custom-logstash-if-filebeat"
         hosts => ["http://10.0.0.91:9200","http://10.0.0.92:9200","http://10.0.0.93:9200"]
      }
  } else if [type] == "linux96-tcp" {
      elasticsearch {
         index => "custom-logstash-if-tcp"
         hosts => ["http://10.0.0.91:9200","http://10.0.0.92:9200","http://10.0.0.93:9200"]
      }

  } else {
      elasticsearch {
         index => "custom-logstash-if-file"
         hosts => ["http://10.0.0.91:9200","http://10.0.0.92:9200","http://10.0.0.93:9200"]
      }
  }
}
[root@elk93 logstash]# 
[root@elk93 logstash]# logstash -rf  11-multiple-input-to-es.conf 
  • 2. Verify the data in Kibana

Logstash pipelines

  • 1. Modify the pipelines file
[root@elk93 logstash]# vim /etc/logstash/pipelines.yml 
...
- pipeline.id: xixi
  path.config: "/etc/logstash/conf.d/02-file-to-stdout.conf"
- pipeline.id: haha
  path.config: "/etc/logstash/conf.d/03-nginx-grok-stdout.conf"
[root@elk93 logstash]#
  • 2. Create a symbolic link
[root@elk93 logstash]# mkdir /usr/share/logstash/config
[root@elk93 logstash]# 
[root@elk93 logstash]# ln -svf /etc/logstash/pipelines.yml /usr/share/logstash/config/
'/usr/share/logstash/config/pipelines.yml' -> '/etc/logstash/pipelines.yml'
[root@elk93 logstash]# 
  • 3. Hot-reload the pipelines
[root@elk93 logstash]# logstash -r
...
[INFO ] 2025-03-14 15:52:34.210 [[xixi]-pipeline-manager] javapipeline - Pipeline terminated {"pipeline.id"=>"xixi"}
[INFO ] 2025-03-14 15:52:34.260 [[haha]-pipeline-manager] javapipeline - Pipeline terminated {"pipeline.id"=>"haha"}