Centum\Queue
The Queue component enables you to offload work for asynchronous or deferred processing. This is especially useful for tasks that are time-consuming, resource-intensive, or could negatively impact application performance if run synchronously.
Interface Overview
Centum\Interfaces\Queue\QueueInterface
exposes two main methods:
publish(Centum\Interfaces\Queue\TaskInterface $task): void
Add a task to the queue for later execution.consume(): Centum\Interfaces\Queue\TaskInterface
Retrieve and execute the next available task.
When consume()
is called, the next task is executed. If a task throws an exception during execution, the queue implementation may bury or mark it as failed for later inspection or retry.
Task Representation
All queue operations use Centum\Interfaces\Queue\TaskInterface
, which represents a unit of work for a background worker. Tasks may be serialized/unserialized internally, so complex dependencies should be resolved via the Container in the execute()
method.
Example Task
namespace App\Tasks;
use Centum\Interfaces\Container\ContainerInterface;
use Centum\Interfaces\Queue\TaskInterface;
class LogTask implements TaskInterface
{
public function __construct(
protected readonly string $message
) {
}
public function execute(ContainerInterface $container): void
{
$line = sprintf(
"[%s] %s" . PHP_EOL,
date("Y-m-d H:i:s"),
$this->message
);
file_put_contents("log.txt", $line, FILE_APPEND);
}
}
Typical Usage
Tasks can be published from anywhere in your application and consumed elsewhere, such as in a background worker or CLI command.
Publishing a Task (e.g., from a Controller)
namespace App\Web\Controllers;
use App\Tasks\LogTask;
use Centum\Http\Response;
use Centum\Interfaces\Http\ResponseInterface;
use Centum\Interfaces\Queue\QueueInterface;
use Centum\Interfaces\Router\ControllerInterface;
class IndexController implements ControllerInterface
{
public function index(QueueInterface $queue): ResponseInterface
{
$logTask = new LogTask("Queue works OK.");
$queue->publish($logTask);
return new Response("hello");
}
}
Consuming Tasks (e.g., from a Console Command)
namespace App\Console\Commands;
use Centum\Console\CommandMetadata;
use Centum\Interfaces\Console\CommandInterface;
use Centum\Interfaces\Console\TerminalInterface;
use Centum\Interfaces\Queue\QueueInterface;
#[CommandMetadata("queue:consume")]
class QueueConsumeCommand implements CommandInterface
{
public function __construct(
protected readonly QueueInterface $queue
) {
}
public function execute(TerminalInterface $terminal): int
{
$this->queue->consume();
return self::SUCCESS;
}
}
Supported Queue Implementations
- ArrayQueue: In-memory, non-persistent queue for testing and development.
- BeanstalkdQueue: Persistent queue backed by Beanstalkd.
- ImmediateQueue: Executes tasks synchronously as soon as they are published.