/**
Connection.

Copyright: Copyright Boris-Barboris 2017.
License: MIT
Authors: Boris-Barboris
*/

module dpeq.connection;

import core.time: seconds;

import std.exception: enforce;
import std.conv: to;
import std.traits;
import std.range;
import std.socket;

import dpeq.constants;
import dpeq.exceptions;
import dpeq.marshalling;
import dpeq.schema: Notification, Notice;



/**
$(D std.socket.Socket) wrapper which is compatible with PSQLConnection.
If you want to use custom socket type (vibe-d or any other), make it
duck-type and exception-compatible with this one.
*/
final class StdSocket
{
    private Socket m_socket;

    /// Underlying socket instance.
    @property Socket socket() { return m_socket; }

    /// Establish connection to backend. Constructor is expected to throw Exception
    /// if anything goes wrong.
    ///
    /// A host that starts with '/' is treated as a UNIX socket filename,
    /// anything else is passed to $(D InternetAddress) together with port.
    this(string host, ushort port)
    {
        // Length check first: indexing an empty host would be a range violation
        // instead of the Exception that callers are told to expect.
        if (host.length > 0 && host[0] == '/')
        {
            // Unix socket
            version(Posix)
            {
                Address addr = new UnixAddress(host);
                m_socket = new Socket(AddressFamily.UNIX, SocketType.STREAM);
                m_socket.connect(addr);
            }
            else
                assert(0, "Cannot connect using UNIX sockets on non-Posix OS");
        }
        else
        {
            m_socket = new TcpSocket();
            // 20 minutes for both tcp_keepalive_time and tcp_keepalive_intvl
            m_socket.setKeepAlive(1200, 1200);
            // example of receive timeout:
            // m_socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, seconds(120));
            m_socket.connect(new InternetAddress(host, port));
        }
    }

    /// Shut down both directions and close the underlying socket.
    void close() nothrow @nogc
    {
        m_socket.shutdown(SocketShutdown.BOTH);
        m_socket.close();
    }

    /// Send whole byte buffer or throw. Throws: $(D PsqlSocketException).
    /// Returns: number of bytes sent, which is always buf.length.
    auto send(const(ubyte)[] buf)
    {
        size_t total = 0;
        // One Socket.send call may accept only a part of the buffer,
        // so keep pushing the remainder until everything is written.
        while (total < buf.length)
        {
            auto r = m_socket.send(buf[total .. $]);
            if (r == Socket.ERROR)
                throw new PsqlSocketException("Socket.ERROR on send");
            if (r == 0)
                throw new PsqlSocketException("Connection closed");
            total += r;
        }
        // signed result keeps arithmetic in callers (see flush) signed
        return cast(ptrdiff_t) total;
    }

    /// Fill byte buffer from the socket completely or throw.
    /// Throws: $(D PsqlSocketException).
    /// Returns: number of bytes received, which is always buf.length.
    auto receive(ubyte[] buf)
    {
        size_t total = 0;
        // Stream sockets are free to return fewer bytes than requested,
        // so loop until the caller's buffer is full.
        while (total < buf.length)
        {
            auto r = m_socket.receive(buf[total .. $]);
            if (r == 0)
                throw new PsqlSocketException("Connection closed");
            if (r == Socket.ERROR)
                throw new PsqlSocketException("Socket.ERROR on receive: " ~ m_socket.getErrorText());
            total += r;
        }
        return cast(ptrdiff_t) total;
    }
}



/// Transport and authorization parameters.
struct BackendParams
{
    /// Hostname or IP address for TCP socket. Filename for UNIX socket.
    string host = "/var/run/postgresql/.s.PGSQL.5432";
    ushort port = cast(ushort)5432;
    string user = "postgres";
    string password;
    string database = "postgres";
}


/// Message, received from backend.
struct Message
{
    BackendMessageType type;
    /// Raw unprocessed message byte array excluding message type byte and
    /// first 4 bytes that represent message body length.
    ubyte[] data;
}

/// Target selector for Close and Describe messages.
enum StmtOrPortal: char
{
    Statement = 'S',
    Portal = 'P'
}

// When the client code is uninterested in dpeq connection logging.
private pragma(inline, true) void nop_logger(T...)(T vals) {}

