Logstash là gì? Cài đặt sử dụng và làm chủ Logstash trong 1 bài viết

Logstash là một công cụ mạnh mẽ để tập trung và phân tích nhật ký, có thể giúp cung cấp và tổng quan về môi trường của bạn cũng như xác định các vấn đề với máy chủ của bạn.

Một cách để tăng hiệu quả thiết lập ELK Stack (Elasticsearch, LogstashKibana) của bạn là thu thập nhật ký ứng dụng quan trọng và cấu trúc dữ liệu nhật ký bằng cách sử dụng bộ lọc, để dữ liệu có thể dễ dàng được phân tích và có thể truy vấn.

ELK là một ứng dụng được tạo lên bằng cách kết hợp các thành phần xử lý khác nhau:

  • Elasticsearch: lưu trữ và đánh chỉ mục dữ liệu
  • Logstash: Tập trung và phân tích Log
  • Kibana: Hiển thị truy vấn

Chúng sẽ xây dựng bộ lọc xung quanh các mẫu "dò tìm", sẽ phân tích dữ liệu trong Log thành các bit thông tin hữu ích.

Điểm mạnh của ELK là nó có thể cung cấp kết quả tìm kiếm theo thời gian thực với một lượng dữ liệu cực lớn.

Để hiểu về ELK cần hiểu về cách làm việc của từng thành phần, tôi sẽ tìm hiểu từng thành phần của ứng dụng này.

Tổng quan

Dữ liệu được các Beats (shipper) thu thập và gửi về cho Logstash; Logstash tiếp nhận và phân tích dữ liệu. Sau đó dữ liệu được gửi vào Elasticsearch; Elasticsearch nhận dữ liệu từ Logstash và lưu trữ, đánh chỉ mục; Kibana sử dụng các dữ liệu trong Elasticsearch để hiển thị và phân tích cú pháp tìm kiếm mà người dùng nhập vào để gửi cho Elasticsearch tìm kiếm.

Beats --> Logstash --> Elasticsearch <--> Kibana

Prerequisites

Để làm theo hướng dẫn này, bạn phải có máy chủ Logstash đang hoạt động đang nhận nhật ký từ người gửi hàng, chẳng hạn như Filebeat.

ELK Server Assumptions

  1. Logstash được cài đặt trong /opt/logstash
  2. Các tệp cấu hình Logstash của bạn nằm trong /etc/logstash/conf.d
  3. Bạn có một tệp đầu vào có tên 02-beat-input.conf
  4. Bạn có một tệp đầu ra có tên là 30-asticsearch-output.conf

Bạn có thể cần tạo thư mục mẫu bằng cách chạy lệnh này trên Máy chủ Logstash của mình:

sudo mkdir -p /opt/logstash/patterns
sudo chown logstash: /opt/logstash/patterns

Cấu hình

Kiểm tra version logstash:

/opt/logstash/bin/logstash --version

# ==> Kết quả
# logstash 2.3.4

Logstash có nhiều bộ lọc khác nhau, với logs gửi từ filebeat ta sẽ sử dụng grok để phân tích log

Có 2 phần khai báo. type được khai báo trước, nó nằm ở:

/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns

đối với phiên bản logstash

Sau đây là một số bộ lọc được sử dụng tại logstash, các client cần được khai báo đúng type để logstash chọn đúng pattern

Configuration Logstash example:

filter {
  if [type] == "linuxlog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    }
  }
  if [type] == "windowslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}?: %{POSINT:syslog_pid}?: %{GREEDYDATA:syslog_message}" }
    }
  }
}

Define config logstash http

filter {
	if [type] == "http-access" {
		grok {
			match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} %{USER:LoadTime} [%{HTTPDATE:timestamphttp}] (?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest}) %{NUMBER:response} (?:%{NUMBER:bytes}|-)" }
		}
		date {
			match => [ "timestamphttp", "dd/MMM/yyyy:HH:mm:ss Z" ]
		}
	}
}

filter {
	if [type] == "http-error" {
		grok {
			match => { "message" => "[%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}] [%{WORD:severity}] [client %{IP:clientip}] %{GREEDYDATA:message}" }
		}
	}
}

Check nginx not access, 502 bad gateway:

setsebool -P httpd_can_network_connect 1

Config logstash cho network cisco

