Friday, 31 July 2015

Word Count program in Trident

Word Count program in Trident



  • topology.java

package com.trident.wc;

import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Debug;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

public class topology {

public static StormTopology buildTopology() {

TridentTopology topology = new TridentTopology();

Wcspout wcspout = new Wcspout();

//Stream inputStream = topology.newStream("word", wcspout);

topology.newStream("word",wcspout)
.each(new Fields("sentance"), new Wcsplit(), new Fields("word"))
.groupBy(new Fields("word"))
.aggregate(new Fields("word"), new Count(),new Fields("count"))
.each(new Fields("word","count"), new Debug());

return topology.build();
}

public static void main(String[] args) throws Exception {
       Config conf = new Config();
       LocalCluster cluster = new LocalCluster();

//TopologyBuilder builder = new TopologyBuilder();

       System.out.println("Now submitting the Local Topology");
       cluster.submitTopology("wctrident", conf, buildTopology());
Thread.sleep(200000);
       cluster.shutdown();

//StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
   }
}

  • Wcspout.java
package com.trident.wc;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class Wcspout implements IRichSpout{

private SpoutOutputCollector collector;
private String[] sentences = {
       "I love big data",
       "I also love real time analytics",
       "i love hadoop",
       "i love kafka",
       "i love spark"
   };
private int index = 0;

@Override
public void ack(Object arg0) {
}

@Override
public void activate() {
}

@Override
public void close() {
}

@Override
public void deactivate() {
}

@Override
public void fail(Object arg0) {
}

@Override
public void nextTuple() {
String str = sentences[index];
index++;
if(index>=sentences.length)
{
index=0;
}
collector.emit(new Values(str));
}

@Override
public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
this.collector= collector;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentance"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}

}

  • Wcsplit.java
package com.trident.wc;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;

/**
 * Trident function that tokenizes the incoming "sentance" field on single
 * spaces and emits each token as its own one-field tuple.
 */
public class Wcsplit extends BaseFunction {

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String sentence = tuple.getStringByField("sentance");
        // Print each token for debugging, then emit it downstream.
        for (String token : sentence.split(" ")) {
            System.out.println(token);
            collector.emit(new Values(token));
        }
    }
}

Filtering the strings by file lookup in Storm



Take words.txt as the input file and storm_destination.txt as the lookup file. Construct a Storm topology such that, when it runs, every record present in words.txt but absent from storm_destination.txt is appended to a new file, storm_log.txt, together with the timestamp at which it arrived.
  • Topology.java


import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;


public class Topology {

    private static final String FILE_READING_SPOUT = "file-reading-spout";
    private static final String LOOKUP_FILTER_BOLT = "loopup-filter-bolt";
    private static final String TIME_ADD_BOLT = "time-add-bolt";
    private static final String OUTPUT_WRITER_BOLT = "output-writer-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {

    FileReadingSpout spout = new FileReadingSpout();
    lookupfilterBolt splitBolt = new lookupfilterBolt();
    //timeadditionbolt countBolt = new timeadditionbolt();
        outputwritebolt reportBolt = new outputwritebolt();


        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(FILE_READING_SPOUT, spout);
        builder.setBolt(LOOKUP_FILTER_BOLT, splitBolt)
                .shuffleGrouping(FILE_READING_SPOUT);
 
        builder.setBolt(OUTPUT_WRITER_BOLT, reportBolt)
                .shuffleGrouping(LOOKUP_FILTER_BOLT);

        Config config = new Config();

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

    }
}


  • FileReadingSpout.java


import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Spout that reads D://words.txt line by line and emits each line on the
 * "word" field. The whole file is drained on the first nextTuple() call;
 * subsequent calls find EOF and emit nothing.
 */
