演讲人-耿飙-阿里云计算平台事业部-开发工程师 DataFunSummit#2023 目录CONTENT 01FlinkCEP介绍03CEP SQL语法增强&性能优化 04风控场景实际案例 02动态多规则支持 01FlinkCEP介绍 DataFunSummit#2023 什么是FlinkCEP •CEP:复杂事件处理(Complex Event Processing) •Flink CEP:基于Flink实现的复杂事件处理库,它可以识别出数据流中符合特定模式(Pattern)的事件序列,并允许用户作出针对性处理。 Flink CEP应用场景 实时营销 实时风控 物联网 营销策略优化:大促期间在购物车中添加了超过3次商品后,但最后没有结账付款的用户 异常状态告警:共享单车被骑出指定区域且15分钟内没有回到指定区域后发出告警 风险用户检测:5分钟内转账次数超过10次且金额大于10000 02动态多规则支持 DataFunSummit#2023 动态规则支持:背景 •为什么需要支持动态规则更新? •频繁变化的实际场景要求对初始规则的内容进行调整或者添加新的规则,而重启Flink作业来使变化后的规则生效的方式时间成本高、影响范围大 •关键问题: •如何让Flink作业不停机加载新规则?•如何解决Pattern的(de)serialization? •现有方案: •修改CepOperator添加注入规则的接口•基于Groovy引擎动态生成Pattern对象 5分钟内通过广告链接访问某商品超过5次但最终没有购买à3分钟内访问超过7次最终没有购买 动态规则支持:设计 •新增接口(FLIP-200)•PatternProcessor•id•version•timestamp•pattern•patternProcessorFunction •PatternProcessorDiscoverer •PatternProcessorManager 动态规则支持:设计 动态规则支持:(de)serialization 规则的(de)serialization格式设计原则: Pattern的抽象: •表达能力完整•方便序列化反序列化•易于拓展,方便集成•可读可编辑 •NFAßà状态转换图•子Pattern:节点•事件选择策略:边 动态规则支持:(de)serialization Pattern<Event,Event> pattern =Pattern.<Event>begin("start",AfterMatchSkipStrategy.skipPastLastEvent()).where(newStartCondition("action == 0")).timesOrMore(3).followedBy("end").where(newEndCondition()); 动态规则支持:拓展Condition AviatorCondition: •结合Java反射机制,使用Aviator引擎解析表达式字符串 •原理•Compileexpression in constructor •Execute expression with variables in filter() public classEvent {private final int id;private finalStringname;private final double price;private final int action;private final long eventTime;} •示例•AviatorCondition(‘action==1&& price > 20’) •AviatorCondition(‘action==0&& price > 50’) GroovyCondition:支持Groovy语法,将Groovy表达式作为参数CustomArgsCondition:自定义参数 动态规则支持:多规则支持 如何在同一输入流应用多条规则? 多个PatternStream,多个CEPOperator,多个NFAà数据要复制多次 一个PatternStream,一个CEPOperator,多个NFAà数据只需传递一次 动态规则支持:Demo 场景:广告投放中的实时反作弊https://help.aliyun.com/document_detail/459880.html https://github.com/RealtimeCompute/ververica-cep-demo CEP SQL语法增强&性能优化 DataFunSummit#2023 FlinkCEP SQL:介绍 选择事件表 定义逻辑分区定义事件顺序 FlinkCEP SQL:示例 FlinkCEP SQL:语法增强 01输出带时间约束模式的匹配超时序列 案例场景:用户行为模式识别 用户从流量入口进入产品边界,执行一系列的操作后最终完成价值转化。识别整体流程周期在10分钟之内的高质量用户。 FlinkCEP SQL:语法增强 01输出带时间约束模式的匹配超时序列 FlinkCEP SQL:语法增强 01输出带时间约束模式的匹配超时序列 FlinkCEP SQL:语法增强 02定义事件之间的连续性 FlinkCEP SQL:语法增强 03定义循环模式中的连续性和贪婪性 FlinkCEP SQL:语法增强 04循环模式指定停止条件(Until) FlinkCEP SQL:语法增强 05组合模式(Group Pattern) 组合模式(group pattern):将多个模式组合为一个整体用在next()、followedBy()和followedByAny()函数中,并支持整体的循环。在阿里云实时计算Flink版的SQL作业中使用SQL标准中的括号语法(...)来定义组合模式,支持使用循环量词如+、*、{3, }等。 PATTERN (A (B C*)+? D) Pattern.<String>begin("A").where(...).next( Pattern.<String>begin("B").where(...).next("C").where(...).oneOrMore().optional().greedy().consecutive()).oneOrMore().consecutive().next("D").where(...) FlinkCEP SQL:语法增强 06AFTER MATCH NO SKIP策略 SKIP_TO_NEXT_ROW:丢弃以相同事件开始的所有部分匹配(CEP SQL默认策略)NO_SKIP:每个成功的匹配都会被输出(Java API默认策略) 模式:a b+,输入:a1b1b2b3: 阿里云实时计算Flink版扩展了SQL标准中的AFTER MATCH语句,可通过AFTER MATCH NO SKIP语句来声明NO_SKIP策略,NO_SKIP策略在完成一条序列的匹配时,不会终止或丢弃其他已经开始的匹配过程。 FlinkCEP性能优化 •减少State访问•增加Cache、优化onEvent/ProcessingTime()实现 •修复State泄漏•https://issues.apache.org/jira/browse/FLINK-23314 •对于部分生命周期较短的key,和其相关的computationStates没有及时清理,导致State不断增大。当key包含timestamp或随机ID时,容易出现该问题•及时清除该key •Tip:使用Flink1.16及之上的Flink CEP版本,减少Timer的注册,大大减少作业的CPU消耗(10x)•https://issues.apache.org/jira/browse/FLINK-23890 风控场景典型应用 DataFunSummit#2023 风控场景典型应用 •交易风控•一段时间内某个IP退款超过一定金额,触发熔断 •一段时间内某个IP退款超过一定次数,触发熔断 •内容风控 •某用户在X分钟内发布了超过Y条帖子,则进行账号禁言或其他处理 •物联网•设备上报埋点到日志存储上,有成功和失败信息。如果某个设备连续发生10次以上的 某类异常,并且超过15分钟未恢复则告警 •网络安全•检测到某台电脑的行为满足“点击钓鱼邮件”、“下载异常文件”、“执行远程代” 等模式后,触发报警 风控场景典型应用 支持定义相邻事件之间的时间间隔: 获取该子Pattern之前匹配的事件: context.getEventsForPattern() WithinType.PREVIOUS_AND_CURRENT newIterativeCondition<Event>() {@Overridepublicbooleanfilter(Event value,Context<Event>ctx) throws Exception {double amount = 0;for (Eventevnt:ctx.getEventsForPattern("Refund")) {amount +=event.getPrice();}return amount >= 1000;}} Pattern<Event, Event> pattern =Pattern.<Event>begin("acceptCoupon").where(new StartCondition()).followedBy("addItem").where(newMiddleCondition()).within(Time.minutes(5),WithinType.PREVIOUS_AND_CURRENT).notFollowedBy("pay").where(newEndCondition()).within(Time.days(1)); 感谢观看