If I change the device orientation while my app is fetching new redditNews, I get the following error.

  E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
    Process: com.spicywdev.schmeddit, PID: 26522
    io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | null
        at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
        at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:73)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:43)
        at io.reactivex.Observable.subscribe(Observable.java:12090)
        at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
        at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
        at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
        at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
        at java.util.concurrent.FutureTask.run(FutureTask.java:237)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
        at java.lang.Thread.run(Thread.java:762)
     Caused by: java.io.InterruptedIOException
        at okhttp3.internal.http2.Http2Stream.waitForIo(Http2Stream.java:579)
        at okhttp3.internal.http2.Http2Stream.takeResponseHeaders(Http2Stream.java:143)
        at okhttp3.internal.http2.Http2Codec.readResponseHeaders(Http2Codec.java:125)
        at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:88)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
        at okhttp3.RealCall.execute(RealCall.java:77)
        at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
        at retrofit2.ExecutorCallAdapterFactory$ExecutorCallbackCall.execute(ExecutorCallAdapterFactory.java:91)
        at com.spicywdev.schmeddit.features.news.NewsManager$getNews$1.subscribe(NewsManager.kt:16)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)

This is what the class looks like. The error happens in the function requestNews().

class NewsFragment : RxBaseFragment() {

    companion object {
        private val KEY_REDDIT_NEWS = "redditNews"
    }

    private var redditNews: RedditNews? = null
    private val newsManager by lazy { NewsManager() }

    override fun onCreateView(inflater: LayoutInflater, container: ViewGroup?, savedInstanceState: Bundle?): View? {
        return container?.inflate(R.layout.news_fragment)
    }

    override fun onActivityCreated(savedInstanceState: Bundle?) {
        super.onActivityCreated(savedInstanceState)

        news_list.apply {
            setHasFixedSize(true)
            val linearLayoutManager = LinearLayoutManager(context)
            layoutManager = linearLayoutManager
            clearOnScrollListeners()
            addOnScrollListener(InfiniteScrollListener({ requestNews()}, linearLayoutManager))
        }

        initAdapter()

        if (savedInstanceState != null && savedInstanceState.containsKey(KEY_REDDIT_NEWS)) {
            redditNews = savedInstanceState.get(KEY_REDDIT_NEWS) as RedditNews
            (news_list.adapter as NewsAdapter).clearAndAddNews(redditNews!!.news)
        } else {
            requestNews()
        }
    }

    override fun onSaveInstanceState(outState: Bundle) {
        super.onSaveInstanceState(outState)
        val news = (news_list.adapter as NewsAdapter).getNews()
        if (redditNews != null && news.isNotEmpty()) {
            outState.putParcelable(KEY_REDDIT_NEWS, redditNews?.copy(news = news))
        }
    }

    private fun requestNews() {
        /**
         * first time will send empty string for after parameter.
         * Next time we will have redditNews set with the next page to
         * navigate with the after param.
         */
        val subscription = newsManager.getNews(redditNews?.after ?: "")
            .subscribeOn(Schedulers.io())
            .subscribe(
                {
                    retrievedNews ->
                    redditNews = retrievedNews
                    (news_list.adapter as NewsAdapter).addNews(retrievedNews.news)
                },
                {
                    e -> Snackbar.make(news_list, e.message ?: "WZF", Snackbar.LENGTH_LONG).show()
                }
        )
        subscriptions.add(subscription)
    }


    private fun initAdapter() {
        if (news_list.adapter == null) {
            news_list.adapter = NewsAdapter()
        }
    }
}

I'm pretty new to RxJava - I'd be really glad if someone can help me with this. Thank you.

Recommended answer

You can override the RxJava error handler as follows:

  1. Extend the Application class with a custom MyApplication class.
  2. Then, in the onCreate method of that class, write:

    @Override
    public void onCreate() {
        super.onCreate();
        RxJavaPlugins.setErrorHandler(throwable -> {}); // ignore the error, or log it here
    }
    
  3. Add the MyApplication class to the manifest:

    <application
        android:name=".MyApplication"
        ...>
    

For more information, see the RxJava wiki.
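
The empty handler in step 2 discards every undeliverable error, including ones that point to real bugs. A more selective policy is to ignore only failures that are expected after the flow has been disposed (such as the `InterruptedIOException` in the stack trace) and pass everything else to the thread's uncaught-exception handler. The sketch below shows only that decision logic in plain Java. The class and method names are hypothetical, and unwrapping one level of `getCause()` is a simplification that stands in for an `instanceof UndeliverableException` check.

```java
import java.io.IOException;
import java.io.InterruptedIOException;

// Hypothetical helper; not part of RxJava or of the original answer.
class UndeliverableErrorPolicy {

    /** Returns true when the error is an expected side effect of cancelling a request. */
    static boolean isExpectedAfterDispose(Throwable error) {
        // Assumption: a wrapper such as UndeliverableException carries the real failure as its cause.
        Throwable root = (error.getCause() != null) ? error.getCause() : error;
        // Network failures and interrupted blocking calls are normal once nobody is listening.
        return root instanceof IOException || root instanceof InterruptedException;
    }

    /** Handler body: swallow expected errors, report everything else as an uncaught exception. */
    static void handle(Throwable error) {
        if (isExpectedAfterDispose(error)) {
            return; // e.g. InterruptedIOException from a cancelled OkHttp call
        }
        Thread current = Thread.currentThread();
        current.getUncaughtExceptionHandler().uncaughtException(current, error);
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("undeliverable", new InterruptedIOException());
        System.out.println(isExpectedAfterDispose(wrapped));                          // true
        System.out.println(isExpectedAfterDispose(new IllegalStateException("bug"))); // false
    }
}
```

Assuming this class is on the classpath, the line in step 2 could register it with `RxJavaPlugins.setErrorHandler(UndeliverableErrorPolicy::handle);` instead of the empty lambda.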
