- 1、getExecutionEnvironment
这种方式就是创建本地执行环境;有种智能的方式,如果在本地就是获取本地的执行环境,如果提交到集群,就使用集群的环境。
- 2、createLocalEnvironment
创建本地环境 - 3、createRemoteEnvironment
返回集群执行环境
建议使用第一种自适应的模式,因为第二,三种需要在提交的jar包的时候进行修改
可以设置并行度
批处理:ExecutionEnvirnoment
流处理:Stream ExecutionEnvirnoment
但是如今Flink已经做到了流批一体了
BATCH模式的配置方式
1、通过命令行的方式
2、通过设置
批处理和流处理其实就是有界和无界的区别
- 1、从文件读取数据
- 2、从集合中读取数据
- 3、从元素中读取
env.fromElements(直接扔数据,不用变成ArrayList了)
上面三种都是批处理的形式
- 4、从kafka中读取数据
第一个参数:topic
第一个参数:序列化模式
第一个参数:kafka的配置信息
- 5、自定义接口
定义一个类,实现SourceFunction类,重写两个方法,Run()和Cancel()
Run()负责执行,Cancel取消方式
如果需要设置并行度,实现PraallelSourceFunction()
java和MySQL上的类型,有的,Flink基本都有,需要注意的是
类似于定义了Tuple2,这种二元组,Flink不清楚里面的泛型,可以通过这两种方式设置
1、Types
2、TypeHint
-
1、Map算子
一一映射的关系,来一个处理一个
传入MapFunction的实现类
在输入参数和输出参数都只有一个的时候可以使用Lamda表达式 -
2、filter算子
过滤操作
传入filterFuntion的实现类 -
FlatMap算子
扁平化映射,一条数据变成多条
传入FlatMapFunction的实现类
-
1、keyBy按键分区
keyBy得到的结果不是DataStream而是KeyedStream(按照hash值进行分组)
传入KeySelector的实现类
(这个只是逻辑上的分区) -
2、简单聚合
sum(),min(), max(),minBy(),maxBy()
min()和 minMax()的区别
min()只对需要的字段求最小值,其他字段保持第一条信息
minBy()求出最小值的那条信息 -
3、规约聚合
reduce
先使用keyBy算子进行分组,然后调用reduce方法,实现reduceFunction类。
思想:reduceFuntion有两个参数,类似于1+2+3+4,1和2先加,3+3,6+4这样
其实就是实现FilterFuntion方法之类的
Rich Funtion函数类,RichReduceFuntion,RichFilter Function等
只要带Rich的,就可以获取上下文环境和生命周期方法
例如open()和close()方法
open()初始化,只会在算子运行之前调用一次,有多个并行度,调用多次
close(),一般用来做一些清理工作
getRuntimeContext(),获取上下文的信息
- 1、随机分区
当数据量足够多的时候,能够达到均匀分配的效果
- 2、轮询分区
- 5、全局分区
global
全部都发送到一个分区中,也就是把下游并行度强行变为1 - 6、自定义分区
使用partitionCustom()方法,传入两个参数
1)partitioner,自定义分区策略
2)指定key,按哪个key进行分区
通过调用addSink()算子,里面传递Sink的类
StreamingFileSink支持行编码和批量编码。
需要传入两个参数
1、指定存储桶的基本路径
2、数据的编码逻辑,rowEncoder或bulkWriterFactory
通过**.withRollingPolicy()**方法指定了一个“滚动策略”。
“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以
我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面
的代码设置了在以下 3 种情况下,我们就会滚动分区文件:
⚫ 至少包含 15 分钟的数据
⚫ 最近 5 分钟没有收到新的数据
⚫ 文件大小已达到 1 GB
addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,因为需要向 Kafka 写入数据,自然应该创建一个生产者。
重写 invoke方法
面试问你,你就说,其实主流组件Flink都有对应的API,需要自定义的机会不多,我不太记得具体的细节了,有个印象就是要重写invoke方法
- 1、处理时间(Processing Time)
指数据被真正处理的时间,机器的系统时间 - 2、事件时间(Event Time)
作为数据的一个属性,嵌套在数据中,也就是这条数据的时间戳
如何生成
为流中的数据分配时间戳,并生成水位线来指示事件时间
在上面的接口中,需要实现水位线生成策略
TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。
WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
133
onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为
200msFlink内置了水位线生成器
1、有序流
时间戳单调增长(Monotonously Increasing Timestamps),也就是不会有迟到的数据
直接调用WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner:就是指定如何获取时间戳
2、乱序流
调用 **WatermarkStrategy. forBoundedOutOfOrderness()**方法就可以实现。
这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”水位线传递
1、按驱动类型分
1)时间窗口:到达设定的start和end时间
2)计数窗口:到达固定的个数就触发计算统一计算
- 1、按键分区窗口
- 2、非按键分区窗口
如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只
能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用
这种方式。总结:窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
时间窗口
- 1、滚动处理时间窗口
表示长度为5秒的滚动窗口
这个有第二个参数,就是设置偏移量。
我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0
点开启窗口,这时是北京时间早上 8 点。只要设置-8 小时的偏移量就可以了:
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
- 2、滑动处理时间窗口
size 和 slide
前者表示滑动窗口的大小
后者表示滑动窗口的滑动步长
- 3、处理时间会话窗口
- 4、滚动事件时间窗口
这里.of()方法也可以传入第二个参数 offset,用于设置窗口起始点的偏移量。
- 5、滑动事件窗口
- 6、事件时间会话窗口
计数窗口:
- 1、滚动计数窗口
- 2、滑动计数窗口
size 和 slide,
前者表示窗口大小
后者表示滑动步长
- 3、全局窗口
需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。
- 1、增量聚合函数
思想:每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了,只是在于不立即输出结果,而是要等到窗口结束时间。1)ReduceFunction
和前面的DataStream中的Reduce函数一样,中间保存聚合状态,后面来的数据和前面保存的状态进行规约其实就是输入的是什么,输出的就是什么。输入是二元组,输出也是二元组。
2)AggregateFunction
Reduce的增强版,有三个参数:
输入类型(IN)、累加器类型(ACC)和输出类型(OUT)
接口中有四个方法
- createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
- add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
- getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
- merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
计算时间的平均值为例:
- 2、全窗口函数
把数据缓存起来,等到窗口关闭才输出结果。
ProcessWindowFunction(处理窗口函数)继承processWindowFunction类,实现process方法
processWindowFunction有四个类型
1、输入类型
2、输出类型
3、Key的类型
4、时间窗口
- 1、触发器(Trigger)
- 2、移除器(Evictor)
移除器主要用来定义移除某些数据的逻辑
- 3、允许延迟(Allowed Lateness)
- 4、将迟到数据输入到侧流
补充:迟到数据的处理
1、设置水位线延迟时间:waterMark,更像是全局的处理,上帝把表的时间调慢了
2、允许窗口处理迟到数据:相当于一辆大巴,在发动的同时,门还是给你敞开的
3、输出到侧流Process Function必须实现抽象方法.processElement() (来一条数据处理一条)
三个参数
1、value:当前输入流的输入元素
2、ctx:获取上下文,时间戳等
3、out:收集器,用于返回输出数据(1)ProcessFunction
最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入。
(2)KeyedProcessFunction
对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入。要想使用
定时器,比如基于 KeyedStream。
(3)ProcessWindowFunction
开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作
为参数传入。
(4)ProcessAllWindowFunction
同样是开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入。
(5)CoProcessFunction
合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参
数传入。关于流的连接合并操作,我们会在后续章节详细介绍。
(6)ProcessJoinFunction
间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为
参数传入。
(7)BroadcastProcessFunction
广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这
里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广
播流(BroadcastStream)做连接(conncet)之后的产物。关于广播流的相关操作,我们会在后
续章节详细介绍。
(8)KeyedBroadcastProcessFunction
按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时
作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream
与广播流(BroadcastStream)做连接之后的产物。使用侧流输出
数据类型不能改变,灵活性较小
原理是:把一条数据流广播到下游的每一条流上
广播状态底层是用一个“映射”(map)结构来保存的。在代码实现上,可以直接调用
DataStream 的.broadcast()方法,传入一个“映射状态描述器”(MapStateDescriptor)说明状态
的名称和类型,就可以得到规则数据的“广播流”(BroadcastStream):如果对数据流调用过 keyBy 进行了按键分区,那么要传入的就是 KeyedBroadcastProcessFunction;
如果没有按键分区,就传入 BroadcastProcessFunction。
- 1、窗口联结
首先需要调用 DataStream 的.join()方法来合并两条流,得到一个 JoinedStreams;接着通过.where()和.equalTo()方法指定两条流中联结的 key;然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算,注意这里只能调用.apply(),没有其他替代的方法。
。通用调用形式如下:
1、托管状态和原始状态
也就是全由Flink负责
也就是全部需要自定义利用上下文获取状态
这跟我们声明一个变量时做的事情完全一样2、列表状态
将需要保存的数据,以列表(List)的形式组织起来。3、映射状态
把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组 key-value 映射的列表。4、规约状态
类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。5、聚合状态
与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。
一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。创建一个 StateTtlConfig 配置对象,然后调用状态描述器的**.enableTimeToLive()**方法启动 TTL 功能。
算子状态一般用在Source或者Sink等与外部系统连接的算子上。或者完全没有Key的地方。
算子状态主要有三种:ListState,UnionListState和BroadcastState
1、列表状态
2、联合列表状态
3、广播状态
checkPointedFunction
对状态进行持久化保存的快照机制叫做“检查点(CheckPoint)”
每次应用保存检查点做快照时,都会调用.snapshotState()方法,将状态进行外部持久化。
而在算子任务进行初始化时,会调用. initializeState()方法。状态广播出去,所有并行子任务的状态都是相同的
广播状态与其他算子状态的列表(list)结构不同,底层是以键值对(key-value)形式描述的,所以其实就是一个映射状态(MapState)。
而广播流调用process函数可以传入BroadcastProcessFunction对象,对象里面有两个方法
广播处理函数里面有两个方法.processElement()和.processBroadcastElement()
这里的.processElement()方法,处理的是正常数据流,第一个参数 value 就是当前到来的流
数据;而.processBroadcastElement()方法就相当于是用来处理广播流的,它的第一个参数 value
就是广播流中的规则或者配置数据。
- 1、检查点(CheckPoint)和保存点(savePoint)
默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境
的.enableCheckpointing()方法就可以开启检查点。这里传入的参数是检查点的间隔时间,单位为毫秒。
除了检查点之外,还提供了保存点(savepoint)的功能。
区别在于,保存点是自定义的镜像保存,所以不会由 Flink 自动创建,而需要用户手动触发。
- 2、状态后端
一种是“哈希表状态后端”(HashMapStateBackend),
另一种是“内嵌 RocksDB 状态后端”(EmbeddedRocksDBStateBackend)。
1)哈希表状态后端
把状态存放在内存里
2)内嵌RocksDB状态后端
RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。
HashMap 和 RocksDB 两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内
存,后者是 RocksDB。[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZfLaE1mn-1674661124581)(attachment:a7b002d0df9f9b8112552df3b10454b1)]
怎么保存?
1、类似于检查点分界线,在数据流中插入检查点
2、分布式,等到分界线对齐,然后保存
检查点的配置
代码中显式地调用执行环境的.enableCheckpointing()方法:
可以选择保存到内存还是磁盘中
其他API
- 保存点
. 保存点的用途
保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复。
基于blink Planner模式
基于dataStream模式
其实StreamTableEnvironment是实现TableEnvironment的
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DncOx5ZZ-1674811607631)(attachment:a9ecfd937a9f312275c9445b9d5edb8b)]
表转换成流
1、toDataStream()
2、toChangelogStream()
两者区别,toDataStream只支持插入,而toChangelogStream支持更新操作
流转换成表
可以增加一个字段,通过 WATERMARK语句来定义事件时间属性。WATERMARK 语句主要用来定义水位线(watermark)的生成表达式,这个表达式会将带有事件时间戳的字段标记为事件时间属性,并在它基础上给出水位线的延迟时间。
而如果原始的时间戳就是一个长整型的毫秒数,这时就需要另外定义一个字段来表示事件时间属性,类型定义为 TIMESTAMP_LTZ 会更方便:
- 处理时间,就是我们当前的系统时间
可以增加一个额外的字段,通过调用系统内置的 PROCTIME()函数来指定当前的处理时间属性,
Flink CEP是用来处理一些复杂事件的,“连续登录失败”或者“下单支付,下单后跟着支付”等,这种多个事件的组合就叫“复杂事件”
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AHXY4qVe-1674811625389)(attachment:1c1118bd22b532673817e6685b825e3d)]
我们需要检测用户行为,连续三次登录失败,就输出报警信息
步骤:
1、定义一个模式(Pattern)
2、将Pattern应用到DataStream上,检测满足规则的复杂事件,得到一个PatternStream
3、将PatternStream进行转换操作,将检测到的复杂事件提取出来,包装成报警信息输出
模拟数据
上面就是定义一个模式(三次登录失败)
将匹配的复杂事件选择出来,变成patternStream流
像这样的一个规则就是个体模式
每个个体模式都是以一个“连接词”开始定义的,比如begin、next等等,需要一个过滤条件where(),里面传入SimpleCondition的filter()方法
可以加**.within(Time.minutes(15)),**定义时间
- 量词
个体模式可
以包括“单例(singleton)模式”和“循环(looping)模式”。默认情况下,个体模式是单例
模式,匹配接收一个事件;当定义了量词之后,就变成了循环模式,可以匹配接收多个事件。
个体模式后面可以接上一个量词
1、.oneOrMore ():匹配事件一个或者多个,可以用a+表示】
2、.times(times) :a.times(3),表示aaa,a出现3次
3、.times(fromTimes,toTimes),表示在这个范围内出现的,times(2,4),aa,aaa,aaaa都可以匹配
4、.greedy():贪心,a.times(2, 4).greedy(),如果出现了连续 4 个 a,那么会直接把 aaaa 检测出来进行处
理,其他任意 2 个 a 是不算匹配事件的。5、.optional():使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。
- 条件
1、限定子类型:pattern.subtype(SubEvent.class),也就是当事件是SubEvent类型时,才可以满足当前模式pattern的匹配规则
2、简单条件:Simple Conditions,本质上就是一个filter的操作
3、迭代条件:Iterative Conditions,将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。这种需要依靠之前事件来做判断的条件,就叫作“迭代条件”
在 IterativeCondition 中同样需要实现一个 filter()方法,不过与 SimpleCondition 中不同的
是,这个方法有两个参数:除了当前事件之外,还有一个上下文 Context。调用这个上下文
的.getEventsForPattern()方法,传入一个模式名称,就可以拿到这个模式中已匹配到的所有数
据了。4、组合条件:就是.where()后面再接一个.where()。这相当于就是多个条件的“逻辑与”,同时也可以是.or()实现逻辑或。
- 组合模式
1、初始模式:调用.begin()来创建
2、近邻条件:通过一些连接词实现
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GfBfh1yx-1674811625390)(attachment:3546ba52c27bd0e9a028737ac8c3d683)]
3、.notNext(),一个模式后面不能跟着某种事件
4、.notFollowedBy(),数据不断的过来,不能保证后面不会出现某种时间,需要用一个限定条件.within(),增加一个时间限制
5、.consecutive(),为循环模式中的匹配事件增加严格的近邻条件,.times()的是宽松条件,如果设置近邻条件可以.times(3).consecutive()
6、.allowCombinations(),为循环模式中的事件指定非确定宽松近邻条件
- 匹配后跳过策略
例子:
我们如果输入事件序列“a a a b”——这里为了区分前后不同的 a 事件,可以记作“a1 a2
a3 b”——那么应该检测到 6 个匹配结果:(a1 a2 a3 b),(a1 a2 b),(a1 b),(a2 a3 b),(a2 b),
(a3 b)。如果在初始模式的量词.oneOrMore()后加上.greedy()定义为贪心匹配,那么结果就是:
(a1 a2 a3 b),(a2 a3 b),(a3 b),每个事件作为开头只会出现一次。1、不跳过(No_SKIP),那就是输出6个
2、跳到下一个(SKIP_TO_NEXT),输出效果和添加greedy()一样
3、跳过所有子匹配(SKIP_PAST_LAST_EVENT),找到 a1 开始的匹配(a1 a2 a3 b)之后,直接跳过所有 a1 直到 a3 开头的匹配,相当于把这些子匹配都跳过了。最终得到(a1 a2 a3 b),这是最为精简的跳过策略。
4、跳至第一个(SKIP_TO_FIRST),最终得到(a1 a2 a3 b),(a1 a2 b),(a1 b)
5、跳到最后一个(SKIP_TO_LAST),最终得到(a1 a2 a3 b),(a3 b)。
将模式应用到流上
同时还可以传入一个比较器,作为第三个参数
匹配事件的选择提取(select)
1、patternSelectFunction
2、PatternFlatSelectFunction
除此之外,PatternStream 还有一个类似的方法是.flatSelect(),传入的参数是一个
PatternFlatSelectFunction。从名字上就能看出,这是 PatternSelectFunction 的“扁平化”版本;内
部需要实现一个 flatSelect()方法,它与之前 select()的不同就在于没有返回值,而是多了一个收
集器(Collector)参数 out,通过调用 out.collet()方法就可以实现多次发送输出数据了。3、process