1 /**
2 Connection.
4 Copyright: Copyright Boris-Barboris 2017.
5 License: MIT
6 Authors: Boris-Barboris
7 */
9 module dpeq.connection;
11 import core.time: seconds, Duration;
13 import std.algorithm: max;
14 import std.exception: enforce;
15 import std.conv: to;
16 import std.traits;
17 import std.range;
19 import dpeq.constants;
20 import dpeq.exceptions;
21 import dpeq.serialize;
22 import dpeq.result;
23 import dpeq.socket;
/// Transport and authorization parameters.
struct BackendParams
{
    /// Hostname or IP address for TCP socket. Filename for UNIX socket.
    string host = "/var/run/postgresql/.s.PGSQL.5432";

    /// Port passed to the socket constructor together with `host`.
    ushort port = 5432;

    /// User name written into the startup message.
    string user = "postgres";

    /// Password used when the backend asks for cleartext or MD5 authentication.
    string password;

    /// Database name written into the startup message.
    string database = "postgres";
}
/// Selects the kind of object a Close or Describe message refers to.
/// The enum value is the byte written into the message.
enum StmtOrPortal: char
{
    /// prepared statement
    statement = 'S',
    /// portal
    portal = 'P',
}
// No-op logger for client code that is uninterested in dpeq connection logging.
// It is the default value of both the logTrace and logError aliases of
// PSQLConnection. Both parameters are lazy and the body is empty, so the
// arguments are never evaluated.
pragma(inline)
void nop_logger(T...)(lazy string fmt, lazy T vals) nothrow @safe pure {}
48 /**
49 Connection object.
51 Params:
52     SocketT  = socket class type to use.
53     logTrace = alias of a logging function with $(D std.stdio.writef) signature.
54         Very verbose byte-stream information will be printed through it.
55     logError = same as logTrace but for errors.
56 */
57 class PSQLConnection(
58     SocketT = StdSocket,
59     alias logTrace = nop_logger,
60     alias logError = nop_logger)
61 {
    protected
    {
        /// Transport object, created in the constructor.
        SocketT m_socket;
        /// Outgoing byte buffer: put* methods append to it, flush() sends it.
        ubyte[] writeBuffer;
        /// Write cursor: number of bytes of writeBuffer currently in use.
        int bufHead = 0;
        /// Set at the end of the constructor; cleared by terminate() and when
        /// flush() or read() catch a PsqlSocketException.
        bool open = false;

        // we allocate RAM for responses in batches to reduce GC pressure.
        // NOTE(review): the original note read "2048 is the last small-sized
        // dling in gc"; presumably 2048 bytes is the largest small-object bin
        // of the druntime GC - confirm against the link below:
        // https://github.com/dlang/druntime/blob/v2.079.0/src/gc/impl/conservative/gc.d#L1251
        immutable int readBatchSize = 2048;
        /// Unused tail of the current read batch; readOneMessage() slices
        /// message bodies off its front.
        ubyte[] readBatch;

        // number of expected readyForQuery responses
        int readyForQueryExpected = 0;
        /// ReadyForQuery responses implied by messages that are buffered but
        /// not flushed yet; flush() moves this count into readyForQueryExpected.
        int unflushedRfq = 0;

        /// Transaction status taken from a received ReadyForQuery message.
        TransactionStatus tstatus;

        /// counters to generate unique prepared statement and portal ids
        int preparedCounter = 0;
        int portalCounter = 0;

        /// Backend process id and cancellation secret, filled from the
        /// BackendKeyData message during initialize(); -1 until then.
        int m_processId = -1;
        int m_cancellationKey = -1;

        /// backend params this connection was created with
        const BackendParams m_backendParams;
    }
92     final @property pure @safe nothrow
93     {
95         /// Backend parameters this connection was constructed from
96         ref const(BackendParams) backendParams() const
97         {
98             return m_backendParams;
99         }
101         /// Backend process ID. Used in CancelRequest message.
102         int processId() const { return m_processId; }
104         /// Cancellation secret. Used in CancelRequest message.
105         int cancellationKey() const { return m_cancellationKey; }
107         /// Socket getter.
108         SocketT socket() { return m_socket; }
110         /// Number of ReadyForQuery messages that are yet to be received
111         /// from the backend. May be useful for checking wether getQueryResults
112         /// would block forever.
113         int expectedRFQCount() const { return readyForQueryExpected; }
115         /// Transaction status, reported by the last received ReadyForQuery message.
116         /// For a new connection TransactionStatus.IDLE is returned.
117         TransactionStatus transactionStatus() const { return tstatus; }
119         /// Connection is open when it is authorized and socket was alive last time
120         /// it was checked.
121         bool isOpen() const { return open; }
122     }
124     invariant
125     {
126         assert(readyForQueryExpected >= 0);
127         assert(unflushedRfq >= 0);
128         assert(bufHead >= 0);
129     }
131     /// Generate next connection-unique prepared statement name.
132     final string getNewPreparedName() pure @safe nothrow
133     {
134         return (preparedCounter++).to!string;
135     }
137     /// Generate next connection-unique portal name.
138     final string getNewPortalName() pure @safe nothrow
139     {
140         return (portalCounter++).to!string;
141     }
143     /// Allocate reuired memory, build socket and authorize the connection.
144     /// Throws: $(D PsqlSocketException) if transport failed, or
145     /// $(D PsqlClientException) if authorization failed for some reason.
146     this(const BackendParams bp, Duration connectTimeout = seconds(10), size_t writeBufSize = 2 * 4096) @safe
147     {
148         m_backendParams = bp;
149         writeBuffer.length = writeBufSize;
150         try
151         {
152             logTrace("Trying to open TCP connection to PSQL");
153             m_socket = new SocketT(bp.host, bp.port, connectTimeout);
154             logTrace("Success");
155         }
156         catch (Exception e)
157         {
158             throw new PsqlSocketException(e.msg, e);
159         }
160         scope(failure) m_socket.close();
161         initialize(bp);
162         open = true;
163     }
165     /// Notify backend and close socket.
166     void terminate() nothrow @safe
167     {
168         open = false;
169         try
170         {
171             putTerminateMessage();
172             flush();
173             m_socket.close();
174         }
175         catch (Exception e)
176         {
177             logError("Exception caught while terminating PSQL connection: %s", e.msg);
178         }
179     }
181     /**
182     Open new socket and connect it to the same backend as this connection is
183     bound to, send CancelRequest and close the temporary socket.
184     */
185     void cancelRequest() @safe
186     {
187         SocketT sock = new SocketT(m_backendParams.host, m_backendParams.port, seconds(5));
188         ubyte[4 * 4] intBuf;
189         static immutable int pn1 = 16;
190         serializeFixedField(intBuf[0..4], &pn1);
191         static immutable int pn2 = int(80877102);
192         serializeFixedField(intBuf[4..8], &pn2);
193         serializeFixedField(intBuf[8..12], &m_processId);
194         serializeFixedField(intBuf[12..16], &m_cancellationKey);
195         sock.send(intBuf[]);
196         sock.close();
197     }
199     /// Flush writeBuffer into the socket. Blocks/yields (according to socket
200     /// implementation).
201     final void flush() @safe
202     {
203         try
204         {
205             // does not block if zero length:
206             // https://github.com/vibe-d/vibe-core/blob/master/source/vibe/core/net.d#L607
207             auto w = m_socket.send(writeBuffer[0..bufHead]);
208             while (bufHead - w > 0)
209                 w += m_socket.send(writeBuffer[w..bufHead]);
210             logTrace("flushed %d bytes: %s", w, writeBuffer[0..bufHead].to!string);
211         }
212         catch (PsqlSocketException e)
213         {
214             open = false;
215             throw e;
216         }
217         finally
218         {
219             bufHead = 0;
220             readyForQueryExpected += unflushedRfq;
221             unflushedRfq = 0;
222         }
223     }
225     /// discard write buffer content
226     final void discard() pure nothrow @safe
227     {
228         bufHead = 0;
229         unflushedRfq = 0;
230     }
232     /// Save write buffer cursor in order to be able to restore it in case of errors.
233     /// Use it to prevent sending junk to backend when something goes wrong during
234     /// serialization or message creation.
235     final auto saveBuffer() pure nothrow @safe
236     {
237         static struct WriteCursor
238         {
239             private int savedHead;
240             private int savedUnflushedRfq;
241             PSQLConnection conn;
242             void restore() pure nothrow @safe
243             {
244                 assert(conn.bufHead >= savedHead);
245                 conn.bufHead = savedHead;
246                 assert(conn.unflushedRfq >= savedUnflushedRfq);
247                 conn.unflushedRfq = savedUnflushedRfq;
248             }
249         }
250         return WriteCursor(bufHead, unflushedRfq, this);
251     }
254     /*
255     ////////////////////////////////////////////////////////////////////////////
256     // All sorts of messages
257     // https://www.postgresql.org/docs/9.5/static/protocol-message-formats.html
258     ////////////////////////////////////////////////////////////////////////////
259     */
261     /** Put Bind message into write buffer.
262     *
263     * 'formatCodes' - input range of FormatCodes.
264     * quotes:
265     * The number of parameter format codes that follow (denoted C below).
266     * This can be zero to indicate that there are no parameters or that the
267     * parameters all use the default format (text); or one, in which case the
268     * specified format code is applied to all parameters; or it can equal
269     * the actual number of parameters.
270     * The parameter format codes. Each must presently be zero (text) or one (binary).
271     * `parameters` is input range of deserialization delegates.
272     *
273     * 'parameters' - input range of serializeler closures. Actual data should
274     * be self-contained in this parameter. Marshaller is a callable that
275     * is covariant with "int delegate(ubyte[] buf)" and returns -2 if buf
276     * is too small, -1 if parameter is null and an actual number of bytes written
277     * otherwise.
278     *
279     * 'resultFormatCodes' - input range of query result FormatCodes.
280     * quotes:
281     *
282     * The number of result-column format codes that follow (denoted R below).
283     * This can be zero to indicate that there are no result columns or that
284     * the result columns should all use the default format (text); or one,
285     * in which case the specified format code is applied to all result
286     * columns (if any); or it can equal the actual number of result columns
287     * of the query.
288     * The result-column format codes. Each must presently be zero (text) or
289     * one (binary).
290     */
291     final void putBindMessage(FR, PR, RR)
292         (string portal, string prepared, scope FR formatCodes, scope PR parameters,
293         scope RR resultFormatCodes) pure @safe
294     //if (isInputRange!FR && is(Unqual!(ElementType!FR) == FormatCode) &&
295     //    isInputRange!RR && is(Unqual!(ElementType!RR) == FormatCode) &&
296     //    isInputRange!PR && __traits(compiles, -1 == parameters.front()(new ubyte[2]))
297     {
298         assert(open, "Connection is not open");
299         int savepoint = bufHead;
300         scope(failure) bufHead = savepoint;
302         write(cast(ubyte)FrontMessageType.Bind);
303         auto lenTotal = reserveLen();
304         cwrite(portal);
305         cwrite(prepared);
307         // parameter format code(s)
308         short fcodes = 0;
309         auto fcodePrefix = reserveLen!short();
310         foreach (FormatCode fcode; formatCodes)
311         {
312             logTrace("Bind: writing %d fcode", fcode);
313             write(cast(short)fcode);
314             fcodes++;
315         }
316         fcodePrefix.write(fcodes);
318         // parameters
319         short pcount = 0;
320         auto pcountPrefix = reserveLen!short();
321         foreach (param; parameters)
322         {
323             auto paramPrefix = reserveLen!int();
324             int r = wrappedSerialize(param);
325             logTrace("Bind: wrote 4bytes + %d bytes for value", r);
326             paramPrefix.write(r);    // careful! -1 means Null
327             pcount++;
328         }
329         pcountPrefix.write(pcount);
331         // result format codes
332         short rcount = 0;
333         auto rcolPrefix = reserveLen!short();
334         foreach (FormatCode fcode; resultFormatCodes)
335         {
336             write(cast(short)fcode);
337             logTrace("Bind: writing %d rfcode", fcode);
338             rcount++;
339         }
340         rcolPrefix.write(rcount);
342         lenTotal.fill();
343         logTrace("Bind message buffered");
344     }
346     /// putBindMessage overload for parameterless portals
347     final void putBindMessage(RR)(string portal, string prepared,
348         scope RR resultFormatCodes) pure @safe
349     {
350         assert(open, "Connection is not open");
351         int savepoint = bufHead;
352         scope(failure) bufHead = savepoint;
354         write(cast(ubyte)FrontMessageType.Bind);
355         auto lenTotal = reserveLen();
356         cwrite(portal);
357         cwrite(prepared);
358         write(short(0));
359         write(short(0));
360         // result format codes
361         short rcount = 0;
362         auto rcolPrefix = reserveLen!short();
363         foreach (FormatCode fcode; resultFormatCodes)
364         {
365             write(cast(short)fcode);
366             logTrace("Bind: writing %d rfcode", fcode);
367             rcount++;
368         }
369         rcolPrefix.write(rcount);
371         lenTotal.fill();
372         logTrace("Bind message buffered");
373     }
375     /// putBindMessage overload for already serialized parameters
376     final void putBindMessage(string portal, string prepared,
377         scope const(const(ubyte)[])[] rawChunks) pure @safe
378     {
379         assert(open, "Connection is not open");
380         int savepoint = bufHead;
381         scope(failure) bufHead = savepoint;
383         write(cast(ubyte)FrontMessageType.Bind);
384         auto lenTotal = reserveLen();
385         cwrite(portal);
386         cwrite(prepared);
387         foreach (chunk; rawChunks)
388             bwrite(chunk);
389         lenTotal.fill();
390         logTrace("Bind message buffered");
391     }
393     /// put Close message into write buffer.
394     /// `closeWhat` is 'S' for prepared statement and
395     /// 'P' for portal.
396     final void putCloseMessage(StmtOrPortal closeWhat, string name) pure @safe
397     {
398         assert(open, "Connection is not open");
399         assert(closeWhat == 'S' || closeWhat == 'P');
400         int savepoint = bufHead;
401         scope(failure) bufHead = savepoint;
403         write(cast(ubyte)FrontMessageType.Close);
404         auto lenTotal = reserveLen();
405         write(cast(ubyte)closeWhat);
406         cwrite(name);
407         lenTotal.fill();
408         logTrace("Close message buffered");
409     }
411     /// put Close message into write buffer.
412     /// `closeWhat` is 'S' for prepared statement and
413     /// 'P' for portal.
414     final void putDescribeMessage(StmtOrPortal descWhat, string name) pure @safe
415     {
416         assert(open, "Connection is not open");
417         assert(descWhat == 'S' || descWhat == 'P');
418         int savepoint = bufHead;
419         scope(failure) bufHead = savepoint;
421         write(cast(ubyte)FrontMessageType.Describe);
422         auto lenTotal = reserveLen();
423         write(cast(ubyte)descWhat);
424         cwrite(name);
425         lenTotal.fill();
426         logTrace("Describe message buffered");
427     }
429     /**
430     non-zero maxRows will generate PortalSuspended messages, wich are
431     currently not handled by dpeq commands */
432     final void putExecuteMessage(string portal = "", int maxRows = 0) pure @safe
433     {
434         assert(open, "Connection is not open");
435         int savepoint = bufHead;
436         scope(failure) bufHead = savepoint;
438         write(cast(ubyte)FrontMessageType.Execute);
439         auto lenTotal = reserveLen();
440         cwrite(portal);
441         write(maxRows);
442         lenTotal.fill();
443         logTrace("Execute message buffered");
444     }
446     /**
447     Quote:
448     "The Flush message does not cause any specific output to be generated,
449     but forces the backend to deliver any data pending in its output buffers.
450     A Flush must be sent after any extended-query command except Sync, if the
451     frontend wishes to examine the results of that command before issuing more
452     commands. Without Flush, messages returned by the backend will be combined
453     into the minimum possible number of packets to minimize network overhead."
454     */
455     final void putFlushMessage() pure nothrow @safe
456     {
457         assert(open, "Connection is not open");
458         write(cast(ubyte)FrontMessageType.Flush);
459         write(4);
460         logTrace("Flush message buffered");
461     }
463     final void putParseMessage(PR)(string prepared, string query, scope PR ptypes)
464         pure @safe if (isInputRange!PR && is(Unqual!(ElementType!PR) == OID))
465     {
466         assert(open, "Connection is not open");
467         int savepoint = bufHead;
468         scope(failure) bufHead = savepoint;
470         write(cast(ubyte)FrontMessageType.Parse);
471         auto lenTotal = reserveLen();
472         cwrite(prepared);
473         cwrite(query);
475         // parameter types
476         short pcount = 0;
477         auto pcountPrefix = reserveLen!short();
478         foreach (OID ptype; ptypes)
479         {
480             write(ptype);
481             pcount++;
482         }
483         pcountPrefix.write(pcount);
485         lenTotal.fill();
486         logTrace("Parse message buffered");
487     }
489     /// put Query message (simple query protocol) into the write buffer
490     final void putQueryMessage(in string query) pure @safe
491     {
492         assert(open, "Connection is not open");
493         int savepoint = bufHead;
494         scope(failure) bufHead = savepoint;
496         write(cast(ubyte)FrontMessageType.Query);
497         auto lenTotal = reserveLen();
498         cwrite(query);
499         lenTotal.fill();
500         unflushedRfq++;
501         logTrace("Query message buffered");
502     }
504     /// ditto
505     final void putQueryMessage(in string[] queryChunks) pure @safe
506     {
507         assert(open, "Connection is not open");
508         int savepoint = bufHead;
509         scope(failure) bufHead = savepoint;
511         write(cast(ubyte)FrontMessageType.Query);
512         auto lenTotal = reserveLen();
513         for (int i = 0; i < queryChunks.length; i++)
514             if (i == queryChunks.length - 1)
515                 cwrite(queryChunks[i]);
516             else
517                 write(queryChunks[i]);
518         lenTotal.fill();
519         unflushedRfq++;
520         logTrace("Query message buffered");
521     }
523     alias putSimpleQuery = putQueryMessage;
525     /**
526     Put Sync message into write buffer. Usually you should call this after
527     every portal execute message.
528     Every postSimpleQuery or PSQLConnection.sync MUST be accompanied by getQueryResults call. */
529     final void putSyncMessage() pure nothrow @safe
530     {
531         assert(open, "Connection is not open");
532         write(cast(ubyte)FrontMessageType.Sync);
533         write(4);
534         unflushedRfq++;
535         logTrace("Sync message buffered");
536     }
538     alias sync = putSyncMessage;
    /// NotificationResponse messages will be parsed and passed to this
    /// callback during 'pollMessages' call. When the callback returns true,
    /// pollMessages exits its loop. Messages are skipped while it is null.
    /// https://www.postgresql.org/docs/9.5/static/sql-notify.html
    bool delegate(typeof(this) con, Notification n) nothrow @safe notificationCallback = null;

    /// NoticeResponse messages will be parsed and
    /// passed to this callback during 'pollMessages' call.
    /// Messages are skipped while it is null.
    void delegate(typeof(this) con, Notice n) nothrow @safe noticeCallback = null;

    /** When this callback returns true, pollMessages will exit its loop.
    The delegate receives only the message itself and has no error
    out-parameters. NOTE(review): an earlier description mentioned `err` and
    `errMsg` arguments which are not part of this signature; an interceptor
    that needs to report a failure presumably has to record it in state it
    captures and let the caller act on it after pollMessages returns. */
    alias InterceptorT = bool delegate(Message msg) @safe nothrow;
555     /** Read messages from the socket in loop until:
556       1). if finishOnError is set and ErrorResponse is received, function
557           throws PsqlErrorResponseException immediately.
558       2). ReadyForQuery message is received.
559       3). interceptor delegate returns `true`.
560       4). NotificationResponse received and notificationCallback returned 'true'.
561     Interceptor delegate is used to customize the logic. If the message is
562     not ReadyForQuery, ErrorResponse or Notice\Notification, it is passed to
563     interceptor. */
564     final void pollMessages(scope InterceptorT interceptor, bool finishOnError = false) @safe
565     {
566         bool error;
567         Notice errorNotice;
568         pollMessages(interceptor, error, errorNotice, finishOnError);
569         if (error)
570             throw new PsqlErrorResponseException(errorNotice);
571     }
573     /// Same as above, but throws only on serious protocol or socket-level errors.
574     final void pollMessages(scope InterceptorT interceptor,
575         out bool error, out Notice errorNotice, bool finishOnError = false) @safe
576     {
577         bool finish = false;
579         while (!finish)
580         {
581             Message msg = readOneMessage();
583             with (BackendMessageType)
584             switch (msg.type)
585             {
586                 case ErrorResponse:
587                     enforce!PsqlClientException(!error, "Second ErrorResponse " ~
588                         "received during one pollMessages call");
589                     error = true;
590                     parseNoticeMessage(msg.data, errorNotice);
591                     if (finishOnError)
592                         finish = true;
593                     break;
594                 case ReadyForQuery:
595                     enforce!PsqlClientException(readyForQueryExpected > 0,
596                         "Unexpected ReadyForQuery message");
597                     readyForQueryExpected--;
598                     tstatus = cast(TransactionStatus) msg.data[0];
599                     finish = true;
600                     break;
601                 case NoticeResponse:
602                     if (noticeCallback !is null)
603                     {
604                         Notice n;
605                         parseNoticeMessage(msg.data, n);
606                         noticeCallback(this, n);
607                     }
608                     break;
609                 case NotificationResponse:
610                     if (notificationCallback !is null)
611                     {
612                         Notification n;
613                         n.procId = deserializeNumber!int(msg.data[0..4]);
614                         size_t l;
615                         n.channel = deserializeProtocolString(msg.data[4..$], l);
616                         n.payload = deserializeString(msg.data[4+l..$-1]);
617                         finish |= notificationCallback(this, n);
618                     }
619                     break;
620                 default:
621                     if (interceptor !is null)
622                         finish |= interceptor(msg);
623             }
624         }
625     }
627     /// reads and discards messages from socket until all expected
628     /// ReadyForQuery messages are received
629     void windupResponseStack() @safe
630     {
631         while (readyForQueryExpected > 0)
632         {
633             Message msg = readOneMessage();
634             if (msg.type == BackendMessageType.ReadyForQuery)
635                 readyForQueryExpected--;
636         }
637     }
639     // Protected section for functions that will probably never be used by
640     // client code directly. If you need them, inherit them.
641 protected:
643     final void putStartupMessage(in BackendParams params) pure @safe
644     {
645         int savepoint = bufHead;
646         scope(failure) bufHead = savepoint;
648         auto lenPrefix = reserveLen();
649         write(0x0003_0000);  // protocol v3
650         cwrite("user");
651         cwrite(params.user);
652         cwrite("database");
653         cwrite(params.database);
654         write(cast(ubyte)0);
655         lenPrefix.fill();
656         logTrace("Startup message buffered");
657     }
659     final void putTerminateMessage() pure nothrow @safe
660     {
661         write(cast(ubyte)FrontMessageType.Terminate);
662         write(4);
663         logTrace("Terminate message buffered");
664     }
666     void initialize(in BackendParams params) @safe
667     {
668         putStartupMessage(params);
669         flush();
671         int authType = -1;
672         Message auth_msg;
674         pollMessages((Message msg)
675         {
676             if (msg.type == BackendMessageType.Authentication)
677             {
678                 auth_msg = msg;
679                 authType = deserializeNumber(msg.data[0..4]);
680                 if (authType == 0)  // instantly authorized, so we'll get readyForQuery
681                     readyForQueryExpected++;
682                 else
683                     return true;
684             }
685             else if (msg.type == BackendMessageType.BackendKeyData)
686             {
687                 m_processId = deserializeNumber(msg.data[0..4]);
688                 m_cancellationKey = deserializeNumber(msg.data[4..8]);
689             }
690             return false;
691         }, true);
693         enforce!PsqlClientException(authType != -1,
694             "Expected Authentication message was not received");
695         switch (authType)
696         {
697             case 0:
698                 // instant AuthenticationOk, trusted connection usually does this
699                 logTrace("Succesfully authorized");
700                 return;
701             case 3:
702                 // cleartext password
703                 putPasswordMessage(params.password);
704                 break;
705             case 5:
706                 // MD5 salted password
707                 ubyte[4] salt = auth_msg.data[4 .. 8];
708                 putMd5PasswordMessage(params.password, params.user, salt);
709                 break;
710             default:
711                 throw new PsqlClientException("Unknown auth type " ~
712                     authType.to!string);
713         }
714         flush();
716         int authRes = -1;
717         pollMessages((Message msg) {
718                 if (msg.type == BackendMessageType.Authentication)
719                     authRes = deserializeNumber(msg.data[0..4]);
720                 else if (msg.type == BackendMessageType.BackendKeyData)
721                 {
722                     m_processId = deserializeNumber(msg.data[0..4]);
723                     m_cancellationKey = deserializeNumber(msg.data[4..8]);
724                 }
725                 return false;
726             });
727         enforce!PsqlClientException(authRes == 0,
728             "Expected AuthenticationOk message was not received");
729     }
731     final void putPasswordMessage(string pw) pure @safe
732     {
733         int savepoint = bufHead;
734         scope(failure) bufHead = savepoint;
736         write(cast(ubyte)FrontMessageType.PasswordMessage);
737         auto lenPrefix = reserveLen();
738         cwrite(pw);
739         lenPrefix.fill();
740         unflushedRfq++;
741         logTrace("Password message buffered");
742     }
744     final void putMd5PasswordMessage(string pw, string user, ubyte[4] salt) pure @trusted
745     {
746         int savepoint = bufHead;
747         scope(failure) bufHead = savepoint;
749         // thank you ddb authors
750         char[32] MD5toHex(T...)(in T data)
751         {
752             import std.ascii : LetterCase;
753             import std.digest.md : md5Of, toHexString;
754             return md5Of(data).toHexString!(LetterCase.lower);
755         }
757         write(cast(ubyte)FrontMessageType.PasswordMessage);
758         auto lenPrefix = reserveLen();
759         char[3 + 32] mdpw;
760         mdpw[0 .. 3] = "md5";
761         mdpw[3 .. $] = MD5toHex(MD5toHex(pw, user), salt);
762         cwrite(cast(string) mdpw[]);
763         lenPrefix.fill();
764         unflushedRfq++;
765         logTrace("MD5 Password message buffered");
766     }
768     /// Read from socket to buffer buf exactly buf.length bytes.
769     /// Blocks and throws.
770     final void read(ubyte[] buf) @safe
771     {
772         try
773         {
774             logTrace("reading %d bytes", buf.length);
775             auto r = m_socket.receive(buf);
776             assert(r == buf.length, "received less bytes than requested");
777         }
778         catch (PsqlSocketException e)
779         {
780             open = false;
781             throw e;
782         }
783     }
785     /// extends writeBuffer if serializer 'm' is lacking space (returns -2)
786     final int wrappedSerialize(SerialT)(scope SerialT m) pure nothrow @trusted
787         if (isCallable!SerialT)
788     {
789         int bcount = m(writeBuffer[bufHead .. $]);
790         while (bcount <= -2)
791         {
792             // reallocate with additional 4 pages
793             int deficit = -bcount - (cast(int) writeBuffer.length - bufHead);
794             assert(deficit > 0, "negative buffer deficit");
795             writeBuffer.length = writeBuffer.length + max(4 * 4096, deficit);
796             bcount = m(writeBuffer[bufHead .. $]);
797         }
798         if (bcount > 0)
799             bufHead += bcount;
800         return bcount;
801     }
803     /// write numeric type T to write buffer
804     final int write(T)(T val) pure nothrow @safe
805         if (isNumeric!T)
806     {
807         return wrappedSerialize((ubyte[] buf) => serializeFixedField(buf, &val));
808     }
810     /// Reserve space in write buffer for length prefix and return
811     /// struct that automatically fills it from current buffer offset position.
812     final auto reserveLen(T = int)() @safe nothrow
813         if (isNumeric!T && !isUnsigned!T)
814     {
815         static struct Len
816         {
817             PSQLConnection con;
818             int idx;    // offset of length prefix word in writeBuffer
820             /// calculate and write length prefix
821             void fill(bool includeSelf = true) pure @trusted
822             {
823                 T len = (con.bufHead - idx).to!T;
824                 if (!includeSelf)
825                 {
826                     len -= T.sizeof.to!T;
827                     assert(len >= 0);
828                 }
829                 else
830                     assert(len >= T.sizeof);
831                 logTrace("writing length of %d bytes to index %d", len, idx);
832                 auto res = serializeFixedField(con.writeBuffer[idx .. idx+T.sizeof], &len);
833                 assert(res == T.sizeof);
834             }
836             /// write some specific number
837             void write(T v) nothrow pure @trusted
838             {
839                 auto res = serializeFixedField(con.writeBuffer[idx .. idx+T.sizeof], &v);
840                 assert(res == T.sizeof);
841             }
842         }
844         Len l = Len(this, bufHead);
845         bufHead += T.sizeof;
846         return l;
847     }
849     final int bwrite(scope const ubyte[] s) pure nothrow @safe
850     {
851         return wrappedSerialize((ubyte[] buf) => serializeBytesField(buf, &s));
852     }
854     final int write(in string s) pure nothrow @safe
855     {
856         return wrappedSerialize((ubyte[] buf) => serializeStringField(buf, &s));
857     }
859     final int cwrite(in string s) pure nothrow @safe
860     {
861         return wrappedSerialize((ubyte[] buf) => serializeCstring(buf, s));
862     }
864     /// read exactly one message from the socket
865     Message readOneMessage() @trusted
866     {
867         Message res;
868         ubyte[5] type_and_length;
869         read(type_and_length);
870         res.type = cast(BackendMessageType) type_and_length[0];
871         logTrace("Got message of type %s", res.type.to!string);
872         int length = deserializeNumber(cast(immutable(ubyte)[]) type_and_length[1..$]) - 4;
873         enforce!PsqlClientException(length >= 0, "Negative message length");
874         ubyte[] data;
875         if (length > 0)
876         {
877             if (length <= readBatchSize / 2)
878             {
879                 // we should batch the allocation
880                 if (readBatch.length < length)
881                     readBatch = new ubyte[readBatchSize];
882                 data = readBatch[0..length];
883                 readBatch = readBatch[length..$];
884             }
885             else
886                 data = new ubyte[length];   // fat messages get their own buffer
887             read(data);
888         }
889         res.data = cast(immutable(ubyte)[]) data;
890         return res;
891     }
892 }