Message | Time | Sender |
---|---|---|
4 Jan 2023 | ||
Hi, does anyone know of a way to return a struct using a UDF in Spark? I've tried searching online, but all the tutorials use udf(f, DataType), and the IDE warns: "Scala udf method with return type parameter is deprecated. Please use Scala udf method without return type parameter. Since 3.0.0." Can one still do this somehow? My goal is roughly:
Whether using Scala, SQL or Python API, I would want to be able to call the UDF like:
And a single row of blob_struct would look like:
Returning a list of Tuples gives me the proper format, but it lacks the names. The elements are _1, _2 instead of cats and dogs. | 12:57:13 | |
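A minimal sketch of the non-deprecated route, assuming Spark 3.x: the Scala `udf` overload without a return-type parameter can infer a struct from a case class, which also gives the fields real names instead of `_1`/`_2`. The names `blob_struct`, `cats` and `dogs` come from the discussion above; the UDF body and the input column are made up for illustration.

```scala
// Sketch: a Scala UDF returning a named struct via a case class
// (assumes Spark 3.x; the UDF body is illustrative only).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

case class PetCounts(cats: Int, dogs: Int)

object NamedStructUdf {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("named-struct-udf").getOrCreate()
    import spark.implicits._

    // The encoder-based udf infers struct<cats:int,dogs:int> from the case class,
    // so no deprecated udf(f, DataType) call is needed.
    val countPets = udf((data: String) => PetCounts(data.count(_ == 'c'), data.count(_ == 'd')))

    val df = Seq("ccd", "cdd").toDF("data")
    val result = df.select(countPets(col("data")).as("blob_struct"))
    result.printSchema() // blob_struct: struct<cats: int, dogs: int>
    result.show()
    spark.stop()
  }
}
```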
That looks a bit too complex for my skill level; I don't quite know what to fill in for the encoders etc. I may be forced to return a struct with default names. I can always rename them right after calling the UDF by doing arrays_zip(blob._1 as cats, blob._2 as dogs). | 13:45:18 | |
Or if I want to have it as a nested list of structs, like [{"cats": 1, "dogs": 4}, …], I suppose I could cast a new schema on that column, like suggested here: https://stackoverflow.com/questions/43004849/rename-nested-field-in-spark-dataframe | 16:50:21 | |
5 Jan 2023 | ||
I ended up using CAST after calling the UDF, like my_udf(col("data")).cast(column_schema) | 14:28:59 | |
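A sketch of that cast-after-UDF rename, assuming the UDF returns an array of unnamed pairs (array<struct<_1:int,_2:int>>); `my_udf`, `column_schema` and the `data` column follow the message above, while the UDF body and values are placeholders.

```scala
// Sketch: rename tuple fields right after the UDF call by casting to a
// structurally identical type with the desired field names.
// Assumes a SparkSession with spark.implicits._ in scope.
import org.apache.spark.sql.functions.{col, udf}

// Stand-in for the UDF from the chat: returns a list of unnamed pairs,
// i.e. array<struct<_1:int,_2:int>>.
val my_udf = udf((data: String) => Seq((data.count(_ == 'c'), data.count(_ == 'd'))))

val column_schema = "array<struct<cats:int,dogs:int>>" // DDL-style type string
val df = Seq("ccd", "cdd").toDF("data")
val renamed = df.select(my_udf(col("data")).cast(column_schema).as("blob"))
renamed.printSchema() // blob: array<struct<cats:int,dogs:int>>
```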
My current problem is that I ran a performance test and I'm having trouble beating Python. There is a chance that this has to do with partitioning, because I have a fair amount of disk and memory spill, but I need to dig deeper into this next week. | 14:29:55 | |
I tried changing the spark.sql.files.maxPartitionBytes, but that does not seem to be a hard limit. Some 128 MB partitions still pass as tasks, and they take like 1.1 hours to run. | 14:31:33 | |
Should I force the input data into smaller Parquet (Delta Lake) files? Or should I not use snappy compression? Spark can be fairly complex. | 14:33:02 | |
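For reference, a sketch of the read-side knob being discussed; the 32 MB value and the path are arbitrary placeholders, and `spark` stands for the session in use.

```scala
// Sketch: ask Spark for smaller input splits when scanning the files.
// Whether a file can actually be split further depends on its row groups.
spark.conf.set("spark.sql.files.maxPartitionBytes", 32 * 1024 * 1024)
val input = spark.read.parquet("/path/to/input") // placeholder path
```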
10 Jan 2023 | ||
Writing the files with a size limit (e.g. using maxRecordsPerFile) allows this narrow transformation to fit into memory, avoiding the memory spill completely, which speeds up the process quite a bit (1.5 hours => 1 hour). That said, it came as a surprise that maxPartitionBytes doesn't let Spark read e.g. a 128 MB Parquet file as two tasks/partitions; the split seems to be limited by the row-group size rather than maxPartitionBytes alone. Apparently the smallest chunk of data that Spark can load from a snappy-compressed Parquet file is a "row group" within the file, and that often seems to equal the file's overall size. If I write 50 MB files, I also need to set maxPartitionBytes to <50 MB so that it doesn't pack several of those into the same partition. In fact, it is safer to push it down to around 20 MB, otherwise it might pack them anyway, which seemed to crash an executor - one that previously handled a full 128 MB file, which is strange. | 11:36:24 | |
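A sketch of the write-side limit described above; the record count, format and paths are placeholders (the chat uses Delta Lake, but plain Parquet behaves the same here), and `df`/`spark` stand for the dataframe and session in use.

```scala
// Sketch: cap records per output file so each file (and its row group)
// stays small enough to be read without spilling.
df.write
  .option("maxRecordsPerFile", 500000) // illustrative value
  .mode("overwrite")
  .parquet("/path/to/output")          // placeholder path

// Read side: keep maxPartitionBytes below the written file size so Spark
// doesn't pack several of the small files into one partition.
spark.conf.set("spark.sql.files.maxPartitionBytes", 20 * 1024 * 1024) // ~20 MB
```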
17 Jan 2023 | ||
You could make a small standalone Spark project and then use IntelliJ or VS Code with Metals to get some Scala help. You could use this as an example to start from - https://github.com/ekrich/spark-stats | 19:32:40 | |
I used explode or its opposite before - https://sparkbyexamples.com/spark/explode-spark-array-and-map-dataframe-column/ | 19:34:02 | |
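A small sketch of `explode` and its rough opposite (`collect_list`); the column names and data are made up, and `spark.implicits._` is assumed to be in scope.

```scala
// Sketch: explode an array column into one row per element, then group
// it back into an array per id.
import org.apache.spark.sql.functions.{col, collect_list, explode}

val pets = Seq((1, Seq("cat", "dog")), (2, Seq("dog"))).toDF("id", "pets")
val exploded  = pets.select(col("id"), explode(col("pets")).as("pet"))
val regrouped = exploded.groupBy("id").agg(collect_list("pet").as("pets"))
```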
I updated to the latest version in case you want to take a look. | 19:58:17 | |
Glad that helped out - is your project clean, maybe rebuild? | 23:30:31 | |
Spark is pretty cool because it will use your local cores on a single machine to distribute the workload. Not distributed across the network, but ... | 23:32:28 | |
18 Jan 2023 | ||
plokhotnyuk Is that even possible without implementing a whole new Datasource-extension for Spark? I'm not sure if you can just switch the json-parser implementation. | 08:50:48 | |
25 Jan 2023 | ||
If you don't want to flatten the structure into top level columns, you can recreate a structurally matching schema for the nested column, rename the things you want to rename, and apply it with a cast. | 06:30:02 | |
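A sketch of that approach with assumed column and field names: rebuild the nested column's type with the new names and cast to it (field types and nesting must match for the cast to succeed).

```scala
// Sketch: rename fields inside a nested column without flattening it,
// by casting to a structurally identical type with new field names.
// Assumes "blob" is array<struct<_1:int,_2:int>> on an existing df.
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

val renamedType = ArrayType(StructType(Seq(
  StructField("cats", IntegerType),
  StructField("dogs", IntegerType))))

val out = df.withColumn("blob", col("blob").cast(renamedType))
```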