Rxjava 3 + Retrofit2 - 多次插入数据库问题

2024-02-03

我正在尝试做以下事情;使用 Retrofit 将云数据库同步到设备上的本地 SqLite DB (Room)。 DB 可能会变得很大,大约有 100,000 个寄存器或更多,因此同步过程可能需要一些时间。所以它会发送第一个Retrofit请求来获取寄存器的数量,这样它就可以计算出总页数,之后它会发送多个Retrofit请求,以从API获取所有数据,每次请求后,它都会将数据保存到房间。

现在,我在组合两个 RxJava 调用或进程时遇到问题,也在第二个 RxJava 进程上,在 Retrofit 调用之后,有一个对象列表的房间插入,但在洞进程结束后,我注意到没有100% 的所有记录都被插入,每次运行该过程时,插入的记录数都会发生变化,约为 80% - 98%,但永远不会 100%,即使发送了所有 Retrofit 调用。

请帮助我:

  1. 如何在一次 RxJava 调用中完成所有进程,而不是像我那样调用 2 个 现在?
  2. 如何将100%的记录插入Room?

按照代码:

Gradle

def room_version = "2.2.5"
//RxJava 2
implementation "io.reactivex.rxjava2:rxjava:2.2.19"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
//Retrofit
implementation 'com.squareup.retrofit2:retrofit:2.8.1'
implementation 'com.squareup.retrofit2:converter-gson:2.8.1'
//Retrofit2 Adapter for RxJava 2
implementation "com.squareup.retrofit2:adapter-rxjava2:2.8.1"
//okhttp3 Logging Interceptor
implementation "com.squareup.okhttp3:logging-interceptor:4.5.0"
//Room
implementation "androidx.room:room-runtime:$room_version"
annotationProcessor "androidx.room:room-compiler:$room_version"
//RxJava support for Room
implementation "androidx.room:room-rxjava2:$room_version" 

项目同步详情

...
public class ItemSyncDetails {
    @SerializedName("CurrentPage")
    int currentPage;
    @SerializedName("PageCount")
    int pageCount;
    @SerializedName("PageSize")
    int pageSize;
    @SerializedName("RecordCount")
    int recordCount;
    @SerializedName("Results")
    List<Item> mItemList;
...
}

ItemDao

注意:我没有使用过 Observer/Flowable/Maybe/Single,因为我有 能够使其与 RxJava 一起工作

import io.reactivex.Flowable;

@Dao
public interface ItemDao {

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    long insert(Item item);

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    List<Long> insertAll(List<Item> items);
...

DataApi

import io.reactivex.rxjava3.core.Observable;
...

public interface DataApi {

    @GET("item")
    Observable<ItemSyncDetails> getItemsByPage(
            @Query("pageSize") Integer pageSize,
            @Query("currentPage") Integer currentPage,
            @Query("sortBy") Integer sortBy
    );

项目存储库

import io.reactivex.Observable;
    ...

    public class ItemRepository {
    ...

        public ItemRepository(Application application) {
            mDataApi = RetrofitClient.getRetrofitInstance("http://192.168.1.100").create(DataApi.class);
            RfidDatabase db = RfidDatabase.getAppDatabase(application);
            itemDao = db.itemDao();
            itemList = itemDao.getAllItems();
            inserts = 0;
        }

        public List<Long> insertAllLocal (List<Item> itemList) {
            List<Long> items = itemDao.insertAll(itemList);
            inserts += items.size();
            Log.i(TAG, "************insertAllLocal - ItemRepository: " + inserts + "*************");
            Log.i(TAG, "************insertAllLocal - ItemRepository: " + items);
            return items;
        }

        public Observable<ItemSyncDetails> getRecordsCount(){
            return mDataApi.getItemsByPage(1,1,1);
        }

        public Observable<ItemSyncDetails> getItemsPerPage(int pageSize,int currentPage){
            return mDataApi.getItemsByPage(pageSize,currentPage,1);
        }
    ...

SyncConfigFragment 

    import io.reactivex.Observable;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.disposables.CompositeDisposable;
    import io.reactivex.functions.Function;
    import io.reactivex.schedulers.Schedule
    ...

