在线程环境中,分布式事务如何处理与同一数据库的多个连接?

2024-01-17

我正在尝试确定分布式事务中多个数据库连接的行为。

我有一个长时间运行的进程,它会产生一系列线程,然后每个线程负责管理其数据库连接等。所有这些都在事务范围内运行,并且每个线程都通过DependentTransaction object.

当我并行处理这个过程时,我遇到了一些问题,即似乎存在某种阻止查询在事务上同时执行的块。

我想知道事务协调器如何处理从多个连接到同一数据库的查询,以及是否建议跨线程传递连接对象?

我读到 MS SQL 只允许每个事务有一个连接,但我显然能够在同一事务中创建和初始化到同一数据库的多个连接。在打开连接时,如果没有出现“事务上下文正在被另一个会话使用”异常,我根本无法并行执行线程。结果是连接必须等待执行而不是同时运行,最终代码运行完成,但由于此锁定问题,线程化应用程序没有任何净收益。

代码看起来像这样。

    Sub StartThreads()
        Using Scope As New TransactionScope
            Dim TL(100) As Tasks.Task
            Dim dTx As DependentTransaction
            For i As Int32 = 0 To 100
                Dim A(1) As Object
                dTx = CType(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete), DependentTransaction)
                'A(0) = some_other_data
                A(1) = dTx 'the Dependent Transaction

                TL(i) = Tasks.Task.Factory.StartNew(AddressOf Me.ProcessData, A) 'Start the thread and add it to the array
            Next

            Tasks.Task.WaitAll(TL) 'Wait for threads to finish

            Scope.Complete()
        End Using
    End Sub
    Dim TransLock As New Object
    Sub ProcessData(ByVal A As Object)
        Dim DTX As DependentTransaction = A(1)
        Dim Trans As Transactions.TransactionScope
        Dim I As Int32
        Do While True
            Try
                SyncLock (TransLock)
                    Trans = New Transactions.TransactionScope(DTX, TimeSpan.FromMinutes(1))
                End SyncLock
                Exit Do
            Catch ex As TransactionAbortedException
                If ex.ToString.Contains("Failure while attempting to promote transaction") Then
                ElseIf ex.Message = "The transaction has aborted." Then
                    Throw New Exception(ex.ToString)
                    Exit Sub
                End If
                I += 1
                If I > 5 Then
                    Throw New Exception(ex.ToString)
                End If
            Catch ex As Exception

            End Try
            Thread.Sleep(10)
        Loop
        Using Trans
            Using DALS As New DAC.DALScope
                Do While True
                    Try
                        SyncLock (TransLock)
                            'This opens two connection to the same DB for later use.
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.FirstConnection)
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.SecondConnection)
                        End SyncLock
                        Exit Do
                    Catch ex As Exception
                        'This is usually where I find the bottleneck
                        '"Transaction context in use by another session" is the exception that I get
                        Thread.Sleep(100)
                    End Try
                Loop

                '*****************
                'Do some work here
                '*****************

                Trans.Complete()
            End Using
        End Using
        DTX.Complete()
    End Sub

EDIT

我的测试最终表明这是不可能完成的。即使有多个连接或使用相同的连接,事务中的所有请求或问题也会按顺序处理。

也许他们将来会改变这种行为。


首先,您必须将您在这里或那里读到的有关 SQL Server 事务的内容分为两种不同的情况:本地和分布式。

本地SQL事务:

  • SQL Server 只允许在每个本地事务上执行一个请求。
  • 默认情况下,只有一个会话可以注册本地事务。使用 sp_getbindtoken 和 sp_bindsession 可以在本地事务中注册多个会话。会话仍然仅限于在任何时间只有一个执行请求。
  • 通过多个活动结果集 (MARS),一个会话可以执行多个请求。所有请求都必须注册到同一个本地事务中。

