From fde9b912cbc6e35f5882d1949e0a2247d448221b Mon Sep 17 00:00:00 2001 From: manuc66 Date: Sat, 14 Nov 2015 14:20:29 +0100 Subject: [PATCH] It did not automatically reconnect when server closed the connection. ConnectedState was stuck on read task's result. --- EventSource4Net.Test/EventSourceTest.cs | 47 +++++++++++++++++++ .../WebRequesterFactoryTest.cs | 45 +++++++++++++++--- EventSource4Net/ConnectedState.cs | 2 +- 3 files changed, 86 insertions(+), 8 deletions(-) diff --git a/EventSource4Net.Test/EventSourceTest.cs b/EventSource4Net.Test/EventSourceTest.cs index 43cd90e..5c89ebf 100644 --- a/EventSource4Net.Test/EventSourceTest.cs +++ b/EventSource4Net.Test/EventSourceTest.cs @@ -81,5 +81,52 @@ public void TestSuccesfulConnection() } + [TestMethod] + public void TestReConnectionAfterConnectionLost() + { + // setup + Uri url = new Uri("http://test.com"); + CancellationTokenSource cts = new CancellationTokenSource(); + List states = new List(); + ServiceResponseMock serviceResponseMock = new ServiceResponseMock(url, System.Net.HttpStatusCode.OK); + WebRequesterFactoryMock factory = new WebRequesterFactoryMock(serviceResponseMock); + ManualResetEvent stateIsOpen = new ManualResetEvent(false); + ManualResetEvent stateIsClosed = new ManualResetEvent(false); + + TestableEventSource es = new TestableEventSource(url, factory); + es.StateChanged += (o, e) => + { + states.Add(e.State); + if (e.State == EventSourceState.OPEN) + { + stateIsClosed.Reset(); + stateIsOpen.Set(); + } + else if (e.State == EventSourceState.CLOSED) + { + stateIsOpen.Reset(); + stateIsClosed.Set(); + } + }; + + + // act + stateIsOpen.Reset(); + + es.Start(cts.Token); + + stateIsOpen.WaitOne(); + states.Clear(); + + serviceResponseMock.DistantConnectionClose(); + + stateIsClosed.WaitOrThrow(); + stateIsOpen.WaitOrThrow(); + + // assert + Assert.AreEqual(states[0], EventSourceState.CLOSED); + Assert.AreEqual(states[1], EventSourceState.CONNECTING); + Assert.AreEqual(states[2], EventSourceState.OPEN); + } } } diff --git a/EventSource4Net.Test/WebRequesterFactoryTest.cs b/EventSource4Net.Test/WebRequesterFactoryTest.cs index ab61e7e..cbb3332 100644 --- a/EventSource4Net.Test/WebRequesterFactoryTest.cs +++ b/EventSource4Net.Test/WebRequesterFactoryTest.cs @@ -3,6 +3,7 @@ using System.IO; using System.Linq; using System.Net; +using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -18,7 +19,7 @@ public WebRequesterMock WebRequesterMock } public WebRequesterFactoryMock(ServiceResponseMock response) { - this.WebRequesterMock = new WebRequesterMock(response); + this.WebRequesterMock = new WebRequesterMock(response); } public IWebRequester Create() { @@ -48,7 +49,7 @@ public System.Threading.Tasks.Task Get(Uri url) class ServiceResponseMock : IServerResponse { - private Stream mStream; + private TestableStream mStream; private StreamWriter mStreamWriter; private Uri mUrl; private HttpStatusCode mStatusCode; @@ -87,6 +88,11 @@ public void WriteTestTextToStream(string text) mStreamWriter.Write(text); mStreamWriter.Flush(); } + + public void DistantConnectionClose() + { + mStream.Throws(new SocketException(10054)); + } } class GetIsCalledEventArgs : EventArgs @@ -103,6 +109,13 @@ class TestableStream : Stream { long _pos = 0; System.Collections.Concurrent.BlockingCollection _texts = new System.Collections.Concurrent.BlockingCollection(); + private CancellationTokenSource _cancellationTokenSource; + private Exception _throw; + + public TestableStream() + { + _cancellationTokenSource = new CancellationTokenSource(); + } public override bool CanRead { @@ -143,11 +156,23 @@ public override long Position public override int Read(byte[] buffer, int offset, int count) { - string s = _texts.Take(); - - byte[] encodedText = Encoding.UTF8.GetBytes(s); - encodedText.CopyTo(buffer, offset); - return encodedText.Length; + try + { + var s = _texts.Take(_cancellationTokenSource.Token); + byte[] encodedText = Encoding.UTF8.GetBytes(s); + encodedText.CopyTo(buffer, offset); + return encodedText.Length; + } + catch (OperationCanceledException) + { + if (_throw != null) + { + var ex = _throw; + _throw = null; + throw ex; + } + return 0; + } } public override long Seek(long offset, SeekOrigin origin) @@ -166,6 +191,12 @@ public override void Write(byte[] buffer, int offset, int count) _texts.Add(s); //_texts.CompleteAdding(); } + + public void Throws(Exception exception) + { + _cancellationTokenSource.Cancel(); + _throw = exception; + } } diff --git a/EventSource4Net/ConnectedState.cs b/EventSource4Net/ConnectedState.cs index 7669993..6c69b26 100644 --- a/EventSource4Net/ConnectedState.cs +++ b/EventSource4Net/ConnectedState.cs @@ -47,7 +47,7 @@ public Task Run(Action msgReceived, Cancellat { _logger.Trace(ex, "ConnectedState.Run"); } - if (!cancelToken.IsCancellationRequested) + if (!cancelToken.IsCancellationRequested && !taskRead.IsFaulted) { int bytesRead = taskRead.Result; if (bytesRead > 0) // stream has not reached the end yet