Monday, January 11, 2016

Processing a downloaded text file while it's downloading

I once had a job where I had to download some huge text files, and I thought it would be interesting to process them while they were still downloading instead of waiting for the download to finish. (Time was very important.) In the end the client changed his mind and wanted the whole file downloaded first, but I thought this was an interesting code fragment to save for later.

When it comes to doing "real-time stuff" I use Rx, from the Rx-Main NuGet package. The interface is simple:

  public interface Downloader
  {
    /// <summary>
    ///   Downloads a file from the given URL and returns it line by line.
    /// </summary>
    /// <param name="url">The URL of the file to be downloaded.</param>
    /// <returns>The lines from the downloaded file, as a stream.</returns>
    IObservable<string> Download(string url);
  }

The only noteworthy thing about the implementation is the automatic decompression. Setting the AutomaticDecompression property makes HttpWebRequest send the appropriate Accept-Encoding header to the server, so do not add that header yourself as well; it will be duplicated, and some servers don't handle that well:

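  using System;
  using System.IO;
  using System.Net;
  using System.Reactive.Disposables;
  using System.Reactive.Linq;

  public class WebDownloader : Downloader
  {
    public IObservable<string> Download(string url)
    {
      return Observable.Create<string>(o =>
      {
        try
        {
          var req = (HttpWebRequest) WebRequest.Create(url);
          req.Method = WebRequestMethods.Http.Get;
          // sends Accept-Encoding for gzip/deflate and decompresses the response transparently
          req.AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate;

          using (var res = req.GetResponse())
          using (var stream = res.GetResponseStream())
          using (var reader = new StreamReader(stream))
          {
            // push each line to the subscriber as soon as it has been read
            string line;
            while ((line = reader.ReadLine()) != null)
              o.OnNext(line);
          }

          o.OnCompleted();
        }
        catch (Exception ex)
        {
          // report network and I/O failures through the observable instead of throwing from Subscribe
          o.OnError(ex);
        }

        // the download runs synchronously inside Subscribe, so by now there is nothing left to cancel
        return Disposable.Empty;
      });
    }
  }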

That's pretty much all there is to it. Using this class is also simple:

  downloader
    .Download(url)
    .ObserveOn(Scheduler.Default)
    .Skip(1)
    .Select(lineProcessor.Process)
    .Where(it => it != null)
    ...

I am using ObserveOn so that processing happens on another thread, concurrently with the download; Skip(1) to skip the header line; and the Where clause to drop the lines that failed processing (lineProcessor.Process returns null for those).
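For comparison, here is a pull-based sketch of the same pipeline without Rx, using a `yield`-based line reader. The `Entry` type, the "name,amount" line format and this `LineProcessor` are hypothetical stand-ins, since the post does not show the real processor; a `StringReader` stands in for the response stream, and for a real download `openReader` could return the `StreamReader` over `GetResponseStream()` as in `WebDownloader`.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;

// Hypothetical record for a "name,amount" line; the post does not show the real format.
public sealed class Entry
{
    public Entry(string name, decimal amount) { Name = name; Amount = amount; }
    public string Name { get; }
    public decimal Amount { get; }
}

// Hypothetical stand-in for lineProcessor: returns null for a line it cannot
// handle, which is what the Where(it => it != null) step relies on.
public sealed class LineProcessor
{
    public Entry Process(string line)
    {
        var parts = line.Split(',');
        decimal amount;
        if (parts.Length != 2 ||
            !decimal.TryParse(parts[1], NumberStyles.Number, CultureInfo.InvariantCulture, out amount))
            return null;

        return new Entry(parts[0].Trim(), amount);
    }
}

public static class LinePipeline
{
    // Yields lines lazily: each one reaches the consumer as soon as it has been read.
    public static IEnumerable<string> ReadLines(Func<TextReader> openReader)
    {
        using (var reader = openReader())
        {
            string line;
            while ((line = reader.ReadLine()) != null)
                yield return line;
        }
    }

    // Same shape as the Rx chain: skip the header, process each line, drop failures.
    public static IEnumerable<Entry> ProcessLines(IEnumerable<string> lines, LineProcessor lineProcessor)
    {
        return lines
            .Skip(1)
            .Select(lineProcessor.Process)
            .Where(it => it != null);
    }
}
```

The difference from the Rx version is who drives the loop: here the consumer pulls each line, so reading and processing alternate on a single thread, whereas ObserveOn in the Rx chain lets the download keep reading while another thread does the processing.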

Saturday, November 14, 2015

A data flow helper class

One problem I encounter when processing lists is exception handling. I prefer to write code that "chains" calls transforming the data:

  var results = list
    .Select(DoThing1)
    .Select(DoThing2)
    // ...
    .Select(DoThingN)
    .ToList();

The problem with something like this is that, if any of the calls throws an exception, processing stops for the whole list. Handling that requires that I move the "chain" to a new method and handle exceptions there:

  var results = list.Select(InnerMethod).ToList();

  // ...
  private ResultN InnerMethod(Input input)
  {
    try
    {
      var r1 = DoThing1(input);
      var r2 = DoThing2(r1);
      // ...
      var rn = DoThingN(rn_1);
      
      return rn;
    }
    catch(Exception ex)
    {
      // do something with ex, like logging
      return ?? // can't throw, I want to continue processing the rest of the list
    }
  }

Now I have two problems :) One is that the code just looks uglier, which most people can probably ignore. (I have OCD with regard to this; code that "looks bad" drives me nuts.) The more important issue is that I need to decide on an "empty" ResultN value to return from the inner method, and then I need to filter those out of the overall results. That can get ugly really quickly.

By analogy with what I have read about other languages (I think Go uses this approach, with functions returning a value together with an error; I've never studied the language, but I believe I first encountered the idea in some articles about it), I decided to write an "either a good value or an exception" helper class. On further reflection I changed it to a struct, because I don't want to have to check whether the result itself is null. Once I thought of that, I also decided that null is not a "good value", so any attempt to pass it as one results in an ArgumentNullException in the "or an exception" part instead. I hope things will become clearer from the code:

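  using System;
  using System.Runtime.ExceptionServices;

  public struct Result<T>
  {
    public bool HasValue { get; }

    public T Value
    {
      get
      {
        // rethrow the stored exception without losing its original stack trace
        if (!HasValue)
          ExceptionDispatchInfo.Capture(Exception).Throw();

        return value;
      }
    }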

    public Exception Exception => HasValue ? null : exception ?? NULL_EXCEPTION;

    public Result(T value)
    {
      // do not accept null
      if (value == null)
      {
        HasValue = false;
        this.value = default(T);
        exception = new ArgumentNullException();
      }
      else
      {
        HasValue = true;
        this.value = value;
        exception = null;
      }
    }

    public Result(Exception ex)
    {
      HasValue = false;
      value = default(T);
      exception = ex;
    }

    //

    // ReSharper disable once StaticMemberInGenericType
    private static readonly Exception NULL_EXCEPTION = new ArgumentNullException();

    private readonly T value;
    private readonly Exception exception;
  }
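For comparison with the Go convention mentioned above (a function returns a value together with an error), here is a rough C# analogue using a value tuple, which needs C# 7 or later. `GoStyle.Divide` is a hypothetical helper for illustration. Nothing forces the caller to look at the error before using the value; the struct above closes that gap by throwing from Value when there is no value.

```csharp
using System;

public static class GoStyle
{
    // Go-like shape: a value plus an error; a non-null Error means Value is meaningless.
    public static (int Value, Exception Error) Divide(int a, int b)
    {
        try
        {
            return (a / b, null);
        }
        catch (DivideByZeroException ex)
        {
            // The caller still receives a zero "value" and must remember to check Error.
            return (0, ex);
        }
    }
}
```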

I also added an Apply extension method (and a Select overload built on it). Apply is an extension method rather than a method on the struct because of the additional generic type TR; it just looked wrong there. (I did mention my OCD, right?)

  using System;
  using System.Collections.Generic;
  using System.Linq;

  public static class ResultExtensions
  {
    public static Result<TR> Apply<T, TR>(this Result<T> it, Func<T, TR> selector)
    {
      return it.HasValue
        ? Try(() => selector(it.Value))
        : new Result<TR>(it.Exception);
    }

    public static IEnumerable<Result<TR>> Select<T, TR>(this IEnumerable<Result<T>> list, Func<T, TR> selector)
    {
      return list.Select(it => it.Apply(selector));
    }

    //

    private static Result<TR> Try<TR>(Func<TR> func)
    {
      try
      {
        return new Result<TR>(func());
      }
      catch (Exception ex)
      {
        return new Result<TR>(ex);
      }
    }
  }

While I didn't write this in a TDD fashion, I have added some asserts to a console application to make sure I got back the expected results:

  using System;
  using System.Collections.Generic;
  using System.Diagnostics;
  using System.Linq;

  static class Program
  {
    static void Main()
    {
      var r1 = Divide(5, 2);
      Debug.Assert(Print(r1) == "HasValue = True Value = 2 Exception = ");
      var r2 = Divide(5, 0);
      Debug.Assert(Print(r2) == "HasValue = False Value = (invalid) Exception = Attempted to divide by zero.");
      var r3 = new Result<int?>(3);
      Debug.Assert(Print(r3) == "HasValue = True Value = 3 Exception = ");
      var r4 = new Result<int?>((int?) null);
      Debug.Assert(Print(r4) == "HasValue = False Value = (invalid) Exception = Value cannot be null.");
      var r5 = new Result<object>((object) null); // the cast selects Result(T) instead of Result(Exception)
      Debug.Assert(Print(r5) == "HasValue = False Value = (invalid) Exception = Value cannot be null.");

      // using the default constructor
      var r6 = new Result<int>();
      Debug.Assert(Print(r6) == "HasValue = False Value = (invalid) Exception = Value cannot be null.");

      // trying to access the Value property without checking first will result in an exception
      try
      {
        Console.WriteLine(r5.Value);
      }
      catch
      {
        Console.WriteLine("Oops.");
      }

      // we can now chain selectors

      // case 1: all good
      var i1 = new Result<int?>(5);
      var ri1 = i1
        .Apply(it => 100 / it)
        .Apply(it => 200 / it)
        .Apply(it => 10 / it);
      Debug.Assert(Print(ri1) == "HasValue = True Value = 1 Exception = ");

      // case 2: something bad happens
      var i2 = new Result<int>(200);
      var ri2 = i2
        .Apply(it => 100 / it)
        .Apply(it => 200 / it)
        .Apply(it => 10 / it);
      Debug.Assert(Print(ri2) == "HasValue = False Value = (invalid) Exception = Attempted to divide by zero.");

      // finally, the target use case: processing a list without aborting due to exceptions
      var list1 = new List<int> { 10, 20, 0, 30, 40 };
      var list2 = list1
        .Select(it => new Result<int>(it))
        .Select(it => it / 2)
        .Select(it => 10 / it)
        .Select(it => 100 / it)
        .ToList();
      var good = list2.Where(it => it.HasValue).ToList();
      var bad = list2.Where(it => !it.HasValue).ToList();

      Debug.Assert(good.Count == 2);
      Debug.Assert(bad.Count == 3);
    }

    private static Result<int> Divide(int a, int b)
    {
      try
      {
        return new Result<int>(a / b);
      }
      catch (Exception ex)
      {
        return new Result<int>(ex);
      }
    }

    private static string Print<T>(Result<T> r)
    {
      return $"HasValue = {r.HasValue} Value = {(r.HasValue ? r.Value + "" : "(invalid)")} Exception = {r.Exception?.Message}";
    }
  }

Note that, thanks to the Select extension method, I didn't have to write the last example as

      var list2 = list1
        .Select(it => new Result<int>(it))
        .Select(it => it.Apply(x => x / 2))
        .Select(it => it.Apply(x => 10 / x))
        .Select(it => it.Apply(x => 100 / x))
        .ToList();

Avoiding boilerplate code is good; so is the fact that the inner lambdas don't have to know anything about the Result<T> type, and yet an exception thrown in any of them doesn't abort processing of the entire list.
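A side effect of naming the extension method Select: C# translates a query expression such as `from x in e select f(x)` into `e.Select(x => f(x))` by name and then binds it normally, so query syntax picks up the same extension. The sketch below repeats a condensed copy of Result<T> (without the null handling) and of the extensions so that it compiles on its own. Only a single `select` benefits; a `where` clause would filter the Result<T> wrappers themselves through Enumerable.Where, and a second `from` would need a SelectMany overload that the post does not define.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Condensed copy of Result<T> from the post, without the null checks.
public struct Result<T>
{
    private readonly T value;

    public Result(T value) { this.value = value; HasValue = true; Exception = null; }
    public Result(Exception ex) { value = default(T); HasValue = false; Exception = ex; }

    public bool HasValue { get; }
    public Exception Exception { get; }

    public T Value
    {
        get
        {
            if (!HasValue)
                throw Exception ?? new InvalidOperationException("No value.");
            return value;
        }
    }
}

public static class ResultExtensions
{
    public static Result<TR> Apply<T, TR>(this Result<T> it, Func<T, TR> selector)
    {
        if (!it.HasValue)
            return new Result<TR>(it.Exception);

        try
        {
            return new Result<TR>(selector(it.Value));
        }
        catch (Exception ex)
        {
            return new Result<TR>(ex);
        }
    }

    // The name matters: query syntax looks for a method called Select.
    public static IEnumerable<Result<TR>> Select<T, TR>(this IEnumerable<Result<T>> list, Func<T, TR> selector)
    {
        return list.Select(it => it.Apply(selector));
    }
}
```

For example, `from x in inputs select 100 / x` over Result<int> values yields Result<int> values, and the expression `100 / x` only ever sees a plain int.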

Tuesday, October 20, 2015

Retry algorithm

Retry with exponential back-off

I think this is a useful class, so I'm just going to leave it here. (I'm annoyed by the duplication between Retry and RetryAsync, but I haven't been able to remove it.)

  using System;
  using System.Threading;
  using System.Threading.Tasks;

  public interface RetryPolicy
  {
    T Retry<T>(Func<T> func);
    void Retry(Action action);

    Task<T> RetryAsync<T>(Func<Task<T>> func);
    Task RetryAsync(Func<Task> action);
  }

  public class RetryPolicyWithExponentialDelay : RetryPolicy
  {
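    // Random is not thread-safe; the lock lets concurrent retries share the single instance
    // ReSharper disable once InconsistentNaming
    public Func<double> GetRandom = () => { lock (RND) return RND.NextDouble(); };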

    // ReSharper disable once InconsistentNaming
    public Action<int> Sleep = timeout => Thread.Sleep(timeout);

    public RetryPolicyWithExponentialDelay(int maxCount, TimeSpan initialDelay, TimeSpan maxDelay)
    {
      this.maxCount = maxCount;
      this.initialDelay = initialDelay;
      this.maxDelay = maxDelay;
    }

    public T Retry<T>(Func<T> func)
    {
      var count = 0;
      var delay = initialDelay;

      while (true)
      {
        try
        {
          return func();
        }
        catch
        {
          count++;
          if (count >= maxCount)
            throw;

          SleepUpTo(delay);
          delay = IncreaseDelay(delay);
        }
      }
    }

    public void Retry(Action action)
    {
      Retry(() =>
      {
        action();
        return 0;
      });
    }

    public async Task<T> RetryAsync<T>(Func<Task<T>> func)
    {
      var count = 0;
      var delay = initialDelay;

      while (true)
      {
        try
        {
          return await func();
        }
        catch
        {
          count++;
          if (count >= maxCount)
            throw;

          SleepUpTo(delay);
          delay = IncreaseDelay(delay);
        }
      }
    }

    public async Task RetryAsync(Func<Task> action)
    {
      await RetryAsync(async () =>
      {
        await action();
        return 0;
      });
    }

    //

    private static readonly Random RND = new Random();

    private readonly int maxCount;
    private readonly TimeSpan initialDelay;
    private readonly TimeSpan maxDelay;

    private void SleepUpTo(TimeSpan delay)
    {
      var actualDelay = (int) Math.Truncate(GetRandom() * delay.TotalMilliseconds);
      Sleep(actualDelay);
    }

    private TimeSpan IncreaseDelay(TimeSpan delay)
    {
      delay = delay.Add(delay);
      if (delay > maxDelay)
        delay = maxDelay;

      return delay;
    }
  }
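The expected sleep totals in the tests (1 + 2 + 4 + 5 seconds) follow from how the bound evolves: with GetRandom stubbed to 1.0, SleepUpTo waits the full bound; the bound doubles after each wait and is clamped to maxDelay; and a policy limited to maxCount attempts waits at most maxCount - 1 times. The hypothetical helper below only replays that arithmetic and is not part of the class.

```csharp
using System;
using System.Collections.Generic;

public static class BackoffSchedule
{
    // Upper bounds of the waits taken before each retry, for a given number of failures.
    public static List<TimeSpan> Bounds(int failures, int maxCount, TimeSpan initialDelay, TimeSpan maxDelay)
    {
        // The final failed attempt rethrows instead of sleeping.
        var waits = Math.Min(failures, maxCount - 1);
        var result = new List<TimeSpan>();
        var bound = initialDelay;
        for (var i = 0; i < waits; i++)
        {
            result.Add(bound);
            var doubled = bound + bound;
            bound = doubled > maxDelay ? maxDelay : doubled;
        }
        return result;
    }
}
```

With the test configuration (5 attempts, 1 second initial delay, 5 second maximum), four failures give bounds of 1, 2, 4 and 5 seconds, and ten failures give the same four waits before the call gives up.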

I'm also adding the tests here:

  using System;
  using System.Threading.Tasks;
  using Microsoft.VisualStudio.TestTools.UnitTesting;

  [TestClass]
  public class RetryPolicyWithExponentialDelayTests
  {
    private const int RESULT = 100;

    private RetryPolicyWithExponentialDelay sut;

    private int called;
    private int sleepTime;

    [TestInitialize]
    public void SetUp()
    {
      sut = new RetryPolicyWithExponentialDelay(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5))
      {
        GetRandom = () => 1.0,
        Sleep = timeout => sleepTime += timeout,
      };

      called = 0;
      sleepTime = 0;
    }

    [TestClass]
    public class Sync : RetryPolicyWithExponentialDelayTests
    {
      private Action action;
      private Func<int> func;

      [TestInitialize]
      public void InnerSetup()
      {
        action = null;
        func = () =>
        {
          action();
          return RESULT;
        };
      }

      [TestClass]
      public class SyncFunc : Sync
      {
        [TestMethod]
        public void NoErrors()
        {
          action = GetActionWithErrors(0);

          var result = sut.Retry(func);

          Assert.AreEqual(1, called, "Function was not called");
          Assert.AreEqual(RESULT, result, "Invalid result");
          Assert.AreEqual(0, sleepTime, "Should not have slept");
        }

        [TestMethod]
        public void OneError()
        {
          action = GetActionWithErrors(1);

          var result = sut.Retry(func);

          Assert.AreEqual(2, called, "The call was not retried");
          Assert.AreEqual(RESULT, result, "Invalid result");
          Assert.AreEqual(TimeSpan.FromSeconds(1).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount");
        }

        [TestMethod]
        public void TwoErrors()
        {
          action = GetActionWithErrors(2);

          var result = sut.Retry(func);

          Assert.AreEqual(3, called, "The call was not retried twice");
          Assert.AreEqual(RESULT, result, "Invalid result");
          Assert.AreEqual(TimeSpan.FromSeconds(1 + 2).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount");
        }

        [TestMethod]
        public void FourErrors()
        {
          action = GetActionWithErrors(4);

          var result = sut.Retry(func);

          Assert.AreEqual(5, called, "The call was not retried four times");
          Assert.AreEqual(RESULT, result, "Invalid result");
          Assert.AreEqual(TimeSpan.FromSeconds(1 + 2 + 4 + 5).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount (limited by max delay)");
        }

        [TestMethod]
        public void TooManyErrors()
        {
          action = GetActionWithErrors(10);

          try
          {
            sut.Retry(func);
            Assert.Fail("The call did not throw");
          }
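          // let the AssertFailedException from Assert.Fail propagate instead of swallowing it
          catch (Exception ex) when (!(ex is AssertFailedException))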
          {
            Assert.AreEqual(5, called, "The call was not tried five times");
            Assert.AreEqual(TimeSpan.FromSeconds(1 + 2 + 4 + 5).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount (limited by max delay)");
          }
        }
      }

      [TestClass]
      public class SyncAction : Sync
      {
        [TestMethod]
        public void NoErrors()
        {
          action = GetActionWithErrors(0);

          sut.Retry(action);

          Assert.AreEqual(1, called, "Function was not called");
          Assert.AreEqual(0, sleepTime, "Should not have slept");
        }

        [TestMethod]
        public void OneError()
        {
          action = GetActionWithErrors(1);

          sut.Retry(action);

          Assert.AreEqual(2, called, "The call was not retried");
          Assert.AreEqual(TimeSpan.FromSeconds(1).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount");
        }

        [TestMethod]
        public void TwoErrors()
        {
          action = GetActionWithErrors(2);

          sut.Retry(action);

          Assert.AreEqual(3, called, "The call was not retried twice");
          Assert.AreEqual(TimeSpan.FromSeconds(1 + 2).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount");
        }

        [TestMethod]
        public void FourErrors()
        {
          action = GetActionWithErrors(4);

          sut.Retry(action);

          Assert.AreEqual(5, called, "The call was not retried four times");
          Assert.AreEqual(TimeSpan.FromSeconds(1 + 2 + 4 + 5).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount (limited by max delay)");
        }

        [TestMethod]
        public void TooManyErrors()
        {
          action = GetActionWithErrors(10);

          try
          {
            sut.Retry(action);
            Assert.Fail("The call did not throw");
          }
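          // let the AssertFailedException from Assert.Fail propagate instead of swallowing it
          catch (Exception ex) when (!(ex is AssertFailedException))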
          {
            Assert.AreEqual(5, called, "The call was not tried five times");
            Assert.AreEqual(TimeSpan.FromSeconds(1 + 2 + 4 + 5).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount (limited by max delay)");
          }
        }
      }
    }

    [TestClass]
    public class Async : RetryPolicyWithExponentialDelayTests
    {
      private Action action;
      private Func<Task<int>> func;

      [TestInitialize]
      public void InnerSetup()
      {
        action = null;
        func = async () =>
        {
          await Task.Run(action);
          return RESULT;
        };
      }

      [TestClass]
      public class AsyncFunc : Async
      {
        [TestMethod]
        public async Task NoErrorsAsync()
        {
          action = GetActionWithErrors(0);

          var result = await sut.RetryAsync(func);

          Assert.AreEqual(1, called, "Function was not called");
          Assert.AreEqual(RESULT, result, "Invalid result");
          Assert.AreEqual(0, sleepTime, "Should not have slept");
        }

        [TestMethod]
        public async Task OneErrorAsync()
        {
          action = GetActionWithErrors(1);

          var result = await sut.RetryAsync(func);

          Assert.AreEqual(2, called, "The call was not retried");
          Assert.AreEqual(RESULT, result, "Invalid result");
          Assert.AreEqual(TimeSpan.FromSeconds(1).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount");
        }

        [TestMethod]
        public async Task TwoErrorsAsync()
        {
          action = GetActionWithErrors(2);

          var result = await sut.RetryAsync(func);

          Assert.AreEqual(3, called, "The call was not retried twice");
          Assert.AreEqual(RESULT, result, "Invalid result");
          Assert.AreEqual(TimeSpan.FromSeconds(1 + 2).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount");
        }

        [TestMethod]
        public async Task FourErrorsAsync()
        {
          action = GetActionWithErrors(4);

          var result = await sut.RetryAsync(func);

          Assert.AreEqual(5, called, "The call was not retried four times");
          Assert.AreEqual(RESULT, result, "Invalid result");
          Assert.AreEqual(TimeSpan.FromSeconds(1 + 2 + 4 + 5).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount (limited by max delay)");
        }

        [TestMethod]
        public async Task TooManyErrorsAsync()
        {
          action = GetActionWithErrors(10);

          try
          {
            await sut.RetryAsync(func);
            Assert.Fail("The call did not throw");
          }
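          // let the AssertFailedException from Assert.Fail propagate instead of swallowing it
          catch (Exception ex) when (!(ex is AssertFailedException))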
          {
            Assert.AreEqual(5, called, "The call was not tried five times");
            Assert.AreEqual(TimeSpan.FromSeconds(1 + 2 + 4 + 5).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount (limited by max delay)");
          }
        }
      }

      [TestClass]
      public class AsyncAction : Async
      {
        [TestMethod]
        public async Task NoErrorsAsync()
        {
          action = GetActionWithErrors(0);

          await sut.RetryAsync(() => Task.Run(action));

          Assert.AreEqual(1, called, "Function was not called");
          Assert.AreEqual(0, sleepTime, "Should not have slept");
        }

        [TestMethod]
        public async Task OneErrorAsync()
        {
          action = GetActionWithErrors(1);

          await sut.RetryAsync(() => Task.Run(action));

          Assert.AreEqual(2, called, "The call was not retried");
          Assert.AreEqual(TimeSpan.FromSeconds(1).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount");
        }

        [TestMethod]
        public async Task TwoErrorsAsync()
        {
          action = GetActionWithErrors(2);

          await sut.RetryAsync(() => Task.Run(action));

          Assert.AreEqual(3, called, "The call was not retried twice");
          Assert.AreEqual(TimeSpan.FromSeconds(1 + 2).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount");
        }

        [TestMethod]
        public async Task FourErrorsAsync()
        {
          action = GetActionWithErrors(4);

          await sut.RetryAsync(() => Task.Run(action));

          Assert.AreEqual(5, called, "The call was not retried four times");
          Assert.AreEqual(TimeSpan.FromSeconds(1 + 2 + 4 + 5).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount (limited by max delay)");
        }

        [TestMethod]
        public async Task TooManyErrorsAsync()
        {
          action = GetActionWithErrors(10);

          try
          {
            await sut.RetryAsync(() => Task.Run(action));
            Assert.Fail("The call did not throw");
          }
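          // let the AssertFailedException from Assert.Fail propagate instead of swallowing it
          catch (Exception ex) when (!(ex is AssertFailedException))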
          {
            Assert.AreEqual(5, called, "The call was not tried five times");
            Assert.AreEqual(TimeSpan.FromSeconds(1 + 2 + 4 + 5).TotalMilliseconds, sleepTime, 1, "Did not sleep the correct amount (limited by max delay)");
          }
        }
      }
    }

    //

    private Action GetActionWithErrors(int errorCount)
    {
      return () =>
      {
        called++;
        if (called <= errorCount)
          throw new Exception();
      };
    }
  }
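About the duplication between Retry and RetryAsync mentioned at the start of this post: one possible way to remove it, sketched under my own assumptions rather than taken from the class above, is to keep a single async loop and run the synchronous overload through it with tasks that are already completed. Injecting the delay also lets RetryAsync await Task.Delay instead of blocking a thread in Thread.Sleep, which the class above does. SharedRetryPolicy, DelayAsync and the TimeSpan-based Sleep hook are hypothetical names; the void overloads could wrap these two methods exactly as in the original class.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of RetryPolicyWithExponentialDelay with one shared retry loop.
public sealed class SharedRetryPolicy
{
    private static readonly Random Rnd = new Random();

    private readonly int maxCount;
    private readonly TimeSpan initialDelay;
    private readonly TimeSpan maxDelay;

    // Replaceable hooks, in the spirit of GetRandom/Sleep in the original class.
    public Func<double> GetRandom = () => { lock (Rnd) return Rnd.NextDouble(); };
    public Action<TimeSpan> Sleep = delay => Thread.Sleep(delay);
    public Func<TimeSpan, Task> DelayAsync = delay => Task.Delay(delay);

    public SharedRetryPolicy(int maxCount, TimeSpan initialDelay, TimeSpan maxDelay)
    {
        this.maxCount = maxCount;
        this.initialDelay = initialDelay;
        this.maxDelay = maxDelay;
    }

    public T Retry<T>(Func<T> func)
    {
        // Every task here is already completed when returned (Task.FromResult, and a delay
        // that sleeps before returning Task.CompletedTask), so the loop runs synchronously
        // and GetResult() never waits on pending work. Exceptions surface unwrapped.
        return RunAsync(() => Task.FromResult(func()), delay =>
        {
            Sleep(delay);
            return Task.CompletedTask;
        }).GetAwaiter().GetResult();
    }

    public Task<T> RetryAsync<T>(Func<Task<T>> func) => RunAsync(func, DelayAsync);

    private async Task<T> RunAsync<T>(Func<Task<T>> func, Func<TimeSpan, Task> delay)
    {
        var bound = initialDelay;
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await func().ConfigureAwait(false);
            }
            catch (Exception) when (attempt < maxCount)
            {
                // Swallow the failure only while attempts remain; the last one propagates.
            }

            // Wait a random time up to the current bound, then double the bound (capped).
            await delay(TimeSpan.FromMilliseconds(Math.Truncate(GetRandom() * bound.TotalMilliseconds)))
                .ConfigureAwait(false);
            var doubled = bound + bound;
            bound = doubled > maxDelay ? maxDelay : doubled;
        }
    }
}
```

The trade-off is that the synchronous path now goes through the async state machine, which costs a little overhead per call but leaves one retry loop to maintain.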