AMQP wrapper for Laravel and Lumen to publish and consume messages
bschmitt/laravel-amqp is a Laravel package for amqp wrapper for laravel and lumen to publish and consume messages.
It currently has 278 GitHub stars and 2.470.514 downloads on Packagist (latest version v3.4.1).
Install it with composer require bschmitt/laravel-amqp.
Discover more Laravel packages by bschmitt
or browse all Laravel packages to compare alternatives.
Last updated
A detailed AMQP wrapper for Laravel and Lumen to publish and consume messages, especially from RabbitMQ. This package provides full support for RabbitMQ features including RPC patterns, management operations, message properties, and more.
rpc() and reply() methodsRetryPolicy + DeadLetterTopology + RetryHandler with fixed/exponential backoff and auto-routing to DLQ when retries exhaust (see Retry & DLQ Abstractions)publishLater() / publishTypedLater() with TTL+DLX or rabbitmq-delayed-message-exchange plugin strategies, plus PublishBackoff for publisher-side transient retries (see Delayed Messaging)MessageContractInterface, TypedMessage base class, publishTyped() / consumeTyped() with pluggable MessageSerializerInterface (JSON by default) (see Typed Messaging)SchemaValidator (Draft 7 subset) validates payloads against contract schema() definitions (see JSON Schema Validation)amqp as a config/queue.php driver with queue:workamqp:work (with --retry/--contract/--validate-schema), amqp:consume, amqp:listen, amqp:publish (with --delay-ms), and amqp:purgeExchangeTopology declarative exchange + queue bindings with declareExchangeTopology() (see Production Infrastructure)QueueProfile::quorum(), priority(), and quorumWithPriority() presets for queue_properties (see Production Infrastructure)ResilientConnectionManager auto-reconnect with heartbeat staleness checks; ConnectionPool for persistent worker channels (see Production Infrastructure)traceparent propagation via TracePropagatorInterface (OTel bridge via CallbackTracePropagator); enable with propagate_trace on publish/consume (see Production Infrastructure)CorrelationContext with propagate_correlation on publish/consume (see Production Infrastructure)ConsumerLifecycle graceful shutdown, signal handlers, and consumeWithLifecycle() (see Production Infrastructure)Saga step/compensation orchestrator with SagaResult reporting (see SAGA, Events, Middleware & Testing)MessagePublishing/MessagePublished/MessageReceived/MessageHandled/MessageFailed events plus ConsumePipeline / ConsumeMiddlewareInterface and consumeWithMiddleware() (see SAGA, Events, Middleware & Testing)Amqp::fake() / FakeAmqp with assertPublished(), assertPublishedCount(), assertNothingPublished() (see SAGA, Events, Middleware & Testing)AsyncPublisher with batched confirms via Amqp::asyncPublisher() (see SAGA, Events, Middleware & Testing)RpcClient / RpcServer with RpcCallResult, JSON mode, and Amqp::rpcClient() / rpcServer() (see Scale & Interop)InteropEnvelope standard headers (x-message-type, x-schema-version, x-source-service) via publishInterop() / consumeInterop() (see Scale & Interop)MetricsCollector, QueueMetrics, Amqp::metrics(), queueMetrics() / getQueueStats() (see Scale & Interop)WorkerOptions, HighPerformanceWorker, consumeOptimized(), and amqp:work --optimized (see Scale & Interop)Rpc::call(UserService::class, GetUserRequest::make([...])) with typed request/response DTOs, service registries, and Rpc::serve() on the server (see gRPC-lite RPC)Rpc::service('payments')->call(...) resolves a short name to a registered RpcService class; opt-in via static alias() on the service (see Messaging Platform)Saga::make()->step(...)->compensate(...) fluent syntax with reverse-order compensation on failure (see Messaging Platform)OrderCreated::dispatch(['orderId' => 'o-1']) auto-serializes and publishes typed messages via the Laravel container (see Messaging Platform)Amqp::deadLetters()->for('orders.dlq')->count()/peek()/summarize()/messages()/replayTo()/purge() + php artisan amqp:dlq (see Messaging Platform)#[Retry(attempts: 5, strategy: RetryStrategy::EXPONENTIAL)] on handlers, hydrated via RetryPolicy::fromAttribute() (see Messaging Platform)Amqp::dashboard($queues)->snapshot() with lag, DLQ summaries, and RPC histograms; php artisan amqp:monitor / amqp:dlq (see Messaging Platform)RpcLatencyRecorder, per-call durationMs on RpcCallResult, RpcCallStarted / RpcCallCompleted / RpcCallFailed events (see gRPC-lite RPC)CorrelationContext now propagates both correlation_id and x-causation-id so consumers can chain "this happened because of that" (see Messaging Platform)MessageStoreInterface + InMemoryMessageStore; opt-in audit log of every publish/consume via Amqp::setMessageStore() (see Messaging Platform)ShouldPublishToAmqpInterface and enable amqp.broadcast_laravel_events to auto-publish event(new OrderCreated()) to RabbitMQ (see Messaging Platform)AmqpPulseRecorder auto-records publish/handle/fail/RPC/DLQ events to Pulse when laravel/pulse is installed; opt out with amqp.pulse_integration => false (see Messaging Platform)OpenTelemetryTracePropagator injects the active OTel span context into AMQP headers (with W3C fallback) when open-telemetry/api is installed (see Production Infrastructure)CorrelationChain::tree() / render() reconstruct causation graphs from the MessageStore; php artisan amqp:trace <correlation_id> prints an ASCII tree or JSON (see Messaging Platform)HealthState + HealthCheck + Http\Controllers\HealthController; opt-in HTTP routes (GET {prefix}/live|ready) plus php artisan amqp:health for exec probes (see Kubernetes & Cloud Native)AutoscalingAdvisor (depth + lag heuristics, KEDA trigger spec) and php artisan amqp:scale CLI (see Kubernetes & Cloud Native)LaravelCloud detector + AMQP_URL / CLOUDAMQP_URL / RABBITMQ_URL DSN auto-hydration on register (see Kubernetes & Cloud Native)MultiRegionConnection resolver with locality preference, cool-down blacklist, and withFailover() retry loop across region-scoped connection keys (see Kubernetes & Cloud Native)Status legend: [x] shipped · [~] partial / building blocks shipped (full feature still planned) · [ ] not started.
Many "partial" items already ship as a programmatic API or CLI; the outstanding work is usually a UI, native integration, or codegen layer. See the Features section above for the full list of already-shipped capabilities.
AmqpPulseRecorder auto-subscribes to publish/handle/fail/RPC/DLQ events when laravel/pulse is installed; disable via amqp.pulse_integration => falseOpenTelemetryTracePropagator bridges to open-telemetry/api when installed (active span context auto-injected); falls back to W3C generation otherwise. CallbackTracePropagator remains for custom APMsCorrelationChain::tree() / render() reconstruct causation graphs from the MessageStore; php artisan amqp:trace <correlation_id> prints an ASCII tree or JSONMetricsCollector, QueueMetrics, Amqp::metrics() / queueMetrics(), MonitoringDashboardQueueMetrics::lag(), lagSeconds(), isLagging(); --lag-threshold / --lag-seconds / --lag-age on amqp:monitorDeadLetterManager::peek() / summarize(), dead_letters block in dashboard, php artisan amqp:dlqRpcLatencyRecorder, RpcCallResult::durationMs(), RpcCallCompleted / RpcCallFailed events, --rpc on amqp:monitorTraceContext, W3cTracePropagator, propagate_trace on publish/consumephp artisan amqp:explore (with --id, filters, JSON mode)php artisan amqp:replay + MessageStoreInterface source + target/exchange overridesphp artisan amqp:explore + amqp:dlq (messages, peek, summarize) for CLI inspectionphp artisan amqp:inspect watch loop over queueMetrics() / Management APIphp artisan amqp:diff {left} {right} + structural JSON/text diffingphp artisan amqp:schema:debug (interactive/file/store payload sources)php artisan amqp:rpc:console with JSON/raw payload modesConsumerLifecycle::withHealth() stamps HealthState on every start/stop/message/errorConsumerLifecycle signal handlers (SIGTERM / SIGINT via pcntl) + cooperative requestStop()GET {prefix}/ready HTTP route + php artisan amqp:health --probe=ready for exec probesGET {prefix}/live HTTP route + php artisan amqp:health --probe=liveResilientConnectionManager (reconnect + heartbeat staleness) and ConnectionPoolAutoscalingAdvisor + php artisan amqp:scale (depth/lag heuristics, KEDA-ready trigger output)LaravelCloud detector + AMQP_URL / CLOUDAMQP_URL / RABBITMQ_URL auto-hydrationMultiRegionConnection resolver with locality preference, cool-down blacklist, and withFailover() retry loopDeadLetterManager API + amqp:dlq / amqp:monitor CLI; web UI still plannedpublishLater() / dispatchLater()DelayedPublisher, Amqp::publishLater() / publishTypedLater(), TypedMessage::dispatchLater(), amqp:publish --delay-msQueueProfile::priority() / quorumWithPriority(), x-max-priority, publish priorityAmqp::batchBasicPublish() / batchPublish() + BatchManagerbasic_qos, --prefetch-count, WorkerOptions::throughput() / lowLatency()); no app-level msgs/sec throttlingRetryPolicy, #[Retry], RetryHandler, consumeWithRetry(); metrics via amqp:monitor (no dedicated retry UI)schema() and validate in-process; no export CLI yetServiceRegistry, Rpc::services()->register() / autodiscover(), Rpc::service('alias')->call(...)InteropEnvelope standard headers (x-message-type, x-schema-version, x-source-service); no codegen for foreign languagesMonitoringDashboard snapshot + php artisan amqp:monitor [--json]; full web UI still plannedamqp:monitor (no dedicated health view)ExchangeTopology / DeadLetterTopology are code builders, not a visualizer)amqp:monitor, amqp:publish, amqp:consume, amqp:listen, amqp:purge, amqp:work; no dedicated amqp:diagnoseResilientConnectionManager handles reconnection; no standalone diagnostic commandMessageStoreInterface + InMemoryMessageStore (opt-in append log of every publish/consume)composer.json: ^7.3|^8.0)rabbitmq:3-management Docker image)| Laravel | Minimum PHP | Notes | |---------|-------------|--------| | 8.x | 7.3 | Last Laravel version for PHP 7.3 / 7.4 | | 9.x | 8.0.2 | Use PHP 8.0.2+ (not 8.0.0/8.0.1) | | 10.x | 8.1 | | | 11.x / 12.x | 8.2 | | | 13.x | 8.3 | |
Config supports both use + properties (current) and legacy default + connections layouts.
composer require bschmitt/laravel-amqp
For Laravel 5.5+:
"bschmitt/laravel-amqp": "^3.1"
For Laravel < 5.5:
"bschmitt/laravel-amqp": "^2.0"
use Bschmitt\Amqp\Facades\Amqp;
// Basic publish
Amqp::publish('routing-key', 'message');
// Publish with queue creation
Amqp::publish('routing-key', 'message', ['queue' => 'queue-name']);
// Publish with message properties
Amqp::publish('routing-key', 'message', [
'priority' => 10,
'correlation_id' => 'unique-id',
'reply_to' => 'reply-queue',
'application_headers' => [
'X-Custom-Header' => 'value'
]
]);
use Bschmitt\Amqp\Facades\Amqp;
// Consume and acknowledge (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
echo $message->body;
$resolver->acknowledge($message);
$resolver->stopWhenProcessed();
});
// Consume forever
$amqp = app('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
processMessage($message->body);
$resolver->acknowledge($message);
}, ['persistent' => true]);
// Alternative: Using resolve() helper
$amqp = resolve('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
processMessage($message->body);
$resolver->acknowledge($message);
});
// Client side - Make RPC call (using dynamic call)
$amqp = app('Amqp');
$response = $amqp->rpc('rpc-queue', 'request-data', [], 30);
// Server side - Process and reply (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('rpc-queue', function ($message, $resolver) {
$result = processRequest($message->body);
$resolver->reply($message, $result);
$resolver->acknowledge($message);
});
$amqp = app('Amqp');
$amqp->listen(['key1', 'key2', 'key3'], function ($message, $resolver) {
processMessage($message->body);
$resolver->acknowledge($message);
});
The package registers five console commands. Handler classes must implement Bschmitt\Amqp\Contracts\MessageHandlerInterface or expose an __invoke($message, $resolver) method. The $resolver is the active consumer and provides acknowledge(), reject(), reply(), and stopWhenProcessed().
amqp:work — long-running workerphp artisan amqp:work my-queue --handler="App\\Messaging\\ProcessOrderHandler"
| Option | Description |
|--------|-------------|
| --handler= | Required. FQCN of your message handler |
| --connection= | Connection name from config/amqp.php |
| --exchange= / --exchange-type= | Override exchange settings |
| --routing-key=* | Routing key(s) to bind (repeatable) |
| --prefetch-count= | Enable QoS with this prefetch count |
| --max-messages=0 | Stop after N messages (0 = unlimited) |
| --max-time=0 | Stop after N seconds |
| --memory=128 | Exit if memory exceeds MB |
| --stop-when-empty | Exit when the queue is drained instead of waiting |
| --requeue-on-error | Requeue messages when the handler throws |
amqp:consume — process a fixed number of messagesphp artisan amqp:consume my-queue --handler="App\\Messaging\\ProcessOrderHandler" --max-messages=10
php artisan amqp:consume my-queue --handler="App\\Messaging\\ProcessOrderHandler" --all
Defaults to one message per invocation. Use --all to drain the queue.
amqp:listen — listen on routing keysphp artisan amqp:listen order.created order.updated --handler="App\\Messaging\\OrderHandler"
Creates an auto-deleted queue (unless --queue= or --no-auto-delete is set) and binds it to every supplied routing key.
amqp:publish — publish from the CLIphp artisan amqp:publish order.created --body='{"id":42}' --exchange=orders --priority=5
php artisan amqp:publish order.created --file=./payload.json --headers='{"X-Source":"cli"}'
php artisan amqp:publish order.created --body='{"id":42}' --delay-ms=5000 --exchange=orders
Use --delay-ms to schedule delivery (TTL+DLX by default, or --delay-strategy=plugin when the delayed-message exchange plugin is installed).
amqp:purge — empty a queuephp artisan amqp:purge my-queue --force
amqp:work| Option | Description |
|--------|-------------|
| --retry=N | Wraps the handler in a RetryHandler and configures up to N retries (0 disables retries) |
| --retry-delay=ms | Base delay between retries in milliseconds (default 1000) |
| --retry-backoff=fixed\|exponential | Backoff strategy (default fixed) |
| --retry-multiplier=2.0 | Growth factor for exponential backoff |
| --retry-max-delay=ms | Cap for the computed retry delay (0 = uncapped) |
| --retry-jitter=ms | Random jitter added to each retry delay |
| --dlq=name | Override the dead-letter queue name (default {queue}.dlq) |
| --declare-topology | Pre-declare the work + DLQ + retry queues before consuming |
| --contract= | FQCN of a MessageContractInterface to deserialize bodies into (passed as 3rd handler arg) |
| --validate-schema | Validate inbound JSON against the contract's schema() before invoking the handler |
See Retry & Dead-Letter Abstractions for the full picture.
namespace App\Messaging;
use Bschmitt\Amqp\Contracts\ConsumerInterface;
use Bschmitt\Amqp\Contracts\MessageHandlerInterface;
use PhpAmqpLib\Message\AMQPMessage;
class ProcessOrderHandler implements MessageHandlerInterface
{
public function handle(AMQPMessage $message, ConsumerInterface $resolver, $typed = null): void
{
$order = $typed !== null ? $typed->toPayload() : json_decode($message->body, true);
// ... process $order ...
$resolver->acknowledge($message);
}
}
Use this package as a native Laravel queue backend so jobs can be dispatched with dispatch(), Queue::push(), and processed with php artisan queue:work.
php artisan vendor:publish --provider="Bschmitt\Amqp\Providers\AmqpServiceProvider"
Merge the example from config/queue-amqp.php into config/queue.php:
'connections' => [
// ...
'amqp' => [
'driver' => 'amqp',
'connection' => env('AMQP_ENV', 'production'), // key in config/amqp.php properties
'queue' => env('AMQP_QUEUE', 'default'),
'retry_after' => 90,
],
],
QUEUE_CONNECTION=amqp
php artisan queue:work amqp --queue=default
Jobs are published to your configured exchange with the queue name as the routing key. Delayed jobs use a TTL dead-letter queue per delay interval.
ProcessOrder::dispatch($order)->delay(now()->addMinutes(5));
AmqpQueue::later() publishes to a per-TTL delay queue ({queue}.delay.{ttl_ms}) with
x-dead-letter-exchange / x-message-ttl so RabbitMQ delivers the job back to the
main queue when the delay expires. $job->release($seconds) uses the same mechanism.
vendor/bin/phpunit --testdox \
--filter 'AmqpQueue|AmqpJob|AmqpConnector|AmqpServiceProviderQueue|QueueConfigResolver|LaravelQueue'
Full setup, architecture and troubleshooting: docs/content/queue-driver.md or the interactive docs site (docs/index.html).
Publish the configuration file:
php artisan vendor:publish --provider="Bschmitt\Amqp\Providers\AmqpServiceProvider"
Or manually copy vendor/bschmitt/laravel-amqp/config/amqp.php to config/amqp.php.
Create a config folder in your Lumen root and copy the configuration file:
mkdir config
cp vendor/bschmitt/laravel-amqp/config/amqp.php config/amqp.php
Register the service provider in bootstrap/app.php:
$app->configure('amqp');
$app->register(Bschmitt\Amqp\Providers\LumenServiceProvider::class);
// For Lumen 5.2+, enable facades
$app->withFacades(true, [
'Bschmitt\Amqp\Facades\Amqp' => 'Amqp',
]);
return [
'use' => env('AMQP_ENV', 'production'),
'properties' => [
'production' => [
'host' => env('AMQP_HOST', 'localhost'),
'port' => env('AMQP_PORT', 5672),
'username' => env('AMQP_USER', 'guest'),
'password' => env('AMQP_PASSWORD', 'guest'),
'vhost' => env('AMQP_VHOST', '/'),
'exchange' => env('AMQP_EXCHANGE', 'amq.topic'),
'exchange_type' => env('AMQP_EXCHANGE_TYPE', 'topic'),
'consumer_tag' => 'consumer',
'ssl_options' => [],
'connect_options' => [],
'queue_properties' => ['x-ha-policy' => ['S', 'all']],
'exchange_properties' => [],
'timeout' => 0,
// Management API (optional)
'management_api_url' => env('AMQP_MANAGEMENT_URL', 'http://localhost:15672'),
'management_api_user' => env('AMQP_MANAGEMENT_USER', 'guest'),
'management_api_password' => env('AMQP_MANAGEMENT_PASSWORD', 'guest'),
],
],
];
See docs/modules/ for detailed module documentation:
// Publishing
Amqp::publish('', 'message', [
'exchange_type' => 'fanout',
'exchange' => 'amq.fanout',
]);
// Consuming (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('', function ($message, $resolver) {
echo $message->body;
$resolver->acknowledge($message);
}, [
'routing' => '',
'exchange' => 'amq.fanout',
'exchange_type' => 'fanout',
'queue_force_declare' => true,
'queue_exclusive' => true,
'persistent' => true
]);
// Get Amqp instance
$amqp = app('Amqp');
// Purge queue
$amqp->queuePurge('my-queue', ['queue' => 'my-queue']);
// Delete queue
$amqp->queueDelete('my-queue', ['queue' => 'my-queue']);
// Get queue statistics
$stats = $amqp->getQueueStats('my-queue', '/');
// Get Amqp instance
$amqp = app('Amqp');
// Get queue statistics
$stats = $amqp->getQueueStats('my-queue', '/');
// List connections
$connections = $amqp->getConnections();
// Create policy
$amqp->createPolicy('my-policy', [
'pattern' => '^my-queue$',
'definition' => ['max-length' => 1000]
], '/');
Three small primitives let you build production-grade retry pipelines without hand-rolling DLX wiring:
Bschmitt\Amqp\Support\RetryPolicy — declarative max attempts +
backoff strategy (fixed, exponential, immediate, none) with optional cap
and jitter.Bschmitt\Amqp\Support\DeadLetterTopology — describes the work queue,
the DLQ, and the per-delay retry queues. Produces ready-to-use property
arrays for publish() / consume().Bschmitt\Amqp\Support\RetryHandler — decorator that wraps your
handler. On exception it republishes the message to a TTL'd retry queue
(which dead-letters back to the work queue when the TTL expires) and
acknowledges the original delivery. When the retry budget is spent it
rejects without requeue so RabbitMQ routes the message to the DLQ via the
x-dead-letter-exchange configured on the work queue.use Bschmitt\Amqp\Support\DeadLetterTopology;
use Bschmitt\Amqp\Support\RetryPolicy;
$amqp = app('Amqp');
// RetryPolicy::exponential($maxAttempts, $baseDelayMs, $multiplier, $maxDelayMs)
$policy = RetryPolicy::exponential(5, 1000, 2.0, 60000);
$topology = DeadLetterTopology::for('orders.process', $policy)
->on('app.events', 'topic')
->withRoutingKey('orders.process');
// Idempotently creates: orders.process, orders.process.dlq,
// and orders.process.retry.{1000,2000,4000,8000,16000} (capped at 60000).
$amqp->declareRetryTopology($topology);
$amqp->consumeWithRetry($topology, function ($message, $resolver) {
processOrder(json_decode($message->body, true));
$resolver->acknowledge($message);
});
When the handler throws:
RetryHandler reads (and bumps) the x-retry-attempt application header.orders.process.retry.{delayMs} with the computed TTL. RabbitMQ's DLX on
that queue routes the message back to orders.process once the TTL
expires.orders.process.dlq via the
work queue's x-dead-letter-exchange.x-first-failed-at and x-last-error headers carry diagnostics
forward across retries so DLQ inspection is meaningful.use Bschmitt\Amqp\Support\RetryPolicy;
RetryPolicy::fixed(3, 1000); // 3 retries, 1s apart
RetryPolicy::exponential(5, 500, 2.0, 30000); // 500ms doubling, capped at 30s
RetryPolicy::immediate(2); // 2 retries with zero delay
RetryPolicy::none(); // failures go straight to the DLQ
use Bschmitt\Amqp\Support\RetryHandler;
$wrapped = $amqp->retryHandler($yourHandler, $topology, function ($level, $message, $context) {
Log::log($level, $message, $context);
});
$amqp->consume('orders.process', $wrapped, $topology->toWorkProperties());
php artisan amqp:work orders.process \
--handler="App\\Messaging\\ProcessOrderHandler" \
--retry=5 \
--retry-backoff=exponential \
--retry-delay=1000 \
--retry-multiplier=2.0 \
--retry-max-delay=60000 \
--dlq=orders.process.failed \
--declare-topology
See docs/content/advanced.md and the unit tests under test/Unit/Retry* /
test/Unit/DeadLetterTopologyTest.php for more examples.
Schedule messages for later delivery or absorb transient broker errors on publish.
publishLater() — schedule delivery$amqp = app('Amqp');
// TTL + dead-letter exchange (works on stock RabbitMQ)
$amqp->publishLater('orders.reminder', json_encode(['orderId' => 42]), 60000, [
'exchange' => 'shop.events',
]);
// rabbitmq-delayed-message-exchange plugin (exchange must be x-delayed-message)
$amqp->publishLater('orders.reminder', $body, 60000, [
'exchange' => 'shop.delayed',
'delay_strategy' => 'plugin',
]);
DelayedPublisher creates a per-delay queue ({routing}.delayed.{ms}) with x-message-ttl and DLX routing back to the target exchange when using the default TTL strategy.
PublishBackoff — retry failed publishesuse Bschmitt\Amqp\Support\RetryPolicy;
$amqp->withPublishBackoff(RetryPolicy::exponential(3, 100, 2.0))->run(function () use ($amqp) {
return $amqp->publish('orders.created', $payload);
});
This is separate from consumer-side RetryHandler — it retries the publish call itself when the broker throws.
Define message shapes as plain PHP classes and let the package handle JSON encoding/decoding.
use Bschmitt\Amqp\Support\TypedMessage;
class OrderCreated extends TypedMessage
{
public $orderId;
public $total;
public $currency;
public function __construct($orderId = null, $total = null, $currency = null)
{
$this->orderId = $orderId;
$this->total = $total;
$this->currency = $currency;
}
public static function routingKey()
{
return 'orders.created';
}
public static function exchange()
{
return 'shop.events';
}
}
$amqp = app('Amqp');
// Publish — picks up routing key + exchange from the contract
$amqp->publishTyped(new OrderCreated('order-1', 19.99, 'USD'));
// Consume — callback receives ($typed, $message, $resolver)
$amqp->consumeTyped('orders.queue', OrderCreated::class, function ($order, $message, $resolver) {
processOrder($order->orderId);
$resolver->acknowledge($message);
});
// Delayed typed publish
$amqp->publishTypedLater(new OrderCreated('order-2', 9.99, 'USD'), 30000);
Swap the serializer via $amqp->setSerializer($mySerializer) when you need MessagePack, Avro, etc.
Contracts may expose a static schema() method returning a JSON Schema-style array. The package validates payloads on publish and consume using the bundled SchemaValidator (no external dependencies).
class OrderCreated extends TypedMessage
{
// ...properties...
public static function schema()
{
return [
'type' => 'object',
'required' => ['orderId', 'total', 'currency'],
'additionalProperties' => false,
'properties' => [
'orderId' => ['type' => 'string', 'minLength' => 1],
'total' => ['type' => 'number', 'minimum' => 0],
'currency' => ['type' => 'string', 'enum' => ['USD', 'EUR', 'GBP']],
],
];
}
}
Invalid payloads raise Bschmitt\Amqp\Exception\SchemaValidationException with a list of pointer-style error messages. On the CLI, combine --contract with --validate-schema on amqp:work.
Supported keywords include type, required, properties, additionalProperties, enum, const, minimum/maximum, minLength/maxLength, pattern, format (email, uri, uuid, date, date-time), items, oneOf/anyOf/allOf/not, and more — see docs/content/advanced.md.
Declare an exchange and multiple bound queues in one fluent builder:
use Bschmitt\Amqp\Facades\Amqp;
use Bschmitt\Amqp\Support\ExchangeTopology;
use Bschmitt\Amqp\Support\QueueProfile;
$topology = ExchangeTopology::exchange('events', 'topic')
->bindQueue('orders.created', 'order.created')
->bindQueue('orders.shipped', 'order.shipped', QueueProfile::quorum());
Amqp::declareExchangeTopology($topology);
// Publish using properties for a specific queue in the topology
Amqp::publish('order.created', $payload, $topology->propertiesForQueue('orders.created'));
Shortcut: Amqp::exchangeTopology('events', 'topic')->bindQueue(...).
use Bschmitt\Amqp\Support\QueueProfile;
Amqp::publish('jobs', $payload, QueueProfile::quorumWithPriority(10)->mergeInto([
'queue' => 'jobs',
'routing' => 'jobs',
]));
use Bschmitt\Amqp\Facades\Amqp;
use Bschmitt\Amqp\Managers\ConnectionPool;
// Per-request resilient manager (reconnect + heartbeat staleness)
$resilient = Amqp::resilientConnection(['host' => 'rabbitmq'], [
'max_reconnect_attempts' => 5,
'heartbeat' => 30,
]);
$channel = $resilient->getChannel();
// Long-lived worker pool (persistent keys survive disconnectAll(false))
$pool = Amqp::connectionPool();
$manager = $pool->connection('worker', ['use' => 'production', 'resilient' => true], true);
use Bschmitt\Amqp\Support\CorrelationContext;
CorrelationContext::set('request-abc-123');
Amqp::publish('orders.created', $payload, [
'propagate_correlation' => true,
'propagate_trace' => true,
]);
Amqp::consumeWithLifecycle('orders.created', function ($message, $resolver) {
// CorrelationContext::get() is populated when propagate_* flags are used
}, null, [
'propagate_correlation' => true,
'propagate_trace' => true,
]);
Bridge OpenTelemetry (or any APM) without a hard dependency:
use Bschmitt\Amqp\Support\CallbackTracePropagator;
Amqp::setTracePropagator(new CallbackTracePropagator(
function (array $carrier, $context) {
// inject active span into $carrier
return $carrier;
},
function (array $carrier) {
// extract TraceContext from $carrier or return null
return null;
}
));
use Bschmitt\Amqp\Support\ConsumerLifecycle;
$lifecycle = (new ConsumerLifecycle())
->registerSignalHandlers()
->onStopping(function ($lifecycle) {
// flush buffers, close DB connections, etc.
});
Amqp::consumeWithLifecycle('jobs', $handler, $lifecycle);
See docs/content/production-features.md for the full reference.
use Bschmitt\Amqp\Facades\Amqp;
$saga = Amqp::saga('checkout')
->step('reserveStock', $reserveStock, $releaseStock)
->step('chargeCard', $chargeCard, $refundCard)
->step('shipOrder', $shipOrder);
$result = $saga->execute(['orderId' => 42]);
if ($result->failed()) {
Log::error('Saga failed', [
'step' => $result->getFailedStep(),
'compensated' => $result->getCompensatedSteps(),
'error' => $result->getException()->getMessage(),
]);
}
Compensations only run for steps that completed before the failure, in reverse order.
The package dispatches the following events through \Illuminate\Support\Facades\Event (and a local listener registry as a fallback for non-Laravel contexts):
| Event | When |
|------|------|
| Bschmitt\Amqp\Events\MessagePublishing | Right before a publish is sent |
| Bschmitt\Amqp\Events\MessagePublished | After a successful publish |
| Bschmitt\Amqp\Events\MessageReceived | When a message is received by the consume pipeline |
| Bschmitt\Amqp\Events\MessageHandled | After the handler completes |
| Bschmitt\Amqp\Events\MessageFailed | When the handler throws |
Listen in Laravel as usual:
Event::listen(\Bschmitt\Amqp\Events\MessageFailed::class, function ($event) {
Log::warning('AMQP handler failed', ['error' => $event->exception->getMessage()]);
});
Wrap the consume handler with a pipeline:
use Bschmitt\Amqp\Facades\Amqp;
Amqp::consumeWithMiddleware('orders', function ($message, $resolver) {
// handle...
}, [
function ($message, $next) {
$start = microtime(true);
$next($message);
Log::info('handled', ['duration_ms' => (microtime(true) - $start) * 1000]);
},
// ...or a ConsumeMiddlewareInterface instance
]);
Each middleware receives (AMQPMessage $message, callable $next) and can short-circuit by not calling $next.
In tests, replace the bound singleton with a recording fake:
use Bschmitt\Amqp\Core\Amqp;
public function test_publishes_order_created()
{
$fake = Amqp::fake();
(new CreateOrder)->handle();
$fake->assertPublished('orders.created');
$fake->assertPublishedCount(1, 'orders.created');
$fake->assertNotPublished('orders.shipped');
}
The fake records both publish() and publishLater() calls; never touches the broker.
$async = Amqp::asyncPublisher(['exchange' => 'events'])
->onAck(function ($tag) { /* metric: published */ })
->onNack(function ($tag) { /* metric: failed */ });
foreach ($messages as $m) {
$async->publish('events.created', json_encode($m));
}
if (!$async->flush(30)) {
Log::warning('Some publisher confirms timed out');
}
$async->close();
AsyncPublisher keeps a single channel open with confirm_select and only waits for confirmations on flush(), so high-throughput publishers don't block on the per-message round-trip.
// Client
$result = Amqp::rpcClient(['exchange' => 'rpc'])->asJson()->timeout(10)
->call('users.lookup', ['id' => 42]);
if ($result->succeeded()) {
$user = $result->body();
}
// Server
Amqp::rpcServer()->asJson()->serve('rpc.users', function ($request, $consumer) {
return ['id' => $request['id'], 'name' => 'Ada'];
});
RpcCallResult exposes succeeded(), timedOut(), and body().
Amqp::publishInterop(
'orders.created',
['orderId' => 99, 'total' => 12.50],
'orders.created',
'billing-service',
['exchange' => 'events'],
'2.0'
);
Amqp::consumeInterop('events.orders', function ($interop, $raw, $resolver) {
$payload = \Bschmitt\Amqp\Support\InteropEnvelope::decodePayload($interop);
// $interop->messageType, $interop->sourceService, $interop->schemaVersion
});
Standard headers (x-message-type, x-schema-version, x-source-service) let Node, Go, or Java consumers route messages without PHP DTOs.
// In-process counters (per worker / request)
$stats = Amqp::metrics()->snapshot();
// Broker-side queue depth + rates (Management API)
$metrics = Amqp::queueMetrics('orders', '/');
Log::info('queue depth', $metrics->toArray());
Publish/consume paths increment MetricsCollector automatically when using publish(), consumeWithMiddleware(), or HighPerformanceWorker.
Amqp::consumeOptimized('jobs', $handler, ['exchange' => 'work']);
// Or explicitly:
Amqp::highPerformanceWorker(
\Bschmitt\Amqp\Support\WorkerOptions::throughput(100)
)->run('jobs', $handler);
CLI: php artisan amqp:work jobs --handler=App\\Handlers\\JobHandler --optimized
See docs/content/scale-and-interop.md for the full reference.
A typed, service-oriented RPC layer that feels like gRPC but rides on RabbitMQ. Define a service once, then call it from any process with typed DTOs.
use Bschmitt\Amqp\Rpc\RpcService;
use Bschmitt\Amqp\Rpc\RpcRequest;
use Bschmitt\Amqp\Rpc\RpcResponse;
class UserService extends RpcService
{
public static function queue(): string
{
return 'rpc.user-service';
}
public static function methods(): array
{
return [
GetUserRequest::class => 'getUser',
CreateUserRequest::class => 'createUser',
];
}
}
class GetUserRequest extends RpcRequest
{
public $id;
public function __construct($id = null) { $this->id = $id; }
public static function responseClass()
{
return GetUserResponse::class;
}
}
class GetUserResponse extends RpcResponse
{
public $id;
public $name;
public function __construct($id = null, $name = null)
{
$this->id = $id;
$this->name = $name;
}
}
use Rpc; // facade alias auto-registered
$response = Rpc::call(
UserService::class,
GetUserRequest::make(['id' => 5])
);
echo $response->name; // GetUserResponse instance, hydrated for you
Rpc::call() automatically:
responseClass() (or returns the raw decoded array).Throws RpcTimeoutException if no reply arrives, or RpcException if the server returned an error envelope.
use Rpc;
class UserServiceHandler
{
public function getUser(GetUserRequest $request): GetUserResponse
{
$user = User::findOrFail($request->id);
return GetUserResponse::make([
'id' => $user->id,
'name' => $user->name,
]);
}
public function createUser(CreateUserRequest $request): GetUserResponse
{
$user = User::create(['name' => $request->name]);
return GetUserResponse::make(['id' => $user->id, 'name' => $user->name]);
}
}
Rpc::register(UserService::class, UserServiceHandler::class)
->serve(UserService::class);
The handler may be an instance or a container-resolvable FQCN (Laravel only). Handler exceptions are wrapped into an _rpc_error envelope so the client raises a typed RpcException with the original message and class name.
// Global default timeout
Rpc::defaultTimeout(10);
// Per-call timeout + extra publish properties
Rpc::call(UserService::class, GetUserRequest::make(['id' => 1]), 5, [
'exchange' => 'rpc.svc',
]);
Every Rpc::call() records timing in Amqp::rpcMetrics() and dispatches Laravel events you can wire to Pulse, logs, or APM:
use Bschmitt\Amqp\Events\RpcCallCompleted;
use Bschmitt\Amqp\Events\RpcCallFailed;
Event::listen(RpcCallCompleted::class, fn ($e) => Log::info('rpc.ok', [
'service' => $e->service,
'request' => $e->request,
'ms' => $e->durationMs,
]));
$stats = Amqp::rpcMetrics()->snapshot();
// ['UserService::GetUserRequest' => ['count' => 42, 'p95_ms' => 12.5, 'error_rate' => 0.02, ...]]
Lower-level RpcClient::call() also returns RpcCallResult::durationMs().
See docs/content/grpc-lite-rpc.md for the full reference.
A set of higher-level building blocks that turn the package from "an AMQP client" into a full microservice toolkit: service discovery, sagas, message contracts, dead-letter management, declarative retry, monitoring, automatic context propagation, an audit log, and an event bridge.
Rpc::service(...))Skip exchange/routing-key/queue gymnastics — register a short name and call by that name.
use Bschmitt\Amqp\Facades\Rpc;
// Either: explicit registration
Rpc::services()->register('payments', PaymentsService::class);
// Or: opt-in auto-discovery (service exposes `public static function alias()`)
class PaymentsService extends RpcService {
public static function queue(): string { return 'rpc.payments'; }
public static function methods(): array { return [GetPayment::class => 'find']; }
public static function alias(): ?string { return 'payments'; }
}
Rpc::services()->autodiscover([PaymentsService::class]);
$response = Rpc::service('payments')
->timeout(5)
->call(GetPayment::make(['id' => 123]));
Rpc::service() accepts an alias or a service FQCN.
Saga::make()->step()->compensate() with reverse-order compensation when a step throws.
use Bschmitt\Amqp\Facades\Saga;
$result = Saga::make('checkout')
->step('reserve', fn($ctx) => $stock->reserve($ctx['orderId']))
->compensate(fn($ctx) => $stock->release($ctx['orderId']))
->step('charge', fn($ctx) => $payments->charge($ctx['amount']))
->compensate(fn($ctx, $tx) => $payments->refund($tx))
->execute(['orderId' => 1, 'amount' => 49.99]);
if (!$result->succeeded()) {
Log::error('Saga failed at ' . $result->getFailedStep(), [
'compensated' => $result->getCompensatedSteps(),
]);
}
OrderCreated::dispatch(...))TypedMessage now exposes make() and dispatch() (and dispatchLater() for the delayed-queue variant).
use Bschmitt\Amqp\Support\TypedMessage;
class OrderCreated extends TypedMessage
{
public $orderId;
public $total;
public static function name(): string { return 'orders.created'; }
}
OrderCreated::dispatch(['orderId' => 'o-1', 'total' => 9.99]);
OrderCreated::dispatchLater(['orderId' => 'o-1'], 2_000); // 2s delay
use Bschmitt\Amqp\Facades\Amqp;
Amqp::deadLetters()->for('orders.dlq')->count(); // 17
Amqp::deadLetters()->for('orders.dlq')->peek(20); // non-destructive sample
Amqp::deadLetters()->for('orders.dlq')->summarize(100); // group by reason / error
Amqp::deadLetters()->for('orders.dlq')->messages(10); // drain & inspect (destructive)
Amqp::deadLetters()->for('orders.dlq')->replayTo('orders', 50);
Amqp::deadLetters()->for('orders.dlq')->purge();
CLI:
php artisan amqp:dlq inspect orders.dlq
php artisan amqp:dlq summary orders.dlq --limit=200 --json
php artisan amqp:dlq replay orders.dlq --target=orders --limit=50
php artisan amqp:dlq purge orders.dlq --force
Lifecycle events: DeadLetterDetected, DeadLetterReplayed, DeadLetterPurged.
#[Retry])use Bschmitt\Amqp\Attributes\Retry;
use Bschmitt\Amqp\Support\RetryStrategy;
use Bschmitt\Amqp\Support\RetryPolicy;
class CreateOrderHandler
{
#[Retry(attempts: 5, strategy: RetryStrategy::EXPONENTIAL, delayMs: 500)]
public function handle($message): void { /* ... */ }
}
$policy = RetryPolicy::fromAttribute(CreateOrderHandler::class, 'handle');
$amqp->consumeWithRetry('orders', $handler, $policy);
On PHP 7.x the attribute parses as a comment (the package still loads); call sites that want the attribute need PHP 8+.
$snapshot = Amqp::dashboard(['orders', 'orders.dlq'])
->deadLetters(['orders.dlq'])
->lagThresholds(1000, 60.0, 300)
->snapshot();
// process, queues (with lag / lag_seconds / lagging), dead_letters, rpc, lagging[], generated
CLI:
php artisan amqp:monitor --queue=orders --queue=orders.dlq --json
php artisan amqp:monitor --queue=orders --dlq=orders.dlq --rpc
php artisan amqp:monitor --queue=orders --lag-threshold=1000 --lag-seconds=60
# exits 1 when any queue breaches a lag threshold (cron-friendly)
Wire the snapshot into any HTTP route (Laravel, Symfony, Slim) to expose a JSON dashboard.
CorrelationContext::inheritFromMessage() now picks up the inbound message_id as the causation_id for everything published afterwards, so downstream services can trace "this happened because of that" through a chain.
CorrelationContext::inheritFromMessage($incoming);
Amqp::publish('orders.created', $body, [
'propagate_correlation' => true,
'message_id' => uniqid('msg_', true),
]);
// outbound has `correlation_id`, `x-correlation-id`, and `x-causation-id` set
CorrelationChain walks the MessageStore, groups entries by correlation_id, and rebuilds the causation tree using the x-causation-id header — no UI server required.
use Bschmitt\Amqp\Support\CorrelationChain;
$chain = new CorrelationChain($amqp->messageStore());
$summary = $chain->summarize('corr_abc123');
// total, published, consumed, routings, first_at, last_at, duration_ms
$tree = $chain->tree('corr_abc123'); // nested ['entry' => ..., 'children' => [...]]
echo $chain->render($tree); // ASCII tree, perfect for logs
CLI:
php artisan amqp:trace corr_abc123
php artisan amqp:trace corr_abc123 --summary
php artisan amqp:trace corr_abc123 --json --limit=50
Sample output:
correlation_id: corr_abc123
messages: 4 (published=3, consumed=1)
span: 18.42 ms
routings: orders.created(1), orders.shipped(2), orders.invoiced(1)
[published] >> orders.created (msg=msg_root)
├── [published] >> orders.shipped (msg=msg_a)
│ └── [published] >> orders.invoiced (msg=msg_grand)
└── [consumed] << orders.shipped (msg=msg_b)
When laravel/pulse is installed the package auto-registers AmqpPulseRecorder and records the following metric types so they show up under Pulse::values($type) and in custom cards:
| Type | Key | Value |
|------------------|----------------------------------|-------------------|
| amqp_publish | routing key | 1 (count) |
| amqp_handle | queue | duration (ms) |
| amqp_fail | queue | 1 (count) |
| amqp_rpc | Service::Request (short name) | duration (ms) |
| amqp_rpc_fail | Service::Request | 1 (count) |
| amqp_dlq | dead-letter queue | sampled msg count |
Disable the auto-subscription in config/amqp.php:
return [
// ...
'pulse_integration' => false,
];
The recorder is a silent no-op when Pulse is not installed — no exceptions, no log spam.
OpenTelemetryTracePropagator plugs the open-telemetry/api SDK into the package's TracePropagatorInterface so that the active OTel span context is auto-injected into every AMQP traceparent / tracestate header.
use Bschmitt\Amqp\Contracts\TracePropagatorInterface;
use Bschmitt\Amqp\Support\OpenTelemetryTracePropagator;
// In a service provider:
$this->app->singleton(TracePropagatorInterface::class, function () {
return new OpenTelemetryTracePropagator();
// Or pass an explicit \OpenTelemetry\Context\Propagation\TextMapPropagatorInterface
});
When the SDK is absent the propagator falls back to W3C generation (W3cTracePropagator), so the same wiring works on stripped-down environments and in CI.
use Bschmitt\Amqp\Support\InMemoryMessageStore;
$amqp->setMessageStore(new InMemoryMessageStore());
Amqp::publish('orders.created', '{}');
$entries = $amqp->messageStore()->all(['direction' => 'published']);
Implement MessageStoreInterface to back it with Eloquent / Redis / files for durable replay.
use Bschmitt\Amqp\Contracts\ShouldPublishToAmqpInterface;
class OrderCreated implements ShouldPublishToAmqpInterface
{
public function __construct(public string $orderId) {}
}
// config/amqp.php
return [
// ...
'broadcast_laravel_events' => true,
];
event(new OrderCreated('o-1'));
// auto-published to RabbitMQ with routing key `order_created`
Override amqpRouting(), amqpExchange(), or amqpPayload() on the event to customise routing.
The package includes comprehensive test coverage:
# Run all tests
php vendor/bin/phpunit
# Run unit tests only
php vendor/bin/phpunit test/Unit/
# Run integration tests only
php vendor/bin/phpunit test/Integration/
Test Requirements:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-managementSee Testing Guide for more information.
RPC:
$amqp->rpc($routingKey, $request, $properties, $timeout) - Make RPC calls (use $amqp = app('Amqp'))Consumer::reply($message, $response, $properties) - Send RPC responses$amqp->listen($routingKeys, $callback, $properties) - Auto-create queues with multiple bindings (use $amqp = app('Amqp'))Management:
$amqp->queuePurge($queue, $properties) - Purge queue (use $amqp = app('Amqp'))$amqp->queueDelete($queue, $ifUnused, $ifEmpty, $properties) - Delete queue$amqp->queueUnbind(...) - Unbind queue$amqp->exchangeDelete(...) - Delete exchange$amqp->exchangeUnbind(...) - Unbind exchangeManagement API:
$amqp->getQueueStats($queue, $vhost, $properties) - Queue statistics (use $amqp = app('Amqp'))$amqp->getConnections($connectionName, $properties) - List connections$amqp->getChannels($channelName, $properties) - List channels$amqp->getNodes($nodeName, $properties) - Cluster nodes$amqp->getPolicies($properties) - List policies$amqp->createPolicy(...) - Create policy$amqp->updatePolicy(...) - Update policy$amqp->deletePolicy(...) - Delete policy$amqp->listFeatureFlags($properties) - List feature flags$amqp->getFeatureFlag($name, $properties) - Get feature flagHelpers:
$amqp->getConnectionConfig($connectionName) - Get connection config (use $amqp = app('Amqp'))Note: For consume(), listen(), rpc(), and all management methods, you must resolve the Amqp instance from the container using $amqp = app('Amqp') or $amqp = resolve('Amqp'). The static facade Amqp:: works for publish() but not for consume() and other instance methods.
Two complementary surfaces — HTTP routes for sidecars and a CLI for exec probes — both backed by the same HealthState + HealthCheck pair.
Enable in config/amqp.php (or via AMQP_PROBES_ENABLED=true):
'probes' => [
'enabled' => true,
'prefix' => 'amqp/health', // GET /amqp/health/live, /ready, /
'middleware' => [], // optional middleware (e.g. ['api'])
'state_file' => storage_path('framework/amqp-health.json'),
'heartbeat_age' => 60, // seconds before liveness flips to 503
'queues' => ['orders', 'orders.dlq'],
'max_backlog' => 5000,
],
The service provider registers:
| Method | Path | Response |
|--------|-----------------------|------------------------------------------|
| GET | /amqp/health/live | 200 alive / 503 dead |
| GET | /amqp/health/ready | 200 ready / 503 not ready |
| GET | /amqp/health/ | combined snapshot |
Wire it from your consumer:
use Bschmitt\Amqp\Support\ConsumerLifecycle;
use Bschmitt\Amqp\Support\HealthState;
$lifecycle = (new ConsumerLifecycle())
->withHealth(HealthState::instance(storage_path('framework/amqp-health.json')))
->registerSignalHandlers();
Amqp::consumeWithLifecycle('orders', $handler, $lifecycle);
livenessProbe.exec.command)# Readiness (default)
php artisan amqp:health
php artisan amqp:health --queue=orders --backlog=1000
# Liveness
php artisan amqp:health --probe=live --heartbeat-age=30
# Combined snapshot
php artisan amqp:health --all --state-file=/var/run/amqp-health.json
Exit codes: 0 = healthy, 1 = unhealthy — exactly what livenessProbe.exec / readinessProbe.exec expect.
AutoscalingAdvisor is a pure function that turns a QueueMetrics snapshot into a recommended replica count and a ready-to-paste KEDA trigger:
use Bschmitt\Amqp\Support\AutoscalingAdvisor;
$metrics = Amqp::queueMetrics('orders');
$advice = (new AutoscalingAdvisor())
->messagesPerConsumer(100)
->maxLagSeconds(15.0)
->minReplicas(1)
->maxReplicas(20)
->advise($metrics);
// $advice['desired_consumers'] => 4
// $advice['action'] => 'scale_up'
// $advice['reasons'] => ['depth 350 / 100 ...', 'lag 20s > 15s -> +1 ...']
// $advice['keda'] => KEDA RabbitMQ trigger spec
CLI form:
php artisan amqp:scale orders orders.priority \
--per-consumer=100 --max=20 --lag-seconds=15
php artisan amqp:scale orders --keda # emit only the KEDA trigger
php artisan amqp:scale orders --json --fail-on-scale-up # CI-friendly
The --keda output drops straight into a ScaledObject manifest under spec.triggers.
LaravelCloud detects managed environments (Laravel Cloud, Forge, Vapor, Render, Fly.io) and, when amqp.cloud.auto_hydrate is true (default), parses an AMQP_URL / CLOUDAMQP_URL / RABBITMQ_URL DSN into the active connection block on register() — without overwriting explicit config:
AMQP_URL=amqps://app:[email protected]/%2Fprod
Explicit AMQP_HOST / AMQP_USER / etc. still win. You can also call the detector directly:
use Bschmitt\Amqp\Support\LaravelCloud;
if (LaravelCloud::isHosted()) {
logger()->info('amqp hosted env', LaravelCloud::summary());
}
$props = LaravelCloud::parseDsn(env('AMQP_URL'));
Configure region-scoped connection keys, then resolve / fail over with locality preference:
// config/amqp.php
'regions' => [
'enabled' => true,
'connections' => ['production-us', 'production-eu', 'production-apac'],
'primary' => null, // null = match LARAVEL_CLOUD_REGION/AWS_REGION
'cooldown_seconds' => 30,
],
use Bschmitt\Amqp\Support\MultiRegionConnection;
$resolver = app(MultiRegionConnection::class);
// Single attempt with locality preference
$connectionKey = $resolver->pick(); // 'production-us'
// Run a publish across regions until one succeeds
$resolver->withFailover(function ($region) {
Amqp::publish('orders.created', $payload, ['use' => $region]);
});
// Fan-out to every region (e.g. announcements)
foreach ($resolver->each() as $region) {
Amqp::publish('events.maintenance', $payload, ['use' => $region]);
}
Failed regions cool down for the configured window before re-entering rotation.
Version 3.4.0 is fully backward compatible with previous versions. All existing code will continue to work without modifications.
Contributions are welcome! Please feel free to submit a Pull Request.
rabbitmq:3-management Docker imageThis package is open-sourced software licensed under the MIT license.
For issues, questions, or contributions:
docs/ directoryVersion: 3.4.0
Status: Ready