Flink


Downloads: https://flink.apache.org/downloads.html
Documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/

The latest documentation may not have a Chinese translation yet.

In production, Flink usually runs on YARN (Flink on Yarn); for learning, a standalone single-node setup is enough.

Download a release such as Apache Flink 1.14.0 for Scala 2.12, extract it, then start the cluster with ./start-cluster.sh.

Basic Idea

A Flink program first analyzes and transforms the data, mapping it onto a virtual table, and then runs aggregations over that table.
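As an analogy only (plain Java streams, not the Flink API), the "transform, then aggregate" flow looks like this word count, where lines are split into (word, 1) records and then summed per word:

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlowSketch {
    public static void main(String[] args) {
        // "Transform" step: split raw lines into (word, 1) records.
        // "Aggregate" step: group by word and sum the counts.
        Map<String, Integer> counts = Stream.of("a b", "b c")
                .flatMap(line -> Stream.of(line.split("\\W+")))
                .collect(Collectors.toMap(w -> w, w -> 1, Integer::sum));
        System.out.println(counts.get("b")); // 2
    }
}
```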

Startup Errors on Windows

Error: Could not find or load main class org.apache.flink.runtime.taskexecutor.TaskManagerRunner
Error: Could not find or load main class org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint

The startup scripts are shell scripts, so on Windows you need Cygwin (or a similar Unix-like environment) to run them.

Startup Error

Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
Improperly specified VM option 'MaxMetaspaceSize=268435456'

This one is odd. Adding a line to flink-daemon.sh that prints the JVM_ARGS variable shows:

-Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456
-XX:+UseG1GC -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456

I am running JVM 1.8, which should support all of these options. The common fix found online is to hard-code JVM_ARGS as follows:

JVM_ARGS="-XX:+UseG1GC -Xms256M -Xmx512M -XX:PermSize=64M -XX:MaxNewSize=128M -XX:MaxPermSize=128M"

And indeed the cluster then starts, with the memory parameters applied correctly, which is bizarre. (Note that PermSize/MaxPermSize were removed in Java 8; HotSpot simply ignores them with a warning.)

Lambda

Using lambdas here runs into Java's generic type erasure and type inference: the compiler erases the lambda's generic type arguments, so Flink cannot extract them, and the following exception is thrown:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(SetWordCount.java:28)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:181)
	at org.apache.flink.api.java.DataSet.groupBy(DataSet.java:739)
	at com.acgist.SetWordCount.main(SetWordCount.java:36)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:557)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
	at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:271)
	at com.acgist.SetWordCount.main(SetWordCount.java:28)
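The root cause can be seen outside Flink as well. In this minimal plain-Java sketch (hypothetical class name, no Flink involved), an anonymous class keeps its generic type arguments in the class file, while a lambda does not, which is why Flink's type extractor fails on the lambda:

```java
import java.lang.reflect.ParameterizedType;
import java.util.function.Function;

public class ErasureDemo {
    public static void main(String[] args) {
        // Anonymous class: the generic parameters survive in the class metadata.
        Function<String, Integer> anon = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) { return s.length(); }
        };
        ParameterizedType t = (ParameterizedType) anon.getClass().getGenericInterfaces()[0];
        System.out.println(t.getActualTypeArguments()[1]); // class java.lang.Integer

        // Lambda: reflection only sees the raw Function interface;
        // the String/Integer type arguments are erased.
        Function<String, Integer> lambda = s -> s.length();
        System.out.println(lambda.getClass().getGenericInterfaces()[0]);
    }
}
```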

In that case we have to supply the type information ourselves, either with an anonymous class or by declaring the return type explicitly:

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		final DataSource<String> text = env.fromElements("who's there?", "I think I hear them. Stand, ho! Who's there?");
		// DataSet API: anonymous class
//		text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
//
//			private static final long serialVersionUID = 1L;
//
//			@Override
//			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//				Stream.of(value.split("\\W+")).forEach(word -> out.collect(new Tuple2<>(word, 1)));
//			}
//		})
		// DataSet API: lambda
		text.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
			Stream.of(value.split("\\W+")).forEach(word -> out.collect(new Tuple2<>(word, 1)));
		})
		// specify the return type
		.returns(Types.TUPLE(Types.STRING, Types.INT))
//		.returns(new TypeHint<Tuple2<String, Integer>>() {})
		// alternative: have the function implement the ResultTypeQueryable interface
		// Table API
		.groupBy(0).sum(1).print();

Window Time Characteristic Exception

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
	at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	... 4 more
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
	at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:83)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:293)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)

If the window time characteristic is not specified, Flink defaults to TumblingProcessingTimeWindows; switching to TumblingEventTimeWindows without assigning timestamps raises the error above. Call DataStream.assignTimestampsAndWatermarks(...) to attach event-time timestamps and watermarks to the stream.
Watermarks work together with windows mainly to handle out-of-order events; the idea is roughly comparable to the TCP sliding-window mechanism.
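To make the watermark idea concrete, here is a tiny conceptual sketch (plain Java, not the Flink API; the class name is made up) of bounded-out-of-orderness watermarking, which is the kind of strategy assignTimestampsAndWatermarks typically configures:

```java
// Conceptual sketch only, not Flink's implementation: the watermark trails
// the highest timestamp seen so far by a fixed out-of-orderness bound.
class BoundedOutOfOrdernessWatermark {
    private final long maxOutOfOrderness;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessWatermark(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Record an event's timestamp and return the current watermark. */
    long onEvent(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
        return maxTimestampSeen - maxOutOfOrderness;
    }
}

public class WatermarkDemo {
    public static void main(String[] args) {
        BoundedOutOfOrdernessWatermark wm = new BoundedOutOfOrdernessWatermark(5);
        // Out-of-order events with timestamps 10, 7, 13:
        System.out.println(wm.onEvent(10)); // watermark 5
        System.out.println(wm.onEvent(7));  // still 5: a late event never moves it back
        System.out.println(wm.onEvent(13)); // watermark 8
    }
}
```

An event-time window [start, end) fires once the watermark passes end, so events arriving up to maxOutOfOrderness late are still counted in their window.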