Getting Started with Flink

I haven't updated this blog for a few months. Part of the reason is that at my new company I rarely remembered I even had a blog: I was busy with work all day, and even in idle moments I never wrote anything down about tech or life. Genuinely embarrassing!

Recently I've been working on big-data recommendation, so I've been learning the Flink framework. Why choose Flink as the underlying compute framework for recommendation?

There are four reasons:

  1. It is friendly to the Java API
  2. The community moves quickly
  3. It supports real-time data streaming
  4. Its machine learning library extensions are relatively Java-friendly

Compared with Spark: there are plenty of comparison articles online, and most lean toward Spark. That may simply be because Flink arrived later and hasn't accumulated as many users. Honestly though, Spark's support is strongest for Scala/Python, and its Java API is more limited. Since I mainly work in Java, relearning Python/Scala would add cost, so Flink is my first choice as the underlying compute engine.

Here is a passage on Flink's features excerpted from the official website:

Apache Flink is powerful and supports developing and running many different kinds of applications. Its key features include unified batch and stream processing, sophisticated state management, event-time support, and exactly-once state consistency guarantees. Flink can run on a variety of resource management frameworks, including YARN, Mesos, and Kubernetes, and also supports standalone deployment on bare-metal clusters. With the high-availability option enabled, it has no single point of failure. Flink has been proven to scale to thousands of cores and terabytes of application state while still delivering high throughput and low latency. Many demanding stream processing applications around the world run on Flink.
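
As a small illustration of the state management and exactly-once guarantees mentioned above (my own minimal sketch, not from the official text; the class name and the 10-second interval are arbitrary choices), a job enables checkpointing like this:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // take a consistent snapshot of all operator state every 10 seconds
        env.enableCheckpointing(10_000);
        // request exactly-once state consistency (this is also Flink's default mode)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    }
}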

Most real-time computation use cases pick Flink first, for example the Double Eleven sales dashboard, real-time user recommendation, and similar real-time workloads.

Below is the code to set up a Flink project in Java.

I'm using Flink version 1.13.2.
Define the pom.xml:

 <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.8.2</hadoop.version>
        <flink.version>1.13.2</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.22</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!-- <arg>-make:transitive</arg> -->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>flink.WordJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
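
One note on the build: `mvn clean package` produces a shaded (fat) jar via the maven-shade-plugin above, and the `mainClass` in the manifest transformer has to point at your actual entry class (here that is flink.WordJob, defined below). You can then submit the jar to a cluster with `flink run -c flink.WordJob target/<your-artifact>.jar` (the exact jar name depends on your artifactId and version, which this snippet does not show), or simply run the main method from the IDE, which starts a local mini cluster.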

Write the Flink hello-world code:

package flink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Iterator;

/**
 * Count how many times each word occurs.
 */
public class WordJob {


    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // create a real-time data stream; here it reads lines sent to port 9000 on my local machine
        DataStreamSource<String> stream = executionEnvironment.socketTextStream("localhost", 9000, "\n");
        // define the operators that compute over the incoming stream
        SingleOutputStreamOperator<Word> operator = stream.map(c -> {
            Word word = new Word();
            word.setWord(c);
            word.setCount(1);
            System.out.println("Received word: " + c);
            return word;
        }).keyBy(Word::getWord)
            // define a sliding window: count each word's occurrences over the last 30 seconds, evaluated every 10 seconds
            .window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                .process(new ProcessWindowFunction<Word, Word, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<Word> iterable, Collector<Word> collector) throws Exception {
                        Iterator<Word> iterator = iterable.iterator();
                        long cnt = 0;
                        while (iterator.hasNext()) {
                            Word next = iterator.next();
                            cnt += next.getCount();
                        }
                        System.out.println("Word " + s + " occurred " + cnt + " times in the window");
                        // emit the aggregated result; without this the print() sink below receives nothing
                        Word result = new Word();
                        result.setWord(s);
                        result.setCount(cnt);
                        collector.collect(result);
                    }
                });

        operator.print().setParallelism(1);


        executionEnvironment.execute("wordJob");
    }


    static class Word {

        private String word;

        private long count;

        public long getCount() {
            return count;
        }

        public String getWord() {
            return word;
        }

        public void setCount(long count) {
            this.count = count;
        }

        public void setWord(String word) {
            this.word = word;
        }

        @Override
        public String toString() {
            // readable output for the print() sink
            return "Word{word='" + word + "', count=" + count + "}";
        }
    }
}
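
A note on the design choice: ProcessWindowFunction buffers every element of a window in state before the function fires, which is fine for a demo but heavier than necessary for a plain count. An incremental reduce aggregates elements as they arrive instead. Here is a minimal sketch of that alternative (my own variation, reusing the same Word class, map step, and window definition as above):

        stream.map(c -> {
            Word word = new Word();
            word.setWord(c);
            word.setCount(1);
            return word;
        }).keyBy(Word::getWord)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))
            .reduce((a, b) -> {
                // merge two partial counts for the same word incrementally
                Word merged = new Word();
                merged.setWord(a.getWord());
                merged.setCount(a.getCount() + b.getCount());
                return merged;
            })
            .print().setParallelism(1);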

Then send data to port 9000 from a local shell.
Here I use `nc -l 9000` to serve the data over a socket; start it before launching the job, since the job connects to that port as a client.
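
For example, with `nc -l 9000` running in one terminal and the job started in another, type a few lines into the nc terminal:

hello
hello
world

With the code above, the job console should then print something roughly like the following every 10 seconds, for as long as those words remain inside the 30-second window (the exact interleaving varies):

Received word: hello
Received word: hello
Received word: world
Word hello occurred 2 times in the window
Word world occurred 1 times in the window
Word{word='hello', count=2}
Word{word='world', count=1}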