private SafeIterableMap<Observer<? super T>, ObserverWrapper> mObservers = newSafeIterableMap<>();
// how many observers are in active state @SuppressWarnings("WeakerAccess")/* synthetic access */ intmActiveCount=0; // to handle active/inactive reentry, we guard with this boolean privateboolean mChangingActiveState; privatevolatile Object mData; // 1️⃣:当前的data // when setData is called, we set the pending data and actual data swap happens on the main // thread @SuppressWarnings("WeakerAccess")/* synthetic access */ volatileObjectmPendingData= NOT_SET; // 2️⃣:给多线程使用的,在4️⃣mPostValueRunnable中使用 privateint mVersion; // 3️⃣:mVersion是数据版本
@SuppressWarnings("WeakerAccess")/* synthetic access */ voiddispatchingValue(@Nullable ObserverWrapper initiator) { if (mDispatchingValue) { mDispatchInvalidated = true; return; } mDispatchingValue = true; do { mDispatchInvalidated = false; if (initiator != null) { considerNotify(initiator); initiator = null; } else { for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator = mObservers.iteratorWithAdditions(); iterator.hasNext(); ) { considerNotify(iterator.next().getValue()); if (mDispatchInvalidated) { break; } } } } while (mDispatchInvalidated); mDispatchingValue = false; }
如果当前正在分派,则退出,并通过mDispatchInvalidated打断后续分派
如果参数不空,则只分派给参数,否则分派给所有observer
considerNotify
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
privatevoidconsiderNotify(ObserverWrapper observer) { if (!observer.mActive) { return; } // Check latest state b4 dispatch. Maybe it changed state but we didn't get the event yet. // // we still first check observer.active to keep it as the entrance for events. So even if // the observer moved to an active state, if we've not received that event, we better not // notify for a more predictable notification order. if (!observer.shouldBeActive()) { observer.activeStateChanged(false); return; } if (observer.mLastVersion >= mVersion) { return; } observer.mLastVersion = mVersion; observer.mObserver.onChanged((T) mData); }
@MainThread publicvoidremoveObserver(@NonNullfinal Observer<? super T> observer) { assertMainThread("removeObserver"); ObserverWrapperremoved= mObservers.remove(observer); if (removed == null) { return; } removed.detachObserver(); removed.activeStateChanged(false); }
/** * Removes all observers that are tied to the given {@link LifecycleOwner}. * * @param owner The {@code LifecycleOwner} scope for the observers to be removed. */ @SuppressWarnings("WeakerAccess") @MainThread publicvoidremoveObservers(@NonNullfinal LifecycleOwner owner) { assertMainThread("removeObservers"); for (Map.Entry<Observer<? super T>, ObserverWrapper> entry : mObservers) { if (entry.getValue().isAttachedTo(owner)) { removeObserver(entry.getKey()); } } }
/** * Creates a MutableLiveData initialized with the given {@code value}. * * @param value initial value */ publicMutableLiveData(T value) { super(value); }
/** * Creates a MutableLiveData with no value assigned to it. */ publicMutableLiveData() { super(); }
/** * Creates a MediatorLiveData with no value assigned to it. */ publicMediatorLiveData() { super(); }
/** * Creates a MediatorLiveData initialized with the given {@code value}. * * @param value initial value */ publicMediatorLiveData(T value) { super(value); }
/** * Starts to listen to the given {@code source} LiveData, {@code onChanged} observer will be * called when {@code source} value was changed. * <p> * {@code onChanged} callback will be called only when this {@code MediatorLiveData} is active. * <p> If the given LiveData is already added as a source but with a different Observer, * {@link IllegalArgumentException} will be thrown. * * @param source the {@code LiveData} to listen to * @param onChanged The observer that will receive the events * @param <S> The type of data hold by {@code source} LiveData */ @MainThread public <S> voidaddSource(@NonNull LiveData<S> source, @NonNull Observer<? super S> onChanged) { if (source == null) { thrownewNullPointerException("source cannot be null"); } Source<S> e = newSource<>(source, onChanged); Source<?> existing = mSources.putIfAbsent(source, e); if (existing != null && existing.mObserver != onChanged) { thrownewIllegalArgumentException( "This source was already added with the different observer"); } if (existing != null) { return; } if (hasActiveObservers()) { e.plug(); } }
/** * Stops to listen the given {@code LiveData}. * * @param toRemote {@code LiveData} to stop to listen * @param <S> the type of data hold by {@code source} LiveData */ @MainThread public <S> voidremoveSource(@NonNull LiveData<S> toRemote) { Source<?> source = mSources.remove(toRemote); if (source != null) { source.unplug(); } }
// a LiveData that tries to load the `User` from local cache first and then tries to fetch // from the server and also yields the updated value val user = liveData { // check local storage val cached = cache.loadUser(id) if (cached != null) { emit(cached) } if (cached == null || cached.isStale()) { val fresh = api.fetch(id) // errors are ignored for brevity cache.save(fresh) emit(fresh) } }
// a LiveData that immediately receives a LiveData<User> from the database and yields it as a // source but also tries to back-fill the database from the server val user = liveData { val fromDb: LiveData<User> = roomDatabase.loadUser(id) emitSource(fromDb) val updated = api.fetch(id) // errors are ignored for brevity roomDatabase.insert(updated) }
init { // use an intermediate supervisor job so that if we cancel individual block runs due to losing // observers, it won't cancel the given context as we only cancel w/ the intention of possibly // relaunching using the same parent context. val supervisorJob = SupervisorJob(context[Job])
// The scope for this LiveData where we launch every block Job. // We default to Main dispatcher but developer can override it. // The supervisor job is added last to isolate block runs. val scope = CoroutineScope(Dispatchers.Main.immediate + context + supervisorJob) blockRunner = BlockRunner( liveData = this, block = block, timeoutInMs = timeoutInMs, scope = scope ) { blockRunner = null } }
@MainThread funcancel() { if (cancellationJob != null) { error("Cancel call cannot happen without a maybeRun") } cancellationJob = scope.launch(Dispatchers.Main.immediate) { delay(timeoutInMs) if (!liveData.hasActiveObservers()) { // one last check on active observers to avoid any race condition between starting // a running coroutine and cancelation runningJob?.cancel() runningJob = null } } } }
publicinterfaceLiveDataScope<T> { /** * Set's the [LiveData]'s value to the given [value]. If you've called [emitSource] previously, * calling [emit] will remove that source. * * Note that this function suspends until the value is set on the [LiveData]. * * @param value The new value for the [LiveData] * * @see emitSource */ publicsuspendfunemit(value: T)
/** * Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]. Calling this * method will remove any source that was yielded before via [emitSource]. * * @param source The [LiveData] instance whose values will be dispatched from the current * [LiveData]. * * @see emit * @see MediatorLiveData.addSource * @see MediatorLiveData.removeSource */ publicsuspendfunemitSource(source: LiveData<T>): DisposableHandle
/** * References the current value of the [LiveData]. * * If the block never `emit`ed a value, [latestValue] will be `null`. You can use this * value to check what was then latest value `emit`ed by your `block` before it got cancelled. * * Note that if the block called [emitSource], then `latestValue` will be last value * dispatched by the `source` [LiveData]. */ publicval latestValue: T? }
// use `liveData` provided context + main dispatcher to communicate with the target // LiveData. This gives us main thread safety as well as cancellation cooperation privateval coroutineContext = context + Dispatchers.Main.immediate
internalclassEmittedSource( privateval source: LiveData<*>, privateval mediator: MediatorLiveData<*> ) : DisposableHandle { // @MainThread privatevar disposed = false /** * Unlike [dispose] which cannot be sync because it not a coroutine (and we do not want to * lock), this version is a suspend function and does not return until source is removed. */ suspendfundisposeNow() = withContext(Dispatchers.Main.immediate) { removeSource() }