如何在 Go 语言中用 Beats 开发 Logstash 插件

Python019

如何在 Go 语言中用 Beats 开发 Logstash 插件,第1张

配置环境

在OSX系统上很容易安装GO的可执行文件:

brew install go

虽然Java或Ruby (或者任何我知道的语言) 可以在本地文件系统的任何地方使用命令,,但是Go项目必须使用单一专用的地址,,并且在$GOPATH环境变量下可用。

第 1 段(可获 1.54 积分)

R e 7个月前 

创建项目对于Logstash插件,Beats项目可以从模板创建。官方文档的说明十分简单。鉴于Go对文件系统上的位置的严格要求,只需按照以下说明生成一个新的即可使用的Go项目。默认模板代码将在控制台中重复发送带增量计数器的事件:./redditbeat -e -d "*"

2016/12/13 22:55:56.013362 beat.go:267: INFO

  Home path: [/Users/i303869/projects/private/go/src/github.com/nfrankel/redditbeat]

  Config path: [/Users/i303869/projects/private/go/src/github.com/nfrankel/redditbeat]

  Data path: [/Users/i303869/projects/private/go/src/github.com/nfrankel/redditbeat/data]

  Logs path: [/Users/i303869/projects/private/go/src/github.com/nfrankel/redditbeat/logs]

2016/12/13 22:55:56.013390 beat.go:177: INFO Setup Beat: redditbeat Version: 6.0.0-alpha1

2016/12/13 22:55:56.013402 processor.go:43: DBG  Processors: 

2016/12/13 22:55:56.013413 beat.go:183: DBG  Initializing output plugins

2016/12/13 22:55:56.013417 logp.go:219: INFO Metrics logging every 30s

2016/12/13 22:55:56.013518 output.go:167: INFO Loading template enabled. Reading template file:

  /Users/i303869/projects/private/go/src/github.com/nfrankel/redditbeat/redditbeat.template.json

2016/12/13 22:55:56.013888 output.go:178: INFO Loading template enabled for Elasticsearch 2.x. Reading template file:

  /Users/i303869/projects/private/go/src/github.com/nfrankel/redditbeat/redditbeat.template-es2x.json

2016/12/13 22:55:56.014229 client.go:120: INFO Elasticsearch url: http://localhost:9200

2016/12/13 22:55:56.014272 outputs.go:106: INFO Activated elasticsearch as output plugin.

2016/12/13 22:55:56.014279 publish.go:234: DBG  Create output worker

2016/12/13 22:55:56.014312 publish.go:276: DBG  No output is defined to store the topology.

  The server fields might not be filled.

2016/12/13 22:55:56.014326 publish.go:291: INFO Publisher name: LSNM33795267A

2016/12/13 22:55:56.014386 async.go:63: INFO Flush Interval set to: 1s

2016/12/13 22:55:56.014391 async.go:64: INFO Max Bulk Size set to: 50

2016/12/13 22:55:56.014395 async.go:72: DBG  create bulk processing worker (interval=1s, bulk size=50)

2016/12/13 22:55:56.014449 beat.go:207: INFO redditbeat start running.

2016/12/13 22:55:56.014459 redditbeat.go:38: INFO redditbeat is running! Hit CTRL-C to stop it.

2016/12/13 22:55:57.370781 client.go:184: DBG  Publish: {

  "@timestamp": "2016-12-13T22:54:47.252Z",

  "beat": {

    "hostname": "LSNM33795267A",

    "name": "LSNM33795267A",

    "version": "6.0.0-alpha1"

  },

  "counter": 1,

  "type": "redditbeat"

}

第 2 段(可获 0.73 积分)

R e 7个月前 

关于命令行参数:-e记录到标准err,而-d“*”启用所有调试选择器。有关参数的完整列表,请键入./redditbeat --help。编码Go代码位于.go文件中(令人惊讶...)在$ GOPATH / src文件夹的项目子文件夹中。配置类型第一个有趣的文件是config / config.go,它定义了一个结构来声明Beat的可能参数。至于前面的Logstash插件,让我们添加一个subreddit参数,并设置它的默认值:type Config struct {

Period time.Duration `config:"period"`

Subreddit string `config:"subreddit"`

}

var DefaultConfig = Config {

Period: 15 * time.Second,

Subreddit: "elastic",

}

第 3 段(可获 0.89 积分)

