Monday, December 13, 2010

Insert Apache log lines into HBase via Flume


Before going through the following integration process, you should be familiar with installing HBase and Flume, as well as with their basic concepts. In this POC, I have used Flume 0.9.1 and HBase 0.20.6. This post describes Flume-HBase integration and how to extract different attributes from an Apache log file (for example IP, user agent, referrer, etc.).

Fig: Collecting log data into HBase via Flume.


As the diagram above shows, to store Apache log lines in HBase we need to build a new jar, following the procedure described below. This jar contains two new classes, HBaseEventSink.java and RegexAllExtractor.java.

1. Download the Flume source code from https://github.com/cloudera/flume/downloads, or you can find it inside the flume folder.

2. Create a new Java project in Eclipse.

3. In the Flume source code you will find a java folder, which contains com and org folders. Add both the org and com folders to your Java project.

4. In the Flume source code you will find an ahocorasick folder, which contains an org folder. Add this org folder to your Java project. Now delete the TestAhoCorasick.java, TestAll.java, TestQueue.java, and TestState.java classes from the org.arabidopsis.ahocorasick package.

5. In the Flume source code, go to the gen-java/com/cloudera/flume/conf folder. This conf folder contains a thrift folder; add it to the com.cloudera.flume.conf package of your Java project.

6. All required jars can be found in Flume's lib folder.

7. Now create a new package, com.cloudera.flume.handlers.hbase. Inside it, create a new Java class, HBaseEventSink.



package com.cloudera.flume.handlers.hbase;
import java.io.IOException;
import java.util.Map.Entry;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.google.common.base.Preconditions;
public class HBaseEventSink extends EventSink.Base {
  String tableName;
  String familyName;
  HBaseConfiguration config;
  HTable table;

  public HBaseEventSink(String tableName, String familyName) {
    this(tableName, familyName, new HBaseConfiguration());
  }

  public HBaseEventSink(String tableName, String familyName, HBaseConfiguration config) {
    Preconditions.checkNotNull(tableName);
    Preconditions.checkNotNull(familyName);
    this.tableName = tableName;
    this.familyName = familyName;
    this.config = config;
  }

  @Override
  public void append(Event e) throws IOException {
    // Use the event timestamp as the row key.
    String row = String.valueOf(e.getTimestamp());
    byte[] rowKey = Bytes.toBytes(row);
    Put p = new Put(rowKey);
    for (Entry<String, byte[]> a : e.getAttrs().entrySet()) {
      // Skip Flume's internal ack attributes; every other attribute
      // becomes a column in the configured column family.
      if (a.getKey().equals("AckType") || a.getKey().equals("AckTag")
          || a.getKey().equals("AckChecksum")) {
        continue;
      }
      p.add(Bytes.toBytes(familyName), Bytes.toBytes(a.getKey()), a.getValue());
    }
    if (!p.isEmpty()) {
      table.put(p);
    }
  }

  @Override
  public void close() throws IOException {
    table.close();
  }

  @Override
  public void open() throws IOException {
    // This instantiates an HTable object that connects you to
    // the tableName table.
    table = new HTable(config, tableName);
  }

  public static SinkBuilder builder() {
    return new SinkBuilder() {
      @Override
      public EventSink build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 2,
            "usage: hbase(table, family)");
        return new HBaseEventSink(argv[0], argv[1]);
      }
    };
  }
}
8. Inside the com.cloudera.flume.core.extractors package, create a new Java class, RegexAllExtractor.java. This class is used to extract different attributes from Apache log lines. Each attribute becomes an individual column in HBase.

package com.cloudera.flume.core.extractors;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Attributes;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.google.common.base.Preconditions;

public class RegexAllExtractor extends EventSinkDecorator<EventSink> {
  final Pattern pat;
  final String names;

  public RegexAllExtractor(EventSink snk, Pattern pat, String names) {
    super(snk);
    this.pat = pat;
    this.names = names;
  }

  public RegexAllExtractor(EventSink snk, String regex, String names) {
    this(snk, Pattern.compile(regex), names);
  }

  @Override
  public void append(Event e) throws IOException {
    String logLine = new String(e.getBody());
    Matcher m = pat.matcher(logLine);
    String[] n = this.names.split(",");
    int count = 0;
    String val;
    int grpCnt = m.groupCount();

    if (m.find()) {
      // ip,date,status,query,size,http_protocol,method
      for (int grp = 1; grp <= grpCnt; grp++) {
        try {
          val = m.group(grp);
        } catch (IndexOutOfBoundsException ioobe) {
          val = "";
        }
        if (grp - 1 >= n.length || n[grp - 1].isEmpty()) {
          continue;
        }
        if (!n[grp - 1].equals("query")) {
          Attributes.setString(e, n[grp - 1], val);
        } else {
          // Identify page views vs. page hits: a request for a static
          // asset (js, gif, jpg, png, css) is a pageElement, anything
          // else is a pageView.
          String[] htmlPageElement = { "js", "gif", "jpg", "png", "css" };
          String[] pageHit = val.split("\\.");
          if (pageHit.length > 1) {
            for (int i = 0; i < htmlPageElement.length; i++) {
              if (pageHit[pageHit.length - 1].equals(htmlPageElement[i])) {
                Attributes.setString(e, "pageElement", val);
                count++;
                break;
              }
            }
          }
          if (count == 0) {
            Attributes.setString(e, "pageView", val);
          }
          count = 0;
        }
      }
      // The user agent and referrer are the quoted fields of the log line.
      String[] temp = logLine.split("\"");
      if (temp.length >= 6) {
        Attributes.setString(e, "referrer", temp[3]);
        Attributes.setString(e, "userAgent", temp[5]);
      }
    }
    super.append(e);
  }

  public static SinkDecoBuilder builder() {
    return new SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 2,
            "usage: regexAll(regex, names)");
        String regex = argv[0];
        Pattern pat = Pattern.compile(regex);
        String names = argv[1];
        // The decorated sink is filled in later by the decorator chain.
        return new RegexAllExtractor(null, pat, names);
      }
    };
  }
}
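The pageView/pageElement classification inside RegexAllExtractor's append() can be tried out on its own. The following sketch (class and method names are just for illustration) applies the same rule: a request whose file extension is a static asset counts as a page element, anything else as a page view.

```java
public class PageHitCheck {
    // Same static-asset extensions as in RegexAllExtractor.
    static final String[] HTML_PAGE_ELEMENTS = {"js", "gif", "jpg", "png", "css"};

    static String classify(String query) {
        // Take the text after the last dot as the file extension.
        String[] parts = query.split("\\.");
        if (parts.length > 1) {
            String ext = parts[parts.length - 1];
            for (String e : HTML_PAGE_ELEMENTS) {
                if (ext.equals(e)) {
                    return "pageElement";
                }
            }
        }
        return "pageView";
    }

    public static void main(String[] args) {
        System.out.println("/listing/pay/32.html -> " + classify("/listing/pay/32.html"));
        System.out.println("/img/logo.png -> " + classify("/img/logo.png"));
    }
}
```

Requests without any extension, such as /listing/pay/, also fall through to pageView, which matches the behaviour of the extractor above.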
9. Add hbase-0.20.6.jar to the Java project.

10. Go to the com.cloudera.flume.conf package and make the following changes to the SinkFactoryImpl.java class.
a)      Add the element { "Hbase", HBaseEventSink.builder() } to the sinkList 2D array.
b)      Add the element { "regexAll", RegexAllExtractor.builder() } to the decoList 2D array.
c)      Import the following classes:
import com.cloudera.flume.handlers.hbase.HBaseEventSink;
import com.cloudera.flume.core.extractors.RegexAllExtractor;
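After these changes, the relevant parts of SinkFactoryImpl.java should look roughly like the sketch below. The existing entries (elided here) differ between Flume versions, so treat this as a guide rather than a drop-in replacement.

```java
// Sketch only: the existing sink and decorator entries are elided.
import com.cloudera.flume.handlers.hbase.HBaseEventSink;
import com.cloudera.flume.core.extractors.RegexAllExtractor;

public class SinkFactoryImpl extends SinkFactory {
  static Object[][] sinkList = {
    // ... existing sink entries ...
    { "Hbase", HBaseEventSink.builder() },
  };
  static Object[][] decoList = {
    // ... existing decorator entries ...
    { "regexAll", RegexAllExtractor.builder() },
  };
  // ...
}
```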

11. Now build the project and create the jar.

12. Put this jar file in the Flume home directory (Flume_Home_Dir).

Start HBase:
impetus@ubuntu:~/hbase$ bin/start-hbase.sh
impetus@ubuntu:~/hbase$ bin/hbase shell

Now we need to create the table 'WebAnalytics' with one column family:
hbase(main):001:0>create 'WebAnalytics','colFamily1'
0 row(s) in 1.2020 seconds
hbase(main):002:0> list    //displays all existing tables
WebAnalytics                                                                                                 
1 row(s) in 0.0830 seconds
hbase(main):003:0>

Create a new txt file and add the following line:
127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /listing/pay/32.html HTTP/1.0" 200 2326 "http://www.example.com/start.html" "Mozilla/4.08 [en] (Win98; I ;Nav)"

Start the Flume master:
impetus@ubuntu:~$ flume master

Start the Flume agent and Flume collector:
impetus@ubuntu:~$ flume node
impetus@ubuntu:~$ flume node -n collector

After starting the nodes and the master, we need to configure the Flume nodes. You can do this through the Flume master's web interface at http://localhost:35871

Configuration of the Flume node:
source:     tail("path/of/txtFile")
sink:       agentSink("localhost",35853)

Configuration of the Flume collector:
source:
collectorSource(35853)
sink:
{ regexAll( "^([0-9.]+)\\s([0-9A-Za-z\\-]+)\\s([0-9A-Za-z\\-]+)\\s\\[([0-9a-zA-z\\/: -]+)\\]\\s\"([A-Z]+)\\s([0-9a-zA-z\\/?.=&_:-]+)\\s([A-Z0-9\\/.]+)\"\\s([0-9]{3})\\s([0-9]+)","IPAddr,hyphen,userid,ServerTime,method,query,http_protocol,status,size" ) => Hbase( "WebAnalytics", "colFamily1" ) }
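Before wiring the collector up, the regex and field names can be sanity-checked outside Flume against the sample log line. The class name below is just for illustration:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexAllCheck {
    public static void main(String[] args) {
        // Same pattern as in the collector's regexAll() configuration.
        String regex = "^([0-9.]+)\\s([0-9A-Za-z\\-]+)\\s([0-9A-Za-z\\-]+)"
            + "\\s\\[([0-9a-zA-z\\/: -]+)\\]\\s\"([A-Z]+)"
            + "\\s([0-9a-zA-z\\/?.=&_:-]+)\\s([A-Z0-9\\/.]+)\""
            + "\\s([0-9]{3})\\s([0-9]+)";
        String logLine = "127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "
            + "\"GET /listing/pay/32.html HTTP/1.0\" 200 2326 "
            + "\"http://www.example.com/start.html\" "
            + "\"Mozilla/4.08 [en] (Win98; I ;Nav)\"";

        Matcher m = Pattern.compile(regex).matcher(logLine);
        if (m.find()) {
            // Group order matches the names argument of regexAll().
            System.out.println("IPAddr = " + m.group(1));
            System.out.println("query = " + m.group(6));
            System.out.println("status = " + m.group(8));
        }
        // The quoted fields hold the referrer and user agent,
        // exactly as RegexAllExtractor splits them out.
        String[] temp = logLine.split("\"");
        System.out.println("referrer = " + temp[3]);
        System.out.println("userAgent = " + temp[5]);
    }
}
```

Note that the query group cannot match whitespace, so the request path and protocol must be separated by exactly one space for the pattern to fire.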

Now wait a few moments and run the command “scan 'WebAnalytics'” in the HBase shell.

hbase(main):003:0>scan 'WebAnalytics'
1294936604575[B@136a1a1     column=colFamily1:ServerTime, timestamp=1294936699637, value=10/Oct/2000:13:55:36 -0700
1294936604575[B@136a1a1     column=colFamily1:http_protocol, timestamp=1294936699637, value=HTTP/1.0        
1294936604575[B@136a1a1     column=colFamily1:hyphen, timestamp=1294936699637, value=-                       
1294936604575[B@136a1a1     column=colFamily1:userid, timestamp=1294936699637, value=-                        
1294936604575[B@136a1a1     column=colFamily1:IPAddr, timestamp=1294936699637, value=127.0.0.1                 
1294936604575[B@136a1a1     column=colFamily1:method, timestamp=1294936699637, value=GET                     
1294936604575[B@136a1a1     column=colFamily1:pageView, timestamp=1294936699637, value=/listing/pay/32.html 
1294936604575[B@136a1a1     column=colFamily1:size, timestamp=1294936699637, value=2326                       
1294936604575[B@136a1a1     column=colFamily1:status, timestamp=1294936699637, value=200                     
1294936604575[B@136a1a1     column=colFamily1:userAgent, timestamp=1294936699637, value= Mozilla/4.08 [en] (Win98; I ;Nav)
1294936604575[B@136a1a1     column=colFamily1:referrer, timestamp=1294936699637, value= http://www.example.com/start.html
1 row(s) in 0.0830 seconds
hbase(main):004:0>


3 comments:

  1. Hi, I tried the steps. Everything seems fine, but when i execute the scan command on HBase, i get blank table.
    Here is the output:

    hbase(main):002:0> scan 'WebAnalytics'
    ROW COLUMN+CELL
    0 row(s) in 0.1250 seconds

    Can you tell me if I am missing on something?
    P.S. I am using hbase-0.92.1 with flume-0.9.3.

  2. Same Error - - - I get blank table.
