Outbox transactionnelle Laravel: publier des événements externes sans perte
Implémentez une Outbox transactionnelle dans Laravel pour publier des événements vers des services externes sans pertes ni doublons. Tutoriel complet: schéma, service atomique, worker concurrent, retries, tests et purge.
Sommaire
Outbox transactionnelle Laravel: publier des événements externes sans perte
Publier des événements vers des systèmes externes (webhooks, bus, partenaires) ne doit pas compromettre la cohérence de vos données. Ce tutoriel vous montre comment implémenter le pattern Outbox dans Laravel pour garantir des publications fiables, sans pertes ni doublons, avec un worker concurrent, des retries, de l’observabilité, et des tests.
Objectif
L’objectif est de construire une Outbox transactionnelle complète dans une application Laravel. Vous allez créer un schéma de table dédié, un service qui enregistre les messages de manière atomique avec vos écritures métier, puis un worker concurrent qui sélectionne, publie et replanifie les messages avec backoff exponentiel. Nous couvrirons également l’idempotence côté consommateur HTTP, les limites de tentatives, une Dead Letter Queue implicite, la planification/scheduling, l’observabilité (logs, métriques simples), ainsi que des tests automatisés. À la fin, vous disposerez d’une solution robuste permettant d’émettre des événements sortants sans perte ni duplication, même en cas d’incident réseau ou de redéploiement.
Objectif, périmètre et prérequis
Nous allons garantir la publication fiable d’événements sortants (par exemple des webhooks ou des messages à un bus) en appliquant le pattern Outbox. Le principe est d’écrire l’événement dans la même transaction que la modification métier. Ainsi, si la transaction est validée, le message est garanti d’exister dans l’Outbox. Un worker lira ensuite ces messages pour les dispatcher vers l’extérieur. En cas d’échec, les messages seront replanifiés avec retries — ce qui évite les pertes — et rendus idempotents pour éviter les doublons côté consommateur.
Vous aurez besoin de Laravel 10/11, PHP 8.2+, et MySQL 8 ou PostgreSQL 12+ afin de bénéficier des verrous SKIP LOCKED. Assurez-vous qu’une queue est configurée (Redis ou Database) et que le client HTTP Laravel est activé. Dans ce tutoriel, nous publierons vers un endpoint HTTP via Http::post pour démontrer l’idempotence et la gestion des retries. Vous pouvez facilement substituer l’implémentation pour Kafka, SNS ou autre plus tard.
Créer la table outbox_messages
Commencez par générer une migration pour la table qui stockera vos messages:
php artisan make:migration create_outbox_messages_table
La table contient un identifiant UUID comme clé primaire, un topic pour catégoriser l’événement, un payload en JSON, un statut, un compteur de tentatives, des timestamps de disponibilité et d’envoi, ainsi qu’une clé d’idempotence unique. Nous ajoutons des index pour accélérer la sélection des messages à traiter et pour les opérations de purge.
<?php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
return new class extends Migration {
public function up(): void
{
Schema::create('outbox_messages', function (Blueprint $table) {
$table->uuid('id')->primary();
$table->string('topic');
$table->json('payload');
$table->string('status')->default('pending')->index();
$table->unsignedSmallInteger('attempts')->default(0);
$table->timestamp('available_at')->useCurrent()->index();
$table->timestamp('sent_at')->nullable()->index();
$table->string('idempotency_key')->unique();
$table->timestamps();
$table->index(['status', 'available_at']);
});
}
public function down(): void
{
Schema::dropIfExists('outbox_messages');
}
};
Par défaut, nous positionnons status à pending et available_at à maintenant. Cela permet aux nouveaux messages d’être immédiatement éligibles à la publication, sauf si vous fournissez une date future pour un envoi différé.
Ensuite, exécutez la migration:
php artisan migrate
Modèle Eloquent et Factory
Créez le modèle Eloquent OutboxMessage pour manipuler les lignes de la table avec des casts adaptés. Nous définissons des constantes pour les statuts standards et autorisons l’affectation de masse des colonnes utiles.
<?php
namespace App\Models;
use Illuminate\Database\Eloquent\Model;
class OutboxMessage extends Model
{
public const STATUS_PENDING = 'pending';
public const STATUS_SENT = 'sent';
public const STATUS_FAILED = 'failed';
protected $table = 'outbox_messages';
protected $fillable = [
'id',
'topic',
'payload',
'status',
'attempts',
'available_at',
'sent_at',
'idempotency_key',
];
protected $casts = [
'payload' => 'array',
'available_at' => 'datetime',
'sent_at' => 'datetime',
];
public $incrementing = false;
protected $keyType = 'string';
}
Pour les tests, une factory permet de produire rapidement des messages plausibles. Elle génère un topic aléatoire, un payload JSON, et une clé d’idempotence unique.
<?php
namespace Database\Factories;
use App\Models\OutboxMessage;
use Illuminate\Database\Eloquent\Factories\Factory;
use Illuminate\Support\Str;
class OutboxMessageFactory extends Factory
{
protected $model = OutboxMessage::class;
public function definition(): array
{
return [
'id' => (string) Str::uuid(),
'topic' => $this->faker->randomElement(['order.created', 'user.registered', 'invoice.paid']),
'payload' => [
'example' => $this->faker->uuid(),
'timestamp' => now()->toIso8601String(),
],
'status' => OutboxMessage::STATUS_PENDING,
'attempts' => 0,
'available_at' => now(),
'sent_at' => null,
'idempotency_key' => (string) Str::uuid(),
];
}
}
Service d’Outbox: enregistrement atomique
Le service d’Outbox encapsule l’enqueue des messages. Il est appelé depuis la même transaction que l’écriture métier, ce qui garantit que si la transaction échoue, aucun message ne reste en Outbox. À l’inverse, si la transaction réussit, le message existe et sera publié plus tard par le worker.
<?php
namespace App\Services;
use App\Models\OutboxMessage;
use Carbon\Carbon;
use Illuminate\Support\Str;
class OutboxService
{
public function enqueue(string $topic, array $payload, ?string $key = null, ?Carbon $availableAt = null): OutboxMessage
{
return OutboxMessage::create([
'id' => (string) Str::uuid(),
'topic' => $topic,
'payload' => $payload,
'status' => OutboxMessage::STATUS_PENDING,
'attempts' => 0,
'available_at' => $availableAt ?? now(),
'sent_at' => null,
'idempotency_key' => $key ?? (string) Str::uuid(),
]);
}
}
Voici un exemple concret lors de la création d’une commande. Nous persistons l’entité puis mettons en file un événement order.created avec l’identifiant de la commande. Le tout est exécuté dans une seule transaction.
<?php
use App\Models\Order;
use App\Services\OutboxService;
use Illuminate\Support\Facades\DB;
function createOrderAndEmit(array $data): Order
{
return DB::transaction(function () use ($data) {
$order = Order::create([
'customer_id' => $data['customer_id'],
'total' => $data['total'],
'currency' => $data['currency'] ?? 'EUR',
]);
app(OutboxService::class)->enqueue('order.created', [
'id' => $order->id,
'total' => $order->total,
]);
return $order;
});
}
Si un rollback survient (ex: exception avant la fin), ni la commande, ni le message Outbox ne sont enregistrés, évitant toute divergence entre votre base et les événements publiés.
Worker de dispatch: lecture concurrente avec SKIP LOCKED
Nous allons créer un Job qui lit les messages éligibles en lot, en utilisant un verrouillage pessimiste avec SKIP LOCKED pour permettre plusieurs workers concurrents sans collision. Pour éviter de tenir un verrou pendant l’appel HTTP, nous adoptons un mécanisme de lease: on « réserve » les messages en mettant available_at dans un futur proche, puis on effectue la publication hors transaction.
Générez le job:
php artisan make:job DispatchOutboxBatch
Ajoutez un fichier de configuration pour paramétrer le batch, l’endpoint, le timeout, le nombre max de tentatives et la durée de lease.
// config/outbox.php
<?php
return [
'batch' => (int) env('OUTBOX_BATCH', 100),
'endpoint' => env('OUTBOX_ENDPOINT', 'https://example.test/webhooks'),
'http_timeout' => (int) env('OUTBOX_HTTP_TIMEOUT', 5),
'max_attempts' => (int) env('OUTBOX_MAX_ATTEMPTS', 10),
'lease_seconds' => (int) env('OUTBOX_LEASE_SECONDS', 30),
'signing_secret' => env('OUTBOX_SIGNING_SECRET'), // facultatif pour HMAC
];
Et configurez vos variables d’environnement:
# .env
OUTBOX_ENDPOINT=https://example.test/webhooks
OUTBOX_BATCH=100
OUTBOX_HTTP_TIMEOUT=5
OUTBOX_MAX_ATTEMPTS=10
OUTBOX_LEASE_SECONDS=30
OUTBOX_SIGNING_SECRET=your_shared_secret_here
Implémentez ensuite le job. Il réserve un lot de messages avec lockForUpdate()->skipLocked() dans une transaction, met leur available_at quelques secondes/minutes plus tard pour empêcher les autres workers de les prendre, puis tente de les publier. Selon la réponse, il marque sent, reschedule pour retry, ou failed.
<?php
namespace App\Jobs;
use App\Events\OutboxMessageFailed;
use App\Models\OutboxMessage;
use Carbon\CarbonImmutable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Database\Eloquent\Collection as EloquentCollection;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Http;
use Illuminate\Support\Facades\Log;
class DispatchOutboxBatch implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public $queue = 'outbox';
public function handle(): void
{
$messages = $this->leaseBatch();
if ($messages->isEmpty()) {
return;
}
foreach ($messages as $msg) {
try {
$this->publishHttp($msg);
} catch (\Throwable $e) {
Log::error('outbox.http_exception', [
'outbox_id' => $msg->id,
'topic' => $msg->topic,
'attempt' => $msg->attempts,
'error' => $e->getMessage(),
]);
$this->scheduleRetry($msg);
}
}
// Auto-récursif: si des messages restent à traiter, on redéclenche rapidement.
if (OutboxMessage::query()
->where('status', OutboxMessage::STATUS_PENDING)
->where('available_at', '<=', now())
->exists()
) {
dispatch(new self())->onQueue($this->queue);
}
}
protected function leaseBatch(): Collection
{
$leaseUntil = now()->addSeconds(config('outbox.lease_seconds'));
return DB::transaction(function () use ($leaseUntil) {
/** @var EloquentCollection<int, OutboxMessage> $batch */
$batch = OutboxMessage::query()
->where('status', OutboxMessage::STATUS_PENDING)
->where('available_at', '<=', now())
->orderBy('available_at')
->limit((int) config('outbox.batch'))
->lockForUpdate()
->skipLocked()
->get();
foreach ($batch as $msg) {
// Réserver pour éviter une double prise par un autre worker
$msg->available_at = $leaseUntil;
$msg->save();
}
return $batch->values();
}, 3);
}
protected function publishHttp(OutboxMessage $msg): void
{
$endpoint = rtrim((string) config('outbox.endpoint'), '/').'/'.$msg->topic;
$headers = [
'Idempotency-Key' => $msg->idempotency_key,
];
// Optionnel: signature HMAC pour sécuriser le webhook
$timestamp = (string) CarbonImmutable::now()->getTimestamp();
$payloadJson = json_encode($msg->payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
if ($secret = config('outbox.signing_secret')) {
$signature = hash_hmac('sha256', $timestamp.'.'.$payloadJson, $secret);
$headers['X-Timestamp'] = $timestamp;
$headers['X-Signature'] = $signature;
}
$response = Http::timeout((int) config('outbox.http_timeout'))
->withHeaders($headers)
->asJson()
->post($endpoint, $msg->payload);
Log::info('outbox.attempt', [
'outbox_id' => $msg->id,
'topic' => $msg->topic,
'attempt' => $msg->attempts,
'http_status' => $response->status(),
]);
if ($response->successful()) {
$this->markSent($msg);
return;
}
if ($response->status() === 409 || $response->status() === 429 || $response->serverError()) {
$this->scheduleRetry($msg);
return;
}
$this->markFailed($msg, 'non_retryable_http_status_'.$response->status());
}
protected function markSent(OutboxMessage $msg): void
{
$msg->status = OutboxMessage::STATUS_SENT;
$msg->sent_at = now();
$msg->save();
}
protected function scheduleRetry(OutboxMessage $msg): void
{
$max = (int) config('outbox.max_attempts', 10);
$attempts = $msg->attempts + 1;
if ($attempts >= $max) {
$this->markFailed($msg, 'max_attempts_reached');
return;
}
$delaySeconds = (2 ** min(6, $attempts)) + random_int(0, 3); // backoff + jitter
$next = now()->addSeconds($delaySeconds);
$msg->attempts = $attempts;
$msg->available_at = $next;
$msg->save();
}
protected function markFailed(OutboxMessage $msg, string $reason): void
{
$msg->status = OutboxMessage::STATUS_FAILED;
$msg->save();
Log::warning('outbox.failed', [
'outbox_id' => $msg->id,
'topic' => $msg->topic,
'attempt' => $msg->attempts,
'reason' => $reason,
]);
event(new OutboxMessageFailed($msg));
}
}
Vous pouvez démarrer plusieurs workers en parallèle (via queue:work ou Horizon). Grâce à lockForUpdate()->skipLocked() et à la réservation via available_at, ils ne se marcheront pas dessus.
Publication HTTP + idempotence côté consommateur
Lors de l’appel HTTP, nous envoyons un header Idempotency-Key pour que le consommateur puisse dédupliquer. Côté réception, il suffit d’enregistrer cette clé et de rejeter toute requête avec une clé déjà vue. Voici un exemple minimal côté consommateur en PHP (Laravel) qui vérifie aussi une signature HMAC simple.
<?php
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Route;
Route::post('/webhooks/{topic}', function (Request $request, string $topic) {
$idempotencyKey = $request->header('Idempotency-Key');
$timestamp = $request->header('X-Timestamp');
$signature = $request->header('X-Signature');
// Vérification HMAC simple (facultatif)
$secret = env('OUTBOX_SIGNING_SECRET');
if ($secret && ($timestamp === null || $signature === null)) {
return response()->json(['error' => 'missing_signature'], 400);
}
if ($secret) {
$expected = hash_hmac('sha256', $timestamp.'.'.json_encode($request->json()->all(), JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES), $secret);
if (!hash_equals($expected, $signature)) {
return response()->json(['error' => 'invalid_signature'], 400);
}
if (abs(time() - (int) $timestamp) > 300) {
return response()->json(['error' => 'stale_timestamp'], 400);
}
}
// Idempotence côté consommateur
if (!$idempotencyKey) {
return response()->json(['error' => 'missing_idempotency_key'], 400);
}
$already = !Cache::add('idemp:'.$idempotencyKey, 1, now()->addDay());
if ($already) {
// Déjà traité: on renvoie 200 OK pour éviter un retry inutile
return response()->json(['status' => 'duplicate'], 200);
}
// Traiter l’événement en fonction du topic
Log::info('webhook.received', ['topic' => $topic, 'key' => $idempotencyKey]);
// Exemple: appliquer une action métier idempotente
// ...
return response()->json(['status' => 'ok'], 200);
});
Ce consumer renvoie un 2xx s’il a bien reçu (ou déjà traité) l’événement. Les 409/429/5xx relancent des tentatives côté Outbox, tandis que d’autres 4xx (ex: 400 pour signature invalide) marquent l’événement comme failed de manière définitive.
Retries, backoff exponentiel et Dead Letter
Les erreurs temporaires (timeouts, 5xx, rate limit) déclenchent une replanification. Nous incrémentons attempts et calculons un nouveau available_at avec un backoff exponentiel (2^attempts jusqu’à une limite) plus un jitter aléatoire. Cela désengorge le système lors des pannes et évite l’effet troupeau. Après un nombre maximum de tentatives (par exemple 10), nous marquons le message en failed et le conservons en base comme Dead Letter pour inspection, recherche d’incidents et relecture manuelle si besoin. L’émission d’un event OutboxMessageFailed permet d’alerter vos systèmes d’observabilité.
<?php
namespace App\Events;
use App\Models\OutboxMessage;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class OutboxMessageFailed
{
use Dispatchable, SerializesModels;
public function __construct(public OutboxMessage $message)
{
}
}
Vous pouvez brancher un listener pour créer une alerte, un ticket, ou pousser une métrique dans votre système de monitoring.
Planification et scalabilité
Pour exécuter régulièrement le job, nous créons une commande artisan qui déclenche le batch. Cela fonctionne en tandem avec des workers de queue déjà lancés.
php artisan make:command DispatchOutboxCommand
<?php
namespace App\Console\Commands;
use App\Jobs\DispatchOutboxBatch;
use Illuminate\Console\Command;
class DispatchOutboxCommand extends Command
{
protected $signature = 'outbox:dispatch';
protected $description = 'Dispatch outbox messages batch';
public function handle(): int
{
dispatch(new DispatchOutboxBatch())->onQueue('outbox');
$this->info('Outbox batch dispatched.');
return self::SUCCESS;
}
}
Planifiez ensuite la commande:
<?php
// app/Console/Kernel.php
protected function schedule(\Illuminate\Console\Scheduling\Schedule $schedule): void
{
$schedule->command('outbox:dispatch')->everyMinute()->withoutOverlapping();
}
Vous pouvez également vous appuyer sur la stratégie auto-récursive du job illustrée plus haut: lorsque des messages restent disponibles, il se re-déclenche. Avec Horizon, affectez une file dédiée et une concurrence adaptée pour maîtriser le débit.
// config/horizon.php (extrait)
'defaults' => [
'outbox' => [
'connection' => 'redis',
'queue' => ['outbox'],
'balance' => 'auto',
'maxProcesses' => 10,
'minProcesses' => 1,
'tries' => 1,
],
],
Tests automatisés (Pest)
Les tests garantissent la résilience et la sécurité des scénarios critiques. Nous commençons par vérifier la propriété transactionnelle: si une transaction métier est rollbackée, aucun message Outbox ne doit exister. Ensuite, nous simulons un succès, un retry avec backoff, et une quasi-concurrence.
<?php
// tests/Feature/OutboxTest.php
use App\Jobs\DispatchOutboxBatch;
use App\Models\OutboxMessage;
use App\Services\OutboxService;
use Illuminate\Foundation\Testing\RefreshDatabase;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Http;
use Illuminate\Support\Str;
use function Pest\Laravel\assertDatabaseCount;
use function Pest\Laravel\assertDatabaseHas;
uses(RefreshDatabase::class);
it('does not enqueue on rollback', function () {
try {
DB::transaction(function () {
app(OutboxService::class)->enqueue('order.created', ['id' => 123]);
throw new RuntimeException('boom');
});
} catch (\Throwable $e) {
// ignore
}
assertDatabaseCount('outbox_messages', 0);
});
it('dispatches successfully', function () {
$msg = OutboxMessage::create([
'id' => (string) Str::uuid(),
'topic' => 'order.created',
'payload' => ['id' => 1],
'status' => OutboxMessage::STATUS_PENDING,
'attempts' => 0,
'available_at' => now(),
'idempotency_key' => (string) Str::uuid(),
]);
Http::fake([
'https://example.test/webhooks/*' => Http::response(['ok' => true], 200),
]);
(new DispatchOutboxBatch())->handle();
assertDatabaseHas('outbox_messages', [
'id' => $msg->id,
'status' => OutboxMessage::STATUS_SENT,
]);
});
it('retries with backoff then succeeds', function () {
$msg = OutboxMessage::create([
'id' => (string) Str::uuid(),
'topic' => 'order.created',
'payload' => ['id' => 2],
'status' => OutboxMessage::STATUS_PENDING,
'attempts' => 0,
'available_at' => now(),
'idempotency_key' => (string) Str::uuid(),
]);
Http::fakeSequence()
->push([], 500)
->push([], 200);
// Premier handle -> 500 => retry planifié
(new DispatchOutboxBatch())->handle();
$msg->refresh();
expect($msg->status)->toBe(OutboxMessage::STATUS_PENDING);
expect($msg->attempts)->toBe(1);
expect($msg->available_at->isFuture())->toBeTrue();
// Avancer le temps jusqu’à available_at et rejouer
\Carbon\Carbon::setTestNow($msg->available_at->copy()->addSecond());
(new DispatchOutboxBatch())->handle();
$msg->refresh();
expect($msg->status)->toBe(OutboxMessage::STATUS_SENT);
});
it('leases to avoid double processing', function () {
// Seed de quelques messages
for ($i = 0; $i < 10; $i++) {
OutboxMessage::factory()->create([
'status' => OutboxMessage::STATUS_PENDING,
'available_at' => now(),
]);
}
Http::fake(['*' => Http::response([], 200)]);
// Deux jobs consécutifs simulent des workers concurrents
$job1 = new DispatchOutboxBatch();
$job2 = new DispatchOutboxBatch();
$job1->handle();
$job2->handle();
// Tous les messages doivent être SENT (pas de double-traitement)
expect(OutboxMessage::where('status', OutboxMessage::STATUS_SENT)->count())->toBe(10);
});
Ces tests utilisent RefreshDatabase et le client HTTP fake de Laravel pour simuler le monde extérieur. Nous vérifions que la réservation empêche le double traitement, même lorsque deux jobs s’exécutent rapidement l’un après l’autre.
Observabilité, sécurité et maintenance
Les logs structurés permettent un audit fin et un dépannage efficace. Nous incluons des informations utiles comme l’identifiant Outbox, le topic, la tentative et le statut HTTP. Pour des métriques simples, vous pouvez incrémenter des compteurs ou mesurer la latence de publication (sent_at - created_at) et les exposer à votre système préféré.
Log::info('outbox.sent', [
'outbox_id' => $msg->id,
'topic' => $msg->topic,
'latency_ms' => $msg->sent_at?->diffInMilliseconds($msg->created_at),
]);
La purge est nécessaire pour éviter que la table ne grossisse indéfiniment. Une commande artisan supprime, par exemple, les messages sent de plus de 7 jours. Utilisez un index sur sent_at pour accélérer l’opération.
php artisan make:command PruneOutboxCommand
<?php
namespace App\Console\Commands;
use App\Models\OutboxMessage;
use Carbon\Carbon;
use Illuminate\Console\Command;
class PruneOutboxCommand extends Command
{
protected $signature = 'outbox:prune {--days=7}';
protected $description = 'Prune sent outbox messages older than N days';
public function handle(): int
{
$days = (int) $this->option('days');
$before = Carbon::now()->subDays($days);
$deleted = 0;
OutboxMessage::query()
->where('status', OutboxMessage::STATUS_SENT)
->where('sent_at', '<', $before)
->orderBy('id')
->chunkById(1000, function ($chunk) use (&$deleted) {
$ids = $chunk->pluck('id');
$deleted += OutboxMessage::whereIn('id', $ids)->delete();
});
$this->info("Deleted $deleted messages sent before {$before->toDateTimeString()}");
return self::SUCCESS;
}
}
Sur la sécurité, nous conseillons de signer les webhooks (HMAC) et d’envoyer un horodatage, comme montré dans le job. Côté consommateur, vérifiez la signature, rejetez les timestamps trop anciens pour éviter le replay, et stockez l’Idempotency-Key pour neutraliser les doublons. Cela rend vos intégrations robustes même en cas de retries massifs.
Enfin, si vous migrez plus tard vers Kafka, SNS ou un autre transport, introduisez une abstraction PublisherInterface et fournissez une implémentation HTTP aujourd’hui, puis une implémentation Kafka demain, sans toucher au cœur Outbox/worker.
<?php
namespace App\Publishing;
use App\Models\OutboxMessage;
interface PublisherInterface
{
public function publish(OutboxMessage $message): PublishResult;
}
final class PublishResult
{
public function __construct(
public bool $success,
public bool $retryable,
public ?int $statusCode = null,
public ?string $error = null,
) {}
}
<?php
namespace App\Publishing;
use App\Models\OutboxMessage;
use Illuminate\Support\Facades\Http;
class HttpPublisher implements PublisherInterface
{
public function __construct(private readonly string $baseEndpoint) {}
public function publish(OutboxMessage $message): PublishResult
{
$endpoint = rtrim($this->baseEndpoint, '/').'/'.$message->topic;
$response = Http::timeout((int) config('outbox.http_timeout'))
->withHeaders(['Idempotency-Key' => $message->idempotency_key])
->asJson()
->post($endpoint, $message->payload);
if ($response->successful()) {
return new PublishResult(true, false, $response->status());
}
$retryable = $response->status() === 409 || $response->status() === 429 || $response->serverError();
return new PublishResult(false, $retryable, $response->status(), $response->body());
}
}
Vous pourrez alors injecter PublisherInterface dans votre job pour découpler le transport.
Extraits de code clés (référence rapide)
La migration Outbox définit toutes les colonnes nécessaires, avec des index critiques pour les sélections et la purge. Voici un condensé de l’essentiel:
Schema::create('outbox_messages', function (Blueprint $t) {
$t->uuid('id')->primary();
$t->string('topic');
$t->json('payload');
$t->string('status')->default('pending')->index();
$t->unsignedSmallInteger('attempts')->default(0);
$t->timestamp('available_at')->useCurrent()->index();
$t->timestamp('sent_at')->nullable()->index();
$t->string('idempotency_key')->unique();
$t->timestamps();
$t->index(['status', 'available_at']);
});
L’enqueue d’un message est une simple création de ligne avec UUID, clé d’idempotence, statut pending et available_at positionné.
OutboxMessage::create([
'id' => (string) Str::uuid(),
'topic' => $topic,
'payload' => $payload,
'status' => 'pending',
'attempts' => 0,
'available_at' => $availableAt ?? now(),
'idempotency_key' => $key ?? (string) Str::uuid(),
]);
La sélection concurrente d’un batch utilise une transaction et SKIP LOCKED pour éviter les collisions entre workers, couplée à un lease temporel.
DB::transaction(function () {
$batch = OutboxMessage::where('status', 'pending')
->where('available_at', '<=', now())
->orderBy('available_at')
->limit((int) env('OUTBOX_BATCH', 100))
->lockForUpdate()
->skipLocked()
->get();
$leaseUntil = now()->addSeconds((int) env('OUTBOX_LEASE_SECONDS', 30));
foreach ($batch as $msg) {
$msg->available_at = $leaseUntil;
$msg->save();
}
return $batch;
});
Le backoff exponentiel avec jitter ré-ouvre la fenêtre de tentative en cas d’échec.
$next = now()->addSeconds(2 ** min(6, $msg->attempts + 1) + random_int(0, 3));
$msg->update([
'attempts' => DB::raw('attempts + 1'),
'available_at' => $next,
]);
Checklist
- Relire
- Tester les commandes / snippets
- Publier
Conclusion
En appliquant le pattern Outbox dans Laravel, vous découplez les écritures métier de la publication d’événements externes tout en préservant l’atomicité et la fiabilité. La combinaison d’une table outbox, d’un service transactionnel, d’un worker concurrent avec SKIP LOCKED, de retries à backoff exponentiel et d’une idempotence stricte côté consommateur élimine à la fois les pertes et les doublons. Avec des tests, de l’observabilité et des commandes de maintenance, vous disposez d’une fondation prête pour la production. La substitution future du transport (HTTP, Kafka, SNS…) devient une formalité grâce à une interface Publisher dédiée.
Ressources
- Pattern Transactional Outbox (Martin Fowler): https://martinfowler.com/articles/patterns-of-distributed-systems/transactional-outbox.html
- Laravel HTTP Client: https://laravel.com/docs/http-client
- Laravel Queues & Jobs: https://laravel.com/docs/queues
- Laravel Horizon: https://laravel.com/docs/horizon
- Eloquent: Migrations & Schema Builder: https://laravel.com/docs/migrations
- Postgres SKIP LOCKED: https://www.postgresql.org/docs/current/explicit-locking.html#LOCKING-ROWS
- MySQL SKIP LOCKED: https://dev.mysql.com/doc/refman/8.0/en/innodb-locking-reads.html
- Idempotency keys (Stripe’s guide, générique): https://stripe.com/docs/idempotency
- Exponential backoff and jitter (AWS Architecture Blog): https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/