1 /**
2 Connection.
3 
4 Copyright: Copyright Boris-Barboris 2017.
5 License: MIT
6 Authors: Boris-Barboris
7 */
8 
9 module dpeq.connection;
10 
11 import core.time: seconds;
12 
13 import std.exception: enforce;
14 import std.conv: to;
15 import std.traits;
16 import std.range;
17 import std.socket;
18 
19 import dpeq.constants;
20 import dpeq.exceptions;
21 import dpeq.marshalling;
22 import dpeq.schema: Notification, Notice;
23 
24 
25 
/**
$(D std.socket.Socket) wrapper which is compatible with PSQLConnection.
If you want to use a custom socket type (vibe-d or any other), make it
duck-type and exception-compatible with this one.
*/
final class StdSocket
{
    private Socket m_socket;

    /// Underlying socket instance.
    @property Socket socket() { return m_socket; }

    /// Establish connection to backend. Constructor is expected to throw Exception
    /// if anything goes wrong.
    /// Params:
    ///     host = path of a UNIX socket when it starts with '/', hostname or
    ///            IP address of a TCP endpoint otherwise.
    ///     port = TCP port, not used for UNIX sockets.
    this(string host, ushort port)
    {
        // Do not leave a half-initialized descriptor to the GC when option
        // setup or connect fails.
        scope(failure)
        {
            if (m_socket !is null)
                m_socket.close();
        }

        // The length check keeps an empty host from raising RangeError (an
        // Error, not an Exception); it falls through to the TCP branch instead.
        if (host.length > 0 && host[0] == '/')
        {
            // Unix socket
            version(Posix)
            {
                Address addr = new UnixAddress(host);
                m_socket = new Socket(AddressFamily.UNIX, SocketType.STREAM);
                m_socket.connect(addr);
            }
            else
            {
                // thrown as an Exception so that callers catching Exception see it
                throw new PsqlSocketException(
                    "Cannot connect using UNIX sockets on non-Posix OS");
            }
        }
        else
        {
            m_socket = new TcpSocket();
            // 20 minutes for both tcp_keepalive_time and tcp_keepalive_intvl
            m_socket.setKeepAlive(1200, 1200);
            // example of receive timeout:
            // m_socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, seconds(120));
            m_socket.connect(new InternetAddress(host, port));
        }
    }

    /// Shut down both directions and release the descriptor.
    void close() nothrow @nogc
    {
        m_socket.shutdown(SocketShutdown.BOTH);
        m_socket.close();
    }

    /// Send whole byte buffer or throw. Throws: $(D PsqlSocketException).
    /// Returns: number of bytes sent, which equals buf.length on success.
    auto send(const(ubyte)[] buf)
    {
        ptrdiff_t sent = 0;
        // do-while: an empty buffer still results in exactly one send call
        do
        {
            // a stream socket may accept only a part of the buffer per call
            auto r = m_socket.send(buf[sent .. $]);
            if (r == Socket.ERROR)
                throw new PsqlSocketException("Socket.ERROR on send");
            sent += r;
        } while (cast(size_t) sent < buf.length);
        return sent;
    }

    /// Fill byte buffer from the socket completely or throw.
    /// Throws: $(D PsqlSocketException).
    /// Returns: number of bytes received, which equals buf.length on success.
    auto receive(ubyte[] buf)
    {
        ptrdiff_t filled = 0;
        // a single receive call may deliver fewer bytes than requested,
        // keep reading until the buffer is full
        while (cast(size_t) filled < buf.length)
        {
            auto r = m_socket.receive(buf[filled .. $]);
            if (r == 0)
                throw new PsqlSocketException("Connection closed");
            if (r == Socket.ERROR)
                throw new PsqlSocketException("Socket.ERROR on receive: " ~ m_socket.getErrorText());
            filled += r;
        }
        return filled;
    }
}
92 
93 
94 
/// Transport and authorization parameters of one backend.
struct BackendParams
{
    /// Hostname or IP address for TCP socket. Filename for UNIX socket.
    string host = "/var/run/postgresql/.s.PGSQL.5432";
    /// TCP port of the backend.
    ushort port = 5432;
    /// Role name sent in the startup message.
    string user = "postgres";
    /// Password used when the backend asks for one. Null by default.
    string password = null;
    /// Database name sent in the startup message.
    string database = "postgres";
}
105 
106 
/// One message received from the backend.
struct Message
{
    /// Message type tag.
    BackendMessageType type;

