diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..f6a99f3 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,63 @@ +name: Continuous Integration +on: + push: + branches: + - 'master' + - 'refs/heads/[0-9]+.[0-9]+.[0-9]+' + pull_request: +jobs: + supported-versions-matrix: + name: Supported Versions Matrix + runs-on: ubuntu-latest + outputs: + version: ${{ steps.supported-versions-matrix.outputs.version }} + steps: + - uses: actions/checkout@v1 + - id: supported-versions-matrix + uses: WyriHaximus/github-action-composer-php-versions-in-range@v1 + tests: + services: + postgres: + image: postgres:${{ matrix.postgres }} + env: + POSTGRES_PASSWORD: postgres + POSTGRES_INITDB_ARGS: --auth-host=md5 + # Set health checks to wait until postgres has started + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + name: Testing on PHP ${{ matrix.php }} with ${{ matrix.composer }} dependency preference against Postgres ${{ matrix.postgres }} + strategy: + fail-fast: false + matrix: + php: ${{ fromJson(needs.supported-versions-matrix.outputs.version) }} + postgres: [12, 13, 14, 15] + composer: [lowest, locked, highest] + needs: + - supported-versions-matrix + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - run: | + PGPASSWORD=postgres psql -h localhost -U postgres -c "CREATE USER pgasync" + PGPASSWORD=postgres psql -h localhost -U postgres -c "ALTER ROLE pgasync PASSWORD 'pgasync'" + PGPASSWORD=postgres psql -h localhost -U postgres -c "CREATE USER pgasyncpw" + PGPASSWORD=postgres psql -h localhost -U postgres -c "ALTER ROLE pgasyncpw PASSWORD 'example_password'" + PGPASSWORD=postgres psql -h localhost -U postgres -c "CREATE USER scram_user" + PGPASSWORD=postgres psql -h localhost -U postgres -c "SET password_encryption='scram-sha-256';ALTER ROLE scram_user PASSWORD 'scram_password'" + PGPASSWORD=postgres psql -h localhost -U postgres -c "CREATE DATABASE pgasync_test OWNER pgasync" + PGPASSWORD=pgasync psql -h localhost -U pgasync -f tests/test_db.sql pgasync_test +# PGPASSWORD=postgres cat tests/test_db.sql | xargs -I % psql -h localhost -U postgres -c "%" + - uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + coverage: xdebug + - uses: ramsey/composer-install@v2 + with: + dependency-versions: ${{ matrix.composer }} +# - run: vendor/bin/phpunit --testdox + - run: vendor/bin/phpunit diff --git a/.gitignore b/.gitignore index ff72e2d..deca736 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /composer.lock /vendor +/docker/database diff --git a/.travis.yml b/.travis.yml index bccff6a..5925aa9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,14 +2,19 @@ language: php sudo: required php: - - 7.0 - - 7.1 + - 7.2 + - 7.3 + - 7.4 -addons: - postgresql: "9.3" +services: + - docker install: - composer install +before_script: + - docker-compose -f docker/docker-compose.yml up -d + - sh docker/waitForPostgres.sh + script: - - vendor/bin/phpunit + - vendor/bin/phpunit --testdox diff --git a/README.md b/README.md index 34bef36..5875d33 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ $client = new PgAsync\Client([ "database" => "matt" ]); -$client->query('SELECT * FROM channel')->subscribe(new \Rx\Observer\CallbackObserver( +$client->query('SELECT * FROM channel')->subscribe( function ($row) { var_dump($row); }, @@ -30,7 +30,7 @@ $client->query('SELECT * FROM channel')->subscribe(new \Rx\Observer\CallbackObse function () { echo "Complete.\n"; } -)); +); ``` @@ -48,7 +48,7 @@ $client = new PgAsync\Client([ ]); $client->executeStatement('SELECT * FROM channel WHERE id = $1', ['5']) - ->subscribe(new \Rx\Observer\CallbackObserver( + ->subscribe( function ($row) { var_dump($row); }, @@ -58,15 +58,32 @@ $client->executeStatement('SELECT * FROM channel WHERE id = $1', ['5']) function () { echo "Complete.\n"; } - )); + ); ``` +## Example - LISTEN/NOTIFY +```php +$client = new PgAsync\Client([ + "host" => "127.0.0.1", + "port" => "5432", + "user" => "matt", + "database" => "matt" +]); + +$client->listen('some_channel') + ->subscribe(function (\PgAsync\Message\NotificationResponse $message) { + echo $message->getChannelName() . ': ' . $message->getPayload() . "\n"; + }); + +$client->query("NOTIFY some_channel, 'Hello World'")->subscribe(); +``` + ## Install With [composer](https://getcomposer.org/) install into you project with: Install pgasync: -```composer require voryx/pgasync:dev-master``` +```composer require voryx/pgasync``` ## What it can do - Run queries (CREATE, UPDATE, INSERT, SELECT, DELETE) @@ -76,7 +93,7 @@ Install pgasync: - Connection pooling (basic pooling) ## What it can't quite do yet -- Transactions +- Transactions (Actually though, just grab a connection and you can run your transaction on that single connection) ## What's next - Add more testing @@ -112,3 +129,17 @@ $insert ->concat($select) ->subscribe(...); ``` + +## Testing + +We use docker to run a postgresql instance for testing. To run locally, +just install docker and run the following command from the project root: +```bash +docker-compose -f docker/docker-compose.yml up -d +``` +If you need to reset the database, just stop the docker instance and delete +the `docker/database` directory. Restart the docker with the above command and it will +initialize the database again. + +The tests do not change the ending structure of the database, so you should not +normally need to do this. diff --git a/composer.json b/composer.json index f3f740b..7c0172a 100644 --- a/composer.json +++ b/composer.json @@ -33,12 +33,18 @@ } }, "require": { - "voryx/event-loop": "^2.0.2", + "php": ">=7.0.0", + "voryx/event-loop": "^3.0 || ^2.0.2", "reactivex/rxphp": "^2.0", "react/socket": "^1.0 || ^0.8 || ^0.7", "evenement/evenement": "^2.0 | ^3.0" }, "require-dev": { - "phpunit/phpunit": "^5.7" + "phpunit/phpunit": ">=8.5.23 || ^6.5.5", + "react/dns": "^1.0" + }, + "scripts": { + "docker-up": "cd docker && docker-compose up -d", + "docker-down": "cd docker && docker-compose down" } } diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..3031c55 --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,23 @@ +version: '3.7' + +services: + pgasync-postgres: + container_name: pgasync-postgres + image: postgres:11 + environment: + - PGDATA=/database + - POSTGRES_PASSWORD=some_password + - POSTGRES_INITDB_ARGS=--auth-host=md5 + - TZ=America/New_York + volumes: + - .:/app + - ./database:/database + - ./../tests/test_db.sql:/app/test_db.sql + - ./docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d + ports: + - "5432:5432" + +configs: + pg_hba: + file: pg_hba_new.conf + diff --git a/docker/docker-entrypoint-initdb.d/init.sh b/docker/docker-entrypoint-initdb.d/init.sh new file mode 100644 index 0000000..af68572 --- /dev/null +++ b/docker/docker-entrypoint-initdb.d/init.sh @@ -0,0 +1,19 @@ +#!/bin/bash +set -x + +echo "Running as $USER in $PWD" + +createuser -U postgres --createdb pgasync +createuser -U postgres --createdb pgasyncpw +createuser -U postgres --createdb scram_user +psql -U postgres -c "ALTER ROLE pgasyncpw PASSWORD 'example_password'" +psql -U postgres -c "SET password_encryption='scram-sha-256'; ALTER ROLE scram_user PASSWORD 'scram_password'" + +cd /app +cp pg_hba_new.conf database/pg_hba.conf + +createdb -U pgasync pgasync_test + +psql -U pgasync -f test_db.sql pgasync_test + + diff --git a/docker/pg_hba_new.conf b/docker/pg_hba_new.conf new file mode 100644 index 0000000..2ee090f --- /dev/null +++ b/docker/pg_hba_new.conf @@ -0,0 +1,16 @@ +# TYPE DATABASE USER ADDRESS METHOD + +# "local" is for Unix domain socket connections only +local all all trust +# IPv4 local connections: +host all all 127.0.0.1/32 trust +# IPv6 local connections: +host all all ::1/128 trust +# Allow replication connections from localhost, by a user with the +# replication privilege. +local replication all trust +host replication all 127.0.0.1/32 trust +host replication all ::1/128 trust + +host all pgasync all trust +host all all all md5 diff --git a/example/ListenNotify.php b/example/ListenNotify.php new file mode 100644 index 0000000..ec6c3a2 --- /dev/null +++ b/example/ListenNotify.php @@ -0,0 +1,20 @@ + '127.0.0.1', + 'port' => '5432', + 'user' => 'matt', + 'database' => 'matt', +]); + +$client->listen('some_channel') + ->subscribe(function (\PgAsync\Message\NotificationResponse $message) { + echo $message->getChannelName() . ': ' . $message->getPayload() . "\n"; + }); + +\Rx\Observable::timer(1000) + ->flatMapTo($client->query("NOTIFY some_channel, 'Hello World'")) + ->subscribe(); + diff --git a/phpunit.xml b/phpunit.xml index 9059b40..b1ca19e 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -1,5 +1,6 @@ - diff --git a/src/PgAsync/Client.php b/src/PgAsync/Client.php index 639f511..698811b 100644 --- a/src/PgAsync/Client.php +++ b/src/PgAsync/Client.php @@ -2,9 +2,11 @@ namespace PgAsync; +use PgAsync\Message\NotificationResponse; use React\EventLoop\LoopInterface; use React\Socket\ConnectorInterface; use Rx\Observable; +use Rx\Subject\Subject; class Client { @@ -30,10 +32,16 @@ class Client /** @var int */ private $maxConnections = 5; + /** @var Subject[] */ + private $listeners = []; + + /** @var Connection */ + private $listenConnection; + public function __construct(array $parameters, LoopInterface $loop = null, ConnectorInterface $connector = null) { - $this->loop = $loop ?: \EventLoop\getLoop(); - $this->connector = $connector; + $this->loop = $loop ?: \EventLoop\getLoop(); + $this->connector = $connector; if (isset($parameters['auto_disconnect'])) { $this->autoDisconnect = $parameters['auto_disconnect']; @@ -71,7 +79,7 @@ public function executeStatement(string $queryString, array $parameters = []) }); } - private function getLeastBusyConnection() : Connection + private function getLeastBusyConnection(): Connection { if (count($this->connections) === 0) { // try to spin up another connection to return @@ -133,9 +141,9 @@ private function createNewConnection() $this->connections[] = $connection; $connection->on('close', function () use ($connection) { - $this->connections = array_filter($this->connections, function ($c) use ($connection) { + $this->connections = array_values(array_filter($this->connections, function ($c) use ($connection) { return $connection !== $c; - }); + })); }); return $connection; @@ -158,4 +166,42 @@ public function closeNow() $connection->disconnect(); } } + + public function listen(string $channel): Observable + { + if (isset($this->listeners[$channel])) { + return $this->listeners[$channel]; + } + + $unlisten = function () use ($channel) { + $this->listenConnection->query('UNLISTEN ' . $channel)->subscribe(); + + unset($this->listeners[$channel]); + + if (empty($this->listeners)) { + $this->listenConnection->disconnect(); + $this->listenConnection = null; + } + }; + + $this->listeners[$channel] = Observable::defer(function () use ($channel) { + if ($this->listenConnection === null) { + $this->listenConnection = $this->createNewConnection(); + } + + if ($this->listenConnection === null) { + throw new \Exception('Could not get new connection to listen on.'); + } + + return $this->listenConnection->query('LISTEN ' . $channel) + ->merge($this->listenConnection->notifications()) + ->filter(function (NotificationResponse $message) use ($channel) { + return $message->getChannelName() === $channel; + }); + }) + ->finally($unlisten) + ->share(); + + return $this->listeners[$channel]; + } } diff --git a/src/PgAsync/Command/CommandTrait.php b/src/PgAsync/Command/CommandTrait.php index 8a0d8a3..fd13120 100644 --- a/src/PgAsync/Command/CommandTrait.php +++ b/src/PgAsync/Command/CommandTrait.php @@ -20,11 +20,11 @@ public function complete() $this->observer->onCompleted(); } - public function error(\Throwable $exception = null) + public function error(\Throwable $exception) { $this->active = false; if (!$this->observer instanceof ObserverInterface) { - throw new \Exception('Observer not set on command.'); + throw $exception; } $this->observer->onError($exception); } diff --git a/src/PgAsync/Command/Describe.php b/src/PgAsync/Command/Describe.php index ed04ab3..d294e56 100644 --- a/src/PgAsync/Command/Describe.php +++ b/src/PgAsync/Command/Describe.php @@ -16,7 +16,6 @@ public function __construct(string $name = "") { $this->name = $name; $this->portalOrStatement = 'P'; - $this->subject = new Subject(); } public function encodedMessage(): string diff --git a/src/PgAsync/Command/Parse.php b/src/PgAsync/Command/Parse.php index 5d7de76..a1b2934 100644 --- a/src/PgAsync/Command/Parse.php +++ b/src/PgAsync/Command/Parse.php @@ -19,7 +19,6 @@ public function __construct(string $name, string $queryString) { $this->name = $name; $this->queryString = $queryString; - $this->subject = new Subject(); } // there is mechanisms to pre-describe types - we aren't getting into that diff --git a/src/PgAsync/Command/SaslInitialResponse.php b/src/PgAsync/Command/SaslInitialResponse.php new file mode 100644 index 0000000..83f538a --- /dev/null +++ b/src/PgAsync/Command/SaslInitialResponse.php @@ -0,0 +1,41 @@ +scramSha265 = $scramSha265; + } + + public function encodedMessage(): string + { + $mechanism = self::SCRAM_SHA_256 . "\0"; + $clientFirstMessage = $this->scramSha265->getClientFirstMessage(); + + $message = "p"; + $messageLength = strlen($mechanism) + strlen($clientFirstMessage) + 8; + $message .= pack("N", $messageLength) . $mechanism; + $message .= pack("N", strlen($clientFirstMessage)) . $clientFirstMessage; + + return $message; + } + + public function shouldWaitForComplete(): bool + { + return false; + } +} \ No newline at end of file diff --git a/src/PgAsync/Command/SaslResponse.php b/src/PgAsync/Command/SaslResponse.php new file mode 100644 index 0000000..b7532e6 --- /dev/null +++ b/src/PgAsync/Command/SaslResponse.php @@ -0,0 +1,39 @@ +scramSha265 = $scramSha265; + } + + public function encodedMessage(): string + { + $clientFinalMessage = $this->createClientFinalMessage(); + $messageLength = strlen($clientFinalMessage) + 4; + + return 'p' . pack('N', $messageLength) . $clientFinalMessage; + } + + public function shouldWaitForComplete(): bool + { + return false; + } + + private function createClientFinalMessage(): string + { + return $this->scramSha265->getClientFirstMessageWithoutProof() . ',p=' . base64_encode($this->scramSha265->getClientProof()); + } +} \ No newline at end of file diff --git a/src/PgAsync/Command/Sync.php b/src/PgAsync/Command/Sync.php index 8b596b8..ae2da4f 100644 --- a/src/PgAsync/Command/Sync.php +++ b/src/PgAsync/Command/Sync.php @@ -10,7 +10,7 @@ class Sync implements CommandInterface private $description; - public function __construct(string $description = "", ObserverInterface $observer) + public function __construct(string $description, ObserverInterface $observer) { $this->description = $description; $this->observer = $observer; diff --git a/src/PgAsync/Connection.php b/src/PgAsync/Connection.php index 5a25f3a..2a06107 100644 --- a/src/PgAsync/Connection.php +++ b/src/PgAsync/Connection.php @@ -10,6 +10,8 @@ use PgAsync\Command\Execute; use PgAsync\Command\Parse; use PgAsync\Command\PasswordMessage; +use PgAsync\Command\SaslInitialResponse; +use PgAsync\Command\SaslResponse; use PgAsync\Command\Sync; use PgAsync\Command\Terminate; use PgAsync\Message\Authentication; @@ -23,6 +25,7 @@ use PgAsync\Message\ErrorResponse; use PgAsync\Message\Message; use PgAsync\Message\NoticeResponse; +use PgAsync\Message\NotificationResponse; use PgAsync\Message\ParameterStatus; use PgAsync\Message\ParseComplete; use PgAsync\Command\Query; @@ -39,6 +42,7 @@ use Rx\Observable\AnonymousObservable; use Rx\ObserverInterface; use Rx\SchedulerInterface; +use Rx\Subject\Subject; class Connection extends EventEmitter { @@ -108,6 +112,18 @@ class Connection extends EventEmitter /** @var string */ private $uri; + /** @var Subject */ + private $notificationSubject; + + /** @var bool */ + private $cancelPending; + + /** @var bool */ + private $cancelRequested; + + /** @var ScramSha256 */ + private $scramSha256; + /** * Can be 'I' for Idle, 'T' if in transactions block * or 'E' if in failed transaction block (queries will fail until end of trans) @@ -147,14 +163,23 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt unset($parameters['auto_disconnect']); } - $this->parameters = $parameters; - $this->loop = $loop; - $this->commandQueue = []; - $this->queryState = static::STATE_BUSY; - $this->queryType = static::QUERY_SIMPLE; - $this->connStatus = static::CONNECTION_NEEDED; - $this->socket = $connector ?: new Connector($loop); - $this->uri = 'tcp://' . $this->parameters['host'] . ':' . $this->parameters['port']; + if (!isset($parameters['application_name'])) { + $parameters['application_name'] = 'pgasync'; + } + + $this->loop = $loop; + $this->commandQueue = []; + $this->queryState = static::STATE_BUSY; + $this->queryType = static::QUERY_SIMPLE; + $this->connStatus = static::CONNECTION_NEEDED; + $this->socket = $connector ?: new Connector($loop); + $this->uri = 'tcp://' . $parameters['host'] . ':' . $parameters['port']; + $this->notificationSubject = new Subject(); + $this->cancelPending = false; + $this->cancelRequested = false; + + $this->parameters = $parameters; + $this->scramSha256 = new ScramSha256($parameters['user'], $this->password ?: ''); } private function start() @@ -216,6 +241,12 @@ public function onData($data) while (strlen($data) > 0) { $data = $this->processData($data); } + + // We should only cancel if we have drained the input buffer (as much as we can see) + // and there is still a pending query that needs to be canceled + if ($this->cancelRequested) { + $this->cancelRequest(); + } } private function processData($data) @@ -243,7 +274,9 @@ private function processData($data) $type = $data[0]; - $message = Message::createMessageFromIdentifier($type); + $message = Message::createMessageFromIdentifier($type, [ + 'SCRAM_SHA_256' => $this->scramSha256 + ]); if ($message !== false) { $this->currentMessage = $message; return $data; @@ -297,9 +330,16 @@ public function handleMessage($message) $this->handleReadyForQuery($message); } elseif ($message instanceof RowDescription) { $this->handleRowDescription($message); + } elseif ($message instanceof NotificationResponse) { + $this->handleNotificationResponse($message); } } + private function handleNotificationResponse(NotificationResponse $message) + { + $this->notificationSubject->onNext($message); + } + private function handleDataRow(DataRow $dataRow) { if ($this->queryState === $this::STATE_BUSY && $this->currentCommand instanceof CommandInterface) { @@ -356,6 +396,28 @@ private function handleAuthentication(Authentication $message) return; } + if ($message->getAuthCode() === $message::AUTH_SCRAM) { + $saslInitialResponse = new SaslInitialResponse($this->scramSha256); + $this->stream->write($saslInitialResponse->encodedMessage()); + + return; + } + + if ($message->getAuthCode() === $message::AUTH_SCRAM_CONTINUE) { + $saslResponse = new SaslResponse($this->scramSha256); + $this->stream->write($saslResponse->encodedMessage()); + + return; + } + + if ($message->getAuthCode() === $message::AUTH_SCRAM_FIN) { + if ($this->scramSha256->verify()) { + return; + } + + $this->lastError = 'Invalid server signature sent by server on SCRAM FIN stage'; + } + $this->connStatus = $this::CONNECTION_BAD; $this->failAllCommandsWith(new \Exception($this->lastError)); $this->emit('error', [new \Exception($this->lastError)]); @@ -373,6 +435,11 @@ private function handleCommandComplete(CommandComplete $message) $command = $this->currentCommand; $this->currentCommand = null; $command->complete(); + + // if we have requested a cancel for this query + // but we have received the command complete before we + // had a chance to start canceling - then never mind + $this->cancelRequested = false; } $this->debug('Command complete.'); } @@ -448,6 +515,8 @@ private function failAllCommandsWith(\Throwable $e = null) { $e = $e ?: new \Exception('unknown error'); + $this->notificationSubject->onError($e); + while (count($this->commandQueue) > 0) { $c = array_shift($this->commandQueue); if ($c instanceof CommandInterface) { @@ -458,6 +527,11 @@ private function failAllCommandsWith(\Throwable $e = null) public function processQueue() { + if ($this->cancelPending) { + $this->debug("Not processing queue because there is a cancellation pending."); + return; + } + if (count($this->commandQueue) === 0 && $this->queryState === static::STATE_READY && $this->auto_disconnect) { $this->commandQueue[] = new Terminate(); } @@ -523,7 +597,7 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use return new CallbackDisposable(function () use ($q) { if ($this->currentCommand === $q && $q->isActive()) { - $this->cancelRequest(); + $this->cancelRequested = true; } $q->cancel(); }); @@ -568,31 +642,41 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use $name = 'somestatement'; + /** @var CommandInterface[] $commandGroup */ + $commandGroup = []; $close = new Close($name); - $this->commandQueue[] = $close; + $commandGroup[] = $close; $prepare = new Parse($name, $queryString); - $this->commandQueue[] = $prepare; + $commandGroup[] = $prepare; $bind = new Bind($parameters, $name); - $this->commandQueue[] = $bind; + $commandGroup[] = $bind; $describe = new Describe(); - $this->commandQueue[] = $describe; + $commandGroup[] = $describe; $execute = new Execute(); - $this->commandQueue[] = $execute; + $commandGroup[] = $execute; $sync = new Sync($queryString, $observer); - $this->commandQueue[] = $sync; + $commandGroup[] = $sync; + + $this->commandQueue = array_merge($this->commandQueue, $commandGroup); $this->processQueue(); - return new CallbackDisposable(function () use ($sync) { + return new CallbackDisposable(function () use ($sync, $commandGroup) { if ($this->currentCommand === $sync && $sync->isActive()) { - $this->cancelRequest(); + $this->cancelRequested = true; + $sync->cancel(); + + // no point in canceling the other commands because they are out the door + return; + } + foreach ($commandGroup as $command) { + $command->cancel(); } - $sync->cancel(); }); } ); @@ -627,13 +711,29 @@ public function disconnect() private function cancelRequest() { + $this->cancelRequested = false; + if ($this->queryState !== self::STATE_BUSY) { + $this->debug("Not canceling because there is nothing to cancel."); + return; + } if ($this->currentCommand !== null) { + $this->cancelPending = true; $this->socket->connect($this->uri)->then(function (DuplexStreamInterface $conn) { $cancelRequest = new CancelRequest($this->backendKeyData->getPid(), $this->backendKeyData->getKey()); + $conn->on('close', function () { + $this->cancelPending = false; + $this->processQueue(); + }); $conn->end($cancelRequest->encodedMessage()); }, function (\Throwable $e) { + $this->cancelPending = false; + $this->processQueue(); $this->debug("Error connecting for cancellation... " . $e->getMessage() . "\n"); }); } } + + public function notifications() { + return $this->notificationSubject->asObservable(); + } } diff --git a/src/PgAsync/Message/Authentication.php b/src/PgAsync/Message/Authentication.php index f3f06aa..9dbb932 100644 --- a/src/PgAsync/Message/Authentication.php +++ b/src/PgAsync/Message/Authentication.php @@ -2,6 +2,8 @@ namespace PgAsync\Message; +use PgAsync\ScramSha256; + class Authentication extends Message { const AUTH_OK = 0; // AuthenticationOk @@ -12,11 +14,26 @@ class Authentication extends Message const AUTH_GSS = 7; // AuthenticationGSS const AUTH_GSS_CONTINUE = 8; // AuthenticationGSSContinue const AUTH_SSPI = 9; // AuthenticationSSPI + const AUTH_SCRAM = 10; // AuthenticationSASL + const AUTH_SCRAM_CONTINUE = 11; // AuthenticationSASLContinue + const AUTH_SCRAM_FIN = 12; // AuthenticationSASLFinal private $authCode; private $salt; + /** @var ScramSha256 */ + private $scramSha256; + + private $iteration; + + private $nonce; + + public function __construct(ScramSha256 $scramSha265) + { + $this->scramSha256 = $scramSha265; + } + /** * @inheritDoc * @throws \InvalidArgumentException @@ -47,6 +64,23 @@ public function parseMessage(string $rawMessage) break; // AuthenticationGSSContinue case $this::AUTH_SSPI: break; // AuthenticationSSPI + case $this::AUTH_SCRAM: + $this->scramSha256->beginFirstClientMessageStage(); + break; + case $this::AUTH_SCRAM_CONTINUE: + $content = $this->getContent($rawMessage); + $parts = explode(',', $content); + $this->scramSha256->beginFinalClientMessageStage( + substr($parts[0], 2), + substr($parts[1], 2), + (int) substr($parts[2], 2) + ); + + break; + case $this::AUTH_SCRAM_FIN: + $content = $this->getContent($rawMessage); + $this->scramSha256->beginVerificationStage(substr($content, 2)); + break; } $this->authCode = $authCode; @@ -70,4 +104,10 @@ public function getSalt(): string return $this->salt; } + + private function getContent(string $rawMessage): string + { + $messageLength = unpack('N', substr($rawMessage, 1, 4))[1]; + return substr($rawMessage, 9, $messageLength - 8); + } } diff --git a/src/PgAsync/Message/Message.php b/src/PgAsync/Message/Message.php index 5550da8..e5eaf5b 100644 --- a/src/PgAsync/Message/Message.php +++ b/src/PgAsync/Message/Message.php @@ -23,11 +23,11 @@ public static function prependLengthInt32(string $s): string return Message::int32($len + 4) . $s; } - public static function createMessageFromIdentifier(string $identifier): ParserInterface + public static function createMessageFromIdentifier(string $identifier, array $dependencies = []): ParserInterface { switch ($identifier) { case 'R': - return new Authentication(); + return new Authentication($dependencies['SCRAM_SHA_256']); case 'K': return new BackendKeyData(); case 'C': @@ -52,6 +52,8 @@ public static function createMessageFromIdentifier(string $identifier): ParserIn return new ReadyForQuery(); case 'T': return new RowDescription(); + case NotificationResponse::getMessageIdentifier(): + return new NotificationResponse(); } return new Discard(); diff --git a/src/PgAsync/Message/NotificationResponse.php b/src/PgAsync/Message/NotificationResponse.php new file mode 100644 index 0000000..4a27e0a --- /dev/null +++ b/src/PgAsync/Message/NotificationResponse.php @@ -0,0 +1,81 @@ +notifyingProcessId = unpack('N', substr($rawMessage, $currentPos, 4))[1]; + $currentPos += 4; + + $rawPayload = substr($rawMessage, $currentPos); + $parts = explode("\0", $rawPayload); + + if (count($parts) !== 3) { + throw new \UnderflowException('Wrong number of notification parts in payload'); + } + + $this->channelName = $parts[0]; + $this->payload = $parts[1]; + } + + /** + * @return string + */ + public function getPayload(): string + { + return $this->payload; + } + + /** + * @return int + */ + public function getNotifyingProcessId(): int + { + return $this->notifyingProcessId; + } + + /** + * @return string + */ + public function getChannelName(): string + { + return $this->channelName; + } + + /** + * @inheritDoc + */ + public static function getMessageIdentifier(): string + { + return 'A'; + } + + public function getNoticeMessages(): array + { + return $this->noticeMessages; + } +} diff --git a/src/PgAsync/ScramSha256.php b/src/PgAsync/ScramSha256.php new file mode 100644 index 0000000..1895dfb --- /dev/null +++ b/src/PgAsync/ScramSha256.php @@ -0,0 +1,208 @@ +user = $user; + $this->password = $password; + } + + public function beginFirstClientMessageStage() + { + $length = strlen(self::CHARACTERS); + + for ($i = 0; $i < self::CLIENT_NONCE_LENGTH; $i++) { + $this->clientNonce .= substr(self::CHARACTERS, random_int(0, $length), 1); + } + + $this->currentStage = self::STAGE_FIRST_MESSAGE; + } + + public function beginFinalClientMessageStage(string $nonce, string $salt, int $iteration) + { + $this->nonce = $nonce; + $this->salt = $salt; + $this->iteration = $iteration; + + $this->currentStage = self::STAGE_FINAL_MESSAGE; + } + + public function beginVerificationStage(string $verification) + { + $this->verification = $verification; + + $this->currentStage = self::STAGE_VERIFICATION; + } + + public function verify(): bool + { + $this->checkStage(self::STAGE_VERIFICATION); + + $serverKey = hash_hmac("sha256", "Server Key", $this->getSaltedPassword(), true); + $serverSignature = hash_hmac('sha256', $this->getAuthMessage(), $serverKey, true); + + return $serverSignature === base64_decode($this->verification); + } + + public function getClientFirstMessageWithoutProof(): string + { + if (null === $this->clientFirstMessageWithoutProof) { + $this->clientFirstMessageWithoutProof = sprintf( + 'c=%s,r=%s', + base64_encode('n,,'), + $this->nonce + ); + } + + return $this->clientFirstMessageWithoutProof; + } + + public function getSaltedPassword(): string + { + $this->checkStage(self::STAGE_FINAL_MESSAGE); + + if (null === $this->saltedPassword) { + $this->saltedPassword = hash_pbkdf2( + "sha256", + $this->password, + base64_decode($this->salt), + $this->iteration, + 32, + true + ); + } + + return $this->saltedPassword; + } + + public function getClientKey(): string + { + $this->checkStage(self::STAGE_FINAL_MESSAGE); + + if (null === $this->clientKey) { + $this->clientKey = hash_hmac("sha256", "Client Key", $this->getSaltedPassword(), true); + } + + return $this->clientKey; + } + + public function getStoredKey(): string + { + $this->checkStage(self::STAGE_FINAL_MESSAGE); + + if (null === $this->storedKey) { + $this->storedKey = hash("sha256", $this->getClientKey(), true); + } + + return $this->storedKey; + } + + public function getClientFirstMessageBare(): string + { + $this->checkStage(self::STAGE_FIRST_MESSAGE); + + return sprintf( + 'n=%s,r=%s', + $this->user, + $this->clientNonce + ); + } + + public function getClientFirstMessage(): string + { + $this->checkStage(self::STAGE_FIRST_MESSAGE); + + return sprintf('n,,%s', $this->getClientFirstMessageBare()); + } + + public function getAuthMessage(): string + { + $this->checkStage(self::STAGE_FINAL_MESSAGE); + + if (null === $this->authMessage) { + $clientFirstMessageBare = $this->getClientFirstMessageBare(); + $serverFirstMessage = sprintf( + 'r=%s,s=%s,i=%s', + $this->nonce, + $this->salt, + $this->iteration + ); + + $this->authMessage = implode(',', [ + $clientFirstMessageBare, + $serverFirstMessage, + $this->getClientFirstMessageWithoutProof() + ]); + } + + return $this->authMessage; + } + + public function getClientProof(): string + { + $this->checkStage(self::STAGE_FINAL_MESSAGE); + + $clientKey = $this->getClientKey(); + $storedKey = $this->getStoredKey(); + $authMessage = $this->getAuthMessage(); + $clientSignature = hash_hmac("sha256", $authMessage, $storedKey, true); + + return $clientKey ^ $clientSignature; + } + + private function checkStage(int $stage) + { + if ($this->currentStage < $stage) { + throw new \LogicException('Invalid Stage of SCRAM authorization'); + } + } +} \ No newline at end of file diff --git a/tests/Integration/BoolTest.php b/tests/Integration/BoolTest.php index a855012..c04302f 100644 --- a/tests/Integration/BoolTest.php +++ b/tests/Integration/BoolTest.php @@ -10,7 +10,7 @@ class BoolTest extends TestCase public function testBools() { - $client = new Client(["user" => $this::getDbUser(), "database" => $this::getDbName()]); + $client = new Client(["user" => $this::getDbUser(), "password" => $this::getDbUser(), "database" => $this::getDbName()]); $count = $client->query("SELECT * FROM thing"); @@ -57,7 +57,7 @@ function () use (&$completes, $client) { */ public function testBoolParam() { - $client = new Client(["user" => $this::getDbUser(), "database" => $this::getDbName()]); + $client = new Client(["user" => $this::getDbUser(), "password" => $this::getDbUser(), "database" => $this::getDbName()]); $args = [false, 1]; diff --git a/tests/Integration/ClientTest.php b/tests/Integration/ClientTest.php index 4f99b17..5a192c4 100644 --- a/tests/Integration/ClientTest.php +++ b/tests/Integration/ClientTest.php @@ -3,7 +3,7 @@ namespace PgAsync\Tests\Integration; use PgAsync\Client; -use React\EventLoop\Timer\Timer; +use PgAsync\Message\NotificationResponse; use Rx\Observable; use Rx\Observer\CallbackObserver; @@ -11,7 +11,7 @@ class ClientTest extends TestCase { public function testClientReusesIdleConnection() { - $client = new Client(["user" => $this->getDbUser(), "database" => $this::getDbName()], $this->getLoop()); + $client = new Client(["user" => $this->getDbUser(), "password" => $this::getDbUser(), "database" => $this::getDbName()], $this->getLoop()); $hello = null; @@ -81,6 +81,7 @@ public function testAutoDisconnect() { $client = new Client([ "user" => $this->getDbUser(), + "password" => $this::getDbUser(), "database" => $this::getDbName(), "auto_disconnect" => true ], $this->getLoop()); @@ -115,6 +116,7 @@ public function testSendingTwoQueriesRepeatedlyOnlyCreatesTwoConnections() { $client = new Client([ "user" => $this->getDbUser(), + "password" => $this::getDbUser(), "database" => $this::getDbName(), ], $this->getLoop()); @@ -156,6 +158,7 @@ public function testMaxConnections() { $client = new Client([ "user" => $this->getDbUser(), + "password" => $this::getDbUser(), "database" => $this::getDbName(), "max_connections" => 3 ], $this->getLoop()); @@ -191,4 +194,47 @@ function () { $client->closeNow(); $this->getLoop()->run(); } + + public function testListen() + { + $client = new Client([ + "user" => $this->getDbUser(), + "password" => $this::getDbUser(), + "database" => $this::getDbName(), + ], $this->getLoop()); + + $testQuery = $client->listen('some_channel') + ->merge($client->listen('some_channel')->take(1)) + ->take(3) + ->concat($client->listen('some_channel')->take(1)); + + $values = []; + + $testQuery->subscribe( + function (NotificationResponse $results) use (&$values) { + $values[] = $results->getPayload(); + }, + function (\Throwable $e) use (&$error) { + $this->fail('Error while testing: ' . $e->getMessage()); + $this->stopLoop(); + }, + function () { + $this->stopLoop(); + } + ); + + Observable::interval(300) + ->take(3) + ->flatMap(function ($x) use ($client) { + return $client->executeStatement("NOTIFY some_channel, 'Hello" . $x . "'"); + }) + ->subscribe(); + + $this->runLoopWithTimeout(4); + + $this->assertEquals(['Hello0', 'Hello0', 'Hello1', 'Hello2'], $values); + + $client->closeNow(); + $this->getLoop()->run(); + } } \ No newline at end of file diff --git a/tests/Integration/ConnectionTest.php b/tests/Integration/ConnectionTest.php index 27a827d..20ebf51 100644 --- a/tests/Integration/ConnectionTest.php +++ b/tests/Integration/ConnectionTest.php @@ -14,6 +14,7 @@ public function testConnectionDisconnectAfterSuccessfulQuery() { $conn = new Connection([ "user" => $this->getDbUser(), + "password" => $this::getDbUser(), "database" => $this::getDbName(), "auto_disconnect" => true ], $this->getLoop()); @@ -44,6 +45,7 @@ public function testConnectionDisconnectAfterSuccessfulStatement() { $conn = new Connection([ "user" => $this->getDbUser(), + "password" => $this::getDbUser(), "database" => $this::getDbName(), "auto_disconnect" => true ], $this->getLoop()); @@ -74,6 +76,7 @@ public function testConnectionDisconnectAfterFailedQuery() { $conn = new Connection([ "user" => $this->getDbUser(), + "password" => $this::getDbUser(), "database" => $this::getDbName(), "auto_disconnect" => true ], $this->getLoop()); @@ -106,6 +109,7 @@ public function testInvalidHostName() $conn = new Connection([ "host" => 'host.invalid', "user" => $this->getDbUser(), + "password" => $this::getDbUser(), "database" => $this::getDbName(), "auto_disconnect" => true ], $this->getLoop()); @@ -129,13 +133,19 @@ function () { $this->runLoopWithTimeout(2); - $this->assertInstanceOf(RecordNotFoundException::class, $error); + // At some point, DNS was returning RecordNotFoundException + // as long as we are getting an Exception here, we should be good + $this->assertInstanceOf(\Exception::class, $error); + + // looks like this behavior changed with newer versions of react libs + // $this->assertInstanceOf(RecordNotFoundException::class, $error->getPrevious()); } public function testSendingTwoQueriesWithoutWaitingNoAutoDisconnect() { $conn = new Connection([ "user" => $this->getDbUser(), + "password" => $this::getDbUser(), "database" => $this::getDbName() ], $this->getLoop()); @@ -170,6 +180,7 @@ public function testSendingTwoQueriesWithoutWaitingAutoDisconnect() { $conn = new Connection([ "user" => $this->getDbUser(), + "password" => $this::getDbUser(), "database" => $this::getDbName(), "auto_disconnect" => true ], $this->getLoop()); @@ -202,37 +213,52 @@ function () { public function testCancellationUsingDispose() { + $this->markTestSkipped('We have disabled cancellation for the time being.'); $conn = new Connection([ "user" => $this->getDbUser(), - "database" => $this::getDbName() + "password" => $this::getDbUser(), + "database" => $this::getDbName(), + "auto_disconnect" => true ], $this->getLoop()); - $testQuery = $conn->query("SELECT pg_sleep(10)")->mapTo(1); + $disposed = false; - $testQuery->takeUntil(Observable::timer(500))->subscribe( + $testQuery = $conn->query("SELECT pg_sleep(10)") + ->mapTo(1) + ->finally(function () use (&$disposed) { + $disposed = true; + }); + + //$disposable = $testQuery->takeUntil(Observable::timer(500))->subscribe( + $disposable = $testQuery->subscribe( function ($results) { - $this->fail('Expected no value'); $this->stopLoop(); + $this->fail('Expected no value'); }, function (\Throwable $e) { - $this->fail('Expected no error'); $this->stopLoop(); + $this->fail('Expected no error'); }, function () { $this->stopLoop(); + $this->fail('Expected no completion'); } ); + $this->getLoop()->addTimer(500, function () use ($disposable) { + $disposable->dispose(); + }); + $this->runLoopWithTimeout(2); - $conn->disconnect(); - $this->getLoop()->run(); + $this->assertTrue($disposed); } public function testCancellationUsingInternalFunctions() { $conn = new Connection([ "user" => $this->getDbUser(), + "password" => $this::getDbUser(), "database" => $this::getDbName() ], $this->getLoop()); @@ -276,6 +302,7 @@ public function testCancellationOfNonActiveQuery() { $conn = new Connection([ "user" => $this->getDbUser(), + "password" => $this::getDbUser(), "database" => $this::getDbName() ], $this->getLoop()); @@ -310,4 +337,73 @@ function () { $conn->disconnect(); $this->getLoop()->run(); } + + public function testCancellationWithImmediateQueryQueuedUp() { + $conn = new Connection([ + "user" => $this->getDbUser(), + "password" => $this::getDbUser(), + "database" => $this::getDbName() + ], $this->getLoop()); + + $q1 = $conn->query("SELECT * FROM generate_series(1,4)"); + $q2 = $conn->query("SELECT pg_sleep(10)"); + + $testQuery = $q1->merge($q2)->take(1); + + $value = null; + + $testQuery->subscribe( + function ($results) use (&$value) { + $value = $results; + $this->stopLoop(); + }, + function (\Throwable $e) { + $this->fail('Expected no error' . $e->getMessage()); + $this->stopLoop(); + }, + function () { + $this->stopLoop(); + } + ); + + $this->runLoopWithTimeout(15); + + $this->assertEquals(['generate_series' => '1'], $value); + + $conn->disconnect(); + $this->getLoop()->run(); + } + + public function testArrayInParameters() { + $conn = new Connection([ + "user" => $this->getDbUser(), + "password" => $this::getDbUser(), + "database" => $this::getDbName() + ], $this->getLoop()); + + $testQuery = $conn->executeStatement("SELECT * FROM generate_series(1,4) WHERE generate_series = ANY($1)", ['{2, 3}']); + + $value = []; + + $testQuery->subscribe( + function ($results) use (&$value) { + $value[] = $results; + $this->stopLoop(); + }, + function (\Throwable $e) { + $this->fail('Expected no error' . $e->getMessage()); + $this->stopLoop(); + }, + function () { + $this->stopLoop(); + } + ); + + $this->runLoopWithTimeout(15); + + $this->assertEquals([['generate_series' => 2], ['generate_series' => 3]], $value); + + $conn->disconnect(); + $this->getLoop()->run(); + } } \ No newline at end of file diff --git a/tests/Integration/Md5PasswordTest.php b/tests/Integration/Md5PasswordTest.php new file mode 100644 index 0000000..5ca908f --- /dev/null +++ b/tests/Integration/Md5PasswordTest.php @@ -0,0 +1,41 @@ + "pgasyncpw", + "database" => $this->getDbName(), + "auto_disconnect" => true, + "password" => "example_password" + ], $this->getLoop()); + + $hello = null; + + $client->query("SELECT 'Hello' AS hello") + ->subscribe(new CallbackObserver( + function ($x) use (&$hello) { + $this->assertNull($hello); + $hello = $x['hello']; + }, + function ($e) { + $this->fail('Unexpected error'); + }, + function () { + $this->getLoop()->addTimer(0.1, function () { + $this->stopLoop(); + }); + } + )); + + $this->runLoopWithTimeout(2); + + $this->assertEquals('Hello', $hello); + } +} \ No newline at end of file diff --git a/tests/Integration/NullPasswordTest.php b/tests/Integration/NullPasswordTest.php index 0a19b50..7c4af1e 100644 --- a/tests/Integration/NullPasswordTest.php +++ b/tests/Integration/NullPasswordTest.php @@ -9,6 +9,8 @@ class NullPasswordTest extends TestCase { public function testNullPassword() { + $this->markTestSkipped('Not using null password anymore. Maybe should setup tests to twst this again.'); + $client = new Client([ "user" => $this::getDbUser(), "database" => $this::getDbName(), diff --git a/tests/Integration/ScramSha256PasswordTest.php b/tests/Integration/ScramSha256PasswordTest.php new file mode 100644 index 0000000..14f11f1 --- /dev/null +++ b/tests/Integration/ScramSha256PasswordTest.php @@ -0,0 +1,42 @@ + 'scram_user', + "database" => $this->getDbName(), + "auto_disconnect" => true, + "password" => "scram_password" + ], $this->getLoop()); + + $hello = null; + + $client->query("SELECT 'Hello' AS hello") + ->subscribe(new CallbackObserver( + function ($x) use (&$hello) { + $this->assertNull($hello); + $hello = $x['hello']; + }, + function ($e) { + $this->fail('Unexpected error ' . $e); + }, + function () { + $this->getLoop()->addTimer(0.1, function () { + $this->stopLoop(); + }); + } + )); + + $this->runLoopWithTimeout(2); + + $this->assertEquals('Hello', $hello); + } +} \ No newline at end of file diff --git a/tests/Integration/SimpleQueryTest.php b/tests/Integration/SimpleQueryTest.php index ae788b8..3d54c90 100644 --- a/tests/Integration/SimpleQueryTest.php +++ b/tests/Integration/SimpleQueryTest.php @@ -9,7 +9,7 @@ class SimpleQueryTest extends TestCase { public function testSimpleQuery() { - $client = new Client(["user" => $this::getDbUser(), "database" => $this::getDbName()]); + $client = new Client(["user" => $this::getDbUser(), "password" => $this::getDbUser(), "database" => $this::getDbName()]); $count = $client->query("SELECT count(*) AS the_count FROM thing"); @@ -38,7 +38,7 @@ function () use ($client) { public function testSimpleQueryNoResult() { - $client = new Client(["user" => $this->getDbUser(), "database" => $this->getDbName()], $this->getLoop()); + $client = new Client(["user" => $this->getDbUser(), "password" => $this::getDbUser(), "database" => $this->getDbName()], $this->getLoop()); $count = $client->query("SELECT count(*) AS the_count FROM thing WHERE thing_type = 'non-thing'"); @@ -67,7 +67,7 @@ function () use ($client) { public function testSimpleQueryError() { - $client = new Client(["user" => $this->getDbUser(), "database" => $this::getDbName()], $this->getLoop()); + $client = new Client(["user" => $this->getDbUser(), "password" => $this::getDbUser(), "database" => $this::getDbName()], $this->getLoop()); $count = $client->query("SELECT count(*) abcdef AS the_count FROM thing WHERE thing_type = 'non-thing'"); diff --git a/tests/TestCase.php b/tests/TestCase.php index 7dc3f16..6ab29c0 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -3,11 +3,11 @@ namespace PgAsync\Tests; use EventLoop\EventLoop; -use React\EventLoop\Factory; use React\EventLoop\LoopInterface; use React\EventLoop\Timer\Timer; +use PHPUnit\Framework\TestCase as BaseTestCase; -class TestCase extends \PHPUnit_Framework_TestCase +class TestCase extends BaseTestCase { const DBNAME = 'pgasync_test'; @@ -17,7 +17,7 @@ class TestCase extends \PHPUnit_Framework_TestCase /** @var Timer */ public static $timeoutTimer; - public static $dbUser = ""; + public static $dbUser = 'pgasync'; public static function getLoop() { @@ -38,7 +38,7 @@ public static function stopLoop() public static function cancelCurrentTimeoutTimer() { if (static::$timeoutTimer !== null) { - static::$timeoutTimer->cancel(); + static::getLoop()->cancelTimer(static::$timeoutTimer); static::$timeoutTimer = null; } } diff --git a/tests/Unit/ClientTest.php b/tests/Unit/ClientTest.php index 1938d72..606b695 100644 --- a/tests/Unit/ClientTest.php +++ b/tests/Unit/ClientTest.php @@ -12,18 +12,15 @@ class ClientTest extends TestCase { public function testFailedDNSLookup() { - $executor = $this->getMockBuilder(ExecutorInterface::class) - ->setMethods(['query']) - ->getMock(); + $executor = $this->createMock(ExecutorInterface::class); $deferred = new Deferred(); $executor - ->expects($this->once()) ->method('query') ->willReturn($deferred->promise()); - $resolver = new Resolver('1.2.3.4', $executor); + $resolver = new Resolver($executor); $conn = new Client([ "database" => $this->getDbName(), @@ -53,16 +50,13 @@ function (Exception $e) use (&$exception) { public function testFailedDNSLookupEarlyRejection() { - $executor = $this->getMockBuilder(ExecutorInterface::class) - ->setMethods(['query']) - ->getMock(); + $executor = $this->createMock(ExecutorInterface::class); $executor - ->expects($this->once()) ->method('query') ->willReturn(new RejectedPromise(new React\Dns\RecordNotFoundException())); - $resolver = new Resolver('1.2.3.4', $executor); + $resolver = new Resolver($executor); $conn = new Client([ "database" => $this->getDbName(), diff --git a/tests/Unit/ConnectionTest.php b/tests/Unit/ConnectionTest.php index 5014129..2bbee8d 100644 --- a/tests/Unit/ConnectionTest.php +++ b/tests/Unit/ConnectionTest.php @@ -5,27 +5,21 @@ class ConnectionTest extends TestCase { - /** - * @expectedException \InvalidArgumentException - */ public function testInvalidParametersThrows() { + $this->expectException(\InvalidArgumentException::class); $conn = new Connection(['something' => ''], $this->getLoop()); } - /** - * @expectedException \InvalidArgumentException - */ public function testNoUserThrows() { + $this->expectException(\InvalidArgumentException::class); $conn = new Connection(["database" => "some_database"], $this->getLoop()); } - /** - * @expectedException \InvalidArgumentException - */ public function testNoDatabaseThrows() { + $this->expectException(\InvalidArgumentException::class); $conn = new Connection(["user" => "some_user"], $this->getLoop()); } } diff --git a/tests/Unit/Message/MessageTest.php b/tests/Unit/Message/MessageTest.php index b46d2ec..9eebf53 100644 --- a/tests/Unit/Message/MessageTest.php +++ b/tests/Unit/Message/MessageTest.php @@ -1,10 +1,28 @@ assertEquals("\x04\xd2\x16\x2f", \PgAsync\Message\Message::int32(80877103)); $this->assertEquals("\x00\x00\x00\x00", \PgAsync\Message\Message::int32(0)); } + + public function testNotificationResponse() + { + $rawNotificationMessage = hex2bin('41000000190000040c686572650048656c6c6f20746865726500'); + + + $notificationResponse = \PgAsync\Message\Message::createMessageFromIdentifier($rawNotificationMessage[0], []); + $this->assertInstanceOf(\PgAsync\Message\NotificationResponse::class, $notificationResponse); + /** @var \PgAsync\Message\NotificationResponse */ + $notificationResponse->parseData($rawNotificationMessage); + + $this->assertEquals('Hello there', $notificationResponse->getPayload()); + $this->assertEquals('here', $notificationResponse->getChannelName()); + $this->assertEquals(1036, $notificationResponse->getNotifyingProcessId()); + + } } diff --git a/tests/Unit/Message/ParseTest.php b/tests/Unit/Message/ParseTest.php index d2f3560..dae6a0c 100644 --- a/tests/Unit/Message/ParseTest.php +++ b/tests/Unit/Message/ParseTest.php @@ -1,6 +1,8 @@