class: center, middle, transition, intro # FS2 Streaming Intro .horizontalCentered.caption[Daniel Beskin] --- .opinions1[Opinions are my own] -- .opinions2[... and my employer's] -- .services[ Reach out for: - Personalized workshops - Software development - Talks ] .linksIntro.linkStackIntro[[](] --- ## A Streaming Library -- ```scala readUtf8Lines(Path("fahrenheit.txt")) .map(fahrenheitToCelsius) .intersperse("\n") .through(text.utf8.encode) .through(writeAll(Path("celsius.txt"))) ``` ??? - Comment about the intuition from regular collections --- ## Non-strict ("Lazy") -- ```scala Stream(1, 2, 3) .repeat .take(50) .toList // List(1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, ...) ``` --- ## Effectful -- ```scala val nums = Stream.range(1, 5).evalMap: i => IO(println(s"This is $i")) ``` -- ```scala // This is 1 // This is 2 // This is 3 // This is 4 ``` --- ## Time-aware -- ```scala val delays = Stream.sleep[IO](1.second).repeat ``` -- ```scala // ... sleeping // This is 1 // ... sleeping // This is 2 // ... sleeping // This is 3 // ... sleeping // This is 4 ``` --- ## Concurrent -- ```scala val biggerNums = Stream.range(10, 15) .evalMap(i => IO(println(s"Slower $i"))) val longerDelays = Stream.fixedDelay[IO](2.seconds) {{content}} ``` -- val s1 = val s2 = s1.merge(s2) -- ```scala // This is 1 // Slower 10 // This is 2 // This is 3 // Slower 11 // This is 4 // Slower 12 // Slower 13 // Slower 14 ``` --- ## Error-aware -- ```scala val err = Stream("a", "b", "c") ++ Stream.raiseError[IO](new Exception("oh noes!")) {{content}} ``` -- err .handleErrorWith(e => Stream(s"Got: ${e.getMessage}")) .printlns -- ```scala // a // b // c // Got: oh noes! ``` --- ## Resource-safe -- ```scala val open = IO(println("opening")).as(3) val close = (_: Int) => IO(println("closing")) Stream.bracket(open)(close) .flatMap(res => Stream( "a" * res, "b" * res)) .printlns ``` ??? - Resource-safety is guaranteed even in the face of errors -- ```scala // opening // aaa // bbb // closing ``` --- ## Show Me the Types .streamType.centered[ ```scala Stream[+F[_], +O] ``` ] --- layout: true ## What Is It Good For? --- - Data sets that can't fit into memory -- - ... everything else -- - "Events" - Open connections - Database writes - Pagination - ... --- - A value-level representation of data flowing through a program -- - Declarative - Composable - High-level -- - Custom control flow on steroids -- - Functional programming at its best --- layout: false class: middle, transition > Some people, when confronted with a problem, think "I know, I'll use streams." > Now their problems are flowing away. --- layout: true ## Vacation Reports --- A pipeline: - For every vacation report - Enrich the data - Write to database --- ```scala case class VacationReport( name: String, days: Int, sickLeave: Boolean) case class Enriched( value: VacationReport, metadata: Metadata) case class Metadata(availableDays: Int) ``` --- ```scala trait EmployeeAPI: def enrich(vacation: VacationReport): IO[Enriched] {{content}} ``` -- trait DBWriter: def write(value: Enriched): IO[Unit] def writeAll(values: Seq[Enriched]): IO[Unit] --- layout: true ## In Memory --- ```scala val vacations: Stream[Pure, VacationReport] = Stream( VacationReport("Alice", 1, sickLeave = false), VacationReport("Bob", 2, sickLeave = true), VacationReport("Charlie", 1, sickLeave = false), VacationReport("David", 3, sickLeave = true)) ``` -- ```scala val sickly: Stream[Pure, String] = vacations .filter(_.sickLeave) .map( ``` --- ```scala val flow: Stream[IO, Unit] = vacations .evalMap(api.enrich) .evalMap(db.write) ``` -- ```scala Getting available vacation days for Alice Writing Enriched(VacationReport(Alice,1,false),4) Getting available vacation days for Bob Writing Enriched(VacationReport(Bob,2,true),1) Getting available vacation days for Charlie Writing Enriched(VacationReport(Charlie,1,false),6) Getting available vacation days for David Writing Enriched(VacationReport(David,3,true),2) ``` --- layout: true ## Everyday I'm Refactoring --- ```scala type Pipe[F[_], -I, +O] = Stream[F, I] => Stream[F, O] ``` -- ```scala def enrichPipeline( api: EmployeeAPI): Pipe[IO, VacationReport, Enriched] = in => in.evalMap(api.enrich) def dbPipeline( db: DBWriter): Pipe[IO, Enriched, Unit] = in => in.evalMap(db.write) ``` -- ```scala vacations .through(enrichPipeline(api)) .through(dbPipeline(db)) ``` --- layout: true ## From File --- ```scala val fromFile: Stream[IO, VacationReport] = {{content}} ``` -- Files[IO] .readAll(Path("vacations.csv")) .through(text.utf8.decode) .through(text.lines) {{content}} -- .map(VacationReport.parseCSV) // Stream[IO, Option[VacationReport]] {{content}} -- .unNone -- ```scala fromFile .through(enrichPipeline(api)) .through(dbPipeline(db)) ``` --- layout: true ## From Console --- ```scala val fromConsole: Stream[IO, VacationReport] = Stream .repeatEval(Console[IO].readLine) {{content}} ``` -- .map(VacationReport.parseCSV) .unNone -- ```scala fromConsole .through(enrichPipeline(api)) .through(dbPipeline(db)) ``` --- layout: true ## Better API Calls --- ```scala in .evalMap(api.enrich) ``` --- ```scala in .metered(50.millis) .evalMap(api.enrich) ``` --- ```scala in .metered(50.millis) .parEvalMap(10)(api.enrich) ``` -- ```scala fromFile .through(enrichPipeline(api)) .through(dbPipeline(db)) ``` --- layout: true ## Better DB Calls --- ```scala in .evalMap(dbWriter.write) ``` --- ```scala in .groupWithin(10, 1.second) .evalMap(dbWriter.writeAll) ``` --- ```scala in .groupWithin(10, 1.second) .parEvalMap(5)(dbWriter.writeAll) ``` --- ```scala in .groupWithin(10, 1.second) .parEvalMapUnordered(5)(dbWriter.writeAll) ``` -- ```scala fromConsole .through(enrichPipeline(api)) .through(dbPipeline(db)) ``` --- layout: true ## Resources --- ```scala val makeEmployeeAPI: Stream[IO, EmployeeAPI.Prod] = Stream.bracket(EmployeeAPI.init("secret"))(_.shutdown) ``` -- ```scala val makeDBWriter: Stream[IO, DBWriter] = Stream.resource(DBWriter.resource) ``` -- ```scala val resources: Stream[IO, (EmployeeAPI.Prod, DBWriter)] = makeEmployeeAPI.parZip(makeDBWriter) ``` --- layout: true ## Sockets Anyone? --- ```scala val fromNetwork: Stream[IO, VacationReport] = val clients: Stream[IO, Socket[IO]] = Network[IO].server( Some(host"localhost"), Some(port"5555")) {{content}} ``` -- val reports: Stream[IO, Stream[IO, VacationReport]] = client => client.reads .through(text.utf8.decode) .through(text.lines) .map(VacationReport.parseCSV) .unNone {{content}} -- .handleErrorWith(_ => Stream.empty) {{content}} -- reports.parJoin(100) --- layout: true ## Full App --- ```scala object Demo extends IOApp.Simple: {{content}} ``` -- val flow: Stream[IO, Unit] = resources.flatMap: (api, db) => fromNetwork .through(enrichPipeline(api)) .through(dbPipeline(db)) {{content}} -- val action: IO[Unit] = flow.compile.drain {{content}} -- def run: IO[Unit] = action --- layout: false class: transition .endQuote[ > For data that's too big to fit in memory. > For control flow that's too hard to fit in one's head. ] .endQuote.footnote[Fabio Labella, [Compose Your Program Flow with Stream](] .centered.linksFin.linkStackFin[[](]