R e 7个月前 

Beater TypeBeat本身的代码在beater / redditbean.go中找到。默认模板为Beat和三个函数创建一个struct:Beat构造函数—用来读取配置: func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { ... } 

 Run 函数- 需要覆盖Beat的主要功能: func (bt *Redditbeat) Run(b *beat.Beat) error { ... } 

Stop 函数管理优雅关闭: func (bt *Redditbeat) Stop() { ... } 

 Note 1:在Go中没有明确的接口实现。实现了 interface 中的所有方法,即创建一个隐式继承关系. 出于写文档的目的,这是 Beater 接口:type Beater interface {

Run(b *Beat) error

Stop()

}

第 4 段(可获 0.93 积分)

R e 7个月前 

因此,由于Beat结构实现了Run和Stop,它是一个Beater。Note 2: 在Go中没有类的概念,所以方法不能在一个具体类型上声明。但是,它存在扩展函数的概念:可以添加行为到一个类型(在单个包中)的函数。它需要声明receiver 类型:这是在fun关键字和函数名之间完成的 - 这里是指Redditbeat类型(或者更准确地说,是一个指向Redditbeat类型的指针,但是这里有一个隐式转换)。构造函数和Stop函数可以保持不变,无论什么特性都应该在Run函数中。在这种情况下,功能是调用Reddit REST API并为每个Reddit帖子发送一条消息。

第 5 段(可获 1.59 积分)

R e 7个月前 

最终代码如下所示:func (bt *Redditbeat) Run(b *beat.Beat) error {

bt.client = b.Publisher.Connect()

ticker := time.NewTicker(bt.config.Period)

reddit := "https://www.reddit.com/r/" + bt.config.Subreddit + "/.json"

client := &http.Client {}

for {

select {

case <-bt.done:

return nil

case <-ticker.C:

}

req, reqErr := http.NewRequest("GET", reddit, nil)

req.Header.Add("User-Agent", "Some existing header to bypass 429 HTTP")

if (reqErr != nil) {

panic(reqErr)

}

resp, getErr := client.Do(req)

if (getErr != nil) {

panic(getErr)

}

body, readErr := ioutil.ReadAll(resp.Body)

defer resp.Body.Close()

if (readErr != nil) {

panic(readErr)

}

trimmedBody := body[len(prefix):len(body) - len(suffix)]

messages := strings.Split(string(trimmedBody), separator)

for i := 0 i < len(messages) i ++ {

event := common.MapStr{

"@timestamp": common.Time(time.Now()),

"type":       b.Name,

"message":    "{" + messages[i] + "}",

}

bt.client.PublishEvent(event)

}

}

}

第 6 段(可获 0.09 积分)

R e 7个月前 

这里是对最重要的几部分的解释:line 4: 通过连接字符串创建Reddit REST URL,包括配置Subreddit参数。记住,它的默认值已在config.go文件中定义。line 5: 引用httpClient类型line 12: 创建新的HTTP请求。注意Go允许多个返回值。line 13: 如果没有设置标准请求头,Reddit的API将返回429状态码。line 14: Go标准错误不通过异常处理,而是随着常规返回值返回。根据Golang wiki:指示调用者的错误条件,应通过返回错误值来完成line 15: panic() 函数类似于在Java中抛出异常, 被处理时推到栈顶。 有关详细信息,请查看相关文档。line 17: 执行HTTP请求。line 21: 将响应主体读入字节数组。line 22: 关闭主体流。注意defer关键字:defer语句延迟函数的执行,直到环绕的函数返回。line 26: 创建整个响应主体字节数组的切片 - 对数组的一部分的引用。实质上,它删除了前缀和后缀以保持相关的JSON值。之后将字节数组解析成JSON。line 27: 分割切片以单独获取每个JSON片段。line 29: 将消息创建为简单的字典结构。line 34: 发送。

第 7 段(可获 3.11 积分)

R e 7个月前 

配置, 构建, 运行默认配置参数可以在项目根目录下的redditbeat.yml文件中找到。请注意,redditbeat.full.yml中列出了其他常见的Beat参数,以及相关注释。关于Beats的一个有趣的事情是,他们的消息可以直接发送到Elasticsearch或Logstash进行进一步处理。这在上述配置文件中配置。redditbeat:

  period: 10s