/**
Connection object.

Params:
    SocketT  = socket class type to use.
    logTrace = alias of a logging function with $(D std.stdio.writef) signature.
        Very verbose byte-stream information will be printed through it.
    logError = same as logTrace but for errors.
*/
class PSQLConnection(
    SocketT = StdSocket,
    alias logTrace = nop_logger,
    alias logError = nop_logger)
{
    protected
    {
        SocketT m_socket;
        /// outgoing messages are accumulated here until flush() is called
        ubyte[] writeBuffer;
        /// number of meaningful bytes at the start of writeBuffer
        int bufHead = 0;
        bool open = false;

        // we allocate RAM for responses in batches to reduce GC pressure.
        // 2048 bytes is the largest small-object bin in the druntime GC:
        // https://github.com/dlang/druntime/blob/v2.079.0/src/gc/impl/conservative/gc.d#L1251
        immutable int readBatchSize = 2048;
        ubyte[] readBatch;

        // number of expected readyForQuery responses
        int readyForQueryExpected = 0;

        TransactionStatus tstatus = TransactionStatus.IDLE;

        /// counters to generate unique prepared statement and portal ids
        int preparedCounter = 0;
        int portalCounter = 0;

        int m_processId = -1;
        int m_cancellationKey = -1;

        /// backend params this connection was created with
        const BackendParams m_backendParams;
    }

    /// Backend parameters this connection was constructed from
    final @property ref const(BackendParams) backendParams() const
    {
        return m_backendParams;
    }

    /// Backend process ID. Used in CancelRequest message.
    final @property int processId() const { return m_processId; }

    /// Cancellation secret. Used in CancelRequest message.
    final @property int cancellationKey() const { return m_cancellationKey; }

    /// Socket getter.
    final @property SocketT socket() { return m_socket; }

    /// Number of ReadyForQuery messages that are yet to be received
    /// from the backend. May be useful for checking whether getQueryResults
    /// would block forever.
    final @property int expectedRFQCount() const { return readyForQueryExpected; }

    /// Transaction status, reported by the last received ReadyForQuery message.
    /// For a new connection TransactionStatus.IDLE is returned.
    final @property TransactionStatus transactionStatus() const { return tstatus; }

    invariant
    {
        assert(readyForQueryExpected >= 0);
        assert(bufHead >= 0);
    }

    /// Connection is open when it is authorized and socket was alive last time
    /// it was checked.
    final @property bool isOpen() const { return open; }

    /// Generate next connection-unique prepared statement name.
    final string getNewPreparedName()
    {
        return (preparedCounter++).to!string;
    }

    /// Generate next connection-unique portal name.
    final string getNewPortalName()
    {
        return (portalCounter++).to!string;
    }

    /// Allocate required memory, build socket and authorize the connection.
    /// Throws: $(D PsqlSocketException) if transport failed, or
    /// $(D PsqlClientException) if authorization failed for some reason.
    this(const BackendParams bp)
    {
        m_backendParams = bp;
        writeBuffer.length = 2 * 4096; // liberal procurement of two pages
        try
        {
            logTrace("Trying to open TCP connection to PSQL");
            m_socket = new SocketT(bp.host, bp.port);
            logTrace("Success");
        }
        catch (Exception e)
        {
            throw new PsqlSocketException(e.msg, e);
        }
        // do not leave a connected socket behind if authorization fails
        scope(failure) m_socket.close();
        initialize(bp);
        open = true;
    }

    /// Notify backend and close socket. Never throws an Exception: failures
    /// are reported through logError.
    void terminate()
    {
        open = false;
        try
        {
            // Close the socket even when the Terminate message could not be
            // delivered, otherwise the descriptor stays open.
            scope(exit) m_socket.close();
            putTerminateMessage();
            flush();
        }
        catch (Exception e)
        {
            logError("Exception caught while terminating PSQL connection: %s", e.msg);
        }
    }

    /**
    Open new socket and connect it to the same backend as this connection is
    bound to, send CancelRequest and close socket.
    */
    void cancelRequest()
    {
        SocketT sock = new SocketT(m_backendParams.host, m_backendParams.port);
        // release the temporary socket even if send throws
        scope(exit) sock.close();
        ubyte[4 * 4] intBuf;
        marshalFixedField(intBuf[0..4], int(16));           // message length
        marshalFixedField(intBuf[4..8], int(80877102));     // CancelRequest code
        marshalFixedField(intBuf[8..12], m_processId);
        marshalFixedField(intBuf[12..16], m_cancellationKey);
        sock.send(intBuf[]);
    }

    /// Flush writeBuffer into the socket. Blocks/yields (according to socket
    /// implementation). The write buffer is emptied whether or not the send
    /// succeeded; on $(D PsqlSocketException) the connection is marked closed
    /// and the exception is rethrown.
    final void flush()
    {
        try
        {
            // does not block if zero length:
            // https://github.com/vibe-d/vibe-core/blob/master/source/vibe/core/net.d#L607
            auto w = m_socket.send(writeBuffer[0..bufHead]);
            while (bufHead - w > 0)
                w += m_socket.send(writeBuffer[w..bufHead]);
            logTrace("flushed %d bytes: %s", w, writeBuffer[0..bufHead].to!string);
        }
        catch (PsqlSocketException e)
        {
            open = false;
            throw e;
        }
        finally
        {
            bufHead = 0;
        }
    }

    /// discard write buffer content
    final void discard()
    {
        bufHead = 0;
    }

    /// Save write buffer cursor in order to be able to restore it in case of errors.
    /// Use it to prevent sending junk to backend when something goes wrong during
    /// marshalling or message creation.
    final auto saveBuffer()
    {
        static struct WriteCursor
        {
            int offset;
            PSQLConnection conn;
            /// roll the write buffer back to the saved position
            void restore()
            {
                conn.bufHead = offset;
            }
        }
        return WriteCursor(bufHead, this);
    }

    /// Throws: $(D PsqlConnectionClosedException) if connection is not open.
    pragma(inline)
    final void ensureOpen()
    {
        enforce!PsqlConnectionClosedException(open, "Connection is not open");
    }


    /*
    ////////////////////////////////////////////////////////////////////////////
    // All sorts of messages
    // https://www.postgresql.org/docs/9.5/static/protocol-message-formats.html
    ////////////////////////////////////////////////////////////////////////////
    */

    /** Put Bind message into write buffer.
    *
    * 'formatCodes' - input range of FormatCodes.
    * quotes:
    * The number of parameter format codes that follow (denoted C below).
    * This can be zero to indicate that there are no parameters or that the
    * parameters all use the default format (text); or one, in which case the
    * specified format code is applied to all parameters; or it can equal
    * the actual number of parameters.
    * The parameter format codes. Each must presently be zero (text) or one (binary).
    *
    * 'parameters' - input range of marshaller closures. Actual data should
    * be self-contained in this parameter. Marshaller is a callable that
    * is covariant with "int delegate(ubyte[] buf)" and returns -2 if buf
    * is too small, -1 if parameter is null and an actual number of bytes written
    * otherwise.
    *
    * 'resultFormatCodes' - input range of query result FormatCodes.
    * quotes:
    *
    * The number of result-column format codes that follow (denoted R below).
    * This can be zero to indicate that there are no result columns or that
    * the result columns should all use the default format (text); or one,
    * in which case the specified format code is applied to all result
    * columns (if any); or it can equal the actual number of result columns
    * of the query.
    * The result-column format codes. Each must presently be zero (text) or
    * one (binary).
    */
    final void putBindMessage(FR, PR, RR)
        (string portal, string prepared, scope FR formatCodes, scope PR parameters,
        scope RR resultFormatCodes)
        //if (isInputRange!FR && is(Unqual!(ElementType!FR) == FormatCode) &&
        //    isInputRange!RR && is(Unqual!(ElementType!RR) == FormatCode) &&
        //    isInputRange!PR && __traits(compiles, -1 == parameters.front()(new ubyte[2]))
    {
        ensureOpen();
        write(cast(ubyte)FrontMessageType.Bind);
        auto lenTotal = reserveLen();
        cwrite(portal);
        cwrite(prepared);

        // parameter format code(s); the count is only known after iteration,
        // so its slot is reserved first and filled afterwards
        short fcodes = 0;
        auto fcodePrefix = reserveLen!short();
        foreach (FormatCode fcode; formatCodes)
        {
            logTrace("Bind: writing %d fcode", fcode);
            write(cast(short)fcode);
            fcodes++;
        }
        fcodePrefix.write(fcodes);

        // parameters
        short pcount = 0;
        auto pcountPrefix = reserveLen!short();
        foreach (param; parameters)
        {
            auto paramPrefix = reserveLen!int();
            int r = wrappedMarsh(param);
            logTrace("Bind: wrote 4bytes + %d bytes for value", r);
            paramPrefix.write(r);   // careful! -1 means Null
            pcount++;
        }
        pcountPrefix.write(pcount);

        // result format codes
        short rcount = 0;
        auto rcolPrefix = reserveLen!short();
        foreach (FormatCode fcode; resultFormatCodes)
        {
            write(cast(short)fcode);
            logTrace("Bind: writing %d rfcode", fcode);
            rcount++;
        }
        rcolPrefix.write(rcount);

        lenTotal.fill();
        logTrace("Bind message buffered");
    }

    /// putBindMessage overload for parameterless portals
    final void putBindMessage(RR)
        (string portal, string prepared, scope RR resultFormatCodes)
    {
        ensureOpen();
        write(cast(ubyte)FrontMessageType.Bind);
        auto lenTotal = reserveLen();
        cwrite(portal);
        cwrite(prepared);

        // parameter format code(s): none
        short fcodes = 0;
        auto fcodePrefix = reserveLen!short();
        fcodePrefix.write(fcodes);

        // parameters: none
        short pcount = 0;
        auto pcountPrefix = reserveLen!short();
        pcountPrefix.write(pcount);

        // result format codes
        short rcount = 0;
        auto rcolPrefix = reserveLen!short();
        foreach (FormatCode fcode; resultFormatCodes)
        {
            write(cast(short)fcode);
            logTrace("Bind: writing %d rfcode", fcode);
            rcount++;
        }
        rcolPrefix.write(rcount);

        lenTotal.fill();
        logTrace("Bind message buffered");
    }

    /// put Close message into write buffer.
    /// `closeWhat` is 'S' for prepared statement and
    /// 'P' for portal.
    final void putCloseMessage(StmtOrPortal closeWhat, string name)
    {
        ensureOpen();
        assert(closeWhat == 'S' || closeWhat == 'P');
        write(cast(ubyte)FrontMessageType.Close);
        auto lenTotal = reserveLen();
        write(cast(ubyte)closeWhat);
        cwrite(name);
        lenTotal.fill();
        logTrace("Close message buffered");
    }

    /// put Describe message into write buffer.
    /// `descWhat` is 'S' for prepared statement and
    /// 'P' for portal.
    final void putDescribeMessage(StmtOrPortal descWhat, string name)
    {
        ensureOpen();
        assert(descWhat == 'S' || descWhat == 'P');
        write(cast(ubyte)FrontMessageType.Describe);
        auto lenTotal = reserveLen();
        write(cast(ubyte)descWhat);
        cwrite(name);
        lenTotal.fill();
        logTrace("Describe message buffered");
    }

    /**
    Put Execute message into write buffer.
    non-zero maxRows will generate PortalSuspended messages, which are
    currently not handled by dpeq commands */
    final void putExecuteMessage(string portal = "", int maxRows = 0)
    {
        ensureOpen();
        write(cast(ubyte)FrontMessageType.Execute);
        auto lenTotal = reserveLen();
        cwrite(portal);
        write(maxRows);
        lenTotal.fill();
        logTrace("Execute message buffered");
    }

    /**
    Put Flush message into write buffer.
    Quote:
    "The Flush message does not cause any specific output to be generated,
    but forces the backend to deliver any data pending in its output buffers.
    A Flush must be sent after any extended-query command except Sync, if the
    frontend wishes to examine the results of that command before issuing more
    commands. Without Flush, messages returned by the backend will be combined
    into the minimum possible number of packets to minimize network overhead."
    */
    final void putFlushMessage()
    {
        ensureOpen();
        write(cast(ubyte)FrontMessageType.Flush);
        write(4);   // length word only, message has no body
        logTrace("Flush message buffered");
    }

    /// Put Parse message into write buffer. `prepared` is the statement
    /// name, `query` is the query text and `ptypes` is an input range of
    /// parameter type ObjectIDs.
    final void putParseMessage(PR)(string prepared, string query, scope PR ptypes)
        if (isInputRange!PR && is(Unqual!(ElementType!PR) == ObjectID))
    {
        logTrace("Message to parse query: %s", query);

        ensureOpen();
        write(cast(ubyte)FrontMessageType.Parse);
        auto lenTotal = reserveLen();
        cwrite(prepared);
        cwrite(query);

        // parameter types
        short pcount = 0;
        auto pcountPrefix = reserveLen!short();
        foreach (ObjectID ptype; ptypes)
        {
            write(cast(int)ptype);
            pcount++;
        }
        pcountPrefix.write(pcount);

        lenTotal.fill();
        logTrace("Parse message buffered");
    }

    /// put Query message (simple query protocol) into the write buffer
    final void putQueryMessage(string query)
    {
        ensureOpen();
        write(cast(ubyte)FrontMessageType.Query);
        auto lenTotal = reserveLen();
        cwrite(query);
        lenTotal.fill();
        readyForQueryExpected++;
        logTrace("Query message buffered");
    }

    alias putSimpleQuery = putQueryMessage;

    /**
    Put Sync message into write buffer. Usually you should call this after
    every portal execute message.
    Every postSimpleQuery or PSQLConnection.sync MUST be accompanied by getQueryResults call. */
    final void putSyncMessage()
    {
        ensureOpen();
        write(cast(ubyte)FrontMessageType.Sync);
        write(4);   // length word only, message has no body
        readyForQueryExpected++;
        logTrace("Sync message buffered");
    }

    alias sync = putSyncMessage;

    /// NotificationResponse messages will be parsed and passed to this
    /// callback during 'pollMessages' call.
    /// https://www.postgresql.org/docs/9.5/static/sql-notify.html
    bool delegate(typeof(this) con, Notification n) nothrow notificationCallback = null;

    /// NoticeResponse messages will be parsed and
    /// passed to this callback during 'pollMessages' call.
    void delegate(typeof(this) con, Notice n) nothrow noticeCallback = null;

    /** When this callback returns true, pollMessages will exit its loop.
    Interceptor should set err to true if it has encountered some kind of error
    and wants it to be rethrown as PsqlClientException at the end of
    pollMessages call. errMsg should be appended with error description. */
    alias InterceptorT = bool delegate(Message msg, ref bool err,
        ref string errMsg) nothrow;

    /** Read messages from the socket in loop until:
      1). if finishOnError is set and ErrorResponse is received, function
          throws immediately.
      2). ReadyForQuery message is received.
      3). interceptor delegate returns `true`.
      4). NotificationResponse received and notificationCallback returned 'true'.
    Interceptor delegate is used to customize the logic. If the message is
    not ReadyForQuery, ErrorResponse or Notice\Notification, it is passed to
    interceptor. It may set bool 'err' flag to true and append to 'errMsg'
    string, if delayed throw is required.

    Throws: $(D PsqlErrorResponseException) if ErrorResponse was received,
    $(D PsqlClientException) on protocol violations or when the interceptor
    has reported an error. */
    final void pollMessages(scope InterceptorT interceptor, bool finishOnError = false)
    {
        bool finish = false;
        bool error = false;
        Notice errorNotice;
        bool intError = false;
        string intErrMsg;

        while (!finish)
        {
            Message msg = readOneMessage();

            with (BackendMessageType)
            switch (msg.type)
            {
                case ErrorResponse:
                    enforce!PsqlClientException(!error, "Second ErrorResponse " ~
                        "received during one pollMessages call");
                    error = true;
                    parseNoticeMessage(msg.data, errorNotice);
                    if (finishOnError)
                        finish = true;
                    break;
                case ReadyForQuery:
                    enforce!PsqlClientException(readyForQueryExpected > 0,
                        "Unexpected ReadyForQuery message");
                    // the body must carry the one-byte transaction status
                    enforce!PsqlClientException(msg.data.length > 0,
                        "ReadyForQuery message without transaction status");
                    readyForQueryExpected--;
                    tstatus = cast(TransactionStatus) msg.data[0];
                    finish = true;
                    break;
                case NoticeResponse:
                    if (noticeCallback !is null)
                    {
                        Notice n;
                        parseNoticeMessage(msg.data, n);
                        noticeCallback(this, n);
                    }
                    break;
                case NotificationResponse:
                    if (notificationCallback !is null)
                    {
                        Notification n;
                        n.procId = demarshalNumber!int(msg.data[0..4]);
                        size_t l;
                        n.channel = demarshalProtocolString(msg.data[4..$], l);
                        n.payload = demarshalString(msg.data[4+l..$-1]);
                        finish |= notificationCallback(this, n);
                    }
                    break;
                default:
                    if (interceptor !is null)
                        finish |= interceptor(msg, intError, intErrMsg);
            }
        }

        // backend error takes precedence over the interceptor's one
        if (error)
            throw new PsqlErrorResponseException(errorNotice);
        if (intError)
            throw new PsqlClientException(intErrMsg);
    }

    /// reads and discards messages from socket until all expected
    /// ReadyForQuery messages are received
    void windupResponseStack()
    {
        while (readyForQueryExpected > 0)
        {
            Message msg = readOneMessage();
            if (msg.type == BackendMessageType.ReadyForQuery)
            {
                readyForQueryExpected--;
                // keep transactionStatus in line with the last ReadyForQuery
                if (msg.data.length > 0)
                    tstatus = cast(TransactionStatus) msg.data[0];
            }
        }
    }

// Protected section for functions that will probably never be used by
// client code directly. If you need them, inherit them.
protected:

    /// Put StartupMessage (protocol v3, user and database) into write buffer.
    final void putStartupMessage(in BackendParams params)
    {
        auto lenPrefix = reserveLen();
        write(0x0003_0000); // protocol v3
        cwrite("user");
        cwrite(params.user);
        cwrite("database");
        cwrite(params.database);
        write(cast(ubyte)0);    // terminator of the name/value list
        lenPrefix.fill();
        logTrace("Startup message buffered");
    }

    /// Put Terminate message into write buffer.
    final void putTerminateMessage()
    {
        write(cast(ubyte)FrontMessageType.Terminate);
        write(4);   // length word only, message has no body
        logTrace("Terminate message buffered");
    }

    /// Send startup message and walk through authorization. Auth types 0
    /// (AuthenticationOk), 3 (cleartext password) and 5 (MD5 password) are
    /// handled. Throws: $(D PsqlClientException) on any other auth type or
    /// if expected Authentication messages were not received.
    final void initialize(in BackendParams params)
    {
        putStartupMessage(params);
        flush();

        int authType = -1;
        Message auth_msg;

        pollMessages((Message msg, ref bool e, ref string eMsg) {
                if (msg.type == BackendMessageType.Authentication)
                {
                    auth_msg = msg;
                    if (authType != -1)
                    {
                        e = true;
                        eMsg ~= "Unexpected second Authentication " ~
                            "message received from backend";
                    }
                    authType = demarshalNumber(msg.data[0..4]);
                    if (authType == 0)  // instantly authorized, so we'll get readyForQuery
                        readyForQueryExpected++;
                    else
                        return true;
                }
                else if (msg.type == BackendMessageType.BackendKeyData)
                {
                    m_processId = demarshalNumber(msg.data[0..4]);
                    m_cancellationKey = demarshalNumber(msg.data[4..8]);
                }
                return false;
            }, true);

        enforce!PsqlClientException(authType != -1,
            "Expected Authentication message was not received");
        switch (authType)
        {
            case 0:
                // instant AuthenticationOk, trusted connection usually does this
                logTrace("Succesfully authorized");
                return;
            case 3:
                // cleartext password
                putPasswordMessage(params.password);
                break;
            case 5:
                // MD5 salted password
                ubyte[4] salt = auth_msg.data[4 .. 8];
                putMd5PasswordMessage(params.password, params.user, salt);
                break;
            default:
                throw new PsqlClientException("Unknown auth type " ~
                    authType.to!string);
        }
        flush();

        int authRes = -1;
        pollMessages((Message msg, ref bool e, ref string eMsg) {
                if (msg.type == BackendMessageType.Authentication)
                    authRes = demarshalNumber(msg.data[0..4]);
                else if (msg.type == BackendMessageType.BackendKeyData)
                {
                    m_processId = demarshalNumber(msg.data[0..4]);
                    m_cancellationKey = demarshalNumber(msg.data[4..8]);
                }
                return false;
            });
        enforce!PsqlClientException(authRes == 0,
            "Expected AuthenticationOk message was not received");
    }

    /// Put cleartext PasswordMessage into write buffer. One more
    /// ReadyForQuery is expected afterwards.
    final void putPasswordMessage(string pw)
    {
        write(cast(ubyte)FrontMessageType.PasswordMessage);
        auto lenPrefix = reserveLen();
        cwrite(pw);
        lenPrefix.fill();
        readyForQueryExpected++;
        logTrace("Password message buffered");
    }

    /// Put MD5 PasswordMessage into write buffer: "md5" followed by
    /// hex(md5(hex(md5(pw ~ user)) ~ salt)). One more ReadyForQuery is
    /// expected afterwards.
    final void putMd5PasswordMessage(string pw, string user, ubyte[4] salt)
    {
        // thank you ddb authors
        char[32] MD5toHex(T...)(in T data)
        {
            import std.ascii : LetterCase;
            import std.digest.md : md5Of, toHexString;
            return md5Of(data).toHexString!(LetterCase.lower);
        }

        write(cast(ubyte)FrontMessageType.PasswordMessage);
        auto lenPrefix = reserveLen();
        char[3 + 32] mdpw;
        mdpw[0 .. 3] = "md5";
        mdpw[3 .. $] = MD5toHex(MD5toHex(pw, user), salt);
        cwrite(cast(string) mdpw[]);
        lenPrefix.fill();
        readyForQueryExpected++;
        logTrace("MD5 Password message buffered");
    }

    /// Parse the body of ErrorResponse or NoticeResponse message into n.
    /// Body is a sequence of fields: one type byte followed by
    /// zero-terminated string. Unknown field types are skipped.
    final void parseNoticeMessage(ubyte[] data, ref Notice n)
    {
        // store current field in dest and advance data past it
        void copyTillZero(ref string dest)
        {
            size_t length;
            dest = demarshalProtocolString(data, length);
            data = data[length..$];
        }

        // skip current field including its zero terminator
        void discardTillZero()
        {
            size_t idx = 0;
            while (idx < data.length && data[idx])
                idx++;
            // terminator may be missing in a malformed message, in which
            // case there is nothing left to parse
            data = idx < data.length ? data[idx+1..$] : null;
        }

        while (data.length > 1)
        {
            char fieldType = cast(char) data[0];
            data = data[1..$];
            // https://www.postgresql.org/docs/current/static/protocol-error-fields.html
            switch (fieldType)
            {
                case 'S':
                    copyTillZero(n.severity);
                    break;
                case 'V':
                    copyTillZero(n.severityV);
                    break;
                case 'C':
                    // 5 code characters and zero terminator
                    enforce!PsqlClientException(data.length >= 6,
                        "Expected 5 bytes of SQLSTATE code.");
                    n.code[] = cast(char[]) data[0..5];
                    data = data[6..$];
                    break;
                case 'M':
                    copyTillZero(n.message);
                    break;
                case 'D':
                    copyTillZero(n.detail);
                    break;
                case 'H':
                    copyTillZero(n.hint);
                    break;
                case 'P':
                    copyTillZero(n.position);
                    break;
                case 'p':
                    copyTillZero(n.internalPos);
                    break;
                case 'q':
                    copyTillZero(n.internalQuery);
                    break;
                case 'W':
                    copyTillZero(n.where);
                    break;
                case 's':
                    copyTillZero(n.schema);
                    break;
                case 't':
                    copyTillZero(n.table);
                    break;
                case 'c':
                    copyTillZero(n.column);
                    break;
                case 'd':
                    copyTillZero(n.dataType);
                    break;
                case 'n':
                    copyTillZero(n.constraint);
                    break;
                case 'F':
                    copyTillZero(n.file);
                    break;
                case 'L':
                    copyTillZero(n.line);
                    break;
                case 'R':
                    copyTillZero(n.routine);
                    break;
                default:
                    discardTillZero();
            }
        }
    }

    /// Read from socket to buffer buf exactly buf.length bytes.
    /// Blocks and throws. On $(D PsqlSocketException) the connection is
    /// marked closed and the exception is rethrown.
    final void read(ubyte[] buf)
    {
        try
        {
            logTrace("reading %d bytes", buf.length);
            // Loop instead of asserting on the result of a single receive:
            // asserts vanish in release builds and a short read would then
            // be parsed as if the buffer was full.
            size_t filled = 0;
            while (filled < buf.length)
            {
                auto r = m_socket.receive(buf[filled .. $]);
                if (r <= 0)
                    throw new PsqlSocketException("Socket returned no data on receive");
                filled += r;
            }
            //logTrace("received %d bytes", r);
        }
        catch (PsqlSocketException e)
        {
            open = false;
            throw e;
        }
    }

    /// Run marshalling functor m on the free tail of writeBuffer and advance
    /// bufHead by the number of bytes written. Extends writeBuffer while m
    /// is lacking space (returns -2).
    /// Returns: whatever m returned last.
    final int wrappedMarsh(MarshT)(scope MarshT m)
    {
        int bcount = m(writeBuffer[bufHead .. $]);
        while (bcount == -2)
        {
            writeBuffer.length = writeBuffer.length + 4 * 4096;
            bcount = m(writeBuffer[bufHead .. $]);
        }
        if (bcount > 0)
            bufHead += bcount;
        return bcount;
    }

    /// write numeric type T to write buffer
    final int write(T)(T val)
        if (isNumeric!T)
    {
        return wrappedMarsh((ubyte[] buf) => marshalFixedField(buf, val));
    }

    /// Reserve space in write buffer for length prefix and return
    /// struct that automatically fills it from current buffer offset position.
    final auto reserveLen(T = int)()
        if (isNumeric!T && !isUnsigned!T)
    {
        static struct Len
        {
            PSQLConnection con;
            int idx;    // offset of length prefix word in writeBuffer

            /// calculate and write length prefix
            void fill(bool includeSelf = true)
            {
                T len = (con.bufHead - idx).to!T;
                if (!includeSelf)
                {
                    len -= T.sizeof.to!T;
                    assert(len >= 0);
                }
                else
                    assert(len >= T.sizeof);
                logTrace("writing length of %d bytes to index %d", len, idx);
                auto res = marshalFixedField(con.writeBuffer[idx .. idx+T.sizeof], len);
                assert(res == T.sizeof);
            }

            /// write some specific number
            void write(T v)
            {
                auto res = marshalFixedField(con.writeBuffer[idx .. idx+T.sizeof], v);
                assert(res == T.sizeof);
            }
        }

        // The prefix slot is sliced out of writeBuffer later on, so it must
        // physically exist before the cursor is moved past it.
        while (writeBuffer.length < bufHead + T.sizeof)
            writeBuffer.length = writeBuffer.length + 4 * 4096;

        Len l = Len(this, bufHead);
        bufHead += T.sizeof;
        return l;
    }

    /// marshal s into write buffer with marshalStringField
    final int write(string s)
    {
        return wrappedMarsh((ubyte[] buf) => marshalStringField(buf, s));
    }

    /// marshal s into write buffer with marshalCstring
    final int cwrite(string s)
    {
        return wrappedMarsh((ubyte[] buf) => marshalCstring(buf, s));
    }

    /// read exactly one message from the socket
    final Message readOneMessage()
    {
        Message res;
        ubyte[1] type;
        read(type);
        res.type = cast(BackendMessageType) type[0];
        logTrace("Got message of type %s", res.type.to!string);
        ubyte[4] length_arr;
        read(length_arr);
        // length word counts itself, body is 4 bytes shorter
        int length = demarshalNumber(length_arr) - 4;
        enforce!PsqlClientException(length >= 0, "Negative message length");
        ubyte[] data;
        if (length > 0)
        {
            if (length <= readBatchSize / 2)
            {
                // we should batch the allocation
                if (readBatch.length < length)
                    readBatch = new ubyte[readBatchSize];
                data = readBatch[0..length];
                readBatch = readBatch[length..$];
            }
            else
                data = new ubyte[length];
            read(data);
        }
        res.data = data;
        return res;
    }
}