public class FileReadingSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private BufferedReader br;

    @Override
    public void nextTuple() {
        // Guard: open() may have failed to create the reader; without this
        // check the original code threw a NullPointerException here.
        if (br == null) {
            return;
        }
        try {
            String line;
            while ((line = br.readLine()) != null) {
                collector.emit(new Values(line));
            }
        } catch (IOException e) {
            System.out.println("exception in reading the file.....kill the topology");
            e.printStackTrace(); // was printed twice in the original
        }
    }

    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            br = new BufferedReader(new FileReader("D://words.txt"));
        } catch (FileNotFoundException e) {
            System.out.println("file is not available.....kill the topology");
            e.printStackTrace();
        }
    }

    /** Releases the file handle when the topology is torn down. */
    @Override
    public void close() {
        if (br != null) {
            try {
                br.close();
            } catch (IOException ignored) {
                // best-effort cleanup on shutdown
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}


  • lookupfilterBolt.java


import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Vector;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class lookupfilterBolt extends BaseRichBolt{
private OutputCollector collector;
private Vector<String> list;

@Override
public void execute(Tuple tuple) {
String input = tuple.getStringByField("word");
if(!list.contains(input)){
collector.emit(new Values(input));
}else{
System.out.println("skipped string is -->"+ input);
}
}

@Override
public void prepare(Map map, TopologyContext context, OutputCollector collector) {
String line;
this.collector= collector;
list= new Vector<String>();
try {
BufferedReader br  = new BufferedReader(new FileReader("D://storm_destination.txt"));
try {
while((line= br.readLine())!= null){
list.add(line);
}
} catch (IOException e) {
System.out.println("storm_destination --> reading exception..");
e.printStackTrace();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}


  • outputwritebolt.java


import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Date;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class outputwritebolt extends BaseRichBolt{
private OutputCollector collector;
private BufferedWriter wr;
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
String tab= "\t";
String line= "\n";
Date date= new Date();
String date_str= date.toString();

OutputStream o;
        try {
            o = new FileOutputStream("D://storm_log.txt", true);
            o.write(word.getBytes());
            o.write(tab.getBytes());
            o.write(date_str.getBytes());
            o.write(line.getBytes());
            o.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
//try {
//wr.write(word);
//wr.write(date);
// } catch (IOException e) {
// System.out.println("exception while wrinting into putput file....");
//e.printStackTrace();
// }


}

@Override
public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {
this.collector = collector;
try {
wr = new BufferedWriter(new FileWriter("D://storm_log.txt"));
} catch (IOException e) {
System.out.println("not able to open the file for write...");
e.printStackTrace();
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

}

Local File reading from Storm Spout


In addition to my earlier post, if we want to test the topology by reading a file from the local file system, we can use the spout code below. It uses the basic Java file I/O API.


package com.atul.wc;



import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;


public class SentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
 
    private int index = 0;
    private BufferedReader br;
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void open(Map config, TopologyContext context,
            SpoutOutputCollector collector) {
    try {
    br = new BufferedReader(new FileReader("D://test.txt"));

} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
               // Charset.defaultCharset().name());
        this.collector = collector;
    }

    public void nextTuple() {
    String line;
try {
while ((line = br.readLine()) != null){
this.collector.emit(new Values(line));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
    }
}

Apache Word Count Code

Apache Word Count Code:

This is a basic word-count Storm program. Copy the code below into separate Java files in Eclipse, add the Storm jars (downloadable from the Apache Storm website) to the build path, and run the main method.


Saturday, 25 April 2015

Hive Tables

Hive Tables 

Hive is an Apache project.
There are two types of tables:

1. Internal (managed) tables
2. External tables

Internal / Managed Table:-

Internal tables are typically used for testing or local code development.

Syntax:-

Creation of the table:-

Create table <table name>
(  <field name>   <data type>,
 <field name>   <data type>,
 <field name>   <data type>,
 <field name>   <data type>
)
row format delimited
fields terminated by '<separator char in file>';

Loading data:-

Load data inpath '<path of the file>' into table <tablename>;


Key points:
1. When the user runs the CREATE TABLE command, Hive internally creates a directory named after the table inside the database directory at '/user/hive/warehouse/<db name>'.

2. As the name suggests, managed tables are managed by Hive.

3. When the user runs the LOAD DATA command, the file is copied to '/user/hive/warehouse/<db name>/<file name>'.

4. So it works like cut-and-paste: the file is moved from its original location into the warehouse path.

5. On dropping the table, Hive deletes the directory from the warehouse location and the data is lost.


External table:-
The basic property of an external table is that, when it is dropped, neither the original file nor any copy of it in the warehouse location is deleted.

External tables can be created in two ways:
1. With a location
2. Without a location

With a location --> the table data is copied into the warehouse location --> [copy-paste]
Without a location --> the table data is not copied to the warehouse location