output.elasticsearch:

  hosts: ["localhost:9200"]

output.logstash:

  hosts: ["localhost:5044"]

  enabled: true

第 8 段(可获 0.78 积分)

R e 7个月前 

此配置片段将每10秒循环运行Run方法,并将消息发送到在localhost上运行的Logstash实例在端口5044上。这可以在运行Beat时被覆盖(见下文)。注意:为了使Logstash接受来自Beats的消息,必须安装Logstash Beat插件,并且必须为Beats配置Logstash的input:input {

  beats {

    port => 5044

  }

}

要构建项目,请在项目的根目录中键入make。它将创建一个可以运行的可执行文件。./redditbeat -e -E redditbeat.subreddit=java

-E参数可以覆盖在的redditbeat.yml配置文件中找到的参数(见上文)。在这里,它设置subreddit读为“java”,而不是默认的“elastic”。

第 9 段(可获 1.3 积分)

R e 7个月前 

输出如下所示:2016/12/17 14:51:19.748329 client.go:184: DBG  Publish: {

  "@timestamp": "2016-12-17T14:51:19.748Z",

  "beat": {

    "hostname": "LSNM33795267A",

    "name": "LSNM33795267A",

    "version": "6.0.0-alpha1"

  },

  "message": "{

    \"kind\": \"t3\", \"data\": {

      \"contest_mode\": false, \"banned_by\": null, 

      \"domain\": \"blogs.oracle.com\", \"subreddit\": \"java\", \"selftext_html\": null, 

      \"selftext\": \"\", \"likes\": null, \"suggested_sort\": null, \"user_reports\": [], 

      \"secure_media\": null, \"saved\": false, \"id\": \"5ipzgq\", \"gilded\": 0, 

      \"secure_media_embed\": {}, \"clicked\": false, \"report_reasons\": null, 

      \"author\": \"pushthestack\", \"media\": null, \"name\": \"t3_5ipzgq\", \"score\": 11, 

      \"approved_by\": null, \"over_18\": false, \"removal_reason\": null, \"hidden\": false, 

      \"thumbnail\": \"\", \"subreddit_id\": \"t5_2qhd7\", \"edited\": false, 

      \"link_flair_css_class\": null, \"author_flair_css_class\": null, \"downs\": 0, 

      \"mod_reports\": [], \"archived\": false, \"media_embed\": {}, \"is_self\": false, 

      \"hide_score\": false, \"spoiler\": false, 

      \"permalink\": \"/r/java/comments/5ipzgq/jdk_9_will_no_longer_bundle_javadb/\", 

      \"locked\": false, \"stickied\": false, \"created\": 1481943248.0, 

      \"url\": \"https://blogs.oracle.com/java-platform-group/entry/deferring_to_derby_in_jdk\", 

      \"author_flair_text\": null, \"quarantine\": false, 

      \"title\": \"JDK 9 will no longer bundle JavaDB\", \"created_utc\": 1481914448.0, 

      \"link_flair_text\": null, \"distinguished\": null, \"num_comments\": 4, 

      \"visited\": false, \"num_reports\": null, \"ups\": 11

    }

  }",

  "type": "redditbeat"

}

1、下载go的zip文件。并且一定要把文件解压到c:\go目录下。

2、配置windows的高级环境变量。包括:GOROOT、GOOS、GOBIN、GOARCH。并且在path变量里面把c:\go\bin加入。以便可以在命令行直接运行go命令。

举例:我的机器:

GOPATH= c:\goc:\go\srcF:\workspace\goSample01

GOBIN=c:\go\binF:\workspace\goSample01\bin

其中,c:\go是go的安装路径;

F:\workspace\goSample01是我写的go语言项目的工程目录;

F:\workspace\goSample01\bin是go语言项目的工程目录下的可执行文件路径

3、在完成环境变量配置后,打开一个命令行窗口,直接输入go,然后回车,看看是否出现go的帮助信息。如果出现,那么go的基本环境就OK了。

注意:这个基本环境不包含开发工具,也不能直接编译带C代码的go程序。

4、

(可选)为了支持Import远程包,最好装个gomingw。下载地址:http://code.google.com/p/gomingw

/downloads/list。如果下的是压缩包,请把它解压到C盘。例如,C:\gowin-env。里面有个Console.bat是以后使用go

