logstash对ip字段增强使用python脚本

  • 15
  • 2023年2月11日19:13:26
  • 阅读模式

使用Logstash处理Kafka中的JSON数据,并使用Python脚本对IP字段进行校验,新增一个check字段表示校验是否成功的步骤如下:

安装Logstash的Python插件:

bin/logstash-plugin install logstash-filter-python
创建Python脚本文件,用于校验IP地址,并设置check字段:


import socket
import struct

def validate_ip(ip_address):
    try:
        socket.inet_aton(ip_address)
        return True
    except socket.error:
        return False

def process(event):
    ip_address = event.get("ip")
    if ip_address is not None:
        if validate_ip(ip_address):
            event["check"] = True
        else:
            event["check"] = False
    return event

创建Logstash配置文件,并配置Kafka输入插件:

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["json_data"]
        codec => "json"
    }
}

在Logstash配置文件中配置Python脚本插件:

filter {
    python {
        script_path => "validate_ip.py"
        function => "process"
    }
}

配置Elasticsearch输出插件:

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "logstash_json_data"
    }
}

启动Logstash:

bin/logstash -f logstash.conf

这样,Logstash就可以从Kafka中读取JSON数据,使用Python脚本对IP字段进行校验,并新增check字段表示校验是否成功,最后将处理后的数据写入Elasticsearch。

本文来自凡蜕博客(https://blog.ysboke.cn), 转载请带上地址.。
匿名

发表评论

匿名网友