!CJoUbovqKaCGrFkbrY:matrix.org

Spark with Scala

400 Members
A place to discuss and ask questions about using Scala for Spark programming.
3 Servers



4 Jan 2023
@_discord_296634968426414081:t2bot.ioPaha-Vaatturi#2035 Hi, does anyone know of a way to return a struct from a UDF in Spark? I've tried searching online, but all the tutorials use udf(f, DataType), and the IDE warns that:

Scala udf method with return type parameter is deprecated. Please use Scala udf method without return type parameter. Since 3.0.0.

Can one still do this somehow? My goal is roughly:

val schema = StructType(Seq(
  StructField("cats", ArrayType(IntegerType)),
  StructField("dogs", ArrayType(IntegerType))
))

def my_func(inputData: Array[Byte]): Row = {
  // Perform magic: val result = ???
  result
}

val my_udf = udf(my_func(_), schema)


Whether using the Scala, SQL or Python API, I would want to be able to call the UDF like:
SELECT id, my_udf(blob) AS blob_struct
FROM megatable


And a single row of blob_struct would look like:
{"cats": [1,2,3], "dogs": [4,5,6]}


Returning a list of tuples gives me the proper format, but it lacks the names; the elements are _1 and _2 instead of cats and dogs.
12:57:13
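
One common way to get named struct fields without the deprecated udf(f, DataType) overload is to return a case class from the UDF and let Spark derive the struct schema (and its field names) from it. A minimal sketch, assuming Spark 3.x and a SparkSession in scope as spark; CatsAndDogs, the placeholder values and the my_udf name are illustrative, not from the thread:

import org.apache.spark.sql.functions.udf

// Spark derives struct<cats:array<int>,dogs:array<int>> from the case class,
// so the field names come out as "cats" and "dogs" rather than _1 and _2.
case class CatsAndDogs(cats: Seq[Int], dogs: Seq[Int])

def myFunc(inputData: Array[Byte]): CatsAndDogs = {
  // Real decoding of the blob would go here; placeholder values for illustration.
  CatsAndDogs(Seq(1, 2, 3), Seq(4, 5, 6))
}

val myUdf = udf(myFunc _)
spark.udf.register("my_udf", myUdf)
// SELECT id, my_udf(blob) AS blob_struct FROM megatable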
@_discord_296634968426414081:t2bot.ioPaha-Vaatturi#2035 That looks a bit too complex for my skill level; I don't quite know what to fill in for the encoders etc. I may be forced to return a struct with default names. I can always rename them right after calling the UDF by doing arrays_zip(blob._1 as cats, blob._2 as dogs). 13:45:18
@_discord_296634968426414081:t2bot.ioPaha-Vaatturi#2035 Or if I want to have it as a nested list of structs, like [{"cats": 1, "dogs": 4}, …], I suppose I could cast a new schema on that column, as suggested here: https://stackoverflow.com/questions/43004849/rename-nested-field-in-spark-dataframe 16:50:21
5 Jan 2023
@_discord_296634968426414081:t2bot.ioPaha-Vaatturi#2035 I ended up using CAST after calling the UDF, like my_udf(col("data")).cast(column_schema). 14:28:59
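
Roughly what that cast approach looks like, as a sketch; the df, the column names, and the example tuple returned by the UDF are illustrative, not taken from the thread:

import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

// Schema with the desired names; it matches the UDF's tuple result positionally.
val column_schema = StructType(Seq(
  StructField("cats", ArrayType(IntegerType)),
  StructField("dogs", ArrayType(IntegerType))
))

// Illustrative UDF returning a tuple; Spark names the struct fields _1 and _2.
val my_udf = udf((data: Array[Byte]) => (Seq(1, 2, 3), Seq(4, 5, 6)))

// Cast right after the UDF call to swap _1/_2 for cats/dogs.
val withStruct = df.withColumn("blob_struct", my_udf(col("data")).cast(column_schema))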
@_discord_296634968426414081:t2bot.ioPaha-Vaatturi#2035 My current problem is that I ran a performance test and I'm having trouble beating Python. There is a chance this has to do with partitioning, because I have a fair amount of disk and memory spill, but I need to dig into this next week. 14:29:55
@_discord_296634968426414081:t2bot.ioPaha-Vaatturi#2035 I tried changing the spark.sql.files.maxPartitionBytes, but that does not seem to be a hard limit. Some 128 MB partitions still pass as tasks, and they take like 1.1 hours to run. 14:31:33
@_discord_296634968426414081:t2bot.ioPaha-Vaatturi#2035 Should I force the input data into smaller Parquet (Delta Lake) files? Or should I not use snappy compression? Spark can be fairly complex. 14:33:02
10 Jan 2023
@_discord_296634968426414081:t2bot.ioPaha-Vaatturi#2035 Writing the files with a size limit (e.g. using maxRecordsPerFile) lets this narrow transformation fit into memory, avoiding the memory spill completely, which speeds up the process quite a bit (1.5 hours => 1 hour). That said, it came as a surprise that maxPartitionBytes doesn't allow reading e.g. a 128 MB Parquet file into two tasks/partitions; it seems that it is min(fileSize / maxPartitionBytes). Apparently the smallest chunk of data that Spark can load from a snappy-compressed Parquet file is a "row group" within the file, and that often equals the file's overall size.

If I write 50 MB files, I also need to set maxPartitionBytes to <50 MB so that Spark doesn't concatenate them into the same partition. In fact, it is safer to push it down to around 20 MB; otherwise it might do so anyway, which seemed to crash an executor that had previously been able to handle a full 128 MB, which is strange.
11:36:24
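
A sketch of the two settings described above; the path, record limit, and byte size are illustrative values, and writing Delta assumes the Delta Lake dependency is on the classpath:

// Cap file size at write time so a single file / row group stays small.
df.write
  .format("delta")
  .option("maxRecordsPerFile", 500000)
  .save("/tmp/megatable")

// Keep the read-side split size well below the file size so Spark does not
// pack several small files into one input partition.
spark.conf.set("spark.sql.files.maxPartitionBytes", "20MB")
val input = spark.read.format("delta").load("/tmp/megatable")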
17 Jan 2023
@_discord_632729825647525922:t2bot.ioekrich#7695 You could make a small standalone Spark project and then use IntelliJ or VS Code with Metals to get some Scala help. You could use this as an example to start from - https://github.com/ekrich/spark-stats 19:32:40
@_discord_632729825647525922:t2bot.ioekrich#7695 I used explode or its opposite before - https://sparkbyexamples.com/spark/explode-spark-array-and-map-dataframe-column/ 19:34:02
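
For reference, a minimal sketch of explode and its rough inverse (collect_list), assuming a SparkSession in scope as spark; the sample data is illustrative:

import org.apache.spark.sql.functions.{col, collect_list, explode}
import spark.implicits._

// Small example DataFrame with an array column.
val pets = Seq(("a", Seq(1, 2, 3)), ("b", Seq(4, 5))).toDF("id", "nums")

// explode: one output row per array element.
val exploded = pets.select(col("id"), explode(col("nums")).as("num"))

// The rough opposite: gather the elements back into an array per id.
val regrouped = exploded.groupBy("id").agg(collect_list("num").as("nums"))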
@_discord_632729825647525922:t2bot.ioekrich#7695 I updated to the latest version in case you want to take a look. 19:58:17
@_discord_632729825647525922:t2bot.ioekrich#7695 Glad that helped out - is your project clean, maybe rebuild? 23:30:31
@_discord_632729825647525922:t2bot.ioekrich#7695 Spark is pretty cool because it will use your local cores on a single machine to distribute the workload. Not distributed across the network, but ... 23:32:28
18 Jan 2023
@_discord_818401984230981642:t2bot.iocheapsolutionarchitect#6849 plokhotnyuk Is that even possible without implementing a whole new Datasource extension for Spark? I'm not sure if you can just switch the json-parser implementation. 08:50:48
25 Jan 2023
@_discord_818401984230981642:t2bot.iocheapsolutionarchitect#6849 If you don't want to flatten the structure into top-level columns, you can recreate a structurally matching schema for the nested column, rename the things you want to rename, and apply it with a cast. 06:30:02
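
A rough sketch of that idea for a nested array-of-structs column; the df, the pairs column name, and the field names are illustrative:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Suppose the column currently has type array<struct<_1:int,_2:int>>.
// Build a structurally matching type with the names we actually want...
val renamedType = ArrayType(StructType(Seq(
  StructField("cats", IntegerType),
  StructField("dogs", IntegerType)
)))

// ...and cast: the data stays the same, only the field names change.
val renamed = df.withColumn("pairs", col("pairs").cast(renamedType))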


