spark通过kafka-appender指定日志输出到kafka引发的死锁问题_其它综合

来源:脚本之家  责任编辑:小易  

前面应该还有个数据生产者,比如flume.flume负责生产数据,发送至kafka。spark streaming作为消费者,实时的从kafka中获取数据进行计算。计算结果保存至redis,供实时推荐使用。flume+kafka+spark+redis是实时数据收集与计算的一套经典架构www.zgxue.com防采集请勿采集本网。

在采用log4j的kafka-appender收集spark任务运行日志时,发现提交到yarn上的任务始终ACCEPTED状态,无法进入RUNNING状态,并且会重试两次后超时。期初认为是yarn资源不足导致,但在确认yarn资源充裕的时候问题依旧,而且基本上能稳定复现。

我这边的应用是这样的: ①采集程序:使用avro方式将自定义对象序列化成字节流存入Kafka ②spark streaming:获取Kafka中的字节流,使用avro反序列化为自定义对象

起初是这么配置spark日志输出到kafka的:

你可以试一下这三种方法 1、At most once-每条数据最多被处理一次(0次或1次),这种语义下会出现数据丢失的问题;2、At least once-每条数据最少被处理一次(1次或更多),这个不会出现数据丢失,

log4j.rootCategory=INFO, console, kafkalog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.errlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m%n# Kafka appenderlog4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender# Set Kafka topic and brokerListlog4j.appender.kafka.topic=yarn_spark_loglog4j.appender.kafka.brokerList=localhost:9092log4j.appender.kafka.compressionType=nonelog4j.appender.kafka.syncSend=falselog4j.appender.kafka.maxBlockMs=10log4j.appender.kafka.layout=org.apache.log4j.PatternLayoutlog4j.appender.kafka.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m

一般来说kafka是挂不了的,它可以实现分布式容错,一个leader挂了,会有follower替代其处理请求,除非你的kafka都在一台机器上。假如挂了,那你的spark运行也没有意义了,它自然不是报错,就是

这里用org.apache.kafka.log4jappender.KafkaLog4jAppender默认将所有日志都输出到kafka,这个appender已经被kafka官方维护,稳定性应该是可以保障的。

若将Spark作业以yarn cluster模式提交到Yarn,由Yarn启动Spark作业,在某个子节点的Executor会监听该端口,接收数据。

问题定位

我先写了一个kafka的生产者程序,然后写了一个kafka的消费者程序,一切正常。生产者程序生成5条数据,消费者能够读取到5条数据。然后我将kafka的消费者程序替换成使用spark的读取kafka的程序,

发现问题后,尝试将输出到kafka的规则去掉,问题解除!于是把问题定位到跟日志输出到kafka有关。通过其他测试,证实目标kafka其实是正常的,这就非常奇怪了。

查看yarn的ResourceManager日志,发现有如下超时

2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.util.AbstractLivelinessMonitor: Expired:appattempt_1578970174552_3204_000002 Timed out after 600 secs

2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: Updating application attempt appattempt_1578970174552_3204_000002 with final

 state: FAILED, and exit status: -1000

2020-05-07 21:49:48,231 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1578970174552_3204_000002 State change from LAUNCHED to FINAL_SAV

ING on event = EXPIRE

表明,yarn本身是接收任务的,但是发现任务迟迟没有启动。在spark的场景下其实是指只有driver启动了,但是没有启动executor。

而查看driver日志,发现日志输出到一个地方就卡住了,不往下继续了。通过对比成功运行和卡住的情况发现,日志卡在这条上:

2020/05/07 19:37:10.324 INFO SecurityManager: Changing view acls to: yarn,root

2020/05/07 19:37:10.344 INFO Metadata: Cluster ID: 6iG6WHA2SoK7FfgGgWHt_A

卡住的情况下,只会打出SecurityManager这行,而无法打出Metadata这行。

猜想Metadata这行是kafka-client本身打出来的,因为整个上下文只有yarn, spark, kafka-client可能会打出这个日志。

