Bruno Charest 4bd1b5a6f3 feat: Add DASH support and stream resolution with conflict-safe sync protection
- Add Media3 DASH dependency for adaptive streaming support
- Implement StreamResolver for URL resolution and MIME type detection
- Update AudioHandler to resolve streams before playback with proper MIME hints
- Add immediate buffering state feedback in UI during stream resolution
- Protect local pending changes from being overwritten during sync operations
- Add French comment clarifying sync conflict prevention logic
2026-02-11 16:31:04 -05:00

630 lines
24 KiB
Kotlin

package com.shaarit.data.sync
import android.content.Context
import android.util.Log
import androidx.hilt.work.HiltWorker
import androidx.work.Constraints
import androidx.work.CoroutineWorker
import androidx.work.ExistingWorkPolicy
import androidx.work.NetworkType
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.WorkInfo
import androidx.work.WorkManager
import androidx.work.WorkerParameters
import androidx.work.workDataOf
import com.shaarit.data.api.ShaarliApi
import com.shaarit.data.dto.CollectionConfigDto
import com.shaarit.data.dto.CreateLinkDto
import com.shaarit.data.dto.CollectionsConfigDto
import com.shaarit.data.local.dao.LinkDao
import com.shaarit.data.local.dao.CollectionDao
import com.shaarit.data.local.dao.TagDao
import com.shaarit.data.local.entity.CollectionEntity
import com.shaarit.data.local.entity.CollectionLinkCrossRef
import com.shaarit.data.local.entity.LinkEntity
import com.shaarit.data.local.entity.SyncStatus
import com.shaarit.data.local.entity.TagEntity
import com.shaarit.data.mapper.LinkMapper
import com.shaarit.data.mapper.TagMapper
import com.shaarit.core.storage.TokenManager
import dagger.hilt.android.qualifiers.ApplicationContext
import dagger.assisted.Assisted
import dagger.assisted.AssistedInject
import com.squareup.moshi.Moshi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.withContext
import retrofit2.HttpException
import java.io.IOException
import javax.inject.Inject
import javax.inject.Singleton
/**
* Gestionnaire de synchronisation entre le cache local et le serveur Shaarli
*/
@Singleton
class SyncManager @Inject constructor(
@ApplicationContext private val context: Context,
private val linkDao: LinkDao,
private val tagDao: TagDao,
private val collectionDao: CollectionDao,
private val moshi: Moshi,
private val tokenManager: TokenManager,
private val api: ShaarliApi
) {
companion object {
private const val TAG = "SyncManager"
private const val SYNC_WORK_NAME = "shaarli_sync_work"
private const val COLLECTIONS_CONFIG_TITLE = "collections"
private const val COLLECTIONS_CONFIG_TAG = "shaarit_config"
private const val COLLECTIONS_CONFIG_URL = "https://shaarit.app/config/collections"
}
private val workManager = WorkManager.getInstance(context)
/**
* État actuel de la synchronisation
*/
val syncState: Flow<SyncState> = workManager.getWorkInfosForUniqueWorkFlow(SYNC_WORK_NAME)
.map { workInfos ->
when {
workInfos.isEmpty() -> SyncState.Idle
workInfos.any { it.state == WorkInfo.State.RUNNING } -> SyncState.Syncing
workInfos.any { it.state == WorkInfo.State.FAILED } -> SyncState.Error(
workInfos.firstOrNull { it.state == WorkInfo.State.FAILED }?.outputData?.getString("error")
?: "Unknown error"
)
workInfos.any { it.state == WorkInfo.State.SUCCEEDED } -> {
val info = workInfos.firstOrNull { it.state == WorkInfo.State.SUCCEEDED }
val completedAt = info?.outputData?.getLong("completedAt", 0L) ?: 0L
SyncState.Synced(if (completedAt > 0L) completedAt else System.currentTimeMillis())
}
workInfos.all { it.state.isFinished } -> SyncState.Idle
else -> SyncState.Idle
}
}
/**
* Nombre d'éléments en attente de synchronisation
*/
val pendingSyncCount: Flow<Int> = linkDao.getUnsyncedCount()
/**
* Déclenche une synchronisation manuelle
*/
fun syncNow() {
val constraints = Constraints.Builder()
.setRequiredNetworkType(NetworkType.CONNECTED)
.build()
val syncWorkRequest = OneTimeWorkRequestBuilder<SyncWorker>()
.setConstraints(constraints)
.addTag("sync")
.build()
workManager.enqueueUniqueWork(
SYNC_WORK_NAME,
ExistingWorkPolicy.REPLACE,
syncWorkRequest
)
}
/**
* Annule toutes les synchronisations en cours
*/
fun cancelSync() {
workManager.cancelUniqueWork(SYNC_WORK_NAME)
}
/**
* Synchronise immédiatement (appel synchrone - utiliser avec précaution)
*/
suspend fun performFullSync(): SyncResult = withContext(Dispatchers.IO) {
try {
Log.d(TAG, "Démarrage de la synchronisation complète...")
// 1. Pousser les modifications locales vers le serveur
pushLocalChanges()
// 2. Récupérer les données depuis le serveur
pullFromServer()
Log.d(TAG, "Synchronisation terminée avec succès")
SyncResult.Success
} catch (e: HttpException) {
val code = e.code()
val details = try {
e.response()?.errorBody()?.string()?.take(500)
} catch (_: Exception) {
null
}
val message = buildString {
append("HTTP ")
append(code)
val base = e.message
if (!base.isNullOrBlank()) {
append(": ")
append(base)
}
if (!details.isNullOrBlank()) {
append(" | ")
append(details)
}
}
Log.e(TAG, "Erreur HTTP lors de la synchronisation", e)
SyncResult.Error(message)
} catch (e: IOException) {
Log.e(TAG, "Erreur réseau lors de la synchronisation", e)
SyncResult.NetworkError(e.message ?: "Network error")
} catch (e: Exception) {
Log.e(TAG, "Erreur lors de la synchronisation", e)
SyncResult.Error("${e::class.java.simpleName}: ${e.message}")
}
}
private suspend fun pushCollectionsConfigIfDirty() {
if (!tokenManager.isCollectionsConfigDirty()) return
try {
val config = buildCollectionsConfig()
val adapter = moshi.adapter(CollectionsConfigDto::class.java)
val json = adapter.toJson(config)
val existingId = tokenManager.getCollectionsConfigBookmarkId()
val linkId = existingId ?: findCollectionsConfigBookmarkIdOnServer()
if (linkId != null) {
val response = api.updateLink(
linkId,
CreateLinkDto(
url = COLLECTIONS_CONFIG_URL,
title = COLLECTIONS_CONFIG_TITLE,
description = json,
tags = listOf(COLLECTIONS_CONFIG_TAG),
isPrivate = true
)
)
if (response.isSuccessful) {
tokenManager.saveCollectionsConfigBookmarkId(linkId)
tokenManager.setCollectionsConfigDirty(false)
}
} else {
val response = api.addLink(
CreateLinkDto(
url = COLLECTIONS_CONFIG_URL,
title = COLLECTIONS_CONFIG_TITLE,
description = json,
tags = listOf(COLLECTIONS_CONFIG_TAG),
isPrivate = true
)
)
if (response.isSuccessful) {
val createdId = response.body()?.id
if (createdId != null) {
tokenManager.saveCollectionsConfigBookmarkId(createdId)
}
tokenManager.setCollectionsConfigDirty(false)
}
}
} catch (e: Exception) {
Log.e(TAG, "Erreur lors de la poussée de la configuration des collections", e)
}
}
private suspend fun findCollectionsConfigBookmarkIdOnServer(): Int? {
// 1) Si on a un ID en cache, vérifier qu'il existe
val cached = tokenManager.getCollectionsConfigBookmarkId()
if (cached != null) {
try {
api.getLink(cached)
return cached
} catch (_: Exception) {
tokenManager.clearCollectionsConfigBookmarkId()
}
}
// 2) Rechercher par filtre searchTerm + searchTags
return try {
val candidates = api.getLinks(
offset = 0,
limit = 20,
searchTerm = COLLECTIONS_CONFIG_TITLE,
searchTags = COLLECTIONS_CONFIG_TAG
)
val configLink = candidates.firstOrNull { dto ->
dto.id != null && dto.title?.trim()?.equals(COLLECTIONS_CONFIG_TITLE, ignoreCase = true) == true
}
configLink?.id?.also { tokenManager.saveCollectionsConfigBookmarkId(it) }
} catch (_: Exception) {
null
}
}
private suspend fun buildCollectionsConfig(): CollectionsConfigDto {
val collections = collectionDao.getAllCollectionsOnce()
val items = collections.map { entity ->
val linkIds =
if (!entity.isSmart) {
try {
collectionDao.getLinkIdsInCollection(entity.id)
} catch (_: Exception) {
emptyList()
}
} else {
emptyList()
}
CollectionConfigDto(
name = entity.name,
description = entity.description,
icon = entity.icon,
color = entity.color,
isSmart = entity.isSmart,
query = entity.query,
sortOrder = entity.sortOrder,
linkIds = linkIds
)
}
return CollectionsConfigDto(
version = 1,
collections = items
)
}
/**
* Pousse les modifications locales (créations, mises à jour, suppressions)
*/
private suspend fun pushLocalChanges() {
// Traiter les créations en attente
val pendingCreates = linkDao.getLinksBySyncStatus(SyncStatus.PENDING_CREATE)
Log.d(TAG, "${pendingCreates.size} créations en attente")
for (link in pendingCreates) {
try {
val response = api.addLink(
CreateLinkDto(
url = link.url,
title = link.title.takeIf { it.isNotBlank() },
description = link.description.takeIf { it.isNotBlank() },
tags = link.tags.ifEmpty { null },
isPrivate = link.isPrivate
)
)
if (response.isSuccessful) {
response.body()?.let { serverLink ->
// Mettre à jour l'ID local avec l'ID serveur
val serverId = serverLink.id
if (serverId != null) {
val updatedLink = link.copy(
id = serverId,
syncStatus = SyncStatus.SYNCED
)
linkDao.insertLink(updatedLink)
} else {
Log.w(TAG, "Serveur a retourné un lien sans ID pour ${link.url}")
}
}
} else {
Log.e(TAG, "Échec création lien ${link.id}: ${response.code()}")
}
} catch (e: Exception) {
Log.e(TAG, "Exception lors de la création du lien ${link.id}", e)
}
}
// Traiter les mises à jour en attente
val pendingUpdates = linkDao.getLinksBySyncStatus(SyncStatus.PENDING_UPDATE)
Log.d(TAG, "${pendingUpdates.size} mises à jour en attente")
for (link in pendingUpdates) {
try {
val response = api.updateLink(
link.id,
CreateLinkDto(
url = link.url,
title = link.title.takeIf { it.isNotBlank() },
description = link.description.takeIf { it.isNotBlank() },
tags = link.tags.ifEmpty { null },
isPrivate = link.isPrivate
)
)
if (response.isSuccessful) {
linkDao.markAsSynced(link.id)
} else {
Log.e(TAG, "Échec mise à jour lien ${link.id}: ${response.code()}")
}
} catch (e: Exception) {
Log.e(TAG, "Exception lors de la mise à jour du lien ${link.id}", e)
}
}
// Traiter les suppressions en attente
val pendingDeletes = linkDao.getLinksBySyncStatus(SyncStatus.PENDING_DELETE)
Log.d(TAG, "${pendingDeletes.size} suppressions en attente")
for (link in pendingDeletes) {
try {
val response = api.deleteLink(link.id)
if (response.isSuccessful) {
linkDao.deleteLink(link.id)
} else {
Log.e(TAG, "Échec suppression lien ${link.id}: ${response.code()}")
}
} catch (e: Exception) {
Log.e(TAG, "Exception lors de la suppression du lien ${link.id}", e)
}
}
// Synchroniser la configuration des collections si nécessaire
pushCollectionsConfigIfDirty()
}
/**
* Récupère les données depuis le serveur (sync incrémentale)
* S'arrête quand on rencontre des liens déjà synchronisés (non modifiés depuis la dernière sync)
*/
private suspend fun pullFromServer() {
var offset = 0
val limit = 100
var hasMore = true
val lastSyncTimestamp = tokenManager.getLastSyncTimestamp()
val syncStartTime = System.currentTimeMillis()
val isFirstSync = lastSyncTimestamp == 0L
var unchangedStreakCount = 0
val unchangedStreakThreshold = 2 // Stop after 2 consecutive pages of unchanged links
Log.d(TAG, "Sync incrémentale: lastSync=${if (isFirstSync) "jamais" else java.time.Instant.ofEpochMilli(lastSyncTimestamp)}")
while (hasMore) {
try {
val links = api.getLinks(offset = offset, limit = limit)
Log.d(TAG, "Reçu ${links.size} liens (offset=$offset)")
if (links.isEmpty()) {
hasMore = false
} else {
// Filtrer les liens invalides (sans ID ou URL) et convertir en entités
val validLinks = links.filter { dto ->
val isValid = dto.id != null && !dto.url.isNullOrBlank()
val isCollectionsConfig =
dto.title?.trim()?.equals(COLLECTIONS_CONFIG_TITLE, ignoreCase = true) == true &&
(dto.tags?.contains(COLLECTIONS_CONFIG_TAG) == true)
isValid && !isCollectionsConfig
}
Log.d(TAG, "${validLinks.size}/${links.size} liens valides")
var newOrUpdatedCount = 0
val entities = validLinks.mapNotNull { dto ->
try {
val existing = linkDao.getLinkById(dto.id!!)
val serverUpdatedAt = parseDate(dto.updated)
// Ne jamais écraser un lien avec des modifications locales en attente
if (existing != null && existing.syncStatus != SyncStatus.SYNCED) {
return@mapNotNull null
}
// Check if this link has been modified since last sync
if (!isFirstSync && existing != null && existing.updatedAt >= serverUpdatedAt && existing.syncStatus == SyncStatus.SYNCED) {
// Link unchanged since last sync — skip
return@mapNotNull null
}
newOrUpdatedCount++
LinkEntity(
id = dto.id,
url = dto.url!!,
title = dto.title ?: dto.url,
description = dto.description ?: "",
tags = dto.tags ?: emptyList(),
isPrivate = dto.isPrivate ?: false,
isPinned = existing?.isPinned ?: false,
createdAt = parseDate(dto.created),
updatedAt = serverUpdatedAt,
syncStatus = SyncStatus.SYNCED,
thumbnailUrl = existing?.thumbnailUrl,
readingTimeMinutes = existing?.readingTimeMinutes,
contentType = existing?.contentType ?: com.shaarit.data.local.entity.ContentType.UNKNOWN,
siteName = existing?.siteName,
excerpt = existing?.excerpt,
linkCheckStatus = existing?.linkCheckStatus ?: com.shaarit.data.local.entity.LinkCheckStatus.VALID,
failCount = existing?.failCount ?: 0,
lastHealthCheck = existing?.lastHealthCheck ?: 0,
excludedFromHealthCheck = existing?.excludedFromHealthCheck ?: false
)
} catch (e: Exception) {
Log.w(TAG, "Lien ignoré (id=${dto.id}): ${e.message}")
null
}
}
if (entities.isNotEmpty()) {
linkDao.insertLinks(entities)
}
Log.d(TAG, "Page offset=$offset: $newOrUpdatedCount nouveaux/modifiés sur ${validLinks.size} valides")
// Incremental sync: stop early if we encounter pages with no changes
if (!isFirstSync && newOrUpdatedCount == 0) {
unchangedStreakCount++
if (unchangedStreakCount >= unchangedStreakThreshold) {
Log.d(TAG, "Sync incrémentale: $unchangedStreakThreshold pages consécutives sans changement, arrêt anticipé")
hasMore = false
}
} else {
unchangedStreakCount = 0
}
offset += links.size
}
} catch (e: Exception) {
Log.e(TAG, "Erreur lors de la récupération des liens", e)
throw e
}
}
// Save sync timestamp on success
tokenManager.saveLastSyncTimestamp(syncStartTime)
// Synchroniser les tags
try {
val tags = api.getTags(limit = 1000)
val tagEntities = tags.map { TagMapper.toDomain(it) }.map { TagEntity(it.name, it.occurrences) }
tagDao.insertTags(tagEntities)
} catch (e: Exception) {
Log.e(TAG, "Erreur lors de la récupération des tags", e)
}
// Synchroniser la configuration des collections (bookmark serveur "collections")
pullCollectionsConfigFromServer()
}
private suspend fun pullCollectionsConfigFromServer() {
try {
val candidates = api.getLinks(
offset = 0,
limit = 20,
searchTerm = COLLECTIONS_CONFIG_TITLE,
searchTags = COLLECTIONS_CONFIG_TAG
)
val configLink = candidates.firstOrNull { dto ->
dto.title?.trim()?.equals(COLLECTIONS_CONFIG_TITLE, ignoreCase = true) == true
} ?: return
val rawJson = configLink.description?.trim().orEmpty()
if (rawJson.isBlank()) return
val adapter = moshi.adapter(CollectionsConfigDto::class.java)
val config = adapter.fromJson(rawJson) ?: return
applyCollectionsConfig(config)
} catch (e: Exception) {
Log.e(TAG, "Erreur lors de la récupération de la configuration des collections", e)
}
}
private suspend fun applyCollectionsConfig(config: CollectionsConfigDto) {
val serverNames =
config.collections
.map { it.name.trim() }
.filter { it.isNotBlank() }
.toSet()
// Supprimer les collections locales qui ne sont plus dans la config serveur
val existing = collectionDao.getAllCollectionsOnce()
existing
.filter { it.name !in serverNames }
.forEach { entity ->
try {
collectionDao.clearCollection(entity.id)
collectionDao.deleteCollection(entity.id)
} catch (e: Exception) {
Log.w(TAG, "Impossible de supprimer la collection locale ${entity.name}", e)
}
}
// Upsert des collections + relations
config.collections.forEach { dto ->
val name = dto.name.trim()
if (name.isBlank()) return@forEach
val existingEntity = collectionDao.getCollectionByName(name)
val entity = CollectionEntity(
id = existingEntity?.id ?: 0,
name = name,
description = dto.description,
icon = dto.icon ?: (existingEntity?.icon ?: "📁"),
color = dto.color,
isSmart = dto.isSmart,
query = dto.query,
sortOrder = dto.sortOrder,
createdAt = existingEntity?.createdAt ?: System.currentTimeMillis(),
updatedAt = System.currentTimeMillis()
)
val collectionId =
if (existingEntity != null) {
collectionDao.updateCollection(entity)
existingEntity.id
} else {
collectionDao.insertCollection(entity)
}
// Relations: uniquement pour les collections "non smart"
collectionDao.clearCollection(collectionId)
if (!dto.isSmart) {
dto.linkIds
.distinct()
.forEach { linkId ->
collectionDao.addLinkToCollection(
CollectionLinkCrossRef(collectionId = collectionId, linkId = linkId)
)
}
}
}
}
private fun parseDate(dateString: String?): Long {
if (dateString.isNullOrBlank()) return System.currentTimeMillis()
return try {
java.time.Instant.parse(dateString).toEpochMilli()
} catch (e: Exception) {
System.currentTimeMillis()
}
}
}
/**
* Worker pour exécuter la synchronisation en arrière-plan
*/
@HiltWorker
class SyncWorker @AssistedInject constructor(
@Assisted appContext: Context,
@Assisted params: WorkerParameters,
private val syncManager: SyncManager
) : CoroutineWorker(appContext, params) {
override suspend fun doWork(): Result {
return when (val result = syncManager.performFullSync()) {
is SyncResult.Success -> Result.success(
workDataOf("completedAt" to System.currentTimeMillis())
)
is SyncResult.NetworkError -> Result.retry()
is SyncResult.Error -> Result.failure(
workDataOf("error" to result.message)
)
}
}
}
/**
* États possibles de la synchronisation
*/
sealed class SyncState {
object Idle : SyncState()
object Syncing : SyncState()
data class Synced(val completedAt: Long) : SyncState()
data class Error(val message: String) : SyncState()
}
/**
* Résultats possibles d'une synchronisation
*/
sealed class SyncResult {
object Success : SyncResult()
data class NetworkError(val message: String) : SyncResult()
data class Error(val message: String) : SyncResult()
}