分布式事务:

  • 多个会话可以将其本地事务注册到单个分布式事务中。
  • 每个会话仍然注册本地事务,受到上述本地事务的所有限制
  • 分布式事务中注册的本地事务受到分布式事务协调的两阶段提交
  • 注册分布式事务的实例上的所有本地事务仍然独立的本地事务,主要意味着它们具有冲突的锁命名空间。

因此,当客户端创建 .Net TransactionScope 并在此事务范围下在同一服务器上执行多个请求时,这些请求都是注册在分布式事务中的本地事务。一个简单的例子:

class Program
    {
        static string sqlBatch = @"
set nocount on;
declare @i int;
set @i = 0;
while @i < 100000
begin
    insert into test (a) values (replicate('a',100));
    set @i = @i+1;
end";

        static void Main(string[] args)
        {
            try
            {
                TransactionOptions to = new TransactionOptions();
                to.IsolationLevel = IsolationLevel.ReadCommitted;
                using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
                {
                    using (SqlConnection connA = new SqlConnection(Settings.Default.connString))
                    {
                        connA.Open();
                        using (SqlConnection connB = new SqlConnection(Settings.Default.connString))
                        {
                            connB.Open();

                            SqlCommand cmdA = new SqlCommand(sqlBatch, connA);
                            SqlCommand cmdB = new SqlCommand(sqlBatch, connB);

                            IAsyncResult arA = cmdA.BeginExecuteNonQuery();
                            IAsyncResult arB = cmdB.BeginExecuteNonQuery();

                            WaitHandle.WaitAll(new WaitHandle[] { arA.AsyncWaitHandle, arB.AsyncWaitHandle });

                            cmdA.EndExecuteNonQuery(arA);
                            cmdB.EndExecuteNonQuery(arB);
                        }
                    }
                    scp.Complete();
                }
            }
            catch (Exception e)
            {
                Console.Error.Write(e);
            }
        }
    }

创建一个虚拟测试表:

create table test (id int not null identity(1,1) primary key, a varchar(100));

并运行我的示例中的代码。您将看到两个请求并行执行,每个请求在表中插入 100k 行,然后在事务范围完成时提交两个请求。因此,您看到的问题与 SQL Server 或 TransactionScope 无关,它们可以轻松处理您描述的场景。此外,代码非常简单和直接,不需要创建依赖事务、进行克隆或提升事务。

Updated

