Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
2e142c9
Forward compatibility with voryx/event-loop 3.0 and while supporting 2.0
WyriHaximus Apr 18, 2018
086d26e
Merge pull request #32 from WyriHaximus-labs/event-loop-3.0-and-2.0.2
davidwdan Apr 18, 2018
4d5e23f
Cleanup README
davidwdan Apr 18, 2018
cf166f1
Remove deprecated testcase implementation
samnela Oct 19, 2018
b743590
Fix broken DNS test. Add 7.2 to travis
mbonneau Nov 2, 2018
3e2441c
Add RecordNotFoundException back to test
mbonneau Nov 2, 2018
e0fdf3d
Merge pull request #34 from samnela/remove-deprecated-testcase
mbonneau Nov 2, 2018
3cb99de
Add PHP 7.3 to tests
mbonneau Nov 21, 2018
44722e8
Add LISTEN/NOTIFY support
mbonneau Nov 21, 2018
b8e9451
Typo in README
mbonneau Nov 21, 2018
0d143f9
Refactored `listen()`
davidwdan Nov 22, 2018
790d310
Update ListenNotify.php
davidwdan Nov 22, 2018
cabf704
Added LISTEN tests
mbonneau Nov 23, 2018
4afc0df
Merge pull request #36 from voryx/listen_notify
mbonneau Nov 23, 2018
87468ea
Added default application name
mbonneau Jul 14, 2019
91afbcb
Specify DNS dev dep version.
mbonneau Jul 14, 2019
6d65779
Update dns dep version
mbonneau Jul 14, 2019
9faf9c9
Forgot to include file in last commit
mbonneau Jul 14, 2019
eefc2ab
Cancellation improvements
mbonneau Jul 14, 2019
b840dc4
Always ensure we have a connection at index 0
WyriHaximus Aug 19, 2019
5422773
Use services to install postgres on travis instead of addons
mbonneau Aug 19, 2019
88d0cdb
Merge branch 'master' into always_ensure_we_have_a_connection_at_index_0
mbonneau Aug 19, 2019
61658fe
Merge pull request #41 from WyriHaximus-labs/always_ensure_we_have_a_…
mbonneau Aug 19, 2019
6e550f7
Keep current exception on CommandTrait error method
Aug 16, 2019
67c2060
Merge pull request #40 from Alban-io/fix/error-command-trait
mbonneau Dec 9, 2019
c508ea7
Testing with docker
mbonneau Dec 14, 2019
b3b7a8a
Don't use autogenerated container names for docker-compose
mbonneau Dec 14, 2019
7be3263
Upgrade to PHPUnit 8
mbonneau Dec 14, 2019
73e30ce
Remove unsupported PHP versions from travis (PHPUnit conflict)
mbonneau Dec 14, 2019
3cfe5ef
Add support back in for 7.0 and 7.1 because it was easier than I thought
mbonneau Dec 14, 2019
c701407
Merge pull request #42 from mbonneau/master
mbonneau Dec 14, 2019
aea082c
Update README.md
mbonneau Apr 8, 2020
2ec06d9
Fix testing. Stop testing 7.0 and 7.1
mbonneau Jun 19, 2020
17accd9
Remove optional argument on Sync before required argument. Fixes #45 …
mbonneau Oct 28, 2021
5eacea6
Thanks for the fish TravisCI, all welcome GitHub Actions
WyriHaximus Sep 2, 2022
78394c0
Add docker-compose.yml back in for local testing and correct test_db.…
mbonneau Nov 11, 2022
b4e7c1c
Add migrate-ci-to-github-actions branch to ci
mbonneau Nov 11, 2022
2ba63e4
Skip null test that is causing failures with new CI
mbonneau Nov 11, 2022
15c53c4
Update minimum phpunit version to get past dep error in CI
mbonneau Nov 11, 2022
9744d09
Bump phpunit 8 to >=8.5.23 to fix CI error
mbonneau Nov 11, 2022
696b766
Remove migration branch from ci,yml
mbonneau Nov 11, 2022
f8af9d0
PHP 8.2 compatibility
WyriHaximus Sep 1, 2022
7889cef
Support for SCRAM_SHA_256
zwirek Aug 1, 2023
11dcb5b
Merge pull request #49 from WyriHaximus-labs/php-8.2-compatibility
mbonneau Aug 15, 2023
574faa9
update postgres config for CI tests to handle scram-sha-256 auth alon…
zwirek Aug 21, 2023
ef19cc3
remove not supported in php7.0 void return type from ScramSha256
zwirek Aug 21, 2023
2ca0b98
Fix ClientTest mock creation to be compatible with phpunit 10
zwirek Aug 21, 2023
8c0ce4c
Merge pull request #54 from zwirek/feature/scram-sha-256-authorization
mbonneau Aug 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
name: Continuous Integration
on:
push:
branches:
- 'master'
- 'refs/heads/[0-9]+.[0-9]+.[0-9]+'
pull_request:
jobs:
supported-versions-matrix:
name: Supported Versions Matrix
runs-on: ubuntu-latest
outputs:
version: ${{ steps.supported-versions-matrix.outputs.version }}
steps:
- uses: actions/checkout@v1
- id: supported-versions-matrix
uses: WyriHaximus/github-action-composer-php-versions-in-range@v1
tests:
services:
postgres:
image: postgres:${{ matrix.postgres }}
env:
POSTGRES_PASSWORD: postgres
POSTGRES_INITDB_ARGS: --auth-host=md5
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
name: Testing on PHP ${{ matrix.php }} with ${{ matrix.composer }} dependency preference against Postgres ${{ matrix.postgres }}
strategy:
fail-fast: false
matrix:
php: ${{ fromJson(needs.supported-versions-matrix.outputs.version) }}
postgres: [12, 13, 14, 15]
composer: [lowest, locked, highest]
needs:
- supported-versions-matrix
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: |
PGPASSWORD=postgres psql -h localhost -U postgres -c "CREATE USER pgasync"
PGPASSWORD=postgres psql -h localhost -U postgres -c "ALTER ROLE pgasync PASSWORD 'pgasync'"
PGPASSWORD=postgres psql -h localhost -U postgres -c "CREATE USER pgasyncpw"
PGPASSWORD=postgres psql -h localhost -U postgres -c "ALTER ROLE pgasyncpw PASSWORD 'example_password'"
PGPASSWORD=postgres psql -h localhost -U postgres -c "CREATE USER scram_user"
PGPASSWORD=postgres psql -h localhost -U postgres -c "SET password_encryption='scram-sha-256';ALTER ROLE scram_user PASSWORD 'scram_password'"
PGPASSWORD=postgres psql -h localhost -U postgres -c "CREATE DATABASE pgasync_test OWNER pgasync"
PGPASSWORD=pgasync psql -h localhost -U pgasync -f tests/test_db.sql pgasync_test
# PGPASSWORD=postgres cat tests/test_db.sql | xargs -I % psql -h localhost -U postgres -c "%"
- uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php }}
coverage: xdebug
- uses: ramsey/composer-install@v2
with:
dependency-versions: ${{ matrix.composer }}
# - run: vendor/bin/phpunit --testdox
- run: vendor/bin/phpunit
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/composer.lock
/vendor
/docker/database
15 changes: 10 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ language: php
sudo: required