    public class SyncConfigFragment extends Fragment {


        private ItemViewModel itemViewModel;
        private ImageView imageSyncItems;
        private ProgressDialog progressDialog;
        private TextView tvSyncDescriptionItems;
        private DataApi service;
        private ItemSyncDetails mItemSyncDetails;
        private List<Item> mItemlist;
        private CompositeDisposable mCompositeDisposable;
        private int mNumPages;
        private int syncProgress;
        ...

        @Override
        public View onCreateView(LayoutInflater inflater, ViewGroup container, Bundle savedInstanceState) {
            View view =  inflater.inflate(R.layout.fragment_config_sync,container, false);
            progressDialog = new ProgressDialog(getActivity());
            sharedPref = getActivity().getSharedPreferences(
                    getString(R.string.sharepref_filename), Context.MODE_PRIVATE);
            mItemlist = new ArrayList<Item>();
            mCompositeDisposable = new CompositeDisposable();
            itemViewModel = ViewModelProviders.of(this).get(ItemViewModel.class);
            tvSyncDescriptionItems = view.findViewById(R.id.tvDescriptionSyncItems);
            if(sharedPref.contains("last_sync_item")) {
                tvSyncDescriptionItems.setText("Última actualización " + sharedPref.getString("last_sync_item",""));
            } else{
                tvSyncDescriptionItems.setText("No se ha Sincronizado");
            }
            imageSyncItems = view.findViewById(R.id.imageViewSyncItems);
            imageSyncItems.setOnClickListener(clickListener);
            return view;
        }

        private View.OnClickListener clickListener = new View.OnClickListener() {
            public void onClick(View v) {
                    if (v.equals(imageSyncItems)) {
                //If I uncomment the next line it does not work
                        //mCompositeDisposable.add(
                        mNumPages = 0;
                        syncProgress = 0;
                        showProgressDialog("Items");
                        getRecordsCount();
                       //); Closing round bracket for mCompositeDisposable
                }
            }
        };//End View.OnClickListener 

        private void getRecordsCount(){
            itemViewModel.getRecordsCount()
                    .subscribeOn(Schedulers.io())
                    .retry(3)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(this::HandleResults, this::handleError,this::getNumPagesHandlerComplete );
        }

        private void HandleResults(ItemSyncDetails itemSyncDetails) {
            this.mItemSyncDetails = itemSyncDetails;
            int pageSize = 100;
            int numPages = itemSyncDetails.getRecordCount()/pageSize;
            if (itemSyncDetails.getRecordCount() < pageSize || itemSyncDetails.getRecordCount()%pageSize != 0){
                numPages++;
            }
            this.mNumPages = numPages;
        }

        private void getNumPagesHandlerComplete() {
            getAllRecords(mNumPages);
        }

        private void handleError(Throwable throwable) {
            tvSyncDescriptionItems.setText("**********Error de conexión...");
            closeProgressDialog();
        }

        private void getAllRecords(int numPages){
            //numPages: total of pages are the number of times to send the request to API
            Observable.range(1, numPages)
                    .flatMap(i -> itemViewModel.getItemsPerPage(100,i))
                    .map(new Function<ItemSyncDetails, Integer>() {
                        @Override
                        public Integer apply(ItemSyncDetails itemSyncDetails) throws Throwable {
                            return itemViewModel.insertAllLocal(itemSyncDetails.getItemList()).size();
                        }
                    })
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(this::getAllHandleResults, this::handleError,this::handleComplete);
        }

        private void getAllHandleResults(Integer i) {
            progressDialog.setProgress(getProgress(i));
        }

        private void handleComplete() {
            //last request finished
            closeProgressDialog();
        }

