国内最全IT社区平台 联系我们 | 收藏本站
华晨云阿里云优惠2
您当前位置:首页 > 互联网 > 【Samza系列】实时计算Samza中文教程(四)―API概述

【Samza系列】实时计算Samza中文教程(四)―API概述

来源:程序员人生   发布时间:2014-11-20 09:03:22 阅读次数:3531次
    上1篇和大家1起宏观上学习了Samza平台的架构,重点讲了1下数据缓冲层和资源管理层,剩下的1块很重要的SamzaAPI层本节作为重点为大家展开介绍。
    当你使用Samza来实现1个数据流处理逻辑时,你必须实现1个叫StreamTask的接口,以下所示:
public class MyTaskClass implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { // process message } }
    当你运行你的job时,Samza将为你的class创建1些实例(可能在多台机器上)。这些任务实例会处理输入流里的消息。

    在你的job的配置中你能告知Samza你想消费哪条数据流。举1个较为完全的例子(大家也能够参看http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration.html
):
# This is the class above, which Samza will instantiate when the job is run task.class=com.example.samza.MyTaskClass # Define a system called "kafka" (you can give it any name, and you can define # multiple systems if you want to process messages from different sources) systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory # The job consumes a topic called "PageViewEvent" from the "kafka" system task.inputs=kafka.PageViewEvent # Define a serializer/deserializer called "json" which parses JSON messages serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory # Use the "json" serializer for messages in the "PageViewEvent" topic systems.kafka.streams.PageViewEvent.samza.msg.serde=json
    对Samza从任务的输入流利接收的每条消息,处理逻辑都会被调用。它主要包括3个重要的信息:消息、关键词key和消息来自的数据流:
/** Every message that is delivered to a StreamTask is wrapped * in an IncomingMessageEnvelope, which contains metadata about * the origin of the message. */ public class IncomingMessageEnvelope { /** A deserialized message. */ Object getMessage() { ... } /** A deserialized key. */ Object getKey() { ... } /** The stream and partition that this message came from. */ SystemStreamPartition getSystemStreamPartition() { ... } }
    注意键和值都要被声明为对象,并且需要转化为正确的类型。如果你不配置1个serializer/deserializer,它们就会成为典型的java字节数组。1个deserializer能够转化这些字节到其他任意类型,举个例子来讲j1个son deserializer能够将字节数组转化为Map、List和字符串对象。
    SystemStreamPartition()这个方法会返回1个SystemStreamPartition对象,它会告知你消息是从哪里来的。它由以下3部份组成:
    1. The system:系统的名字来源于消息,就在你job的配置里定义。你可以有多个用于输入和输出的不同名字的系统;
    2. The stream name: 在原系统里数据流(话题、队列)的名字。一样也是在job的配置里定义;
    3. The partition: 1条数据流通常会被划分到多个分区,并且每个分区会被Samza安排1个StreamTask实例;
    API看起来像是这样的:
/** A triple of system name, stream name and partition. */ public class SystemStreamPartition extends SystemStream { /** The name of the system which provides this stream. It is defined in the Samza job's configuration. */ public String getSystem() { ... } /** The name of the stream/topic/queue within the system. */ public String getStream() { ... } /** The partition within the stream. */ public Partition getPartition() { ... } }
    在上面这个job的配置例子里可以看到,这个系统名字叫“Kafka”,数据流的名字叫“PageViewEvent”。(kafka这个名字不是特定的――你能给你的系统取任何你想要的名字)。如果你有1些输入流向导入你的StreamTask,你能够使用SystemStreamPartition去决定你接遭到哪1类消息。

    如何发送消息呢?如果你看1下StreamTask里的process()方法,你将看到你有1个MessageCollector接口。
/** When a task wishes to send a message, it uses this interface. */ public interface MessageCollector { void send(OutgoingMessageEnvelope envelope); }
    为了发送1个消息, 你会创建1个OutgoingMessageEnvelop对象并且把它传递给消息搜集器。它最少会肯定你想要发送的消息、系统和数据流名字再发送出去。你也能够肯定分区的key和另外一些参数。具体可以参考javadoc(http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html)。

    注意事项:
    请只在process()方法里使用MessageCollector对象。如果你保持住1个MessageCollector实例并且以后再次使用它,你的消息可能会毛病地发送出去。举1个例子,这儿有1个简单的任务,它把每一个输入的消息拆成单词,并且发送每个单词作为1个消息:
public class SplitStringIntoWords implements StreamTask { // Send outgoing messages to a stream called "words" // in the "kafka" system. private final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "words"); public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { String message = (String) envelope.getMessage(); for (String word : message.split(" ")) { // Use the word as the key, and 1 as the value. // A second task can add the 1's to get the word count. collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1)); } } }
    Samza的API的概要介绍就到这里吧,很多细节的API可以参看javadoc文档,这也是官网下1节的内容,由于篇幅有限,大家可以自己针对性的去深入了解了解就能够了。下1篇会讲1下之前在架构篇里屡次提到的SamzaContainer。


    

生活不易,码农辛苦
如果您觉得本网站对您的学习有所帮助,可以手机扫描二维码进行捐赠
程序员人生
------分隔线----------------------------
分享到:
------分隔线----------------------------
关闭
程序员人生