Before going through the following integration process, you should be familiar with HBase and Flume installation as well as some basic concepts of both. In this POC I have used Flume 0.9.1 and HBase 0.20.6. The POC describes Flume and HBase integration and shows how to extract different attributes from an Apache log file (for example IP, user agent, referrer, etc.).
To store Apache log lines in HBase, we need to build a new jar, following the procedure described below. This jar contains two new classes, HBaseEventSink.java and RegexAllExtractor.java.
1. Download the Flume source code from https://github.com/cloudera/flume/downloads, or you can find it inside the flume folder.
2. Make a new Java project in Eclipse.
3. In the Flume source code, you will find a java folder. This java folder contains com and org folders. Add these org and com folders to your Java project.
4. In the Flume source code, you will find an ahocorasick folder. This folder contains an org folder. Add this org folder to your Java project. Now delete the TestAhoCorasick.java, TestAll.java, TestQueue.java and TestState.java classes from the org.arabidopsis.ahocorasick package.
5. In the Flume source code, go to the gen-java/com/cloudera/flume/conf folder. This conf folder has one thrift folder. Add this thrift folder to the com.cloudera.flume.conf package of your Java project.
6. You can find all the required jars in the lib folder of Flume.
7. Now create a new com.cloudera.flume.handlers.hbase package. Inside this package, create a new Java class, HBaseEventSink.
package com.cloudera.flume.handlers.hbase;

import java.io.IOException;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.google.common.base.Preconditions;

public class HBaseEventSink extends EventSink.Base {
  String tableName;
  String familyName;
  HBaseConfiguration config;
  HTable table;

  public HBaseEventSink(String tableName, String familyName) {
    this(tableName, familyName, new HBaseConfiguration());
  }

  public HBaseEventSink(String tableName, String familyName, HBaseConfiguration config) {
    Preconditions.checkNotNull(tableName);
    Preconditions.checkNotNull(familyName);
    this.tableName = tableName;
    this.familyName = familyName;
    this.config = config;
  }

  @Override
  public void append(Event e) throws IOException {
    // Use the event timestamp as the row key.
    String row = String.valueOf(e.getTimestamp());
    byte[] rowKey = Bytes.toBytes(row);
    Put p = new Put(rowKey);
    for (Entry<String, byte[]> a : e.getAttrs().entrySet()) {
      // Skip Flume's internal ACK attributes; store every other
      // attribute as a column in the configured family.
      if (a.getKey().equals("AckType") || a.getKey().equals("AckTag")
          || a.getKey().equals("AckChecksum")) {
        continue;
      }
      p.add(Bytes.toBytes(familyName), Bytes.toBytes(a.getKey()), a.getValue());
    }
    if (!p.isEmpty()) {
      table.put(p);
    }
  }

  @Override
  public void close() throws IOException {
    table.close();
  }

  @Override
  public void open() throws IOException {
    // This instantiates an HTable object that connects you to
    // the tableName table.
    table = new HTable(config, tableName);
  }

  public static SinkBuilder builder() {
    return new SinkBuilder() {
      @Override
      public EventSink build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 2, "usage: hbase(table, family)");
        return new HBaseEventSink(argv[0], argv[1]);
      }
    };
  }
}
8. Inside the com.cloudera.flume.core.extractors package, create a new Java class, RegexAllExtractor.java. This class is used to extract different attributes from Apache log lines; each attribute becomes an individual column in HBase.
package com.cloudera.flume.core.extractors;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Attributes;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.google.common.base.Preconditions;

public class RegexAllExtractor extends EventSinkDecorator<EventSink> {
  final Pattern pat;
  final String names;

  public RegexAllExtractor(EventSink snk, Pattern pat, String names) {
    super(snk);
    this.pat = pat;
    this.names = names;
  }

  public RegexAllExtractor(EventSink snk, String regex, String names) {
    this(snk, Pattern.compile(regex), names);
  }

  @Override
  public void append(Event e) throws IOException {
    String logLine = new String(e.getBody());
    Matcher m = pat.matcher(logLine);
    String[] n = this.names.split(",");
    // File extensions treated as page elements rather than page views.
    String[] htmlPageElement = { "js", "gif", "jpg", "png", "css" };
    int grpCnt = m.groupCount();
    if (m.find()) {
      // ip, date, status, query, size, http_protocol, method
      for (int grp = 1; grp <= grpCnt; grp++) {
        if (grp - 1 >= n.length) {
          break; // fewer names than capture groups
        }
        String val;
        try {
          val = m.group(grp);
        } catch (IndexOutOfBoundsException ioobe) {
          val = "";
        }
        if (n[grp - 1].isEmpty()) {
          continue;
        }
        if (!n[grp - 1].equals("query")) {
          Attributes.setString(e, n[grp - 1], val);
        } else {
          // Identify PageView and PageHit: requests for static assets
          // become "pageElement", everything else "pageView".
          boolean isElement = false;
          String[] pageHit = val.split("\\.");
          if (pageHit.length > 1) {
            for (String ext : htmlPageElement) {
              if (pageHit[pageHit.length - 1].equals(ext)) {
                Attributes.setString(e, "pageElement", val);
                isElement = true;
                break;
              }
            }
          }
          if (!isElement) {
            Attributes.setString(e, "pageView", val);
          }
        }
      }
      // The referrer and user agent sit between double quotes at the end of
      // the combined log format, so split the line on the quote character.
      String[] temp = logLine.split("\"");
      if (temp.length >= 6) {
        Attributes.setString(e, "userAgent", temp[5]);
        Attributes.setString(e, "referrer", temp[3]);
      }
    }
    super.append(e);
  }

  public static SinkDecoBuilder builder() {
    return new SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 2, "usage: regexAll(regex, names)");
        String regex = argv[0];
        Pattern pat = Pattern.compile(regex);
        String names = argv[1];
        return new RegexAllExtractor(null, pat, names);
      }
    };
  }
}
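The pageView/pageElement decision inside append() can be sketched in isolation. The class below is a hypothetical standalone illustration (the class and method names are mine, not part of the Flume jar), using the same extension list as the extractor: requests ending in js, gif, jpg, png or css are tagged pageElement, everything else pageView.

```java
public class PageHitClassifier {

    // Same static-asset extension list as RegexAllExtractor.
    static final String[] HTML_PAGE_ELEMENTS = { "js", "gif", "jpg", "png", "css" };

    // Returns "pageElement" for requests of static assets, "pageView" otherwise.
    public static String classify(String query) {
        String[] parts = query.split("\\.");
        if (parts.length > 1) {
            String ext = parts[parts.length - 1];
            for (String e : HTML_PAGE_ELEMENTS) {
                if (ext.equals(e)) {
                    return "pageElement";
                }
            }
        }
        return "pageView";
    }

    public static void main(String[] args) {
        System.out.println(classify("/listing/pay/32.html")); // pageView
        System.out.println(classify("/images/logo.png"));     // pageElement
        System.out.println(classify("/css/site.css"));        // pageElement
    }
}
```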
9. Add hbase-0.20.6.jar to the Java project.
10. Go to the com.cloudera.flume.conf package and make the following changes in the SinkFactoryImpl.java class.
a) Add the entry { "Hbase", HBaseEventSink.builder() } to the sinkList 2D array.
b) Add the entry { "regexAll", RegexAllExtractor.builder() } to the decoList 2D array.
c) Import the following classes.
import com.cloudera.flume.handlers.hbase.HBaseEventSink;
import com.cloudera.flume.core.extractors.RegexAllExtractor;
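Taken together, steps a), b) and c) amount to roughly the following fragment of SinkFactoryImpl.java (a sketch, not a complete class; the existing entries and surrounding code are elided, and the exact field declarations may differ in your Flume version):

```java
// Fragment of com.cloudera.flume.conf.SinkFactoryImpl (sketch only).
import com.cloudera.flume.handlers.hbase.HBaseEventSink;
import com.cloudera.flume.core.extractors.RegexAllExtractor;

// ... inside sinkList, alongside the existing sink entries:
{ "Hbase", HBaseEventSink.builder() },

// ... inside decoList, alongside the existing decorator entries:
{ "regexAll", RegexAllExtractor.builder() },
```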
11. Now build this project and create a jar.
12. Put this jar file in the Flume home directory.
Start HBase:-
impetus@ubuntu:~/hbase$ bin/start-hbase.sh
impetus@ubuntu:~/hbase$ bin/hbase shell
Now, we need to create a table “WebAnalytics”.
hbase(main):001:0>create 'WebAnalytics','colFamily1'
0 row(s) in 1.2020 seconds
hbase(main):002:0> list //displays all existing tables.
WebAnalytics
1 row(s) in 0.0830 seconds
hbase(main):003:0>
Create a new txt file and add following line:
127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /listing/pay/32.html HTTP/1.0" 200 2326 "http://www.example.com/start.html" "Mozilla/4.08 [en] (Win98; I ;Nav)"
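Before wiring everything up, it helps to see how the referrer and user agent are pulled out of this line. The sketch below is a hypothetical standalone class (the class and method names are mine); it mirrors the double-quote split used in RegexAllExtractor, where the referrer lands at index 3 and the user agent at index 5.

```java
public class QuoteSplitDemo {

    // The sample Apache log line from the txt file above.
    public static final String SAMPLE = "127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "
        + "\"GET /listing/pay/32.html HTTP/1.0\" 200 2326 "
        + "\"http://www.example.com/start.html\" \"Mozilla/4.08 [en] (Win98; I ;Nav)\"";

    // Splitting on the double-quote delimiter puts the referrer at index 3
    // and the user agent at index 5 in the combined log format.
    public static String referrer(String logLine) {
        String[] parts = logLine.split("\"");
        return parts.length >= 6 ? parts[3] : null;
    }

    public static String userAgent(String logLine) {
        String[] parts = logLine.split("\"");
        return parts.length >= 6 ? parts[5] : null;
    }

    public static void main(String[] args) {
        System.out.println("referrer  = " + referrer(SAMPLE));  // http://www.example.com/start.html
        System.out.println("userAgent = " + userAgent(SAMPLE)); // Mozilla/4.08 [en] (Win98; I ;Nav)
    }
}
```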
Start the Flume master:-
impetus@ubuntu:~$ flume master
Start the Flume agent and the Flume collector:-
impetus@ubuntu:~$ flume node
impetus@ubuntu:~$ flume node -n collector
After starting the nodes and the master, we need to configure the Flume nodes. You can configure them through the Flume master web interface at http://localhost:35871
Configuration of flume node:-
source:- tail(path/of/txtFile)
sink:- agentSink("localhost",35853)
Configuration of flume collector:-
source:-
collectorSource(35853)
sink:-
{ regexAll( "^([0-9.]+)\\s([0-9A-Za-z\\-]+)\\s([0-9A-Za-z\\-]+)\\s\\[([0-9a-zA-z\\/: -]+)\\]\\s\"([A-Z]+)\\s([0-9a-zA-z\\/?.=&_:-]+)\\s([A-Z0-9\\/.]+)\"\\s([0-9]{3})\\s([0-9]+)","IPAddr,hyphen,userid,ServerTime,method,query,http_protocol,status,size" ) => Hbase( "WebAnalytics", "colFamily1" ) }
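As a quick sanity check, the regexAll pattern above can be run against the sample log line to confirm which capture group maps to which attribute name. This is a hypothetical standalone class (not part of the Flume jar; the class and method names are mine), but the regex and the sample line are exactly those used in this POC.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {

    // The regex exactly as used in the collector sink configuration.
    public static final String REGEX =
        "^([0-9.]+)\\s([0-9A-Za-z\\-]+)\\s([0-9A-Za-z\\-]+)\\s\\[([0-9a-zA-z\\/: -]+)\\]"
        + "\\s\"([A-Z]+)\\s([0-9a-zA-z\\/?.=&_:-]+)\\s([A-Z0-9\\/.]+)\"\\s([0-9]{3})\\s([0-9]+)";

    // The sample Apache log line from the txt file above.
    public static final String SAMPLE = "127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "
        + "\"GET /listing/pay/32.html HTTP/1.0\" 200 2326 "
        + "\"http://www.example.com/start.html\" \"Mozilla/4.08 [en] (Win98; I ;Nav)\"";

    // Returns one capture group's value, or null if the line does not match.
    public static String group(String logLine, int idx) {
        Matcher m = Pattern.compile(REGEX).matcher(logLine);
        return m.find() ? m.group(idx) : null;
    }

    public static void main(String[] args) {
        // Group order follows the names list:
        // IPAddr,hyphen,userid,ServerTime,method,query,http_protocol,status,size
        System.out.println("IPAddr        = " + group(SAMPLE, 1)); // 127.0.0.1
        System.out.println("ServerTime    = " + group(SAMPLE, 4)); // 10/Oct/2000:13:55:36 -0700
        System.out.println("method        = " + group(SAMPLE, 5)); // GET
        System.out.println("query         = " + group(SAMPLE, 6)); // /listing/pay/32.html
        System.out.println("http_protocol = " + group(SAMPLE, 7)); // HTTP/1.0
        System.out.println("status        = " + group(SAMPLE, 8)); // 200
        System.out.println("size          = " + group(SAMPLE, 9)); // 2326
    }
}
```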
Now, wait for some time and run the command “scan 'WebAnalytics'” in the HBase shell.
hbase(main):003:0>scan 'WebAnalytics'
1294936604575[B@136a1a1 column=colFamily1:ServerTime, timestamp=1294936699637, value=10/Oct/2000:13:55:36 -0700
1294936604575[B@136a1a1 column=colFamily1:http_protocol, timestamp=1294936699637, value=HTTP/1.0
1294936604575[B@136a1a1 column=colFamily1:hyphen, timestamp=1294936699637, value=-
1294936604575[B@136a1a1 column=colFamily1:userid, timestamp=1294936699637, value=-
1294936604575[B@136a1a1 column=colFamily1:IPAddr, timestamp=1294936699637, value=127.0.0.1
1294936604575[B@136a1a1 column=colFamily1:method, timestamp=1294936699637, value=GET
1294936604575[B@136a1a1 column=colFamily1:pageView, timestamp=1294936699637, value=/listing/pay/32.html
1294936604575[B@136a1a1 column=colFamily1:size, timestamp=1294936699637, value=2326
1294936604575[B@136a1a1 column=colFamily1:status, timestamp=1294936699637, value=200
1294936604575[B@136a1a1 column=colFamily1:userAgent, timestamp=1294936699637, value= Mozilla/4.08 [en] (Win98; I ;Nav)
1294936604575[B@136a1a1 column=colFamily1:referrer, timestamp=1294936699637, value= http://www.example.com/start.html
1 row(s) in 0.0830 seconds
hbase(main):004:0>
