diff --git a/Jenkinsfile b/Jenkinsfile index a80361f..a44f250 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,5 +1,31 @@ #!/usr/bin/env groovy + +/** + * Jenkinsfile for proxyhook server/client + * + * Environment variables: + * + * PROXYHOOK_DOCKER_REGISTRY - Docker registry host to use with OpenShift platform + * PROXYHOOK_OPENSHIFT - OpenShift platform URL + * PROXYHOOK_SERVER_PROJECT - OpenShift project name for DEV/QA/STAGE/PROD deployments + * + * For pull requests: + * - build and test + * - optional: wait for input + * - deploy to DEV + * - while (input = redeploy) deploy to DEV + * - wait for input + * - deploy to QA + * - while (input = redeploy) deploy to QA + * + * For master: + * - build and test + * - deploy to STAGE + * - wait for input + * - deploy to PROD + */ + @Field public static final String PROJ_BASE = 'github.com/zanata/proxyhook' @@ -96,31 +122,7 @@ timestamps { stage('Build') { notify.startBuilding() tag = makeTag() - - // TODO run detekt - sh """./gradlew clean build shadowJar jacocoTestReport - """ - - // archive build artifacts - archive "**/build/libs/*.jar" - - // gather surefire results; mark build as unstable in case of failures - junit(testResults: '**/build/test-results/**/*.xml') - notify.testResults("UNIT", currentBuild.result) - - if (isBuildResultSuccess()) { - // parse Jacoco test coverage - step([$class: 'JacocoPublisher']) - - if (env.BRANCH_NAME == 'master') { - step([$class: 'MasterCoverageAction']) - } else if (env.BRANCH_NAME.startsWith('PR-')) { - step([$class: 'CompareCoverageAction']) - } - - // send test coverage data to codecov.io - codecov(env, steps, mainScmGit) - } + buildAndTest() } stage('Deploy') { if (tag && isBuildResultSuccess()) { @@ -174,6 +176,33 @@ timestamps { } } +private void buildAndTest() { +// TODO run detekt + sh """./gradlew clean build shadowJar jacocoTestReport + """ + + // archive build artifacts + archive "**/build/libs/*.jar" + + // gather surefire results; mark build as unstable in case of failures + junit(testResults: '**/build/test-results/**/*.xml') + notify.testResults("UNIT", currentBuild.result) + + if (isBuildResultSuccess()) { + // parse Jacoco test coverage + step([$class: 'JacocoPublisher']) + + if (env.BRANCH_NAME == 'master') { + step([$class: 'MasterCoverageAction']) + } else if (env.BRANCH_NAME.startsWith('PR-')) { + step([$class: 'CompareCoverageAction']) + } + + // send test coverage data to codecov.io + codecov(env, steps, mainScmGit) + } +} + private boolean isBuildResultSuccess() { currentBuild.result in ['SUCCESS', null] } diff --git a/build.gradle b/build.gradle index 9c27b31..963942b 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,6 @@ buildscript { - ext.kotlin_version = '1.1.51' + ext.kotlin_version = '1.2.30' + ext.vertx_version = '3.5.1' repositories { mavenCentral() } @@ -22,12 +23,18 @@ subprojects { sourceCompatibility = '1.8' + kotlin { + experimental { + coroutines "enable" + } + } + dependencies { compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version" - compile 'io.vertx:vertx-core:3.5.0' - compile 'io.vertx:vertx-web:3.5.0' -// testCompile 'io.vertx:vertx-unit:3.5.0' - testCompile 'io.vertx:vertx-lang-kotlin-coroutines:3.5.0' + compile "io.vertx:vertx-core:$vertx_version" + compile "io.vertx:vertx-web:$vertx_version" +// testCompile "io.vertx:vertx-unit:$vertx_version" + compile "io.vertx:vertx-lang-kotlin-coroutines:$vertx_version" testCompile 'junit:junit:4.12' testCompile 'net.wuerl.kotlin:assertj-core-kotlin:0.2.1' } @@ -54,6 +61,11 @@ subprojects { } } + configurations { + all*.exclude group: 'xerces', module: 'xerces' + all*.exclude group: 'xerces', module: 'xercesImpl' + } + } task('setVersionFromBuild') << { diff --git a/client/Dockerfile b/client/Dockerfile index 775e958..544f8f0 100644 --- a/client/Dockerfile +++ b/client/Dockerfile @@ -1,13 +1,13 @@ -FROM openjdk:8-jre-alpine +LABEL maintainer="sflaniga@redhat.com" +# https://access.redhat.com/containers/?tab=overview&platform=docker#/registry.access.redhat.com/redhat-openjdk-18/openjdk18-openshift +FROM registry.access.redhat.com/redhat-openjdk-18/openjdk18-openshift:1.2-6 -ADD build/libs/client*-fat.jar /app/client.jar -RUN chgrp -R 0 /app/ && chmod -R 775 /app/ +COPY build/libs/client*-fat.jar /deployments/ -WORKDIR /app/ - -USER 64738 +# NB run-java.sh will scale heap size to 50% of container memory size # NB must use the exec form of ENTRYPOINT if you want to add arguments with CMD # https://docs.docker.com/engine/reference/builder/#exec-form-entrypoint-example -ENTRYPOINT ["java", "-Xmx32M", "-jar", "client.jar"] +# see https://github.com/fabric8io-images/run-java-sh +ENTRYPOINT ["/opt/run-java/run-java.sh"] diff --git a/client/build.gradle b/client/build.gradle index edf74e7..4e03710 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -5,43 +5,29 @@ plugins { dependencies { compile project(':common') + compile 'com.xenomachina:kotlin-argparser:2.0.4' + compile "io.vertx:vertx-shell:$vertx_version" testCompile project(':server') testCompile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.18' testCompile 'org.asynchttpclient:async-http-client:2.1.0-alpha20' testRuntime 'org.slf4j:slf4j-simple:1.7.25' testCompile 'org.mock-server:mockserver-netty:3.11' + +// compile "io.vertx:vertx-hazelcast:$vertx_version" +// compile "io.vertx:vertx-infinispan:$vertx_version" +// // http://vertx.io/docs/vertx-infinispan/java/#_configuring_for_kubernetes_or_openshift_3 +// compile 'org.infinispan:infinispan-cloud:9.1.2.Final' +// compile 'org.jgroups.kubernetes:jgroups-kubernetes:1.0.3.Final' } -mainClassName = 'io.vertx.core.Launcher' -def mainVerticleName = 'org.zanata.proxyhook.client.ProxyHookClient' +mainClassName = 'org.zanata.proxyhook.client.ProxyHookClient' // enable debugger on a random port applicationDefaultJvmArgs = ['-Xdebug', '-Xrunjdwp:transport=dt_socket,address=0,server=y,suspend=n', '-Dsun.net.inetaddr.ttl=0', '-Dsun.net.inetaddr.negative.ttl=0'] - -// Vert.x watches for file changes in all subdirectories -// of src/ but only for files with .kt extension -// NB this won't pick up changes in :common -def watchForChange = 'src/**/*.kt' - -// Vert.x will call this task on changes -def doOnChange = 'gradlew classes' - -//noinspection GroovyAssignabilityCheck -run { - def urls = System.getProperty('urls') ?: '' - args = ['run', mainVerticleName] + urls.tokenize() - // redeploy doesn't stop the old code, for some reason -// args = ['run', mainVerticleName, "--redeploy=$watchForChange", "--launcher-class=$mainClassName", "--on-redeploy=$doOnChange"] + urls.tokenize() -} - shadowJar { classifier = 'fat' - manifest { - attributes "Main-Verticle": mainVerticleName - } - mergeServiceFiles { include 'META-INF/services/io.vertx.core.spi.VerticleFactory' } diff --git a/client/src/main/java/org/zanata/proxyhook/client/ProxyHookClient.kt b/client/src/main/java/org/zanata/proxyhook/client/ProxyHookClient.kt index 0c0e46a..96919af 100644 --- a/client/src/main/java/org/zanata/proxyhook/client/ProxyHookClient.kt +++ b/client/src/main/java/org/zanata/proxyhook/client/ProxyHookClient.kt @@ -20,8 +20,12 @@ */ package org.zanata.proxyhook.client +import com.xenomachina.argparser.ArgParser +import com.xenomachina.argparser.DefaultHelpFormatter +import com.xenomachina.argparser.mainBody import io.netty.buffer.Unpooled import io.vertx.core.AbstractVerticle +import io.vertx.core.CompositeFuture import io.vertx.core.Future import io.vertx.core.MultiMap import io.vertx.core.Vertx @@ -32,12 +36,12 @@ import io.vertx.core.http.HttpClientResponse import io.vertx.core.http.WebSocket import io.vertx.core.http.impl.FrameType import io.vertx.core.http.impl.ws.WebSocketFrameImpl +import io.vertx.core.json.DecodeException import io.vertx.core.json.JsonObject import io.vertx.core.logging.LoggerFactory import io.vertx.core.net.ProxyOptions import org.zanata.proxyhook.common.Constants.EVENT_ID_HEADERS import org.zanata.proxyhook.common.Constants.MAX_FRAME_SIZE -import org.zanata.proxyhook.common.Constants.PATH_WEBSOCKET import org.zanata.proxyhook.common.Constants.PROXYHOOK_PASSWORD import org.zanata.proxyhook.common.Keys.BUFFER import org.zanata.proxyhook.common.Keys.BUFFER_TEXT @@ -65,11 +69,22 @@ import java.net.UnknownHostException * @param ready optional Future which will complete when deployment is complete. * @author Sean Flanigan [sflaniga@redhat.com](mailto:sflaniga@redhat.com) */ -class ProxyHookClient(var ready: Future? = null, var args: List? = null, val internalHttpProxy: Int? = null) : AbstractVerticle() { - constructor(ready: Future?, vararg args: String, internalHttpProxy: Int? = null) : this(ready, args.asList(), internalHttpProxy) +// TODO add http proxy +class ProxyHookClient( + val ready: Future? = null, + val webSocketUrls: List, + val webhookUrls: List, + val internalHttpProxyHost: String? = null, + val internalHttpProxyPort: Int? = null) : AbstractVerticle() { + + init { + if (webSocketUrls.isEmpty() || webhookUrls.isEmpty()) { + throw StartupException("Must provide at least one websocket and at least one webhook") + } + } companion object { - private val APP_NAME = ProxyHookClient::class.java.name + private val APP_NAME = ProxyHookClient::class.java.simpleName private val log = LoggerFactory.getLogger(ProxyHookClient::class.java) private val sslInsecureServer: Boolean by lazy { @@ -131,9 +146,27 @@ class ProxyHookClient(var ready: Future? = null, var args: List? = // deliberately not included: Connection, Host, Origin, If-*, Cache-Control, Proxy-Authorization, Range, Upgrade .map { it.toLowerCase() } - @JvmStatic fun main(args: Array) { - // TODO add http proxy - Vertx.vertx().deployVerticle(ProxyHookClient(ready = null, args = args.toList()), { result -> + /** + * Main method, used to launch proxyhook client with CLI arguments + */ + @JvmStatic fun main(args: Array) = mainBody(APP_NAME) { + class MyArgs (parser: ArgParser) { + val webSocketUrls: List by parser.adding("-s", "--websocket", help = "connect to websocket (proxyhook server), eg wss://proxyhook.example.com/"); + val webhookUrls: List by parser.adding("-k", "--webhook", help = "deliver webhooks to web server, eg http://target1.example.com/webhook") + } + + val helpFormatter = DefaultHelpFormatter( + prologue = "ProxyHookClient connects to a ProxyHook server, receives proxied webhooks over a websocket, then forwards them to a specified web server", + epilogue = "Note that at least one WEBSOCKET and at least one WEBHOOK must be provided.") + val argParser = ArgParser( + args = if (args.isEmpty()) arrayOf("--help") else args, + helpFormatter = helpFormatter) + val opts = argParser.parseInto(::MyArgs) + + if (opts.webSocketUrls.isEmpty()) throw StartupException("Must specify at least one websocket") + if (opts.webhookUrls.isEmpty()) throw StartupException("Must specify at least one webhook") + + Vertx.vertx().deployVerticle(ProxyHookClient(ready = null, webSocketUrls = opts.webSocketUrls, webhookUrls = opts.webhookUrls), { result -> result.otherwise { e -> exit(e) } @@ -141,79 +174,65 @@ class ProxyHookClient(var ready: Future? = null, var args: List? = } } - // TODO use http://vertx.io/docs/vertx-core/java/#_vert_x_command_line_interface_api - // not this mess. - // Command line is of the pattern "vertx run [options] main-verticle [verticle_args...]" - // so strip off everything up to the Verticle class name. - private fun findArgs(): List { - args?.let { return it } - val processArgs = vertx.orCreateContext.processArgs() ?: listOf() - log.debug("processArgs: " + processArgs) - val n = processArgs.indexOf(javaClass.name) - val argsAfterClass = processArgs.subList(n + 1, processArgs.size) - val result = argsAfterClass.filter { arg -> !arg.startsWith("-") } - log.debug("args: " + result) - args = result - return result - } - override fun start(startFuture: Future) { - val args = findArgs() - if (args.size < 2) { - throw StartupException("Usage: wss://proxyhook.example.com/$PATH_WEBSOCKET http://target1.example.com/webhook [http://target2.example.com/webhook ...]") - } - startClient(args[0], args.subList(1, args.size), startFuture) + startClient(startFuture) } - private fun startClient(webSocketUrl: String, webhookUrls: List, startFuture: Future) { - log.info("starting client for websocket: $webSocketUrl posting to webhook URLs: $webhookUrls") + private fun startClient(startFuture: Future) { + log.info("starting client for websockets: $webSocketUrls posting to webhook URLs: $webhookUrls") + log.info("Using internal http proxy: $internalHttpProxyHost:$internalHttpProxyPort") webhookUrls.forEach { this.checkURI(it) } - - val wsUri = parseUri(webSocketUrl) - val webSocketRelativeUri = getRelativeUri(wsUri) - val useSSL = getSSL(wsUri) - val wsOptions = HttpClientOptions().apply { - // 60s timeout based on pings from every 50s (both directions) - idleTimeout = 60 - connectTimeout = 10_000 - defaultHost = wsUri.host - defaultPort = getWebsocketPort(wsUri) - maxWebsocketFrameSize = MAX_FRAME_SIZE - isSsl = useSSL - isVerifyHost = !sslInsecureServer - isTrustAll = sslInsecureServer - // this doesn't appear to affect websocket connections + val wsUris = webSocketUrls.map { parseUri(it) } + + CompositeFuture.all(wsUris.map { wsUri -> + val future = Future.future() + val webSocketRelativeUri = getRelativeUri(wsUri) + val useSSL = getSSL(wsUri) + val wsOptions = HttpClientOptions().apply { + // 60s timeout based on pings from every 50s (both directions) + idleTimeout = 60 + connectTimeout = 10_000 + defaultHost = wsUri.host + defaultPort = getWebsocketPort(wsUri) + maxWebsocketFrameSize = MAX_FRAME_SIZE + isSsl = useSSL + isVerifyHost = !sslInsecureServer + isTrustAll = sslInsecureServer + // this doesn't appear to affect websocket connections // externalHttpProxy?.let { portNum -> // proxyOptions = ProxyOptions().apply { // host = "localhost" // port = portNum // } // } - } - val wsClient = vertx.createHttpClient(wsOptions) - val httpOptions = HttpClientOptions().apply { - isVerifyHost = !sslInsecureDelivery - isTrustAll = sslInsecureDelivery - internalHttpProxy?.let { portNum -> - proxyOptions = ProxyOptions().apply { - host = "localhost" - port = portNum + } + val wsClient = vertx.createHttpClient(wsOptions) + val httpOptions = HttpClientOptions().apply { + isVerifyHost = !sslInsecureDelivery + isTrustAll = sslInsecureDelivery + internalHttpProxyPort?.let { portNum -> + proxyOptions = ProxyOptions().apply { + host = "localhost" + port = portNum + } } } - } - val httpClient = vertx.createHttpClient(httpOptions) + val httpClient = vertx.createHttpClient(httpOptions) - connect(webhookUrls, webSocketRelativeUri, wsClient, httpClient, startFuture) + connect(webSocketRelativeUri, wsClient, httpClient, future) + future + }).setHandler { res -> if (res.succeeded()) startFuture.complete() else startFuture.fail(res.cause()) } + // TODO this would be better, but so far I can't get the types right without casting +// }).setHandler(startFuture.completer() as Handler>) } - private fun connect(webhookUrls: List, - webSocketRelativeUri: String, wsClient: HttpClient, - httpClient: HttpClient, startFuture: Future? = null) { + private fun connect(webSocketRelativeUri: String, wsClient: HttpClient, + httpClient: HttpClient, wsFuture: Future<*>? = null) { wsClient.websocket(webSocketRelativeUri, { webSocket -> var password: String? = getenv(PROXYHOOK_PASSWORD) if (password == null) password = "" - log.info("trying to log in") + log.info("trying to log in to ${webSocket.remoteAddress()}") val login = JsonObject() login.put(TYPE, LOGIN) login.put(PASSWORD, password) @@ -229,55 +248,58 @@ class ProxyHookClient(var ready: Future? = null, var args: List? = sendPingFrame(webSocket) } webSocket.handler { buf: Buffer -> - handleWebSocket(webhookUrls, buf, webSocket, wsClient, httpClient, startFuture) + handleWebSocket(buf, webSocket, wsClient, httpClient, wsFuture) } webSocket.closeHandler { log.info("websocket closed") vertx.cancelTimer(periodicTimer) vertx.setTimer(300) { - connect(webhookUrls, - webSocketRelativeUri, wsClient, httpClient) + connect(webSocketRelativeUri, wsClient, httpClient) } } webSocket.exceptionHandler { e -> log.error("websocket stream exception", e) vertx.cancelTimer(periodicTimer) vertx.setTimer(2000) { - connect(webhookUrls, - webSocketRelativeUri, wsClient, httpClient) + connect(webSocketRelativeUri, wsClient, httpClient) } } }) { e -> log.error("websocket connection exception", e) vertx.setTimer(2000) { - connect(webhookUrls, - webSocketRelativeUri, wsClient, httpClient) + connect(webSocketRelativeUri, wsClient, httpClient) } } } // TODO too many params - private fun handleWebSocket(webhookUrls: List, buf: Buffer, webSocket: WebSocket, wsClient: HttpClient, httpClient: HttpClient, startFuture: Future?) { - val msg = buf.toJsonObject() + private fun handleWebSocket(buf: Buffer, webSocket: WebSocket, wsClient: HttpClient, httpClient: HttpClient, startFuture: Future<*>?) { + val msg: JsonObject + try { + msg = buf.toJsonObject() + } catch (e: DecodeException) { + log.warn("Invalid JSON from ${webSocket.remoteAddress()}: \n${buf.bytes.joinToString()}") + return + } log.debug("payload: {0}", msg) val type = msg.getString(TYPE) val messageType = MessageType.valueOf(type) when (messageType) { MessageType.SUCCESS -> { - log.info("logged in") + log.info("logged in to ${webSocket.remoteAddress()}") ready?.complete() startFuture?.complete() } MessageType.FAILED -> { webSocket.close() wsClient.close() - startFuture?.fail("login failed") + startFuture?.fail("login failed for ${webSocket.remoteAddress()}") } MessageType.WEBHOOK -> handleWebhook(webhookUrls, httpClient, msg) MessageType.PING -> { val pingId = msg.getString(PING_ID) - log.debug("received PING with id {}", pingId) + log.debug("received PING with id {} from ${webSocket.remoteAddress()}", pingId) val pong = JsonObject() pong.put(TYPE, PONG) pong.put(PING_ID, pingId) @@ -286,14 +308,14 @@ class ProxyHookClient(var ready: Future? = null, var args: List? = PONG -> { val pongId = msg.getString(PING_ID) // TODO check ping ID - log.debug("received PONG with id {}", pongId) + log.debug("received PONG with id {} from ${webSocket.remoteAddress()}", pongId) } else -> { // TODO this might happen if the server is newer than the client // should we log a warning and keep going, to be more robust? webSocket.close() wsClient.close() - startFuture?.fail("unexpected message type: " + type) + startFuture?.fail("unexpected message type: $type from ${webSocket.remoteAddress()}") } } } @@ -397,7 +419,7 @@ class ProxyHookClient(var ready: Future? = null, var args: List? = try { return URI(uri) } catch (e: URISyntaxException) { - throw StartupException("Invalid URI: " + uri) + throw StartupException("Invalid URI: $uri") } } diff --git a/client/src/test/java/org/zanata/proxyhook/client/IntegrationTest.kt b/client/src/test/java/org/zanata/proxyhook/client/IntegrationTest.kt index 34f45df..1e9041c 100644 --- a/client/src/test/java/org/zanata/proxyhook/client/IntegrationTest.kt +++ b/client/src/test/java/org/zanata/proxyhook/client/IntegrationTest.kt @@ -2,6 +2,7 @@ package org.zanata.proxyhook.client import io.vertx.core.Future import io.vertx.core.Vertx +import io.vertx.core.VertxOptions import io.vertx.core.http.HttpServer import io.vertx.core.logging.LoggerFactory import io.vertx.kotlin.coroutines.await @@ -43,53 +44,65 @@ class IntegrationTest { private lateinit var proxyClient: ProxyClient @Before - fun before() { - server = Vertx.vertx() + fun before() = runBlocking { webhook = Vertx.vertx() client = Vertx.vertx() } @After - fun after() { + fun after() = runBlocking { // to minimise shutdown errors: // 1. we want the client to stop delivering before the webhook receiver stops // 2. we want the client to disconnect from server before server stops - client.close() - webhook.close() - server.close() + awaitResult { client.close(it) } + awaitResult { webhook.close(it) } + awaitResult { server.close(it) } } @Test fun rootDeployment() { + server = Vertx.vertx() + deliverProxiedWebhook(prefix = "") + proxyClient.verifyZeroInteractions() + } + + @Test + fun rootDeploymentInInfinispanCluster() { + server = runBlocking { + awaitResult { + val serverOpts = VertxOptions().apply { + isClustered = true + clusterManager = io.vertx.ext.cluster.infinispan.InfinispanClusterManager() + clusterHost = "localhost" + clusterPort = 0 + } + Vertx.clusteredVertx(serverOpts, it) + } + } deliverProxiedWebhook(prefix = "") proxyClient.verifyZeroInteractions() } @Test fun rootDeploymentWithProxy() { - deliverProxiedWebhook(prefix = "", internalHttpProxy = proxyRule.httpPort) + server = Vertx.vertx() + deliverProxiedWebhook(prefix = "", httpProxyHost = "localhost", httpProxyPort = proxyRule.httpPort) // proxyClient.dumpToLogAsJSON(request()) proxyClient.verify(request(), once()) } @Test fun subPathDeployment() { + server = Vertx.vertx() deliverProxiedWebhook(prefix = "/proxyhook") proxyClient.verifyZeroInteractions() } - @Test - fun subPathDeploymentWithProxy() { - deliverProxiedWebhook(prefix = "/proxyhook", internalHttpProxy = proxyRule.httpPort) -// proxyClient.dumpToLogAsJSON(request()) - proxyClient.verify(request(), once()) - } - private fun ProxyClient.verifyZeroInteractions() { verify(request(), exactly(0)) } - private fun deliverProxiedWebhook(prefix: String, internalHttpProxy: Int? = null): Unit = runBlocking { + private fun deliverProxiedWebhook(prefix: String, httpProxyHost: String? = null, httpProxyPort: Int? = null): Unit = runBlocking { // this future will succeed if the test passes, // or fail if something goes wrong. val testFinished = CompletableFuture() @@ -101,7 +114,7 @@ class IntegrationTest { val websocketUrl = "ws://localhost:${serverPort.await()}$prefix/listen" val postUrl = "http://localhost:${serverPort.await()}$prefix/webhook" // wait for proxyhook server and webhook receiver before starting client - val client = startClient(testFinished, websocketUrl, receiveUrl, internalHttpProxy) + val client = startClient(testFinished, websocketUrl, receiveUrl, httpProxyHost, httpProxyPort) // wait for client login before sending webhook to proxyhook server client.await() @@ -142,10 +155,10 @@ class IntegrationTest { actualPort } - private fun startClient(testFinished: CompletableFuture, websocketUrl: String, receiveUrl: String, internalHttpProxy: Int?): Future { + private fun startClient(testFinished: CompletableFuture, websocketUrl: String, receiveUrl: String, internalHttpProxyHost: String?, internalHttpProxyPort: Int?): Future { log.info("deploying client") val clientReady = Future.future() - client.deployVerticle(ProxyHookClient(clientReady, websocketUrl, receiveUrl, internalHttpProxy = internalHttpProxy)) { + client.deployVerticle(ProxyHookClient(clientReady, listOf(websocketUrl), listOf(receiveUrl), internalHttpProxyHost = internalHttpProxyHost, internalHttpProxyPort = internalHttpProxyPort)) { if (it.failed()) { testFinished.completeExceptionally(it.cause()) } diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 3baa851..ed88a04 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index efc12cb..b6517bb 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,5 @@ -#Thu Sep 29 00:03:59 AEST 2016 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.2.1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.4-all.zip diff --git a/gradlew b/gradlew index 27309d9..cccdd3d 100755 --- a/gradlew +++ b/gradlew @@ -1,4 +1,4 @@ -#!/usr/bin/env bash +#!/usr/bin/env sh ############################################################################## ## @@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS="" # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" -warn ( ) { +warn () { echo "$*" } -die ( ) { +die () { echo echo "$*" echo @@ -154,11 +154,19 @@ if $cygwin ; then esac fi -# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules -function splitJvmOpts() { - JVM_OPTS=("$@") +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " } -eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS -JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" +APP_ARGS=$(save "$@") -exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat index 832fdb6..e95643d 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,90 +1,84 @@ -@if "%DEBUG%" == "" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS= - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto init - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto init - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:init -@rem Get command-line arguments, handling Windows variants - -if not "%OS%" == "Windows_NT" goto win9xME_args -if "%@eval[2+2]" == "4" goto 4NT_args - -:win9xME_args -@rem Slurp the command line arguments. -set CMD_LINE_ARGS= -set _SKIP=2 - -:win9xME_args_slurp -if "x%~1" == "x" goto execute - -set CMD_LINE_ARGS=%* -goto execute - -:4NT_args -@rem Get arguments from the 4NT Shell from JP Software -set CMD_LINE_ARGS=%$ - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% - -:end -@rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/server/.dockerignore b/server/.dockerignore index c2d50e6..2c50eeb 100644 --- a/server/.dockerignore +++ b/server/.dockerignore @@ -1,3 +1,5 @@ -# don't send any files to Docker build daemon, other than the fat jar -* +# don't send any build artifacts to Docker build daemon, other than the fat jar +build +out +src !build/libs/*-fat.jar diff --git a/server/Dockerfile b/server/Dockerfile index 04d812f..92711a9 100644 --- a/server/Dockerfile +++ b/server/Dockerfile @@ -1,13 +1,22 @@ -FROM openjdk:8-jre-alpine +LABEL maintainer="sflaniga@redhat.com" +# https://access.redhat.com/containers/?tab=overview&platform=docker#/registry.access.redhat.com/redhat-openjdk-18/openjdk18-openshift +FROM registry.access.redhat.com/redhat-openjdk-18/openjdk18-openshift:1.1-13 -ADD build/libs/server*-fat.jar /app/server.jar -RUN chgrp -R 0 /app/ && chmod -R 775 /app/ +COPY build/libs/server*-fat.jar /deployments/ +EXPOSE 8080 -WORKDIR /app/ +# http://vertx.io/docs/vertx-infinispan/java/#_configuring_for_kubernetes_or_openshift_3 +# NB run-java.sh will scale heap size to 50% of container memory size +ENV JAVA_OPTIONS "-Dhttp.address=0.0.0.0 \ + -Djava.net.preferIPv4Stack=true \ + -Dvertx.jgroups.config=default-configs/default-jgroups-kubernetes.xml" -EXPOSE 8080 +# You should run this docker container with the env var KUBERNETES_NAMESPACE set to your project name +# eg docker run --memory 128m -e KUBERNETES_NAMESPACE=proxyhook-server --rm -it # NB must use the exec form of ENTRYPOINT if you want to add arguments with CMD # https://docs.docker.com/engine/reference/builder/#exec-form-entrypoint-example -ENTRYPOINT ["java", "-Dhttp.address=0.0.0.0", "-Xmx64M", "-jar", "server.jar"] +# see https://github.com/fabric8io-images/run-java-sh +ENTRYPOINT ["/opt/run-java/run-java.sh"] +# NB run with arg "-cluster" if you want OpenShift/Kubernetes clustering diff --git a/server/build.gradle b/server/build.gradle index 05ab697..5d05cf3 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -6,6 +6,9 @@ plugins { dependencies { compile project(':common') compile 'de.svenkubiak:jBCrypt:0.4.1' + runtime "io.vertx:vertx-infinispan:$vertx_version" +// // http://vertx.io/docs/vertx-infinispan/java/#_configuring_for_kubernetes_or_openshift_3 + runtime 'org.infinispan:infinispan-cloud:9.1.2.Final' } mainClassName = 'io.vertx.core.Launcher' @@ -37,11 +40,13 @@ shadowJar { classifier = 'fat' manifest { + attributes "Main-Class": "io.vertx.core.Launcher" attributes "Main-Verticle": mainVerticleName } mergeServiceFiles { include 'META-INF/services/io.vertx.core.spi.VerticleFactory' + include 'META-INF/services/io.vertx.core.spi.cluster.ClusterManager' } } diff --git a/server/src/main/java/org/zanata/proxyhook/server/ProxyHookServer.kt b/server/src/main/java/org/zanata/proxyhook/server/ProxyHookServer.kt index 11e23b6..202e6ae 100644 --- a/server/src/main/java/org/zanata/proxyhook/server/ProxyHookServer.kt +++ b/server/src/main/java/org/zanata/proxyhook/server/ProxyHookServer.kt @@ -20,9 +20,7 @@ */ package org.zanata.proxyhook.server -import io.vertx.core.AbstractVerticle import io.vertx.core.Future -import io.vertx.core.Vertx import org.mindrot.jbcrypt.BCrypt import io.vertx.core.buffer.Buffer import io.vertx.core.eventbus.EventBus @@ -31,14 +29,19 @@ import io.vertx.core.http.HttpHeaders import io.vertx.core.http.HttpServerOptions import io.vertx.core.http.HttpServerRequest import io.vertx.core.http.ServerWebSocket +import io.vertx.core.http.WebSocketBase import io.vertx.core.json.JsonObject import io.vertx.core.logging.LoggerFactory -import io.vertx.core.shareddata.LocalMap +import io.vertx.core.shareddata.AsyncMap +import io.vertx.ext.web.Route import io.vertx.ext.web.Router import io.vertx.ext.web.RoutingContext import io.vertx.ext.web.handler.BodyHandler import io.vertx.ext.web.handler.ErrorHandler -import org.zanata.proxyhook.common.* +import io.vertx.kotlin.coroutines.CoroutineVerticle +import io.vertx.kotlin.coroutines.awaitResult +import io.vertx.kotlin.coroutines.dispatcher +import kotlinx.coroutines.experimental.launch import org.zanata.proxyhook.common.Constants.EVENT_ID_HEADERS import org.zanata.proxyhook.common.Constants.MAX_BODY_SIZE import org.zanata.proxyhook.common.Constants.MAX_FRAME_SIZE @@ -62,7 +65,6 @@ import org.zanata.proxyhook.common.MessageType.PONG import org.zanata.proxyhook.common.MessageType.SUCCESS import org.zanata.proxyhook.common.MessageType.WEBHOOK import org.zanata.proxyhook.common.StartupException -import org.zanata.proxyhook.common.exit import org.zanata.proxyhook.common.multiMapToJson import java.lang.System.getenv import java.net.InetAddress @@ -80,74 +82,75 @@ import java.net.UnknownHostException * when deployment is complete. * @author Sean Flanigan [sflaniga@redhat.com](mailto:sflaniga@redhat.com) */ -class ProxyHookServer(val port: Int? = null, val prefix: String = getenv("PROXYHOOK_PREFIX") ?: "", var actualPort: Future? = null) : AbstractVerticle() { +class ProxyHookServer( + private val port: Int? = null, + private val prefix: String = getenv("PROXYHOOK_PREFIX") ?: "", + var actualPort: Future? = null) : CoroutineVerticle() { - // TODO clustering: should use getClusterWideMap and getCounter - val connections: LocalMap by lazy { - vertx.sharedData().getLocalMap("connections") - } - val eventBus: EventBus get() = vertx.eventBus() + private val sharedData by lazy { vertx.sharedData() } + private val eventBus: EventBus get() = vertx.eventBus() + private val passhash: String? = getenv(PROXYHOOK_PASSHASH) + // map of websockets which are connected directly to this verticle (not via clustering) + private val localConnections = HashSet() + // map of websocket IDs to TRUE (used like a Set) + private lateinit var connections: AsyncMap - val passhash: String? = getenv(PROXYHOOK_PASSHASH) + override suspend fun start() { + // a map of websocket connections across the vert.x cluster + connections = awaitResult { + sharedData.getAsyncMap("connections", it) + } - override fun start() { if (passhash != null) { log.info("password is set") } else { log.warn("{0} is not set; authentication is disabled", PROXYHOOK_PASSHASH) } - val host = System.getProperty("http.address", "127.0.0.1") + val listenHost = System.getProperty("http.address", "127.0.0.1") val listenPort: Int = port ?: Integer.getInteger("http.port", 8080) - log.info("Starting webhook/websocket server on $host:$listenPort") - - val options = HttpServerOptions() - // 60s timeout based on pings every 50s - .setIdleTimeout(60) - .setMaxWebsocketFrameSize(MAX_FRAME_SIZE) - .setPort(listenPort) - .setHost(host) + log.info("Starting webhook/websocket server on $listenHost:$listenPort") + + val options = HttpServerOptions().apply { + // 60s timeout based on pings every 50s + idleTimeout = 60 + maxWebsocketFrameSize = MAX_FRAME_SIZE + host = listenHost + port = listenPort + } val server = vertx.createHttpServer(options) // a set of textHandlerIds for connected websockets vertx.setPeriodic(50_000) { - // TODO clustering: should iterate through websockets of this verticle only (eg a local HashMap?) - connections.keys.forEach { connection -> - - // this is probably the correct way (ping frame triggers pong, closes websocket if no data received before idleTimeout in TCPSSLOptions): - // WebSocketFrameImpl frame = new WebSocketFrameImpl(FrameType.PING, io.netty.buffer.Unpooled.copyLong(System.currentTimeMillis())); - // webSocket.writeFrame(frame); - - val obj = JsonObject() - obj.put(TYPE, PING) - obj.put(PING_ID, System.currentTimeMillis().toString()) - eventBus.send(connection, obj.encode()) - } + // in a cluster, we should only ping websockets connected to this verticle directly + localConnections.forEach(this::pingConnection) } val router = Router.router(vertx) router.exceptionHandler { t -> log.error("Unhandled exception", t) } router.route() .handler(BodyHandler.create().setBodyLimit(MAX_BODY_SIZE.toLong())) - // .handler(LoggerHandler.create()) +// .handler(LoggerHandler.create()) .failureHandler(ErrorHandler.create()) // we need to respond to GET / so that health checks will work: - router.get("$prefix/").handler { routingContext -> routingContext.response().setStatusCode(HTTP_OK).end(APP_NAME + " (" + describe(connections.size) + ")") } + router.get("$prefix/").coroutineHandler { rootHandler(it) } // see https://github.com/vert-x3/vertx-health-check if we need more features - router.get("$prefix/ready").handler(this::readyHandler) - router.post("$prefix/$PATH_WEBHOOK").handler(this::webhookHandler) - server.requestHandler({ router.accept(it) }) - server.websocketHandler { webSocket: ServerWebSocket -> - if (webSocket.path() != "$prefix/$PATH_WEBSOCKET") { - log.warn("wrong path for websocket connection: {0}", webSocket.path()) - webSocket.reject() + router.get("$prefix/ready").coroutineHandler { readyHandler(it) } + router.post("$prefix/$PATH_WEBHOOK").coroutineHandler { webhookHandler(it) } + server.requestHandler { router.accept(it) } + + server.websocketHandler { ws: ServerWebSocket -> + if (ws.path() != "$prefix/$PATH_WEBSOCKET") { + log.warn("wrong path for websocket connection: {0}", ws.path()) + ws.reject() return@websocketHandler } - handleListen(webSocket) + handleListen(ws) } - server.listen { startupResult -> - if (startupResult.failed()) { - actualPort?.fail(startupResult.cause()) - throw StartupException(startupResult.cause()) + + server.listen { res -> + if (res.failed()) { + actualPort?.fail(res.cause()) + throw StartupException(res.cause()) } else { log.info("Started server on port ${server.actualPort()}") logEndPoints(server.actualPort()) @@ -156,23 +159,49 @@ class ProxyHookServer(val port: Int? = null, val prefix: String = getenv("PROXYH } } - private fun readyHandler(context: RoutingContext) { + private fun logEndPoints(actualPort: Int) { + val hostname = System.getenv("OPENSHIFT_APP_DNS") + if (hostname != null) { + log.info("Running on OpenShift") + // TODO handle proxyhookContext (or remove OpenShift support) + log.info("Webhooks should be POSTed to https://{0}/webhook (secure) or http://{0}/webhook (insecure)", hostname) + log.info("ProxyHook client should connect to wss://{0}:8433/listen (secure) or ws://{0}:8000/listen (insecure)", hostname) + } else { + val port = actualPort.toString() // we don't want commas for thousands + log.info("Webhooks should be POSTed to http://{0}:{1}{2}/webhook (insecure)", localHostName, port, prefix) + log.info("ProxyHook client should connect to ws://{0}:{1}{2}/listen (insecure)", localHostName, port, prefix) + } + } + + // The prefix "fetch" is because this is a bit expensive + private suspend fun fetchConnectionCount(): Int { + return awaitResult { connections.size(it) } + } + + private suspend fun rootHandler(context: RoutingContext) { + val count = fetchConnectionCount() + context.response().setStatusCode(HTTP_OK).end(APP_NAME + " (" + describe(count) + ")") + } + + private suspend fun readyHandler(context: RoutingContext) { + val count = fetchConnectionCount() context.response() - // if there are no connections, webhooks won't be delivered, thus HTTP_SERVICE_UNAVAILABLE - .setStatusCode(if (connections.isEmpty()) HTTP_SERVICE_UNAVAILABLE else HTTP_OK) - .end(APP_NAME + " (" + describe(connections.size) + ")") + // if there are no connections, webhooks won't be delivered, thus + // HTTP_SERVICE_UNAVAILABLE (allows web pingers to check if it's all working) + .setStatusCode(if (count == 0) HTTP_SERVICE_UNAVAILABLE else HTTP_OK) + .end(APP_NAME + " (" + describe(count) + ")") } - private fun webhookHandler(context: RoutingContext) { + private suspend fun webhookHandler(context: RoutingContext) { log.info("handling POST request") val req = context.request() val headers = req.headers() EVENT_ID_HEADERS .filter { headers.contains(it) } .forEach { log.info("{0}: {1}", it, headers.getAll(it)) } - val statusCode: Int - val listeners = connections.keys + val listeners = awaitResult> { connections.keys(it) } log.info("handling POST for {0} listeners", listeners.size) + val statusCode: Int if (!listeners.isEmpty()) { val body = context.body val msgString = encodeWebhook(req, body) @@ -194,7 +223,7 @@ class ProxyHookServer(val port: Int? = null, val prefix: String = getenv("PROXYH } private fun handleListen(webSocket: ServerWebSocket) { - webSocket.handler { buffer: Buffer -> + webSocket.coroutineHandler { buffer: Buffer -> val msg = buffer.toJsonObject() val messageType = MessageType.valueOf(msg.getString(TYPE)) when (messageType) { @@ -206,20 +235,20 @@ class ProxyHookServer(val port: Int? = null, val prefix: String = getenv("PROXYH } } - private fun handleLogin(msg: JsonObject, webSocket: ServerWebSocket) { + private suspend fun handleLogin(msg: JsonObject, webSocket: ServerWebSocket) { val password = msg.getString(PASSWORD) if (passhash == null) { log.info("unverified websocket connection") val obj = JsonObject() obj.put(TYPE, SUCCESS) webSocket.writeTextMessage(obj.encode()) - registerWebsocket(connections, webSocket) + registerWebsocket(webSocket) } else if (BCrypt.checkpw(password, passhash)) { log.info("password accepted") val obj = JsonObject() obj.put(TYPE, SUCCESS) webSocket.writeTextMessage(obj.encode()) - registerWebsocket(connections, webSocket) + registerWebsocket(webSocket) } else { log.warn("password rejected") val obj = JsonObject() @@ -244,6 +273,7 @@ class ProxyHookServer(val port: Int? = null, val prefix: String = getenv("PROXYH log.debug("received PONG with id {}", pongId) } + private fun handleUnknownMessage(msg: JsonObject, webSocket: ServerWebSocket) { log.warn("unexpected message: {0}", msg) val obj = JsonObject() @@ -252,126 +282,167 @@ class ProxyHookServer(val port: Int? = null, val prefix: String = getenv("PROXYH webSocket.close() } - private fun logEndPoints(actualPort: Int) { - val hostname = System.getenv("OPENSHIFT_APP_DNS") - if (hostname != null) { - log.info("Running on OpenShift") - // TODO handle proxyhookContext (or remove OpenShift support) - log.info("Webhooks should be POSTed to https://{0}/webhook (secure) or http://{0}/webhook (insecure)", hostname) - log.info("ProxyHook client should connect to wss://{0}:8433/listen (secure) or ws://{0}:8000/listen (insecure)", hostname) - } else { - val port = actualPort.toString() // we don't want commas for thousands - log.info("Webhooks should be POSTed to http://{0}:{1}{2}/webhook (insecure)", localHostName, port, prefix) - log.info("ProxyHook client should connect to ws://{0}:{1}{2}/listen (insecure)", localHostName, port, prefix) - } - } - - private val localHostName: String by lazy { - try { - InetAddress.getLocalHost().hostName - } catch (e: UnknownHostException) { - log.warn("Unable to find hostname", e) - "localhost" - } - } - - private fun registerWebsocket(connections: LocalMap, - webSocket: ServerWebSocket) { + private suspend fun registerWebsocket(webSocket: ServerWebSocket) { // TODO enhancement: register specific webhook path using webSocket.path() or webSocket.query() val id = webSocket.textHandlerID() val clientIP = getClientIP(webSocket) log.info("Adding connection. ID: $id IP: $clientIP") - connections.put(id, true) - log.info("Total connections: {0}", connections.size) - webSocket.closeHandler { + localConnections.add(id) + log.info("Total local connections: {0}", localConnections.size) + val connections = connections + awaitResult { connections.put(id, true, it) } + log.info("Connection added: now {0} connections to cluster", fetchConnectionCount()) + + webSocket.closeCoroutineHandler { log.info("Connection closed. ID: {0} IP: {1}", id, clientIP) - connections.remove(id) - log.info("Total connections: {0}", connections.size) + localConnections.remove(id) + log.info("Total local connections: {0}", localConnections.size) + awaitResult { connections.remove(id, it) } + log.info("Connection removed: now {0} connections to cluster", fetchConnectionCount()) } - webSocket.exceptionHandler { e -> + webSocket.exceptionCoroutineHandler { e -> log.warn("Connection error. ID: {0} IP: {1}", e, id, clientIP) - connections.remove(id) - log.info("Total connections: {0}", connections.size) + localConnections.remove(id) + log.info("Total local connections: {0}", localConnections.size) + awaitResult { connections.remove(id, it) } + log.info("Broken connection removed: now {0} connections to cluster", fetchConnectionCount()) } } - private fun getClientIP(webSocket: ServerWebSocket): String { - var clientIP: String? = webSocket.headers().get("X-Client-Ip") - if (clientIP == null) clientIP = webSocket.headers().get("X-Forwarded-For") - if (clientIP == null) clientIP = webSocket.remoteAddress().host()!! - return clientIP + private fun pingConnection(connection: String?) { + // this is probably the correct way (ping frame triggers pong, closes websocket if no data received before idleTimeout in TCPSSLOptions): + // WebSocketFrameImpl frame = new WebSocketFrameImpl(FrameType.PING, io.netty.buffer.Unpooled.copyLong(System.currentTimeMillis())); + // webSocket.writeFrame(frame); + + val obj = JsonObject() + obj.put(TYPE, PING) + obj.put(PING_ID, System.currentTimeMillis().toString()) + eventBus.send(connection, obj.encode()) } - private fun encodeWebhook(req: HttpServerRequest, buffer: Buffer): String { - val msg = JsonObject() - msg.put(TYPE, WEBHOOK) - msg.put(PATH, req.path()) - msg.put(QUERY, req.query()) - val headers = CaseInsensitiveHeaders().addAll(req.headers()) - msg.put(HOST, headers.get("Host")) - headers.remove("Host") - // headers.remove("Content-Length"); - // serialise MultiMap - msg.put(HEADERS, multiMapToJson(headers)) - - if (treatAsUTF8(headers.get(HttpHeaders.CONTENT_TYPE))) { - // toString will blow up if not valid UTF-8 - msg.put(BUFFER_TEXT, buffer.toString()) - } else { - msg.put(BUFFER, buffer.bytes) + /* + * Extension methods to simplify coroutine usage for various WebSocket handlers + */ + private fun WebSocketBase.coroutineHandler(fn : suspend (Buffer) -> Unit) { + handler { buffer -> + launch(vertx.dispatcher()) { + fn(buffer) + } } - return msg.encode() } - internal fun treatAsUTF8(contentType: String?): Boolean { - if (contentType == null) return false // equiv. to application/octet-stream - val contentTypeSplit = contentType.toLowerCase().split("; *".toRegex()).dropLastWhile { it.isEmpty() }.toTypedArray() - // use the explicit charset if available: - contentTypeSplit - .filter { it.matches("charset=(utf-?8|ascii)".toRegex()) } - .forEach { return true } - // otherwise we infer charset based on the content type: - when (contentType) { - // JSON only allows Unicode. - "application/json", - // XML defaults to Unicode. - // An XML doc could specify another (non-Unicode) charset internally, but we don't support this. - "application/xml", - // Defaults to ASCII: - "text/xml" -> return true - // If in doubt, treat as non-Unicode (or binary) - else -> return false + private fun WebSocketBase.closeCoroutineHandler(fn : suspend () -> Unit) { + closeHandler { + launch(vertx.dispatcher()) { + fn() + } } } - companion object { - private val APP_NAME = ProxyHookServer::class.java.name - private val log = LoggerFactory.getLogger(ProxyHookServer::class.java) - - // HTTP status codes - private val HTTP_OK = 200 - // private static final int HTTP_NO_CONTENT = 204; - // private static final int HTTP_INTERNAL_SERVER_ERROR = 500; - // private static final int HTTP_NOT_IMPLEMENTED = 501; - // private static final int HTTP_BAD_GATEWAY = 502; - private val HTTP_SERVICE_UNAVAILABLE = 503 - // private static final int HTTP_GATEWAY_TIMEOUT = 504; - - internal fun describe(size: Int): String { - if (size == 1) { - return "1 listener" - } else { - return "" + size + " listeners" + private fun WebSocketBase.exceptionCoroutineHandler(fn : suspend (Throwable) -> Unit) { + exceptionHandler { throwable -> + launch(vertx.dispatcher()) { + fn(throwable) } } + } - @JvmStatic fun main(args: Array) { - Vertx.vertx().deployVerticle(ProxyHookServer(port = null), { result -> - result.otherwise { e -> - exit(e) - } - }) - } +} + +private val APP_NAME = ProxyHookServer::class.java.name + +// HTTP status codes +private val HTTP_OK = 200 +//private val HTTP_NO_CONTENT = 204 +//private val HTTP_INTERNAL_SERVER_ERROR = 500 +//private val HTTP_NOT_IMPLEMENTED = 501 +//private val HTTP_BAD_GATEWAY = 502 +private val HTTP_SERVICE_UNAVAILABLE = 503 +//private val HTTP_GATEWAY_TIMEOUT = 504 + +private val log = LoggerFactory.getLogger(ProxyHookServer::class.java) + +private val localHostName: String by lazy { + try { + InetAddress.getLocalHost().hostName + } catch (e: UnknownHostException) { + log.warn("Unable to find hostname", e) + "localhost" } +} + +// visible for testing +internal fun describe(size: Int): String = describe(size.toLong()) + +// visible for testing +internal fun describe(size: Long): String = if (size == 1L) { + "1 listener" +} else { + "$size listeners" +} +private fun getClientIP(webSocket: ServerWebSocket): String { + var clientIP: String? = webSocket.headers().get("X-Client-Ip") + if (clientIP == null) clientIP = webSocket.headers().get("X-Forwarded-For") + if (clientIP == null) clientIP = webSocket.remoteAddress().host()!! + return clientIP +} + +private fun encodeWebhook(req: HttpServerRequest, buffer: Buffer): String { + val msg = JsonObject() + msg.put(TYPE, WEBHOOK) + msg.put(PATH, req.path()) + msg.put(QUERY, req.query()) + val headers = CaseInsensitiveHeaders().addAll(req.headers()) + msg.put(HOST, headers.get("Host")) + headers.remove("Host") + // headers.remove("Content-Length"); + // serialise MultiMap + msg.put(HEADERS, multiMapToJson(headers)) + + if (treatAsUTF8(headers.get(HttpHeaders.CONTENT_TYPE))) { + // toString will blow up if not valid UTF-8 + msg.put(BUFFER_TEXT, buffer.toString()) + } else { + msg.put(BUFFER, buffer.bytes) + } + return msg.encode() +} + +// visible for testing +internal fun treatAsUTF8(contentType: String?): Boolean { + if (contentType == null) return false // equiv. to application/octet-stream + val contentTypeSplit = contentType.toLowerCase().split("; *".toRegex()).dropLastWhile { it.isEmpty() }.toTypedArray() + // use the explicit charset if available: + contentTypeSplit + .filter { it.matches("charset=(utf-?8|ascii)".toRegex()) } + .forEach { return true } + // otherwise we infer charset based on the content type: + return when (contentType) { + // JSON only allows Unicode. + "application/json", + // XML defaults to Unicode. + // An XML doc could specify another (non-Unicode) charset internally, but we don't support this. + "application/xml", + // Defaults to ASCII: + "text/xml" -> true + // If in doubt, treat as non-Unicode (or binary) + else -> false + } +} + +/* + * An extension method for simplifying coroutines usage with Vert.x Web routers + */ +@Suppress("Detekt.TooGenericExceptionCaught") +private fun Route.coroutineHandler(fn : suspend (RoutingContext) -> Unit) { + handler { ctx -> + launch(ctx.vertx().dispatcher()) { + try { + fn(ctx) + } catch(e: Exception) { + ctx.fail(e) + } + } + } } diff --git a/server/src/test/java/org/zanata/proxyhook/server/ProxyHookServerTest.kt b/server/src/test/java/org/zanata/proxyhook/server/ProxyHookServerTest.kt index 7b2b2b1..21b473f 100644 --- a/server/src/test/java/org/zanata/proxyhook/server/ProxyHookServerTest.kt +++ b/server/src/test/java/org/zanata/proxyhook/server/ProxyHookServerTest.kt @@ -17,29 +17,29 @@ class ProxyHookServerTest { @Test fun describe0() { - val desc = ProxyHookServer.describe(0) + val desc = describe(0) assertThat(desc).isEqualTo("0 listeners") } @Test fun describe1() { - val desc = ProxyHookServer.describe(1) + val desc = describe(1) assertThat(desc).isEqualTo("1 listener") } @Test fun describe2() { - val desc = ProxyHookServer.describe(2) + val desc = describe(2) assertThat(desc).isEqualTo("2 listeners") } @Test fun testTreatAsUtf8() { - assertThat(proxyHookServer.treatAsUTF8("application/json")).isEqualTo(true) - assertThat(proxyHookServer.treatAsUTF8("application/xml; charset=utf8")).isEqualTo(true) - assertThat(proxyHookServer.treatAsUTF8("application/xml; charset=utf-8")).isEqualTo(true) - assertThat(proxyHookServer.treatAsUTF8("application/xml; charset=ASCII")).isEqualTo(true) - assertThat(proxyHookServer.treatAsUTF8("application/xml; charset=iso8859-1")).isEqualTo(false) + assertThat(treatAsUTF8("application/json")).isEqualTo(true) + assertThat(treatAsUTF8("application/xml; charset=utf8")).isEqualTo(true) + assertThat(treatAsUTF8("application/xml; charset=utf-8")).isEqualTo(true) + assertThat(treatAsUTF8("application/xml; charset=ASCII")).isEqualTo(true) + assertThat(treatAsUTF8("application/xml; charset=iso8859-1")).isEqualTo(false) } }