1 /**
2 Functions of various nature. Prepared statement, portal and response deserialization
3 functions live here.
4 
5 Copyright: Copyright Boris-Barboris 2017.
6 License: MIT
7 Authors: Boris-Barboris
8 */
9 
10 module dpeq.command;
11 
12 import std.algorithm: max, map;
13 import std.exception: enforce;
14 import std.format: format;
15 import std.conv: to;
16 import std.traits;
17 import std.range;
18 import std.variant;
19 import std.meta;
20 import std.typecons;
21 
22 import dpeq.exceptions;
23 import dpeq.connection;
24 import dpeq.constants;
25 import dpeq.serialize;
26 import dpeq.result;
27 
28 
29 
30 @safe:
31 
32 /*
33 /////////////////////////////////////
34 // Different forms of command input
35 /////////////////////////////////////
36 */
37 
38 /** Simple query is simple. Send string to server and get responses.
39 The most versatile, but unsafe way to issue commands to PSQL.
40 Simple query always returns data in FormatCode.Text format.
41 Simple queries SHOULD NOT be accompanied by SYNC message, they
42 trigger ReadyForQuery message anyways.
43 
44 Every postSimpleQuery or PSQLConnection.sync should be accompanied by getQueryResults
45 call. */
46 void postSimpleQuery(ConnT)(ConnT conn, string query) pure
47 {
48     conn.putQueryMessage(query);
49 }
50 
51 /// Pre-parsed sql query with variable parameters.
52 class PreparedStatement(ConnT)
53 {
54     protected
55     {
56         const(OID)[] paramTypes;
57         string query;
58         ConnT conn;
59         string parsedName;  // name, reserved for this statement in PSQL connection
60         short m_paramCount;
61         bool parseRequested;
62     }
63 
64     /// name of this prepared statement, as seen by backend.
65     final @property string preparedName() const pure { return parsedName; }
66 
67     final @property short paramCount() const pure { return m_paramCount; }
68 
69     /**
70     Creates prepared statement object, wich holds dpeq utility state.
71     Constructor does not write anything to connection write buffer.
72 
73     Quoting https://www.postgresql.org/docs/9.5/static/protocol-message-formats.html:
74 
75     The number of parameter data types specified (can be zero). Note that this is not an
76     indication of the number of parameters that might appear in the query string,
77     only the number that the frontend wants to prespecify types for.
78 
79     Then, for each parameter, there is the following:
80 
81     Int32
82         Specifies the object ID of the parameter data type. Placing a zero here is
83         equivalent to leaving the type unspecified.
84 
85     That means you can leave paramTypes null, unless you're doing some tricky
86     stuff.
87     */
88     this(ConnT conn, string query, short paramCount, bool named = false,
89         const(OID)[] paramTypes = null) pure
90     {
91         assert(conn);
92         assert(query);
93         assert(paramCount >= 0);
94         this.conn = conn;
95         this.query = query;
96         this.paramTypes = paramTypes;
97         this.m_paramCount = paramCount;
98         if (named)
99             parsedName = conn.getNewPreparedName();
100         else
101             parsedName = "";
102     }
103 
104     /// write Parse message into connection's write buffer.
105     final void postParseMessage() pure
106     {
107         conn.putParseMessage(parsedName, query, paramTypes[]);
108         parseRequested = true;
109     }
110 
111     /// ditto
112     alias parse = postParseMessage;
113 
114     /** Post message to destroy named prepared statement.
115 
116     An unnamed prepared statement lasts only until the next Parse
117     statement specifying the unnamed statement as destination is issued.
118     (Note that a simple Query message also destroys the unnamed statement.)
119     */
120     final void postCloseMessage() pure
121     {
122         assert(parseRequested, "prepared statement was never sent to backend");
123         assert(parsedName.length, "no need to close unnamed prepared statements");
124         conn.putCloseMessage(StmtOrPortal.statement, parsedName);
125         parseRequested = false;
126     }
127 
128     /// poll message queue and make sure parse was completed
129     final void ensureParseComplete()
130     {
131         bool parsed = false;
132         bool interceptor(Message msg)
133         {
134             with (BackendMessageType)
135             switch (msg.type)
136             {
137                 case ParseComplete:
138                     parsed = true;
139                     return true;
140                 default:
141                     break;
142             }
143             return false;
144         }
145         conn.pollMessages(&interceptor, true);
146         enforce!PsqlClientException(parsed, "Parse was not confirmed");
147     }
148 }
149 
150 
151 /// Parameter tuple, bound to prepared statement
152 class Portal(ConnT)
153 {
154     protected
155     {
156         PreparedStatement!ConnT prepStmt;
157         ConnT conn;
158         string portalName;  // name, reserved for this portal in PSQL connection
159         bool bindRequested = false;
160     }
161 
162     this(PreparedStatement!ConnT ps, bool persist = true) pure
163     {
164         assert(ps);
165         this.conn = ps.conn;
166         prepStmt = ps;
167         if (persist)
168             portalName = conn.getNewPortalName();
169         else
170             portalName = "";
171     }
172 
173     /// bind empty, parameterless portal. resCodes are requested format codes
174     /// of resulting columns, keep it null to request everything in text format.
175     final void bind(FormatCode[] resCodes = null) pure
176     {
177         assert(prepStmt.paramCount == 0);
178 
179         auto safePoint = conn.saveBuffer();
180         scope (failure) safePoint.restore();
181 
182         if (bindRequested && portalName.length)
183             postCloseMessage();
184 
185         conn.putBindMessage(portalName, prepStmt.parsedName, resCodes);
186         bindRequested = true;
187     }
188 
189     /**
190     For the 'specs' array of prepared statement parameters types, known at
191     compile-time, write Bind message to connection's write buffer from the
192     representation of 'args' parameters, serialized to 'specs' types according
193     to 'Serializer' template. Format codes of the response columns is set
194     via 'resCodes' array, known at compile time.
195     */
196     final void bind(
197             FieldSpec[] specs,
198             FormatCode[] resCodes = null,
199             alias Serializer = DefaultSerializer,
200             Args...)
201         (in Args args) pure
202     {
203         assert(prepStmt.paramCount == Args.length);
204         assert(prepStmt.paramCount == specs.length);
205 
206         auto safePoint = conn.saveBuffer();
207         scope (failure) safePoint.restore();
208 
209         if (bindRequested && portalName.length)
210             postCloseMessage();
211 
212         enum fcodesr = [staticMap!(FCodeOfFSpec!(Serializer).F, aliasSeqOf!specs)];
213 
214         alias DlgT = int delegate(ubyte[]) pure nothrow @trusted;
215         DlgT[specs.length] serializers;
216         foreach(i, paramSpec; aliasSeqOf!specs)
217         {
218             serializers[i] =
219                 (ubyte[] to) pure nothrow @trusted => Serializer!paramSpec.serialize(to, &args[i]);
220         }
221         conn.putBindMessage(portalName, prepStmt.parsedName, fcodesr,
222             serializers, resCodes);
223         bindRequested = true;
224     }
225 
226     /** This version of bind accept generic InputRanges of format codes and
227     field serializers and passes them directly to putBindMessage method of
228     connection object. No parameter count and type validation is performed.
229     If this portal is already bound and is a named one, Close message is
230     posted.
231     */
232     final void bind(FR, PR, RR)(scope FR paramCodeRange, scope PR paramRange,
233         scope RR returnCodeRange) pure
234     {
235         auto safePoint = conn.saveBuffer();
236         scope (failure) safePoint.restore();
237         if (bindRequested && portalName.length)
238             postCloseMessage();
239         conn.putBindMessage(portalName, prepStmt.parsedName, paramCodeRange,
240             paramRange, returnCodeRange);
241         bindRequested = true;
242     }
243 
244     /// Simple portal bind, wich binds all parameters as strings and requests
245     /// all result columns in text format.
246     final void bind(scope const(Nullable!string)[] args) pure
247     {
248         assert(prepStmt.paramCount == args.length);
249 
250         if (bindRequested && portalName.length)
251             postCloseMessage();
252 
253         static struct StrSerializer
254         {
255             const Nullable!string str;
256             this(const(Nullable!string) v) { str = v; }
257 
258             int opCall(ubyte[] buf) nothrow const
259             {
260                 return serializeNullableStringField(buf, &str);
261             }
262         }
263 
264         static struct MarshRange
265         {
266             const(Nullable!string)[] params;
267             int idx = 0;
268             @property bool empty() const { return idx >= params.length; }
269             void popFront() { idx++; }
270             @property StrSerializer front() const
271             {
272                 return StrSerializer(params[idx]);
273             }
274         }
275 
276         conn.putBindMessage!(FormatCode[], MarshRange, FormatCode[])(
277             portalName, prepStmt.parsedName, null, MarshRange(args), null);
278         bindRequested = true;
279     }
280 
281     /** Write Close message to connection write buffer in order to
282     explicitly destroy named portal.
283 
284     If successfully created, a named portal object lasts till the end of the
285     current transaction, unless explicitly destroyed. An unnamed portal is
286     destroyed at the end of the transaction, or as soon as the next Bind
287     statement specifying the unnamed portal as destination is issued.
288     (Note that a simple Query message also destroys the unnamed portal.)
289     Named portals must be explicitly closed before they can be redefined
290     by another Bind message, but this is not required for the unnamed portal.
291     */
292     final void postCloseMessage() pure
293     {
294         assert(bindRequested, "portal was never bound");
295         assert(portalName.length, "no need to close unnamed portals");
296         conn.putCloseMessage(StmtOrPortal.portal, portalName);
297         bindRequested = false;
298     }
299 
300     /// poll message queue and make sure bind was completed
301     final void ensureBindComplete()
302     {
303         bool is_bound = false;
304         bool interceptor(Message msg)
305         {
306             with (BackendMessageType)
307             switch (msg.type)
308             {
309                 case BindComplete:
310                     is_bound = true;
311                     return true;
312                 default:
313                     break;
314             }
315             return false;
316         }
317         conn.pollMessages(&interceptor, true);
318         enforce!PsqlClientException(is_bound, "Bind was not confirmed");
319     }
320 
321     /** Send Describe+Execute command.
322     If describe is false, no RowDescription message will be requested
323     from PSQL - useful for optimistic statically-typed querying.
324     'maxRows' parameter is responsible for portal suspending and is
325     conceptually inferior to simple TCP backpressure mechanisms or result set
326     size limiting. */
327     final void execute(bool describe = true, int maxRows = 0) pure
328     {
329         assert(bindRequested, "Portal was never bound");
330         if (describe)
331             conn.putDescribeMessage(StmtOrPortal.portal, portalName);
332         conn.putExecuteMessage(portalName, maxRows);
333     }
334 }
335 
336 
337 /*
338 ////////////////////////////////////////
339 // Functions to work with query results
340 ////////////////////////////////////////
341 */
342 
343 
344 /** Generic result materializer, suitable for both simple and prepared queries.
345 Polls messages from the connection and builds QueryResult structure from
346 them. Throws if something goes wrong. Polling stops when ReadyForQuery message
347 is received. */
348 QueryResult getQueryResults(ConnT)(ConnT conn)
349 {
350     QueryResult res;
351     RowBlock rb;
352 
353     bool interceptor(Message msg) nothrow
354     {
355         with (BackendMessageType)
356         switch (msg.type)
357         {
358             case EmptyQueryResponse:
359                 rb.state = RowBlockState.emptyQuery;
360                 res.blocks ~= rb;
361                 rb = RowBlock();
362                 break;
363             case CommandComplete:
364                 rb.state = RowBlockState.complete;
365                 rb.commandTag = deserializeString(msg.data[0..$-1]);
366                 res.blocks ~= rb;
367                 rb = RowBlock();
368                 break;
369             case PortalSuspended:
370                 rb.state = RowBlockState.suspended;
371                 res.blocks ~= rb;
372                 rb = RowBlock();
373                 break;
374             case RowDescription:
375                 rb.rowDesc = dpeq.result.RowDescription(msg.data);
376                 break;
377             case DataRow:
378                 rb.dataRows ~= msg.data;
379                 break;
380             default:
381                 break;
382         }
383         return false;
384     }
385 
386     conn.pollMessages(&interceptor, false);
387     return res;
388 }
389 
390 
391 /// Poll messages from the connection until CommandComplete or EmptyQueryResponse
392 /// is received, and return one row block (result of one and only one query).
393 RowBlock getOneRowBlock(ConnT)(ConnT conn, int rowCountLimit = 0)
394 {
395     RowBlock result;
396 
397     bool interceptor(Message msg) nothrow
398     {
399         with (BackendMessageType)
400         switch (msg.type)
401         {
402             case EmptyQueryResponse:
403                 result.state = RowBlockState.emptyQuery;
404                 return true;
405             case CommandComplete:
406                 result.state = RowBlockState.complete;
407                 result.commandTag = deserializeString(msg.data[0..$-1]);
408                 return true;
409             case PortalSuspended:
410                 result.state = RowBlockState.suspended;
411                 return true;
412             case RowDescription:
413                 result.rowDesc = dpeq.result.RowDescription(msg.data);
414                 requireRowDescription = false;
415                 break;
416             case DataRow:
417                 result.dataRows ~= msg.data;
418                 if (rowCountLimit != 0)
419                 {
420                     // client code requested early stop
421                     if (--rowCountLimit == 0)
422                     {
423                         result.state = RowBlockState.incomplete;
424                         return true;
425                     }
426                 }
427                 break;
428             default:
429                 break;
430         }
431         return false;
432     }
433 
434     conn.pollMessages(&interceptor, true);
435     return result;
436 }
437 
438 
439 
440 /*
441 /////////////////////////////////////////////////////////////////
442 // Functions used to transform query results into D types
443 /////////////////////////////////////////////////////////////////
444 */
445 
446 //import std.stdio;
447 
448 /** Returns RandomAccessRange of InputRanges of lazy-deserialized variants.
449 Specific flavor of Variant is derived from Converter.deserialize call return type.
450 Look into serialize.VariantConverter for deserialize implementation examples.
451 Will append parsed field descriptions to fieldDescs array if passed. */
452 auto blockToVariants(alias Converter = VariantConverter!DefaultSerializer)
453     (RowBlock block, FieldDescription[]* fieldDescs = null) pure
454 {
455     alias VariantT = ReturnType!(Converter.deserialize);
456 
457     enforce!PsqlSerializationException(block.rowDesc.isSet,
458         "Cannot deserialize RowBlock without row description. " ~
459         "Did you send Describe message?");
460     short totalColumns = block.rowDesc.fieldCount;
461     OID[] typeArr = new OID[totalColumns];
462     FormatCode[] fcArr = new FormatCode[totalColumns];
463 
464     int i = 0;
465     foreach (fdesc; block.rowDesc[]) // row description deserialization happens here
466     {
467         //writeln(fdesc.name);
468         //writeln(fdesc.formatCode);
469         if (fieldDescs)
470             (*fieldDescs)[i] = fdesc;
471         fcArr[i] = fdesc.formatCode;
472         typeArr[i++] = fdesc.type;
473     }
474 
475     static struct RowDeserializer
476     {
477     private:
478         short column = 0;
479         short totalCols;
480         immutable(ubyte)[] buf;
481         const(OID)[] types;
482         const(FormatCode)[] fcodes;
483         bool parsed = false;
484 
485         // cache result to prevent repeated deserialization on
486         // front() call.
487         VariantT res;
488     public:
489         @property bool empty() const { return column >= totalCols; }
490         void popFront()
491         {
492             parsed = false;
493             column++;
494         }
495         @property VariantT front()
496         {
497             if (parsed)
498                 return res;
499             if (column == 0)
500             {
501                 // we need to skip field count in the start of DataRow message
502                 buf = buf[2 .. $];
503             }
504             assert(buf.length > 0);
505             int len = deserializeNumber(buf[0 .. 4]);
506             immutable(ubyte)[] vbuf = buf[4 .. max(4, len + 4)];
507             //writeln(types[column], " ", buf);
508             res = Converter.deserialize(vbuf, types[column], fcodes[column], len);
509             buf = buf[max(4, len + 4) .. $];
510             parsed = true;
511             return res;
512         }
513     }
514 
515     static assert (isInputRange!RowDeserializer);
516 
517     static struct RowsRange
518     {
519     private:
520         immutable(ubyte)[][] dataRows;
521         OID[] columnTypes;
522         FormatCode[] fcodes;
523         short totalColumns;
524     public:
525         @property size_t length() const { return dataRows.length; }
526         @property bool empty() const { return dataRows.empty; }
527         @property RowDeserializer front()
528         {
529             return RowDeserializer(0, totalColumns, dataRows[0],
530                 columnTypes, fcodes);
531         }
532         @property RowDeserializer back()
533         {
534             return RowDeserializer(0, totalColumns, dataRows[$-1],
535                 columnTypes, fcodes);
536         }
537         RowDeserializer opIndex(size_t i)
538         {
539             return RowDeserializer(0, totalColumns, dataRows[i],
540                 columnTypes, fcodes);
541         }
542         void popFront() { dataRows = dataRows[1 .. $]; }
543         void popBack() { dataRows = dataRows[0 .. $-1]; }
544         RowsRange save()
545         {
546             return RowsRange(dataRows, columnTypes, fcodes, totalColumns);
547         }
548     }
549 
550     static assert (isRandomAccessRange!RowsRange);
551 
552     return RowsRange(block.dataRows, typeArr, fcArr, totalColumns);
553 }
554 
555 
556 
557 
558 /// for row spec `spec` build native tuple representation.
559 template TupleForSpec(FieldSpec[] spec, alias Deserializer = DefaultSerializer)
560 {
561     alias TupleForSpec =
562         Tuple!(
563             staticMap!(
564                 SpecMapper!(Deserializer).Func,
565                 aliasSeqOf!spec));
566 }
567 
568 /// Template function Func returns D type wich corresponds to FieldSpec.
569 template SpecMapper(alias Deserializer)
570 {
571     template Func(FieldSpec spec)
572     {
573         static if (is(Deserializer!spec.type))
574             alias Func = Deserializer!spec.type;
575         else
576             static assert(0, "Deserializer doesn't support type with oid " ~
577                 spec.typeId.to!string);
578     }
579 }
580 
581 
582 /** Returns RandomAccessRange of lazily-deserialized tuples.
583 Customazable with Deserializer template. Will append parsed field descriptions
584 to fieldDescs array if it is provided. */
585 auto blockToTuples
586     (FieldSpec[] spec, alias Deserializer = DefaultSerializer)
587     (RowBlock block, FieldDescription[]* fieldDescs = null) pure
588 {
589     alias ResTuple = TupleForSpec!(spec, Deserializer);
590     debug pragma(msg, "Resulting tuple from spec: ", ResTuple);
591     enforce!PsqlSerializationException(block.rowDesc.isSet,
592         "Cannot deserialize RowBlock without row description. " ~
593         "Did you send describe message?");
594     short totalColumns = block.rowDesc.fieldCount;
595     enforce!PsqlSerializationException(totalColumns == spec.length,
596         "Expected %d columns in a row, got %d".format(spec.length, totalColumns));
597     FormatCode[] fcArr = new FormatCode[totalColumns];
598 
599     int i = 0;
600     foreach (fdesc; block.rowDesc[]) // row description deserialization happens here
601     {
602         //writeln(fdesc.name);
603         //writeln(fdesc.formatCode);
604         if (fieldDescs)
605             (*fieldDescs)[i] = fdesc;
606         fcArr[i] = fdesc.formatCode;
607         OID colType = fdesc.type;
608         enforce!PsqlSerializationException(colType == spec[i].typeId,
609             "Colunm %d type mismatch: expected %d, got %d".format(
610                 i, spec[i].typeId, colType));
611         i++;
612     }
613 
614     //import std.stdio;
615 
616     static ResTuple deserializeRow(immutable(ubyte)[] from, const(FormatCode)[] fcodes)
617     {
618         ResTuple res;
619         int len = 0;
620         immutable(ubyte)[] vbuf;
621         from = from[2 .. $];    // skip 2 bytes
622         foreach (i, colSpec; aliasSeqOf!(spec))
623         {
624             len = deserializeNumber(from[0 .. 4]);
625             //writeln("col ", i, ", len = ", len, " from = ", from);
626             vbuf = from[4 .. max(4, len + 4)];
627             Deserializer!(colSpec).deserialize(vbuf, fcodes[i], len, &res[i]);
628             from = from[max(4, len + 4) .. $];
629         }
630         enforce!PsqlSerializationException(from.length == 0,
631             "%d bytes left in supposedly emptied row".format(from.length));
632         return res;
633     }
634 
635     static struct RowsRange
636     {
637     private:
638         immutable(ubyte)[][] dataRows;
639         FormatCode[] fcodes;
640     public:
641         @property size_t length() const { return dataRows.length; }
642         @property bool empty() const { return dataRows.empty; }
643         @property ResTuple front()
644         {
645             return deserializeRow(dataRows[0], fcodes);
646         }
647         @property ResTuple back()
648         {
649             return deserializeRow(dataRows[$-1], fcodes);
650         }
651         ResTuple opIndex(size_t i)
652         {
653             return deserializeRow(dataRows[i], fcodes);
654         }
655         void popFront() { dataRows = dataRows[1 .. $]; }
656         void popBack() { dataRows = dataRows[0 .. $-1]; }
657         RowsRange save()
658         {
659             return RowsRange(dataRows, fcodes);
660         }
661     }
662 
663     static assert (isRandomAccessRange!RowsRange);
664 
665     return RowsRange(block.dataRows, fcArr);
666 }
667 
668 
669 class FormatCodesOfSpec(FieldSpec[] spec, alias Deserializer)
670 {
671     static const(FormatCode)[spec.length] codes;
672 
673     static this()
674     {
675         foreach (i, fpec; aliasSeqOf!spec)
676             codes[i] = Deserializer!fpec.formatCode;
677     }
678 }
679 
680 
681 /** Returns RandomAccessRange of lazy-deserialized tuples. Customazable with
682 Deserializer template. This version does not require RowDescription, but cannot
683 validate row types reliably. */
684 auto blockToTuples
685     (FieldSpec[] spec, alias Deserializer = DefaultSerializer)
686     (immutable(ubyte)[][] data) pure
687 {
688     alias ResTuple = TupleForSpec!(spec, Deserializer);
689     debug pragma(msg, "Resulting tuple from spec: ", ResTuple);
690 
691     //import std.stdio;
692 
693     static ResTuple deserializeRow(immutable(ubyte)[] from)
694     {
695         ResTuple res;
696         int len = 0;
697         immutable(ubyte)[] vbuf;
698         from = from[2 .. $];    // skip 2 bytes
699         foreach (i, colSpec; aliasSeqOf!(spec))
700         {
701             len = deserializeNumber(from[0 .. 4]);
702             //writeln("col ", i, ", len = ", len, " from = ", from);
703             vbuf = from[4 .. max(4, len + 4)];
704             FormatCode fcode = FCodeOfFSpec!(Deserializer).F!(colSpec);
705             Deserializer!(colSpec).deserialize(vbuf, fcode, len, &res[i]);
706             from = from[max(4, len + 4) .. $];
707         }
708         enforce!PsqlSerializationException(from.length == 0,
709             "%d bytes left in supposedly emptied row".format(from.length));
710         return res;
711     }
712 
713     static struct RowsRange
714     {
715     private:
716         immutable(ubyte)[][] dataRows;
717     public:
718         @property size_t length() const { return dataRows.length; }
719         @property bool empty() const { return dataRows.empty; }
720         @property ResTuple front()
721         {
722             return deserializeRow(dataRows[0]);
723         }
724         @property ResTuple back()
725         {
726             return deserializeRow(dataRows[$-1]);
727         }
728         ResTuple opIndex(size_t i)
729         {
730             return deserializeRow(dataRows[i]);
731         }
732         void popFront() { dataRows = dataRows[1 .. $]; }
733         void popBack() { dataRows = dataRows[0 .. $-1]; }
734         RowsRange save()
735         {
736             return RowsRange(dataRows);
737         }
738     }
739 
740     static assert (isRandomAccessRange!RowsRange);
741 
742     return RowsRange(data);
743 }