Friday, 31 July 2015

Word Count program in Trident



  • topology.java

package com.trident.wc;

import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Debug;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

public class topology {

public static StormTopology buildTopology() {

TridentTopology topology = new TridentTopology();

Wcspout wcspout = new Wcspout();

//Stream inputStream = topology.newStream("word", wcspout);

topology.newStream("word",wcspout)
.each(new Fields("sentance"), new Wcsplit(), new Fields("word"))
.groupBy(new Fields("word"))
.aggregate(new Fields("word"), new Count(),new Fields("count"))
.each(new Fields("word","count"), new Debug());

return topology.build();
}

public static void main(String[] args) throws Exception {
       Config conf = new Config();
       LocalCluster cluster = new LocalCluster();

//TopologyBuilder builder = new TopologyBuilder();

       System.out.println("Now submitting the Local Topology");
       cluster.submitTopology("wctrident", conf, buildTopology());
Thread.sleep(200000);
       cluster.shutdown();

//StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
   }
}

  • Wcspout.java
package com.trident.wc;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class Wcspout implements IRichSpout{

private SpoutOutputCollector collector;
private String[] sentences = {
       "I love big data",
       "I also love real time analytics",
       "i love hadoop",
       "i love kafka",
       "i love spark"
   };
private int index = 0;

@Override
public void ack(Object arg0) {
}

@Override
public void activate() {
}

@Override
public void close() {
}

@Override
public void deactivate() {
}

@Override
public void fail(Object arg0) {
}

@Override
public void nextTuple() {
String str = sentences[index];
index++;
if(index>=sentences.length)
{
index=0;
}
collector.emit(new Values(str));
}

@Override
public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
this.collector= collector;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentance"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}

}

  • Wcsplit.java
package com.trident.wc;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;

/**
 * Trident function that splits the {@code "sentance"} field of each tuple into words and
 * emits one single-value tuple per word.
 *
 * <p>Words are separated by runs of whitespace; leading and trailing whitespace is ignored,
 * so no empty-string words are ever emitted. A null or blank sentence emits nothing.
 * Case is preserved (no lowercasing).
 */
public class Wcsplit extends BaseFunction {

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String sentance = tuple.getStringByField("sentance");
        if (sentance == null) {
            return;
        }
        String trimmed = sentance.trim();
        if (trimmed.isEmpty()) {
            // "".split(...) would yield [""], i.e. a single empty "word" to be counted.
            return;
        }
        // After trim() there is no leading whitespace, so split never yields a leading
        // empty token; trailing empty tokens are dropped by String.split itself.
        for (String word : trimmed.split("\\s+")) {
            collector.emit(new Values(word));
        }
    }

}

No comments:

Post a Comment