F# Implementation of Scala ZIO
29 Mar 2019
PLEASE NOTE SOURCE CODE IS NO LONGER AVAILABLE AND THE POST IS HERE JUST FOR INFORMATION
This is a prototype implementation of Scala ZIO in F#. It aims to be a skeleton of ZIO features such that additional functions can be easily fleshed out.
Background
I recently went to a talk on Scala ZIO by John De Goes. ZIO is a type-safe, composable library for asynchronous and concurrent programming in Scala.
It takes a different approach to other Scala effects libraries in that it does not require the use of Higher-Kinded Types. Instead it uses a reader monad, called the ZIO Environment, to provide access to IO effects.
I came away wanting something similar in F#: a library that could be used in the outer IO layer to simplify and test IO dependency code. I started to play with some reader code but didn't think it would ultimately work out. In fact, it works really well.
IO
\[IO = Reader + Async + Result\]
The F# equivalents of the ZIO type aliases are UIO<'r,'a>, which represents effects without errors, and IO<'r,'a,'e>, which represents effects with a possible error.
IO combines reader, async and result into one unified computation expression.
type UIO<'r,'a> = UIO of ('r * Cancel -> ('a option -> unit) -> unit)
type IO<'r,'a,'e> = IO of ('r * Cancel -> (Result<'a,'e> option -> unit) -> unit)
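To make the shape of these types more concrete, here is a minimal sketch of how a UIO-style value might be built, chained and run. It is not the original IO.fs code: the Cancel record, and the names result, fromEnv, bind and runSynchronously, are assumptions made for illustration.

```fsharp
open System.Threading

// Simplified stand-in for the post's Cancel type (assumption: one mutable flag).
type Cancel = { mutable IsSet : bool }

// Same shape as the post's UIO: environment and cancel token in, continuation out.
type UIO<'r,'a> = UIO of ('r * Cancel -> ('a option -> unit) -> unit)

module UIO =
    // Wrap a plain value; None signals that the computation was cancelled.
    let result (a: 'a) : UIO<'r,'a> =
        UIO (fun (_, cancel: Cancel) cont ->
            if cancel.IsSet then cont None else cont (Some a))

    // Reader part: compute a value from the environment.
    let fromEnv (f: 'r -> 'a) : UIO<'r,'a> =
        UIO (fun (env, cancel: Cancel) cont ->
            if cancel.IsSet then cont None else cont (Some (f env)))

    // Sequence two effects, passing the same environment to both.
    let bind (f: 'a -> UIO<'r,'b>) (UIO run) : UIO<'r,'b> =
        UIO (fun env cont ->
            run env (fun a ->
                match a with
                | None -> cont None
                | Some a ->
                    let (UIO next) = f a
                    next env cont))

    // Block until the continuation fires; only meant for the edge of a program.
    let runSynchronously (env: 'r) (UIO run) : 'a option =
        use finished = new ManualResetEventSlim(false)
        let output = ref None
        run (env, { IsSet = false }) (fun a ->
            output.Value <- a
            finished.Set())
        finished.Wait()
        output.Value

// Usage: the environment here is just an int.
let answer =
    UIO.fromEnv (fun (n: int) -> n + 1)
    |> UIO.bind (fun x -> UIO.result (x * 2))
    |> UIO.runSynchronously 20
```

The continuation receives an option so that a cancelled computation can report None instead of a value, which is the role the option plays in the type definitions above.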
Reader
The reader part represents all the environment dependencies required in the computation expression. It is fully type-safe, and the required environment type is inferred, including any library requirements such as Clock for the timeout. The computation expression can easily be tested by running it with a test environment.
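The inference of environment requirements can be illustrated with a much simpler, synchronous reader. The interface names below follow those used in the post, but their members are trimmed, and Reader, run, bind, greet and TestEnv are hypothetical names for this sketch only.

```fsharp
// Service interfaces named after those in the post; members are trimmed for the sketch.
type LoggingService =
    abstract member Log : string -> unit
type Logger =
    abstract member Logging : LoggingService
type ConsoleService =
    abstract member WriteLine : string -> unit
type Console =
    abstract member Console : ConsoleService

// Simplified reader (assumption: synchronous, no cancellation or errors).
type Reader<'r,'a> = Reader of ('r -> 'a)

let run (env: 'r) (Reader f) : 'a = f env

let bind (f: 'a -> Reader<'r,'b>) (Reader g) : Reader<'r,'b> =
    Reader (fun env -> run env (f (g env)))

// Each operation only constrains the environment to the service it needs.
let log (s: string) = Reader (fun (env: #Logger) -> env.Logging.Log s)
let writeLine (s: string) = Reader (fun (env: #Console) -> env.Console.WriteLine s)

// Inferred environment: any 'r that implements both Logger and Console.
let greet (name: string) =
    log "started" |> bind (fun () -> writeLine ("Hi " + name))

// Test environment that records output instead of performing IO.
type TestEnv(output: ResizeArray<string>) =
    interface Logger with
        member __.Logging =
            { new LoggingService with member __.Log s = output.Add ("log: " + s) }
    interface Console with
        member __.Console =
            { new ConsoleService with member __.WriteLine s = output.Add ("console: " + s) }

let recorded = ResizeArray<string>()
run (TestEnv recorded) (greet "Ann")
```

Nothing in greet names the environment type. The compiler collects the Logger and Console constraints from the operations used, so a test only has to supply a type that implements both.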
Async
At the IO layer, thread pool threads need to be used as efficiently as possible, without any blocking. This usually means using Async in F# or async/await in C#. Both let a computation wait for other work to finish without a thread pool thread having to wait.
let race (UIO run1) (IO run2) : IO<'r,Choice<'a1,'a2>,'e1> =
    IO (fun env cont ->
        if Cancel.isSet env then cont None
        else
            let envChild = Cancel.add env
            // A ref cell rather than `let mutable`: closures cannot capture mutable locals.
            let o = ref 0
            ThreadPool.QueueUserWorkItem (fun _ ->
                run1 envChild (fun a ->
                    // Only the first computation to finish gets past this exchange.
                    if Interlocked.Exchange(&o.contents,1) = 0 then
                        // Cancel the computation that lost the race.
                        Cancel.set envChild
                        if Cancel.isSet env then cont None
                        else Option.map (Choice2Of2 >> Ok) a |> cont
                )
            ) |> ignore
            ThreadPool.QueueUserWorkItem (fun _ ->
                run2 envChild (fun a ->
                    if Interlocked.Exchange(&o.contents,1) = 0 then
                        Cancel.set envChild
                        if Cancel.isSet env then cont None
                        else Option.map (Result.map Choice1Of2) a |> cont
                )
            ) |> ignore
    )
With IO, async is implemented directly on the thread pool. There are two main reasons for this. First, in IO exceptions are not part of control flow: errors are first class and type-safe, and unrecoverable exceptions output the stack trace and exit the process. Second, cancellation is fully integrated into IO, meaning that in race, in parallel and upon an error, computations are automatically cancelled, saving resources.
Together with the final part, Result, these dramatically simplify and optimise asynchronous IO code.
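The cancellation used by race can be pictured as a tree of tokens, where setting one token also sets everything registered beneath it. The sketch below is an assumption about how such a token might work, not the original module: here isSet, add and set take a token directly, whereas the post's versions take the environment.

```fsharp
// Assumed shape, loosely following the post's Cancel type: a flag plus child tokens.
type Cancel = Cancel of flag: bool ref * children: Cancel list ref

module Cancel =
    let create () = Cancel (ref false, ref [])

    let isSet (Cancel (flag, _)) = flag.Value

    // Setting a token also sets every token registered beneath it.
    let rec set (Cancel (flag, children)) =
        flag.Value <- true
        let snapshot = lock children (fun () -> children.Value)
        List.iter set snapshot

    // Create a child token; it starts cancelled if the parent already is.
    let add (Cancel (flag, children)) =
        let child = create ()
        lock children (fun () -> children.Value <- child :: children.Value)
        if flag.Value then set child
        child

// Usage: cancelling the middle token reaches its child but not its parent.
let root = Cancel.create ()
let child = Cancel.add root
let grandchild = Cancel.add child
Cancel.set child
let states = (Cancel.isSet root, Cancel.isSet child, Cancel.isSet grandchild)
```

This matches how race uses it: the winner sets the child token shared by both computations, while the parent token stays untouched for the rest of the program.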
Result
The result part of IO represents possible errors in an integrated and type-safe way.
The error type is inferred, and different error types are auto lifted into Choice<'a,'b> when combined.
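The lifting itself is easy to show on a plain Result. The post does this inside the computation expression through overloaded Bind members; the sketch below only shows the underlying idea, and bindLift, ParseError, RangeError and the two example functions are hypothetical names.

```fsharp
type ParseError = NotANumber of string
type RangeError = OutOfRange of int

// Bind that lifts two different error types into a single Choice.
let bindLift (f: 'a -> Result<'b,'e2>) (r: Result<'a,'e1>) : Result<'b,Choice<'e1,'e2>> =
    match r with
    | Error e1 -> Error (Choice1Of2 e1)
    | Ok a -> f a |> Result.mapError Choice2Of2

let parse (s: string) : Result<int,ParseError> =
    match System.Int32.TryParse(s) with
    | true, n -> Ok n
    | _ -> Error (NotANumber s)

let checkPercent (n: int) : Result<int,RangeError> =
    if n >= 0 && n <= 100 then Ok n else Error (OutOfRange n)

// Inferred as string -> Result<int,Choice<ParseError,RangeError>>.
let parsePercent (s: string) = parse s |> bindLift checkPercent
```

The caller never writes the combined error type. It falls out of the two steps being chained, and a match on the Choice recovers which step failed.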
IO computations can be timed out and retried based on result using simple functions.
Schedule is a powerful construct that can be combined in several ways.
I've replicated the structure from ZIO but not fully explored its uses.
let programRetry noRetry =
    io {
        do! Logger.log "started"
        do! Console.writeLine "Please enter your name:"
        let! name = Console.readLine()
        do! Logger.log ("got name = " + name)
        let! thread =
            Persistence.persist name
            |> IO.timeout 1000
            |> IO.retry (Schedule.recurs noRetry)
            |> IO.fork
        do! Console.writeLine ("Hi "+name)
        do! thread
        do! Logger.log "finished"
        return 0
    }
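The retry step in programRetry can be pictured, in a much reduced form, as rerunning an action while its result is an error. This sketch is synchronous and works on a plain Result. It assumes that Schedule.recurs n means "retry up to n more times", which the post does not spell out, and retry and flakyPersist are hypothetical names.

```fsharp
// Hypothetical helper: rerun the action until it succeeds or the retries run out.
let rec retry (retries: int) (action: unit -> Result<'a,'e>) : Result<'a,'e> =
    match action () with
    | Ok a -> Ok a
    | Error _ when retries > 0 -> retry (retries - 1) action
    | Error e -> Error e

// A persist stand-in that fails twice before succeeding.
let attempts = ref 0
let flakyPersist () : Result<unit,string> =
    attempts.Value <- attempts.Value + 1
    if attempts.Value < 3 then Error "timeout" else Ok ()

let outcome = retry 3 flakyPersist
```

Because the decision to retry is driven by the Result value rather than by exceptions, the same function works for any error type.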
Conclusion
When type inference worked for the dependencies I was surprised. When it was also possible to make it work for the errors I was amazed.
Computation expressions do not compose well. At the IO layer a testable solution is needed for dependencies. The IO layer also needs to use the thread pool efficiently. Making errors type-safe and integrated in the IO logic completes this compelling trinity.
References
IO.fs
IOTests.fs
ZIO Overview
ZIO Data Types
The Death Of Final Tagless
Thanks
@jdegoes for ZIO and a great talk that made me want to do this.
@NinoFloris for useful async discussions.
@keithtpinson for the error auto lift idea.