使用显式线程和相关事务:

 private class ThreadState
    {
        public DependentTransaction Transaction {get; set;}
        public EventWaitHandle Done {get; set;}
        public SqlConnection Connection { get; set; }
    }
    static void Main(string[] args)
    {
        try
        {
            TransactionOptions to = new TransactionOptions();
            to.IsolationLevel = IsolationLevel.ReadCommitted;
            using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
            {
                ThreadState stateA = new ThreadState 
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateA.Connection.Open();
                ThreadState stateB = new ThreadState
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateB.Connection.Open();

                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateA);
                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateB);

                WaitHandle.WaitAll(new WaitHandle[] { stateA.Done, stateB.Done });

                scp.Complete();

                //TODO: dispose the open connections
            }

        }
        catch (Exception e)
        {
            Console.Error.Write(e);
        }
    }

    private static void Worker(object args)
    {
        Debug.Assert(args is ThreadState);
        ThreadState state = (ThreadState) args;
        try
        {
            using (TransactionScope scp = new TransactionScope(state.Transaction))
            {
                SqlCommand cmd = new SqlCommand(sqlBatch, state.Connection);
                cmd.ExecuteNonQuery();
                scp.Complete();
            }
            state.Transaction.Complete();
        }
        catch (Exception e)
        {
            Console.Error.WriteLine(e);
            state.Transaction.Rollback();
        }
        finally
        {
            state.Done.Set();
        }

    }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在线程环境中,分布式事务如何处理与同一数据库的多个连接? 的相关文章

  • Postgresql 串行错误自动增量

    我在 postgresql 上遇到问题 我认为 postgresql 中有一个错误 我错误地实现了一些东西 有一个表包括colmn1 primary key colmn2 unique colmn3 插入一行后 如果我尝试使用现有的另一次插
  • Oracle如何将UTC时间转换为本地时间(缺少偏移信息)

    我有一个包含日期列的表 我认为该列中的日期是以 UTC 格式保存的 我希望检索日期时以当地时间打印 这意味着当我从德国调用日期时 结果应该是这样的 2015 04 29 11 24 06 0200UTC EUROPE BERLIN 我尝试了
  • 使用转义换行符和回车符取消转义字符串

    我正在尝试编写一个 PLPGSQL 函数来混淆 审查 编辑文本 Obfuscate a body of text by replacing lowercase letters and numbers with symbols CREATE
  • 调试 Java InterruptedException,即查找原因

    在调试Android应用程序时 有时中断异常发生并使应用程序崩溃 我已经能够在默认异常处理程序上设置断点 但调用堆栈不提供信息 at java util concurrent locks AbstractQueuedSynchronizer
  • 如何将彼此“接近”的纬度/经度点分组?

    我有一个用户提交的纬度 经度点的数据库 并且正在尝试将 接近 点分组在一起 接近 是相对的 但目前看来约为 500 英尺 起初 我似乎只能按前 3 个小数位具有相同纬度 经度的行进行分组 大约是一个 300x300 的盒子 了解当您远离赤道
  • REGEXP_REPLACE - 仅当包含在 () 中时才从字符串中删除逗号

    我在 oracle 论坛网站找到了一个例子 输入字符串 a b c x y z a xx yy zz x WITH t AS SELECT a b c x y z a xx yy zz x col1 FROM dual SELECT t c
  • 使用单独的线程在java中读取和写入文件

    我创建了两个线程并修改了 run 函数 以便一个线程读取一行 另一个线程将同一行写入新文件 这种情况会发生直到整个文件被复制为止 我遇到的问题是 即使我使用变量来控制线程一一执行 但线程的执行仍然不均匀 即一个线程执行多次 然后控制权转移
  • 当底层连接是有状态时如何使用 Apache HttpClient?

    我在谷歌上搜索了很多关于如何使用 HttpClient 进行多线程处理的信息 他们中的大多数人建议使用 ThreadSafeClientConnManager 但我的应用程序必须登录某个主机 登录表单页面 以便 HttpClient 获得底
  • PyQt5:如何使QThread返回数据到主线程

    I am a PyQt 5 4 1 1初学者 我的Python是3 4 3 这是我尝试遵循的many https mayaposch wordpress com 2011 11 01 how to really truly use qthr
  • ORA-00933 与内部联接和“as”混淆

    我有一个使用以下命令从两个表中获取数据的查询inner join 但我收到错误SQL command not properly ended as 下面有一个星号 select P carrier id O order id O aircra
  • 如何对多行的一列值求和?

    我有这个表 我想添加几行的 change 列的值 或者更准确地说 从 ne 值为零的行到 ne 值为零的下一行 不是第二个本身 任何答案将不胜感激 rn date ne change 0 2008 12 07 0 10330848398 1
  • SQL Server 2012:有条件地增加计数器用户 ROW_NUMBER()

    我正在尝试申请ROW NUMBER 根据特定条件增加计数器 我的数据如下所示 目标计数器是Prep column id DSR PrepIndicator Prep 1662835 1 1 1 1662835 14 2 2 1662835
  • SQL UPDATE 语句根据另一个现有行更新列

    基本上我有一个与下表具有相似格式的表格 我想做的是根据这个逻辑更新 Col4 如果 Col2 为空 则用 Col3 更新 Col4 如果 Col2 不为 null 则在 Col1 中查找与 Col2 中的值匹配的值 使用 col3 中的相应
  • SQLite (Android):使用 ORDER BY 更新查询

    Android SQLite 我想要在 myTable 中的其他行之间插入行在android中使用SQLite 为此 我尝试增加从第 3 行开始的所有行的 id 这样 我就可以在位置 3 处插入新行 myTable 的主键是列 id 表中没
  • 如何引用下一行的数据?

    我正在 PostgreSQL 9 2 中编写一个函数 对于股票价格和日期的表 我想计算每个条目较前一天的百分比变化 对于最早一天的数据 不会有前一天 因此该条目可以简单地为 Nil 我知道WITH声明可能不应该高于IF陈述 到目前为止 这就
  • 在 Mysql 上使用 EntityManager JPA 运行脚本

    我正在尝试运行脚本 sql 文件 但由于我尝试了多种方法 因此出现多个错误 这是我的主要 sql 脚本 INSERT INTO Unity VALUES 11 paq 0 2013 04 15 11 41 37 Admin Paquete
  • Spring Data JPA 选择不同

    我有一个情况 我需要建立一个select distinct a address from Person a 其中地址是 Person 内的地址实体 类型的查询 我正在使用规范动态构建我的 where 子句并使用findAll Specifi
  • MySQL:如何获取每个分组的x个结果数[重复]

    这个问题在这里已经有答案了 可能的重复 mysql 在 GROUP BY 中使用 LIMIT 来获取每组 N 个结果 https stackoverflow com questions 2129693 mysql using limit w
  • 如何连续添加起始行和下一行的值

    我只想创建一个 sql 查询 结果就像图片上的那样 类似于 SQL 中的斐波那契数列 Ex Column 1 10 则 Result 列的值为 Result 10 因为这是第一行 然后假设column1第二行的值为50 那么Result第二
  • 没有为 1 个或多个必需参数给出值。更新SQL

    我正在编写一个程序 当用户在列表视图上选择记录时 该程序会更新密码或积分 我收到错误 没有为 1 个或多个必需参数给出值 我不知道如何纠正 我是否遗漏了一些明显的东西 Dim sql As String UPDATE Users SET P

