// optionally, you can consider sundays and saturdays as non working days
// val nonWorkingDays = Set(java.time.DayOfWeek.SUNDAY, java.time.DayOfWeek.SATURDAY)
// using an empty set to match your expected result
val nonWorkingDays = Set[java.time.DayOfWeek]()
val nextWorkingDay = udf((dates : WrappedArray[java.sql.Date]) => {
val dateSet = dates.map(_.toLocalDate).toSet
dates.map(date => {
var nextDate = date.toLocalDate.plusDays(1)
while(dateSet.contains(nextDate) || nonWorkingDays.contains(nextDate.getDayOfWeek))
nextDate = nextDate.plusDays(1)
date -> java.sql.Date.valueOf(nextDate)
})
})
val nextDayDF = holiday
.groupBy("Country_code", "currency_code")
.agg(collect_list('date) as "dates")
.withColumn("date_struct", explode(nextWorkingDay('dates)))
.drop("dates")
// renaming columns to simplify the join
.select($"Country_code" as "Country_code_demo", $"currency_code" as "currency_code_demo",
$"date_struct._1" as "date_demo", $"date_struct._2" as "date_updated")
everyday
.join(nextDayDF, Seq("Country_code_demo", "currency_code_demo", "date_demo"), "left")
.withColumn("date_updated", coalesce('date_updated, 'date_demo))
.show()