Getting Started with Storm(2)

第二章
这一章,我们将会创建一个Storm项目及第一个Storm topology,需要我们事先安装好JRE1.6或者更高版本。建议从oracle的http://www.java.com/downloads/去下载
Operation Modes
在开始之前,我们需要理解Storm的Operation Modes,有两种模式,Local Mode和Remote ode
Local ode
在Local ode,Strom topology在本地机器的单一的JVM中运行。此模式主要用于开发、测试以及调试,主要是比较容易看到所有的topology组件一起工作的状态。在此模式下,我们可以通过调整参数来观察我们的topology在各种不同的Storm配置环境中的运行。为了在Local ode中运行topology,我们需要下载Storm开发依赖包,包含了开发和测试所有必备的条目。我们在创建第一个Storm项目时候会看到这一些有多简单。在Local ode运行topology和在Storm集群中运行是很相似的。但我们需要确保所有的组件都是线程安全的,因为当这些组件被部署到Remote ode的时候,他们可能会运行在不同的JVMs,甚至不同的物理机器上,这时候是没有直接的通信或者共享的内存的。此章所有的例子都运行在Local ode下。
Remote ode
在Remote ode,我们提交我们的topology到Storm集群,一般而言,Strom集群可以有许多运行在不同的机器上的进程组成。Remote ode不会显示调试信息,这也是为什么被认为是Production ode的原因。然而,在一台开发机器上创建一个Storm集群也是可能的,而且在正式部署到生产环境是这么做是一个非常好的做法,可以确保在生产环境部署的时候不会有问题。第六章会更多地说明Remote ode,在附录B会说明怎么安装一个集群。
Hello Word Storm
我们会创建一个简单的topolgy来计算单词的项目,我们把这个项目称为Storm topology的Hello World。不过这也是一个非常强大的topology因为事实上这个topology可以被无限制的扩展,稍做修改甚至我们可以做一个统计系统。比如我们可以修改一下用来挖据Twitter的主题的趋势。
为了创建这个Topology,我们会用一个Spout来负责读入单词,第一个Bolt负责标准化单词,第二个Bolt负责计算单词次数,如下图所示:
Getting <wbr>Started <wbr>with <wbr>Storm(2) 可以在https://github.com/storm-book/examples-ch02-getting_started/zipball/masteri下载源代码
如果你使用git(一个分布式的版本控制和源代码管理工具),可以在你想要下载源代码的目录中运行:git clone git@github.com:storm-book/examples-ch02-getting_started.git
检测java
配置环境的第一步是检查使用的java的版本。可以在命令行窗口中运行: java -version,可以得到类似下图的信息:Getting <wbr>Started <wbr>with <wbr>Storm(2)
Getting <wbr>Started <wbr>with <wbr>Storm(2)
Getting <wbr>Started <wbr>with <wbr>Storm(2)
创建项目
为了开始此项目,创建一个放应用的目录,将此项目的源代码放在此目录。然后我们需要下载Storm所有的依赖包:是一组jar包,我们需要将其加入此应用的classpath。有两种做法:
  • 下载所有的依赖包,解压缩然后加入classpath
  • 或者使用Apache aven。Maven是软件项目管理工具,可以用来管理项目开发周期中的几个方面,从依赖包到发布流程。在此书,我们将广泛使用Maven。请使用mvn命令来检查是否安装了Maven。可以到http://maven.apache.org/download.html去下载Maven。尽管使用Storm不需要你是一个Maven专家,不过了解Maven是非常有用的。