        private int getProgress(int newItems){
            syncProgress += newItems;
            int progress = 0;
            if (syncProgress == mItemSyncDetails.getRecordCount()){
                progress = 100;
            } else {
                progress = (100 * syncProgress)/mItemSyncDetails.getRecordCount();
            }
            return progress;
        }
    ...
    }

注意:页面大小可能会改变,我使用的是 100 的固定大小 每页项目。

{
  Results: [
  {
    epc: "202020202020202030303031",
    barcode: "0001",
    name: "Televisor Samnsung",
    description: "0001",
    creation_date: "2020-02-26T10:55:06",
    last_update: "2020-02-26T10:55:06",
    last_seen: "2020-02-26T10:55:06",
    brand: "Samnsung",
    serial_number: "0001",
    parent: "",
    fk_category: 1,
    responsable: "",
    purchase_date: "2020-02-26T10:55:06",
    cost: 0,
    fk_location: 1008,
    fk_item_state: 1,
    inventory_date: "2020-02-26T10:55:06"
  }
 ],
 CurrentPage: 1,
 PageCount: 65565,
 PageSize: 1,
 RecordCount: 65565
}

您在编辑之前在此处发布了 json 响应。

    CurrentPage: 1,
    PageCount: 65566,
    PageSize: 1,
    RecordCount: 65566

如果我理解正确的话,那么你有 65k 个项目,每页有 1 个项目。意味着 65k 页面意味着 65k 网络调用。好多啊。你可以先改进这个设计。

