I haven't posted for a few months. Part of the reason is that at my new company I rarely remember I even have a blog: I'm busy with work all day, and even in the idle moments I never got around to writing about tech or life. Honestly, I'm a bit ashamed of that!
Recently I've been working on big-data recommendations, which is how I came to learn the Flink framework. Why pick Flink as the underlying compute framework for recommendations? A few reasons:
- A Java-friendly API
- A community that ships updates quickly
- Real-time data stream processing
- Machine-learning library extensions that are also relatively Java-friendly
As for Spark: there are plenty of comparison articles online, and most of them seem to favor Spark, perhaps because Flink arrived later and hasn't accumulated as many users. But frankly, Spark's best support is for Scala/Python, and its Java API is more limited. Since I mainly work in Java, picking up Python or Scala from scratch would add cost, so Flink was my first choice as the underlying compute engine.
Here is an excerpt on Flink's capabilities from the official site:
Apache Flink is a powerful engine that supports developing and running many different kinds of applications. Its key features include unified batch and stream processing, sophisticated state management, event-time processing semantics, and exactly-once state consistency guarantees. Flink can run on a variety of resource managers, including YARN, Mesos, and Kubernetes, and can also be deployed standalone on bare-metal clusters. With the high-availability option enabled, it has no single point of failure. Flink has been proven to scale to thousands of cores with state on the order of terabytes while sustaining high throughput and low latency, and many of the world's most demanding stream processing applications run on it.
Most real-time computing workloads pick Flink first, for example the live sales dashboard on Singles' Day (Double 11), real-time user recommendations, and similar real-time scenarios.
Below is the code for setting up a Flink project in Java.
The Flink version I'm using is 1.13.2 (as pinned in the pom below).
Define the pom.xml:
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <hadoop.version>2.8.2</hadoop.version>
    <flink.version>1.13.2</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.22</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/java</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!-- <arg>-make:transitive</arg> -->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>flink.WordJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
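With this pom in place, a quick build-and-run note (the jar name below is a placeholder, since it depends on your own artifactId and version):

mvn clean package
flink run -c flink.WordJob target/<your-artifact>.jar

The shade plugin produces a single fat jar at package time; excluding the META-INF/*.SF, *.DSA and *.RSA entries strips stale signature files that would otherwise make the JVM reject the merged jar. During development you can also simply run the job's main method from the IDE.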
Write the Flink hello-world code:
package flink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Counts how many times each word occurs.
 */
public class WordJob {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create a real-time data stream; here we read lines sent to local port 9000
        DataStreamSource<String> stream = executionEnvironment.socketTextStream("localhost", 9000, "\n");
        // Define the operators that process the incoming stream
        SingleOutputStreamOperator<Word> operator = stream.map(c -> {
                    Word word = new Word();
                    word.setWord(c);
                    word.setCount(1);
                    System.out.println("received word: " + c);
                    return word;
                })
                // Java lambdas lose generic type information, so declare the output type explicitly
                .returns(Word.class)
                .keyBy(Word::getWord)
                // A sliding window: count occurrences over the last 30 seconds, evaluated every 10 seconds
                .window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                .process(new ProcessWindowFunction<Word, Word, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<Word> elements, Collector<Word> out) {
                        long cnt = 0;
                        for (Word next : elements) {
                            cnt += next.getCount();
                        }
                        System.out.println("word '" + key + "' occurred " + cnt + " time(s)");
                        // Emit the aggregated result so downstream operators (print) receive it;
                        // the original version never called collect, so print() produced nothing
                        Word result = new Word();
                        result.setWord(key);
                        result.setCount(cnt);
                        out.collect(result);
                    }
                });
        operator.print().setParallelism(1);
        executionEnvironment.execute("wordJob");
    }

    // Public, with getters/setters and an implicit no-arg constructor, so Flink can treat it as a POJO
    public static class Word {
        private String word;
        private long count;

        public long getCount() {
            return count;
        }

        public String getWord() {
            return word;
        }

        public void setCount(long count) {
            this.count = count;
        }

        public void setWord(String word) {
            this.word = word;
        }

        @Override
        public String toString() {
            return word + ": " + count;
        }
    }
}
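A side note on the design: a ProcessWindowFunction buffers every element of a window in state and only runs when the window fires. For a plain count, an incremental ReduceFunction is lighter, since Flink folds elements together as they arrive. A minimal sketch, where "words" is a hypothetical variable standing for the DataStream<Word> produced by the map step above:

// Incremental alternative to the ProcessWindowFunction above
SingleOutputStreamOperator<Word> reduced = words
        .keyBy(Word::getWord)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))
        .reduce((a, b) -> {
            // Called pairwise as elements arrive, so Flink keeps only one
            // running Word per key and window instead of buffering them all
            Word merged = new Word();
            merged.setWord(a.getWord());
            merged.setCount(a.getCount() + b.getCount());
            return merged;
        });
reduced.print().setParallelism(1);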
Then, to feed the job, send data to port 9000 from a local terminal. Start the listener first, since the job connects to it as a client:
nc -l 9000
Each line you type into the nc session is sent over the socket as one word (the job splits the stream on \n).
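For example, with the job running, type hello twice and world once into the nc session. Once the next 10-second slide fires, the console should show something along these lines (exact grouping depends on where the words fall relative to the window boundaries):

received word: hello
received word: hello
received word: world
word 'hello' occurred 2 time(s)
word 'world' occurred 1 time(s)
hello: 2
world: 1

The first lines come from the println calls in the job; the last two come from operator.print() via Word's toString(). Because the window is 30 seconds long and slides every 10, each word is reported up to three times before it ages out.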