java简化ELK收集日志数据保存到Elasticsearch用Grafana可视化展现

头像
码农笔录
2019-05-21 云计算/大数据 阅读量 7235

谈到java系统进行日志分析系统,大家肯定第一时间想到就是ELK,确实ELK是一个很成熟的体系了,不过现在grafana在可视化方面比kibana强多了,因为他支持更多的数据源。

如果java中想使用ELK,一般的做法就是使用logback吧日志通过kafka传送到logstash中,然后logstash清洗完数据以后保存到elasticsearch(以下简称es)中,但是大家有没有想过,如果一个小的项目,框架搭建下来就要占用好多资源,要启动一堆服务,所以可不可以简化呢,es和grfanfa是肯定要的,一个保存数据,一个可视化数据。那就只能吧日志数据产生到保存到es中这个环节精简了。我们知道logback吧数据通过kafka消息队列传送到logstash中清洗,就是为了非阻塞式的保存数据,并清洗数据,那就简化这块的步骤了。

1.直接将数据保存到es中,省去kafka、zookeeper两个组件

2.保存数据之前,组装成需要的数据,省去logstash清洗这一步


接下来就实现具体实现:

首先接入kafka其实就是为了异步非阻塞式的保存数据,那我们可以利用 logback-elasticsearch-appender 库将日志数据保存到es中,github上搜索,找一个合适自己的加入项目,配置一下即可。

到这里可能满足大部分的需求了,但是我发现,目前的库还不支持高度自定义扩展属性字段,我们需要如下的这种,而且我们只需要抽取固定几个接口的日志,不是所有的日志都需要,所以我直接放弃了使用logback,自己吧数据直接保存到es中。


那我们是怎么实现非阻塞式保存数据呢,那当然要用多线程,加队列了,这样就会保证不阻塞当前线程有顺序的保存数据了。

新建一个类实现Thread ,利用LinkedBlockingQueue这个队列做缓存数据实现kafka的效果,然后利用OKHttp 框架将数组保存到es中。

package com.trgis.livearth.log;

import com.alibaba.fastjson.JSON;
import com.google.gson.Gson;
import com.trgis.livearth.utils.OKHttpUtil;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* @author nelson
* @date 2019-5-17
* @desc 日志收集發送
*/

public class TRlog extends Thread {
private final static int MAX_CAPACITY = 65536;
private static Object locker = new Object();
private static BlockingQueue<TRlogVo> queue = new LinkedBlockingQueue<TRlogVo>(MAX_CAPACITY);

private String index = "livearth-logs-";
private String type = "tdtlogs"; private String esfmt = "yyyy-MM-dd";
private Gson gson;
private OKHttpUtil okHttpUtil;
private SimpleDateFormat dateFormat;

@Override
public synchronized void start() {
synchronized (locker) {
super.start();
try { gson = new Gson(); okHttpUtil = new OKHttpUtil();
dateFormat = new SimpleDateFormat(esfmt);
} catch (Exception e) {
e.printStackTrace();
}
}

}

public void run() {
while (true) {
try {
sendLog(queue.take()); //将队列里的数据取出保存到es中,如果没有数据会进入阻塞状态,直到有新的数据进来 } catch (Exception e) {
e.printStackTrace();
}
}
}
// 添加新的数据 public static void put(TRlogVo log) {
queue.offer(log);
}
// 将数据保存到es中 private void sendLog(TRlogVo log) {
String format = dateFormat.format(new Date());
EsIndex esIndex = new EsIndex(index + format, type);
String s = gson.toJson(esIndex);
s += "\n";

        // 这里有个问题就是时间的问题,需要格式化成es能识别的时间格式,java默认的date类型是不会识别的,如果用Gson 他不会调用get方法,所以用了阿里的JSON库

s += JSON.toJSON(log);
s += "\n";
okHttpUtil.es("http://47.98.109.5:8209/_bulk",s);
} /** 格式化后的数据,必须要换行的哦,第一行索引信息就是,第二行是数据信息 * {"index":{"_index":"zhouls","_type":"emp","_id":"10"}}

    *    {"name":"jack", "age" :18}

*/ }