  1. 将整个记录分成几页(甚至可能 10 或 20 页)。如果整个记录有数十万个项目,则 1 页仍将有数千个项目。
  2. 然后使用 gzip 压缩来压缩每个页面的 json 响应并从服务器提供该响应。或者不要将记录分成几页,然后将它们全部传递到使用 gzip 压缩的一个响应中(如果不是那么大)。
  3. 在 android 上解压缩响应,解析它,然后做任何你想做的事情。

通过这种方式,您可以减少大量网络调用,并可能减少同步的等待时间。

至于您实际的接收问题:

val pageSize = 100
viewModel.getRecordsCount()
    .map {
        // logic from `HandleResults` function
        // do some calculation
        var numPages: Int = it.records / pageSize
        if (it.records < pageSize || it.records % pageSize != 0) {
            numPages++
        }
        return@map numPages
    }
    .flatMap { pages -> Observable.range(1, pages) }
    .flatMap { page -> viewModel.getItemsPerPage(pageSize, page) }
    .flatMap { itemSyncDetails ->
        val items = viewModel.insertAllLocal(itemSyncDetails.getItemList())
        return@flatMap Observable.just(items.size)
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(....)

我注意到并不是 100% 的记录都被插入,每次运行该过程时,插入的记录数都会发生变化,大约是 80% - 98%,但永远不会 100%,即使发送了所有的 Retrofit 调用。

将错误记录在handleError功能并查看实际问题是什么。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rxjava 3 + Retrofit2 - 多次插入数据库问题 的相关文章

随机推荐

  • python 组合范围和数字列表

    range 5 15 1 1 5 6 10 10 10 11 17 28 range 6 24 4 10 10 10 15 16 18 20 24 30 range 7 41 9 18 19 23 23 26 28 40 42 44 ran
  • 获取OpenCV当前的FPS

    我正在编写一个 OpenCV 应用程序 FPS 非常重要 如何计算主循环的处理时间以获得当前和平均 FPS 这样 我就可以知道我的应用程序运行速度有多快 顺便说一句 我在 SSD 上使用 imread 所以处理器是这 里的瓶颈 你可以做这样
  • 单击锚标记时,将 HTML 文本输入的 readonly 属性设置为 false

    My HTML div class profileForm fieldset fieldset div
  • 网关未出现在 jhipster 注册表中

    我已经为网关应用程序创建了一个 docker 映像 但是当我运行命令时 docker compose up只有微服务和注册表是UP的 但网关甚至没有出现在实例中 22 08 25 10 57 23 661 ERROR 1 restarted
  • 强制完全重绘 Jpanel Java2D

    我的问题是 我需要制作一个不断更新的 GUI 因为我从数据库获取可以更改的值 并且在图形区域中遇到了一些问题 我使用 Graphics2D 中的 Drawline 和 Drawstring 打印在数据库中找到的值 这些字符串和线条移动并更改
  • Google脚本DriveApp.getFolders().hasNext()错误

    我想在驱动器中创建一个目录 如果该目录尚不存在 function CreateDirectory var folderName Example var Directory var fi DriveApp getFoldersByName f
  • WEKA 生成的模型似乎无法预测给定属性索引的类别和分布

    Overview 我正在使用 WEKA API 3 7 10 开发者版本 来使用我预制的 model files 我制作了 25 个模型 五种算法的五个结果变量 J48决策树 http weka sourceforge net doc de
  • Magento:(大多数)国家/地区在结账时扣除税费 [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我在一家英国时装店工作 客户有一个特
  • 用图像替换 d3.js 符号

    参考这个fiddle example http jsfiddle net andycooper a7as6 我需要用图像替换符号 或者可能首先用单个图像替换 例如 此图像 https github com favicon ico 我在代码中
  • Haskell——如何在同一个文件中使用多个模块?

    抱歉 这是一个愚蠢的问题 但我无法弄清楚如何将多个模块放在同一个文件中 假设文件名为A hs 如果我把模块B首先 即 module B where module A where 它抱怨说它期望A当我运行 ghci A 时 它不是顶级的 所以
  • 通过 Docker 主机名在两个微服务之间进行通信

    现在如何运作 微服务 X 使用静态 ip 向微服务 Y 发出 REST API 请求 http ip address port doSomething 问题 问题是我不能长期保证静态ip 我不想通过使用 docker 主机名来解决这个问题
  • 复制 Xcode 4 项目

    基本上我想为我的 Xcode 项目和所有文件制作一个独立的副本 我怎样才能做到这一点 我正在研究图形框架 我想为每个框架使用相同的 UI 借调 zoul https stackoverflow com users 17279 zoul的评论
  • GWT CellTable 列调整大小/排序

    有没有人找到一种方法使 GWT CellTable 允许用户调整列大小 我们正在放弃旧的 gwt incubator 小部件 因为它们似乎与 GWT 2 1 存在一些兼容性问题 并且仍然需要以前具有的此功能 另外 如果我们能够像孵化器那样进
  • 我可以禁用对已弃用的方法和类的 CheckStyle 投诉吗?

    我正在维护一个已弃用某些公共静态字段的 API CheckStyle 大声抱怨这些 但我宁愿让它完全忽略它们 因为我已经通过将字段标记为已弃用来处理问题 具体来说 该库具有用于枚举的常量 公共静态最终 但它们没有标记为最终的 CheckSt
  • 如何将 R 脚本加载到 JRI 并从 Java 执行?

    我正在使用 JRI 从 Java 执行 R 我看到 JRI 使用eval 方法来执行R命令 我有一个用于执行的 R 脚本 如何在 JRI 中加载此脚本并执行它 您可以使用 R 命令运行整个脚本source
  • jQuery 验证 - 相同的规则取决于值

    我在输入字段验证中获取 2 个范围值的语法时遇到问题 我的表单有 2 个选择字段和 1 个文本输入字段 If select1 1 and select2 A 我希望文本字段上的范围值是1 to 120 If select1 1 and se
  • pyRevit WPF非模态问题

    所以我刚刚开始涉足 pyRevit 中的 WPF 我尝试像这样实现pyrevit forms WPFWindow 类 coding UTF 8 Third Party software credits pyRevit repository
  • IPython Notebook ipywidgets 不显示

    我创建了一个带有交互式滑块的表格 它允许我在表格上的不同时段之间切换 过去几天一直有效 直到今天 当我重新运行笔记本时 滑动条不再显示 没有出现错误消息 当表格出现时 代码似乎运行得很好 但滑动条没有出现 我也没有更改我的代码 因为我正在处
  • Breezejs 和 EF6 中基于角色的安全性

    我在一个具有 3 个主要安全角色的项目中使用 Breeze js AngularJS Web API 和 EF6 可以说高级别 中级别和低级别 在这些示例中 我有 Person Company LowLevelSecret MediumLe
  • Rxjava 3 + Retrofit2 - 多次插入数据库问题

    我正在尝试做以下事情 使用 Retrofit 将云数据库同步到设备上的本地 SqLite DB Room DB 可能会变得很大 大约有 100 000 个寄存器或更多 因此同步过程可能需要一些时间 所以它会发送第一个Retrofit请求来获