Processing a downloaded text file while it's downloading
I once had a job that involved downloading some huge text files. Time was very important, so I thought it would be interesting to process the files while they were still downloading instead of waiting for the download to finish. Ultimately, the client changed his mind and wanted the whole thing downloaded first, but I thought this was an interesting code fragment to save for later.
For this kind of real-time work I use the Rx-Main NuGet package (since renamed System.Reactive). The interface is simple:
public interface Downloader
{
    /// <summary>
    /// Downloads a file from the given URL and returns it line by line.
    /// </summary>
    /// <param name="url">The URL of the file to be downloaded.</param>
    /// <returns>The lines from the downloaded file, as a stream.</returns>
    IObservable<string> Download(string url);
}
The only noteworthy thing about the implementation is the automatic decompression. Setting the AutomaticDecompression property also sends the appropriate Accept-Encoding header to the server (do not add it separately; it will be duplicated, and some servers don't handle that well):
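using System;
using System.IO;
using System.Net;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public class WebDownloader : Downloader
{
    public IObservable<string> Download(string url)
    {
        return Observable.Create<string>(o =>
        {
            var req = (HttpWebRequest) WebRequest.Create(url);
            req.Method = WebRequestMethods.Http.Get;
            // Setting this also sends the matching Accept-Encoding header.
            req.AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate;

            using (var res = req.GetResponse())
            using (var stream = res.GetResponseStream())
            using (var reader = new StreamReader(stream))
            {
                // Push each line to the observer as soon as it has been read.
                string line;
                while ((line = reader.ReadLine()) != null)
                    o.OnNext(line);
                o.OnCompleted();
            }

            // The loop above runs synchronously, so there is nothing left to dispose.
            return Disposable.Empty;
        });
    }
}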
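One thing this implementation does not handle is unsubscription: it returns Disposable.Empty, so something like Take(10) downstream would not stop the read loop. Below is a minimal sketch of a cancellable variant, not what I actually shipped. It assumes the asynchronous Observable.Create overload that passes a CancellationToken to the subscribe function. The names LineStreams and ReadLines are made up for illustration, and a StringReader stands in for the response stream so the sketch runs without a network connection.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class LineStreams
{
    // Hypothetical helper: turns any TextReader factory into an observable of lines.
    public static IObservable<string> ReadLines(Func<TextReader> openReader)
    {
        return Observable.Create<string>(async (observer, cancel) =>
        {
            using (var reader = openReader())
            {
                string line;
                // Stop reading once the subscription has been disposed.
                while (!cancel.IsCancellationRequested &&
                       (line = await reader.ReadLineAsync()) != null)
                {
                    observer.OnNext(line);
                }
            }
            // When the task finishes, Rx signals OnCompleted;
            // if it throws, the exception is delivered through OnError.
        });
    }
}

public static class Program
{
    public static void Main()
    {
        // Assumption: in-memory text stands in for the downloaded file.
        var lines = LineStreams
            .ReadLines(() => new StringReader("header\nfirst\nsecond"))
            .Skip(1)   // drop the header line
            .ToList()
            .Wait();   // block until the stream completes

        Console.WriteLine(string.Join("|", lines)); // first|second
    }
}
```

With an in-memory reader the reads complete immediately, so the token only really matters once the reader wraps a network stream, where each ReadLineAsync call can actually wait for data.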
That's pretty much all there is to it. Using this class is also simple:
downloader
    .Download(url)
    .ObserveOn(Scheduler.Default)
    .Skip(1)
    .Select(lineProcessor.Process)
    .Where(it => it != null)
    ...
I am using ObserveOn to let the processing run on another thread, concurrently with the download; Skip(1) to skip the header line; and the Where clause to drop any lines that failed processing (Process returns null for those).
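To try the pipeline without a real download, here is a small self-contained sketch. The Process method is a hypothetical stand-in for lineProcessor.Process (it parses an integer and returns null on failure), and the array is a stand-in for the lines that Download(url) would produce.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class PipelineDemo
{
    // Hypothetical processor: returns the parsed number, or null for a malformed line.
    static int? Process(string line)
    {
        int value;
        return int.TryParse(line, out value) ? value : (int?)null;
    }

    public static void Main()
    {
        // Stand-in for downloader.Download(url): a header line followed by data lines.
        var lines = new[] { "value", "1", "oops", "3" }.ToObservable();

        var results = lines
            .ObserveOn(Scheduler.Default)   // process on a thread-pool thread
            .Skip(1)                        // skip the header line
            .Select(line => Process(line))
            .Where(it => it != null)        // drop lines that failed processing
            .ToList()
            .Wait();                        // block until the stream completes

        Console.WriteLine(string.Join(",", results)); // 1,3
    }
}
```

ObserveOn keeps the lines in their original order, so the results come out in file order even though they are processed on another thread.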