php:
- 7.0
- 7.1
- 7.2
- 7.3
- 7.4

addons:
postgresql: "9.3"
services:
- docker

install:
- composer install

before_script:
- docker-compose -f docker/docker-compose.yml up -d
- sh docker/waitForPostgres.sh

script:
- vendor/bin/phpunit
- vendor/bin/phpunit --testdox
43 changes: 37 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ $client = new PgAsync\Client([
"database" => "matt"
]);

$client->query('SELECT * FROM channel')->subscribe(new \Rx\Observer\CallbackObserver(
$client->query('SELECT * FROM channel')->subscribe(
function ($row) {
var_dump($row);
},
Expand All @@ -30,7 +30,7 @@ $client->query('SELECT * FROM channel')->subscribe(new \Rx\Observer\CallbackObse
function () {
echo "Complete.\n";
}
));
);


```
Expand All @@ -48,7 +48,7 @@ $client = new PgAsync\Client([
]);

$client->executeStatement('SELECT * FROM channel WHERE id = $1', ['5'])
->subscribe(new \Rx\Observer\CallbackObserver(
->subscribe(
function ($row) {
var_dump($row);
},
Expand All @@ -58,15 +58,32 @@ $client->executeStatement('SELECT * FROM channel WHERE id = $1', ['5'])
function () {
echo "Complete.\n";
}
));
);

```

## Example - LISTEN/NOTIFY
```php
$client = new PgAsync\Client([
"host" => "127.0.0.1",
"port" => "5432",
"user" => "matt",
"database" => "matt"
]);

$client->listen('some_channel')
->subscribe(function (\PgAsync\Message\NotificationResponse $message) {
echo $message->getChannelName() . ': ' . $message->getPayload() . "\n";
});

$client->query("NOTIFY some_channel, 'Hello World'")->subscribe();
```

## Install
With [composer](https://getcomposer.org/) install into you project with:

Install pgasync:
```composer require voryx/pgasync:dev-master```
```composer require voryx/pgasync```

## What it can do
- Run queries (CREATE, UPDATE, INSERT, SELECT, DELETE)
Expand All @@ -76,7 +93,7 @@ Install pgasync:
- Connection pooling (basic pooling)

## What it can't quite do yet
- Transactions
- Transactions (Actually though, just grab a connection and you can run your transaction on that single connection)

## What's next
- Add more testing
Expand Down Expand Up @@ -112,3 +129,17 @@ $insert
->concat($select)
->subscribe(...);
```

## Testing

We use docker to run a postgresql instance for testing. To run locally,
just install docker and run the following command from the project root:
```bash
docker-compose -f docker/docker-compose.yml up -d
```
If you need to reset the database, just stop the docker instance and delete
the `docker/database` directory. Restart the docker with the above command and it will
initialize the database again.

The tests do not change the ending structure of the database, so you should not
normally need to do this.
10 changes: 8 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,18 @@
}
},
"require": {
"voryx/event-loop": "^2.0.2",
"php": ">=7.0.0",
"voryx/event-loop": "^3.0 || ^2.0.2",
"reactivex/rxphp": "^2.0",
"react/socket": "^1.0 || ^0.8 || ^0.7",
"evenement/evenement": "^2.0 | ^3.0"
},
"require-dev": {
"phpunit/phpunit": "^5.7"
"phpunit/phpunit": ">=8.5.23 || ^6.5.5",
"react/dns": "^1.0"
},
"scripts": {
"docker-up": "cd docker && docker-compose up -d",
"docker-down": "cd docker && docker-compose down"
}
}
23 changes: 23 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
version: '3.7'

services:
pgasync-postgres:
container_name: pgasync-postgres
image: postgres:11
environment:
- PGDATA=/database
- POSTGRES_PASSWORD=some_password
- POSTGRES_INITDB_ARGS=--auth-host=md5
- TZ=America/New_York
volumes:
- .:/app
- ./database:/database
- ./../tests/test_db.sql:/app/test_db.sql
- ./docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
ports:
- "5432:5432"

configs:
pg_hba:
file: pg_hba_new.conf

19 changes: 19 additions & 0 deletions docker/docker-entrypoint-initdb.d/init.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash
set -x

echo "Running as $USER in $PWD"

createuser -U postgres --createdb pgasync
createuser -U postgres --createdb pgasyncpw
createuser -U postgres --createdb scram_user
psql -U postgres -c "ALTER ROLE pgasyncpw PASSWORD 'example_password'"
psql -U postgres -c "SET password_encryption='scram-sha-256'; ALTER ROLE scram_user PASSWORD 'scram_password'"

cd /app
cp pg_hba_new.conf database/pg_hba.conf

createdb -U pgasync pgasync_test

psql -U pgasync -f test_db.sql pgasync_test


16 changes: 16 additions & 0 deletions docker/pg_hba_new.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# TYPE DATABASE USER ADDRESS METHOD

# "local" is for Unix domain socket connections only
local all all trust
# IPv4 local connections:
host all all 127.0.0.1/32 trust
# IPv6 local connections:
host all all ::1/128 trust
# Allow replication connections from localhost, by a user with the
# replication privilege.
local replication all trust
host replication all 127.0.0.1/32 trust
host replication all ::1/128 trust

host all pgasync all trust
host all all all md5
20 changes: 20 additions & 0 deletions example/ListenNotify.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

require_once __DIR__ . '/bootstrap.php';

$client = new PgAsync\Client([
'host' => '127.0.0.1',
'port' => '5432',
'user' => 'matt',
'database' => 'matt',
]);

$client->listen('some_channel')
->subscribe(function (\PgAsync\Message\NotificationResponse $message) {
echo $message->getChannelName() . ': ' . $message->getPayload() . "\n";
});

\Rx\Observable::timer(1000)
->flatMapTo($client->query("NOTIFY some_channel, 'Hello World'"))
->subscribe();

4 changes: 2 additions & 2 deletions phpunit.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8" ?>
<phpunit
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="./vendor/phpunit/phpunit/phpunit.xsd"
backupGlobals="false"
backupStaticAttributes="false"
colors="true"
Expand All @@ -8,7 +9,6 @@
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false"
syntaxCheck="false"
bootstrap="tests/bootstrap.php">
<testsuites>
<testsuite name="PgAsync Tests">
Expand Down
56 changes: 51 additions & 5 deletions src/PgAsync/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

namespace PgAsync;

use PgAsync\Message\NotificationResponse;
use React\EventLoop\LoopInterface;
use React\Socket\ConnectorInterface;
use Rx\Observable;
use Rx\Subject\Subject;

class Client
{
Expand All @@ -30,10 +32,16 @@ class Client
/** @var int */
private $maxConnections = 5;

/** @var Subject[] */
private $listeners = [];

/** @var Connection */
private $listenConnection;

public function __construct(array $parameters, LoopInterface $loop = null, ConnectorInterface $connector = null)
{
$this->loop = $loop ?: \EventLoop\getLoop();
$this->connector = $connector;
$this->loop = $loop ?: \EventLoop\getLoop();
$this->connector = $connector;

if (isset($parameters['auto_disconnect'])) {
$this->autoDisconnect = $parameters['auto_disconnect'];
Expand Down Expand Up @@ -71,7 +79,7 @@ public function executeStatement(string $queryString, array $parameters = [])
});
}

private function getLeastBusyConnection() : Connection
private function getLeastBusyConnection(): Connection
{
if (count($this->connections) === 0) {
// try to spin up another connection to return
Expand Down Expand Up @@ -133,9 +141,9 @@ private function createNewConnection()
$this->connections[] = $connection;

$connection->on('close', function () use ($connection) {
$this->connections = array_filter($this->connections, function ($c) use ($connection) {
$this->connections = array_values(array_filter($this->connections, function ($c) use ($connection) {
return $connection !== $c;
});
}));
});

return $connection;
Expand All @@ -158,4 +166,42 @@ public function closeNow()
$connection->disconnect();
}
}

public function listen(string $channel): Observable
{
if (isset($this->listeners[$channel])) {
return $this->listeners[$channel];
}

$unlisten = function () use ($channel) {
$this->listenConnection->query('UNLISTEN ' . $channel)->subscribe();

unset($this->listeners[$channel]);

if (empty($this->listeners)) {
$this->listenConnection->disconnect();
$this->listenConnection = null;
}
};

$this->listeners[$channel] = Observable::defer(function () use ($channel) {
if ($this->listenConnection === null) {
$this->listenConnection = $this->createNewConnection();
}

if ($this->listenConnection === null) {
throw new \Exception('Could not get new connection to listen on.');
}

return $this->listenConnection->query('LISTEN ' . $channel)
->merge($this->listenConnection->notifications())
->filter(function (NotificationResponse $message) use ($channel) {
return $message->getChannelName() === $channel;
});
})
->finally($unlisten)
->share();

return $this->listeners[$channel];
}
}
Loading