我们需要创建一个pom.xml(Project Object odel)文件来定义项目结构,会描述例如依赖关系,包,源代码等。我们将使用由nathanmarz创建的依赖和Maven库(https://github.com/nathanmarz/)。所有这些依赖包可以在https://github.com/nzthanmarz/storm/wiki/Maven找到。在Local ode运行Storm的话需要所有这些依赖包。
使用这些依赖包,我们可以写一个包含运行topology的最基本组件的pom.xml:
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
storm.book
Getting-Started
0.0.1-SNAPSHOT
org.apache.maven.plugins
maven-compiler-plugin
2.3.2
1.6
1.6
1.6
clojars.org
http://clojars.org/repo
storm
storm
0.6.0
最初的 几行定义了项目的名字和版本,然后我们增加了一个编译器插件,告诉Maven我们的源代码需要被java1.6去编译。记者我们定义了代码库(Maven可以支持对同一个项目的多个代码库)。clojars是所有Storm的依赖包的代码库。Maven会自动下载Storm在Local ode运行所需要的所有依赖包和子包。
(*增加对于Maven的POM的说明:
xmlns等主要是POM相关的命名空间以及xsd元素;
modelVersion指定了当前POM模型的版本;
下面是最重要的三个子元素:
groupId: 定义了项目属于哪个组;比如,如果你的公司叫mycom,有一个项目叫myapp,那么一般groupId就应该是com.mycom.myapp
artifactId:定义了当前Maven项目在组中的唯一ID,相当于子模块的定义
version:定义了当前项目的当前版本,有SNAPSHOT的,一般是不稳定的版本
最后,其实还可以加一个name元素,可以声明一个对用户更友好的项目名称)
这个应用将会有如图所示的结构,类似于普通的Maven Java项目:
our-application-folder/
├── pom.xml
└── src
└── main
└── java
| ├── spouts
| └── bolts
└── resources
在java下面的目录会包括所有我们的项目代码,我们需要处理的单词文件会放在resources目录下面
创建我们第一个Topology
为了创建第一个topology,我们需要创建所有为了运行单词计数的类。在当前阶段,有些样例可能不是这么清楚,不过不要紧,我们会在今后的几章中说明。
Spout
WordReader spout是实现了IRichSpout的类,在第四章会进一步的说明。WordReader负责读入文件,并将每一行提供给bolt。Spout通常emit定义好的字段清单。这样的结构允许我们对同一个spout流有不同类型的bolt来对应。
Example2-1包含了这个类的完整代码,我们将会分析每一段代码。
Example2-1 src/main/java/spouts/WordReader.java(本人用Eclipse新建了项目,和原书稍微有一点点区别,主要是package懒得改了)
这里插一下,在Eclipse中选择文件-》新建-》项目,然后选中Maven Project,选择quickstart,输入groupid和artifactidGetting <wbr>Started <wbr>with <wbr>Storm(2)
Getting <wbr>Started <wbr>with <wbr>Storm(2) 可以先生成一个空白的Maven项目,然后把
package storm.book.Getting_Started.spouts;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader implements IRichSpout {
private static final long serialVersionUID = 1L;
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false;
private TopologyContext context;
public boolean isDistributed() {return false;}
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
public void close() {}
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}
public void nextTuple() {
if(completed){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//Do nothing
}
return;
}
String str;
//Open the reader
BufferedReader reader = new BufferedReader(fileReader);
try{
//Read all lines
while((str = reader.readLine()) != null){
this.collector.emit(new Values(str),str);
}
}catch(Exception e){
throw new RuntimeException("Error reading tuple",e);
}finally{
completed = true;
}
}
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file["+conf.get("wordFile")+"]");
}
this.collector = collector;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
//下面三行是无效的,在0.9.0.1的版本中是有如下几个抽象方法必须被实现的。
}
第一,在任何一个Spout中第一个被调用的方法是public void open(Map conf, TopologyContext context, SpoutOutputCollector collector).接收的参数组包括TopologyContext,包含了我们所有的topology数据;conf对象,在topology定义中被创建;SpoutOutputCollector,允许我们emit数据,给到bolts。下面这段代码就是此方法的实装:
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.context = context; this.fileReader = new FileReader(conf.get("wordsFile").toString()); } catch (FileNotFoundException e) { throw new RuntimeException("Error reading file["+conf.get("wordFile")+"]"); } this.collector = collector; }
在这个方法中,我们也创建了一个reader,负责读入文件。
第二,我们需要实装pubilc void nextTuple(),我们将在此方法中emit数据给到bolts。在现在这个例子中,此方法主要是读入文件然后按行emit数据
public void nextTuple() { if(completed){ try { Thread.sleep(1000); } catch (InterruptedException e) { } return; } String str; BufferedReader reader = new BufferedReader(fileReader); try{ while((str = reader.readLine()) != null){ this.collector.emit(new Values(str),str); } }catch(Exception e){ throw new RuntimeException("Error reading tuple",e); }finally{ completed = true; } }
注意,Values方法构筑了一个ArrayList的实例化。
nextTulpe()被间隙性地从同一个循环中被调用,比如ack()和fail()方法。如果已经没有工作要处理了,它必须释放对于线程的控制,以确保其他方法有机会被调用。所以第一行总是去检查看看处理是否已经完成了。如果是的话,就sleep至少1毫秒以降低cpu的负载。如果所有的工作都被完成的话,就是说一个文件的每一行都已经被读入赋值并emit出去。
注意,Tuple是数据的被命名的list,可以是任何一种可以被序列化的java对象。Storm默认可以序列化包括string,byte array,arraylist,hashmap和hashset类型。
Bolts
我们现在已经有了一个spout,可以从一个文件中读取内容并把每一行emit一个tuple。接着我们需要创建2个bolt来处理这些tuple。这里,bolt会实装backtype.storm.topology.IRichBolt接口。
最重要的方法是void execute(Tuple input),只要收到一个tuple此方法就会被调用一次。然后对于每一个接收到的tuple,此bolt会emit数个tuple。
Bolt或者Spout可以根据需要emit多个tuple。当nextTuple或者execute方法被调用的时候,他们可以emit:0,1或者多个tuples。这一点,第五章还会详细阐述。
第一个bolt是WordNormalizer,负责把每一行内容正规化。它会把行切分成单词组,把所有单词小写化,并去掉首尾空格。
1.我们需要声明bolt的输出参数:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
这里我们声明这个bolt会emit一个叫做“word"的字段。
2.我们需要实现public void execute(Tuple input)方法,用来处理输入的tuples
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word = word.toLowerCase();
//Emit the word
collector.emit(new Values(word));
}
}
// Acknowledge the tuple
collector.ack(input);
}
第一行从tuple中读取数据,可以按照位置或者名字读取。然后我们来处理读入的数据,最后用collector对象来emit数据。当每一个tuple都被处理之后,collector对象的ack()方法被调用表明处理已经被成功完成了。如果tuple没有被成功处理,需要调用collector对象的fail()对象。
Example2-2包含了完整的代码:src/main/java/bolts/WordNormalizer.java
package storm.book.Getting_Started.bolts;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer implements IRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
public void cleanup() {}
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word = word.toLowerCase();
//Emit the word
List a = new ArrayList();
a.add(input);
collector.emit(a,new Values(word));
}
}
// Acknowledge the tuple
collector.ack(input);
}
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
//public Map getComponentConfiguration(){return new java.util.HashMap();}
}
在这个类中,我们看到了在一个execute调用中emit了多个tuples。如果此方法接收到了一个句子“This is the Storm book",我们的这个execute方法会emit5个新的tuple
然后我们来看一下下一个bolt,WordCounter,负责对单词计数。当topology完成时(也就是cleanup()方法被调用的时候),我们将显示每个单词的个数
这里是个简单的bolt的例子,没有emit任何东西,只是简单把数据加入到一个map,而事实上经常会把数据存储到数据库。
package storm.book.Getting_Started.bolts;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class WordCounter implements IRichBolt {
private static final long serialVersionUID = 8620547231572082101L;
Integer id;
String name;
ap counters;
private OutputCollector collector;
public void cleanup() {
System.out.println("-- Word Counter ["+name+"-"+id+"] --");
for(Map.Entry entry : counters.entrySet()){
System.out.println(entry.getKey()+": "+entry.getValue());
}
}
public void execute(Tuple input) {
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else{
Integer c = counters.get(str) + 1;
counters.put(str, c);
}
//Set the tuple as Acknowledge
collector.ack(input);
}
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap();
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
//public Map getComponentConfiguration(){return new java.util.HashMap();}
}
execute方法是用一个map对象来收集和计算单词的个数。当这个topology终止的时候,cleanup()方法被调用,并打印这个counter map(这只是一个例子,通常你需要在cleanup()调用的时候去关闭活动的连接,释放资源)
Main类
在main类中,你将会创建一个topology和一个LocalCluster的对象,从而可以在本地测试和debug这个topology。通过和Config对象的联系,LocalCluster允许你尝试不同的cluster配置。比如,如果一个全局或者类变量被偶然使用,你可以通过改变配置使用不同数量的worker节点测试的时候发现错误。(在第三章会详细介绍)
所有的topology节点都需要能够在没有任何进程间共享数据的情况下运行,因为在实际的cluster中,这些进程会运行在不同的机器上。
你将会使用TopologyBuilder来创建一个topology,然后告诉Storm如何来安置这个节点,怎么样来交换数据。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-
reader");
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-
normalizer");
Spout和blot使用shuffleGrouping联系在一起。这种类型的组合告诉Strom自由地从源节点发送信息到目标节点。
然后,创建一个包含了topology配置的Config对象,这个对象会在运行的时候和cluster的配置整合在一起,然后通过prepare方法发送给所有的节点。
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);
在开发环境中,设置wordsFile文件为会被spout读入的文件名,设置debug为true。当debug是true的时候,Storm会打印所有的节点之间交换数据的信息,以及其他对理解topology怎么运行的有用的信息。
正如前面解释的,你需要使用LocalCluster来运行这个cluster。在生产环境中,topology将会持续运行。不过在这个例子中,你将近运行这个topology几秒钟以便于看到结果。
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
使用createTopology和submitTopology来创建并运行topology,sleep2秒(topology在不同的线程中运行),然后通过停止cluster来终止topology。
Example2-3 src/main/java/TopologyMain.java
import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;
public class TopologyMain {
public static void main(String[] args) throws InterruptedException {
//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGrouping("word-normalizer", new Fields("word"));
//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf,
builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();
}
}
让我们来执行~
我们可以来运行第一个topology了!我们在src/main/resources/words.txt中在每一行创建一个单词,比如:
Storm
test
are
great
is
an
Storm
simple
application
but
very
powerful
really
Storm
is
great
然后到pom.xml所在的目录运行Maven的命令:
mvn exec:java -Dexec.mainClass="storm.book.Getting_Started.TopologyMain" -Dexec.args="src/main/resources/
words.txt"
然后你可以在log中找到如下的内容:
1902 [Thread-24] INFO backtype.storm.util - Async loop interrupted!
-- Word Counter [word-counter-3] --
really: 1
but: 1
great: 2
an: 1
storm: 3
1907 [storm.book.Getting_Started.TopologyMain.main()] INFO backtype.storm.daemon.task - Shut down task Getting-Started-Toplogie-1-1386945629:3
1907 [storm.book.Getting_Started.TopologyMain.main()] INFO backtype.storm.daemon.task - Shutting down task Getting-Started-Toplogie-1-1386945629:2
1907 [Thread-26] INFO backtype.storm.util - Async loop interrupted!
-- Word Counter [word-counter-2] --
application: 1
is: 2
are: 1
test: 1
simple: 1
powerful: 1
very: 1
注意我们代码中的
builder.setBolt("word-counter", new WordCounter(),2)
.shuffleGrouping("word-normalizer");
这是创建了2个WordCounter节点。这就是我们并行化WordCounter的秘诀。
不可思议吧,这么简单就可以创造出并行机制(在实际中,每个实例都会跑在独立的机器上)不过再仔细看一下,你就会发觉,is和great在每个WordCounter的实例中都被计算了一次。为什么呢?当你使用shuffleGrouping的时候,你是告诉Storm使用随机的方式发送每一个消息。在这个例子中,最理想的是总是发送同一个单词到同一个WordCounter。为了实现这个,你可以把shuffleGrouping改成fieldGrouping("word-normalizer",new Fields("word")).然后让我们再尝试一下吧。在后面几张你会看到更多的分组和信息流。
结论:
我们讨论了Strom的Local和Remote操作模式的区别,Storm的威力以及开发的易用性。你也了解了一些Strom的基础概念,在随后的章节中会深入说明。