Writing to a SQL Server datetime data type from Scala/Spark

2024-04-27

I am trying to bulk insert into a SQL Server table from a Databricks notebook, using an approach similar to:

Bulk copy to Azure SQL Database or SQL Server: https://docs.databricks.com/spark/latest/data-sources/sql-databases-azure.html#id7

This works fine until I try to write to a column of data type datetime. The table I am trying to write to has the following schema:

create table raw.HubDrg_TEST
(
  DrgKey varchar(64) not null,
  LoadDate datetime,
  LoadProcess varchar(255),
  RecordSource varchar(255),
  DrgCode varchar(255)
 )

My Scala code is as follows:

import java.math.BigInteger
import java.security.MessageDigest

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, udf}

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

//Get dataset for data in staging table
var stagedData: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", sqlDwUrlSmall)
  .option("tempDir", tempDir)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select distinct CodeID as DrgCode, getdate() as LoadDate from StageMeditech.livendb_dbo_DAbsDrgs")
  .load()

//Get dataset for data in existing Hub
val existingHub: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", sqlDwUrlSmall)
  .option("tempDir", tempDir)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "Select DrgKey as ExistingDrgKey from raw.HubDrg_TEST")
  .load()

//SHA-256 as a hex string; %064x zero-pads to the full 64 hex digits
//(%032x would under-pad digests whose leading bytes are zero)
val sha_256 = udf((s: String) => String.format("%064x", new BigInteger(1, MessageDigest.getInstance("SHA-256").digest(s.getBytes("UTF-8")))))

//Add additional columns
stagedData = stagedData
  .withColumn("DrgKey", sha_256(col("DrgCode")))
  .withColumn("LoadProcess", lit("TestLoadProcess"))
  .withColumn("RecordSource", lit("TestRecordSource"))
//Join and filter out existing hub records
val dff = stagedData
  .join(existingHub, col("DrgKey") === col("ExistingDrgKey"), "left_outer")
  .filter(existingHub.col("ExistingDrgKey").isNull)
  .drop("ExistingDrgKey")
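As an aside, the left-outer-join-plus-null-filter pattern above can also be written with Spark's built-in left_anti join type, which keeps only the left-side rows that have no match on the right and drops the right-side columns automatically. A minimal equivalent sketch:

```scala
//Anti-join: keep staged rows whose DrgKey is not already in the hub.
//No .filter or .drop("ExistingDrgKey") needed; left_anti does both.
val dffAnti = stagedData
  .join(existingHub, col("DrgKey") === col("ExistingDrgKey"), "left_anti")
```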

//Bulk insert
val bulkCopyConfig = Config(Map(
  "url" -> dwServer,
  "databaseName" -> dwDatabase,
  "user" -> dwUser,
  "password" -> dwPass,
  "dbTable" -> "raw.HubDrg_TEST",
  "bulkCopyBatchSize" -> "2000",
  "bulkCopyTableLock" -> "false",
  "bulkCopyTimeout" -> "0"
))

dff.bulkCopyToSqlDB(bulkCopyConfig) 

The problem I'm seeing is that the datetime value I select with getdate() as LoadDate produces this error when I try to insert into the table above: SqlNativeBufferBufferBulkCopy.WriteTdsDataToServer, error in OdbcDone: SqlState: 42000, NativeError: 4816, 'Error calling: bcp_done(this->GetHdbc()) | SQL Error Info: SrvrMsgState: 1, SrvrSeverity: 16, Error <1>: ErrorMsg: [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid column type from bcp client for colid 2. | Error calling: pConn->Done() | state: FFFF, number: 58673, active connections: 9', Connection String: Driver={pdwodbc17e};app=TypeC01-DmsNativeWriter:DB66\mpdwsvc (13056)-ODBC;trusted_connection=yes;autotranslate=no;server=\\.\pipe\DB.66-a313018f1e5b\sql\query;database=Distribution_15

It still fails even when I drop the datetime value from the SQL Server query and instead produce LoadDate with Spark's built-in current_timestamp function (https://spark.apache.org/docs/2.3.1/api/sql/index.html#current_timestamp), i.e. withColumn("LoadDate", current_timestamp()).
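One workaround sometimes suggested for this class of error (an untested sketch, not verified against this exact setup) is to ship the timestamp as a formatted string and let SQL Server convert it on insert, since SQL Server's datetime keeps less fractional precision than Spark's timestamp:

```scala
import org.apache.spark.sql.functions.{current_timestamp, date_format}

//Render the timestamp as a string SQL Server can implicitly convert to datetime.
//datetime only stores ~millisecond precision (rounded to .000/.003/.007),
//so truncate the formatting at milliseconds.
val stagedWithDate = stagedData.withColumn(
  "LoadDate",
  date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss.SSS")
)
```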

I saw this Stack Overflow post, https://stackoverflow.com/questions/56798914/scala-sql-server-how-to-insert-the-current-timestamp-as-datetime-in-sql-serve, which describes a similar problem but still doesn't answer the question. Does anyone have a good example of inserting into a SQL Server datetime column (https://learn.microsoft.com/en-us/sql/t-sql/data-types/datetime-transact-sql?view=sql-server-2017) using the com.microsoft.azure.sqldb.spark.bulkcopy._ library?
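For what it's worth, the azure-sqldb-spark library can be given explicit destination column types through BulkCopyMetadata, so the bcp client does not have to infer them. A sketch of what that might look like for this table (the column positions follow the order dff.show() prints below, and the JDBC type choices are my assumption from the DDL above; untested against this setup):

```scala
import java.sql.Types

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

//Describe each source column: (position in DataFrame, destination column name,
//JDBC type, precision, scale). LoadDate is colid 2, the column the error names.
val bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "DrgCode", Types.NVARCHAR, 255, 0)
bulkCopyMetadata.addColumnMetadata(2, "LoadDate", Types.TIMESTAMP, 0, 0)
bulkCopyMetadata.addColumnMetadata(3, "DrgKey", Types.NVARCHAR, 64, 0)
bulkCopyMetadata.addColumnMetadata(4, "LoadProcess", Types.NVARCHAR, 255, 0)
bulkCopyMetadata.addColumnMetadata(5, "RecordSource", Types.NVARCHAR, 255, 0)

dff.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
```

Another option sometimes suggested is declaring the destination column as datetime2 on the SQL Server side, which maps more cleanly to Spark's timestamp type.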

Here is a sample of the data from dff.show():

    +-------+--------------------+--------------------+---------------+----------------+
    |DrgCode|            LoadDate|              DrgKey|    LoadProcess|    RecordSource|
    +-------+--------------------+--------------------+---------------+----------------+
    |    390|2019-07-02 09:05:...|48a1a756f2d83f1dc...|TestLoadProcess|TestRecordSource|
    |     18|2019-07-02 09:05:...|4ec9599fc203d176a...|TestLoadProcess|TestRecordSource|
    |    481|2019-07-02 09:05:...|51d089cdaf0c968c9...|TestLoadProcess|TestRecordSource|
    |    460|2019-07-02 09:05:...|841a05fd378a2c067...|TestLoadProcess|TestRecordSource|
    |    838|2019-07-02 09:05:...|cef5838d118dccd9d...|TestLoadProcess|TestRecordSource|
    |     61|2019-07-02 09:05:...|d029fa3a95e174a19...|TestLoadProcess|TestRecordSource|
    |    807|2019-07-02 09:05:...|fce86e339dc3131c4...|TestLoadProcess|TestRecordSource|
    |     44|2019-07-02 09:05:...|71ee45a3c0db9a986...|TestLoadProcess|TestRecordSource|
    |    267|2019-07-02 09:05:...|8acc23987b8960d83...|TestLoadProcess|TestRecordSource|
    |    222|2019-07-02 09:05:...|9b871512327c09ce9...|TestLoadProcess|TestRecordSource|
    |    934|2019-07-02 09:05:...|a8443b1426652157e...|TestLoadProcess|TestRecordSource|
    |    677|2019-07-02 09:05:...|2782526eaa0c5c254...|TestLoadProcess|TestRecordSource|
    |    701|2019-07-02 09:05:...|290a0b92873bdf4e4...|TestLoadProcess|TestRecordSource|
    |    441|2019-07-02 09:05:...|2dfe70c43208f52b9...|TestLoadProcess|TestRecordSource|
    |    439|2019-07-02 09:05:...|50a010ce24d089605...|TestLoadProcess|TestRecordSource|
    |    883|2019-07-02 09:05:...|3055e0d8130c7a197...|TestLoadProcess|TestRecordSource|
    |    947|2019-07-02 09:05:...|4d0198f4905a08812...|TestLoadProcess|TestRecordSource|
    |    369|2019-07-02 09:05:...|5f193b350c8aba488...|TestLoadProcess|TestRecordSource|
    |     21|2019-07-02 09:05:...|6f4b6612125fb3a0d...|TestLoadProcess|TestRecordSource|
    |    503|2019-07-02 09:05:...|7182dd431b5c8833e...|TestLoadProcess|TestRecordSource|
    +-------+--------------------+--------------------+---------------+----------------+
    only showing top 20 rows

dff:org.apache.spark.sql.DataFrame
DrgCode:string
LoadDate:timestamp
DrgKey:string
LoadProcess:string
RecordSource:string

