-
Notifications
You must be signed in to change notification settings - Fork 4
Open
Description
It would be useful to receive query results as an async pull sequence.
My attempt:
#r "nuget: FSharp.Control.TaskSeq, 0.4.0"
#r "nuget: Fumble, 0.8.3"
open FSharp.Control
open Fumble
[<RequireQualifiedAccess>]
module Sqlite =
    open System.Threading
    open System.Threading.Channels
    open System.Threading.Tasks

    /// Streams the rows of a query as an asynchronous pull sequence.
    ///
    /// A background task runs `Sqlite.iterAsync` over `db`, maps every row
    /// with `read`, and pushes the result into a bounded channel (capacity
    /// 1024). The returned sequence drains that channel, so at most 1024
    /// mapped rows are buffered ahead of the consumer.
    ///
    /// read: maps the current row to a value; invoked on the producer task.
    /// db:   the query to run.
    ///
    /// Errors: an `Error` returned by `Sqlite.iterAsync`, or an exception
    /// thrown while producing, is used to complete the channel and therefore
    /// surfaces to the consumer while enumerating the sequence.
    let streamAsync (read : SqliteRowReader -> 'a) (db : Sqlite.SqlProps) : TaskSeq<'a> =
        taskSeq {
            // Signals the producer to stop once the consumer is done with the
            // sequence. Deliberately not disposed: the producer may still be
            // holding the token when the consumer leaves, and a source that
            // never allocates a timer or wait handle owns nothing to release.
            let cts = new CancellationTokenSource()
            let ct = cts.Token

            let channel =
                let opts =
                    BoundedChannelOptions(1024, SingleWriter = true, SingleReader = true)
                Channel.CreateBounded<'a>(opts)

            let fillChannel =
                let writer = channel.Writer
                Task.Run<unit>(
                    fun () ->
                        task {
                            try
                                let! outcome =
                                    db
                                    |> Sqlite.iterAsync
                                        (fun r ->
                                            // Stop reading rows as soon as the consumer has gone away.
                                            ct.ThrowIfCancellationRequested()
                                            let a = read r
                                            if not (writer.TryWrite(a)) then
                                                // The row callback is synchronous, so back-pressure
                                                // has to block this pool thread. GetResult() waits
                                                // without the detour through Async and rethrows the
                                                // original exception; the token unblocks the wait
                                                // when the consumer stops early.
                                                writer.WriteAsync(a, ct).AsTask().GetAwaiter().GetResult())
                                match outcome with
                                | Ok () ->
                                    writer.TryComplete() |> ignore
                                | Error ex ->
                                    writer.TryComplete(ex) |> ignore
                            with ex ->
                                // Completing the channel with the exception already delivers it
                                // to the reader; re-raising here would only leave a faulted task
                                // that nobody awaits. TryComplete never throws, so the original
                                // error cannot be masked.
                                writer.TryComplete(ex) |> ignore
                        })

            try
                yield! channel.Reader.ReadAllAsync()
                // Join the producer so the sequence does not end before it has finished.
                do! fillChannel
            finally
                // NOTE(review): relies on taskSeq running `finally` when the
                // enumerator is disposed before the sequence is exhausted —
                // confirm against the FSharp.Control.TaskSeq version in use.
                cts.Cancel()
        }

Is this a better way?
Perhaps this should be added?
Metadata
Metadata
Assignees
Labels
No labels