在kafka-client 2.2.0版本中找到这个日志是输出位置:

public synchronized void update(MetadataResponse metadataResponse, long now) { ... String newClusterId = cache.cluster().clusterResource().clusterId(); if (!Objects.equals(previousClusterId, newClusterId)) { log.info("Cluster ID: {}", newClusterId); } ...}

看到synchronized,高度怀疑死锁。于是考虑用jstack分析:

在yarn上运行spark任务的时候,driver进程叫ApplicationMaster,executor进程叫CoarseGrainedExecutorBackend。这里首先尝试再复现过程中找到drvier最终在哪个节点上运行,然后快速使用jstack -F <pid>打印堆栈

jstack果然不负众望,报告了死锁!这里我把结果贴的全一点

[root@node1 ~]# jstack 2013620136: Unable to open socket file: target process not responding or HotSpot VM not loadedThe -F option can be used when the target process is not responding[root@node1 ~]# jstack -F 20136Attaching to process ID 20136, please wait...Debugger attached successfully.Server compiler detected.JVM version is 25.231-b11Deadlock Detection:Found one Java-level deadlock:============================="kafka-producer-network-thread | producer-1": waiting to lock Monitor@0x00000000025fcc48 (Object@0x00000000ed680b60, a org/apache/kafka/log4jappender/KafkaLog4jAppender), which is held by "main""main": waiting to lock Monitor@0x00007fec9dbde038 (Object@0x00000000ee44de38, a org/apache/kafka/clients/Metadata), which is held by "kafka-producer-network-thread | producer-1"Found a total of 1 deadlock.Thread 20157: (state = BLOCKED) - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=0, line=231 (Interpreted frame) - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame) - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame) - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame) - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame) - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String, java.lang.Object) @bci=34, line=324 (Interpreted frame) - org.apache.kafka.clients.Metadata.update(org.apache.kafka.common.requests.MetadataResponse, long) @bci=317, line=365 (Interpreted frame) - org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(org.apache.kafka.common.requests.RequestHeader, long, org.apache.kafka.common.requests.MetadataResponse) @bci=184, line=1031 (Interpreted frame) - org.apache.kafka.clients.NetworkClient.handleCompletedReceives(java.util.List, long) @bci=215, line=822 (Interpreted frame) - org.apache.kafka.clients.NetworkClient.poll(long, long) @bci=132, line=544 (Interpreted frame) - org.apache.kafka.clients.producer.internals.Sender.run(long) @bci=227, line=311 (Interpreted frame) - org.apache.kafka.clients.producer.internals.Sender.run() @bci=28, line=235 (Interpreted frame) - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)Thread 20150: (state = BLOCKED)Thread 20149: (state = BLOCKED) - java.lang.Object.wait(long) @bci=0 (Interpreted frame) - java.lang.ref.ReferenceQueue.remove(long) @bci=59, line=144 (Interpreted frame) - java.lang.ref.ReferenceQueue.remove() @bci=2, line=165 (Interpreted frame) - java.lang.ref.Finalizer$FinalizerThread.run() @bci=36, line=216 (Interpreted frame)Thread 20148: (state = BLOCKED) - java.lang.Object.wait(long) @bci=0 (Interpreted frame) - java.lang.Object.wait() @bci=2, line=502 (Interpreted frame) - java.lang.ref.Reference.tryHandlePending(boolean) @bci=54, line=191 (Interpreted frame) - java.lang.ref.Reference$ReferenceHandler.run() @bci=1, line=153 (Interpreted frame)Thread 20137: (state = BLOCKED) - java.lang.Object.wait(long) @bci=0 (Interpreted frame) - org.apache.kafka.clients.Metadata.awaitUpdate(int, long) @bci=63, line=261 (Interpreted frame) - org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(java.lang.String, java.lang.Integer, long) @bci=160, line=983 (Interpreted frame) - org.apache.kafka.clients.producer.KafkaProducer.doSend(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=19, line=860 (Interpreted frame) - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=12, line=840 (Interpreted frame) - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord) @bci=3, line=727 (Interpreted frame) - org.apache.kafka.log4jappender.KafkaLog4jAppender.append(org.apache.log4j.spi.LoggingEvent) @bci=69, line=283 (Interpreted frame) - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=106, line=251 (Interpreted frame) - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame) - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame) - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame) - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame) - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String) @bci=12, line=305 (Interpreted frame) - org.apache.spark.internal.Logging$class.logInfo(org.apache.spark.internal.Logging, scala.Function0) @bci=29, line=54 (Interpreted frame) - org.apache.spark.SecurityManager.logInfo(scala.Function0) @bci=2, line=44 (Interpreted frame) - org.apache.spark.SecurityManager.setViewAcls(scala.collection.immutable.Set, java.lang.String) @bci=36, line=139 (Interpreted frame) - org.apache.spark.SecurityManager.<init>(org.apache.spark.SparkConf, scala.Option) @bci=158, line=81 (Interpreted frame) - org.apache.spark.deploy.yarn.ApplicationMaster.<init>(org.apache.spark.deploy.yarn.ApplicationMasterArguments) @bci=85, line=70 (Interpreted frame) - org.apache.spark.deploy.yarn.ApplicationMaster$.main(java.lang.String[]) @bci=25, line=802 (Interpreted frame) - org.apache.spark.deploy.yarn.ApplicationMaster.main(java.lang.String[]) @bci=4 (Interpreted frame)

