doris stream load

2023-05-16

package uhp;


import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.*;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;




public class HttpClientUtilStreamLoadDoris {


    private final static String DORIS_HOST_DEV = "xxx";

    //public static Logger logger = LoggerFactory.getLogger(HttpClientUtilStreamLoadDoris.class);

    //    private final static String DORIS_TABLE = "join_test";
    private final static String DORIS_USER = "xxx";
    private final static String DORIS_PASSWORD = "xxx";
    private final static int DORIS_HTTP_PORT = 8030;

    public void sendData(String content, String db, String table) throws Exception {

        final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
                DORIS_HOST_DEV,
                DORIS_HTTP_PORT,
                db,
                table);
        final HttpClientBuilder httpClientBuilder = HttpClients
                .custom().addInterceptorFirst(new ContentLengthHeaderRemover())
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                });
        CloseableHttpClient client = httpClientBuilder.build();


        HttpPut put = new HttpPut(loadUrl);
        StringEntity entity = new StringEntity(content, "UTF-8");
        put.setHeader(HttpHeaders.EXPECT, "100-continue");
        put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(DORIS_USER, DORIS_PASSWORD));

        //put.setHeader("strip_outer_array", "true");
        put.setHeader("format", "json");
        //put.setHeader("merge_type", "MERGE");
        //put.setHeader("delete", "canal_type=\"DELETE\"");
        put.setHeader("label", "39c25a5c-7000-496e-a98e-348a264c81dd");
        put.setEntity(entity);
        put.removeHeaders("Content-Length");


        reConnect(client, put);

    }

    private void reConnect(CloseableHttpClient client, HttpPut put) throws Exception {


        String loadResult = "";
        CloseableHttpResponse response = client.execute(put);
        //todo 调用方法
        if (response.getEntity() != null) {

            loadResult = EntityUtils.toString(response.getEntity());
        }
        final int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode != 200) {
            System.out.println("写入失败");
        } else {
            if (loadResult.contains("OK") && loadResult.contains("Success")) {
                System.out.println(loadResult);
            } else if (loadResult.contains("Fail")) {
                throw new Exception(loadResult + ",抛出异常,任务失败,当前时间: " + System.currentTimeMillis());
            } else {
                throw new Exception(loadResult + ",抛出异常,任务失败,当前时间: " + System.currentTimeMillis());
            }

        }


    }

    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }

    public static void main(String[] args) throws Exception {
        HttpClientUtilStreamLoadDoris httpClientUtilStreamLoadDoris = new HttpClientUtilStreamLoadDoris();
        httpClientUtilStreamLoadDoris.sendData("{\"event_day\":\"2021-03-24\",\"siteid\":1,\"citycode\":1,\"username\":\"杰a森\",\"pv\":23,\"uv\":66}","aac_test","aac_test_table2");

    }

    private static class ContentLengthHeaderRemover implements HttpRequestInterceptor{
        @Override
        public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
            request.removeHeaders("Content-Length");// fighting org.apache.http.protocol.RequestContent's ProtocolException("Content-Length header already present");
        }
    }
}

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

