《日子》 ApacheBeam 初试WordCount

对机器学习会有很大的帮助,模型简单,易用

Apache Beam 的两大特点

1、将数据的批处理(batch)和流处理(stream)编程范式进行了统一;

2、能够在任何的执行引擎上运行。

它不仅为模型设计、更为执行一系列数据导向的工作流提供了统一的模型。这些工作流包括数据处理、吸收和整合。

新建maven项目

Paste_Image.png

pom.xml加入依赖

<code>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>0.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>0.4.0</version>
</dependency>
</dependencies>
</code>

测试类WordCount.java

<code>
package org.tom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import java.io.Serializable;
public class WordCount implements Serializable{
private transient Pipeline pipeline = null;
public WordCount() {
PipelineOptions options = PipelineOptionsFactory.create();
options.setJobName("wordcount");
pipeline = Pipeline.create(options);
}
public void transform() {
PCollection<String> collection = pipeline.apply(TextIO.Read.from("file:///d:/tom/beam-test/src/main/resources/word.txt"));
PCollection<String> extractWords = collection.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
String[] split = c.element().split(" ");
for (String word : split) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}));
PCollection<KV<String, Long>> pCollection = extractWords.apply(Count.<String>perElement());
PCollection<String> formatResults = pCollection.apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}));
formatResults.apply(TextIO.Write.to("D:\tom\beam-test\src\main\resources\wordcounts"));
}
public void run(){
pipeline.run().waitUntilFinish();
}
public static void main(String[] args) {
WordCount wordCount = new WordCount();
wordCount.transform();
wordCount.run();
}
}
</code>

统计文本\resources\word.txt

<code>
tom
hello
tom
luo
hello
tom
tom
word
word
word
tom
</code>

运行结果

Paste_Image.png

word: 3
luo: 1
tom: 5
hello: 2
结果生成了两个文件,是由于hash分区了

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,982评论 19 139
  • 一:问题现象联机运行,在控制台打印的信息是: 解决办法 是不是so easy 幕后原因IOS10 之后苹果对推送提...
    乐意先生阅读 1,234评论 0 0
  • ——写给许嵩新歌《今年勇》 图片发自简书App(也写给。我的2016。)暮年时分我已老得 记不起你的...
    易长安_阅读 2,532评论 0 1
  • 风雨潇潇,林间慢跑开启了这一天。 出门的时候只是飘些小雨点,我想,这些毛毛雨顶多润润头发,林间枝叶厚密,估计都淋不...
    半山桃源阅读 2,900评论 2 1

友情链接更多精彩内容