随机推荐

  • tinymce鼠标贴不起作用

    我想在tinymce中启用鼠标粘贴 当我点击粘贴时 它显示错误 剪切 粘贴 复制在 Firefox 中被禁用 我在他们的论坛上搜索了这个 http www tinymce com forum viewtopic php id 20637 h
  • 在函数内分配内存后使用双指针

    我正在使用 C 中的双指针 想知道是否创建一个初始化表的函数 当我尝试使用 InitStringTable 分配的内存时 它会在返回 main 时崩溃 我相信一个简单的解决方法是使 strTable 成为全局的 然后我相信它可以 但我不想这
  • OpenCV-Python cv2.CV_CAP_PROP_POS_FRAMES 错误

    目前 我使用的是opencv 3 1 0 在执行以下代码时遇到以下错误 post frame cap get cv2 CV CAP PROP POS FRAMES 我收到以下错误消息 文件 videoOperation py 第 37 行
  • jQuery 模态和深度链接

    我目前有一个画廊 当您单击缩略图时 它会打开一个模式弹出窗口 我想做的是能够专门为模式生成一个唯一的链接 即 www mywebite com link1 它通过 ajax 加载其内容 如果有人要发送这个独特的模式链接并将其发送给某人 然后
  • NSDate 格式化程序

    不幸的是 我尝试将字符串转换为 NSDATE 但没有成功 2010 年 10 月 22 日星期五 11 26 45 美国东部时间 我知道格式化选项 http sree cc objective c nsdate format string
  • 与实体对象一起使用时,ResponseBuilder 不起作用

    我正在尝试使用responsebuilder 创建响应 当我在实体中传递字符串时 它工作正常 但是当我传递一些错误类时 它不起作用 这是代码 1 工作正常 Response status 400 entity test build 2 不工
  • 日期时间数据类型在soap php中不起作用

    这是我的代码 c new soapclient http www redbus in WS2 BookingService asmx wsdl array authentication gt array LoginID gt x Passw
  • jQuery Mobile:将数据从一个页面发送到另一页面

    我有一个问题 我需要将数据 ID 从列表发送到另一个页面 这是html代码 div div h1 Players App h1 div div ul ul div div
  • 如何以编程方式重置表单?

    我想在 JQuery 单击事件函数中重置表单 怎么做 最简单的是 form selector here 0 reset 但也请参阅 使用 jQuery 重置多阶段表单 https stackoverflow com questions 68
  • 如何根据日期是否大于今天的日期来呈现 JSF 组件?

    我从服务器端获取一个日期 以及如何在我的 xhtml 代码中比较它 这样如果它小于今天的日期 我将渲染面板 否则不会 你可以在你的bean中有一个方法 class MyBean public boolean isDateBigger dat
  • 将正则表达式模式从 Sub 传递到 Excel VBA 中的函数

    我试图将正则表达式模式传递给 Excel VBA 中的函数 但该模式似乎没有任何效果 我插入了 msgbox es 来查看字符串的样子 结果没问题 这是我正在使用的代码 Sub clean COP names Dim strSheet As
  • 一个 git 命令显示目录中所有文件的状态

    git ls files v o 显示未跟踪的文件 git ls files v 显示跟踪的文件 必须有一种更简单的方法来显示当前目录中所有文件的状态 如果有 259 个 可能是隐藏的 文件 那么我想查看 259 行 每行的状态如下 匹配头
  • Whoosh (Python) 在哪里物理存储索引内容?

    我开始研究内容索引的实现 并且正在查看 Whoosh https pypi python org pypi Whoosh https pypi python org pypi Whoosh 我很想知道 Whoosh 将其内容物理存储在哪里
  • 如何使用 DataAnnotation 验证下拉列表?

    我需要您的帮助 我在使用 AdataAnnotation 进行验证时遇到问题 我正在尝试使用它验证下拉列表 但它有一些问题 这是我的代码 查看侧面 using Html BeginForm addNewProject Activities
  • Oracle 19c Open_cursor 超出问题

    我们在 Oracle 10g 和 19c 中存在相同的存储过程 具有相同的数据集和设置 该过程执行大量数据获取和操作 当我们使用相同的数据集 假设 10000 条记录 执行时 它在 10g 中运行良好 时间更少 但在 19c 中需要很多时间
  • 我在哪里可以找到更新的实时汇率?

    如何将实时货币汇率链接到我的 iPhone 应用程序 首先 谁知道有哪些网站可以查询汇率 其次 如何将其链接到我的应用程序 我想做这个应用程序所做的事情 http the dream co uk currencee http the dre
  • Kendo 网格、ko 绑定和行索引访问

    我有一个 ko 视图模型 我使用 knockout kendo js 将其绑定到 KendoGrid 我使用 rowTemplate 因为我需要在某些列中使用一些自定义功能 图标 链接等 我需要根据行号执行一些自定义功能 当直接绑定 ko
  • EPERM:不允许操作 - IIS 上的 NPM Angular 7(后端为 .Net Core 2.1)

    大家好 我用 Visual Studio 2017 制作了一个 Angular 7 应用程序 所以我在 Windows 10 上安装了带有 IIS 的 AWS 机器 当我加载应用程序时 我收到此错误 AggregateException 发
  • 如何为 Web API 指定不同的 AADInstance?

    我正在致力于将 Web Api 与 azure China Active Directory 集成并部署到 azure China 环境 Azure 中国的端点与常规 Azure 环境完全不同 我想知道如何指定AADInstancehttp
  • 在线程环境中,分布式事务如何处理与同一数据库的多个连接?

    我正在尝试确定分布式事务中多个数据库连接的行为 我有一个长时间运行的进程 它会产生一系列线程 然后每个线程负责管理其数据库连接等 所有这些都在事务范围内运行 并且每个线程都通过DependentTransaction object 当我并行