doris stream load 的相关文章

  • Dart 产生来自另一个流监听器的流事件

    我有一个函数可以生成stream的具体事件 现在我有一个来自存储服务的流 它有自己的事件 寻找一种方法在发生变化时产生我的事件storage stream 这段代码片段并不能解决问题 Stream
  • 如何在 Bash 中添加到流之前?

    假设我在 bash 中有以下命令 one two one运行很长时间产生输出流并且two对该流的每一行执行快速操作 但是two除非它读取的第一个值告诉它每行要读取多少个值 否则根本不起作用 one不输出该值 但我提前知道它是什么 假设它是1
  • 收到“Stream 不支持写入。”以下代码中出现异常

    我正在尝试将图像上传到 Amazon S3 但在此之前我正在调整图像大小 为了调整大小 我必须传递流对象 并且在某一时刻 注释为 Error 的行 我收到 Stream 不支持写入 例外 请帮忙 public ActionResult Ad
  • 如何从 Amazon Kinesis 流获取最新记录?

    我想从 Amazon Kinesis 流中获取最新记录 我打算从该记录中提取时间戳 并将其与消费者应用程序检查点的最后一个记录的时间戳进行比较 以检查消费者是否落后 我无法使用最新的分片迭代器类型 这是因为 LATEST 指向最近的记录之后
  • 从 http 服务流式传输大文件

    我正在编写一个组件来从 HTTP 服务传输大数据 4 GB 该组件采用 URL 和目标流 目标流可以是文件流 也可以是 POSTS 到不同 HTTP 服务的流 甚至两者都是 作为组件的作者 我需要执行以下步骤直到完成 从 HTTP 流中读取
  • WCF Web服务流响应的最佳实践

    我正在尝试从 WCF Web 服务中提取大量数据 请求相当小 而响应消息将非常大 目前 由于 IIS6 对其可分配的内存 1 4GB 有限制 Web 服务正在引发 SystemOutOfMemory 异常 我在一些博客中读到 实施流式传输可
  • 无法使用 jQuery.load 将外部页面加载到我页面的 div 中

    我无法将外部 html 页面加载到我页面的 div 中 我的 Jquery 代码是 document ready function var url http www google com get url function response
  • 使用 ostream 进行 C++ 日志记录

    我正在制作一个记录器 我想创建一个函数log 以流作为输入 例如 log hello lt lt lt lt world lt lt 10 lt lt n 我也希望它是线程安全的 我重新定义了 lt lt 运算符所以我可以这样做 log l
  • 如何通过管道将 OutputStream 传输到 StreamingDataHandler?

    我在 JAX WS 中有一个 Java Web 服务 它从另一个方法返回一个 OutputStream 我似乎无法弄清楚如何将 OutputStream 流式传输到返回的 DataHandler 中 除了创建一个临时文件 写入它 然后再次将
  • 延迟加载插件 (jQuery)

    a lightbox hover function if jQuery lightbox required otherwise lightbox js will be loaded on hover each time a lightbox
  • 如何检查 Stream.Null?

    我有一个 WCF 服务 它返回一个 Stream 如下所示 public Stream StreamFile string filepath try Grab the file from wherever it is Throw an ex
  • 如果您不打算从自适应渲染中受益,那么使用 HtmlTextWriter 有什么好处吗?

    除了从替代设备的自适应渲染中受益之外 编写所有这些代码是否有意义 writer WriteBeginTag table writer WriteBeginTag tr writer WriteBeginTag td writer Write
  • 您能解释一下流的概念吗?

    我知道流是字节序列的表示 每个流都提供了向其给定的后备存储读取和写入字节的方法 但流的意义何在 为什么我们与之交互的不是后备存储本身 不管出于什么原因 这个概念并不适合我 我读过很多文章 但我想我需要一个类比或其他东西 选择 流 这个词是因
  • 在 Gulp 中将流与事件流连接时的顺序

    在此 Gulp 任务中 vendorFiles 代码放置在 dest style css 文件中的 appFiles 代码之后 这是因为 appFiles 流运行得更快吗 如何让vendorFiles代码按预期出现在前面 gulp task
  • 如何有效计算文档流中文档之间的相似度

    我收集文本文档 在 Node js 中 其中一个文档i表示为单词列表 考虑到新文档以文档流的形式出现 计算这些文档之间相似性的有效方法是什么 我目前对每个文档中单词的归一化频率使用余弦相似度 我不使用 TF IDF 词频 逆文档频率 因为我
  • 在页面加载之前运行 JavaScript 函数(设置适当大小的背景)

    我有一个图像背景 无论用户的分辨率是多少 我都希望其内容始终可见 因此 我希望能够在一开始就在页面加载之前确定分辨率并设置适当的背景图像文件 有可能吗 您可以运行可访问 DOM 的 Javascript 函数 无需等待页面加载 的最早点是放
  • 将 InputStream 转换为固定长度字符串的 Stream

    Like in 将 InputStream 转换为给定字符集的 Stream https stackoverflow com questions 30336257 convert inputstream into streamstring
  • 使用 Node.js 就地流式传输和转换文件

    我想做这样的事情 var fs require fs var through require through var file path to file json var input fs createReadStream file utf
  • 使用azure逻辑应用程序将消息作为字符串发送到azure服务总线

    我正在使用逻辑应用操作 发送消息 向服务总线主题发送消息 在控制台应用程序中读取它时 如果我这样做 SubscriptionClient subClient SubscriptionClient CreateFromConnectionSt
  • 计算流数据的直方图 - 在线直方图计算

    我正在寻找一种算法来生成大量流数据的直方图 最大值和最小值事先未知 但标准差和平均值在特定范围内 我很欣赏你的想法 Cheers 我刚刚找到了一个解决方案 秒 从流式并行决策树算法构建在线直方图 论文的 2 2 该算法由 Hive 项目中的

