Getting Started with Storm(2)
第二章
这一章,我们将会创建一个Storm项目及第一个Storm
topology,需要我们事先安装好JRE1.6或者更高版本。建议从oracle的http://www.java.com/downloads/去下载
Operation Modes
在开始之前,我们需要理解Storm的Operation Modes,有两种模式,Local Mode和Remote
ode
Local
ode
在Local
ode,Strom
topology在本地机器的单一的JVM中运行。此模式主要用于开发、测试以及调试,主要是比较容易看到所有的topology组件一起工作的状态。在此模式下,我们可以通过调整参数来观察我们的topology在各种不同的Storm配置环境中的运行。为了在Local
ode中运行topology,我们需要下载Storm开发依赖包,包含了开发和测试所有必备的条目。我们在创建第一个Storm项目时候会看到这一些有多简单。在Local
ode运行topology和在Storm集群中运行是很相似的。但我们需要确保所有的组件都是线程安全的,因为当这些组件被部署到Remote
ode的时候,他们可能会运行在不同的JVMs,甚至不同的物理机器上,这时候是没有直接的通信或者共享的内存的。此章所有的例子都运行在Local
ode下。
Remote
ode
在Remote
ode,我们提交我们的topology到Storm集群,一般而言,Strom集群可以有许多运行在不同的机器上的进程组成。Remote
ode不会显示调试信息,这也是为什么被认为是Production
ode的原因。然而,在一台开发机器上创建一个Storm集群也是可能的,而且在正式部署到生产环境是这么做是一个非常好的做法,可以确保在生产环境部署的时候不会有问题。第六章会更多地说明Remote
ode,在附录B会说明怎么安装一个集群。
Hello
Word Storm
我们会创建一个简单的topolgy来计算单词的项目,我们把这个项目称为Storm topology的Hello
World。不过这也是一个非常强大的topology因为事实上这个topology可以被无限制的扩展,稍做修改甚至我们可以做一个统计系统。比如我们可以修改一下用来挖据Twitter的主题的趋势。
为了创建这个Topology,我们会用一个Spout来负责读入单词,第一个Bolt负责标准化单词,第二个Bolt负责计算单词次数,如下图所示:
可以在https://github.com/storm-book/examples-ch02-getting_started/zipball/masteri下载源代码
可以在https://github.com/storm-book/examples-ch02-getting_started/zipball/masteri下载源代码
如果你使用git(一个分布式的版本控制和源代码管理工具),可以在你想要下载源代码的目录中运行:git clone
git@github.com:storm-book/examples-ch02-getting_started.git
检测java
配置环境的第一步是检查使用的java的版本。可以在命令行窗口中运行: java
-version,可以得到类似下图的信息:
创建项目
为了开始此项目,创建一个放应用的目录,将此项目的源代码放在此目录。然后我们需要下载Storm所有的依赖包:是一组jar包,我们需要将其加入此应用的classpath。有两种做法:
- 下载所有的依赖包,解压缩然后加入classpath
- 或者使用Apache aven。Maven是软件项目管理工具,可以用来管理项目开发周期中的几个方面,从依赖包到发布流程。在此书,我们将广泛使用Maven。请使用mvn命令来检查是否安装了Maven。可以到http://maven.apache.org/download.html去下载Maven。尽管使用Storm不需要你是一个Maven专家,不过了解Maven是非常有用的。
使用这些依赖包,我们可以写一个包含运行topology的最基本组件的pom.xml:
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
storm.book
Getting-Started
0.0.1-SNAPSHOT
org.apache.maven.plugins
maven-compiler-plugin
2.3.2
1.6
1.6
1.6
clojars.org
http://clojars.org/repo
storm
storm
0.6.0
最初的
几行定义了项目的名字和版本,然后我们增加了一个编译器插件,告诉Maven我们的源代码需要被java1.6去编译。记者我们定义了代码库(Maven可以支持对同一个项目的多个代码库)。clojars是所有Storm的依赖包的代码库。Maven会自动下载Storm在Local
ode运行所需要的所有依赖包和子包。
(*增加对于Maven的POM的说明:
xmlns等主要是POM相关的命名空间以及xsd元素;
modelVersion指定了当前POM模型的版本;
下面是最重要的三个子元素:
groupId:
定义了项目属于哪个组;比如,如果你的公司叫mycom,有一个项目叫myapp,那么一般groupId就应该是com.mycom.myapp
artifactId:定义了当前Maven项目在组中的唯一ID,相当于子模块的定义
version:定义了当前项目的当前版本,有SNAPSHOT的,一般是不稳定的版本
最后,其实还可以加一个name元素,可以声明一个对用户更友好的项目名称)
这个应用将会有如图所示的结构,类似于普通的Maven Java项目:
our-application-folder/
├── pom.xml
└── src
└── main
└── java
| ├── spouts
| └── bolts
└── resources
在java下面的目录会包括所有我们的项目代码,我们需要处理的单词文件会放在resources目录下面
创建我们第一个Topology
为了创建第一个topology,我们需要创建所有为了运行单词计数的类。在当前阶段,有些样例可能不是这么清楚,不过不要紧,我们会在今后的几章中说明。
Spout
WordReader
spout是实现了IRichSpout的类,在第四章会进一步的说明。WordReader负责读入文件,并将每一行提供给bolt。Spout通常emit定义好的字段清单。这样的结构允许我们对同一个spout流有不同类型的bolt来对应。
Example2-1包含了这个类的完整代码,我们将会分析每一段代码。
Example2-1
src/main/java/spouts/WordReader.java(本人用Eclipse新建了项目,和原书稍微有一点点区别,主要是package懒得改了)
这里插一下,在Eclipse中选择文件-》新建-》项目,然后选中Maven
Project,选择quickstart,输入groupid和artifactid
可以先生成一个空白的Maven项目,然后把
可以先生成一个空白的Maven项目,然后把
package
storm.book.Getting_Started.spouts;
import
java.io.BufferedReader;
import
java.io.FileNotFoundException;
import
java.io.FileReader;
import
java.util.Map;
import
backtype.storm.spout.SpoutOutputCollector;
import
backtype.storm.task.TopologyContext;
import
backtype.storm.topology.IRichSpout;
import
backtype.storm.topology.OutputFieldsDeclarer;
import
backtype.storm.tuple.Fields;
import
backtype.storm.tuple.Values;
public class
WordReader implements IRichSpout {
private static final long serialVersionUID =
1L;
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false;
private TopologyContext context;
public boolean isDistributed() {return
false;}
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
public void close() {}
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}
public void nextTuple() {
if(completed){
try {
Thread.sleep(1000);
} catch (InterruptedException
e) {
//Do nothing
}
return;
}
String str;
//Open the reader
BufferedReader reader = new
BufferedReader(fileReader);
try{
//Read all lines
while((str =
reader.readLine()) != null){
this.collector.emit(new
Values(str),str);
}
}catch(Exception e){
throw new
RuntimeException("Error reading tuple",e);
}finally{
completed = true;
}
}
public void open(Map conf, TopologyContext
context,
SpoutOutputCollector
collector) {
try {
this.context = context;
this.fileReader = new
FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new
RuntimeException("Error reading
file["+conf.get("wordFile")+"]");
}
this.collector = collector;
}
public void
declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
//下面三行是无效的,在0.9.0.1的版本中是有如下几个抽象方法必须被实现的。
}
第一,在任何一个Spout中第一个被调用的方法是public void open(Map conf, TopologyContext
context, SpoutOutputCollector
collector).接收的参数组包括TopologyContext,包含了我们所有的topology数据;conf对象,在topology定义中被创建;SpoutOutputCollector,允许我们emit数据,给到bolts。下面这段代码就是此方法的实装:
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.context =
context;
this.fileReader = new
FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new
RuntimeException("Error reading
file["+conf.get("wordFile")+"]");
}
this.collector = collector;
}
在这个方法中,我们也创建了一个reader,负责读入文件。
第二,我们需要实装pubilc void
nextTuple(),我们将在此方法中emit数据给到bolts。在现在这个例子中,此方法主要是读入文件然后按行emit数据
public void nextTuple() {
if(completed){
try {
Thread.sleep(1000);
} catch
(InterruptedException e) {
}
return;
}
String str;
BufferedReader reader = new
BufferedReader(fileReader);
try{
while((str =
reader.readLine()) != null){
this.collector.emit(new Values(str),str);
}
}catch(Exception e){
throw new
RuntimeException("Error reading tuple",e);
}finally{
completed = true;
}
}
注意,Values方法构筑了一个ArrayList的实例化。
nextTulpe()被间隙性地从同一个循环中被调用,比如ack()和fail()方法。如果已经没有工作要处理了,它必须释放对于线程的控制,以确保其他方法有机会被调用。所以第一行总是去检查看看处理是否已经完成了。如果是的话,就sleep至少1毫秒以降低cpu的负载。如果所有的工作都被完成的话,就是说一个文件的每一行都已经被读入赋值并emit出去。
注意,Tuple是数据的被命名的list,可以是任何一种可以被序列化的java对象。Storm默认可以序列化包括string,byte
array,arraylist,hashmap和hashset类型。
Bolts
我们现在已经有了一个spout,可以从一个文件中读取内容并把每一行emit一个tuple。接着我们需要创建2个bolt来处理这些tuple。这里,bolt会实装backtype.storm.topology.IRichBolt接口。
最重要的方法是void
execute(Tuple
input),只要收到一个tuple此方法就会被调用一次。然后对于每一个接收到的tuple,此bolt会emit数个tuple。
Bolt或者Spout可以根据需要emit多个tuple。当nextTuple或者execute方法被调用的时候,他们可以emit:0,1或者多个tuples。这一点,第五章还会详细阐述。
第一个bolt是WordNormalizer,负责把每一行内容正规化。它会把行切分成单词组,把所有单词小写化,并去掉首尾空格。
1.我们需要声明bolt的输出参数:
public void
declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
这里我们声明这个bolt会emit一个叫做“word"的字段。
2.我们需要实现public
void execute(Tuple input)方法,用来处理输入的tuples
public void
execute(Tuple input) {
String
sentence = input.getString(0);
String[] words
= sentence.split(" ");
for(String
word : words){
word =
word.trim();
if(!word.isEmpty()){
word =
word.toLowerCase();
//Emit the
word
collector.emit(new Values(word));
}
}
// Acknowledge
the tuple
collector.ack(input);
}
第一行从tuple中读取数据,可以按照位置或者名字读取。然后我们来处理读入的数据,最后用collector对象来emit数据。当每一个tuple都被处理之后,collector对象的ack()方法被调用表明处理已经被成功完成了。如果tuple没有被成功处理,需要调用collector对象的fail()对象。
Example2-2包含了完整的代码:src/main/java/bolts/WordNormalizer.java
package
storm.book.Getting_Started.bolts;
import
java.util.ArrayList;
import
java.util.List;
import
java.util.Map;
import
backtype.storm.task.OutputCollector;
import
backtype.storm.task.TopologyContext;
import
backtype.storm.topology.IRichBolt;
import
backtype.storm.topology.OutputFieldsDeclarer;
import
backtype.storm.tuple.Fields;
import
backtype.storm.tuple.Tuple;
import
backtype.storm.tuple.Values;
public class
WordNormalizer implements IRichBolt {
private static final long serialVersionUID =
1L;
private OutputCollector collector;
public void cleanup() {}
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word =
word.toLowerCase();
//Emit the word
List a = new
ArrayList();
a.add(input);
collector.emit(a,new
Values(word));
}
}
// Acknowledge the tuple
collector.ack(input);
}
public void prepare(Map stormConf,
TopologyContext context,
OutputCollector collector)
{
this.collector = collector;
}
public void
declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
//public Map getComponentConfiguration(){return
new java.util.HashMap();}
}