EsIndex  索引的信息,后面要转gson

public class EsIndex implements Serializable {
private Map<String,String> index;

public EsIndex(String mindex,String mtype){
index = new HashMap<>();
index.put("_index",mindex);
index.put("_type",mtype);
}

public Map<String, String> getIndex() {
return index;
}

public void setIndex(Map<String, String> index) {
this.index = index;
}
}

OKHttpUtil  利用okhttp框架请求es保存数据,因为我们用的是es的restful 接口,所以http框架选择的okhttp,注意请求头

@Component
public class OKHttpUtil {

private OkHttpClient okHttpClient = null;

public void es(String url, String json) {
try {
if (okHttpClient == null) {
okHttpClient = new OkHttpClient.Builder()
.connectTimeout(15, TimeUnit.SECONDS)
.readTimeout(15,TimeUnit.SECONDS)
.writeTimeout(15,TimeUnit.SECONDS)
.build();
}
Request request = new Request.Builder()
.url(url)
.addHeader("Content-Type", "application/x-ndjson")
.put(RequestBody.create(
MediaType.parse("application/json; charset=utf-8"),
json))// post json提交 .build();
Response response = okHttpClient.newCall(request).execute();
System.out.println(response.body().string());
} catch (Exception e) {
e.printStackTrace();
}
}
}


TRlogVo  就是你要的保存的数据格式,要格式化成json,所以不确定的对象类型可以使用Object或者Map类型,这里根据自己的实际情况定义字段,时间必须要的哦。

/**
* @author nelson
* @date 2019-5-17
* @desc 日志收集發送
*/

public class TRlogVo implements Serializable {

private static final String LEVEL = "INFO";
private String id;
private String level = LEVEL; // 日志级别 private String resturl;
private Date timestamp = new Date(); // 这个必须要得 /**
* 逆地理编码
*/
private TdtGeocodeResultTemp geocodeResult; //自定义对象类型

/**
* poi搜索
*/
private TdtPoiResult poiResult; //自定义对象类型
// 这里省略了其他的属性和get set方法 //java默认的时间格式,es是不是别的,默认会存储为text类型,按照下面这种格式化,就会自动识别为date类型     public String getTimestamp() {

SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
format.setTimeZone(TimeZone.getDefault());
return format.format(timestamp);
}

}

启动多线程,我这里用的是springboot 所以在主程序入口,实例化TRlog这个bean,交给spring来管理实例,然后顺便启动线程。  


Controller中调用,线程中一直是阻塞状态,当你给队列push数据后,他会自动进行发送数据,所以我们只需要给队列put数据即可

@Api(description = "天地图POI搜索")
@TRestController("poi")
public class TDTPoiController {
@Autowired
private TRlog tRlog; //多线程对象
@Autowired
private Gson gson;
@ApiOperation(value = "搜索")
@GetMapping("search")
public String search(
@RequestParam(defaultValue = "query") String type,
@RequestParam String postStr) {
TRlogVo vo = new TRlogVo(); //实例化数据的数据对象,有用
try {
Map map = gson.fromJson(postStr, Map.class);
vo.setParms(map); // 给属性赋值,有用
vo.setResturl("search");
TdtPoiResult tdtPoiResult = new TdtPoiResult (); //对你无用,自定义对象
vo.setPoiResult(tdtPoiResult);
tRlog.put(vo);
return "success;
} catch (Exception e) {
vo.setLevel("ERROR");
vo.setErrMsg(e.getMessage());
tRlog.put(vo);
}
return "查詢失敗";
} }

下面这是我们实际项目的代码,有些是没用的,上面这个代码是缩减过的。


至于grafana,百度有好多教程,这里就不重复造轮子了,有两个点要注意一下,index我是按照天格式化后存的,大家注意,还有就是时间属性,这个很重要,后面做聚合的时候这个非常非常有用,大家一定要记得java默认的date类型es不识别,一定要格式化成es识别的格式