如何增加 Cassandra 的数据流读取并行性

2023-12-28

我正在尝试将大量数据(2 TB,30kkk 行)从 Cassandra 导出到 BigQuery。我所有的基础设施都在 GCP 上。我的 Cassandra 集群有 4 个节点(每个节点 4 个 vCPU、26 GB 内存、2000 GB PD (HDD))。集群中有1个种子节点。我需要在写入 BQ 之前转换数据,因此我使用 Dataflow。工人类型是n1-highmem-2。 Worker 和 Cassandra 实例位于同一区域europe-west1-c。我对 Cassandra 的限制:

我负责读取转换的部分管道代码位于here https://gitlab.com/snippets/1691645.

自动缩放

问题是当我没有设置时--numWorkers,以这种方式自动缩放设置工人数量(平均 2 个工人):

负载均衡

当我设置--numWorkers=15阅读率没有增加,只有 2 个工作人员与 Cassandra 进行通信(我可以从iftop并且只有这些工作人员的 CPU 负载约为 60%)。

同时Cassandra节点没有太多负载(CPU使用率20-30%)。种子节点的网络和磁盘使用率比其他节点高2倍左右,但也不是太高,我认为:

对于这里的非种子节点:

管道启动警告

管道启动时我收到一些警告:

WARNING: Size estimation of the source failed: 
org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@7569ea63
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.132.9.101:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.101:9042] Cannot connect), /10.132.9.102:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.102:9042] Cannot connect), /10.132.9.103:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.103:9042] Cannot connect), /10.132.9.104:9042 [only showing errors of first 3 hosts, use getErrors() for more details])

我的 Cassandra 集群位于 GCE 本地网络中,并且一些查询是从我的本地计算机发出的,无法到达集群(我正在使用 Dataflow Eclipse 插件启动管道,如下所述)here https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-eclipse)。这些查询与表的大小估计有关。我可以手动指定大小估计或从 GCE 实例启动 pipline 吗?或者我可以忽略这些警告吗?对阅读速度有影响吗?

我尝试从 GCE VM 启动管道。连接不再有问题。我的表中没有 varchar 列,但收到此类警告(datastax 驱动程序中没有编解码器 [varchar java.lang.Long])。 :

WARNING: Can't estimate the size
com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [varchar <-> java.lang.Long]
        at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:741)
        at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:588)
        at com.datastax.driver.core.CodecRegistry.access$500(CodecRegistry.java:137)
        at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:246)
        at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:232)
        at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
        at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
        at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
        at com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
        at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
        at com.datastax.driver.core.CodecRegistry.lookupCodec(CodecRegistry.java:522)
        at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:485)
        at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:467)
        at com.datastax.driver.core.AbstractGettableByIndexData.codecFor(AbstractGettableByIndexData.java:69)
        at com.datastax.driver.core.AbstractGettableByIndexData.getLong(AbstractGettableByIndexData.java:152)
        at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:26)
        at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:95)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getTokenRanges(CassandraServiceImpl.java:279)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getEstimatedSizeBytes(CassandraServiceImpl.java:135)
        at org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource.getEstimatedSizeBytes(CassandraIO.java:308)
        at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.startDynamicSplitThread(BoundedReadEvaluatorFactory.java:166)
        at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:142)
        at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
        at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

管道读取代码

// Read data from Cassandra table
PCollection<Model> pcollection = p.apply(CassandraIO.<Model>read()
        .withHosts(Arrays.asList("10.10.10.101", "10.10.10.102", "10.10.10.103", "10.10.10.104")).withPort(9042)
        .withKeyspace(keyspaceName).withTable(tableName)
        .withEntity(Model.class).withCoder(SerializableCoder.of(Model.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));

// Transform pcollection to KV PCollection by rowName
PCollection<KV<Long, Model>> pcollection_by_rowName = pcollection
        .apply(ParDo.of(new DoFn<Model, KV<Long, Model>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                c.output(KV.of(c.element().rowName, c.element()));
            }
        }));

拆分数量(Stackdriver 日志)

W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 
W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 
W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 

我尝试过的

没有效果:

  1. 将读取一致性级别设置为 ONE
  2. nodetool setstreamthroughput 1000, nodetool setinterdcstreamthroughput 1000
  3. 增加 Cassandra 读取并发性(在cassandra.yaml): concurrent_reads: 32
  4. 设置不同的工人数量1-40。

一些效果: 1.我按照@jkff的建议设置numSplits = 10。现在我可以在日志中看到:

I  Murmur3Partitioner detected, splitting 
W  Can't estimate the size 
W  Can't estimate the size 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@6d83ee93 produced 10 bundles with total serialized response size 20799 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@25d02f5c produced 10 bundles with total serialized response size 19359 
I  Splitting source [0, 1) produced 1 bundles with total serialized response size 1091 
I  Murmur3Partitioner detected, splitting 
W  Can't estimate the size 
I  Splitting source [0, 0) produced 0 bundles with total serialized response size 76 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@2661dcf3 produced 10 bundles with total serialized response size 18527 

但我还有另一个例外:

java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.Cassandra...
(5d6339652002918d): java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@5f18c296
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:582)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:347)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:183)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:43)
    at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl$CassandraReaderImpl.start(CassandraServiceImpl.java:80)
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:579)
    ... 14 more
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:144)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:186)
    at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:50)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:817)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:651)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1077)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1000)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more

也许有一个错误:CassandraServiceImpl.java#L220 https://github.com/apache/beam/blob/master/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java#L220

这个声明看起来像是打字错误:CassandraServiceImpl.java#L207 https://github.com/apache/beam/blob/release-2.1.1/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java#L207

我对 CassandraIO 代码所做的更改

正如 @jkff 提议的那样,我已经按照我需要的方式更改了 CassandraIO:

@VisibleForTesting
protected List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
                                              long desiredBundleSizeBytes,
                                              long estimatedSizeBytes) {
  long numSplits = 1;
  List<BoundedSource<T>> sourceList = new ArrayList<>();
  if (desiredBundleSizeBytes > 0) {
    numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
  }
  if (numSplits <= 0) {
    LOG.warn("Number of splits is less than 0 ({}), fallback to 10", numSplits);
    numSplits = 10;
  }

  LOG.info("Number of splits is {}", numSplits);

  Long startRange = MIN_TOKEN;
  Long endRange = MAX_TOKEN;
  Long startToken, endToken;

  String pk = "$pk";
  switch (spec.table()) {
  case "table1":
          pk = "table1_pk";
          break;
  case "table2":
  case "table3":
          pk = "table23_pk";
          break;
  }

  endToken = startRange;
  Long incrementValue = endRange / numSplits - startRange / numSplits;
  String splitQuery;
  if (numSplits == 1) {
    // we have an unique split
    splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
    sourceList.add(new CassandraIO.CassandraSource<T>(spec, splitQuery));
  } else {
    // we have more than one split
    for (int i = 0; i < numSplits; i++) {
      startToken = endToken;
      endToken = startToken + incrementValue;
      Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
      if (i > 0) {
        builder = builder.and(QueryBuilder.gte("token(" + pk + ")", startToken));
      }
      if (i < (numSplits - 1)) {
        builder = builder.and(QueryBuilder.lt("token(" + pk + ")", endToken));
      }
      sourceList.add(new CassandraIO.CassandraSource(spec, builder.toString()));
    }
  }
  return sourceList;
}

我认为这应该被归类为 CassandraIO 中的错误。我提交了BEAM-3424 http://issues.apache.org/jira/browse/BEAM-3424。您可以尝试构建自己的 Beam 版本,将默认值 1 更改为 100 或类似的值,同时解决此问题。

我也提交了BEAM-3425 https://issues.apache.org/jira/browse/BEAM-3425针对尺寸估计期间的错误。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何增加 Cassandra 的数据流读取并行性 的相关文章