filter {
        # Extract fields from the each of the detailed message types
        # The patterns provided below are included in core of LogStash 1.4.2.
        grok {
                match => [
                        "message", "%{CISCOFW106001}",
                        "message", "%{CISCOFW106006_106007_106010}",
                        "message", "%{CISCOFW106014}",
                        "message", "%{CISCOFW106015}",
                        "message", "%{CISCOFW106021}",
                        "message", "%{CISCOFW106023}",
                        "message", "%{CISCOFW106100}",
                        "message", "%{CISCOFW110002}",
                        "message", "%{CISCOFW302010}",
                        "message", "%{CISCOFW302013_302014_302015_302016}",
                        "message", "%{CISCOFW302020_302021}",
                        "message", "%{CISCOFW305011}",
                        "message", "%{CISCOFW313001_313004_313008}",
                        "message", "%{CISCOFW313005}",
                        "message", "%{CISCOFW402117}",
                        "message", "%{CISCOFW402119}",
                        "message", "%{CISCOFW419001}",
                        "message", "%{CISCOFW419002}",
                        "message", "%{CISCOFW500004}",
                        "message", "%{CISCOFW602303_602304}",
                        "message", "%{CISCOFW710001_710002_710003_710005_710006}",
                        "message", "%{CISCOFW713172}",
                        "message", "%{CISCOFW733100}"
                ]
        }
        # Parse the syslog severity and facility
        syslog_pri { }

# Do a DNS lookup for the sending host
# Otherwise host field will contain an
# IP address instead of a hostname
dns {
    reverse => [ "host" ]
    action => "replace"
  }

geoip {
      source => "src_ip"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
    # do GeoIP lookup for the ASN/ISP information.
    geoip {
      database => "/etc/logstash/GeoIPASNum.dat"
      source => "src_ip"
    }
}

Logstash Patterns: Nginx

Các mẫu nhật ký Nginx không có trong các mẫu mặc định của Logstash, vì vậy chúng tôi sẽ thêm các mẫu Nginx theo cách thủ công.

sudo vi /opt/logstash/patterns/nginx

Sau đó chèn các dòng sau:

NGUSERNAME [a-zA-Z\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent}

Lưu và thoát. Mẫu NGINXACCESS phân tích cú pháp và gán dữ liệu cho các số nhận dạng khác nhau (e.g. clientipidentauth, etc.).

Logstash Filter: Nginx

sudo vi /etc/logstash/conf.d/11-nginx-filter.conf
filter {
  if [type] == "nginx-access" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
  }
}

Lưu và thoát. Lưu ý rằng bộ lọc này sẽ cố gắng khớp các thông báo của kiểu truy cập nginx với mẫu NGINXACCESS, được xác định ở trên.

Bây giờ khởi động lại Logstash để tải lại cấu hình:

sudo service logstash restart

Beats

Được viết bằng Golang, chạy trên các client để thu thập dữ liệu. Bạn có thể sử dụng libbeat để viết các beats cho mục đích thu thập của riêng mình

Hiện tại có các Beats được cung cấp sẵn bởi elastic là:

  • Filebeat: đọc file và lưu vị trí cuối cùng, khi có dữ liệu mới sẽ đọc tiếp và gửi
  • Packetbeat: capture gói tin trên các port của client, chuyển tiếp dữ liệu về Logstash
  • Topbeat (metricbeat): Thu thập dữ liệu về tài nguyên hệ thống client và gửi về Logstash
  • Winlobeat: Thu thập dữ liệu trên windows (Chưa thử, nhưng có thể sử dụng nxlog để thu thập)

Logstash

Nhận dữ liệu từ các beats, tiến hành phân tích dữ liệu

Phân tích dữ liệu gửi từ filebeat bằng grok

Grok là một dạng khai báo pattern sử dụng regular expression

Grok đã được khai báo rất nhiều pattern có sẵn, bạn có thể sử dụng ngay.

Nếu bạn có các loại dữ liệu đặc thù thì có thể dựa vào regular expression để khai báo các pattern theo yêu cầu

Các dữ liệu packetbeat, topbeat thì có các template sẵn nên không cần khai báo filter, được logstach gửi thẳng vào Elasticsearch

Dữ liệu được gửi sang cho Elasticsearch để lưu trữ

Elasticsearch

Thực hiện lưu trữ và đánh chỉ mục dữ liệu.

Sử dụng các template để lưu trữ dữ liệu

Có thể cấu hình cluster, shard, replica để tăng tính an toàn, tính sẵn sàng, tăng hiệu năng đánh chỉ mục, tăng hiệu năng tìm kiếm dữ liệu (phần này nâng cao)

Kibana

Hiển thị dữ liệu theo thời gian thực

Hỗ trợ tìm kiếm dữ liệu theo nhiều kiểu

Hiển thị dữ liệu theo nhiều dạng biểu đồ

Dịch vụ seo

Tags