get的环境。举例:有个文件a.go,里面import(

"fmt"

"github.com/astaxie/beedb"

_ "github.com/ziutek/mymysql/godrv"

为了编译该a.go文件,需要启动Console.bat,然后在该命令行窗口,进入c:\go\src目录下,执行go getgithub.com/astaxie/beedb

Go get github.com/ziutek/mymysql/godrv .

Go会自动下载该远程包并编译和安装这些包。

配置goclipse(可选)

(如果不喜欢eclipse开发工具,请跳过这个配置。)

1、下载并安装goclipse插件。Goclipse是go语言for eclipse的插件,下载地址:http://code.google.com/p/goclipse/

2、启动eclipse并创建go项目。然后写个最简单的helloworld.go文件,并运行。代码如下:

packagemainimport"fmt"func main(){fmt.Printf("hello, world")}

配置gocode(可选)

如果不需要go语法辅助和eclipse里面的(按ALT+/)弹出go语言自动辅助功能,请跳过这个配置。

1、下载gocode的zip文件,解压后放在go的bin目录下。

2、下载并安装Git软件。并且在path里面配置git的执行路径。例如c:\git\bin

3、在命令行执行:go build .\gocode。如果一切正常,那么将会编译生成一个gocode.exe文件在go的bin目录下。如果编译失败,那么就转第4步。

4、如果第3步直接编译gocode源文件成功,那就直接到第5步。否则,就需要通过git下载gocode源文件,然后再编译。在命令行执行:go get -u github.com/nsf/gocode 。就会生成gocode.exe文件。

5、在goclipse插件里面指定gocode的路径。就可以在elcipse里面调用gocode来帮助写编码了。

从开发工具这块看,go语言还不够成熟,开发工具都还不完善,有待改进。

下载go-tour教程源代码(可选)

Google有个在线运行go语言的教程(http://tour.golang.org/#2),很不错。支持在web上直接运行大部分的go程序,想了解这个教程的源代码的朋友可以通过以下方式获取。如果没兴趣,可以跳过这个步骤。

1、下载安装Mercurial软件。

2、在命令行下输入:

hg clone

作为测试用的。如果把http改成https协议,下载就会失败。搞不懂。

编译带调用C代码的go文件(可选)

1、为了在windows下编译带C代码的go程序,你首先需要下载并安装MinGW或者Cygwin。

2、首选安装MinGW。在安装MinGW之后,记得要把MinGW安装目录\bin路径设置在path环境变量里面,以便能在dos窗口下直接调用gcc。

3、下载一个gowin-env。下载地址:gowin-env。下载后解压到某个目录下,例如:C:\gowin-env. 然后,编辑go-env.bat。配置相关的go参数。例如,我的配置是:

set GOARCH=386

set GOOS=windows

set GOROOT=c:\go

set GOBIN=%GOROOT%\bin

set GOPATH=%GOROOT%F:\workspace\goSample01

设置好go-env.bat后,就可以点击Console.bat来启动编译和运行窗口。

4、编写一个带C代码的go程序。例如,testc.go

5、编译

例如:

go build -compiler gccgo test_c.go

运行调用C代码的go文件(可选)

1、testc.go.

创建rand目录,然后在rand里面创建testc.go. 代码如下:

package rand

/*

//

#include <stdio.h>

*/

import "C"

func PrintHello() {

C.puts(C.CString("Hello, world\n"))

}

2、a.go

在rand下创建a.go.代码如下:

package rand

import "fmt"

func SayHello(name string){

fmt.Println(name)

}

3、test_import.go

在rand的上一级创建test_import.go。代码如下:

package main

import "./rand"

func main(){

rand.SayHello("tom")

rand.PrintHello()

}

4、运行test_import.go

go run test_import.go

在测试其它几个C代码的时候,发现windows版本的cgo还有些编译问题,同样的代码转移到苹果的XCODE下就没有问题。后来终于发现原因了,原来有些例子是unix平台下的,而在windows平台下,方法名和参数需要做调整。

例如:下面代码在windows下编译报一堆错误。

package rand

/*

#include <stdlib.h>

*/

import "C"

func Random() int {

return int(C.random())

}

func Seed(i int) {

C.srandom(C.uint(i))

}

这里需要把return int(C.random()) 修改为“return int(C.rand())”

C.srandom(C.uint(i))修改为“C.srand(C.uint(i))”编译就OK了。