随机推荐

  • Python中str与bytes互相转换

    快速转换方式 str to bytes my str 61 34 hello world 34 my str as bytes 61 str encode my str type my str as bytes ensure it is b
  • Python关于%matplotlib inline

    在github代码中经常会看到这样的代码 xff1a import numpy import matplotlib pyplot as plt from pandas import read csv import math from ker
  • Jupyter Notebook介绍、安装及使用教程

    目录 一 什么是Jupyter Notebook xff1f 1 简介 Jupyter Notebook是基于网页的用于交互计算的应用程序 其可被应用于全过程计算 xff1a 开发 文档编写 运行代码和展示结果 Jupyter Notebo
  • Python读取XML

    From http www cnblogs com fnng p 3581433 html 关于python读取xml文章很多 xff0c 但大多文章都是贴一个xml文件 xff0c 然后再贴个处理文件的代码 这样并不利于初学者的学习 xf
  • matlab解决中文显示乱码

    matlab很多函数在读取中文后显示乱码 xff0c 为了显示中文 xff0c 应改为UTF 8方式或其他支持中文的编码方式 xff0c 这在Matlab中的操作为 xff1a slCharacterEncoding 39 UTF 8 39
  • Matlab写TIFF格式文件(多于3波段)

    1 起因 通常情况下 xff0c 使用MATLAB做图像处理后 xff0c 使用下面的命令就可以保存处理结果为图片 imwrite im 39 im bmp 39 而如果需要保存的图像为single或者double类型 xff0c 或保存的
  • Python包设置清华源(pip, anaconda等)

    pip设置清华源 pypi 镜像每 5 分钟同步一次 临时使用 pip install i https pypi tuna tsinghua edu cn simple some package 注意 xff0c simple 不能少 是
  • shapefile字符集编码设置

    http zhihu esrichina com cn article 3 在 ArcGIS Desktop ArcMap ArcCatalog and ArcToolbox 中 xff0c 有编码页转换功能 xff08 CODE PAGE
  • pyhton 遍历文件夹,筛选文件

    如果我们需要遍历一个文件夹下的所有文件 xff0c 子文件夹里的内容 xff0c 用Python来实现 xff0c 很方便 xff0c 主要使用os walk folder xff0c 其中folder 是文件夹的路径 xff1a 先看代码
  • VINS 详解

    VINS是视觉与IMU融合SLAM的代表 xff0c 其实现了一个较为完整的SLAM工作 xff0c 开源地址为 xff1a GitHub HKUST Aerial Robotics VINS Mono A Robust and Versa
  • Python OS 文件/目录方法

    From http www runoob com python os file methods html os 模块提供了非常丰富的方法用来处理文件和目录 常用的方法如下表所示 xff1a 序号方法及描述1 os access path m
  • Python 异常处理

    From http www runoob com python python exceptions html python提供了两个非常重要的功能来处理python程序在运行中出现的异常和错误 你可以使用该功能来调试python程序 异常处
  • deeplabV3+源码分解学习

    From horsetif https www jianshu com p d0cc35b3f100 github上deeplabV3 43 的源码是基于tensorflow xff08 slim xff09 简化的代码 xff0c 是一款
  • 常用颜色名称与RGB数值对照表

    From http xh 5156edu com page z1015m9220j18754 html 颜色名 中文名称 Hex RGB 十进制 Decimal LightPink 浅粉红 FFB6C1 255 182 193 Pink 粉
  • c#调用C++DLL EntryPointNotFoundException 找不到入口点

    From http www voidcn com article p kqogmify rh html c 程序调用C 43 43 的dll的时候 xff0c 经常出现这样的问题 xff1a System EntryPointNotFoun
  • 混洗numpy.random.shuffle()与numpy.random.permutation()的区别

    参考API xff1a https docs scipy org doc numpy reference routines random html 1 numpy random shuffle API中关于该函数是这样描述的 xff1a M
  • 以time/gettimeofday系统调用为例分析ARM64 Linux 5.4.34

    目录 1 准备工作 2 触发系统调用 2 1依据 amp 分析 2 2构造代码 2 3触发系统调用 3 分析系统调用 3 1中断处理分析 xff08 保存现场 xff09 3 2内核堆栈pt regs xff08 保存现场 xff09 3
  • Kettle连接Access抽取数据到MS SQLServer

    软件准备 xff1a kettle5 1 access xff08 32位 xff09 jdk1 7 xff08 32位 xff09 软件位数需要一致 xff0c 不要求操作系统位数 搭建流程 xff1a 1 access新建表 2 准备a
  • python2 linux 解析文本乱码或UnicodeDecodeError: ‘ascii’ codec can’t decode byte

    linux乱码 xff0c 加下面两行 reload sys sys setdefaultencoding 39 utf 8 39 open 加参数errors 61 39 ignore 39 file init 61 io open fi
  • doris stream load

    package uhp import java io IOException import java nio charset StandardCharsets import org apache commons codec binary B