Word Count program in Trident
- topology.java
package com.trident.wc;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Debug;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
public class topology {
public static StormTopology buildTopology() {
TridentTopology topology = new TridentTopology();
Wcspout wcspout = new Wcspout();
//Stream inputStream = topology.newStream("word", wcspout);
topology.newStream("word",wcspout)
.each(new Fields("sentance"), new Wcsplit(), new Fields("word"))
.groupBy(new Fields("word"))
.aggregate(new Fields("word"), new Count(),new Fields("count"))
.each(new Fields("word","count"), new Debug());
return topology.build();
}
public static void main(String[] args) throws Exception {
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
//TopologyBuilder builder = new TopologyBuilder();
System.out.println("Now submitting the Local Topology");
cluster.submitTopology("wctrident", conf, buildTopology());
Thread.sleep(200000);
cluster.shutdown();
//StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
}
- Wcspout.java
package com.trident.wc;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class Wcspout implements IRichSpout{
private SpoutOutputCollector collector;
private String[] sentences = {
"I love big data",
"I also love real time analytics",
"i love hadoop",
"i love kafka",
"i love spark"
};
private int index = 0;
@Override
public void ack(Object arg0) {
}
@Override
public void activate() {
}
@Override
public void close() {
}
@Override
public void deactivate() {
}
@Override
public void fail(Object arg0) {
}
@Override
public void nextTuple() {
String str = sentences[index];
index++;
if(index>=sentences.length)
{
index=0;
}
collector.emit(new Values(str));
}
@Override
public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
this.collector= collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentance"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
- Wcsplit.java
package com.trident.wc;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;
public class Wcsplit extends BaseFunction{
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentance = tuple.getStringByField("sentance");
String[] words = sentance.split(" ");
for(int i=0;i<words.length;i++){
System.out.println(words[i]);
collector.emit(new Values(words[i]));
}
}
}