logstash hive解析存入hive

> 博客详情
直接上配置:
file {& & & type =&"carmobileboss_default" & & path =& ["/data/logs/CarMobileBoss/Default_DailyRolling.log"] & & start_position =& "beginning" #从文件开始处读写 & & #ignore_older =& 604800 & & & #2.x 无这个配置默认超过24小时的日志不解析,0表示不忽略任何过期日志 & & discover_interval =& 15 & & & #设置多长时间扫描目录,发现新文件 & & stat_interval =& 1 & & & & & &#设置多长时间检测文件是否修改 & & sincedb_path =& "/usr/local/logstash/sincedb_path/access_progress_carmobileboss_default" & & sincedb_write_interval =& 15 &#设置多长时间会写入读取的位置信息 & & tags =&["carmobileboss"] & & codec =& multiline { & & & &pattern =& "^%{TIMESTAMP_ISO8601}" & & & &negate =& true & & & &what =& previous & & & &auto_flush_interval =& 4
主要是用 codec =& multiline 插件来解析多行日志。
pattern :匹配行的正则表达式
negate : true/false .默认false . (true:表示不匹配正则表达式时)
what : 处理方式 previous/next
上面意思是: 不匹配时间开头的行 归入前面一行。
auto_flush_interval : 为了解决最后一行丢失问题(因为没有下一行到来前,无法判断改行是否完成),这个参数表示超过这个时间如果没来新行,则自动把它当成当前行。(这个参数在2.2版本及之后才支持)
人打赏支持
参与源创会
领取时间:
“”在线下联结了各位 OSCer,推广开源项目和理念,很荣幸有你的参与~
领取条件:参与过开源中国“源创会”的 OSCer 可以领取
码字总数 19864
支付宝支付
微信扫码支付
打赏金额: ¥
已支付成功
打赏金额: ¥博客访问: 781097
博文数量: 161
博客积分: 5339
博客等级: 大校
技术积分: 1466
注册时间:
IT168企业级官微
微信号:IT168qiye
系统架构师大会
微信号:SACC2013
分类: Python/Ruby
合并多行数据:
# with an input plugin:
# you can also use this codec with an output.
&&&&codec =& multiline {
&&&&&&charset =& ... # string, one of ["ASCII-8BIT", "Big5", "Big5-HKSCS", "Big5-UAO", "CP949", "Emacs-Mule", "EUC-JP", "EUC-KR", "EUC-TW", "GB18030", "GBK", "ISO-8859-1", "ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-13", "ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "KOI8-R", "KOI8-U", "Shift_JIS", "US-ASCII", "UTF-8", "UTF-16BE", "UTF-16LE", "UTF-32BE", "UTF-32LE", "Windows-1251", "GB2312", "IBM437", "IBM737", "IBM775", "CP850", "IBM852", "CP852", "IBM855", "CP855", "IBM857", "IBM860", "IBM861", "IBM862", "IBM863", "IBM864", "IBM865", "IBM866", "IBM869", "Windows-1258", "GB1988", "macCentEuro", "macCroatian", "macCyrillic", "macGreek", "macIceland", "macRoman", "macRomania", "macThai", "macTurkish", "macUkraine", "CP950", "CP951", "stateless-ISO-2022-JP", "eucJP-ms", "CP51932", "GB12345", "ISO-2022-JP", "ISO-2022-JP-2", "CP50220", "CP50221", "Windows-1252", "Windows-1250", "Windows-1256", "Windows-1253", "Windows-1255", "Windows-1254", "TIS-620", "Windows-874", "Windows-1257", "Windows-31J", "MacJapanese", "UTF-7", "UTF8-MAC", "UTF-16", "UTF-32", "UTF8-DoCoMo", "SJIS-DoCoMo", "UTF8-KDDI", "SJIS-KDDI", "ISO-2022-JP-KDDI", "stateless-ISO-2022-JP-KDDI", "UTF8-SoftBank", "SJIS-SoftBank", "BINARY", "CP437", "CP737", "CP775", "IBM850", "CP857", "CP860", "CP861", "CP862", "CP863", "CP864", "CP865", "CP866", "CP869", "CP1258", "Big5-HKSCS:2008", "eucJP", "euc-jp-ms", "eucKR", "eucTW", "EUC-CN", "eucCN", "CP936", "ISO2022-JP", "ISO2022-JP2", "ISO8859-1", "CP1252", "ISO8859-2", "CP1250", "ISO8859-3", "ISO8859-4", "ISO8859-5", "ISO8859-6", "CP1256", "ISO8859-7", "CP1253", "ISO8859-8", "CP1255", "ISO8859-9", "CP1254", "ISO8859-10", "ISO8859-11", "CP874", "ISO8859-13", "CP1257", "ISO8859-14", "ISO8859-15", "ISO8859-16", "CP878", "CP932", "csWindows31J", "SJIS", "PCK", "MacJapan", "ASCII", "ANSI_X3.4-1968", "646", "CP65000", "CP65001", "UTF-8-MAC", "UTF-8-HFS", "UCS-2BE", "UCS-4BE", "UCS-4LE", "CP1251", "external", "locale"] (optional), default: "UTF-8"
&&&&&&multiline_tag =& ... # string (optional), default: "multiline"
&&&&&&negate =& ... # boolean (optional), default: false
&&&&&&pattern =& ... # string (required)
&&&&&&patterns_dir =& ... # array (optional), default: []
&&&&&&what =& ... # string, one of ["previous", "next"] (required)
negate字段是一个选择开关,可以正向匹配和反向匹配
参考:/chenryn/logstash-best-practice-cn/blob/master/codec/multiline.md
参考:http://www.logstash.net/docs/1.4.2/codecs/multiline
拷贝@timestamp字段:
&&&&ruby {
&&&&&&&&&&&&code =& "event['read_time'] = event['@timestamp']"
&&&&mutate
&&&&&&&&add_field =& ["read_time_string", "%{@timestamp}"]
参考:/questions//logstash-how-to-make-a-copy-of-the-timestamp-field-while-maintaining-the-same
多行匹配:
在和&codec/multiline&搭配使用的时候,需要注意一个问题,grok 正则和普通正则一样,默认是不支持匹配回车换行的。就像你需要&=~ //m&一样也需要单独指定,具体写法是在表达式开始位置加&(?m)&标记。如下所示:
match =& {
"message" =& "(?m)\s+(?&request_time&\d+(?:\.\d+)?)\s+"
此段原文来自:/chenryn/logstash-best-practice-cn/blob/master/filter/grok.md
最终的配置文件:
&&&&&&&&file {
&&&&&&&&&&&&&&&&type =& "type"
&&&&&&&&&&&&&&&&path =& ["info.log"]
&&&&&&&&&&&&&&&&exclude =& ["*.gz", "access.log"]
&&&&&&&&&&&&&&&&codec =& multiline {
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&pattern =& "^2015"
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&negate =& true
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&what =& "previous"
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&}
&&&&grok {
&&&&&&&&match =& {
&&&&&&&&&&&&"message" =& "(?m)%{TIMESTAMP_ISO8601:logtime}"
&&&&ruby {
&&&&&&&&&&&&code =& "event['readtime'] = event['@timestamp']"
&&&&date {
&&&&&&&&#locale =& "en"
&&&&&&&&match =& ["logtime", "YYYY-MM-dd HH:mm:ss"]
&&&&&&&&#timezone =& "UTC"
&&&&&&&&#target =& "logtimestamp"
&&&&&&&&remove_field =& [ "logtime"]
&&&&&&&&stdout {}
&&&&&&&&redis {
&&&&&&&&&&&&&&&&host =& "127.0.0.1"
&&&&&&&&&&&&&&&&port =& 6379
&&&&&&&&&&&&&&&&data_type =& "list"
&&&&&&&&&&&&&&&&key =& "key_count"
grok内置正则表达式:/elasticsearch/logstash/blob/v1.4.2/patterns/grok-patterns
阅读(11333) | 评论(1) | 转发(0) |
相关热门文章
给主人留下些什么吧!~~
请问pattern&=&&\&^at&&abcd\&&&at&后面有空格,这种怎么写呢?
请登录后评论。大数据-查询引擎-Hive(22)
这些天看到很多人在使用Hive的过程遇到这样或那样的错误,看着那些少的可怜的错误日志出错,一直找不到原因。后来我给他们介绍了修改日志输出级别之后,错误原因很快得到定位。于是乎我写了这篇博文。希望那些在使用HQL的过程中遇到问题,通过这里介绍的方法进行调试而定位到错误,从而少走弯路。好了,废话不多说进入正文。
  在很多程序中,我们都可以通过输出日志的形式来得到程序的运行情况,通过这些输出日志来调试程序,Hive也不例外。
  在Hive中,使用的是Log4j来输出日志,默认情况下,CLI是不能将日志信息输出到控制台的。在Hive0.13.0之前版本,默认的日志级别是WARN,从Hive0.13.0开始,默认的日志级别是INFO。默认的日志存放在/tmp/&user.name&文件夹的hive.log文件中,全路径就是/tmp/&user.name&/hive.log。
  需要注意的一个bug:在本地模式情况下,log文件名为”.log”,而不是”hive.log”,可以在这里看到https://issues.apache.org/jira/browse/HIVE-5528,这个bug将会在Hive0.13.0中得到解决。
  在默认的日志级别情况下,是不能将DEBUG信息输出,这样一来出现的各种详细的错误信息都是不能数错的。但是我们可以通过以下两种方式修改log4j输出的日志级别,从而利用这些调试日志进行错误定位,具体做法如下:
~]$ hive --hiveconf hive.root.logger=DEBUG,console
或者在${HIVE_HOME}/conf/hive-log4j.properties文件中找到hive.root.logger属性,并将其修改为下面的设置
hive.root.logger=DEBUG,console
  上面两种方法的设置各有优劣,方法一的设定只是对本次会话有效,下次如果还想修改日志输出级别需要重新设定,但是不是每时每刻都需要修改日志的输出级别,所以在有时需要修改输出的日志级别,有时不需要的时候可以用这种方法;方法二将日志输出级别设定到文件中去了,这个设定是对所有的用户都生效,而且每次使用HQL的时候都会输出一大堆的日志,这种情况适合那些无时无刻都需要HQL的运行日志的用户。
  在文章中我们介绍了Hive三种参数配置方法,其中就提到了某些系统级的参数,在HQL中设定是无效的。这里就是一个很好的例子。因为设定log的参数读取在会话建立以前已经完成了。这也就说,我们不能通过下面的方法来修改log4j的日志输出级别:
set hiveconf:hive.root.logger=DEBUG,
这样你在进入CLI的时候将会得到一些类似下面的调试信息:
/home/q/hive-0.11.0-bin/conf]$
................................为了篇幅,省略了很多............................
DEBUG parse.VariableSubstitution: Substitution is on: hive
................................为了篇幅,省略了很多............................
DEBUG security.Groups:& Creating new
Groups object
DEBUG util.NativeCodeLoader: Trying to load the c...
your platform... using builtin-java classes where applicable
DEBUG security.JniBasedUnixGroupsMappingWithFallback:
back to shell based
DEBUG security.JniBasedUnixGroupsMappingWithFallback:
mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
DEBUG security.UserGroupInformation: hadoop login
DEBUG security.UserGroupInformation: using local
&user:UnixPrincipal:
DEBUG security.UserGroupInformation:
loginUser:yangping.wu (auth:SIMPLE)
................................为了篇幅,省略了很多............................
  下面举个日志调试的例子,在没有修改日志输出级别之前,有下面的查询所有表的HQL如下:
Error in metadata: java.lang.RuntimeException: Unable to instantiate
&&&&&&&&&&&&&&&&&&&org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Execution Error, return
&&&&&&&&&&&&&&&&&&&org.apache.hadoop.hive.ql.exec.DDLTask
  得到上面的错误,我们从上面的错误输出只知道是元数据有问题。具体的错误也不知道,这时候如果我们修改了日志调试级别hive.root.logger=DEBUG,console,我们再来看看运行上面语句的错误输出:
................................为了篇幅,省略了很多............................
INFO metastore.ObjectStore: ObjectStore, initialize called
ERROR Datastore.Schema: Failed initialising database.
denied for
user 'datalog5'@'l-1'
(using password: YES)
org.datanucleus.exceptions.NucleusDataStoreException:
denied for
user 'datalog5'@'l-1'
(using password: YES)
org.datanucleus.store.rdbms.ConnectionFactoryImpl
&&&&&&&&&$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:536)
org.datanucleus.store.rdbms.RDBMSStoreManager.&init&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&(RDBMSStoreManager.java:290)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
................................为了篇幅,省略了很多............................
  通过上面的错误堆栈我们就可以将问题定位到是连接数据库出现了问题,这么一来错误的定位范围就大大减少了,我们可以查看书Hive-site.xml文件中连接数据库的配置是否正确等来解决上述问题。本博客文章除特别声明,全部都是原创!
尊重原创,转载请注明: 转载自
本文链接地址:&
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:541840次
积分:7336
积分:7336
排名:第3040名
原创:21篇
转载:1156篇
评论:26条
(1)(1)(56)(43)(43)(47)(63)(29)(51)(124)(23)(25)(10)(27)(20)(17)(14)(1)(42)(41)(2)(20)(38)(232)(4)(3)(28)(4)(43)(20)(22)(51)(1)(21)(1)(1)(6)(1)(1)(1)1043人阅读
1、这次主要是记录一下,前几天实现logstash传输文本信息到HDFS时,遇到的一个问题的解决办法,因为自己对logstash还不是很熟悉,所以当时折腾了很久,虽然发现解决方案很简单。
2、logstash有一个第三方插件:webhdfs
遇到的问题
logstash添加的字段与message之间的分隔符与message内部的分隔符不一致
我的需求是,将以“,”分隔的文本文件通过logstash传输到HDFS上,然后通过hive建立外表连接,对其中的内容进行查询。
遇到的问题是,logstash将一行行的信息message传输写入hdfs文件时,会添加两个字段:timestamp和hostname,而且分隔符为空格,即:
timestamp hostname message
将message拆开后,存储到HDFS上每行的格式如下:
timestamp hostname 1,小明,1995,98
如此的格式,便无法建立hive查询表,因为整体的分隔符为空格,导致message是一个整体,而不是我们设想的以“,”为分隔符的每个字段对应hive表中的一个字段。这样不利于我们构建hive外表对它进行查询,所以想要将message的字段和添加的字段之间的分隔符保持一致。
1、首先想要使用message_format进行解决,将timestamp hostname message之间的分隔符换成“,”,后来发现随着版本更新,webhdfs已经没有了该属性。
2、使用mutate插件的split将message分割,然后分别添加字段add_field,最后删除message,remove_field,后来发现添加没有任何效果,而删除最后得到%{message}的结果。
3、使用mutate插件的gsup解决了问题:gsub =& [“message”, “,”, ” “]
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:6018次
排名:千里之外
原创:15篇
(1)(3)(5)(7)(1)2592人阅读
大数据(160)
hive(13)
Hbase(38)
1、环境描述:
hadoop集群环境:hadoop-2.6.0;3台集群环境
hbase集群环境:habase-1.1.2 ;3台集群环境
hive环境:hive-1.2.1;1台测试环境
elasticsearch:elasticsearch-1.7.1测试环境
2、下载hive与es之间数据交互的插件。
说明:如果用ElasticSearch版本为2.1.0,必须使用elasticsearch-hadoop-2.2.0才能支持,如果ES版本低于2.1.0,可以使用elasticsearch-hadoop-2.1.2,在本次实验中我选择的是elasticsearch-hadoop-2.1.2。
3、注册elasticsearch-hadoop-2.1.2:
hive& add jar file:///home/hadoop/xuguokun/elasticsearch-hadoop-2.2.0.
4、在es端创建表索引创建
package com.test.file2
import static mon.xcontent.XContentFactory.jsonB
import java.io.BufferedR
import java.io.F
import java.io.FileNotFoundE
import java.io.FileR
import java.io.IOE
import java.text.DateF
import java.text.ParseE
import java.util.D
import org.elasticsearch.action.index.IndexR
import org.elasticsearch.client.C
import org.elasticsearch.client.transport.TransportC
import mon.transport.InetSocketTransportA
import com.test.utils.DateU
public class ReadFile2Es1 {
public static void main(String[] args) {
String filePath = &E:\\大数据相关资料\\TestData\\ESData&;//这里替换成205服务器上测试数据的存放根目录
@SuppressWarnings(&resource&)
Client client = new TransportClient().addTransportAddresses(new InetSocketTransportAddress(&192.168.174.130&, 9300));
bianli(filePath,client);
System.out.println(&遍历结束&);
}catch(Exception e){
e.printStackTrace();
System.out.println(&error&);
public static void bianli(String path,Client client) {
File file = new File(path);
if (file.exists()) {
File[] files = file.listFiles();
if (files.length == 0) {
System.out.println(&文件夹是空的!&);
for (File file2 : files) {
if (file2.isDirectory()) {
System.out.println(&文件夹:& + file2.getAbsolutePath());
bianli(file2.getAbsolutePath(),client);
fr = new FileReader(file2.getAbsolutePath());
BufferedReader br = new BufferedReader(fr);
String line = &&;
IndexResponse indexResponse =
while ((line = br.readLine()) != null) {
if(line!=null&&!line.equals(&&)){
String[] str = line.split(&,&);
indexResponse = client.prepareIndex(&hive2es&, &info&)
.setSource(jsonBuilder()
.startObject()
.field(&area&, str[0])
.field(&media_view_tags&,str[1])
.field(&interest&,str[2])
.endObject())
.execute()
.actionGet();
br.close();
fr.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
//System.out.println(&文件:& + file2.getAbsolutePath());
System.out.println(&文件不存在!&);
public static String getRealData(String str[]){
if (str.length == 38) {
String realData = &&;
for(int i = 0; i & 38;i++){
if((str[i].substring(1, str[i].length() - 1).equals(&null&))||(str[i].substring(1, str[i].length() - 1)).equals(&&)){
realData = realData + &null&;
realData =
realData + str[i].substring(1, str[i].length() - 1);
if(i!=37){
realData = realData + &,&;
return realD
return &格式不正确&;
注意:当前步骤es读取的本地测试数据如下所示:
beijing,diannaokeji,kejiwang
5、在hive端创建表hive_es
CREATE EXTERNAL TABLE hive_es (cookieid string,area string,media_view_tags string,interest string )STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.nodes' = '192.168.174.130:9200','es.index.auto.create' = 'false','es.resource' = 'hive2es/info','es.read.metadata' = 'true','es.mapping.names' = 'cookieid:_metadata._id, area:area, media_view_tags:media_view_tags, interest:interest');
6、在hive端检索数据
hive& select * from hive_
AVOwmuAVAOB0VDYE1GoM beijing diannaokeji kejiwang
Time taken: 0.126 seconds, Fetched: 1 row(s)
7、以上实现了hive读取es中的数据,下面从hive端导入数据。
本步骤的测试数据如下:
1,shanghai,diannaokeji,kejiwang
8、创建hive本地表
CREATE EXTERNAL TABLE hive_es_native (cookieid string,area string,media_view_tags string,interest string ) row format delimited
fields terminated by ',';9、向hive本地表
hive_es_native中导入数据,导入方法是:
hive& load data local inpath '/home/hadoop/xuguokun/test.txt' overwrite into table hive_es_
Loading data to table mydatabase.hive_es_native
Table mydatabase.hive_es_native stats: [numFiles=1, numRows=0, totalSize=32, rawDataSize=0]
Time taken: 0.51 seconds
10.通过hive向es中写入数据,并查看最终hive_es中的数据
hive& insert overwrite table hive_es select * from hive_es_
Query ID = hadoop_49_caab7883-e8ee-4fc7-9d01-cf34d2ee6473
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_3_0002, Tracking URL = http://Master:8088/proxy/application_3_0002/
Kill Command = /usr/local/hadoop/bin/hadoop job
-kill job_3_0002
Hadoop job information for Stage-0: number of mappers: 1; number of reducers: 0
19:21:09,038 Stage-0 map = 0%,
reduce = 0%
19:21:22,042 Stage-0 map = 100%,
reduce = 0%, Cumulative CPU 2.63 sec
MapReduce Total cumulative CPU time: 2 seconds 630 msec
Ended Job = job_3_0002
MapReduce Jobs Launched:
Stage-Stage-0: Map: 1
Cumulative CPU: 2.63 sec
HDFS Read: 4160 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 630 msec
Time taken: 36.247 seconds
hive& select * from hive_
AVOwucV0AOB0VDYE1GoX shanghai diannaokeji kejiwang
AVOwmuAVAOB0VDYE1GoM beijing diannaokeji kejiwang
Time taken: 0.126 seconds, Fetched: 2 row(s)
11、实验到此结束
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:164120次
积分:3199
积分:3199
排名:第10892名
原创:132篇
转载:212篇
评论:12条
(1)(3)(3)(10)(11)(13)(4)(14)(10)(26)(19)(17)(5)(11)(13)(15)(10)(14)(21)(13)(22)(18)(13)(18)(7)(18)(15)}

我要回帖

更多关于 hive json串解析 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信