Matthew Note

Splunk add-on 开发

Splunk简介

Splunk是一个广泛用于信息分析的一个软件,功能非常强大,并且提供了非常好的二次开发体验,总的来说Splunk支持两种二次开发,一种是关于统计分析内容的,splunk提供多种多样的数据可视化的功能,开发人员只要简单的编写符合splunk标准的xml文件或者Django的文件就可以轻松实现多种多样的数据分析和显示;另外一种是关于数据导入的,虽然splunk提供了众多的导入方式,但是并不是所有的都能覆盖到,所以用户可以根据自己的数据结构来开发plugin,来导入特定的信息类型,比如IPFIX。本文就简述如何开发一个splunk plugin。

Splunk SDK

Splunk 提供了多种语言的SDK,本文我们以Python为例,猛击这里下载Python SDK

建立自己的splunk plugin

Splunk SDK 提供三种主要的功能

  1. 设置plugin需要接受的参数, 这部分参数会在你配置你的plugin的时候在splunk页面上显示出来
  2. 验证参数, 确保你设置的参数有效.
  3. 发送数据流.

下面是一个plugin的框架, 你必须继承于splunk提供的Script类, 并且实现三个函数: get_scheme, validate_input, stream_events

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import sys
from splunklib.modularinput import *
class MyScript(Script):
def get_scheme(self):
# Returns scheme.
def validate_input(self, validation_definition):
# Validates input.
def stream_events(self, inputs, ew):
# Splunk Enterprise calls the modular input,
# streams XML describing the inputs to stdin,
# and waits for XML on stdout describing events.
if __name__ == "__main__":
sys.exit(MyScript().run(sys.argv))

好, 这就是最基本的plugin的框架, 下面我们来具体实现它, 我们将建立一个plugin来监听UDP端口, 来转发到Splunk server, 你可能要说Splunk不是已经提供了一个UDP类型的输入么, 当然, 我们这里只是以此为例, 因为很多UDP的数据都是自定义的, 所以Splunk自带的UDP未必能够满足你的需求, 所以还是可能需要自己开发.

UDP receiver例子

我把各个部分的解释放到了注释中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
class UDPreceiver(Script):
def get_scheme(self):
"""
这里我们声明了 Scheme, 叫做UDP receiver. 然后use_external_validation = true 表示我们要用自己写的函数来验证这个receiver的参数
use_single_instance 表示如果只想启用一个plugin的实例, 则设为true, 不然设置为false.
接下来Argument("port"), 声明了一个输入参数叫做port
required_on_create = True 设置这个参数是必须的。
"""
scheme = Scheme("UDP receiver")
scheme.description = "UDP receiver"
scheme.use_external_validation = True
#if con-current start the input modular set it False
scheme.use_single_instance = False
port_argument = Argument("port")
port_argument.data_type = Argument.data_type_number
port_argument.description = "Listen Port"
port_argument.required_on_create = True
scheme.add_argument(port_argument)
return scheme
def validate_input(self, validation_definition):
"""
这里我们要验证我们的参数。
通过"port"这个key我们找到要验证的参数, 如果没有则调用raise ValueError("Port is not a integer"),
这样会在splunk web上弹出错误提醒.
接下来我们在验证端口是否在范围内, 是否被占用.
"""
try:
port = int(validation_definition.parameters["port"])
except Exception as e:
raise ValueError("Port is not a integer")
if(port<0 or port>65535):
raise ValueError("Port range exceeded")
try:
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sk.settimeout(2)
sk.bind(('', port))
sk.close()
except socket.error, e:
sk.close()
if e.errno == 98 or e.errno == 13:
raise ValueError("This port is already in use")
def stream_events(self, inputs, ew):
"""
这里就是我们建立一个UDP socket监听端口的地方
"""
host =''
port =0
for input_name, input_item in inputs.inputs.iteritems():
port = int(input_item["port"])
ew.log(EventWriter.INFO,"IPFIX converter init")
#checkForRunningProcess(port)
#writePidFile(port)
#ew.log(EventWriter.INFO,"Start to listen port "+str(port))
#ew.log(EventWriter.INFO,port)
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))
except Exception as e:
ew.log(EventWriter.ERROR,str(sys._getframe().f_code.co_name)+":Failed When Establish Connection:"+str(e))
while True:
try:
message, address = s.recvfrom(16384)
except Exception as e:
ew.log(EventWriter.ERROR,str(sys._getframe().f_code.co_name)+"="+str(e))
"""
Event() 是Splunk一条数据的一个对象, 里面可以设置多种信息, 包括data, stanza, time, host等等
最主要的还是data信息
最后调用ew.write_event()来发送这条数据
"""
try:
event1 = Event()
event1.data = str(message)
event1.stanza = str(address)
event1.time = time.strftime("%Y-%m-%d %H:%M:%S")
event1.host = str(address[0])
event1.sourcetype = "UDP packets"
#event1.index=None
#event1.done=None
#event1.unbroken=None
ew.write_event(event1)
except Exception as e:
f = sys.exc_info()[2].tb_frame.f_back
ew.log(EventWriter.ERROR,str(f.f_code.co_name)+" at line "+str(f.f_lineno)+", failure reason:"+ str(e))
s.close()
if __name__ == "__main__":
sys.exit(UDPreceiver().run(sys.argv))

stream_events方法中包含两个参数,一个是input,一个是ew, 从input中可以获取我们设置的参数,ew是eventWriter,是splunk sdk内置的一个类,用来发送数据,同时也具备log功能。

添加log

Splunk具有log的功能,在sdk也提供了写入系统log的功能,我们可以通过这样的语句来发送一个log ew.log(EventWriter.ERROR,"OUTPUT STRING:"+str(outstr)) 除了ERROR我们还有几种log level

  • ERROR
  • WARNING
  • INFO
  • DEBUG

现在的DEBUG level的log似乎会有问题,不知道原因,所以慎用。log最终被写入splunkd.log文件中,实际上splunk sdk的log是对标准输出和标准错误输出进行了重新定向。

生成安装包

在生成之前先建立自己的目录结构

1
2
3
4
5
6
7
8
9
10
$SPLUNK_HOME/etc/apps/<app_name>/
bin/
app_name.py
splunklib/
__init__.py
...
default/
app.conf
README/
inputs.conf.spec

需要把splunklib目录拷贝到$SPLUNK_HOME/etc/apps/<app_name>/bin, 然后根据需要建立app.conf和inputs.conf.spec.

Splunk的安装包是spl格式,但实际上就是个tar.gz格式,所以可以直接打包目录:

1
2
3
4
cd $SPLUNK_HOME/etc/apps
tar cv MyApp/ > MyApp.tar
gzip MyApp.tar
mv MyApp.tar.gz MyApp.spl

之后我们就可以在splunk上直接安装啦。

参考文档

  1. Package your app or add-on
  2. How to create modular inputs

联系方式

Matthew Gao, matthewgao[at]gmail.com