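I am trying to bulk insert into a SQL Server table from a Databricks notebook, using an approach similar to this one:
Bulk copy to Azure SQL Database or SQL Server https://docs.databricks.com/spark/latest/data-sources/sql-databases-azure.html#id7
This works fine until I try to write to a column of datatype datetime. The table I am trying to write to has the following schema: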
create table raw.HubDrg_TEST
(
DrgKey varchar(64) not null,
LoadDate datetime,
LoadProcess varchar(255),
RecordSource varchar(255),
DrgCode varchar(255)
)
My Scala code is as follows:
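import java.math.BigInteger
import java.security.MessageDigest
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, udf}
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
//Get dataset for data in staging table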
var stagedData: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", sqlDwUrlSmall)
.option("tempDir", tempDir)
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select distinct CodeID as DrgCode, getdate() as LoadDate from StageMeditech.livendb_dbo_DAbsDrgs").load()
//Get dataset for data in existing Hub
val existingHub: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", sqlDwUrlSmall)
.option("tempDir", tempDir)
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "Select DrgKey as ExistingDrgKey from raw.HubDrg_TEST")
.load()
//SHA-256 yields 32 bytes, so pad the hex string to 64 characters (the width of DrgKey)
val sha_256 = udf((s: String) => { String.format("%064x", new BigInteger(1, MessageDigest.getInstance("SHA-256").digest(s.getBytes("UTF-8")))) })
//Add additional columns
stagedData = stagedData.withColumn("DrgKey",sha_256(col("DrgCode"))).withColumn("LoadProcess",lit("TestLoadProcess"))
.withColumn("RecordSource",lit("TestRecordSource"))
//Join and filter out existing hub records
val dff = stagedData
.join(existingHub, col("DrgKey")===col("ExistingDrgKey"), "left_outer")
.filter(existingHub.col("ExistingDrgKey").isNull)
.drop("ExistingDrgKey")
//Bulk insert
val bulkCopyConfig = Config(Map(
"url" -> dwServer,
"databaseName" -> dwDatabase,
"user" -> dwUser,
"password" -> dwPass,
"dbTable" -> "raw.HubDrg_TEST",
"bulkCopyBatchSize" -> "2000",
"bulkCopyTableLock" -> "false",
"bulkCopyTimeout" -> "0"
))
dff.bulkCopyToSqlDB(bulkCopyConfig)
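As an aside on the key column: a SHA-256 digest is 32 bytes, which is 64 hexadecimal characters and exactly the width of DrgKey varchar(64). The sketch below repeats the hashing step outside Spark so the key length can be checked in isolation. The helper name sha256Hex is hypothetical and not part of the notebook; the only assumption is that keys should always be zero-padded to 64 characters.

```scala
import java.math.BigInteger
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical helper (not in the original notebook): SHA-256 of a string
// as lowercase hex, left-padded with zeros to 64 characters.
def sha256Hex(s: String): String = {
  val digest = MessageDigest.getInstance("SHA-256")
    .digest(s.getBytes(StandardCharsets.UTF_8))
  // BigInteger(1, bytes) treats the digest as an unsigned number;
  // %064x keeps leading zeros so every key has the same length.
  String.format("%064x", new BigInteger(1, digest))
}

// Every key should fit DrgKey varchar(64) exactly.
Seq("390", "18", "481").foreach { code =>
  println(s"$code -> ${sha256Hex(code)} (${sha256Hex(code).length} chars)")
}
```

With a narrower pad width, a digest that happens to start with zero bytes would produce a shorter string, so keys would not all have the same length.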
The problem I see is with the datetime value that I select as getdate() as LoadDate. When I try to insert into the table above, I get this error:
SqlNativeBufferBufferBulkCopy.WriteTdsDataToServer, error in OdbcDone: SqlState: 42000, NativeError: 4816, 'Error calling: bcp_done(this->GetHdbc()) | SQL Error Info: SrvrMsgState: 1, SrvrSeverity: 16, Error <1>: ErrorMsg: [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid column type from bcp client for colid 2. | Error calling: pConn->Done() | state: FFFF, number: 58673, active connections: 9', Connection String: Driver={pdwodbc17e};app=TypeC01-DmsNativeWriter:DB66\mpdwsvc (13056)-ODBC;trusted_connection=yes;autotranslate=no;server=\\.\pipe\DB.66-a313018f1e5b\sql\query;database=Distribution_15
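Even when I stop using the datetime value from the SQL Server query and instead set LoadDate with withColumn("LoadDate",current_timestamp()), using Spark's built-in current_timestamp https://spark.apache.org/docs/2.3.1/api/sql/index.html#current_timestamp function, it still does not work.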
I saw this Stack Overflow post https://stackoverflow.com/questions/56798914/scala-sql-server-how-to-insert-the-current-timestamp-as-datetime-in-sql-serve, which describes a similar problem but still does not answer the question. Does anyone have a good example of how to insert into a SQL Server table with a column of datatype datetime https://learn.microsoft.com/en-us/sql/t-sql/data-types/datetime-transact-sql?view=sql-server-2017 using the com.microsoft.azure.sqldb.spark.bulkcopy._ library?
Here is a sample of the data when running dff.show():
+-------+--------------------+--------------------+---------------+----------------+
|DrgCode| LoadDate| DrgKey| LoadProcess| RecordSource|
+-------+--------------------+--------------------+---------------+----------------+
| 390|2019-07-02 09:05:...|48a1a756f2d83f1dc...|TestLoadProcess|TestRecordSource|
| 18|2019-07-02 09:05:...|4ec9599fc203d176a...|TestLoadProcess|TestRecordSource|
| 481|2019-07-02 09:05:...|51d089cdaf0c968c9...|TestLoadProcess|TestRecordSource|
| 460|2019-07-02 09:05:...|841a05fd378a2c067...|TestLoadProcess|TestRecordSource|
| 838|2019-07-02 09:05:...|cef5838d118dccd9d...|TestLoadProcess|TestRecordSource|
| 61|2019-07-02 09:05:...|d029fa3a95e174a19...|TestLoadProcess|TestRecordSource|
| 807|2019-07-02 09:05:...|fce86e339dc3131c4...|TestLoadProcess|TestRecordSource|
| 44|2019-07-02 09:05:...|71ee45a3c0db9a986...|TestLoadProcess|TestRecordSource|
| 267|2019-07-02 09:05:...|8acc23987b8960d83...|TestLoadProcess|TestRecordSource|
| 222|2019-07-02 09:05:...|9b871512327c09ce9...|TestLoadProcess|TestRecordSource|
| 934|2019-07-02 09:05:...|a8443b1426652157e...|TestLoadProcess|TestRecordSource|
| 677|2019-07-02 09:05:...|2782526eaa0c5c254...|TestLoadProcess|TestRecordSource|
| 701|2019-07-02 09:05:...|290a0b92873bdf4e4...|TestLoadProcess|TestRecordSource|
| 441|2019-07-02 09:05:...|2dfe70c43208f52b9...|TestLoadProcess|TestRecordSource|
| 439|2019-07-02 09:05:...|50a010ce24d089605...|TestLoadProcess|TestRecordSource|
| 883|2019-07-02 09:05:...|3055e0d8130c7a197...|TestLoadProcess|TestRecordSource|
| 947|2019-07-02 09:05:...|4d0198f4905a08812...|TestLoadProcess|TestRecordSource|
| 369|2019-07-02 09:05:...|5f193b350c8aba488...|TestLoadProcess|TestRecordSource|
| 21|2019-07-02 09:05:...|6f4b6612125fb3a0d...|TestLoadProcess|TestRecordSource|
| 503|2019-07-02 09:05:...|7182dd431b5c8833e...|TestLoadProcess|TestRecordSource|
+-------+--------------------+--------------------+---------------+----------------+
only showing top 20 rows
dff:org.apache.spark.sql.DataFrame
DrgCode:string
LoadDate:timestamp
DrgKey:string
LoadProcess:string
RecordSource:string
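One thing that can be checked from the output above is that the column order of dff differs from the column order in the create table statement. Whether the bulk copy maps columns by position or by name is not confirmed here, so treat the following as a diagnostic sketch under that assumption, not as a confirmed cause. The helper name positionMismatches is hypothetical; the two column lists are copied from the schema and from the dff.show() output.

```scala
// Column order of raw.HubDrg_TEST, copied from the create table statement.
val tableColumns = Seq("DrgKey", "LoadDate", "LoadProcess", "RecordSource", "DrgCode")

// Column order of dff, copied from the dff.show() output.
val dataFrameColumns = Seq("DrgCode", "LoadDate", "DrgKey", "LoadProcess", "RecordSource")

// Hypothetical helper: 1-based positions at which the two orderings disagree,
// together with the table column and the DataFrame column at that position.
def positionMismatches(expected: Seq[String], actual: Seq[String]): Seq[(Int, String, String)] =
  expected.zip(actual).zipWithIndex.collect {
    case ((e, a), i) if e != a => (i + 1, e, a)
  }

val mismatches = positionMismatches(tableColumns, dataFrameColumns)
mismatches.foreach { case (pos, e, a) =>
  println(s"position $pos: table has $e, DataFrame has $a")
}
```

Position 2 holds LoadDate in both orderings, so the ordering on its own may not explain an error about colid 2. If positional mapping does turn out to matter, the DataFrame could be reordered before the bulk copy with dff.select(tableColumns.map(col): _*).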