Flink and Java 8
Published: 2019-04-28


Source: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/java8.html

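Java 8 introduced several new features that make code faster to write and easier to read. The most important of them is lambda expressions, which opened the door to functional programming in Java. Lambda expressions let you implement and pass around anonymous functions.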

For example, in the Scala expression words.map{x=>(x,1)}, the part x=>(x,1) is an anonymous function that maps a single value to a 2-tuple.
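For comparison, here is a sketch of the same anonymous function written as a Java 8 lambda. It is plain Java with no Flink dependency. It uses Map.Entry as a stand-in for a 2-tuple, which is an assumption made only for illustration because the JDK has no tuple type. The class name PairLambda is hypothetical. The anonymous-class form that a lambda replaces is included for contrast.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.function.Function;

public class PairLambda {
    // Lambda form: the Java counterpart of the Scala function x => (x, 1).
    // Map.Entry stands in for a 2-tuple (assumption: the JDK has no tuple type).
    public static final Function<String, Map.Entry<String, Integer>> TO_PAIR =
            x -> new AbstractMap.SimpleEntry<>(x, 1);

    // The same function as an anonymous class, the form a lambda replaces.
    public static final Function<String, Map.Entry<String, Integer>> TO_PAIR_ANON =
            new Function<String, Map.Entry<String, Integer>>() {
                @Override
                public Map.Entry<String, Integer> apply(String x) {
                    return new AbstractMap.SimpleEntry<>(x, 1);
                }
            };

    public static void main(String[] args) {
        System.out.println(TO_PAIR.apply("flink"));      // prints flink=1
        System.out.println(TO_PAIR_ANON.apply("flink")); // prints flink=1
    }
}
```

Both fields behave identically. The lambda simply drops the boilerplate of declaring the class and the apply method.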

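Recent Flink versions support writing programs with Java lambda expressions. This document explains how to program Flink with lambda expressions and describes their current limitations. For a general introduction, see: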

Examples

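The following program uses a lambda expression. map squares each element, and the results are printed. The function passed to map is an anonymous function. Its parameter type does not need to be declared because the Java 8 compiler infers it.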

env.fromElements(1, 2, 3)
// returns the squared i
.map(i -> i*i)
.print();
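The same kind of inference also happens outside Flink. This plain-Java sketch uses java.util.stream instead of Flink's DataSet, and the class name is hypothetical. The type of i is never written: javac infers Integer from the element type of the stream. The explicitly typed form is included for comparison.

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SquareInference {
    // The type of i is never written: javac infers Integer from Stream<Integer>.map.
    public static List<Integer> squares(Integer... values) {
        return Stream.of(values)
                .map(i -> i * i)
                .collect(Collectors.toList());
    }

    // The same lambda with its parameter type declared explicitly.
    public static final UnaryOperator<Integer> SQUARE_EXPLICIT = (Integer i) -> i * i;

    public static void main(String[] args) {
        System.out.println(squares(1, 2, 3));         // prints [1, 4, 9]
        System.out.println(SQUARE_EXPLICIT.apply(4)); // prints 16
    }
}
```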

In the next example, the type of the Collector cannot be inferred, so the lambda's parameter types must be declared:

DataSet<Integer> input = env.fromElements(1, 2, 3);

// collector type must be declared
input.flatMap((Integer number, Collector<String> out) -> {
    StringBuilder builder = new StringBuilder();
    for(int i = 0; i < number; i++) {
        builder.append("a");
        out.collect(builder.toString());
    }
})
// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
.print();
In the following variant, the types are inferred from the type of the target DataSet, so they are not declared:
DataSet<Integer> input = env.fromElements(1, 2, 3);

// collector type must not be declared, it is inferred from the type of the dataset
DataSet<String> manyALetters = input.flatMap((number, out) -> {
    StringBuilder builder = new StringBuilder();
    for(int i = 0; i < number; i++) {
        builder.append("a");
        out.collect(builder.toString());
    }
});
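The difference between these two flatMap variants comes from ordinary Java type inference. The sketch below is plain Java. Sink and FlatMapper are hypothetical stand-ins loosely modeled on Flink's Collector and FlatMapFunction. They are not the Flink API. The helper flatMap simply gathers every emitted value into a list.

In explicitTypes, the declared parameter type Sink<String> is what fixes the output type. In inferredTypes, the output type comes from the assignment target List<String>. With neither, javac would infer Object for the output type, and that would leave a framework like Flink without a usable output type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlatMapInference {
    // Hypothetical stand-in for Flink's Collector (not the Flink API).
    public interface Sink<O> {
        void collect(O record);
    }

    // Hypothetical stand-in for Flink's FlatMapFunction<T, O> (not the Flink API).
    public interface FlatMapper<T, O> {
        void flatMap(T value, Sink<O> out);
    }

    // Runs f on every input element and gathers everything it emits into a list.
    public static <T, O> List<O> flatMap(List<T> input, FlatMapper<T, O> f) {
        List<O> result = new ArrayList<>();
        for (T value : input) {
            f.flatMap(value, result::add);
        }
        return result;
    }

    // Explicit parameter types: the return type List<?> says nothing about O,
    // so the declared Sink<String> is what fixes O = String.
    public static List<?> explicitTypes(List<Integer> input) {
        return flatMap(input, (Integer number, Sink<String> out) -> {
            StringBuilder builder = new StringBuilder();
            for (int i = 0; i < number; i++) {
                builder.append("a");
                out.collect(builder.toString());
            }
        });
    }

    // Implicit parameter types: O is inferred from the assignment target List<String>.
    public static List<String> inferredTypes(List<Integer> input) {
        List<String> manyALetters = flatMap(input, (number, out) -> {
            StringBuilder builder = new StringBuilder();
            for (int i = 0; i < number; i++) {
                builder.append("a");
                out.collect(builder.toString());
            }
        });
        return manyALetters;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3);
        System.out.println(explicitTypes(input)); // prints [a, a, aa, a, aa, aaa]
        System.out.println(inferredTypes(input)); // prints [a, a, aa, a, aa, aaa]
    }
}
```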

The following code shows a WordCount program written with lambda expressions:

DataSet<String> input = env.fromElements("Please count", "the words", "but not this");

// filter out strings that contain "not"
input.filter(line -> !line.contains("not"))
// split each line by space
.map(line -> line.split(" "))
// emit a pair <word,1> for each array element
.flatMap((String[] wordArray, Collector<Tuple2<String, Integer>> out)
    -> Arrays.stream(wordArray).forEach(t -> out.collect(new Tuple2<>(t, 1)))
    )
// group and sum up
.groupBy(0).sum(1)
// print
.print();
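To show what this WordCount program computes, here is the same pipeline written with plain Java 8 streams instead of Flink's DataSet API. It is an illustrative sketch, and the class name is hypothetical. It collects into a TreeMap so the result is sorted. The Flink program, by contrast, does not guarantee any output order.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LambdaWordCount {
    public static Map<String, Integer> wordCount(String... lines) {
        return Stream.of(lines)
                // filter out strings that contain "not"
                .filter(line -> !line.contains("not"))
                // split each line by space
                .map(line -> line.split(" "))
                // flatten the word arrays into one stream of words
                .flatMap(Arrays::stream)
                // group by word and add 1 per occurrence; TreeMap keeps keys sorted
                .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.summingInt(w -> 1)));
    }

    public static void main(String[] args) {
        // prints {Please=1, count=1, the=1, words=1}
        System.out.println(wordCount("Please count", "the words", "but not this"));
    }
}
```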

The rest of this post covers the limitations of using Java 8 lambdas with Flink. See the official documentation for details.

Compiler Limitations

Currently, Flink only supports jobs containing Lambda Expressions completely if they are compiled with the Eclipse JDT compiler contained in Eclipse Luna 4.4.2 (and above).

Only the Eclipse JDT compiler preserves the generic type information necessary to use the entire Lambda Expressions feature type-safely. Other compilers such as the OpenJDK’s and Oracle JDK’s javac throw away all generic parameters related to Lambda Expressions. This means that types such as Tuple2<String,Integer> or Collector<String> declared as a Lambda function input or output parameter will be pruned to Tuple2 or Collector in the compiled .class files, which is too little information for the Flink Compiler.
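The following plain-Java sketch (no Flink dependency; the class and method names are hypothetical) shows one side of why lambdas are harder for a framework to analyze than anonymous classes. Reflection on an anonymous class recovers the type arguments of the interface it implements. A lambda's runtime class exposes only the raw interface. This illustrates the general erasure problem only. It does not reproduce how Flink's type extraction works, and it does not show the compiler-specific behavior described above. It only shows that the lambda object itself carries no type arguments, so a framework has to rely on whatever else the compiler kept.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class LambdaErasureDemo {
    // Anonymous class: javac writes a generic signature into the class file,
    // so reflection can still see Function<Integer, String>.
    public static Function<Integer, String> anonymousFunction() {
        return new Function<Integer, String>() {
            @Override
            public String apply(Integer i) {
                return "n" + i;
            }
        };
    }

    // Lambda with the same behavior: its class is generated at run time by the
    // lambda bootstrap (LambdaMetafactory) and records only the raw interface Function.
    public static Function<Integer, String> lambdaFunction() {
        return i -> "n" + i;
    }

    // True if the first interface implemented by fn's runtime class carries type arguments.
    public static boolean exposesTypeArguments(Object fn) {
        Type iface = fn.getClass().getGenericInterfaces()[0];
        return iface instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        System.out.println(exposesTypeArguments(anonymousFunction())); // prints true
        System.out.println(exposesTypeArguments(lambdaFunction()));    // prints false
    }
}
```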

How to compile a Flink job that contains Lambda Expressions with the JDT compiler will be covered in the next section.

However, it is possible to implement functions such as map() or filter() with Lambda Expressions in Java 8 compilers other than the Eclipse JDT compiler, as long as the function has no Collectors or Iterables and only if the function handles unparameterized types such as Integer, Long, String, MyOwnClass (types without Generics!).

Compiling with the Eclipse JDT Compiler

If you are using the Eclipse IDE, you can run and debug your Flink code within the IDE without any problems after some configuration steps. The Eclipse IDE by default compiles its Java sources with the Eclipse JDT compiler. The next section describes how to configure the Eclipse IDE.

If you are using a different IDE such as IntelliJ IDEA, or you want to package your JAR file with Maven to run your job on a cluster, you need to modify your project’s pom.xml file and build your program with Maven. The quickstart contains preconfigured Maven projects which can be used for new projects or as a reference. Uncomment the mentioned lines in your generated quickstart pom.xml file if you want to use Java 8 with Lambda Expressions.

Alternatively, you can manually insert the following lines into your Maven pom.xml file. Maven will then use the Eclipse JDT compiler for compilation.

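<plugin>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <compilerId>jdt</compilerId>
    </configuration>
    <dependencies>
        <dependency>
            <groupId>org.eclipse.tycho</groupId>
            <artifactId>tycho-compiler-jdt</artifactId>
            <version>0.21.0</version>
        </dependency>
    </dependencies>
</plugin>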

If you are using Eclipse for development, the m2e plugin might complain about the inserted lines above and mark your pom.xml as invalid. If so, insert the following lines into your pom.xml.

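<!-- belongs inside <pluginExecutions> of the org.eclipse.m2e lifecycle-mapping plugin configuration -->
<pluginExecution>
    <pluginExecutionFilter>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <versionRange>[3.1,)</versionRange>
        <goals>
            <goal>testCompile</goal>
            <goal>compile</goal>
        </goals>
    </pluginExecutionFilter>
    <action>
        <ignore/>
    </action>
</pluginExecution>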

Configuring the Eclipse IDE

First of all, make sure you are running a current version of the Eclipse IDE (4.4.2 or later). Also make sure that you have a Java 8 Runtime Environment (JRE) installed in the Eclipse IDE (Window -> Preferences -> Java -> Installed JREs).

Create/Import your Eclipse project.

If you are using Maven, you also need to change the Java version in your pom.xml for the maven-compiler-plugin. Otherwise, right-click the JRE System Library section of your project and open the Properties window to switch to a Java 8 (or later) JRE that supports Lambda Expressions.

The Eclipse JDT compiler needs a special compiler flag in order to store type information in .class files. Open the JDT configuration file at {project directory}/.settings/org.eclipse.jdt.core.prefs with your favorite text editor and add the following line:

org.eclipse.jdt.core.compiler.codegen.lambda.genericSignature=generate

If not already done, also modify the Java versions of the following properties to 1.8 (or above):

org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.source=1.8

After you have saved the file, perform a complete project refresh in Eclipse IDE.

If you are using Maven, right click your Eclipse project and select Maven -> Update Project....

You have configured everything correctly if the following Flink program runs without exceptions:

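final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// print() triggers execution by itself in the DataSet API; a later env.execute()
// with no new data sinks would throw, so it is not called here
env.fromElements(1, 2, 3).map((in) -> new Tuple1<String>(" " + in)).print();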

Reposted from: http://ryjlf.baihongyu.com/
