/**
Connection.

Copyright: Copyright Boris-Barboris 2017.
License: MIT
Authors: Boris-Barboris
*/

module dpeq.connection;

import core.time: seconds, Duration;

import std.algorithm: max;
import std.exception: enforce;
import std.conv: to;
import std.traits;
import std.range;

import dpeq.constants;
import dpeq.exceptions;
import dpeq.serialize;
import dpeq.result;
import dpeq.socket;



/// Transport and authorization parameters.
struct BackendParams
{
    /// Hostname or IP address for TCP socket. Filename for UNIX socket.
    string host = "/var/run/postgresql/.s.PGSQL.5432";
    ushort port = cast(ushort)5432;
    string user = "postgres";
    string password;
    string database = "postgres";
}

/// Target selector of Close and Describe messages.
enum StmtOrPortal: char
{
    statement = 'S',
    portal = 'P'
}

// When the client code is uninterested in dpeq connection logging.
pragma(inline)
void nop_logger(T...)(lazy string fmt, lazy T vals) nothrow @safe pure {}

/**
Connection object.

Params:
    SocketT  = socket class type to use.
    logTrace = alias of a logging function with $(D std.stdio.writef) signature.
        Very verbose byte-stream information will be printed through it.
    logError = same as logTrace but for errors.
*/
class PSQLConnection(
    SocketT = StdSocket,
    alias logTrace = nop_logger,
    alias logError = nop_logger)
{
    protected
    {
        SocketT m_socket;
        ubyte[] writeBuffer;
        /// offset of the first free byte in writeBuffer
        int bufHead = 0;
        bool open = false;

        // we allocate RAM for responses in batches to reduce GC pressure.
        // 2048 is the last small-sized bin in gc:
        // https://github.com/dlang/druntime/blob/v2.079.0/src/gc/impl/conservative/gc.d#L1251
        immutable int readBatchSize = 2048;
        ubyte[] readBatch;

        // number of expected readyForQuery responses
        int readyForQueryExpected = 0;
        // number of buffered (not yet flushed) messages that will each
        // produce a readyForQuery response once they are sent
        int unflushedRfq = 0;

        TransactionStatus tstatus;

        /// counters to generate unique prepared statement and portal ids
        int preparedCounter = 0;
        int portalCounter = 0;

        int m_processId = -1;
        int m_cancellationKey = -1;

        /// backend params this connection was created with
        const BackendParams m_backendParams;
    }

    final @property pure @safe nothrow
    {

        /// Backend parameters this connection was constructed from
        ref const(BackendParams) backendParams() const
        {
            return m_backendParams;
        }

        /// Backend process ID. Used in CancelRequest message.
        int processId() const { return m_processId; }

        /// Cancellation secret. Used in CancelRequest message.
        int cancellationKey() const { return m_cancellationKey; }

        /// Socket getter.
        SocketT socket() { return m_socket; }

        /// Number of ReadyForQuery messages that are yet to be received
        /// from the backend. May be useful for checking whether getQueryResults
        /// would block forever.
        int expectedRFQCount() const { return readyForQueryExpected; }

        /// Transaction status, reported by the last received ReadyForQuery message.
        /// For a new connection TransactionStatus.IDLE is returned.
        TransactionStatus transactionStatus() const { return tstatus; }

        /// Connection is open when it is authorized and socket was alive last time
        /// it was checked.
        bool isOpen() const { return open; }
    }

    invariant
    {
        assert(readyForQueryExpected >= 0);
        assert(unflushedRfq >= 0);
        assert(bufHead >= 0);
    }

    /// Generate next connection-unique prepared statement name.
    final string getNewPreparedName() pure @safe nothrow
    {
        return (preparedCounter++).to!string;
    }

    /// Generate next connection-unique portal name.
    final string getNewPortalName() pure @safe nothrow
    {
        return (portalCounter++).to!string;
    }

    /// Allocate required memory, build socket and authorize the connection.
    /// Throws: $(D PsqlSocketException) if transport failed, or
    /// $(D PsqlClientException) if authorization failed for some reason.
    this(const BackendParams bp, Duration connectTimeout = seconds(10), size_t writeBufSize = 2 * 4096) @safe
    {
        m_backendParams = bp;
        writeBuffer.length = writeBufSize;
        try
        {
            logTrace("Trying to open TCP connection to PSQL");
            m_socket = new SocketT(bp.host, bp.port, connectTimeout);
            logTrace("Success");
        }
        catch (Exception e)
        {
            throw new PsqlSocketException(e.msg, e);
        }
        // do not leak the socket if startup\authorization fails
        scope(failure) m_socket.close();
        initialize(bp);
        open = true;
    }

    /// Notify backend and close socket. Never throws: any failure is reported
    /// through logError. The socket is closed even when the Terminate message
    /// could not be delivered.
    void terminate() nothrow @safe
    {
        open = false;
        try
        {
            // close must happen regardless of whether Terminate was sent,
            // otherwise a dead connection would leak its socket
            scope(exit) m_socket.close();
            putTerminateMessage();
            flush();
        }
        catch (Exception e)
        {
            logError("Exception caught while terminating PSQL connection: %s", e.msg);
        }
    }

    /**
    Open new socket and connect it to the same backend as this connection is
    bound to, send CancelRequest and close the temporary socket.
    The temporary socket is closed even if sending fails.
    */
    void cancelRequest() @safe
    {
        SocketT sock = new SocketT(m_backendParams.host, m_backendParams.port, seconds(5));
        scope(exit) sock.close();
        // CancelRequest layout: length (16), request code, process id, secret key
        ubyte[4 * 4] intBuf;
        static immutable int pn1 = 16;
        serializeFixedField(intBuf[0..4], &pn1);
        static immutable int pn2 = int(80877102);
        serializeFixedField(intBuf[4..8], &pn2);
        serializeFixedField(intBuf[8..12], &m_processId);
        serializeFixedField(intBuf[12..16], &m_cancellationKey);
        sock.send(intBuf[]);
    }

    /// Flush writeBuffer into the socket. Blocks/yields (according to socket
    /// implementation).
    /// Regardless of the outcome the write buffer is emptied and all unflushed
    /// ReadyForQuery expectations are moved to the expected counter.
    /// PsqlSocketException marks the connection as closed and is rethrown.
    final void flush() @safe
    {
        try
        {
            // does not block if zero length:
            // https://github.com/vibe-d/vibe-core/blob/master/source/vibe/core/net.d#L607
            auto w = m_socket.send(writeBuffer[0..bufHead]);
            while (bufHead - w > 0)
                w += m_socket.send(writeBuffer[w..bufHead]);
            logTrace("flushed %d bytes: %s", w, writeBuffer[0..bufHead].to!string);
        }
        catch (PsqlSocketException e)
        {
            open = false;
            throw e;
        }
        finally
        {
            bufHead = 0;
            readyForQueryExpected += unflushedRfq;
            unflushedRfq = 0;
        }
    }

    /// discard write buffer content
    final void discard() pure nothrow @safe
    {
        bufHead = 0;
        unflushedRfq = 0;
    }

    /// Save write buffer cursor in order to be able to restore it in case of errors.
    /// Use it to prevent sending junk to backend when something goes wrong during
    /// serialization or message creation.
    final auto saveBuffer() pure nothrow @safe
    {
        static struct WriteCursor
        {
            private int savedHead;
            private int savedUnflushedRfq;
            PSQLConnection conn;

            /// Roll the write buffer back to the state captured by saveBuffer.
            void restore() pure nothrow @safe
            {
                assert(conn.bufHead >= savedHead);
                conn.bufHead = savedHead;
                assert(conn.unflushedRfq >= savedUnflushedRfq);
                conn.unflushedRfq = savedUnflushedRfq;
            }
        }
        return WriteCursor(bufHead, unflushedRfq, this);
    }


    /*
    ////////////////////////////////////////////////////////////////////////////
    // All sorts of messages
    // https://www.postgresql.org/docs/9.5/static/protocol-message-formats.html
    ////////////////////////////////////////////////////////////////////////////
    */

    /** Put Bind message into write buffer.
    *
    * 'formatCodes' - input range of parameter FormatCodes.
    * quotes:
    *      The number of parameter format codes that follow (denoted C below).
    *      This can be zero to indicate that there are no parameters or that the
    *      parameters all use the default format (text); or one, in which case the
    *      specified format code is applied to all parameters; or it can equal
    *      the actual number of parameters.
    *      The parameter format codes. Each must presently be zero (text) or one (binary).
    *
    * 'parameters' - input range of serializer closures. Actual data should
    *      be self-contained in this parameter. Serializer is a callable that
    *      is covariant with "int delegate(ubyte[] buf)" and returns -2 if buf
    *      is too small, -1 if parameter is null and an actual number of bytes written
    *      otherwise.
    *
    * 'resultFormatCodes' - input range of query result FormatCodes.
    * quotes:
    *      The number of result-column format codes that follow (denoted R below).
    *      This can be zero to indicate that there are no result columns or that
    *      the result columns should all use the default format (text); or one,
    *      in which case the specified format code is applied to all result
    *      columns (if any); or it can equal the actual number of result columns
    *      of the query.
    *      The result-column format codes. Each must presently be zero (text) or
    *      one (binary).
    *
    * On exception the write buffer cursor is rolled back to where it was
    * before the call.
    */
    final void putBindMessage(FR, PR, RR)
        (string portal, string prepared, scope FR formatCodes, scope PR parameters,
        scope RR resultFormatCodes) pure @safe
        //if (isInputRange!FR && is(Unqual!(ElementType!FR) == FormatCode) &&
        //    isInputRange!RR && is(Unqual!(ElementType!RR) == FormatCode) &&
        //    isInputRange!PR && __traits(compiles, -1 == parameters.front()(new ubyte[2]))
    {
        assert(open, "Connection is not open");
        int savepoint = bufHead;
        scope(failure) bufHead = savepoint;

        write(cast(ubyte)FrontMessageType.Bind);
        auto lenTotal = reserveLen();
        cwrite(portal);
        cwrite(prepared);

        // parameter format code(s)
        short fcodes = 0;
        auto fcodePrefix = reserveLen!short();
        foreach (FormatCode fcode; formatCodes)
        {
            logTrace("Bind: writing %d fcode", fcode);
            write(cast(short)fcode);
            fcodes++;
        }
        fcodePrefix.write(fcodes);

        // parameters
        short pcount = 0;
        auto pcountPrefix = reserveLen!short();
        foreach (param; parameters)
        {
            auto paramPrefix = reserveLen!int();
            int r = wrappedSerialize(param);
            logTrace("Bind: wrote 4bytes + %d bytes for value", r);
            paramPrefix.write(r);    // careful! -1 means Null
            pcount++;
        }
        pcountPrefix.write(pcount);

        // result format codes
        short rcount = 0;
        auto rcolPrefix = reserveLen!short();
        foreach (FormatCode fcode; resultFormatCodes)
        {
            write(cast(short)fcode);
            logTrace("Bind: writing %d rfcode", fcode);
            rcount++;
        }
        rcolPrefix.write(rcount);

        lenTotal.fill();
        logTrace("Bind message buffered");
    }

    /// putBindMessage overload for parameterless portals
    final void putBindMessage(RR)(string portal, string prepared,
        scope RR resultFormatCodes) pure @safe
    {
        assert(open, "Connection is not open");
        int savepoint = bufHead;
        scope(failure) bufHead = savepoint;

        write(cast(ubyte)FrontMessageType.Bind);
        auto lenTotal = reserveLen();
        cwrite(portal);
        cwrite(prepared);
        write(short(0));    // zero parameter format codes
        write(short(0));    // zero parameters
        // result format codes
        short rcount = 0;
        auto rcolPrefix = reserveLen!short();
        foreach (FormatCode fcode; resultFormatCodes)
        {
            write(cast(short)fcode);
            logTrace("Bind: writing %d rfcode", fcode);
            rcount++;
        }
        rcolPrefix.write(rcount);

        lenTotal.fill();
        logTrace("Bind message buffered");
    }

    /// putBindMessage overload for already serialized parameters.
    /// `rawChunks` are appended verbatim right after portal and prepared
    /// statement names.
    final void putBindMessage(string portal, string prepared,
        scope const(const(ubyte)[])[] rawChunks) pure @safe
    {
        assert(open, "Connection is not open");
        int savepoint = bufHead;
        scope(failure) bufHead = savepoint;

        write(cast(ubyte)FrontMessageType.Bind);
        auto lenTotal = reserveLen();
        cwrite(portal);
        cwrite(prepared);
        foreach (chunk; rawChunks)
            bwrite(chunk);
        lenTotal.fill();
        logTrace("Bind message buffered");
    }

    /// put Close message into write buffer.
    /// `closeWhat` is 'S' for prepared statement and
    /// 'P' for portal.
    final void putCloseMessage(StmtOrPortal closeWhat, string name) pure @safe
    {
        assert(open, "Connection is not open");
        assert(closeWhat == 'S' || closeWhat == 'P');
        int savepoint = bufHead;
        scope(failure) bufHead = savepoint;

        write(cast(ubyte)FrontMessageType.Close);
        auto lenTotal = reserveLen();
        write(cast(ubyte)closeWhat);
        cwrite(name);
        lenTotal.fill();
        logTrace("Close message buffered");
    }

    /// put Describe message into write buffer.
    /// `descWhat` is 'S' for prepared statement and
    /// 'P' for portal.
    final void putDescribeMessage(StmtOrPortal descWhat, string name) pure @safe
    {
        assert(open, "Connection is not open");
        assert(descWhat == 'S' || descWhat == 'P');
        int savepoint = bufHead;
        scope(failure) bufHead = savepoint;

        write(cast(ubyte)FrontMessageType.Describe);
        auto lenTotal = reserveLen();
        write(cast(ubyte)descWhat);
        cwrite(name);
        lenTotal.fill();
        logTrace("Describe message buffered");
    }

    /**
    Put Execute message into write buffer.
    non-zero maxRows will generate PortalSuspended messages, which are
    currently not handled by dpeq commands */
    final void putExecuteMessage(string portal = "", int maxRows = 0) pure @safe
    {
        assert(open, "Connection is not open");
        int savepoint = bufHead;
        scope(failure) bufHead = savepoint;

        write(cast(ubyte)FrontMessageType.Execute);
        auto lenTotal = reserveLen();
        cwrite(portal);
        write(maxRows);
        lenTotal.fill();
        logTrace("Execute message buffered");
    }

    /**
    Quote:
    "The Flush message does not cause any specific output to be generated,
    but forces the backend to deliver any data pending in its output buffers.
    A Flush must be sent after any extended-query command except Sync, if the
    frontend wishes to examine the results of that command before issuing more
    commands. Without Flush, messages returned by the backend will be combined
    into the minimum possible number of packets to minimize network overhead."
    */
    final void putFlushMessage() pure nothrow @safe
    {
        assert(open, "Connection is not open");
        write(cast(ubyte)FrontMessageType.Flush);
        write(4);   // message length: only the length word itself
        logTrace("Flush message buffered");
    }

    /// Put Parse message into write buffer. `ptypes` is an input range of
    /// parameter type OIDs.
    final void putParseMessage(PR)(string prepared, string query, scope PR ptypes)
        pure @safe if (isInputRange!PR && is(Unqual!(ElementType!PR) == OID))
    {
        assert(open, "Connection is not open");
        int savepoint = bufHead;
        scope(failure) bufHead = savepoint;

        write(cast(ubyte)FrontMessageType.Parse);
        auto lenTotal = reserveLen();
        cwrite(prepared);
        cwrite(query);

        // parameter types
        short pcount = 0;
        auto pcountPrefix = reserveLen!short();
        foreach (OID ptype; ptypes)
        {
            write(ptype);
            pcount++;
        }
        pcountPrefix.write(pcount);

        lenTotal.fill();
        logTrace("Parse message buffered");
    }

    /// put Query message (simple query protocol) into the write buffer
    final void putQueryMessage(in string query) pure @safe
    {
        assert(open, "Connection is not open");
        int savepoint = bufHead;
        scope(failure) bufHead = savepoint;

        write(cast(ubyte)FrontMessageType.Query);
        auto lenTotal = reserveLen();
        cwrite(query);
        lenTotal.fill();
        unflushedRfq++;
        logTrace("Query message buffered");
    }

    /// ditto
    /// The chunks are concatenated into one query string. An empty chunk array
    /// produces an empty query string.
    final void putQueryMessage(in string[] queryChunks) pure @safe
    {
        assert(open, "Connection is not open");
        int savepoint = bufHead;
        scope(failure) bufHead = savepoint;

        write(cast(ubyte)FrontMessageType.Query);
        auto lenTotal = reserveLen();
        if (queryChunks.length == 0)
        {
            // the query string must always be terminated
            cwrite("");
        }
        else
        {
            // only the last chunk carries the terminator
            foreach (chunk; queryChunks[0 .. $ - 1])
                write(chunk);
            cwrite(queryChunks[$ - 1]);
        }
        lenTotal.fill();
        unflushedRfq++;
        logTrace("Query message buffered");
    }

    alias putSimpleQuery = putQueryMessage;

    /**
    Put Sync message into write buffer. Usually you should call this after
    every portal execute message.
    Every postSimpleQuery or PSQLConnection.sync MUST be accompanied by getQueryResults call. */
    final void putSyncMessage() pure nothrow @safe
    {
        assert(open, "Connection is not open");
        write(cast(ubyte)FrontMessageType.Sync);
        write(4);   // message length: only the length word itself
        unflushedRfq++;
        logTrace("Sync message buffered");
    }

    alias sync = putSyncMessage;

    /// NotificationResponse messages will be parsed and passed to this
    /// callback during 'pollMessages' call. When the callback returns true,
    /// pollMessages exits its loop.
    /// https://www.postgresql.org/docs/9.5/static/sql-notify.html
    bool delegate(typeof(this) con, Notification n) nothrow @safe notificationCallback = null;

    /// NoticeResponse messages will be parsed and
    /// passed to this callback during 'pollMessages' call.
    void delegate(typeof(this) con, Notice n) nothrow @safe noticeCallback = null;

    /** Message interceptor for pollMessages. It receives every message that
    pollMessages does not handle itself. When this callback returns true,
    pollMessages will exit its loop. */
    alias InterceptorT = bool delegate(Message msg) @safe nothrow;

    /** Read messages from the socket in loop until:
      1). if finishOnError is set and ErrorResponse is received, function
          throws PsqlErrorResponseException immediately.
      2). ReadyForQuery message is received.
      3). interceptor delegate returns `true`.
      4). NotificationResponse received and notificationCallback returned 'true'.
      Interceptor delegate is used to customize the logic. If the message is
      not ReadyForQuery, ErrorResponse or Notice\Notification, it is passed to
      interceptor.
      If ErrorResponse was received and finishOnError is not set,
      PsqlErrorResponseException is thrown after the loop has finished. */
    final void pollMessages(scope InterceptorT interceptor, bool finishOnError = false) @safe
    {
        bool error;
        Notice errorNotice;
        pollMessages(interceptor, error, errorNotice, finishOnError);
        if (error)
            throw new PsqlErrorResponseException(errorNotice);
    }

    /// Same as above, but throws only on serious protocol or socket-level errors.
    /// Received ErrorResponse is reported through `error` and `errorNotice`.
    final void pollMessages(scope InterceptorT interceptor,
        out bool error, out Notice errorNotice, bool finishOnError = false) @safe
    {
        bool finish = false;

        while (!finish)
        {
            Message msg = readOneMessage();

            with (BackendMessageType)
            switch (msg.type)
            {
                case ErrorResponse:
                    enforce!PsqlClientException(!error, "Second ErrorResponse " ~
                        "received during one pollMessages call");
                    error = true;
                    parseNoticeMessage(msg.data, errorNotice);
                    if (finishOnError)
                        finish = true;
                    break;
                case ReadyForQuery:
                    enforce!PsqlClientException(readyForQueryExpected > 0,
                        "Unexpected ReadyForQuery message");
                    readyForQueryExpected--;
                    tstatus = cast(TransactionStatus) msg.data[0];
                    finish = true;
                    break;
                case NoticeResponse:
                    if (noticeCallback !is null)
                    {
                        Notice n;
                        parseNoticeMessage(msg.data, n);
                        noticeCallback(this, n);
                    }
                    break;
                case NotificationResponse:
                    if (notificationCallback !is null)
                    {
                        Notification n;
                        n.procId = deserializeNumber!int(msg.data[0..4]);
                        size_t l;
                        n.channel = deserializeProtocolString(msg.data[4..$], l);
                        n.payload = deserializeString(msg.data[4+l..$-1]);
                        finish |= notificationCallback(this, n);
                    }
                    break;
                default:
                    if (interceptor !is null)
                        finish |= interceptor(msg);
            }
        }
    }

    /// reads and discards messages from socket until all expected
    /// ReadyForQuery messages are received. Transaction status is updated
    /// from every ReadyForQuery message consumed.
    void windupResponseStack() @safe
    {
        while (readyForQueryExpected > 0)
        {
            Message msg = readOneMessage();
            if (msg.type == BackendMessageType.ReadyForQuery)
            {
                readyForQueryExpected--;
                // keep transactionStatus in sync with the last ReadyForQuery,
                // same as pollMessages does
                tstatus = cast(TransactionStatus) msg.data[0];
            }
        }
    }

    // Protected section for functions that will probably never be used by
    // client code directly. If you need them, inherit them.
protected:

    /// Put StartupMessage (protocol v3, user and database parameters) into
    /// write buffer.
    final void putStartupMessage(in BackendParams params) pure @safe
    {
        int savepoint = bufHead;
        scope(failure) bufHead = savepoint;

        auto lenPrefix = reserveLen();
        write(0x0003_0000); // protocol v3
        cwrite("user");
        cwrite(params.user);
        cwrite("database");
        cwrite(params.database);
        write(cast(ubyte)0);    // terminator of the parameter list
        lenPrefix.fill();
        logTrace("Startup message buffered");
    }

    /// Put Terminate message into write buffer.
    final void putTerminateMessage() pure nothrow @safe
    {
        write(cast(ubyte)FrontMessageType.Terminate);
        write(4);   // message length: only the length word itself
        logTrace("Terminate message buffered");
    }

    /// Send startup message and perform authorization (trust, cleartext or
    /// MD5 password).
    /// Throws: $(D PsqlClientException) when authorization did not succeed or
    /// the requested authorization type is not supported.
    void initialize(in BackendParams params) @safe
    {
        putStartupMessage(params);
        flush();

        int authType = -1;
        Message auth_msg;

        pollMessages((Message msg)
            {
                if (msg.type == BackendMessageType.Authentication)
                {
                    auth_msg = msg;
                    authType = deserializeNumber(msg.data[0..4]);
                    if (authType == 0) // instantly authorized, so we'll get readyForQuery
                        readyForQueryExpected++;
                    else
                        return true;
                }
                else if (msg.type == BackendMessageType.BackendKeyData)
                {
                    m_processId = deserializeNumber(msg.data[0..4]);
                    m_cancellationKey = deserializeNumber(msg.data[4..8]);
                }
                return false;
            }, true);

        enforce!PsqlClientException(authType != -1,
            "Expected Authentication message was not received");
        switch (authType)
        {
            case 0:
                // instant AuthenticationOk, trusted connection usually does this
                logTrace("Succesfully authorized");
                return;
            case 3:
                // cleartext password
                putPasswordMessage(params.password);
                break;
            case 5:
                // MD5 salted password
                ubyte[4] salt = auth_msg.data[4 .. 8];
                putMd5PasswordMessage(params.password, params.user, salt);
                break;
            default:
                throw new PsqlClientException("Unknown auth type " ~
                    authType.to!string);
        }
        flush();

        int authRes = -1;
        pollMessages((Message msg) {
                if (msg.type == BackendMessageType.Authentication)
                    authRes = deserializeNumber(msg.data[0..4]);
                else if (msg.type == BackendMessageType.BackendKeyData)
                {
                    m_processId = deserializeNumber(msg.data[0..4]);
                    m_cancellationKey = deserializeNumber(msg.data[4..8]);
                }
                return false;
            });
        enforce!PsqlClientException(authRes == 0,
            "Expected AuthenticationOk message was not received");
    }

    /// Put cleartext PasswordMessage into write buffer.
    final void putPasswordMessage(string pw) pure @safe
    {
        int savepoint = bufHead;
        scope(failure) bufHead = savepoint;

        write(cast(ubyte)FrontMessageType.PasswordMessage);
        auto lenPrefix = reserveLen();
        cwrite(pw);
        lenPrefix.fill();
        unflushedRfq++;
        logTrace("Password message buffered");
    }

    /// Put PasswordMessage with salted MD5 digest into write buffer:
    /// "md5" ~ md5hex(md5hex(pw ~ user) ~ salt).
    final void putMd5PasswordMessage(string pw, string user, ubyte[4] salt) pure @trusted
    {
        int savepoint = bufHead;
        scope(failure) bufHead = savepoint;

        // thank you ddb authors
        char[32] MD5toHex(T...)(in T data)
        {
            import std.ascii : LetterCase;
            import std.digest.md : md5Of, toHexString;
            return md5Of(data).toHexString!(LetterCase.lower);
        }

        write(cast(ubyte)FrontMessageType.PasswordMessage);
        auto lenPrefix = reserveLen();
        char[3 + 32] mdpw;
        mdpw[0 .. 3] = "md5";
        mdpw[3 .. $] = MD5toHex(MD5toHex(pw, user), salt);
        cwrite(cast(string) mdpw[]);
        lenPrefix.fill();
        unflushedRfq++;
        logTrace("MD5 Password message buffered");
    }

    /// Read from socket to buffer buf exactly buf.length bytes.
    /// Blocks and throws. PsqlSocketException marks the connection as closed.
    final void read(ubyte[] buf) @safe
    {
        try
        {
            logTrace("reading %d bytes", buf.length);
            auto r = m_socket.receive(buf);
            // NOTE(review): relies on SocketT.receive filling the whole buffer;
            // this check is compiled out in release builds - confirm against
            // the socket implementation.
            assert(r == buf.length, "received less bytes than requested");
        }
        catch (PsqlSocketException e)
        {
            open = false;
            throw e;
        }
    }

    /// Run serializer 'm' against the free tail of writeBuffer, extending
    /// writeBuffer while the serializer reports lack of space.
    /// Any result of -2 or below is treated as "not enough space"; its
    /// magnitude is used as a size hint when computing how much to grow.
    /// Returns: what the serializer returned last. Positive result advances
    /// the write cursor.
    final int wrappedSerialize(SerialT)(scope SerialT m) pure nothrow @trusted
        if (isCallable!SerialT)
    {
        int bcount = m(writeBuffer[bufHead .. $]);
        while (bcount <= -2)
        {
            // reallocate with additional 4 pages
            int deficit = -bcount - (cast(int) writeBuffer.length - bufHead);
            assert(deficit > 0, "negative buffer deficit");
            writeBuffer.length = writeBuffer.length + max(4 * 4096, deficit);
            bcount = m(writeBuffer[bufHead .. $]);
        }
        if (bcount > 0)
            bufHead += bcount;
        return bcount;
    }

    /// write numeric type T to write buffer
    final int write(T)(T val) pure nothrow @safe
        if (isNumeric!T)
    {
        return wrappedSerialize((ubyte[] buf) => serializeFixedField(buf, &val));
    }

    /// Reserve space in write buffer for length prefix and return
    /// struct that automatically fills it from current buffer offset position.
    /// The write buffer is extended when it has no room for the prefix.
    final auto reserveLen(T = int)() @safe nothrow
        if (isNumeric!T && !isUnsigned!T)
    {
        static struct Len
        {
            PSQLConnection con;
            int idx;    // offset of length prefix word in writeBuffer

            /// calculate and write length prefix
            void fill(bool includeSelf = true) pure @trusted
            {
                T len = (con.bufHead - idx).to!T;
                if (!includeSelf)
                {
                    len -= T.sizeof.to!T;
                    assert(len >= 0);
                }
                else
                    assert(len >= T.sizeof);
                logTrace("writing length of %d bytes to index %d", len, idx);
                auto res = serializeFixedField(con.writeBuffer[idx .. idx+T.sizeof], &len);
                assert(res == T.sizeof);
            }

            /// write some specific number
            void write(T v) nothrow pure @trusted
            {
                auto res = serializeFixedField(con.writeBuffer[idx .. idx+T.sizeof], &v);
                assert(res == T.sizeof);
            }
        }

        // the prefix is filled later by direct slicing, so the space for it
        // must physically exist before the cursor is moved past it
        immutable size_t required = bufHead + T.sizeof;
        if (writeBuffer.length < required)
            writeBuffer.length = required + 4 * 4096;

        Len l = Len(this, bufHead);
        bufHead += T.sizeof;
        return l;
    }

    /// write raw bytes to write buffer
    final int bwrite(scope const ubyte[] s) pure nothrow @safe
    {
        return wrappedSerialize((ubyte[] buf) => serializeBytesField(buf, &s));
    }

    /// write string to write buffer using serializeStringField
    final int write(in string s) pure nothrow @safe
    {
        return wrappedSerialize((ubyte[] buf) => serializeStringField(buf, &s));
    }

    /// write string to write buffer using serializeCstring
    final int cwrite(in string s) pure nothrow @safe
    {
        return wrappedSerialize((ubyte[] buf) => serializeCstring(buf, s));
    }

    /// read exactly one message from the socket
    Message readOneMessage() @trusted
    {
        Message res;
        // one byte of message type followed by 4-byte length
        ubyte[5] type_and_length;
        read(type_and_length);
        res.type = cast(BackendMessageType) type_and_length[0];
        logTrace("Got message of type %s", res.type.to!string);
        // length word counts itself, payload is 4 bytes shorter
        int length = deserializeNumber(cast(immutable(ubyte)[]) type_and_length[1..$]) - 4;
        enforce!PsqlClientException(length >= 0, "Negative message length");
        ubyte[] data;
        if (length > 0)
        {
            if (length <= readBatchSize / 2)
            {
                // we should batch the allocation
                if (readBatch.length < length)
                    readBatch = new ubyte[readBatchSize];
                data = readBatch[0..length];
                readBatch = readBatch[length..$];
            }
            else
                data = new ubyte[length];   // fat messages get their own buffer
            read(data);
        }
        res.data = cast(immutable(ubyte)[]) data;
        return res;
    }
}