Getting Started with Storm(3)
第三章 Topologies
这一章,你将会看到在一个Storm
Topology中的不同组件之间如何传递Tuples,以及怎么把一个Topology部署到一个正在运行的Storm集群中。
Stream
Grouping
在设计topology的时候有件很重要的事情,就是需要定义数据在组件之间如何交换(换句话说,就是bolt怎么消费stream),Stream
Grouping定义了每个bolt消费哪些stream以及怎么被消费。
一个节点可以emit一个或者数个数据流,流组合允许我们选择接受哪个流。
正如我们的第二章看到的,在定义topology的时候,我们设置了流组合:
...
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
...
在上面这段代码中,bolt在topology 构造器中被设置,然后使用shuffle Stream
Grouping设置一个数据源。通常一个流组合使用源组件的ID作为参数,然后根据流组合的类型可选其他参数。
每一个InputDeclarer可以有多个数据源,每一个数据源可以和一个不同的流组合组合在一起。
Shuffle
Grouping
ShuffleGrouping是一个最常用的grouping。它接收一个参数(源数据组件ID),然后发送每一个被source
emit出来的tuple(元组)到一个随机被选择的bolt,并且保证每个bolt收到同样数量的tuple
ShuffleGrouping在做自动操作,比如数学运算的时候是非常有用的。然而如果一个操作不能被随机分配,比如在第二章中的例子你需要计算单词个数,你就需要使用其他grouping。
Fields
Grouping
Fields
Grouping允许你基于tuple的一个或者多个字段,控制tuple传递给bolt的方式。它可以保证一个被指定的字段组合的值总是可以被发送到同一个bolt。回到单词计数的例子,如果你通过word字段来分组,word-normalizer
bolt会将指定单词的tuple发送到同一个word-counter bolt的实例。
...
builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGrouping("word-normalizer", new Fields("word"));
...
需要注意的是,所有在fields grouping中被指定的字段都必须在源字段中声明过。
All
Grouping
All
Grouping发送每个元组的单一拷贝到所有接收bolt。这种类型的grouping主要用于给所有的bolt发送信号。比如,如果你需要刷新缓存,你可以发送一个“刷新缓存信号”给所有的bolt。在单词计数的例子中,你可以使用All
Grouping去清除counter的缓存。
public void
execute(Tuple input) {
String str =
null;
try{
if(input.getSourceStreamId().equals("signals")){
str =
input.getStringByField("action");
if("refreshCache".equals(str))
counters.clear();
}
}catch
(IllegalArgumentException e) {
//Do
nothing
}
...
}
我们增加一个If
功能来检查源数据流。Storm允许我们对流数据命名(如果你没有命名的话,默认为“default”数据流)。在鉴别元组的来源时给了我们极大的便利,比如上面这段代码,我们定义了signals。
在Topology定义中,你可以增加第二个流给到word-counter
bolt,从signals-spout流中发送一个元组给到bolt的每一个实例。
builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGrouping("word-normalizer", new Fields("word"))
.allGrouping("signals-spout","signals");
这个实现可以在git库中找到。
Custom
Grouping自定义分组
通过实现backtype.storm.grouping.CustomStreamGrouping接口,你也可以创建你自己的自定义流分组。这给了你权利来决定哪些bolt可以收到元组。
让我们对word
count的例子做些修改,我们对元组分类,确保以同一个字母开头的单词都被同一个bolt接收。
public class
oduleGrouping implements CustomStreamGrouping, Serializable{
int numTasks =
0;
@Override
public List
chooseTasks(List
List boltIds =
new ArrayList();
if(values.size()>0){
String str =
values.get(0).toString();
if(str.isEmpty())
boltIds.add(0);
else
boltIds.add(str.charAt(0) % numTasks);
}
return
boltIds;
}
@Override
public void
prepare(TopologyContext context, Fields outFields,
List
targetTasks) {
numTasks =
targetTasks.size();
}
}
你看到了一个简单的CustomStreamGrouping的实现,我们用每个单词的第一个字符的整数值对任务总量求余,然后依此选择哪个bolt会接收这个元组。
为了使用这个分组,我们改变一下work-normalizer的实现:
builder.setBolt("word-normalizer", new WordNormalizer())
.customGrouping("word-reader", new ModuleGrouping());
Direct
Grouping
这个是一个特殊的分组,由源来决定哪个组件接收元组。回到上面的例子,基于单词的第一个字符,由源来决定哪一个bolt会接收到元组。为了使用direct
grouping,在WordNormalizer bolt中,用emitDirect方法代替emit
public void
execute(Tuple input) {
...
for(String
word : words){
if(!word.isEmpty()){
...
collector.emitDirect(getWordCountIndex(word),new
Values(word));
}
}
// Acknowledge
the tuple
collector.ack(input);
}
public Integer
getWordCountIndex(String word) {
word =
word.trim().toUpperCase();
if(word.isEmpty())
return
0;
else
return
word.charAt(0) % numCounterTasks;
}
在Prepare方法中计算出目标的任务数量:
public void
prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector
= collector;
this.numCounterTasks =
context.getComponentTasks("word-counter");
}
在topology的定义出,指定流的分组方式:
builder.setBolt("word-counter", new WordCounter(),2)
.directGrouping("word-normalizer");
Global
Grouping
Global
Grouping会发送所有源实例产生的元组到一个单一的目标实例(通常是到一个有最小ID的任务)
None
Grouping
在Storm0.7.1版本中,None Grouping和Shuffle
Grouping是一样的。换句话说,使用此分组就意味着不去关心Stream怎么被分组。
LocalCluster对StormSubmitter
到目前为止,我们都是使用一个叫做LocalCluster的工具在本地运行topology。在本地运行便于我们调试不同的topoloties。但是当我们把topology提交到一个运行中的Storm会怎么样呢?Storm的一个极其有趣的特性就是非常方便就可以把我们的topology提交给一个正在运行的真实的集群。我们需要在实现submitTopology时,把LocalCluster改变为StormSubmitter,它将负责把topology提交给集群。
//LocalCluster
cluster = new LocalCluster();
//cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache",
conf, builder.createTopology());
StormSubmitter.submitTopology("Count-Word-Topology-With-Refresh-Cache",
conf, builder.createTopology());
//Thread.sleep(1000);
//cluster.shutdown();
当我们使用SotrmSubmitter的时候,我们将不能像使用LocalCluster一样在代码中控制集群。
下一步,我们需要打包一个Jar文件,然后通过StormClient命令提交这个topology。我们在例子中使用的是Maven,所以打包很简单,在代码目录中运行:mvn
package
当我们有了这个Jar包,我们使用storm jar命令提交这个topology(可以参考附录A来安装Storm client)语法是:
storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3.
在这个例子中 storm
jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain
src/main/ resources/words.txt这样我们就提交了这个topology到集群了。
为了停止或者终止这个topology,使用命令:storm kill
Count-Word-Topology-With-Refresh-Cache
注意:Topology的名称必须唯一
DRPC
Topologies
有一种特殊的topology,我们称之为 分布式远程过程调用(Distributed Remote Procedure Call
- DRPC),liyong
Storm强大的分布式处理能力来执行远程过程调用。Strorm提供了一些工具使我们可以使用DRPC。
第一个是DRPC服务器,作为客户端和Storm topology的连接器运行,同时作为topology
的spout运行。它接收到要执行的函数及其参数。然后对每一个要处理的数据,服务器都会安排一个requestID来鉴别这个RPC请求。当topology执行完最后一个bolt,它会emit这个RPC
requestID和结果,确保DRPC服务器返回结果给到正确的客户端。注意:一个DRPC服务器可以执行多个功能,每个功能都需要有一个唯一的名字。
第二个是LinearDRPCTopologyBuilder,一个抽象类帮助实现DRPC
topologies。这个topology会创建DRPCSpouts(连接到DRPC服务器并emit数据到其他的topology),会封装bolt以确保结果可以从最后一个bolt返回。所有被增加到LinearDRPCTopologyBuilder的bolt都会按照顺序执行。作为这种类型的topology的例子,我们来创建一个流程来增加数字。非常简单的例子,但是可以由此扩展到实现复杂的分布式数学运算。
bolt有如下的输出宣言:
public void
declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","result"));
}
尽管这是此topology中唯一的一个bolt,它仍然需要emit RPC ID和结果。
execute方法负责执行增加操作:
public void
execute(Tuple input) {
String[]
numbers = input.getString(1).split("\+");
Integer added
= 0;
if(numbers.length<2){
throw new
InvalidParameterException("Should be at least 2 numbers");
}
for(String num
: numbers){
added +=
Integer.parseInt(num);
}
collector.emit(new Values(input.getValue(0),added));
}
需要在topology的定义中包含额外的bolt
public static
void main(String[] args) {
LocalDRPC drpc
= new LocalDRPC();
LinearDRPCTopologyBuilder builder = new
LinearDRPCTopologyBuilder("add");
builder.addBolt(new AdderBolt(),2);
Config conf =
new Config();
conf.setDebug(true);
LocalCluster
cluster = new LocalCluster();
cluster.submitTopology("drpc-adder-topology", conf,
builder.createLocalTopology(drpc));
String result
= drpc.execute("add", "1+-1");
checkResult(result,0);
result =
drpc.execute("add", "1+1+5+10");
checkResult(result,17);
cluster.shutdown();
drpc.shutdown();
}
首先创建一个LocalDRPC对象在本地运行DRPC服务器;然后创建一个topology
builder,并增加bolt到此topology。然后用execute方法来测试这个topology。
为了连接到一个远程的DRPC服务器,需要使用DRPCClient类。DRPC服务器给出了“Thrift
API”(http://thrift.apache.org/)可以被多种语言使用,而且无论本地还是远程都是一样的API。上面的例子是本地的,如果需要提交topology到Storm集群,使用createRemoteTopology来代替上面的createLocalTopology,此方法会从Storm
Config中读到DRPC的配置并据此运行。