Phive Queue is a time-based scheduling queue with multiple backend support.
The recommended way to install Phive Queue is through Composer:
$ composer require rybakit/phive-queue:*
use Phive\Queue\Queue\InMemoryQueue;
$queue = new InMemoryQueue();
$queue->push('item1');
$queue->push('item2', new DateTime());
$queue->push('item3', time());
$queue->push('item4', '+5 seconds');
$queue->push('item5', 'next Monday');
// get the queue size
$count = $queue->count(); // 5
// pop items off the queue
// note that items with the same scheduled time are not guaranteed
// to be received in the same order in which they were added
$item123 = $queue->pop();
$item123 = $queue->pop();
$item123 = $queue->pop();
$item4 = $queue->pop(); // throws Phive\Queue\Exception\NoItemAvailableException
sleep(5);
$item4 = $queue->pop();
// clear the queue (will remove 'item5')
$queue->clear();
Currently, there are the following queues available:
The MongoQueue requires the Mongo PECL extension (v1.3.0 or higher).
Note: Before making use of the queue, it's highly recommended to create an index on the eta field:
$ mongo my_db --eval 'db.my_coll.ensureIndex({ eta: 1 })'
public MongoQueue::__construct(MongoClient $client, string $dbName, string $collName)
Parameters:
client The MongoClient instance
dbName The database name
collName The collection name
use Phive\Queue\Queue\MongoQueue;
$client = new MongoClient();
$queue = new MongoQueue($client, 'my_db', 'my_coll');
For the RedisQueue you have to install the Redis PECL extension (v2.2.3 or higher).
public RedisQueue::__construct(Redis $redis)
Parameters:
redis The Redis instance
use Phive\Queue\Queue\RedisQueue;
$redis = new Redis();
$redis->connect('127.0.0.1');
$redis->setOption(Redis::OPT_PREFIX, 'my_prefix:');
// Since the Redis client v2.2.5 the RedisQueue has the ability to utilize serialization:
// $redis->setOption(Redis::OPT_SERIALIZER, Redis::SERIALIZER_PHP);
$queue = new RedisQueue($redis);
The BeanstalkQueue requires the Pheanstalk library (Beanstalk client) to be installed:
$ composer require pda/pheanstalk:~2.1
public BeanstalkQueue::__construct(Pheanstalk_Pheanstalk $client, string $tubeName)
Parameters:
client The Pheanstalk_Pheanstalk instance
tubeName The tube name
use Pheanstalk_Pheanstalk as Pheanstalk;
use Phive\Queue\Queue\BeanstalkQueue;
$client = new Pheanstalk('127.0.0.1');
$queue = new BeanstalkQueue($client, 'my_tube');
The GenericPdoQueue is intended for PDO drivers whose databases support stored procedures/functions (in fact all drivers except SQLite).
The GenericPdoQueue requires PDO and a PDO driver for the particular database to be installed.
On top of that, the PDO error mode must be set to throw exceptions (PDO::ERRMODE_EXCEPTION).
public GenericPdoQueue::__construct(PDO $conn, string $tableName [, string $routineName = null ] )
Parameters:
conn The PDO instance
tableName The table name
routineName Optional. The routine name. Defaults to tableName_pop
use Phive\Queue\Queue\Pdo\GenericPdoQueue;
$pdo = new PDO('pgsql:host=127.0.0.1;port=5432;dbname=foo', 'db_user', 'db_pass');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$queue = new GenericPdoQueue($pdo, 'my_table', 'my_routine');
The SqlitePdoQueue requires PDO and the SQLite PDO driver.
On top of that PDO error mode must be set to throw exceptions (PDO::ERRMODE_EXCEPTION).
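The effect of the exception error mode can be seen in isolation with a short sketch. It assumes the pdo_sqlite extension is available and uses an in-memory database; the table name missing_table is hypothetical and deliberately does not exist. Note that as of PHP 8.0 PDO already defaults to PDO::ERRMODE_EXCEPTION, so the explicit call mainly matters on older versions.

```php
<?php
// In-memory SQLite database, used only for illustration.
$pdo = new PDO('sqlite::memory:');

// Make PDO report errors by throwing PDOException.
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

$caught = false;
try {
    // 'missing_table' is a hypothetical table that was never created,
    // so this statement fails and PDO throws instead of returning false.
    $pdo->exec('DELETE FROM missing_table');
} catch (PDOException $e) {
    $caught = true;
    echo 'PDO error: ', $e->getMessage(), PHP_EOL;
}
```

With the silent error mode the same call would only return false, and a failure could go unnoticed by the calling code.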
Note: For performance reasons it's highly recommended to activate WAL mode:
$pdo = new PDO('sqlite:/opt/databases/my_db.sq3');
$pdo->exec('PRAGMA journal_mode=WAL');
public SqlitePdoQueue::__construct(PDO $conn, string $tableName)
Parameters:
conn The PDO instance
tableName The table name
use Phive\Queue\Queue\Pdo\SqlitePdoQueue;
$pdo = new PDO('sqlite:/opt/databases/my_db.sq3');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$queue = new SqlitePdoQueue($pdo, 'my_table');
The SysVQueue requires PHP to be compiled with the option --enable-sysvmsg.
public SysVQueue::__construct(int $key [, bool $serialize = null [, int $perms = null ]] )
Parameters:
key The message queue numeric ID
serialize Optional. Whether to serialize an item or not. Defaults to false
perms Optional. The queue permissions. Defaults to 0666
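The key is a plain integer, so it can be hard-coded or derived. One common way to derive a System V IPC key is PHP's built-in ftok(), which maps an existing file path and a one-character project identifier to an integer. The sketch below is only an illustration: pairing ftok() with SysVQueue is an assumption rather than something the library prescribes, and the project identifier 'q' is arbitrary.

```php
<?php
// Derive an integer IPC key from an existing file and a one-character id.
// ftok() returns -1 on failure (for example, if the file does not exist).
$key = ftok(__FILE__, 'q');
if ($key === -1) {
    throw new RuntimeException('Unable to generate a System V IPC key.');
}

// Permissions are written as an octal literal: 0666 grants read/write
// access to owner, group and others.
$perms = 0666;

printf("key=%d perms=%o\n", $key, $perms);

// The values could then be passed to the constructor documented above:
// $queue = new SysVQueue($key, true, $perms);
```

Because the key depends on the file path, every process that should share the queue has to call ftok() with the same file and identifier.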
use Phive\Queue\Queue\SysVQueue;
$queue = new SysVQueue(123456);
As you might guess, the InMemoryQueue exists only in RAM and therefore operates faster than other queues. It can be useful in cases where persistence is not necessary.
public InMemoryQueue::__construct()
use Phive\Queue\Queue\InMemoryQueue;
$queue = new InMemoryQueue();
Every queue method declared in the Queue interface will throw an exception if a run-time error occurs at the time the method is called.
For example, in the code below, the push() call will fail with a MongoConnectionException if the remote server is unreachable:
use Phive\Queue\Queue\MongoQueue;
$queue = new MongoQueue(...);
// mongodb server goes down here
$queue->push('item'); // throws MongoConnectionException
But sometimes you may want to catch exceptions coming from a queue regardless of the underlying driver.
To do this, just wrap your queue object with the ExceptionalQueue decorator:
use Phive\Queue\Queue\ExceptionalQueue;
use Phive\Queue\Queue\MongoQueue;
$queue = new MongoQueue(...);
$queue = new ExceptionalQueue($queue);
// mongodb server goes down here
$queue->push('item'); // throws Phive\Queue\Exception\RuntimeException
And then, to catch queue-level exceptions, use the QueueException marker interface:
use Phive\Queue\Exception\QueueException;
...
try {
    do_something_with_a_queue();
} catch (QueueException $e) {
    // handle queue exception
} catch (\Exception $e) {
    // handle base exception
}
Phive Queue uses PHPUnit for unit and acceptance testing. In order to run the tests, you'll first need to install the library dependencies using composer:
$ composer install
You can then run the tests:
$ phpunit
You may also wish to specify your own default values for some tests (db names, passwords, queue sizes, etc.).
Just create your own phpunit.xml file by copying the phpunit.xml.dist file and customize it to your needs.
To check the performance of queues run:
$ phpunit --group performance
This test inserts a number of items (1000 by default) into a queue, and then retrieves them back.
It measures the average time for push and pop operations and outputs the resulting stats, e.g.:
SysVQueue::push()
Total operations: 1000
Operations per second: 67149.691 [#/sec]
Time per operation: 14.892 [μs]
Time taken for test: 0.015 [sec]
SysVQueue::pop()
Total operations: 1000
Operations per second: 96908.667 [#/sec]
Time per operation: 10.319 [μs]
Time taken for test: 0.010 [sec]
You may also change the number of items involved in the test by changing the PHIVE_PERF_QUEUE_SIZE value in your phpunit.xml file.
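The reported figures are related by simple arithmetic: throughput is the number of operations divided by the elapsed time, and the time per operation is its reciprocal. At roughly 67,000 operations per second, each operation takes about 15 microseconds. The following sketch reproduces that calculation; the summarize() helper is hypothetical and is not part of the test suite, and the elapsed time is an illustrative value.

```php
<?php
/**
 * Hypothetical helper: derives throughput and per-operation latency
 * from a total operation count and the elapsed wall-clock time.
 */
function summarize(int $operations, float $seconds): array
{
    return [
        'ops_per_sec' => $operations / $seconds,
        // Convert seconds per operation to microseconds.
        'usec_per_op' => $seconds / $operations * 1e6,
    ];
}

// 1000 operations completed in about 0.0149 seconds.
$stats = summarize(1000, 0.014892);

printf("Operations per second: %.3f\n", $stats['ops_per_sec']);
printf("Time per operation:    %.3f us\n", $stats['usec_per_op']);
```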
In order to check the concurrency you'll have to install the Gearman server and the Gearman PECL extension. Once the server has been installed and started, create a number of processes (workers) by running:
$ php tests/worker.php
Then run the tests:
$ phpunit --group concurrency
This test inserts a number of items (100 by default) into a queue, and then each worker tries to retrieve them back in parallel.
You may also change the number of items involved in the test by changing the PHIVE_CONCUR_QUEUE_SIZE value in your phpunit.xml file.
Phive Queue is released under the MIT License. See the bundled LICENSE file for details.
