
Spark with Scala

A place to discuss and ask questions about using Scala for Spark programming.



21 May 2023
[06:58:41] cheapsolutionarchitect#6849: The timeout is about two minutes; you can change that with the kafkaConsumer.pollTimeoutMs option.
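A minimal sketch of that option in context, assuming Spark's structured-streaming Kafka source; the broker address and topic name are placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("kafka-poll-timeout")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "events")                       // placeholder topic
      // Executor-side poll timeout in milliseconds; the default is 120000 (~2 min).
      .option("kafkaConsumer.pollTimeoutMs", "30000")
      .load()

    val query = df.writeStream.format("console").start()
    query.awaitTermination()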
[09:09:51] cheapsolutionarchitect#6849: Which Spark version do you run? Does it work when you remove the .awaitTermination call? On the other hand, under normal circumstances executors are restarted after a failure, so you would probably end up with a similar scenario, just on a slightly different level.
[09:56:44] cheapsolutionarchitect#6849: Hmm, I'm sure it will not help you in your situation, because it seems there is no exception, just a warning. And I do not think it is related to Spark but to Kafka: the Kafka consumer keeps trying to reconnect, and I did not find a "give up after t time or n tries" setting either. So a naive approach would be a timed health check for the brokers that kills the stream if no broker is available. You could use the Kafka AdminClient for that.
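A naive sketch of such a health check, assuming kafka-clients is on the classpath and that query is the running StreamingQuery handle from the stream above; the probe interval and timeouts are arbitrary:

    import java.util.Properties
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
    import scala.util.Try

    def brokerIsUp(bootstrapServers: String): Boolean = {
      val props = new Properties()
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
      props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000")
      props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000")
      Try {
        val admin = AdminClient.create(props)
        // describeCluster() round-trips to a broker, so success means one is reachable.
        try admin.describeCluster().nodes().get() finally admin.close()
      }.isSuccess
    }

    // Timed check: kill the stream once no broker answers.
    while (query.isActive) {
      if (!brokerIsUp("localhost:9092")) query.stop()
      Thread.sleep(30000)
    }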
[10:03:37] cheapsolutionarchitect#6849: In your example it looks like you are running a local Kafka cluster (probably one node); see the line .option("kafka.bootstrap.servers", broker).
[10:14:27] cheapsolutionarchitect#6849: Yes, but you already defined the broker. To get information about the cluster from one or all of your brokers, you will need the Admin API: https://kafka.apache.org/documentation/#adminapi. I'm not sure whether it is bundled with Spark or whether you need to add it to your driver application.
[10:15:39] cheapsolutionarchitect#6849: You already did, with val broker = "localhost:9092". For the health-check part, you could start with https://www.baeldung.com/apache-kafka-check-server-is-running; there are also interesting points made in https://www.youtube.com/watch?v=aO2pv8W6oZU&ab_channel=Devoxx (targeted at K8s, but very instructive).
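If the AdminClient turns out not to be on your classpath already via the Spark-Kafka connector, an explicit build.sbt dependency might look like this; the version is a guess and should be aligned with the kafka-clients your connector ships:

    // build.sbt: hypothetical version, match it to your spark-sql-kafka connector
    libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.8.1"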
[20:31:21] cheapsolutionarchitect#6849: There are multiple ways to hand configuration settings to Spark. If you roll your own image, you can modify the spark-conf files; you can set options with spark-submit; or you can set variables on a SparkConf instance in your code. As for the cluster variable, it looks like you can freely choose the name, because in the end the Kafka cluster entry points are defined by the bootstrap servers.
[20:31:43] cheapsolutionarchitect#6849: Ah, they even point it out in the documentation: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#configuration.
[21:23:03] cheapsolutionarchitect#6849: I think it is supposed to be like spark.kafka.clusters.my-nice-cluster.kafka.retries=1; you can put in whatever name you want, it is just for structuring multiple configurations that target Kafka.
[21:32:48] cheapsolutionarchitect#6849: I'm not sure it works if you put the Spark option into the options of the read stream; you probably need to initialize the SparkSession with the Spark configuration. There are multiple ways: you can create a SparkConf instance and hand it over to your SparkContext, or use SparkSession.builder() and set every Spark-related option there. I have to use an old version of Spark, so it might be different in the 3+ variants.
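A sketch of both routes, using the property from the message above as the example (the cluster name is arbitrary); spark-submit --conf would be the third route:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // Way 1: a SparkConf instance handed to the session (or a SparkContext).
    val conf = new SparkConf()
      .set("spark.kafka.clusters.my-nice-cluster.kafka.retries", "1")
    val sparkFromConf = SparkSession.builder().config(conf).getOrCreate()

    // Way 2: directly on the builder.
    val sparkFromBuilder = SparkSession.builder()
      .config("spark.kafka.clusters.my-nice-cluster.kafka.retries", "1")
      .getOrCreate()

    // Way 3, outside the code:
    //   spark-submit --conf spark.kafka.clusters.my-nice-cluster.kafka.retries=1 ...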
22 May 2023
[16:13:05] cheapsolutionarchitect#6849: Yes, a bit disappointing. You could try your original version and see if it reacts to just "kafka.retries" without the spark.xxxx prefix. But I think it won't do anything; it looks like the retries setting is for producers (writeStream), not consumers (readStream). For more info see https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/producer/ProducerConfig.html (the retries config); there is nothing similar in ConsumerConfig.
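For completeness, the producer side would pick the setting up through a kafka.-prefixed option on the Kafka sink; a sketch, assuming df has a value column and that the topic and checkpoint path are placeholders:

    // "kafka."-prefixed sink options are forwarded to the underlying Kafka producer.
    val sink = df.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("kafka.retries", "3")                     // producer-side retries
      .option("topic", "output-topic")                  // placeholder topic
      .option("checkpointLocation", "/tmp/checkpoints") // placeholder path
      .start()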
25 May 2023
[17:41:54] cheapsolutionarchitect#6849: What is your goal, setting up everything by yourself or using AWS EMR? For the latter, the currently supported libraries are listed at https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-6.x.html, and it is compatible with Scala 2.12.
[21:13:32] cheapsolutionarchitect#6849: You can actually use other S3 Hadoop clients as well; the Apache Hadoop project offers a suitable S3 client (hadoop-aws) if you do not want to use the one from Amazon. And a little word of caution: S3 is an "eventually consistent" store, so if you are going to checkpoint Spark streams there you can run into consistency problems.
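A sketch of checkpointing to S3 through the s3a connector, assuming the hadoop-aws module is on the classpath and AWS credentials come from the environment; the bucket name is a placeholder:

    // s3a:// paths are served by hadoop-aws; the bucket below is hypothetical.
    val s3Query = df.writeStream
      .format("console")
      .option("checkpointLocation", "s3a://my-bucket/checkpoints/my-stream")
      .start()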


