
logstash grok rule

by 낭만프로그래머. 2019. 12. 6.

input example

#input elasticsearch

input {
        elasticsearch {
                hosts => "172.1.1.1:9200"
                index => "logstash-*"
                size => 10000
                query => '{"query": { "bool": { "must": [ { "range": { "@timestamp": { "gte": "now-7d", "lte": "now-1d" } } }]}}}'
        #       query => '{"query": { "bool": { "must": [ { "range": { "@timestamp": { "gte": "now-2d", "lte": "now-1d" } } }]}}}'
        }
}
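
The elasticsearch input can also re-run the query periodically instead of pulling once, via its schedule option (cron format). A minimal sketch; host and index are placeholders:

input {
        elasticsearch {
                hosts => "172.1.1.1:9200"
                index => "logstash-*"
                schedule => "5 * * * *"   # run at 5 minutes past every hour
        }
}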

filter example

# csv filter (IPS log)

filter {
        grok {
                match => {
                        "message" => "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:hostname} DefensePro: %{DATESTAMP:timestamp2} %{GREEDYDATA:data}"
                }
        }
        csv {
                source => "data"
                columns => ["severity","radware_id","category","event_name","protocol","source_ip","source_port","destination_ip","destination_port","physical_port","context","policy_name","event_type","packet_count","packet_bandwidth","vlan_tag","rpls_rd","rpls_tag"]
                separator => " "
        }
}
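
Once the csv filter has split data into columns, the intermediate fields are usually dropped; a small follow-up sketch using the mutate filter:

filter {
        mutate {
                remove_field => [ "data", "message" ]
        }
}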

# kv filter
filter {
        kv {
                source => "data"
                value_split => ":"
                field_split => ","
                trim_key => " "
        }
}
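
For example, with the separators above, a hypothetical data value would be split like this:

# data = "SrcIP:1.2.3.4, DstIP:5.6.7.8, Protocol:TCP"
# => SrcIP = "1.2.3.4", DstIP = "5.6.7.8", Protocol = "TCP"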

#geoip filter
filter {
        geoip {
                add_tag => [ "GeoIP" ]
                database => "/Users/GeoLite2-City.mmdb" # change this to the location of your GeoLite2-City.mmdb file
                source => "SrcIP"
                target => "GEO_INFO"
        }
}
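
With target set, the lookup result is nested under GEO_INFO (classic, non-ECS field names such as country_name, city_name, location); a sketch of using it in an output condition:

output {
        if [GEO_INFO][country_name] == "South Korea" {
                stdout { codec => rubydebug }
        }
}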

#cipher filter. In my case this only works with a single pipeline worker (-w 1).
#Add this to config/logstash.conf
filter {
        cipher {
                algorithm => "aes-256-cbc"
                mode => "encrypt"
                iv_random_length => 16
                key => "12345678901234567890123456789012"
                key_size => 32
        }
}

output example

#output csv

output {
        csv {
                path => "log.csv"
                fields => ["timestamp","Protocol","SrcIP","DstIP","Client","ReferencedHost"]
        }
}
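
The path option also accepts sprintf references (as with the file output this plugin is based on), so daily files should be possible; a sketch:

output {
        csv {
                path => "log-%{+YYYY.MM.dd}.csv"
                fields => ["timestamp","Protocol","SrcIP","DstIP","Client","ReferencedHost"]
        }
}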

#output elasticsearch

output {
        elasticsearch {
                hosts => "1.1.1.1:9200"
                index => "apache-access-%{+YYYY-'w'ww}"
        }
}
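
The %{+...} pattern is formatted with Joda-Time, where the quoted 'w' is a literal. Note that ww is week-of-weekyear, which strictly pairs with the weekyear symbol xxxx rather than YYYY, avoiding off-by-one weeks around New Year; an alternative sketch:

output {
        elasticsearch {
                hosts => "1.1.1.1:9200"
                index => "apache-access-%{+xxxx-'w'ww}"
        }
}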

#output stdout

output {
        stdout { codec => rubydebug }
}

#output using conditionals
output {
        if "_grokparsefailure" in [tags] {
                stdout {
                        codec => rubydebug
                }
        }
}
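
A fuller sketch routes failed events to stdout and everything else to Elasticsearch (host and index are placeholders):

output {
        if "_grokparsefailure" in [tags] {
                stdout { codec => rubydebug }
        } else {
                elasticsearch {
                        hosts => "1.1.1.1:9200"
                        index => "parsed-%{+YYYY.MM.dd}"
                }
        }
}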

 

date filter

# match patterns for common timestamp formats

# data: "20181017142402"
filter {
        date {
                match => [ "timestamp", "yyyyMMddHHmmss" ]
        }
}

# data: "2018-10-17 12:34:56,999"
filter {
        date {
                match => [ "timestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]
        }
}

# data: "Oct  17 09:30:33"
filter {
        date {
                match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
        }
}
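
Syslog-style timestamps like the last one carry no year or timezone, so the zone can be pinned explicitly with the timezone option (Asia/Seoul here is just an example):

filter {
        date {
                match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
                timezone => "Asia/Seoul"
                target => "@timestamp"
        }
}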

 

ES reindex, migration

#migrate documents from a general index into weekly indices

input {
        elasticsearch {
                hosts => "1.1.1.1:9200"
                index => "apache-access-2017.*"
        }
}
output {
        elasticsearch {
                hosts => "1.1.1.1:9200"
                index => "apache-access-%{+YYYY-'w'ww}"
        }
}
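
To keep each document's original _id during migration (so a re-run overwrites instead of duplicating), the input can expose document metadata via docinfo and the output can reference it; a sketch using docinfo_target:

input {
        elasticsearch {
                hosts => "1.1.1.1:9200"
                index => "apache-access-2017.*"
                docinfo => true
                docinfo_target => "[@metadata][doc]"
        }
}
output {
        elasticsearch {
                hosts => "1.1.1.1:9200"
                index => "apache-access-%{+YYYY-'w'ww}"
                document_id => "%{[@metadata][doc][_id]}"
        }
}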

 

There is still a lot more to add... I'll update this later.