随机推荐

  • Ruby:是否可以设置通过字符串命名的实例变量的值?

    不确定这种模式叫什么 但场景如下 class Some this class has instance variables called thing 1 thing 2 etc end 有没有什么方法可以设置实例变量的值 其中实例变量名称是
  • 是否可以在 iOS 上发送 SILENT LOCAL 通知

    My app wakes up从暂停模式开始静音遥控器来自服务器的通知 正是我想要的 该服务器发送带有 的推送通知 内容可用 1 它完成了这项工作 现在我想在没有服务器帮助的情况下执行此操作 所以我想发送本地无声通知 来自应用程序 将来的某
  • 将值从 MSBuild 任务传递到 TFS 构建工作流程

    有许多示例和方法可以将属性值从 TFS 2010 构建工作流传递到 MSBuild 但我需要执行相反的操作 基本上 在构建盒上编写了自定义目标文件 以对盒上运行的所有构建进行一些处理 这些目标文件中包含自定义任务 其中一些任务公开返回值 或
  • 尝试在 rake 任务中启动 redis 和 resque 调度程序

    我想从 rake 任务启动 redis 和 redis scheduler 所以我正在执行以下操作 namespace raketask do task start do system QUEUE rake resque work syst
  • 在 Unix 中的邮件中附加 2 个以上文件

    我有很多文件需要附加并通过电子邮件发送 我正在运行脚本来执行此操作 你能帮我写代码吗 您可以使用选项 a of mailx多次 例如 mailx s Few files attached a file1 txt a file2 txt em
  • IE 中的 window.location 问题

    我有这个简单的代码 可以在每个浏览器中正常工作 但不能在 IE 每个版本 中工作 window setTimeout window location http www domain modules yobilab copyright cla
  • 在 iOS 应用程序中使用现有的系统声音 [swift|

    是否可以在我自己的应用程序中使用现有的Apple系统声音 我想用 Swift 编写一个示例应用程序 执行以下步骤 读取 获取设备上所有可用系统声音的列表 我认为它们位于 System Library Audio UISounds 在屏幕上显
  • 宏定义冲突

    我遇到了这个问题 这与宏函数无关 只是简单的字符串值宏替换 我有两个头文件 test1 h define TEST 123 test2 h define TEST 456 现在我有一个程序包含这两个标头 但我希望我的实际 TEST 为 12
  • 将雪(和降雪)与 AWS 结合使用,在 R 中进行并行处理

    相对于我之前的类似的问题 https stackoverflow com questions 7241244 using aws for parallel processing with r 我尝试在AWS上使用snow snowfall进
  • .NET 4.5 中的序列化中断

    我们遇到了一个仅在 NET 4 5 中发生的序列化问题 相同的代码在 NET 4 中工作正常 我们正在尝试使用一些字段序列化继承类型 基类和继承类都标记为可序列化属性 我们在 Web 服务的客户端收到一个异常 说有一个方法访问异常在服务器端
  • 用于模糊字符串比较的好 Python 模块? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 Locked 这个问题及其答案是locked help locked posts因为这个问题是题外话 但却具有历史意义 目前不接受新的答案或互动
  • Wagtail:如何将模型实例传递给小部件并在模板中访问它

    我有一个基于 Wagtails 的模型Page模型 我正在将自定义小部件分配给模型字段之一 渲染管理视图时是否可以在小部件 HTML 模板中访问模型实例 我需要管理视图中的小部件来知道哪个 IDSimplePage该小部件所属 即获取值 p
  • OSX Mavericks - 不再安装 BIND...如何使本地 DNS 服务器正常工作?

    我一直在 OSX 上使用 BIND 为我的本地开发机器提供本地 DNS 解析器 特别是为了方便虚拟机访问我的本地开发环境 愚蠢的是 我决定连夜升级到 OSX Mavericks 但似乎不再安装 BIND 即使添加了命令行开发人员工具 有人建
  • clang 尝试捕获失败

    这是我所说的代码部分 try std cerr lt lt first try lt lt std endl po store po parse config file ifs configFileOptions false vm catc
  • 从两个向量创建新的 ID(双向)

    我在一个由因子组成的数据框中有两个向量 每个向量中有约 10000 个唯一因子 这是我的数据的简化示例 tg lt data frame A sample letters 1 5 30 replace TRUE B sample lette
  • Pandas 数据框 to_csv - 分成多个输出文件

    将非常大的数据帧 50GB 分割成多个输出 水平 的最佳 最简单方法是什么 我想过做类似的事情 stepsize int 1e8 for id i in enumerate range 0 df size stepsize start i
  • 如何在 JasperReports Server 4.0.0 中配置邮件服务器设置

    我在用着JasperReports服务器4 0 0我想如何配置邮件服务器设置以通过报告计划邮寄报告 我怎样才能做到这一点 有人知道这个吗 您应该编辑
  • 由于“完美分离错误”,无法运行逻辑回归

    我是 Python 数据分析的初学者 并且在完成这项特定任务时遇到了麻烦 我进行了广泛的搜索 但无法找出问题所在 我导入了一个文件并将其设置为数据框 清理了文件中的数据 然而 当我尝试将我的模型拟合到数据时 我得到了 检测到完美分离 结果不
  • 为 mat-select 预选多个值 - Angular 6

    我正在尝试在垫选择中预选多个选项 到目前为止我还无法实现这一目标 这是 HTML 文件
  • 如何增加 Cassandra 的数据流读取并行性

    我正在尝试将大量数据 2 TB 30kkk 行 从 Cassandra 导出到 BigQuery 我所有的基础设施都在 GCP 上 我的 Cassandra 集群有 4 个节点 每个节点 4 个 vCPU 26 GB 内存 2000 GB