1 /** 2 Functions of various nature. Prepared statement, portal and response deserialization 3 functions live here. 4 5 Copyright: Copyright Boris-Barboris 2017. 6 License: MIT 7 Authors: Boris-Barboris 8 */ 9 10 module dpeq.command; 11 12 import std.algorithm: max, map; 13 import std.exception: enforce; 14 import std.format: format; 15 import std.conv: to; 16 import std.traits; 17 import std.range; 18 import std.variant; 19 import std.meta; 20 import std.typecons; 21 22 import dpeq.exceptions; 23 import dpeq.connection; 24 import dpeq.constants; 25 import dpeq.serialize; 26 import dpeq.result; 27 28 29 30 @safe: 31 32 /* 33 ///////////////////////////////////// 34 // Different forms of command input 35 ///////////////////////////////////// 36 */ 37 38 /** Simple query is simple. Send string to server and get responses. 39 The most versatile, but unsafe way to issue commands to PSQL. 40 Simple query always returns data in FormatCode.Text format. 41 Simple queries SHOULD NOT be accompanied by SYNC message, they 42 trigger ReadyForQuery message anyways. 43 44 Every postSimpleQuery or PSQLConnection.sync should be accompanied by getQueryResults 45 call. */ 46 void postSimpleQuery(ConnT)(ConnT conn, string query) pure 47 { 48 conn.putQueryMessage(query); 49 } 50 51 /// Pre-parsed sql query with variable parameters. 52 class PreparedStatement(ConnT) 53 { 54 protected 55 { 56 const(OID)[] paramTypes; 57 string query; 58 ConnT conn; 59 string parsedName; // name, reserved for this statement in PSQL connection 60 short m_paramCount; 61 bool parseRequested; 62 } 63 64 /// name of this prepared statement, as seen by backend. 65 final @property string preparedName() const pure { return parsedName; } 66 67 final @property short paramCount() const pure { return m_paramCount; } 68 69 /** 70 Creates prepared statement object, wich holds dpeq utility state. 71 Constructor does not write anything to connection write buffer. 72 73 Quoting https://www.postgresql.org/docs/9.5/static/protocol-message-formats.html: 74 75 The number of parameter data types specified (can be zero). Note that this is not an 76 indication of the number of parameters that might appear in the query string, 77 only the number that the frontend wants to prespecify types for. 78 79 Then, for each parameter, there is the following: 80 81 Int32 82 Specifies the object ID of the parameter data type. Placing a zero here is 83 equivalent to leaving the type unspecified. 84 85 That means you can leave paramTypes null, unless you're doing some tricky 86 stuff. 87 */ 88 this(ConnT conn, string query, short paramCount, bool named = false, 89 const(OID)[] paramTypes = null) pure 90 { 91 assert(conn); 92 assert(query); 93 assert(paramCount >= 0); 94 this.conn = conn; 95 this.query = query; 96 this.paramTypes = paramTypes; 97 this.m_paramCount = paramCount; 98 if (named) 99 parsedName = conn.getNewPreparedName(); 100 else 101 parsedName = ""; 102 } 103 104 /// write Parse message into connection's write buffer. 105 final void postParseMessage() pure 106 { 107 conn.putParseMessage(parsedName, query, paramTypes[]); 108 parseRequested = true; 109 } 110 111 /// ditto 112 alias parse = postParseMessage; 113 114 /** Post message to destroy named prepared statement. 115 116 An unnamed prepared statement lasts only until the next Parse 117 statement specifying the unnamed statement as destination is issued. 118 (Note that a simple Query message also destroys the unnamed statement.) 119 */ 120 final void postCloseMessage() pure 121 { 122 assert(parseRequested, "prepared statement was never sent to backend"); 123 assert(parsedName.length, "no need to close unnamed prepared statements"); 124 conn.putCloseMessage(StmtOrPortal.statement, parsedName); 125 parseRequested = false; 126 } 127 128 /// poll message queue and make sure parse was completed 129 final void ensureParseComplete() 130 { 131 bool parsed = false; 132 bool interceptor(Message msg) 133 { 134 with (BackendMessageType) 135 switch (msg.type) 136 { 137 case ParseComplete: 138 parsed = true; 139 return true; 140 default: 141 break; 142 } 143 return false; 144 } 145 conn.pollMessages(&interceptor, true); 146 enforce!PsqlClientException(parsed, "Parse was not confirmed"); 147 } 148 } 149 150 151 /// Parameter tuple, bound to prepared statement 152 class Portal(ConnT) 153 { 154 protected 155 { 156 PreparedStatement!ConnT prepStmt; 157 ConnT conn; 158 string portalName; // name, reserved for this portal in PSQL connection 159 bool bindRequested = false; 160 } 161 162 this(PreparedStatement!ConnT ps, bool persist = true) pure 163 { 164 assert(ps); 165 this.conn = ps.conn; 166 prepStmt = ps; 167 if (persist) 168 portalName = conn.getNewPortalName(); 169 else 170 portalName = ""; 171 } 172 173 /// bind empty, parameterless portal. resCodes are requested format codes 174 /// of resulting columns, keep it null to request everything in text format. 175 final void bind(FormatCode[] resCodes = null) pure 176 { 177 assert(prepStmt.paramCount == 0); 178 179 auto safePoint = conn.saveBuffer(); 180 scope (failure) safePoint.restore(); 181 182 if (bindRequested && portalName.length) 183 postCloseMessage(); 184 185 conn.putBindMessage(portalName, prepStmt.parsedName, resCodes); 186 bindRequested = true; 187 } 188 189 /** 190 For the 'specs' array of prepared statement parameters types, known at 191 compile-time, write Bind message to connection's write buffer from the 192 representation of 'args' parameters, serialized to 'specs' types according 193 to 'Serializer' template. Format codes of the response columns is set 194 via 'resCodes' array, known at compile time. 195 */ 196 final void bind( 197 FieldSpec[] specs, 198 FormatCode[] resCodes = null, 199 alias Serializer = DefaultSerializer, 200 Args...) 201 (in Args args) pure 202 { 203 assert(prepStmt.paramCount == Args.length); 204 assert(prepStmt.paramCount == specs.length); 205 206 auto safePoint = conn.saveBuffer(); 207 scope (failure) safePoint.restore(); 208 209 if (bindRequested && portalName.length) 210 postCloseMessage(); 211 212 enum fcodesr = [staticMap!(FCodeOfFSpec!(Serializer).F, aliasSeqOf!specs)]; 213 214 alias DlgT = int delegate(ubyte[]) pure nothrow @trusted; 215 DlgT[specs.length] serializers; 216 foreach(i, paramSpec; aliasSeqOf!specs) 217 { 218 serializers[i] = 219 (ubyte[] to) pure nothrow @trusted => Serializer!paramSpec.serialize(to, &args[i]); 220 } 221 conn.putBindMessage(portalName, prepStmt.parsedName, fcodesr, 222 serializers, resCodes); 223 bindRequested = true; 224 } 225 226 /** This version of bind accept generic InputRanges of format codes and 227 field serializers and passes them directly to putBindMessage method of 228 connection object. No parameter count and type validation is performed. 229 If this portal is already bound and is a named one, Close message is 230 posted. 231 */ 232 final void bind(FR, PR, RR)(scope FR paramCodeRange, scope PR paramRange, 233 scope RR returnCodeRange) pure 234 { 235 auto safePoint = conn.saveBuffer(); 236 scope (failure) safePoint.restore(); 237 if (bindRequested && portalName.length) 238 postCloseMessage(); 239 conn.putBindMessage(portalName, prepStmt.parsedName, paramCodeRange, 240 paramRange, returnCodeRange); 241 bindRequested = true; 242 } 243 244 /// Simple portal bind, wich binds all parameters as strings and requests 245 /// all result columns in text format. 246 final void bind(scope const(Nullable!string)[] args) pure 247 { 248 assert(prepStmt.paramCount == args.length); 249 250 if (bindRequested && portalName.length) 251 postCloseMessage(); 252 253 static struct StrSerializer 254 { 255 const Nullable!string str; 256 this(const(Nullable!string) v) { str = v; } 257 258 int opCall(ubyte[] buf) nothrow const 259 { 260 return serializeNullableStringField(buf, &str); 261 } 262 } 263 264 static struct MarshRange 265 { 266 const(Nullable!string)[] params; 267 int idx = 0; 268 @property bool empty() const { return idx >= params.length; } 269 void popFront() { idx++; } 270 @property StrSerializer front() const 271 { 272 return StrSerializer(params[idx]); 273 } 274 } 275 276 conn.putBindMessage!(FormatCode[], MarshRange, FormatCode[])( 277 portalName, prepStmt.parsedName, null, MarshRange(args), null); 278 bindRequested = true; 279 } 280 281 /** Write Close message to connection write buffer in order to 282 explicitly destroy named portal. 283 284 If successfully created, a named portal object lasts till the end of the 285 current transaction, unless explicitly destroyed. An unnamed portal is 286 destroyed at the end of the transaction, or as soon as the next Bind 287 statement specifying the unnamed portal as destination is issued. 288 (Note that a simple Query message also destroys the unnamed portal.) 289 Named portals must be explicitly closed before they can be redefined 290 by another Bind message, but this is not required for the unnamed portal. 291 */ 292 final void postCloseMessage() pure 293 { 294 assert(bindRequested, "portal was never bound"); 295 assert(portalName.length, "no need to close unnamed portals"); 296 conn.putCloseMessage(StmtOrPortal.portal, portalName); 297 bindRequested = false; 298 } 299 300 /// poll message queue and make sure bind was completed 301 final void ensureBindComplete() 302 { 303 bool is_bound = false; 304 bool interceptor(Message msg) 305 { 306 with (BackendMessageType) 307 switch (msg.type) 308 { 309 case BindComplete: 310 is_bound = true; 311 return true; 312 default: 313 break; 314 } 315 return false; 316 } 317 conn.pollMessages(&interceptor, true); 318 enforce!PsqlClientException(is_bound, "Bind was not confirmed"); 319 } 320 321 /** Send Describe+Execute command. 322 If describe is false, no RowDescription message will be requested 323 from PSQL - useful for optimistic statically-typed querying. 324 'maxRows' parameter is responsible for portal suspending and is 325 conceptually inferior to simple TCP backpressure mechanisms or result set 326 size limiting. */ 327 final void execute(bool describe = true, int maxRows = 0) pure 328 { 329 assert(bindRequested, "Portal was never bound"); 330 if (describe) 331 conn.putDescribeMessage(StmtOrPortal.portal, portalName); 332 conn.putExecuteMessage(portalName, maxRows); 333 } 334 } 335 336 337 /* 338 //////////////////////////////////////// 339 // Functions to work with query results 340 //////////////////////////////////////// 341 */ 342 343 344 /** Generic result materializer, suitable for both simple and prepared queries. 345 Polls messages from the connection and builds QueryResult structure from 346 them. Throws if something goes wrong. Polling stops when ReadyForQuery message 347 is received. */ 348 QueryResult getQueryResults(ConnT)(ConnT conn) 349 { 350 QueryResult res; 351 RowBlock rb; 352 353 bool interceptor(Message msg) nothrow 354 { 355 with (BackendMessageType) 356 switch (msg.type) 357 { 358 case EmptyQueryResponse: 359 rb.state = RowBlockState.emptyQuery; 360 res.blocks ~= rb; 361 rb = RowBlock(); 362 break; 363 case CommandComplete: 364 rb.state = RowBlockState.complete; 365 rb.commandTag = deserializeString(msg.data[0..$-1]); 366 res.blocks ~= rb; 367 rb = RowBlock(); 368 break; 369 case PortalSuspended: 370 rb.state = RowBlockState.suspended; 371 res.blocks ~= rb; 372 rb = RowBlock(); 373 break; 374 case RowDescription: 375 rb.rowDesc = dpeq.result.RowDescription(msg.data); 376 break; 377 case DataRow: 378 rb.dataRows ~= msg.data; 379 break; 380 default: 381 break; 382 } 383 return false; 384 } 385 386 conn.pollMessages(&interceptor, false); 387 return res; 388 } 389 390 391 /// Poll messages from the connection until CommandComplete or EmptyQueryResponse 392 /// is received, and return one row block (result of one and only one query). 393 RowBlock getOneRowBlock(ConnT)(ConnT conn, int rowCountLimit = 0) 394 { 395 RowBlock result; 396 397 bool interceptor(Message msg) nothrow 398 { 399 with (BackendMessageType) 400 switch (msg.type) 401 { 402 case EmptyQueryResponse: 403 result.state = RowBlockState.emptyQuery; 404 return true; 405 case CommandComplete: 406 result.state = RowBlockState.complete; 407 result.commandTag = deserializeString(msg.data[0..$-1]); 408 return true; 409 case PortalSuspended: 410 result.state = RowBlockState.suspended; 411 return true; 412 case RowDescription: 413 result.rowDesc = dpeq.result.RowDescription(msg.data); 414 requireRowDescription = false; 415 break; 416 case DataRow: 417 result.dataRows ~= msg.data; 418 if (rowCountLimit != 0) 419 { 420 // client code requested early stop 421 if (--rowCountLimit == 0) 422 { 423 result.state = RowBlockState.incomplete; 424 return true; 425 } 426 } 427 break; 428 default: 429 break; 430 } 431 return false; 432 } 433 434 conn.pollMessages(&interceptor, true); 435 return result; 436 } 437 438 439 440 /* 441 ///////////////////////////////////////////////////////////////// 442 // Functions used to transform query results into D types 443 ///////////////////////////////////////////////////////////////// 444 */ 445 446 //import std.stdio; 447 448 /** Returns RandomAccessRange of InputRanges of lazy-deserialized variants. 449 Specific flavor of Variant is derived from Converter.deserialize call return type. 450 Look into serialize.VariantConverter for deserialize implementation examples. 451 Will append parsed field descriptions to fieldDescs array if passed. */ 452 auto blockToVariants(alias Converter = VariantConverter!DefaultSerializer) 453 (RowBlock block, FieldDescription[]* fieldDescs = null) pure 454 { 455 alias VariantT = ReturnType!(Converter.deserialize); 456 457 enforce!PsqlSerializationException(block.rowDesc.isSet, 458 "Cannot deserialize RowBlock without row description. " ~ 459 "Did you send Describe message?"); 460 short totalColumns = block.rowDesc.fieldCount; 461 OID[] typeArr = new OID[totalColumns]; 462 FormatCode[] fcArr = new FormatCode[totalColumns]; 463 464 int i = 0; 465 foreach (fdesc; block.rowDesc[]) // row description deserialization happens here 466 { 467 //writeln(fdesc.name); 468 //writeln(fdesc.formatCode); 469 if (fieldDescs) 470 (*fieldDescs)[i] = fdesc; 471 fcArr[i] = fdesc.formatCode; 472 typeArr[i++] = fdesc.type; 473 } 474 475 static struct RowDeserializer 476 { 477 private: 478 short column = 0; 479 short totalCols; 480 immutable(ubyte)[] buf; 481 const(OID)[] types; 482 const(FormatCode)[] fcodes; 483 bool parsed = false; 484 485 // cache result to prevent repeated deserialization on 486 // front() call. 487 VariantT res; 488 public: 489 @property bool empty() const { return column >= totalCols; } 490 void popFront() 491 { 492 parsed = false; 493 column++; 494 } 495 @property VariantT front() 496 { 497 if (parsed) 498 return res; 499 if (column == 0) 500 { 501 // we need to skip field count in the start of DataRow message 502 buf = buf[2 .. $]; 503 } 504 assert(buf.length > 0); 505 int len = deserializeNumber(buf[0 .. 4]); 506 immutable(ubyte)[] vbuf = buf[4 .. max(4, len + 4)]; 507 //writeln(types[column], " ", buf); 508 res = Converter.deserialize(vbuf, types[column], fcodes[column], len); 509 buf = buf[max(4, len + 4) .. $]; 510 parsed = true; 511 return res; 512 } 513 } 514 515 static assert (isInputRange!RowDeserializer); 516 517 static struct RowsRange 518 { 519 private: 520 immutable(ubyte)[][] dataRows; 521 OID[] columnTypes; 522 FormatCode[] fcodes; 523 short totalColumns; 524 public: 525 @property size_t length() const { return dataRows.length; } 526 @property bool empty() const { return dataRows.empty; } 527 @property RowDeserializer front() 528 { 529 return RowDeserializer(0, totalColumns, dataRows[0], 530 columnTypes, fcodes); 531 } 532 @property RowDeserializer back() 533 { 534 return RowDeserializer(0, totalColumns, dataRows[$-1], 535 columnTypes, fcodes); 536 } 537 RowDeserializer opIndex(size_t i) 538 { 539 return RowDeserializer(0, totalColumns, dataRows[i], 540 columnTypes, fcodes); 541 } 542 void popFront() { dataRows = dataRows[1 .. $]; } 543 void popBack() { dataRows = dataRows[0 .. $-1]; } 544 RowsRange save() 545 { 546 return RowsRange(dataRows, columnTypes, fcodes, totalColumns); 547 } 548 } 549 550 static assert (isRandomAccessRange!RowsRange); 551 552 return RowsRange(block.dataRows, typeArr, fcArr, totalColumns); 553 } 554 555 556 557 558 /// for row spec `spec` build native tuple representation. 559 template TupleForSpec(FieldSpec[] spec, alias Deserializer = DefaultSerializer) 560 { 561 alias TupleForSpec = 562 Tuple!( 563 staticMap!( 564 SpecMapper!(Deserializer).Func, 565 aliasSeqOf!spec)); 566 } 567 568 /// Template function Func returns D type wich corresponds to FieldSpec. 569 template SpecMapper(alias Deserializer) 570 { 571 template Func(FieldSpec spec) 572 { 573 static if (is(Deserializer!spec.type)) 574 alias Func = Deserializer!spec.type; 575 else 576 static assert(0, "Deserializer doesn't support type with oid " ~ 577 spec.typeId.to!string); 578 } 579 } 580 581 582 /** Returns RandomAccessRange of lazily-deserialized tuples. 583 Customazable with Deserializer template. Will append parsed field descriptions 584 to fieldDescs array if it is provided. */ 585 auto blockToTuples 586 (FieldSpec[] spec, alias Deserializer = DefaultSerializer) 587 (RowBlock block, FieldDescription[]* fieldDescs = null) pure 588 { 589 alias ResTuple = TupleForSpec!(spec, Deserializer); 590 debug pragma(msg, "Resulting tuple from spec: ", ResTuple); 591 enforce!PsqlSerializationException(block.rowDesc.isSet, 592 "Cannot deserialize RowBlock without row description. " ~ 593 "Did you send describe message?"); 594 short totalColumns = block.rowDesc.fieldCount; 595 enforce!PsqlSerializationException(totalColumns == spec.length, 596 "Expected %d columns in a row, got %d".format(spec.length, totalColumns)); 597 FormatCode[] fcArr = new FormatCode[totalColumns]; 598 599 int i = 0; 600 foreach (fdesc; block.rowDesc[]) // row description deserialization happens here 601 { 602 //writeln(fdesc.name); 603 //writeln(fdesc.formatCode); 604 if (fieldDescs) 605 (*fieldDescs)[i] = fdesc; 606 fcArr[i] = fdesc.formatCode; 607 OID colType = fdesc.type; 608 enforce!PsqlSerializationException(colType == spec[i].typeId, 609 "Colunm %d type mismatch: expected %d, got %d".format( 610 i, spec[i].typeId, colType)); 611 i++; 612 } 613 614 //import std.stdio; 615 616 static ResTuple deserializeRow(immutable(ubyte)[] from, const(FormatCode)[] fcodes) 617 { 618 ResTuple res; 619 int len = 0; 620 immutable(ubyte)[] vbuf; 621 from = from[2 .. $]; // skip 2 bytes 622 foreach (i, colSpec; aliasSeqOf!(spec)) 623 { 624 len = deserializeNumber(from[0 .. 4]); 625 //writeln("col ", i, ", len = ", len, " from = ", from); 626 vbuf = from[4 .. max(4, len + 4)]; 627 Deserializer!(colSpec).deserialize(vbuf, fcodes[i], len, &res[i]); 628 from = from[max(4, len + 4) .. $]; 629 } 630 enforce!PsqlSerializationException(from.length == 0, 631 "%d bytes left in supposedly emptied row".format(from.length)); 632 return res; 633 } 634 635 static struct RowsRange 636 { 637 private: 638 immutable(ubyte)[][] dataRows; 639 FormatCode[] fcodes; 640 public: 641 @property size_t length() const { return dataRows.length; } 642 @property bool empty() const { return dataRows.empty; } 643 @property ResTuple front() 644 { 645 return deserializeRow(dataRows[0], fcodes); 646 } 647 @property ResTuple back() 648 { 649 return deserializeRow(dataRows[$-1], fcodes); 650 } 651 ResTuple opIndex(size_t i) 652 { 653 return deserializeRow(dataRows[i], fcodes); 654 } 655 void popFront() { dataRows = dataRows[1 .. $]; } 656 void popBack() { dataRows = dataRows[0 .. $-1]; } 657 RowsRange save() 658 { 659 return RowsRange(dataRows, fcodes); 660 } 661 } 662 663 static assert (isRandomAccessRange!RowsRange); 664 665 return RowsRange(block.dataRows, fcArr); 666 } 667 668 669 class FormatCodesOfSpec(FieldSpec[] spec, alias Deserializer) 670 { 671 static const(FormatCode)[spec.length] codes; 672 673 static this() 674 { 675 foreach (i, fpec; aliasSeqOf!spec) 676 codes[i] = Deserializer!fpec.formatCode; 677 } 678 } 679 680 681 /** Returns RandomAccessRange of lazy-deserialized tuples. Customazable with 682 Deserializer template. This version does not require RowDescription, but cannot 683 validate row types reliably. */ 684 auto blockToTuples 685 (FieldSpec[] spec, alias Deserializer = DefaultSerializer) 686 (immutable(ubyte)[][] data) pure 687 { 688 alias ResTuple = TupleForSpec!(spec, Deserializer); 689 debug pragma(msg, "Resulting tuple from spec: ", ResTuple); 690 691 //import std.stdio; 692 693 static ResTuple deserializeRow(immutable(ubyte)[] from) 694 { 695 ResTuple res; 696 int len = 0; 697 immutable(ubyte)[] vbuf; 698 from = from[2 .. $]; // skip 2 bytes 699 foreach (i, colSpec; aliasSeqOf!(spec)) 700 { 701 len = deserializeNumber(from[0 .. 4]); 702 //writeln("col ", i, ", len = ", len, " from = ", from); 703 vbuf = from[4 .. max(4, len + 4)]; 704 FormatCode fcode = FCodeOfFSpec!(Deserializer).F!(colSpec); 705 Deserializer!(colSpec).deserialize(vbuf, fcode, len, &res[i]); 706 from = from[max(4, len + 4) .. $]; 707 } 708 enforce!PsqlSerializationException(from.length == 0, 709 "%d bytes left in supposedly emptied row".format(from.length)); 710 return res; 711 } 712 713 static struct RowsRange 714 { 715 private: 716 immutable(ubyte)[][] dataRows; 717 public: 718 @property size_t length() const { return dataRows.length; } 719 @property bool empty() const { return dataRows.empty; } 720 @property ResTuple front() 721 { 722 return deserializeRow(dataRows[0]); 723 } 724 @property ResTuple back() 725 { 726 return deserializeRow(dataRows[$-1]); 727 } 728 ResTuple opIndex(size_t i) 729 { 730 return deserializeRow(dataRows[i]); 731 } 732 void popFront() { dataRows = dataRows[1 .. $]; } 733 void popBack() { dataRows = dataRows[0 .. $-1]; } 734 RowsRange save() 735 { 736 return RowsRange(dataRows); 737 } 738 } 739 740 static assert (isRandomAccessRange!RowsRange); 741 742 return RowsRange(data); 743 }