    /// Raw unprocessed message body. The message type byte and the
    /// 4 bytes of message body length that precede it are not included.
    ubyte[] data;
}
115 
/// Target kind selector used by Close and Describe messages.
enum StmtOrPortal: char
{
    /// prepared statement
    Statement = 'S',
    /// portal
    Portal = 'P',
}
121 
// Default logging sink: accepts any arguments and ignores them. Used when
// the client code is uninterested in dpeq connection logging.
private pragma(inline, true) void nop_logger(T...)(T vals)
{
}
124 
125 /**
126 Connection object.
127 
128 Params:
129     SocketT  = socket class type to use.
130     logTrace = alias of a logging function with $(D std.stdio.writef) signature.
131         Very verbose byte-stream information will be printed through it.
132     logError = same as logTrace but for errors.
133 */
134 class PSQLConnection(
135     SocketT = StdSocket,
136     alias logTrace = nop_logger,
137     alias logError = nop_logger)
138 {
    protected
    {
        /// transport used to talk to the backend
        SocketT m_socket;
        /// outgoing messages are accumulated here until flush() is called
        ubyte[] writeBuffer;
        /// write cursor: writeBuffer[0 .. bufHead] is the data pending to be sent
        int bufHead = 0;
        /// true after successful authorization, dropped on socket errors
        /// and on terminate()
        bool open = false;

        // we allocate RAM for responses in batches to reduce GC pressure.
        // 2048 is the last small-sized bin in gc:
        // https://github.com/dlang/druntime/blob/v2.079.0/src/gc/impl/conservative/gc.d#L1251
        immutable int readBatchSize = 2048;
        ubyte[] readBatch;

        // number of expected readyForQuery responses
        int readyForQueryExpected = 0;

        /// transaction status taken from the last processed ReadyForQuery message
        TransactionStatus tstatus = TransactionStatus.IDLE;

        /// counters to generate unique prepared statement and portal ids
        int preparedCounter = 0;
        int portalCounter = 0;

        /// filled from the BackendKeyData message, -1 until it is received
        int m_processId = -1;
        int m_cancellationKey = -1;

        /// backend params this connection was created with
        const BackendParams m_backendParams;
    }
167 
168     /// Backend parameters this connection was constructed from
169     final @property ref const(BackendParams) backendParams() const
170     {
171         return m_backendParams;
172     }
173 
174     /// Backend process ID. Used in CancelRequest message.
175     final @property int processId() const { return m_processId; }
176 
177     /// Cancellation secret. Used in CancelRequest message.
178     final @property int cancellationKey() const { return m_cancellationKey; }
179 
180     /// Socket getter.
181     final @property SocketT socket() { return m_socket; }
182 
183     /// Number of ReadyForQuery messages that are yet to be received
184     /// from the backend. May be useful for checking wether getQueryResults
185     /// would block forever.
186     final @property int expectedRFQCount() const { return readyForQueryExpected; }
187 
188     /// Transaction status, reported by the last received ReadyForQuery message.
189     /// For a new connection TransactionStatus.IDLE is returned.
190     final @property TransactionStatus transactionStatus() const { return tstatus; }
191 
    // Class invariant: neither the counter of pending ReadyForQuery
    // messages nor the write buffer cursor may ever become negative.
    invariant
    {
        assert(readyForQueryExpected >= 0);
        assert(bufHead >= 0);
    }
197 
198     /// Connection is open when it is authorized and socket was alive last time
199     /// it was checked.
200     final @property bool isOpen() const { return open; }
201 
202     /// Generate next connection-unique prepared statement name.
203     final string getNewPreparedName()
204     {
205         return (preparedCounter++).to!string;
206     }
207 
208     /// Generate next connection-unique portal name.
209     final string getNewPortalName()
210     {
211         return (portalCounter++).to!string;
212     }
213 
214     /// Allocate reuired memory, build socket and authorize the connection.
215     /// Throws: $(D PsqlSocketException) if transport failed, or
216     /// $(D PsqlClientException) if authorization failed for some reason.
217     this(const BackendParams bp)
218     {
219         m_backendParams = bp;
220         writeBuffer.length = 2 * 4096; // liberal procurement of two pages
221         try
222         {
223             logTrace("Trying to open TCP connection to PSQL");
224             m_socket = new SocketT(bp.host, bp.port);
225             logTrace("Success");
226         }
227         catch (Exception e)
228         {
229             throw new PsqlSocketException(e.msg, e);
230         }
231         scope(failure) m_socket.close();
232         initialize(bp);
233         open = true;
234     }
235 
236     /// Notify backend and close socket.
237     void terminate()
238     {
239         open = false;
240         try
241         {
242             putTerminateMessage();
243             flush();
244             m_socket.close();
245         }
246         catch (Exception e)
247         {
248             logError("Exception caught while terminating PSQL connection: %s", e.msg);
249         }
250     }
251 
252     /**
253     Open new socket and connect it to the same backend as this connection is
254     bound to, send CancelRequest and close socket.
255     */
256     void cancelRequest()
257     {
258         SocketT sock = new SocketT(m_backendParams.host, m_backendParams.port);
259         ubyte[4 * 4] intBuf;
260         marshalFixedField(intBuf[0..4], int(16));
261         marshalFixedField(intBuf[4..8], int(80877102));
262         marshalFixedField(intBuf[8..12], m_processId);
263         marshalFixedField(intBuf[12..16], m_cancellationKey);
264         sock.send(intBuf[]);
265         sock.close();
266     }
267 
268     /// Flush writeBuffer into the socket. Blocks/yields (according to socket
269     /// implementation).
270     final void flush()
271     {
272         try
273         {
274             // does not block if zero length:
275             // https://github.com/vibe-d/vibe-core/blob/master/source/vibe/core/net.d#L607
276             auto w = m_socket.send(writeBuffer[0..bufHead]);
277             while (bufHead - w > 0)
278                 w += m_socket.send(writeBuffer[w..bufHead]);
279             logTrace("flushed %d bytes: %s", w, writeBuffer[0..bufHead].to!string);
280         }
281         catch (PsqlSocketException e)
282         {
283             open = false;
284             throw e;
285         }
286         finally
287         {
288             bufHead = 0;
289         }
290     }
291 
292     /// discard write buffer content
293     final void discard()
294     {
295         bufHead = 0;
296     }
297 
298     /// Save write buffer cursor in order to be able to restore it in case of errors.
299     /// Use it to prevent sending junk to backend when something goes wrong during
300     /// marshalling or message creation.
301     final auto saveBuffer()
302     {
303         static struct WriteCursor
304         {
305             int offset;
306             PSQLConnection conn;
307             void restore()
308             {
309                 conn.bufHead = offset;
310             }
311         }
312         return WriteCursor(bufHead, this);
313     }
314 
    /// Check that the connection is marked as open.
    /// Throws: $(D PsqlConnectionClosedException) when it is not.
    pragma(inline)
    final void ensureOpen()
    {
        enforce!PsqlConnectionClosedException(open, "Connection is not open");
    }
320 
321 
322     /*
323     ////////////////////////////////////////////////////////////////////////////
324     // All sorts of messages
325     // https://www.postgresql.org/docs/9.5/static/protocol-message-formats.html
326     ////////////////////////////////////////////////////////////////////////////
327     */
328 
329     /** Put Bind message into write buffer.
330     *
331     * 'formatCodes' - input range of FormatCodes.
332     * quotes:
333     * The number of parameter format codes that follow (denoted C below).
334     * This can be zero to indicate that there are no parameters or that the
335     * parameters all use the default format (text); or one, in which case the
336     * specified format code is applied to all parameters; or it can equal
337     * the actual number of parameters.
338     * The parameter format codes. Each must presently be zero (text) or one (binary).
339     * `parameters` is input range of marshalling delegates.
340     *
341     * 'parameters' - input range of marshaller closures. Actual data should
342     * be self-contained in this parameter. Marshaller is a callable that
343     * is covariant with "int delegate(ubyte[] buf)" and returns -2 if buf
344     * is too small, -1 if parameter is null and an actual number of bytes written
345     * otherwise.
346     *
347     * 'resultFormatCodes' - input range of query result FormatCodes.
348     * quotes:
349     *
350     * The number of result-column format codes that follow (denoted R below).
351     * This can be zero to indicate that there are no result columns or that
352     * the result columns should all use the default format (text); or one,
353     * in which case the specified format code is applied to all result
354     * columns (if any); or it can equal the actual number of result columns
355     * of the query.
356     * The result-column format codes. Each must presently be zero (text) or
357     * one (binary).
358     */
359     final void putBindMessage(FR, PR, RR)
360         (string portal, string prepared, scope FR formatCodes, scope PR parameters,
361         scope RR resultFormatCodes)
362     //if (isInputRange!FR && is(Unqual!(ElementType!FR) == FormatCode) &&
363     //    isInputRange!RR && is(Unqual!(ElementType!RR) == FormatCode) &&
364     //    isInputRange!PR && __traits(compiles, -1 == parameters.front()(new ubyte[2]))
365     {
366         ensureOpen();
367         write(cast(ubyte)FrontMessageType.Bind);
368         auto lenTotal = reserveLen();
369         cwrite(portal);
370         cwrite(prepared);
371 
372         // parameter format code(s)
373         short fcodes = 0;
374         auto fcodePrefix = reserveLen!short();
375         foreach (FormatCode fcode; formatCodes)
376         {
377             logTrace("Bind: writing %d fcode", fcode);
378             write(cast(short)fcode);
379             fcodes++;
380         }
381         fcodePrefix.write(fcodes);
382 
383         // parameters
384         short pcount = 0;
385         auto pcountPrefix = reserveLen!short();
386         foreach (param; parameters)
387         {
388             auto paramPrefix = reserveLen!int();
389             int r = wrappedMarsh(param);
390             logTrace("Bind: wrote 4bytes + %d bytes for value", r);
391             paramPrefix.write(r);    // careful! -1 means Null
392             pcount++;
393         }
394         pcountPrefix.write(pcount);
395 
396         // result format codes
397         short rcount = 0;
398         auto rcolPrefix = reserveLen!short();
399         foreach (FormatCode fcode; resultFormatCodes)
400         {
401             write(cast(short)fcode);
402             logTrace("Bind: writing %d rfcode", fcode);
403             rcount++;
404         }
405         rcolPrefix.write(rcount);
406 
407         lenTotal.fill();
408         logTrace("Bind message buffered");
409     }
410 
411     /// putBindMessage overload for parameterless portals
412     final void putBindMessage(RR)
413         (string portal, string prepared, scope RR resultFormatCodes)
414     {
415         ensureOpen();
416         write(cast(ubyte)FrontMessageType.Bind);
417         auto lenTotal = reserveLen();
418         cwrite(portal);
419         cwrite(prepared);
420 
421         // parameter format code(s)
422         short fcodes = 0;
423         auto fcodePrefix = reserveLen!short();
424         fcodePrefix.write(fcodes);
425 
426         // parameters
427         short pcount = 0;
428         auto pcountPrefix = reserveLen!short();
429         pcountPrefix.write(pcount);
430 
431         // result format codes
432         short rcount = 0;
433         auto rcolPrefix = reserveLen!short();
434         foreach (FormatCode fcode; resultFormatCodes)
435         {
436             write(cast(short)fcode);
437             logTrace("Bind: writing %d rfcode", fcode);
438             rcount++;
439         }
440         rcolPrefix.write(rcount);
441 
442         lenTotal.fill();
443         logTrace("Bind message buffered");
444     }
445 
446     /// put Close message into write buffer.
447     /// `closeWhat` is 'S' for prepared statement and
448     /// 'P' for portal.
449     final void putCloseMessage(StmtOrPortal closeWhat, string name)
450     {
451         ensureOpen();
452         assert(closeWhat == 'S' || closeWhat == 'P');
453         write(cast(ubyte)FrontMessageType.Close);
454         auto lenTotal = reserveLen();
455         write(cast(ubyte)closeWhat);
456         cwrite(name);
457         lenTotal.fill();
458         logTrace("Close message buffered");
459     }
460 
461     /// put Close message into write buffer.
462     /// `closeWhat` is 'S' for prepared statement and
463     /// 'P' for portal.
464     final void putDescribeMessage(StmtOrPortal descWhat, string name)
465     {
466         ensureOpen();
467         assert(descWhat == 'S' || descWhat == 'P');
468         write(cast(ubyte)FrontMessageType.Describe);
469         auto lenTotal = reserveLen();
470         write(cast(ubyte)descWhat);
471         cwrite(name);
472         lenTotal.fill();
473         logTrace("Describe message buffered");
474     }
475 
476     /**
477     non-zero maxRows will generate PortalSuspended messages, wich are
478     currently not handled by dpeq commands */
479     final void putExecuteMessage(string portal = "", int maxRows = 0)
480     {
481         ensureOpen();
482         write(cast(ubyte)FrontMessageType.Execute);
483         auto lenTotal = reserveLen();
484         cwrite(portal);
485         write(maxRows);
486         lenTotal.fill();
487         logTrace("Execute message buffered");
488     }
489 
490     /**
491     Quote:
492     "The Flush message does not cause any specific output to be generated,
493     but forces the backend to deliver any data pending in its output buffers.
494     A Flush must be sent after any extended-query command except Sync, if the
495     frontend wishes to examine the results of that command before issuing more
496     commands. Without Flush, messages returned by the backend will be combined
497     into the minimum possible number of packets to minimize network overhead."
498     */
499     final void putFlushMessage()
500     {
501         ensureOpen();
502         write(cast(ubyte)FrontMessageType.Flush);
503         write(4);
504         logTrace("Flush message buffered");
505     }
506 
507     final void putParseMessage(PR)(string prepared, string query, scope PR ptypes)
508         if (isInputRange!PR && is(Unqual!(ElementType!PR) == ObjectID))
509     {
510         logTrace("Message to parse query: %s", query);
511 
512         ensureOpen();
513         write(cast(ubyte)FrontMessageType.Parse);
514         auto lenTotal = reserveLen();
515         cwrite(prepared);
516         cwrite(query);
517 
518         // parameter types
519         short pcount = 0;
520         auto pcountPrefix = reserveLen!short();
521         foreach (ObjectID ptype; ptypes)
522         {
523             write(cast(int)ptype);
524             pcount++;
525         }
526         pcountPrefix.write(pcount);
527 
528         lenTotal.fill();
529         logTrace("Parse message buffered");
530     }
531 
532     /// put Query message (simple query protocol) into the write buffer
533     final void putQueryMessage(string query)
534     {
535         ensureOpen();
536         write(cast(ubyte)FrontMessageType.Query);
537         auto lenTotal = reserveLen();
538         cwrite(query);
539         lenTotal.fill();
540         readyForQueryExpected++;
541         logTrace("Query message buffered");
542     }
543 
544     alias putSimpleQuery = putQueryMessage;
545 
546     /**
547     Put Sync message into write buffer. Usually you should call this after
548     every portal execute message.
549     Every postSimpleQuery or PSQLConnection.sync MUST be accompanied by getQueryResults call. */
550     final void putSyncMessage()
551     {
552         ensureOpen();
553         write(cast(ubyte)FrontMessageType.Sync);
554         write(4);
555         readyForQueryExpected++;
556         logTrace("Sync message buffered");
557     }
558 
559     alias sync = putSyncMessage;
560 
    /// NotificationResponse messages will be parsed and passed to this
    /// callback during 'pollMessages' call. When the callback returns true,
    /// pollMessages exits its loop.
    /// https://www.postgresql.org/docs/9.5/static/sql-notify.html
    bool delegate(typeof(this) con, Notification n) nothrow notificationCallback = null;

    /// NoticeResponse messages will be parsed and
    /// passed to this callback during 'pollMessages' call.
    void delegate(typeof(this) con, Notice n) nothrow noticeCallback = null;

    /** When this callback returns true, pollMessages will exit its loop.
    Interceptor should set err to true if it has encountered some kind of error
    and wants it to be rethrown as PsqlClientException at the end of
    pollMessages call. errMsg should be appended with error description. */
    alias InterceptorT = bool delegate(Message msg, ref bool err,
        ref string errMsg) nothrow;
576 
577     /** Read messages from the socket in loop until:
578       1). if finishOnError is set and ErrorResponse is received, function
579           throws immediately.
580       2). ReadyForQuery message is received.
581       3). interceptor delegate returns `true`.
582       4). NotificationResponse received and notificationCallback returned 'true'.
583     Interceptor delegate is used to customize the logic. If the message is
584     not ReadyForQuery, ErrorResponse or Notice\Notification, it is passed to
585     interceptor. It may set bool 'err' flag to true and append to 'errMsg'
586     string, if delayed throw is required. */
587     final void pollMessages(scope InterceptorT interceptor, bool finishOnError = false)
588     {
589         bool finish = false;
590         bool error = false;
591         Notice errorNotice;
592         bool intError = false;
593         string intErrMsg;
594 
595         while (!finish)
596         {
597             Message msg = readOneMessage();
598 
599             with (BackendMessageType)
600             switch (msg.type)
601             {
602                 case ErrorResponse:
603                     enforce!PsqlClientException(!error, "Second ErrorResponse " ~
604                         "received during one pollMessages call");
605                     error = true;
606                     parseNoticeMessage(msg.data, errorNotice);
607                     if (finishOnError)
608                         finish = true;
609                     break;
610                 case ReadyForQuery:
611                     enforce!PsqlClientException(readyForQueryExpected > 0,
612                         "Unexpected ReadyForQuery message");
613                     readyForQueryExpected--;
614                     tstatus = cast(TransactionStatus) msg.data[0];
615                     finish = true;
616                     break;
617                 case NoticeResponse:
618                     if (noticeCallback !is null)
619                     {
620                         Notice n;
621                         parseNoticeMessage(msg.data, n);
622                         noticeCallback(this, n);
623                     }
624                     break;
625                 case NotificationResponse:
626                     if (notificationCallback !is null)
627                     {
628                         Notification n;
629                         n.procId = demarshalNumber!int(msg.data[0..4]);
630                         size_t l;
631                         n.channel = demarshalProtocolString(msg.data[4..$], l);
632                         n.payload = demarshalString(msg.data[4+l..$-1]);
633                         finish |= notificationCallback(this, n);
634                     }
635                     break;
636                 default:
637                     if (interceptor !is null)
638                         finish |= interceptor(msg, intError, intErrMsg);
639             }
640         }
641 
642         if (error)
643             throw new PsqlErrorResponseException(errorNotice);
644         if (intError)
645             throw new PsqlClientException(intErrMsg);
646     }
647 
648     /// reads and discards messages from socket until all expected
649     /// ReadyForQuery messages are received
650     void windupResponseStack()
651     {
652         while (readyForQueryExpected > 0)
653         {
654             Message msg = readOneMessage();
655             if (msg.type == BackendMessageType.ReadyForQuery)
656                 readyForQueryExpected--;
657         }
658     }
659 
660     // Protected section for functions that will probably never be used by
661     // client code directly. If you need them, inherit them.
662 protected:
663 
664     final void putStartupMessage(in BackendParams params)
665     {
666         auto lenPrefix = reserveLen();
667         write(0x0003_0000);  // protocol v3
668         cwrite("user");
669         cwrite(params.user);
670         cwrite("database");
671         cwrite(params.database);
672         write(cast(ubyte)0);
673         lenPrefix.fill();
674         logTrace("Startup message buffered");
675     }
676 
677     final void putTerminateMessage()
678     {
679         write(cast(ubyte)FrontMessageType.Terminate);
680         write(4);
681         logTrace("Terminate message buffered");
682     }
683 
684     final void initialize(in BackendParams params)
685     {
686         putStartupMessage(params);
687         flush();
688 
689         int authType = -1;
690         Message auth_msg;
691 
692         pollMessages((Message msg, ref bool e, ref string eMsg) {
693                 if (msg.type == BackendMessageType.Authentication)
694                 {
695                     auth_msg = msg;
696                     if (authType != -1)
697                     {
698                         e = true;
699                         eMsg ~= "Unexpected second Authentication " ~
700                             "message received from backend";
701                     }
702                     authType = demarshalNumber(msg.data[0..4]);
703                     if (authType == 0)  // instantly authorized, so we'll get readyForQuery
704                         readyForQueryExpected++;
705                     else
706                         return true;
707                 }
708                 else if (msg.type == BackendMessageType.BackendKeyData)
709                 {
710                     m_processId = demarshalNumber(msg.data[0..4]);
711                     m_cancellationKey = demarshalNumber(msg.data[4..8]);
712                 }
713                 return false;
714             }, true);
715 
716         enforce!PsqlClientException(authType != -1,
717             "Expected Authentication message was not received");
718         switch (authType)
719         {
720             case 0:
721                 // instant AuthenticationOk, trusted connection usually does this
722                 logTrace("Succesfully authorized");
723                 return;
724             case 3:
725                 // cleartext password
726                 putPasswordMessage(params.password);
727                 break;
728             case 5:
729                 // MD5 salted password
730                 ubyte[4] salt = auth_msg.data[4 .. 8];
731                 putMd5PasswordMessage(params.password, params.user, salt);
732                 break;
733             default:
734                 throw new PsqlClientException("Unknown auth type " ~
735                     authType.to!string);
736         }
737         flush();
738 
739         int authRes = -1;
740         pollMessages((Message msg, ref bool e, ref string eMsg) {
741                 if (msg.type == BackendMessageType.Authentication)
742                     authRes = demarshalNumber(msg.data[0..4]);
743                 else if (msg.type == BackendMessageType.BackendKeyData)
744                 {
745                     m_processId = demarshalNumber(msg.data[0..4]);
746                     m_cancellationKey = demarshalNumber(msg.data[4..8]);
747                 }
748                 return false;
749             });
750         enforce!PsqlClientException(authRes == 0,
751             "Expected AuthenticationOk message was not received");
752     }
753 
754     final void putPasswordMessage(string pw)
755     {
756         write(cast(ubyte)FrontMessageType.PasswordMessage);
757         auto lenPrefix = reserveLen();
758         cwrite(pw);
759         lenPrefix.fill();
760         readyForQueryExpected++;
761         logTrace("Password message buffered");
762     }
763 
764     final void putMd5PasswordMessage(string pw, string user, ubyte[4] salt)
765     {
766         // thank you ddb authors
767         char[32] MD5toHex(T...)(in T data)
768         {
769             import std.ascii : LetterCase;
770             import std.digest.md : md5Of, toHexString;
771             return md5Of(data).toHexString!(LetterCase.lower);
772         }
773 
774         write(cast(ubyte)FrontMessageType.PasswordMessage);
775         auto lenPrefix = reserveLen();
776         char[3 + 32] mdpw;
777         mdpw[0 .. 3] = "md5";
778         mdpw[3 .. $] = MD5toHex(MD5toHex(pw, user), salt);
779         cwrite(cast(string) mdpw[]);
780         lenPrefix.fill();
781         readyForQueryExpected++;
782         logTrace("MD5 Password message buffered");
783     }
784 
785     final void parseNoticeMessage(ubyte[] data, ref Notice n)
786     {
787         void copyTillZero(ref string dest)
788         {
789             size_t length;
790             dest = demarshalProtocolString(data, length);
791             data = data[length..$];
792         }
793 
794         void discardTillZero()
795         {
796             size_t idx = 0;
797             while (idx < data.length && data[idx])
798                 idx++;
799             data = data[idx+1..$];
800         }
801 
802         while (data.length > 1)
803         {
804             char fieldType = cast(char) data[0];
805             data = data[1..$];
806             // https://www.postgresql.org/docs/current/static/protocol-error-fields.html
807             switch (fieldType)
808             {
809                 case 'S':
810                     copyTillZero(n.severity);
811                     break;
812                 case 'V':
813                     copyTillZero(n.severityV);
814                     break;
815                 case 'C':
816                     enforce!PsqlClientException(data.length >= 6,
817                         "Expected 5 bytes of SQLSTATE code.");
818                     n.code[] = cast(char[]) data[0..5];
819                     data = data[6..$];
820                     break;
821                 case 'M':
822                     copyTillZero(n.message);
823                     break;
824                 case 'D':
825                     copyTillZero(n.detail);
826                     break;
827                 case 'H':
828                     copyTillZero(n.hint);
829                     break;
830                 case 'P':
831                     copyTillZero(n.position);
832                     break;
833                 case 'p':
834                     copyTillZero(n.internalPos);
835                     break;
836                 case 'q':
837                     copyTillZero(n.internalQuery);
838                     break;
839                 case 'W':
840                     copyTillZero(n.where);
841                     break;
842                 case 's':
843                     copyTillZero(n.schema);
844                     break;
845                 case 't':
846                     copyTillZero(n.table);
847                     break;
848                 case 'c':
849                     copyTillZero(n.column);
850                     break;
851                 case 'd':
852                     copyTillZero(n.dataType);
853                     break;
854                 case 'n':
855                     copyTillZero(n.constraint);
856                     break;
857                 case 'F':
858                     copyTillZero(n.file);
859                     break;
860                 case 'L':
861                     copyTillZero(n.line);
862                     break;
863                 case 'R':
864                     copyTillZero(n.routine);
865                     break;
866                 default:
867                     discardTillZero();
868             }
869         }
870     }
871 
872     /// Read from socket to buffer buf exactly buf.length bytes.
873     /// Blocks and throws.
874     final void read(ubyte[] buf)
875     {
876         try
877         {
878             logTrace("reading %d bytes", buf.length);
879             // TODO: make sure this code is generic enough for all sockets
880             auto r = m_socket.receive(buf);
881             assert(r == buf.length);
882             //logTrace("received %d bytes", r);
883         }
884         catch (PsqlSocketException e)
885         {
886             open = false;
887             throw e;
888         }
889     }
890 
891     /// extends writeBuffer if marshalling functor m is lacking space (returns -2)
892     final int wrappedMarsh(MarshT)(scope MarshT m)
893     {
894         int bcount = m(writeBuffer[bufHead .. $]);
895         while (bcount == -2)
896         {
897             writeBuffer.length = writeBuffer.length + 4 * 4096;
898             bcount = m(writeBuffer[bufHead .. $]);
899         }
900         if (bcount > 0)
901             bufHead += bcount;
902         return bcount;
903     }
904 
905     /// write numeric type T to write buffer
906     final int write(T)(T val)
907         if (isNumeric!T)
908     {
909         return wrappedMarsh((ubyte[] buf) => marshalFixedField(buf, val));
910     }
911 
912     /// Reserve space in write buffer for length prefix and return
913     /// struct that automatically fills it from current buffer offset position.
914     final auto reserveLen(T = int)()
915         if (isNumeric!T && !isUnsigned!T)
916     {
917         static struct Len
918         {
919             PSQLConnection con;
920             int idx;    // offset of length prefix word in writeBuffer
921 
922             /// calculate and write length prefix
923             void fill(bool includeSelf = true)
924             {
925                 T len = (con.bufHead - idx).to!T;
926                 if (!includeSelf)
927                 {
928                     len -= T.sizeof.to!T;
929                     assert(len >= 0);
930                 }
931                 else
932                     assert(len >= T.sizeof);
933                 logTrace("writing length of %d bytes to index %d", len, idx);
934                 auto res = marshalFixedField(con.writeBuffer[idx .. idx+T.sizeof], len);
935                 assert(res == T.sizeof);
936             }
937 
938             /// write some specific number
939             void write(T v)
940             {
941                 auto res = marshalFixedField(con.writeBuffer[idx .. idx+T.sizeof], v);
942                 assert(res == T.sizeof);
943             }
944         }
945 
946         Len l = Len(this, bufHead);
947         bufHead += T.sizeof;
948         return l;
949     }
950 
951     final int write(string s)
952     {
953         return wrappedMarsh((ubyte[] buf) => marshalStringField(buf, s));
954     }
955 
956     final int cwrite(string s)
957     {
958         return wrappedMarsh((ubyte[] buf) => marshalCstring(buf, s));
959     }
960 
961     /// read exactly one message from the socket
962     final Message readOneMessage()
963     {
964         Message res;
965         ubyte[1] type;
966         read(type);
967         res.type = cast(BackendMessageType) type[0];
968         logTrace("Got message of type %s", res.type.to!string);
969         ubyte[4] length_arr;
970         read(length_arr);
971         int length = demarshalNumber(length_arr) - 4;
972         enforce!PsqlClientException(length >= 0, "Negative message length");
973         ubyte[] data;
974         if (length > 0)
975         {
976             if (length <= readBatchSize / 2)
977             {
978                 // we should batch the allocation
979                 if (readBatch.length < length)
980                     readBatch = new ubyte[readBatchSize];
981                 data = readBatch[0..length];
982                 readBatch = readBatch[length..$];
983             }
984             else
985                 data = new ubyte[length];
986             read(data);
987         }
988         res.data = data;
989         return res;
990     }
991 }