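Google's Android Architecture Components guide (https://developer.android.com/topic/libraries/architecture/guide.html) has a section explaining how to abstract the logic for fetching data over the network. In it, they use LiveData to create an abstract class called NetworkBoundResource, which builds a reactive stream that serves as the basis for all reactive network requests.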
public abstract class NetworkBoundResource<ResultType, RequestType> {
    private final AppExecutors appExecutors;

    private final MediatorLiveData<Resource<ResultType>> result = new MediatorLiveData<>();

    @MainThread
    NetworkBoundResource(AppExecutors appExecutors) {
        this.appExecutors = appExecutors;
        result.setValue(Resource.loading(null));
        LiveData<ResultType> dbSource = loadFromDb();
        result.addSource(dbSource, data -> {
            result.removeSource(dbSource);
            if (shouldFetch()) {
                fetchFromNetwork(dbSource);
            } else {
                result.addSource(dbSource, newData -> result.setValue(Resource.success(newData)));
            }
        });
    }

    private void fetchFromNetwork(final LiveData<ResultType> dbSource) {
        LiveData<ApiResponse<RequestType>> apiResponse = createCall();
        // we re-attach dbSource as a new source, it will dispatch its latest value quickly
        result.addSource(dbSource, newData -> result.setValue(Resource.loading(newData)));
        result.addSource(apiResponse, response -> {
            result.removeSource(apiResponse);
            result.removeSource(dbSource);
            //noinspection ConstantConditions
            if (response.isSuccessful()) {
                appExecutors.diskIO().execute(() -> {
                    saveCallResult(processResponse(response));
                    appExecutors.mainThread().execute(() ->
                            // we specially request a new live data,
                            // otherwise we will get immediately last cached value,
                            // which may not be updated with latest results received from network.
                            result.addSource(loadFromDb(),
                                    newData -> result.setValue(Resource.success(newData)))
                    );
                });
            } else {
                onFetchFailed();
                result.addSource(dbSource,
                        newData -> result.setValue(Resource.error(response.errorMessage, newData)));
            }
        });
    }

    protected void onFetchFailed() {
    }

    public LiveData<Resource<ResultType>> asLiveData() {
        return result;
    }

    @WorkerThread
    protected RequestType processResponse(ApiResponse<RequestType> response) {
        return response.body;
    }

    @WorkerThread
    protected abstract void saveCallResult(@NonNull RequestType item);

    @MainThread
    protected abstract boolean shouldFetch();

    @NonNull
    @MainThread
    protected abstract LiveData<ResultType> loadFromDb();

    @NonNull
    @MainThread
    protected abstract LiveData<ApiResponse<RequestType>> createCall();
}
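As I understand it, the logic of this class is:
a) Create a MediatorLiveData named "result" as the main return object and set its initial value to Resource.loading(null).
b) Get the data from the Android Room db as the dbSource LiveData and add it to "result" as a source LiveData.
c) On the first emission of the dbSource LiveData, remove the dbSource LiveData from "result" and call "shouldFetch()", which will:
- if TRUE, call "fetchFromNetwork(dbSource)", which creates the network call via "createCall()" and returns a LiveData of the response wrapped in an ApiResponse object
    - add the dbSource LiveData back to "result" and set the emitted value to Resource.loading(data)
    - add the apiResponse LiveData to "result" and, on its first emission, remove both the dbSource and apiResponse LiveDatas
    - if the apiResponse is successful, call "saveCallResult(processResponse(response))", then add a freshly requested "loadFromDb()" LiveData to "result" and set the emitted value to Resource.success(newData)
    - if the apiResponse fails, call "onFetchFailed()", add the dbSource LiveData back to "result" and set the emitted value to Resource.error(response.errorMessage, newData)
- if FALSE, just add the dbSource LiveData to "result" and set the emitted value to Resource.success(newData)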
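To make the expected emission order concrete, the steps above can be modelled in plain Java. This is only a sketch under simplifying assumptions: it uses no Android, LiveData, or Room types; `ResourceFlowSketch` and `trace` are hypothetical names; the database is assumed to emit exactly one value before the fetch and one after the save; and Resource states are represented as strings with a placeholder error message.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the emission order described above (not the real LiveData code). */
final class ResourceFlowSketch {

    /**
     * Returns the states "result" would emit, in order, for one pass through the flow.
     *
     * @param cached       value the (hypothetical) database holds before any fetch
     * @param shouldFetch  what shouldFetch() is assumed to return
     * @param callSucceeds whether the simulated network call succeeds
     * @param fetched      value written to the database when the call succeeds
     */
    static List<String> trace(String cached, boolean shouldFetch,
                              boolean callSucceeds, String fetched) {
        List<String> emitted = new ArrayList<>();

        // a) initial value set in the constructor
        emitted.add("loading(null)");

        // c) first dbSource emission decides which branch runs
        if (!shouldFetch) {
            emitted.add("success(" + cached + ")");
            return emitted;
        }

        // dbSource is re-attached and dispatches its latest value as "loading"
        emitted.add("loading(" + cached + ")");

        if (callSucceeds) {
            // saveCallResult(...) stores the fetched value, then a fresh
            // loadFromDb() is observed, so the new value is what gets emitted
            emitted.add("success(" + fetched + ")");
        } else {
            // onFetchFailed() runs and the cached value is re-emitted as an error
            emitted.add("error(network error, " + cached + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(trace("cached", true, true, "fresh"));
        // prints [loading(null), loading(cached), success(fresh)]
        System.out.println(trace("cached", true, false, "fresh"));
        // prints [loading(null), loading(cached), error(network error, cached)]
        System.out.println(trace("cached", false, true, "fresh"));
        // prints [loading(null), success(cached)]
    }
}
```

Any reactive rewrite that aims to preserve the LiveData behaviour would be expected to reproduce these three sequences, apart from states that are deliberately dropped (such as the initial loading(null)).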
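Assuming this interpretation of the logic is correct, I tried to refactor this class to use RxJava Observables instead of LiveData. Here is my attempt at the refactor (I removed the initial Resource.loading(null) because I believe it is redundant).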
public abstract class NetworkBoundResource<ResultType, RequestType> {
    private Observable<Resource<ResultType>> result;

    @MainThread
    NetworkBoundResource() {
        Observable<Resource<ResultType>> source;
        if (shouldFetch()) {
            // fetch, persist the response, then stream the db contents as success
            source = createCall()
                    .subscribeOn(Schedulers.io())
                    .doOnNext(apiResponse -> saveCallResult(processResponse(apiResponse)))
                    .flatMap(apiResponse -> loadFromDb().toObservable().map(Resource::success))
                    .doOnError(t -> onFetchFailed())
                    // on failure, fall back to the db contents wrapped as an error
                    .onErrorResumeNext(t -> {
                        return loadFromDb()
                                .toObservable()
                                .map(data -> Resource.error(t.getMessage(), data));
                    })
                    .observeOn(AndroidSchedulers.mainThread());
        } else {
            source = loadFromDb()
                    .toObservable()
                    .map(Resource::success);
        }
        // emit the first db value as loading, then continue with the chosen source
        result = Observable.concat(
                loadFromDb()
                        .toObservable()
                        .map(Resource::loading)
                        .take(1),
                source
        );
    }

    public Observable<Resource<ResultType>> asObservable() {
        return result;
    }

    protected void onFetchFailed() {
    }

    @WorkerThread
    protected RequestType processResponse(ApiResponse<RequestType> response) {
        return response.body;
    }

    @WorkerThread
    protected abstract void saveCallResult(@NonNull RequestType item);

    @MainThread
    protected abstract boolean shouldFetch();

    @NonNull
    @MainThread
    protected abstract Flowable<ResultType> loadFromDb();

    @NonNull
    @MainThread
    protected abstract Observable<ApiResponse<RequestType>> createCall();
}
Since I am new to RxJava, my question is: have I refactored this class to RxJava correctly, while preserving the same logic as the LiveData version?