-
Notifications
You must be signed in to change notification settings - Fork 359
/
PostgresNotify.scala
81 lines (67 loc) · 2.93 KB
/
PostgresNotify.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package doobie.example
import doobie.imports._
import org.postgresql._
import scalaz._, Scalaz._
import scalaz.concurrent.Task
import scalaz.stream._, Process.{ eval, eval_, repeatEval, emitAll}
/**
* Example exposing PostrgreSQL NOTIFY as a Process[ConnectionIO, PGNotification]. This will
* likely be provided as a standard service in doobie-contrib-postgresql in a future version.
* To play with this program, run it and then in another window do:
*
* > psql -d world -U postgres -c "notify foo, 'abc'"
*
* to send a notification. The program will exit after reading five notifications.
*/
object PostgresNotify {
/** Program that retrieves the underlying PGConnection */
val getPGConnection: ConnectionIO[PGConnection] =
FC.unwrap(classOf[PGConnection])
/** Program that gets all new notifications. */
val getNotifications: ConnectionIO[List[PGNotification]] =
getPGConnection.flatMap(c => HC.delay(c.getNotifications).map {
case null => Nil
case as => as.toList
})
/** Construct a program that execs a no-param statement and discards the return value */
def execVoid(sql: String): ConnectionIO[Unit] =
HC.prepareStatement(sql)(HPS.executeUpdate).void
/** Construct a program that starts listening on the given channel. */
def listen(channel: String): ConnectionIO[Unit] =
execVoid("LISTEN " + channel)
/** Construct a program that stops listening on the given channel. */
def unlisten(channel: String): ConnectionIO[Unit] =
execVoid("UNLISTEN " + channel)
/**
* Construct a program that pauses the current thread. This doesn't scale, but neither do
* long- running connection-bound operations like NOTIFY/LISTEN. So the approach here is to
* burn a thread reading the events and multplex somewhere downstream.
*/
def sleep(ms: Long): ConnectionIO[Unit] =
HC.delay(Thread.sleep(ms))
/**
* Construct a stream of PGNotifications on the specified channel, polling at the specified
* rate. Note that this process, when run, will commit the current transaction.
*/
def notificationStream(channel: String, ms: Long): Process[ConnectionIO, PGNotification] =
(for {
_ <- eval(listen(channel) *> HC.commit)
ns <- repeatEval(sleep(ms) *> getNotifications <* HC.commit)
n <- emitAll(ns)
} yield n).onComplete(eval_(unlisten(channel) *> HC.commit))
/** A transactor that knows how to connect to a PostgreSQL database. */
val xa: Transactor[Task] =
DriverManagerTransactor("org.postgresql.Driver", "jdbc:postgresql:world", "postgres", "")
/**
* Construct a stream of PGNotifications that prints to the console. Transform it to a
* runnable process using the transcactor above, and run it.
*/
def main(args: Array[String]): Unit =
notificationStream("foo", 1000)
.map(n => s"${n.getPID} ${n.getName} ${n.getParameter}")
.take(5)
.sink(s => HC.delay(Console.println(s)))
.transact(xa)
.void
.run
}