Take words.txt as the input file and storm_destination.txt as the lookup file. Construct a Storm
topology such that, when the topology runs, every record present in words.txt but not in storm_destination.txt
is appended to a new file, storm_log.txt, as a
new record together with a timestamp of when it arrived in that file.
- Topology.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
public class Topology {
    // Component ids used to wire the topology together.
    private static final String FILE_READING_SPOUT = "file-reading-spout";
    private static final String LOOKUP_FILTER_BOLT = "lookup-filter-bolt"; // fixed typo: was "loopup-filter-bolt"
    private static final String OUTPUT_WRITER_BOLT = "output-writer-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    /**
     * Wires spout -> lookup-filter bolt -> output-writer bolt and runs the
     * topology on a local cluster. Timestamping of records is done inside the
     * writer bolt, so no separate time-addition bolt is needed.
     */
    public static void main(String[] args) throws Exception {
        FileReadingSpout fileSpout = new FileReadingSpout();
        lookupfilterBolt filterBolt = new lookupfilterBolt();
        outputwritebolt writerBolt = new outputwritebolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(FILE_READING_SPOUT, fileSpout);
        builder.setBolt(LOOKUP_FILTER_BOLT, filterBolt)
                .shuffleGrouping(FILE_READING_SPOUT);
        builder.setBolt(OUTPUT_WRITER_BOLT, writerBolt)
                .shuffleGrouping(LOOKUP_FILTER_BOLT);

        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

        // Give the topology time to drain the input file, then shut the local
        // cluster down cleanly instead of leaving it running forever.
        Utils.sleep(10000);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}
- FileReadingSpout.java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
 * Spout that reads D://words.txt line by line and emits each line as a
 * single-field tuple ("word"). Emits one line per nextTuple() call, as the
 * Storm spout contract expects (nextTuple must not block or loop forever),
 * and closes the reader once the file is exhausted.
 */
public class FileReadingSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // Null when the file could not be opened or has been fully consumed.
    private BufferedReader br;

    @Override
    public void nextTuple() {
        if (br == null) {
            // File missing (open failed) or already fully read: nothing to emit.
            return;
        }
        try {
            String next = br.readLine();
            if (next != null) {
                collector.emit(new Values(next));
            } else {
                // End of file: release the reader exactly once.
                br.close();
                br = null;
            }
        } catch (IOException e) {
            System.out.println("exception in reading the file.....kill the topology");
            e.printStackTrace();
        }
    }

    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            br = new BufferedReader(new FileReader("D://words.txt"));
        } catch (FileNotFoundException e) {
            System.out.println("file is not available.....kill the topology");
            e.printStackTrace();
            // br stays null; nextTuple() guards against it.
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
- lookupfilterBolt.java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Vector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Bolt that filters incoming "word" tuples against the lookup file
 * D://storm_destination.txt: words already present in the lookup file are
 * dropped, everything else is passed downstream unchanged.
 */
public class lookupfilterBolt extends BaseRichBolt {
    private OutputCollector collector;
    // HashSet gives O(1) membership tests (the original Vector.contains was a
    // synchronized linear scan). Fully qualified: not in this file's imports.
    private java.util.Set<String> lookup;

    @Override
    public void execute(Tuple tuple) {
        String input = tuple.getStringByField("word");
        if (!lookup.contains(input)) {
            collector.emit(new Values(input));
        } else {
            System.out.println("skipped string is -->" + input);
        }
        // Acknowledge so the tuple is not replayed/timed out by the acker.
        collector.ack(tuple);
    }

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        lookup = new java.util.HashSet<String>();
        // try-with-resources closes the reader even on error (the original
        // leaked it). If the file is missing, the set stays empty and every
        // word passes through.
        try (BufferedReader br = new BufferedReader(new FileReader("D://storm_destination.txt"))) {
            String line;
            while ((line = br.readLine()) != null) {
                lookup.add(line);
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            System.out.println("storm_destination --> reading exception..");
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
- outputwritebolt.java
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Date;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
 * Terminal bolt: appends each incoming "word" to D://storm_log.txt together
 * with the time it arrived, one tab-separated record per line.
 *
 * The original opened a truncating FileWriter in prepare() (wiping any
 * previous log) and then ignored it, reopening a raw FileOutputStream per
 * tuple; this version opens one append-mode writer, reuses it, flushes per
 * record, and closes it in cleanup().
 */
public class outputwritebolt extends BaseRichBolt {
    private OutputCollector collector;
    // Null when the log file could not be opened for append.
    private BufferedWriter wr;

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        if (wr != null) {
            try {
                // record format: <word>\t<arrival time>\n
                wr.write(word);
                wr.write("\t");
                wr.write(new Date().toString());
                wr.write("\n");
                // Flush so the record is visible even if the topology is killed.
                wr.flush();
            } catch (IOException e) {
                System.out.println("exception while writing into output file....");
                e.printStackTrace();
            }
        }
        // Acknowledge so the tuple is not replayed/timed out by the acker.
        collector.ack(tuple);
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            // true = append: keep records from previous runs instead of truncating.
            wr = new BufferedWriter(new FileWriter("D://storm_log.txt", true));
        } catch (IOException e) {
            System.out.println("not able to open the file for write...");
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup() {
        // Release the writer when the topology shuts down.
        if (wr != null) {
            try {
                wr.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: emits nothing downstream.
    }
}
No comments:
Post a Comment