到这里,已经确定是死锁,导致driver一开始就运行停滞,那么当然无法提交executor执行。

具体的死锁稍后分析,先考虑如何解决。从感性认识看,似乎只要不让kafka-client的日志也输出到kafka即可。实验后,发现果然如此:如果只输出org.apache.spark的日志就可以正常执行。

根因分析

从stack的结果看,造成死锁的是如下两个线程: kafka-client内部的网络线程spark 主入口线程

两个线程其实都是卡在打日志上了,观察堆栈可以发现,两个线程同时持有了同一个log对象。而这个log对象实际上是kafka-appender。而kafka-appender本质上持有kafka-client,及其内部的Metadata对象。log4j的doAppend为了保证线程安全也用synchronized修饰了:

public synchronized void doAppend(LoggingEvent event) { if(closed) { LogLog.error("Attempted to append to closed appender named ["+name+"]."); return; } if(!isAsSevereAsThreshold(event.level)) { return; } Filter f = this.headFilter; FILTER_LOOP: while(f != null) { switch(f.decide(event)) { case Filter.DENY: return; case Filter.ACCEPT: break FILTER_LOOP; case Filter.NEUTRAL: f = f.next; } } this.append(event); }

于是事情开始了: main线程尝试打日志,首先进入了synchronized的doAppend,即获取了kafka-appender的锁 kafka-appender内部需要调用kafka-client发送日志到kafka,最终调用到Thread 20137展示的,运行到Metadata.awaitUpdate(也是个synchronized方法),内部的wait会尝试获取metadata的锁。(详见https://github.com/apache/kaf...) 但此时,kafka-producer-network-thread线程刚好进入了上文提到的打Cluster ID这个日志的这个阶段(update方法也是synchronized的),也就是说kafka-producer-network-thread线程获得了metadata对象的锁 kafka-producer-network-thread线程要打印日志同样执行synchronized的doAppend,即获取了kafka-appender的锁

上图main-thread持有了log对象锁,要求获取metadata对象锁;kafka-producer-network-thread持有了metadata对象锁,要求获取log对象锁于是造成了死锁。

总结

到此这篇关于spark通过kafka-appender指定日志输出到kafka引发的死锁的文章就介绍到这了,更多相关spark指定日志输出内容请搜索真格学网以前的文章或继续浏览下面的相关文章希望大家以后多多支持真格学网!

1、KafkaUtils.createDstream构造函数为KafkaUtils.createDstream(ssc,[zk],[consumer group id],[per-topic,partitions])使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上A、创建一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故如果增加特定主体分区数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量B、对于不同的group和topic可以使用多个receivers创建不同的DStreamC、如果启用了WAL,需要设置存储级别,即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)2.KafkaUtils.createDirectStream区别Receiver接收数据,这种方式定期地从kafka的topic+partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,使用的是kafka的简单消费者api优点:A、简化并行,不需要多个kafka输入流,该方法将会创建和kafka分区一样的rdd个数,而且会从kafka并行读取。B、高效,这种方式并不需要WAL,WAL模式需要对数据复制两次,第一次是被kafka复制,另一次是写到wal中C、恰好一次语义(Exactly-once-semantics),传统的读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,存在数据丢失的可能性是zookeeper中和ssc的偏移量不一致。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具内容来自www.zgxue.com请勿采集。


  • 本文相关:
  • 详解如何使用spark和scala分析apache访问日志
  • jetbrains mono字体安装方法(推荐)
  • mercurial入门学习介绍
  • 256种编程语言大汇总
  • 详解ansible批量管理服务
  • 关于数据处理包dplyr的函数用法总结
  • webpack基础教程之名词解释
  • 献给写作者的 markdown 新手指南
  • 将新型冠状病毒转二进制的代码(首发)
  • 分享win10 1903过tp的双机调试问题
  • hadoop 分布式存储系统 hdfs的实例详解
  • spark读取kafka数据 createStream和createDirectStream的区别
  • spark每个时间片从kafka读取了多少条数据
  • spark读取kafka数据 createStream和createDirectStream的区别
  • 如何在spark中取出kafka队列的数据
  • 如何使用spark streaming接收kafka中发送的自定义对象
  • spark从kafka读取数据遇到什么问题了吗
  • kafka宕机怎么办,spark挂了怎么办?
  • 如何使用spark streaming接收kafka中发送的自定义对象
  • 请教一个关于使用spark 读取kafka只能读取一个分区数据的问题
  • hadoop、kafka、spark、storm、zookeeper、akka这些都是干啥的,学习顺序怎么样的?
  • 网站首页网页制作脚本下载服务器操作系统网站运营平面设计媒体动画电脑基础硬件教程网络安全javascriptasp.netphp编程ajax相关正则表达式asp编程jsp编程编程10000问css/htmlflex脚本加解密web2.0xml/rss网页编辑器相关技巧安全相关网页播放器其它综合dart首页其它综合详解如何使用spark和scala分析apache访问日志jetbrains mono字体安装方法(推荐)mercurial入门学习介绍256种编程语言大汇总详解ansible批量管理服务关于数据处理包dplyr的函数用法总结webpack基础教程之名词解释献给写作者的 markdown 新手指南将新型冠状病毒转二进制的代码(首发)分享win10 1903过tp的双机调试问题hadoop 分布式存储系统 hdfs的实例详解最新idea2020注册码永久激活(激活删除svn三种方法delsvn(windows+intellij idea激活码获取方法(ic/s和b/s两种架构的概念、区别和网址(url)支持的最大长度是多少5个linux平台程序员最爱的开发工url中斜杠/和反斜杠\的区别小结支付宝 接口开发帮助(asp,php,as提示“处理url时服务器出错”和“thymeleaf实现th:each双重多重嵌套功能分享win10 1903过tp的双机调试问题字符编码详解及由来(unicode,utf-8,gbk) 网站统计中的数据收集原理及实现win10 + anaconda3 + python3.6 安装tens2019最新系统学习路线零基础如何转行大数支付宝 接口开发帮助(asp,php,asp.net,jssession的存储方式和配置方法介绍unicode utf-8 gb18030 gb2312 gbk各种编12个常用前端ui框架集合汇总
    免责声明 - 关于我们 - 联系我们 - 广告联系 - 友情链接 - 帮助中心 - 频道导航
    Copyright © 2017 www.zgxue.com All Rights Reserved