properties, PgDataSource dat
}
/**
- * Returns an {@link Operation} that connects this {@link Connection} to a server. If the
- * Operation completes successfully and the lifecycle is {@link Lifecycle#NEW} -> {@link
- * Lifecycle#OPEN}. If lifecycle is {@link Lifecycle#NEW_INACTIVE} -> {@link
- * Lifecycle#INACTIVE}. If the {@link Operation} completes exceptionally the lifecycle ->
- * {@link Lifecycle#CLOSED}. The lifecycle must be {@link Lifecycle#NEW} or {@link
- * Lifecycle#NEW_INACTIVE} when the {@link Operation} is executed. Otherwise the {@link Operation}
- * will complete exceptionally with {@link SqlException}.
+ * Returns an {@link Operation} that connects this {@link Connection} to a
+ * server. If the Operation completes successfully and the lifecycle is
+ * {@link Lifecycle#NEW} -> {@link Lifecycle#OPEN}. If lifecycle is
+ * {@link Lifecycle#NEW_INACTIVE} -> {@link Lifecycle#INACTIVE}. If the
+ * {@link Operation} completes exceptionally the lifecycle ->
+ * {@link Lifecycle#CLOSED}. The lifecycle must be {@link Lifecycle#NEW} or
+ * {@link Lifecycle#NEW_INACTIVE} when the {@link Operation} is executed.
+ * Otherwise the {@link Operation} will complete exceptionally with
+ * {@link SqlException}.
*
- * Note: It is highly recommended to use the {@link Connection#connect()} convenience method or to
- * use {@link DataSource#getConnection} which itself calls {@link Connection#connect()}. Unless
- * there is a specific need, do not call this method directly.
+ *
+ * Note: It is highly recommended to use the {@link Connection#connect()}
+ * convenience method or to use {@link DataSource#getConnection} which itself
+ * calls {@link Connection#connect()}. Unless there is a specific need, do not
+ * call this method directly.
*
- *
This method exists partially to clearly explain that while creating a {@link Connection} is
- * non-blocking, the act of connecting to the server may block and so is executed asynchronously.
- * We could write a bunch of text saying this but defining this method is more explicit. Given the
- * {@link Connection#connect()} convenience methods there's probably not much reason to use this
- * method, but on the other hand, who knows, so here it is.
+ *
+ * This method exists partially to clearly explain that while creating a
+ * {@link Connection} is non-blocking, the act of connecting to the server may
+ * block and so is executed asynchronously. We could write a bunch of text
+ * saying this but defining this method is more explicit. Given the
+ * {@link Connection#connect()} convenience methods there's probably not much
+ * reason to use this method, but on the other hand, who knows, so here it is.
*
- * @return an {@link Operation} that connects this {@link Connection} to a server.
- * @throws IllegalStateException if this {@link Connection} is in a lifecycle state other than
- * {@link Lifecycle#NEW}.
+ * @return an {@link Operation} that connects this {@link Connection} to a
+ * server.
+ * @throws IllegalStateException if this {@link Connection} is in a lifecycle
+ * state other than {@link Lifecycle#NEW}.
*/
@Override
public Operation connectOperation() {
if (lifecycle != Lifecycle.NEW) {
- throw new IllegalStateException(
- "only connections in state NEW are allowed to start connecting");
+ throw new IllegalStateException("only connections in state NEW are allowed to start connecting");
}
return new PgConnectOperation(this, groupSubmission);
}
/**
- * Returns an {@link Operation} that verifies that the resources are available and operational.
- * Successful completion of that {@link Operation} implies that at some point between the
- * beginning and end of the {@link Operation} the Connection was working properly to the extent
- * specified by {@code depth}. There is no guarantee that the {@link Connection} is still working
- * after completion.
+ * Returns an {@link Operation} that verifies that the resources are available
+ * and operational. Successful completion of that {@link Operation} implies that
+ * at some point between the beginning and end of the {@link Operation} the
+ * Connection was working properly to the extent specified by {@code depth}.
+ * There is no guarantee that the {@link Connection} is still working after
+ * completion.
*
- * @param depth how completely to check that resources are available and operational. Not {@code
- * null}.
+ * @param depth how completely to check that resources are available and
+ * operational. Not {@code
+ * null} .
* @return an {@link Operation} that will validate this {@link Connection}
* @throws IllegalStateException if this Connection is not active
*/
@Override
public Operation validationOperation(Connection.Validation depth) {
if (!lifecycle.isOpen() || !lifecycle.isActive()) {
- throw new IllegalStateException(
- "connection lifecycle in state: " + lifecycle + " and not open for new work");
+ throw new IllegalStateException("connection lifecycle in state: " + lifecycle + " and not open for new work");
}
return new PgValidationOperation(this, depth);
}
/**
- * Create an {@link Operation} to close this {@link Connection}. When the {@link Operation} is
- * executed, if this {@link Connection} is open -> {@link Lifecycle#CLOSING}. If this {@link
- * Connection} is closed executing the returned {@link Operation} is a noop. When the queue is
- * empty and all resources released -> {@link Lifecycle#CLOSED}.
+ * Create an {@link Operation} to close this {@link Connection}. When the
+ * {@link Operation} is executed, if this {@link Connection} is open ->
+ * {@link Lifecycle#CLOSING}. If this {@link Connection} is closed executing the
+ * returned {@link Operation} is a noop. When the queue is empty and all
+ * resources released -> {@link Lifecycle#CLOSED}.
*
- * A close {@link Operation} is never skipped. Even when the {@link Connection} is dependent, the
- * default, and an {@link Operation} completes exceptionally, a close {@link Operation} is still
- * executed. If the {@link Connection} is parallel, a close {@link Operation} is not executed so
- * long as there are other {@link Operation}s or the {@link Connection} is held; for more {@link
- * Operation}s.
+ *
+ * A close {@link Operation} is never skipped. Even when the {@link Connection}
+ * is dependent, the default, and an {@link Operation} completes exceptionally,
+ * a close {@link Operation} is still executed. If the {@link Connection} is
+ * parallel, a close {@link Operation} is not executed so long as there are
+ * other {@link Operation}s or the {@link Connection} is held; for more
+ * {@link Operation}s.
*
- *
Note: It is highly recommended to use try with resources or the {@link Connection#close()}
- * convenience method. Unless there is a specific need, do not call this method directly.
+ *
+ * Note: It is highly recommended to use try with resources or the
+ * {@link Connection#close()} convenience method. Unless there is a specific
+ * need, do not call this method directly.
*
* @return an {@link Operation} that will close this {@link Connection}.
* @throws IllegalStateException if the Connection is not active
@@ -169,17 +181,17 @@ public Operation closeOperation() {
/**
* Create a new {@link OperationGroup} for this {@link Connection}.
*
- * @param the result type of the member {@link Operation}s of the returned {@link
- * OperationGroup}
- * @param the result type of the collected results of the member {@link Operation}s
+ * @param the result type of the member {@link Operation}s of the returned
+ * {@link OperationGroup}
+ * @param the result type of the collected results of the member
+ * {@link Operation}s
* @return a new {@link OperationGroup}.
* @throws IllegalStateException if this Connection is not active
*/
@Override
public OperationGroup operationGroup() {
if (!lifecycle.isOpen() || !lifecycle.isActive()) {
- throw new IllegalStateException(
- "connection lifecycle in state: " + lifecycle + " and not open for new work");
+ throw new IllegalStateException("connection lifecycle in state: " + lifecycle + " and not open for new work");
}
if (logger.isLoggable(Level.CONFIG)) {
@@ -190,11 +202,14 @@ public OperationGroup operationGroup() {
}
/**
- * Returns a new {@link Transaction} that can be used as an argument to a commit Operation.
+ * Returns a new {@link Transaction} that can be used as an argument to a commit
+ * Operation.
*
- * It is most likely an error to call this within an error handler, or any handler as it is very
- * likely that when the handler is executed the next submitted endTransaction {@link Operation}
- * will have been created with a different Transaction.
+ *
+ * It is most likely an error to call this within an error handler, or any
+ * handler as it is very likely that when the handler is executed the next
+ * submitted endTransaction {@link Operation} will have been created with a
+ * different Transaction.
*
* @return a new {@link Transaction}. Not retained.
* @throws IllegalStateException if this Connection is not active
@@ -205,8 +220,8 @@ public Transaction transaction() {
}
/**
- * Register a listener that will be called whenever there is a change in the lifecycle of this
- * {@link Connection}.
+ * Register a listener that will be called whenever there is a change in the
+ * lifecycle of this {@link Connection}.
*
* @param listener Can be {@code null}.
* @throws IllegalStateException if this Connection is not active
@@ -225,9 +240,10 @@ public Connection registerLifecycleListener(Connection.ConnectionLifecycleListen
}
/**
- * Removes a listener that was registered by calling registerLifecycleListener.Sometime after this
- * method is called the listener will stop receiving lifecycle events. If the listener is not
- * registered, this is a no-op.
+ * Removes a listener that was registered by calling
+ * registerLifecycleListener.Sometime after this method is called the listener
+ * will stop receiving lifecycle events. If the listener is not registered, this
+ * is a no-op.
*
* @param listener Not {@code null}.
* @return this Connection
@@ -257,12 +273,13 @@ public Lifecycle getConnectionLifecycle() {
}
/**
- * Terminate this {@link Connection}. If lifecycle is {@link Lifecycle#NEW}, {@link
- * Lifecycle#OPEN}, {@link Lifecycle#INACTIVE} or {@link Lifecycle#CLOSING} -> {@link
- * Lifecycle#ABORTING} If lifecycle is {@link Lifecycle#ABORTING} or {@link Lifecycle#CLOSED} this
- * is a noop. If an {@link Operation} is currently executing, terminate it immediately. Remove all
- * remaining {@link Operation}s from the queue. {@link Operation}s are not skipped. They are just
- * removed from the queue.
+ * Terminate this {@link Connection}. If lifecycle is {@link Lifecycle#NEW},
+ * {@link Lifecycle#OPEN}, {@link Lifecycle#INACTIVE} or
+ * {@link Lifecycle#CLOSING} -> {@link Lifecycle#ABORTING} If lifecycle is
+ * {@link Lifecycle#ABORTING} or {@link Lifecycle#CLOSED} this is a noop. If an
+ * {@link Operation} is currently executing, terminate it immediately. Remove
+ * all remaining {@link Operation}s from the queue. {@link Operation}s are not
+ * skipped. They are just removed from the queue.
*
* @return this {@link Connection}
*/
@@ -279,13 +296,15 @@ public Connection abort() {
}
/**
- * Return the set of properties configured on this {@link Connection} excepting any sensitive
- * properties. Neither the key nor the value for sensitive properties are included in the result.
- * Properties (other than sensitive properties) that have default values are included even when
- * not explicitly set. Properties that have no default value and are not set explicitly are not
+ * Return the set of properties configured on this {@link Connection} excepting
+ * any sensitive properties. Neither the key nor the value for sensitive
+ * properties are included in the result. Properties (other than sensitive
+ * properties) that have default values are included even when not explicitly
+ * set. Properties that have no default value and are not set explicitly are not
* included.
*
- * @return a {@link Map} of property, value. Not modifiable. May be retained. Not {@code null}.
+ * @return a {@link Map} of property, value. Not modifiable. May be retained.
+ * Not {@code null}.
* @throws IllegalStateException if this Connection is not active
*/
@Override
@@ -294,7 +313,8 @@ public Map getProperties() {
}
/**
- * Returns a {@link ShardingKey.Builder} that is valid for this {@link Connection}.
+ * Returns a {@link ShardingKey.Builder} that is valid for this
+ * {@link Connection}.
*
* @return a {@link ShardingKey.Builder} for this {@link Connection}
*/
@@ -309,10 +329,11 @@ public Connection requestHook(Consumer request) {
}
/**
- * Make this {@link Connection} ready for use. A newly created {@link Connection} is active.
- * Calling this method on a {@link Connection} that is active is a no-op. If the lifecycle is
- * {@link Lifecycle#INACTIVE} -> {@link Lifecycle#OPEN}. If the lifecycle is {@link
- * Lifecycle#NEW_INACTIVE} -> {@link Lifecycle#NEW}.
+ * Make this {@link Connection} ready for use. A newly created
+ * {@link Connection} is active. Calling this method on a {@link Connection}
+ * that is active is a no-op. If the lifecycle is {@link Lifecycle#INACTIVE}
+ * -> {@link Lifecycle#OPEN}. If the lifecycle is
+ * {@link Lifecycle#NEW_INACTIVE} -> {@link Lifecycle#NEW}.
*
* @return this {@link Connection}
* @throws IllegalStateException if this {@link Connection} is closed.
@@ -330,23 +351,28 @@ public Connection activate() {
}
/**
- * Makes this {@link Connection} inactive. After a call to this method previously submitted
- * Operations will be executed normally. If the lifecycle is {@link Lifecycle#NEW} -> {@link
- * Lifecycle#NEW_INACTIVE}. if the lifecycle is {@link Lifecycle#OPEN} -> {@link
- * Lifecycle#INACTIVE}. If the lifecycle is {@link Lifecycle#INACTIVE} or {@link
- * Lifecycle#NEW_INACTIVE} this method is a no-op. After calling this method calling any method
- * other than {@link Connection#deactivate}, {@link Connection#activate}, {@link
- * Connection#abort}, or {@link Connection#getConnectionLifecycle} or submitting any member {@link
- * Operation} will throw {@link IllegalStateException}. Local {@link Connection} state not created
- * by {@link Connection.Builder} may not be preserved.
+ * Makes this {@link Connection} inactive. After a call to this method
+ * previously submitted Operations will be executed normally. If the lifecycle
+ * is {@link Lifecycle#NEW} -> {@link Lifecycle#NEW_INACTIVE}. if the
+ * lifecycle is {@link Lifecycle#OPEN} -> {@link Lifecycle#INACTIVE}. If the
+ * lifecycle is {@link Lifecycle#INACTIVE} or {@link Lifecycle#NEW_INACTIVE}
+ * this method is a no-op. After calling this method calling any method other
+ * than {@link Connection#deactivate}, {@link Connection#activate},
+ * {@link Connection#abort}, or {@link Connection#getConnectionLifecycle} or
+ * submitting any member {@link Operation} will throw
+ * {@link IllegalStateException}. Local {@link Connection} state not created by
+ * {@link Connection.Builder} may not be preserved.
*
- * Any implementation of a {@link Connection} pool is by default required to call {@code
- * deactivate} when putting a {@link Connection} into a pool. The implementation is required to
- * call {@code activate} when removing a {@link Connection} from a pool so the {@link Connection}
- * can be used. An implementation of a {@link Connection} pool may have an optional mode where it
- * does not call {@code deactivate}/{@code activate} as required above. The behavior of the pool
- * and {@link Connection}s cached in the pool in such a mode is entirely implementation
- * dependent.
+ *
+ * Any implementation of a {@link Connection} pool is by default required to
+ * call {@code
+ * deactivate} when putting a {@link Connection} into a pool. The implementation
+ * is required to call {@code activate} when removing a {@link Connection} from
+ * a pool so the {@link Connection} can be used. An implementation of a
+ * {@link Connection} pool may have an optional mode where it does not call
+ * {@code deactivate}/{@code activate} as required above. The behavior of the
+ * pool and {@link Connection}s cached in the pool in such a mode is entirely
+ * implementation dependent.
*
* @return this {@link Connection}
* @throws IllegalStateException if this {@link Connection} is closed
@@ -399,20 +425,18 @@ public void sendNetworkConnect(NetworkConnect connect) {
*/
public void submit(PgSubmission> submission) {
switch (submission.getCompletionType()) {
- case LOCAL:
- case CATCH:
- sendNetworkRequest(new ImmediateComplete(submission));
- break;
- case GROUP:
- if (lastSubmission != null) {
- ((CompletableFuture>) lastSubmission.getCompletionStage()).thenApply(a ->
- submission.finish(null));
- }
- break;
-
- default:
- Portal portal = new Portal(submission);
- sendNetworkRequest(new ParseRequest<>(portal));
+ case LOCAL:
+ case CATCH:
+ sendNetworkRequest(new ImmediateComplete(submission));
+ break;
+ case GROUP:
+ if (lastSubmission != null) {
+ ((CompletableFuture>) lastSubmission.getCompletionStage()).thenApply(a -> submission.finish(null));
+ }
+ break;
+
+ default:
+ sendNetworkRequest(new ParseRequest<>(submission));
}
lastSubmission = submission;
}
diff --git a/src/main/java/org/postgresql/sql2/communication/BEFrameParser.java b/src/main/java/org/postgresql/sql2/communication/BEFrameParser.java
new file mode 100644
index 0000000..0052940
--- /dev/null
+++ b/src/main/java/org/postgresql/sql2/communication/BEFrameParser.java
@@ -0,0 +1,106 @@
+package org.postgresql.sql2.communication;
+
+import java.io.IOException;
+
+import org.postgresql.sql2.util.BinaryHelper;
+
+/**
+ * Reads bytes from the stream from the server and produces packages on a stack
+ */
+public class BEFrameParser {
+
+ public static final char AUTHENTICATION = 'R';
+ public static final char CANCELLATION_KEY_DATA = 'K';
+ public static final char BIND_COMPLETE = '2';
+ public static final char CLOSE_COMPLETE = '3';
+ public static final char COMMAND_COMPLETE = 'C';
+ public static final char COPY_DATA = 'd';
+ public static final char COPY_DONE = 'c';
+ public static final char COPY_IN_RESPONSE = 'G';
+ public static final char COPY_OUT_RESPONSE = 'H';
+ public static final char COPY_BOTH_RESPONSE = 'W';
+ public static final char DATA_ROW = 'D';
+ public static final char EMPTY_QUERY_RESPONSE = 'I';
+ public static final char ERROR_RESPONSE = 'E';
+ public static final char FUNCTION_CALL_RESPONSE = 'V';
+ public static final char NEGOTIATE_PROTOCOL_VERSION = 'v';
+ public static final char NO_DATA = 'n';
+ public static final char NOTICE_RESPONSE = 'N';
+ public static final char NOTIFICATION_RESPONSE = 'A';
+ public static final char PARAM_DESCRIPTION = 't';
+ public static final char PARAM_STATUS = 'S';
+ public static final char PARSE_COMPLETE = '1';
+ public static final char PORTAL_SUSPENDED = 's';
+ public static final char READY_FOR_QUERY = 'Z';
+ public static final char ROW_DESCRIPTION = 'T';
+
+ private enum States {
+ BETWEEN, READ_TAG, READ_LEN1, READ_LEN2, READ_LEN3, READ_LEN4
+ }
+
+ private States state = States.BETWEEN;
+
+ private byte tag;
+ private byte len1;
+ private byte len2;
+ private byte len3;
+ private byte len4;
+ private int payloadLength;
+
+ public boolean parseBEFrame(NetworkInputStream inputStream) throws IOException {
+
+ // Read frame header (tag and length)
+ if (this.state != States.READ_LEN4) {
+ READ_HEADER: while (inputStream.available() > 0) {
+ switch (state) {
+ case BETWEEN:
+ tag = (byte) inputStream.read();
+ state = States.READ_TAG;
+ break;
+ case READ_TAG:
+ len1 = (byte) inputStream.read();
+ state = States.READ_LEN1;
+ break;
+ case READ_LEN1:
+ len2 = (byte) inputStream.read();
+ state = States.READ_LEN2;
+ break;
+ case READ_LEN2:
+ len3 = (byte) inputStream.read();
+ state = States.READ_LEN3;
+ break;
+ case READ_LEN3:
+ len4 = (byte) inputStream.read();
+ // -4 to ignore payload length
+ payloadLength = BinaryHelper.readInt(len1, len2, len3, len4) - 4;
+ state = States.READ_LEN4;
+ break READ_HEADER;
+ case READ_LEN4:
+ break READ_HEADER;
+ }
+ }
+ }
+
+ // Wait until all frame bytes available
+ if (this.state == States.READ_LEN4) {
+ if (this.payloadLength <= inputStream.available()) {
+
+ // Reset for next frame
+ this.state = States.BETWEEN;
+ return true;
+ }
+ }
+
+ // As here, buffer underflow
+ return false;
+ }
+
+ public char getTag() {
+ return (char) this.tag;
+ }
+
+ public int getPayloadLength() {
+ return this.payloadLength;
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/BeFrame.java b/src/main/java/org/postgresql/sql2/communication/BeFrame.java
deleted file mode 100644
index bed7345..0000000
--- a/src/main/java/org/postgresql/sql2/communication/BeFrame.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.postgresql.sql2.communication;
-
-public class BeFrame {
- public enum BackendTag {
- AUTHENTICATION('R'),
- CANCELLATION_KEY_DATA('K'),
- BIND_COMPLETE('2'),
- CLOSE_COMPLETE('3'),
- COMMAND_COMPLETE('C'),
- COPY_DATA('d'),
- COPY_DONE('c'),
- COPY_IN_RESPONSE('G'),
- COPY_OUT_RESPONSE('H'),
- COPY_BOTH_RESPONSE('W'),
- DATA_ROW('D'),
- EMPTY_QUERY_RESPONSE('I'),
- ERROR_RESPONSE('E'),
- FUNCTION_CALL_RESPONSE('V'),
- NEGOTIATE_PROTOCOL_VERSION('v'),
- NO_DATA('n'),
- NOTICE_RESPONSE('N'),
- NOTIFICATION_RESPONSE('A'),
- PARAM_DESCRIPTION('t'),
- PARAM_STATUS('S'),
- PARSE_COMPLETE('1'),
- PORTAL_SUSPENDED('s'),
- READY_FOR_QUERY('Z'),
- ROW_DESCRIPTION('T');
-
- private char tag;
-
- BackendTag(char tag) {
- this.tag = tag;
- }
-
- /**
- * Find the BackendTag that corresponds to the supplied byte value.
- * @param input byte to search for
- * @return the corresponding BackendTag
- */
- public static BackendTag lookup(byte input) {
- for (BackendTag bt : values()) {
- if (input == bt.tag) {
- return bt;
- }
- }
- throw new IllegalArgumentException("There is no backend server tag that matches byte " + input);
- }
- }
-
- private BackendTag tag;
- private byte[] payload;
-
- public BeFrame(byte tag, byte[] payload) {
- this.tag = BackendTag.lookup(tag);
- this.payload = payload;
- }
-
- public BackendTag getTag() {
- return tag;
- }
-
- // TODO make this InputStream from PooledByteBuffer instances (avoids unnecessary copies)
- public byte[] getPayload() {
- return payload;
- }
-}
diff --git a/src/main/java/org/postgresql/sql2/communication/BeFrameParser.java b/src/main/java/org/postgresql/sql2/communication/BeFrameParser.java
deleted file mode 100644
index 56baac2..0000000
--- a/src/main/java/org/postgresql/sql2/communication/BeFrameParser.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.postgresql.sql2.communication;
-
-import java.nio.ByteBuffer;
-
-import org.postgresql.sql2.util.BinaryHelper;
-
-/**
- * Reads bytes from the stream from the server and produces packages on a stack.
- */
-public class BeFrameParser {
- private enum States {
- BETWEEN, READ_TAG, READ_LEN1, READ_LEN2, READ_LEN3, READ_LEN4
- }
-
- private States state = States.BETWEEN;
-
- private byte tag;
- private byte len1;
- private byte len2;
- private byte len3;
- private byte len4;
- private int payloadLength;
- private int payloadRead;
-
- // TODO wrap ByteBuffer instances in InputStream for the payload (save copy to
- // unnecessary array)
- private byte[] payload;
-
- private int consumedBytes = 0;
-
- public int getConsumedBytes() {
- return this.consumedBytes;
- }
-
- /**
- * Reads bytes from the readBuffer, starting at position and stopping when the first
- * packet ends or bytesRead bytes are consumed.
- *
- * @param readBuffer the buffer that contains the packets
- * @param position position to start to read at
- * @param bytesRead number of bytes that's available for reading
- * @return a BeFrame
- */
- public BeFrame parseBeFrame(ByteBuffer readBuffer, int position, int bytesRead) {
- this.consumedBytes = 0;
- for (int i = position; i < bytesRead; i++) {
- this.consumedBytes++;
- switch (state) {
- case BETWEEN:
- tag = readBuffer.get(i);
- state = States.READ_TAG;
- break;
- case READ_TAG:
- len1 = readBuffer.get(i);
- state = States.READ_LEN1;
- break;
- case READ_LEN1:
- len2 = readBuffer.get(i);
- state = States.READ_LEN2;
- break;
- case READ_LEN2:
- len3 = readBuffer.get(i);
- state = States.READ_LEN3;
- break;
- case READ_LEN3:
- len4 = readBuffer.get(i);
-
- payloadLength = BinaryHelper.readInt(len1, len2, len3, len4);
- payload = new byte[payloadLength - 4];
- payloadRead = 0;
- if (payloadLength - 4 == 0) { // no payload sent, so we short cut this here
- state = States.BETWEEN;
- return new BeFrame(tag, payload);
- } else {
- state = States.READ_LEN4;
- }
- break;
- case READ_LEN4:
- payload[payloadRead] = readBuffer.get(i);
- payloadRead++;
- if (payloadRead == payloadLength - 4) {
- state = States.BETWEEN;
-
- // Have data to process
- return new BeFrame(tag, payload);
- }
- break;
- default:
- throw new IllegalStateException("not all BeFrameParser.States implemented in switch");
- }
- }
-
- // As here, buffer underflow
- return null;
- }
-
-}
diff --git a/src/main/java/org/postgresql/sql2/communication/NetworkConnection.java b/src/main/java/org/postgresql/sql2/communication/NetworkConnection.java
index da25992..d796ea7 100644
--- a/src/main/java/org/postgresql/sql2/communication/NetworkConnection.java
+++ b/src/main/java/org/postgresql/sql2/communication/NetworkConnection.java
@@ -1,15 +1,5 @@
package org.postgresql.sql2.communication;
-import jdk.incubator.sql2.ConnectionProperty;
-import org.postgresql.sql2.PgConnection;
-import org.postgresql.sql2.buffer.ByteBufferPool;
-import org.postgresql.sql2.buffer.ByteBufferPoolOutputStream;
-import org.postgresql.sql2.buffer.PooledByteBuffer;
-import org.postgresql.sql2.communication.packets.ErrorPacket;
-import org.postgresql.sql2.execution.NioLoop;
-import org.postgresql.sql2.execution.NioService;
-import org.postgresql.sql2.execution.NioServiceContext;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
@@ -22,6 +12,17 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.postgresql.sql2.PgConnection;
+import org.postgresql.sql2.buffer.ByteBufferPool;
+import org.postgresql.sql2.buffer.ByteBufferPoolOutputStream;
+import org.postgresql.sql2.buffer.PooledByteBuffer;
+import org.postgresql.sql2.communication.packets.ErrorPacket;
+import org.postgresql.sql2.execution.NioLoop;
+import org.postgresql.sql2.execution.NioService;
+import org.postgresql.sql2.execution.NioServiceContext;
+
+import jdk.incubator.sql2.ConnectionProperty;
+
public class NetworkConnection implements NioService, NetworkConnectContext, NetworkWriteContext, NetworkReadContext {
private static ClosedChannelException CLOSE_EXCEPTION = new ClosedChannelException();
@@ -32,6 +33,10 @@ public class NetworkConnection implements NioService, NetworkConnectContext, Net
private final NioLoop loop;
+ private final NetworkInputStream inputStream = new NetworkInputStream();
+
+ private final ByteBufferPool bufferPool;
+
private final ByteBufferPoolOutputStream outputStream;
private final Queue priorityRequestQueue = new LinkedList<>();
@@ -40,9 +45,9 @@ public class NetworkConnection implements NioService, NetworkConnectContext, Net
private final Queue awaitingResponses = new LinkedList<>();
- private final BeFrameParser parser = new BeFrameParser();
+ private final BEFrameParser parser = new BEFrameParser();
- private final PreparedStatementCache preparedStatementCache = new PreparedStatementCache();
+ private final QueryFactory queryFactory = new QueryFactory();
private NetworkConnect connect = null;
@@ -53,17 +58,9 @@ public class NetworkConnection implements NioService, NetworkConnectContext, Net
/**
* Possible blocking {@link NetworkResponse}.
*/
- private NetworkResponse blockingResponse = new NetworkResponse() {
- @Override
- public NetworkResponse read(NetworkReadContext context) throws IOException {
- throw new IllegalStateException("Should not read until connected");
- }
+ private NetworkResponse blockingResponse=new NetworkResponse(){@Override public NetworkResponse read(NetworkReadContext context)throws IOException{throw new IllegalStateException("Should not read until connected");}
- @Override
- public NetworkResponse handleException(Throwable ex) {
- throw new IllegalStateException("Should not read until connected");
- }
- };
+ @Override public NetworkResponse handleException(Throwable ex){throw new IllegalStateException("Should not read until connected");}};
/**
* Instantiate.
@@ -78,6 +75,7 @@ public NetworkConnection(Map properties, PgConnectio
this.properties = properties;
this.connection = connection;
this.loop = loop;
+ this.bufferPool = bufferPool;
this.outputStream = new ByteBufferPoolOutputStream(bufferPool);
}
@@ -270,9 +268,9 @@ private void handleWrite(Queue requests) throws Exception {
}
/**
- * {@link BeFrame} for {@link NetworkReadContext}.
+ * Current {@link PooledByteBuffer} for reading data.
*/
- private BeFrame beFrame = null;
+ private PooledByteBuffer currentReadBuffer = null;
/**
* Allows {@link NetworkReadContext} to specify if write required.
@@ -303,8 +301,11 @@ private NetworkResponse getAwaitingResponse() {
@Override
public void handleRead() throws IOException {
- // TODO use pooled byte buffers
- ByteBuffer readBuffer = ByteBuffer.allocate(1024);
+ // Ensure have read buffer
+ if (this.currentReadBuffer == null) {
+ this.currentReadBuffer = this.bufferPool.getPooledByteBuffer();
+ this.currentReadBuffer.getByteBuffer().clear();
+ }
// Reset for reads
int bytesRead = -1;
@@ -312,16 +313,28 @@ public void handleRead() throws IOException {
try {
// Consume data on the socket
- while ((bytesRead = this.socketChannel.read(readBuffer)) > 0) {
+ int position = this.currentReadBuffer.getByteBuffer().position();
+ while ((bytesRead = this.socketChannel.read(this.currentReadBuffer.getByteBuffer().slice())) > 0) {
- // Setup for consuming parts
- readBuffer.flip();
- int position = 0;
+ // Update position (as slice required for direct buffers)
+ this.currentReadBuffer.getByteBuffer().position(this.currentReadBuffer.getByteBuffer().position() + bytesRead);
+
+ // Determine if filled the read buffer
+ boolean isFilled = this.currentReadBuffer.getByteBuffer().remaining() == 0;
+
+ // Add the buffer to input stream
+ this.inputStream.appendBuffer(this.currentReadBuffer, position, bytesRead, isFilled);
+
+ // Obtain new current buffer if filled
+ if (isFilled) {
+ this.currentReadBuffer = this.bufferPool.getPooledByteBuffer();
+ }
// Service the BE frames
- BeFrame frame;
- while ((frame = this.parser.parseBeFrame(readBuffer, position, bytesRead)) != null) {
- position += this.parser.getConsumedBytes();
+ while (this.parser.parseBEFrame(this.inputStream)) {
+
+ // Specify frame size
+ this.inputStream.setBytesToEndOfStream(this.parser.getPayloadLength());
// Obtain the awaiting response
NetworkResponse awaitingResponse = this.getAwaitingResponse();
@@ -329,20 +342,20 @@ public void handleRead() throws IOException {
// Ensure have awaiting response
if (awaitingResponse == null) {
throw new IllegalStateException(
- "No awaiting " + NetworkResponse.class.getSimpleName() + " for tag " + frame.getTag());
+ "No awaiting " + NetworkResponse.class.getSimpleName() + " for tag " + this.parser.getTag());
}
// Handle frame
- switch (frame.getTag()) {
- case ERROR_RESPONSE:
- // Handle error
- this.immediateResponse = awaitingResponse.handleException(new ErrorPacket(frame.getPayload()));
- break;
-
- default:
- // Provide frame to awaiting response
- this.beFrame = frame;
- this.immediateResponse = awaitingResponse.read(this);
+ switch (this.parser.getTag()) {
+
+ case BEFrameParser.ERROR_RESPONSE:
+ // Handle error
+ this.immediateResponse = awaitingResponse.handleException(new ErrorPacket(this));
+ break;
+
+ default:
+ // Provide frame to awaiting response
+ this.immediateResponse = awaitingResponse.read(this);
}
// Remove if blocking writing
@@ -352,10 +365,10 @@ public void handleRead() throws IOException {
// Flag to write (as very likely have writes)
this.isWriteRequired = true;
}
- }
- // Clear buffer for re-use
- readBuffer.clear();
+ // Clear frame size to parse next frame
+ this.inputStream.clearFrame();
+ }
}
} catch (NotYetConnectedException | ClosedChannelException ignore) {
ignore.printStackTrace();
@@ -414,8 +427,18 @@ public Map getProperties() {
*/
@Override
- public BeFrame getBeFrame() {
- return this.beFrame;
+ public char getFrameTag() {
+ return this.parser.getTag();
+ }
+
+ @Override
+ public int getPayloadLength() {
+ return this.parser.getPayloadLength();
+ }
+
+ @Override
+ public NetworkInputStream getPayload() {
+ return this.inputStream;
}
@Override
@@ -439,8 +462,8 @@ public NetworkOutputStream getOutputStream() {
}
@Override
- public PreparedStatementCache getPreparedStatementCache() {
- return this.preparedStatementCache;
+ public QueryFactory getQueryFactory() {
+ return this.queryFactory;
}
@Override
diff --git a/src/main/java/org/postgresql/sql2/communication/NetworkInputStream.java b/src/main/java/org/postgresql/sql2/communication/NetworkInputStream.java
new file mode 100644
index 0000000..5db78c8
--- /dev/null
+++ b/src/main/java/org/postgresql/sql2/communication/NetworkInputStream.java
@@ -0,0 +1,414 @@
+package org.postgresql.sql2.communication;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+
+import org.postgresql.sql2.buffer.PooledByteBuffer;
+import org.postgresql.sql2.communication.network.NetworkConnectRequest;
+
+/**
+ * Network {@link InputStream}.
+ *
+ * @author Daniel Sagenschneider
+ */
+public class NetworkInputStream extends InputStream {
+
+ /**
+ * Maximum length of bytes (and by this the maximum number of characters).
+ */
+ private static int maxStringLength = 4096;
+
+ /**
+ * Specifies the maximum byte length of any string.
+ *
+ * @param length Maximum byte length of any string.
+ */
+ public static void setStringLength(int length) {
+ maxStringLength = length;
+ }
+
+ /**
+ * {@link ThreadLocal} {@link CharBuffer} to re-use to reduce memory creation
+ * (and garbage collection).
+ */
+ private static final ThreadLocal threadLocalState = new ThreadLocal() {
+ @Override
+ protected ThreadLocalState initialValue() {
+ return new ThreadLocalState(maxStringLength);
+ }
+ };
+
+ /**
+ * {@link ThreadLocal} state.
+ */
+ private static class ThreadLocalState {
+
+ /**
+ * Re-useable {@link CharBuffer} for reduced memory.
+ */
+ private final CharBuffer charBuffer;
+
+ /**
+ * String decoder.
+ */
+ private final CharsetDecoder stringDecoder = Charset.forName(NetworkConnectRequest.CHARSET).newDecoder();
+
+ /**
+ * Instantiate.
+ *
+ * @param charBufferLength Length of the {@link CharBuffer}.
+ */
+ private ThreadLocalState(int charBufferLength) {
+ this.charBuffer = CharBuffer.allocate(charBufferLength);
+ }
+ }
+
+ /**
+ * Head {@link StreamSegment}.
+ */
+ private StreamSegment head = null;
+
+ /**
+ * Tail {@link StreamSegment}.
+ */
+ private StreamSegment tail = null;
+
+ /**
+ * Position within the head {@link StreamSegment}.
+ */
+ private int headPosition;
+
+ /**
+ * Number of bytes currently buffered.
+ */
+ private int bufferedByteCount = 0;
+
+ /**
+ * Position within the frame.
+ */
+ private int framePosition = 0;
+
+ /**
+ * Frame payload size.
+ */
+ private int framePayloadSize = -1;
+
+ /**
+ * Appends the {@link PooledByteBuffer}.
+ *
+ * @param buffer {@link PooledByteBuffer}.
+ */
+ public void appendBuffer(PooledByteBuffer buffer, int offset, int length, boolean isRelease) {
+ StreamSegment segment = new StreamSegment(buffer, offset, length, isRelease);
+ this.bufferedByteCount += length;
+ if (this.head == null) {
+ this.head = segment;
+ this.tail = segment;
+ } else {
+ this.tail.next = segment;
+ this.tail = segment;
+ }
+ }
+
+ /**
+ * Specifies the bytes to end of stream.
+ *
+ * @param byteCount Byte count to end of stream.
+ */
+ public void setBytesToEndOfStream(int byteCount) {
+ this.framePayloadSize = byteCount;
+ this.framePosition = 0;
+ }
+
+ /**
+ * Clear frame.
+ */
+ public void clearFrame() throws IOException {
+
+ // Consume remaining bytes of frame
+ for (int i = this.framePosition; i < this.framePayloadSize; i++) {
+ this.read(); // consume byte
+ }
+
+ // No frame active
+ this.framePayloadSize = -1;
+ }
+
+ /**
+ * Obtains the head {@link StreamSegment} remaining length.
+ *
+ * @return Head {@link StreamSegment} remaining length.
+ */
+ private int getHeadRemainingLength() {
+ return this.head.length - this.headPosition;
+ }
+
+ /*
+ * ================== InputStream ======================
+ */
+
+ @Override
+ public int read() throws IOException {
+ for (;;) {
+
+ // Determine if end of frame
+ if ((this.framePayloadSize != -1) && (this.framePosition >= this.framePayloadSize)) {
+ return -1; // end of frame
+ }
+
+ // Determine if end of stream
+ if (this.head == null) {
+ return -1;
+ }
+
+ // Obtain the next byte
+ if (this.headPosition < this.head.length) {
+ int index = this.head.offset + (this.headPosition++);
+ this.framePosition++;
+ return this.head.buffer.getByteBuffer().get(index);
+ }
+
+ // Completed the segment
+ PooledByteBuffer releaseBuffer = null;
+ if (this.head.isRelease) {
+ releaseBuffer = this.head.buffer;
+ }
+
+ // Move to next segment for reading
+ this.bufferedByteCount -= this.head.length;
+ this.head = this.head.next;
+ this.headPosition = 0;
+
+ // Release buffer after move (so next not corrupted)
+ if (releaseBuffer != null) {
+ releaseBuffer.release();
+ }
+ }
+ }
+
+ @Override
+ public int available() throws IOException {
+ if (this.framePayloadSize == -1) {
+ // Return all available
+ return Math.max(0, (this.bufferedByteCount - this.headPosition));
+ } else {
+ // Return remaining for frame
+ return (this.framePayloadSize - this.framePosition);
+ }
+ }
+
+ /**
+ * Reads in an {@link Integer} value.
+ *
+ * @return {@link Integer} value or -1
if end of stream.
+ * @throws IOException If fails to read {@link Integer} value.
+ */
+ public int readInteger() throws IOException {
+
+ // Ensure only reading within frame
+ if (this.framePayloadSize == -1) {
+ throw new IOException("Attempting to read integer outside frame");
+ }
+
+ // Determine if can read directly
+ int segmentRemaining = this.getHeadRemainingLength();
+ if (segmentRemaining >= 4) {
+ int value = this.head.buffer.getByteBuffer().getInt(this.head.offset + this.headPosition);
+ this.headPosition += 4;
+ this.framePosition += 4;
+ return value;
+ }
+
+ // Read in the integer
+ int value = 0;
+ for (int i = 0; i < 4; i++) {
+
+ // Read the next byte
+ int nextByte = this.read();
+ if (nextByte == -1) {
+ return -1; // end of stream
+ }
+
+ // Obtain the value
+ value <<= 8;
+ value += nextByte;
+ }
+
+ // Return the value
+ return value;
+ }
+
+ /**
+ * Reads in an {@link Short} value.
+ *
+ * @return {@link Short} value or -1
if end of stream.
+ * @throws IOException If fails to read {@link Short} value.
+ */
+ public short readShort() throws IOException {
+
+ // Ensure only reading within frame
+ if (this.framePayloadSize == -1) {
+ throw new IOException("Attempting to read short outside frame");
+ }
+
+ // Determine if can read directly
+ int segmentRemaining = this.getHeadRemainingLength();
+ if (segmentRemaining >= 2) {
+ short value = this.head.buffer.getByteBuffer().getShort(this.head.offset + this.headPosition);
+ this.headPosition += 2;
+ this.framePosition += 2;
+ return value;
+ }
+
+ // Read in the short
+ short value = 0;
+ for (int i = 0; i < 2; i++) {
+
+ // Read the next byte
+ int nextByte = this.read();
+ if (nextByte == -1) {
+ return -1; // end of stream
+ }
+
+ // Obtain the value
+ value <<= 8;
+ value += nextByte;
+ }
+
+ // Return the value
+ return value;
+ }
+
+ /**
+ * Reads in a {@link String} value.
+ *
+ * @return {@link String} value or null
if end of stream.
+ * @throws IOException If fails to read {@link String} value.
+ */
+ public String readString() throws IOException {
+
+ // Ensure only reading within frame
+ if (this.framePayloadSize == -1) {
+ throw new IOException("Attempting to read string outside frame");
+ }
+
+ // Obtain the char buffer (ready for use)
+ ThreadLocalState state = threadLocalState.get();
+ state.charBuffer.clear();
+
+ // Decode the content into the char buffer
+ boolean isComplete = false;
+ while (!isComplete) {
+
+ // Determine number of bytes to read
+ int stringLength = this.getHeadRemainingLength();
+ int frameRemaining = this.framePayloadSize - this.framePosition;
+ isComplete = (frameRemaining <= stringLength);
+ boolean isSegmentConsumed = true;
+
+ // Scan through segment for terminating null
+ FOUND_TERMINATOR: for (int i = this.headPosition; i < (this.head.length); i++) {
+ if (this.head.buffer.getByteBuffer().get(this.head.offset + i) == 0) {
+ // Terminating string
+ stringLength = i - this.headPosition;
+ isComplete = true;
+ isSegmentConsumed = false;
+ break FOUND_TERMINATOR;
+ }
+ }
+
+ // Slice up buffer to content
+ ByteBuffer input = this.head.buffer.getByteBuffer().duplicate();
+ input.position(this.head.offset + this.headPosition);
+ input.limit(input.position() + stringLength);
+
+ // Decode the content into the char buffer
+ CoderResult result = state.stringDecoder.decode(input, state.charBuffer, isComplete);
+ if (result.isError()) {
+ throw new IOException("Failed to read string: " + result);
+ }
+
+ // Increment positions
+ stringLength++; // include terminating null byte
+ this.headPosition += stringLength;
+ this.framePosition += stringLength;
+
+ // Move to next segment (if not yet complete)
+ if (isSegmentConsumed) {
+
+ // Completed the segment
+ PooledByteBuffer releaseBuffer = null;
+ if (this.head.isRelease) {
+ releaseBuffer = this.head.buffer;
+ }
+
+ // Move to next segment for reading
+ this.bufferedByteCount -= this.head.length;
+ this.head = this.head.next;
+ this.headPosition = 0;
+
+ // Release buffer after move (so next not corrupted)
+ if (releaseBuffer != null) {
+ releaseBuffer.release();
+ }
+ }
+ }
+
+ // Flip to get content just decoded
+ state.charBuffer.flip();
+
+ // Return the string value
+ return state.charBuffer.toString();
+ }
+
+ /**
+ * Segment of the {@link ByteBuffer} for the sequence.
+ */
+ private class StreamSegment {
+
+ /**
+ * {@link PooledByteBuffer} for this segment.
+ */
+ private final PooledByteBuffer buffer;
+
+ /**
+ * Offset into the {@link ByteBuffer} for this segment.
+ */
+ private final int offset;
+
+ /**
+ * Length of data from the {@link ByteBuffer} for this segment.
+ */
+ private final int length;
+
+ /**
+ * Indicates if to release {@link ByteBuffer} once complete.
+ */
+ private final boolean isRelease;
+
+ /**
+ * Next {@link StreamSegment}.
+ */
+ private StreamSegment next = null;
+
+ /**
+ * Instantiate.
+ *
+ * @param buffer {@link PooledByteBuffer} for this segment.
+ * @param offset Offset into the {@link ByteBuffer} for this segment.
+ * @param length Length of data from the {@link ByteBuffer} for this segment.
+ */
+ private StreamSegment(PooledByteBuffer buffer, int offset, int length, boolean isRelease) {
+ this.buffer = buffer;
+ this.offset = offset;
+ this.length = length;
+ this.isRelease = isRelease;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/NetworkReadContext.java b/src/main/java/org/postgresql/sql2/communication/NetworkReadContext.java
index 8b211f7..63e16dd 100644
--- a/src/main/java/org/postgresql/sql2/communication/NetworkReadContext.java
+++ b/src/main/java/org/postgresql/sql2/communication/NetworkReadContext.java
@@ -10,18 +10,32 @@
public interface NetworkReadContext extends NetworkContext {
/**
- * Obtains the {@link BeFrame} just read.
+ * Obtains the frame tag.
*
- * @return {@link BeFrame} just read.
+ * @return Frame tag.
*/
- BeFrame getBeFrame();
+ char getFrameTag();
/**
- * Obtains the {@link PreparedStatementCache}.
+ * Obtains the payload length.
*
- * @return {@link PreparedStatementCache}.
+ * @return Payload length.
*/
- PreparedStatementCache getPreparedStatementCache();
+ int getPayloadLength();
+
+ /**
+ * Obtains the payload.
+ *
+ * @return Payload.
+ */
+ NetworkInputStream getPayload();
+
+ /**
+ * Obtains the {@link QueryFactory}.
+ *
+ * @return {@link QueryFactory}.
+ */
+ QueryFactory getQueryFactory();
/**
* Allows overriding {@link ConnectionProperty}.
diff --git a/src/main/java/org/postgresql/sql2/communication/NetworkWriteContext.java b/src/main/java/org/postgresql/sql2/communication/NetworkWriteContext.java
index cfeba26..50d733e 100644
--- a/src/main/java/org/postgresql/sql2/communication/NetworkWriteContext.java
+++ b/src/main/java/org/postgresql/sql2/communication/NetworkWriteContext.java
@@ -15,10 +15,10 @@ public interface NetworkWriteContext extends NetworkContext {
NetworkOutputStream getOutputStream();
/**
- * Obtains the {@link PreparedStatementCache}.
+ * Obtains the {@link QueryFactory}.
*
- * @return {@link PreparedStatementCache}.
+ * @return {@link QueryFactory}.
*/
- PreparedStatementCache getPreparedStatementCache();
+ QueryFactory getQueryFactory();
}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/PreparedStatementCache.java b/src/main/java/org/postgresql/sql2/communication/QueryFactory.java
similarity index 56%
rename from src/main/java/org/postgresql/sql2/communication/PreparedStatementCache.java
rename to src/main/java/org/postgresql/sql2/communication/QueryFactory.java
index 57e8d82..85ddf13 100644
--- a/src/main/java/org/postgresql/sql2/communication/PreparedStatementCache.java
+++ b/src/main/java/org/postgresql/sql2/communication/QueryFactory.java
@@ -4,15 +4,18 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import org.postgresql.sql2.PgSubmission;
import org.postgresql.sql2.communication.network.Query;
+import org.postgresql.sql2.communication.network.QueryReuse;
-public class PreparedStatementCache {
+public class QueryFactory {
/**
* As only used on networking thread, is thread safe.
*/
- private Map sqlToQuery = new HashMap<>();
+ private Map sqlToReuse = new HashMap<>();
/**
* Obtains the {@link Query} for the SQL.
@@ -21,13 +24,21 @@ public class PreparedStatementCache {
* @param params Parameters.
* @return {@link Query}.
*/
- public Query getQuery(String sql, List params) {
+ public Query createQuery(PgSubmission> submission) throws InterruptedException, ExecutionException {
+
+ // Obtain the details
+ String sql = submission.getSql();
+ List parameters = submission.getParamTypes();
if (sql == null) {
throw new IllegalArgumentException("No SQL provided");
}
- // Obtain or create the query
- return this.sqlToQuery.computeIfAbsent(new StatementKey(sql, params), key -> new Query());
+ // Obtain or create the query re-use
+ StatementKey key = new StatementKey(sql, parameters);
+ QueryReuse reuse = this.sqlToReuse.computeIfAbsent(key, (absentKey) -> new QueryReuse());
+
+ // Return the query
+ return new Query(submission, reuse);
}
private class StatementKey {
@@ -50,7 +61,7 @@ public boolean equals(Object o) {
}
StatementKey that = (StatementKey) o;
- return Objects.equals(sql, that.sql) && Objects.equals(params, that.params);
+ return Objects.equals(sql, that.sql) ; //&& Objects.equals(params, that.params);
}
@Override
@@ -59,4 +70,4 @@ public int hashCode() {
}
}
-}
+}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/TableCell.java b/src/main/java/org/postgresql/sql2/communication/TableCell.java
index 1c41f8f..b62f5a8 100644
--- a/src/main/java/org/postgresql/sql2/communication/TableCell.java
+++ b/src/main/java/org/postgresql/sql2/communication/TableCell.java
@@ -4,21 +4,10 @@
public class TableCell {
private byte[] bytes;
- private int start;
- private int stop;
private ColumnDescription columnDescription;
- /**
- * one cell in the returned result set.
- * @param bytes the payload
- * @param start where in the payload the information starts
- * @param stop where in the payload the information stops
- * @param columnDescription description of the data type for this cell
- */
- public TableCell(byte[] bytes, int start, int stop, ColumnDescription columnDescription) {
+ public TableCell(byte[] bytes, ColumnDescription columnDescription) {
this.bytes = bytes;
- this.start = start;
- this.stop = stop;
this.columnDescription = columnDescription;
}
@@ -26,14 +15,6 @@ public byte[] getBytes() {
return bytes;
}
- public int getStart() {
- return start;
- }
-
- public int getStop() {
- return stop;
- }
-
public ColumnDescription getColumnDescription() {
return columnDescription;
}
diff --git a/src/main/java/org/postgresql/sql2/communication/network/AbstractPortalResponse.java b/src/main/java/org/postgresql/sql2/communication/network/AbstractPortalResponse.java
deleted file mode 100644
index f2947cc..0000000
--- a/src/main/java/org/postgresql/sql2/communication/network/AbstractPortalResponse.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.postgresql.sql2.communication.network;
-
-import org.postgresql.sql2.communication.NetworkResponse;
-
-/**
- * Abstract {@link Portal} {@link NetworkResponse}.
- *
- * @author Daniel Sagenschneider
- */
-public abstract class AbstractPortalResponse implements NetworkResponse {
-
- /**
- * {@link Portal}.
- */
- protected final Portal portal;
-
- /**
- * Instantiate.
- *
- * @param portal {@link Portal}.
- */
- public AbstractPortalResponse(Portal portal) {
- this.portal = portal;
- }
-
- @Override
- public NetworkResponse handleException(Throwable ex) {
- portal.handleException(ex);
- return new ReadyForQueryResponse();
- }
-}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/network/AbstractQueryResponse.java b/src/main/java/org/postgresql/sql2/communication/network/AbstractQueryResponse.java
new file mode 100644
index 0000000..d378a9b
--- /dev/null
+++ b/src/main/java/org/postgresql/sql2/communication/network/AbstractQueryResponse.java
@@ -0,0 +1,72 @@
+package org.postgresql.sql2.communication.network;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import org.postgresql.sql2.PgSubmission;
+import org.postgresql.sql2.communication.NetworkResponse;
+import org.postgresql.sql2.communication.packets.ErrorPacket;
+import org.postgresql.sql2.communication.packets.parts.ErrorResponseField;
+
+import jdk.incubator.sql2.SqlException;
+
+/**
+ * Abstract {@link PgSubmission} {@link NetworkResponse}.
+ *
+ * @author Daniel Sagenschneider
+ */
+public abstract class AbstractQueryResponse implements NetworkResponse {
+
+ /**
+ * Handle exception that occurred.
+ *
+ * @param submission the submission that was active when the exception happened
+ * @param ex the exception
+ */
+ public static void doHandleException(PgSubmission> submission, Throwable ex) {
+ if (ex instanceof ErrorPacket) {
+ ErrorPacket e = (ErrorPacket) ex;
+ int code = 0;
+ if (e.getField(ErrorResponseField.Types.SQLSTATE_CODE) != null) {
+ try {
+ code = Integer.parseInt(e.getField(ErrorResponseField.Types.SQLSTATE_CODE));
+ } catch (NumberFormatException ignore) {
+ // ignored for now
+ }
+ }
+ int position = 0;
+ if (e.getField(ErrorResponseField.Types.POSITION) != null) {
+ position = Integer.parseInt(e.getField(ErrorResponseField.Types.POSITION));
+ }
+ ex = new SqlException(e.getMessage(), e, e.getField(ErrorResponseField.Types.SEVERITY), code, null, position);
+ }
+ if (!(ex instanceof SqlException)) {
+ ex = new SqlException(ex.getMessage(), ex, null, 0, null, 0);
+ }
+ Consumer errorHandler = submission.getErrorHandler();
+ if (errorHandler != null) {
+ errorHandler.accept(ex);
+ }
+ ((CompletableFuture) submission.getCompletionStage()).completeExceptionally(ex);
+ }
+
+ /**
+ * {@link Query}.
+ */
+ protected final Query query;
+
+ /**
+ * Instantiate.
+ *
+ * @param query {@link Query}.
+ */
+ public AbstractQueryResponse(Query query) {
+ this.query = query;
+ }
+
+ @Override
+ public NetworkResponse handleException(Throwable ex) {
+ doHandleException(this.query.getSubmission(), ex);
+ return new ReadyForQueryResponse();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/network/AuthenticationResponse.java b/src/main/java/org/postgresql/sql2/communication/network/AuthenticationResponse.java
index 7562eea..8804fc1 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/AuthenticationResponse.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/AuthenticationResponse.java
@@ -1,17 +1,17 @@
package org.postgresql.sql2.communication.network;
+import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
import org.postgresql.sql2.PgConnectionProperty;
-import org.postgresql.sql2.communication.BeFrame;
+import org.postgresql.sql2.communication.BEFrameParser;
import org.postgresql.sql2.communication.NetworkReadContext;
import org.postgresql.sql2.communication.NetworkResponse;
import org.postgresql.sql2.communication.packets.AuthenticationRequest;
import org.postgresql.sql2.communication.packets.ParameterStatus;
import org.postgresql.sql2.submissions.ConnectSubmission;
-import java.io.IOException;
-import java.util.function.Consumer;
-
/**
* Authentication success {@link NetworkResponse}.
*
@@ -28,38 +28,39 @@ public AuthenticationResponse(ConnectSubmission connectSubmission) {
@Override
public NetworkResponse read(NetworkReadContext context) throws IOException {
// Expecting authentication challenge
- BeFrame frame = context.getBeFrame();
- switch (frame.getTag()) {
-
- case AUTHENTICATION:
- AuthenticationRequest authentication = new AuthenticationRequest(frame.getPayload());
- switch (authentication.getType()) {
-
- case SUCCESS:
- // Connected, so trigger any waiting submissions
- this.connectSubmission.finish(null);
- return this;
-
- default:
- throw new IllegalStateException("Unhandled authentication " + authentication.getType());
- }
-
- case PARAM_STATUS:
- // Load parameters for connection
- ParameterStatus paramStatus = new ParameterStatus(frame.getPayload());
- context.setProperty(PgConnectionProperty.lookup(paramStatus.getName()), paramStatus.getValue());
- return this;
+ switch (context.getFrameTag()) {
- case CANCELLATION_KEY_DATA:
- // TODO handle cancellation key
- return this;
+ case BEFrameParser.AUTHENTICATION:
+ AuthenticationRequest authentication = new AuthenticationRequest(context);
+ switch (authentication.getType()) {
- case READY_FOR_QUERY:
- return null;
+ case SUCCESS:
+ // Connected, so trigger any waiting submissions
+ this.connectSubmission.finish(null);
+ return this;
default:
- throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName());
+ throw new IllegalStateException("Unhandled authentication " + authentication.getType());
+ }
+
+ case BEFrameParser.PARAM_STATUS:
+ // Load parameters for connection
+ ParameterStatus paramStatus = new ParameterStatus(context);
+ context.setProperty(PgConnectionProperty.lookup(paramStatus.getName()), paramStatus.getValue());
+ return this;
+
+ case BEFrameParser.CANCELLATION_KEY_DATA:
+ // TODO handle cancellation key
+ return this;
+
+ case BEFrameParser.READY_FOR_QUERY:
+ return null;
+
+ default:
+ throw new IllegalStateException(
+ "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName());
}
+
}
@Override
@@ -68,7 +69,7 @@ public NetworkResponse handleException(Throwable ex) {
if (errorHandler != null) {
this.connectSubmission.getErrorHandler().accept(ex);
}
- ((CompletableFuture)connectSubmission.getCompletionStage()).completeExceptionally(ex);
+ ((CompletableFuture) connectSubmission.getCompletionStage()).completeExceptionally(ex);
return null;
}
diff --git a/src/main/java/org/postgresql/sql2/communication/network/BindRequest.java b/src/main/java/org/postgresql/sql2/communication/network/BindRequest.java
index 106797c..412b30d 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/BindRequest.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/BindRequest.java
@@ -16,10 +16,10 @@
*/
public class BindRequest implements NetworkRequest {
- private final Portal portal;
+ private final Query query;
- public BindRequest(Portal portal) {
- this.portal = portal;
+ public BindRequest(Query query) {
+ this.query = query;
}
/*
@@ -30,17 +30,14 @@ public BindRequest(Portal portal) {
public NetworkRequest write(NetworkWriteContext context) throws Exception {
// Obtain the query details
- String portalName = this.portal.getPortalName();
- String queryName = this.portal.getQuery().getQueryName();
- String sql = this.portal.getSql();
- ParameterHolder holder = this.portal.getParameterHolder();
+ ParameterHolder holder = this.query.getSubmission().getHolder();
// Write the packet
NetworkOutputStream wire = context.getOutputStream();
wire.write(FeFrame.FrontendTag.BIND.getByte());
wire.initPacket();
- wire.write(this.portal.getPortalName());
- wire.write(this.portal.getQuery().getQueryName());
+ wire.write(this.query.getQueryName());
+ wire.write(this.query.getReuse().getPortalNameOrUnnamed());
wire.write(BinaryHelper.writeShort(holder.size()));
for (QueryParameter qp : holder.parameters()) {
wire.write(BinaryHelper.writeShort(qp.getParameterFormatCode()));
@@ -61,12 +58,12 @@ public NetworkRequest write(NetworkWriteContext context) throws Exception {
wire.completePacket();
// Next step to execute
- return new ExecuteRequest<>(this.portal);
+ return new ExecuteRequest<>(this.query);
}
@Override
public NetworkResponse getRequiredResponse() {
- return new BindResponse(this.portal);
+ return new BindResponse(this.query);
}
}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/network/BindResponse.java b/src/main/java/org/postgresql/sql2/communication/network/BindResponse.java
index f0a459a..0e14f30 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/BindResponse.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/BindResponse.java
@@ -2,7 +2,7 @@
import java.io.IOException;
-import org.postgresql.sql2.communication.BeFrame;
+import org.postgresql.sql2.communication.BEFrameParser;
import org.postgresql.sql2.communication.NetworkReadContext;
import org.postgresql.sql2.communication.NetworkResponse;
@@ -11,22 +11,22 @@
*
* @author Daniel Sagenschneider
*/
-public class BindResponse extends AbstractPortalResponse {
+public class BindResponse extends AbstractQueryResponse {
- public BindResponse(Portal portal) {
- super(portal);
+ public BindResponse(Query query) {
+ super(query);
}
@Override
public NetworkResponse read(NetworkReadContext context) throws IOException {
- BeFrame frame = context.getBeFrame();
- switch (frame.getTag()) {
+ switch (context.getFrameTag()) {
- case BIND_COMPLETE:
- return null; // Nothing further
+ case BEFrameParser.BIND_COMPLETE:
+ return null; // Nothing further
- default:
- throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName());
+ default:
+ throw new IllegalStateException(
+ "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName());
}
}
diff --git a/src/main/java/org/postgresql/sql2/communication/network/DescribeRequest.java b/src/main/java/org/postgresql/sql2/communication/network/DescribeRequest.java
index 5d6e0b9..b8caaf1 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/DescribeRequest.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/DescribeRequest.java
@@ -13,15 +13,15 @@
*/
public class DescribeRequest implements NetworkRequest {
- private final Portal portal;
+ private final Query query;
/**
* Instantiate.
*
- * @param portal the portal this request connects to
+ * @param query {@link Query}.
*/
- public DescribeRequest(Portal portal) {
- this.portal = portal;
+ public DescribeRequest(Query query) {
+ this.query = query;
}
/*
@@ -31,21 +31,37 @@ public DescribeRequest(Portal portal) {
@Override
public NetworkRequest write(NetworkWriteContext context) throws Exception {
- // Send describe packet
- NetworkOutputStream wire = context.getOutputStream();
- wire.write(FeFrame.FrontendTag.DESCRIBE.getByte());
- wire.initPacket();
- wire.write('S');
- wire.write(this.portal.getQuery().getQueryName());
- wire.completePacket();
+ // Obtain the reuse
+ QueryReuse reuse = this.query.getReuse();
+
+ // Determine if describe query
+ if ((reuse.getRowDescription() == null) && (!reuse.isWaitingDescribe())) {
+
+ // Send describe packet
+ NetworkOutputStream wire = context.getOutputStream();
+ wire.write(FeFrame.FrontendTag.DESCRIBE.getByte());
+ wire.initPacket();
+ wire.write('S');
+ wire.write(this.query.getReuse().getPortalNameOrUnnamed());
+ wire.completePacket();
+ }
// Next step to bind
- return new BindRequest<>(this.portal);
+ return new BindRequest<>(this.query);
}
@Override
public NetworkResponse getRequiredResponse() {
- return new DescribeResponse(this.portal);
+
+ // Determine if waiting on describe
+ QueryReuse reuse = this.query.getReuse();
+ if (!reuse.isWaitingDescribe()) {
+ reuse.flagWaitingDescribe();
+ return new DescribeResponse(this.query);
+ }
+
+ // Already waiting on describe
+ return null;
}
}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/network/DescribeResponse.java b/src/main/java/org/postgresql/sql2/communication/network/DescribeResponse.java
index e3b8db8..6f1c71d 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/DescribeResponse.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/DescribeResponse.java
@@ -1,41 +1,42 @@
package org.postgresql.sql2.communication.network;
-import org.postgresql.sql2.communication.BeFrame;
+import java.io.IOException;
+
+import org.postgresql.sql2.communication.BEFrameParser;
import org.postgresql.sql2.communication.NetworkReadContext;
import org.postgresql.sql2.communication.NetworkResponse;
import org.postgresql.sql2.communication.packets.RowDescription;
-import java.io.IOException;
-
/**
* Describe {@link NetworkResponse}.
*
* @author Daniel Sagenschneider
*/
-public class DescribeResponse extends AbstractPortalResponse {
+public class DescribeResponse extends AbstractQueryResponse {
- public DescribeResponse(Portal portal) {
- super(portal);
+ public DescribeResponse(Query query) {
+ super(query);
}
@Override
public NetworkResponse read(NetworkReadContext context) throws IOException {
- BeFrame frame = context.getBeFrame();
- switch (frame.getTag()) {
+ switch (context.getFrameTag()) {
- case NO_DATA:
- return null;
+ case BEFrameParser.NO_DATA:
+ return null;
- case PARAM_DESCRIPTION:
- return this; // wait on row description
+ case BEFrameParser.PARAM_DESCRIPTION:
+ return this; // wait on row description
- case ROW_DESCRIPTION:
- RowDescription rowDescription = new RowDescription(frame.getPayload());
- this.portal.getQuery().setRowDescription(rowDescription);
- return null; // nothing further
+ case BEFrameParser.ROW_DESCRIPTION:
+ RowDescription rowDescription = new RowDescription(context.getPayload());
+ QueryReuse reuse = this.query.getReuse();
+ reuse.setRowDescription(rowDescription);
+ return null; // nothing further
- default:
- throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName());
+ default:
+ throw new IllegalStateException(
+ "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName());
}
}
diff --git a/src/main/java/org/postgresql/sql2/communication/network/ExecuteRequest.java b/src/main/java/org/postgresql/sql2/communication/network/ExecuteRequest.java
index e7c19f6..7b95fd0 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/ExecuteRequest.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/ExecuteRequest.java
@@ -14,10 +14,10 @@
*/
public class ExecuteRequest implements NetworkRequest {
- private final Portal portal;
+ private final Query query;
- public ExecuteRequest(Portal portal) {
- this.portal = portal;
+ public ExecuteRequest(Query query) {
+ this.query = query;
}
/*
@@ -31,17 +31,16 @@ public NetworkRequest write(NetworkWriteContext context) throws Exception {
NetworkOutputStream wire = context.getOutputStream();
wire.write(FeFrame.FrontendTag.EXECUTE.getByte());
wire.initPacket();
- wire.write(this.portal.getPortalName());
+ wire.write(this.query.getQueryName());
wire.write(BinaryHelper.writeInt(0)); // number of rows to return, 0 == all
wire.completePacket();
- // TODO Auto-generated method stub
- return new SyncRequest(portal);
+ return new SyncRequest(this.query);
}
@Override
public NetworkResponse getRequiredResponse() {
- return new ExecuteResponse(this.portal);
+ return new ExecuteResponse(this.query);
}
}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/network/ExecuteResponse.java b/src/main/java/org/postgresql/sql2/communication/network/ExecuteResponse.java
index 9ed24de..d56d339 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/ExecuteResponse.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/ExecuteResponse.java
@@ -2,7 +2,7 @@
import java.io.IOException;
-import org.postgresql.sql2.communication.BeFrame;
+import org.postgresql.sql2.communication.BEFrameParser;
import org.postgresql.sql2.communication.NetworkReadContext;
import org.postgresql.sql2.communication.NetworkResponse;
import org.postgresql.sql2.communication.packets.CommandComplete;
@@ -13,33 +13,33 @@
*
* @author Daniel Sagenschneider
*/
-public class ExecuteResponse extends AbstractPortalResponse {
+public class ExecuteResponse extends AbstractQueryResponse {
- public ExecuteResponse(Portal portal) {
- super(portal);
+ public ExecuteResponse(Query query) {
+ super(query);
}
@Override
public NetworkResponse read(NetworkReadContext context) throws IOException {
- BeFrame frame = context.getBeFrame();
- switch (frame.getTag()) {
+ switch (context.getFrameTag()) {
- case DATA_ROW:
- DataRow dataRow = new DataRow(frame.getPayload(), this.portal.getQuery().getRowDescription().getDescriptions(),
- this.portal.nextRowNumber());
- this.portal.addDataRow(dataRow);
- return this;
+ case BEFrameParser.DATA_ROW:
+ DataRow dataRow = new DataRow(context, this.query.getReuse().getRowDescription().getDescriptions(),
+ this.query.nextRowNumber());
+ this.query.addDataRow(dataRow);
+ return this;
- case COMMAND_COMPLETE:
- CommandComplete complete = new CommandComplete(frame.getPayload());
- this.portal.commandComplete(complete, context.getSocketChannel());
- return this;
+ case BEFrameParser.COMMAND_COMPLETE:
+ CommandComplete complete = new CommandComplete(context);
+ this.query.commandComplete(complete, context.getSocketChannel());
+ return this;
- case READY_FOR_QUERY:
- return null;
+ case BEFrameParser.READY_FOR_QUERY:
+ return null;
- default:
- throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName());
+ default:
+ throw new IllegalStateException(
+ "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName());
}
}
diff --git a/src/main/java/org/postgresql/sql2/communication/network/NetworkConnectRequest.java b/src/main/java/org/postgresql/sql2/communication/network/NetworkConnectRequest.java
index a60d778..63e9451 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/NetworkConnectRequest.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/NetworkConnectRequest.java
@@ -1,9 +1,11 @@
package org.postgresql.sql2.communication.network;
-import jdk.incubator.sql2.AdbaConnectionProperty;
-import jdk.incubator.sql2.ConnectionProperty;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
import org.postgresql.sql2.PgConnectionProperty;
-import org.postgresql.sql2.communication.BeFrame;
+import org.postgresql.sql2.communication.BEFrameParser;
import org.postgresql.sql2.communication.NetworkConnect;
import org.postgresql.sql2.communication.NetworkConnectContext;
import org.postgresql.sql2.communication.NetworkOutputStream;
@@ -15,9 +17,8 @@
import org.postgresql.sql2.submissions.ConnectSubmission;
import org.postgresql.sql2.util.BinaryHelper;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
+import jdk.incubator.sql2.AdbaConnectionProperty;
+import jdk.incubator.sql2.ConnectionProperty;
/**
* Connect {@link NetworkRequest}.
@@ -26,6 +27,8 @@
*/
public class NetworkConnectRequest implements NetworkConnect, NetworkRequest, NetworkResponse {
+ public static final String CHARSET = "UTF8";
+
/**
* {@link ConnectSubmission}.
*/
@@ -81,7 +84,7 @@ public NetworkRequest write(NetworkWriteContext context) throws IOException {
wire.write("application_name");
wire.write("java_sql2_client");
wire.write("client_encoding");
- wire.write("UTF8");
+ wire.write(CHARSET);
wire.writeTerminator();
wire.completePacket();
@@ -103,35 +106,35 @@ public NetworkResponse getRequiredResponse() {
public NetworkResponse read(NetworkReadContext context) throws IOException {
// Expecting authentication challenge
- BeFrame frame = context.getBeFrame();
- switch (frame.getTag()) {
-
- case AUTHENTICATION:
- AuthenticationRequest authentication = new AuthenticationRequest(frame.getPayload());
- switch (authentication.getType()) {
+ switch (context.getFrameTag()) {
- case MD5:
- // Password authentication required
- context.write(new PasswordRequest(authentication, this.connectSubmission));
- return null;
+ case BEFrameParser.AUTHENTICATION:
+ AuthenticationRequest authentication = new AuthenticationRequest(context);
+ switch (authentication.getType()) {
- case SUCCESS:
- // Connected, so trigger any waiting submissions
- context.writeRequired();
- return new AuthenticationResponse(this.connectSubmission);
+ case MD5:
+ // Password authentication required
+ context.write(new PasswordRequest(authentication, this.connectSubmission));
+ return null;
- default:
- throw new IllegalStateException("Unhandled authentication " + authentication.getType());
- }
+ case SUCCESS:
+ // Connected, so trigger any waiting submissions
+ context.writeRequired();
+ return new AuthenticationResponse(this.connectSubmission);
default:
- throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName());
+ throw new IllegalStateException("Unhandled authentication " + authentication.getType());
+ }
+
+ default:
+ throw new IllegalStateException(
+ "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName());
}
}
@Override
public NetworkResponse handleException(Throwable ex) {
- Portal.doHandleException(this.connectSubmission, ex);
+ AbstractQueryResponse.doHandleException(this.connectSubmission, ex);
return null;
}
diff --git a/src/main/java/org/postgresql/sql2/communication/network/ParseRequest.java b/src/main/java/org/postgresql/sql2/communication/network/ParseRequest.java
index 9624d8a..2f04d08 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/ParseRequest.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/ParseRequest.java
@@ -1,11 +1,12 @@
package org.postgresql.sql2.communication.network;
+import org.postgresql.sql2.PgSubmission;
import org.postgresql.sql2.communication.FeFrame;
import org.postgresql.sql2.communication.NetworkOutputStream;
import org.postgresql.sql2.communication.NetworkRequest;
import org.postgresql.sql2.communication.NetworkResponse;
import org.postgresql.sql2.communication.NetworkWriteContext;
-import org.postgresql.sql2.communication.PreparedStatementCache;
+import org.postgresql.sql2.communication.QueryFactory;
import org.postgresql.sql2.operations.helpers.ParameterHolder;
import org.postgresql.sql2.operations.helpers.QueryParameter;
import org.postgresql.sql2.util.BinaryHelper;
@@ -17,10 +18,12 @@
*/
public class ParseRequest implements NetworkRequest {
- private final Portal portal;
+ private final PgSubmission submission;
- public ParseRequest(Portal portal) {
- this.portal = portal;
+ private Query query = null;
+
+ public ParseRequest(PgSubmission submission) {
+ this.submission = submission;
}
/*
@@ -30,55 +33,48 @@ public ParseRequest(Portal portal) {
@Override
public NetworkRequest write(NetworkWriteContext context) throws Exception {
- // Determine if already query
- Query query = portal.getQuery();
- if (query == null) {
-
- // Obtain the prepared statement cache
- PreparedStatementCache cache = context.getPreparedStatementCache();
+ // Obtain the query
+ this.query = context.getQueryFactory().createQuery(this.submission);
- // Obtain the query
- String sql = this.portal.getSql();
- ParameterHolder holder = this.portal.getParameterHolder();
- query = cache.getQuery(sql, holder.getParamTypes());
+ // Obtain the details of query
+ String sql = this.query.getSubmission().getSql();
+ ParameterHolder parameters = this.query.getSubmission().getHolder();
- // Associate query to portal
- this.portal.setQuery(query);
+ // Determine if simple query (not yet executed enough)
+ QueryReuse reuse = this.query.getReuse();
+ if (reuse.isSimpleQuery()) {
+ // TODO send simple query
+ throw new UnsupportedOperationException("TODO implement simple query");
}
// Determine if prepare query
- if ((!query.isParsed()) && (!query.isWaitingParse())) {
-
- // Obtain the query details
- String sql = this.portal.getSql();
- ParameterHolder holder = this.portal.getParameterHolder();
+ if ((!reuse.isParsed()) && (!reuse.isWaitingParse())) {
// Send the prepare packet
NetworkOutputStream wire = context.getOutputStream();
wire.write(FeFrame.FrontendTag.PARSE.getByte());
wire.initPacket();
- wire.write(query.getQueryName());
+ wire.write(this.query.getReuse().getPortalNameOrUnnamed());
wire.write(sql);
- wire.write(BinaryHelper.writeShort(holder.size()));
- for (QueryParameter qp : holder.parameters()) {
+ wire.write(BinaryHelper.writeShort(parameters.size()));
+ for (QueryParameter qp : parameters.parameters()) {
wire.write(BinaryHelper.writeInt(qp.getOid()));
}
wire.completePacket();
}
// Determine if describe or bind
- return new DescribeRequest<>(this.portal);
-
+ return new DescribeRequest<>(this.query);
}
@Override
public NetworkResponse getRequiredResponse() {
- Query query = this.portal.getQuery();
// Determine if waiting on parse
- if (!query.isWaitingParse()) {
- query.flagWaitingParse();
- return new ParseResponse(this.portal);
+ QueryReuse reuse = this.query.getReuse();
+ if (!reuse.isWaitingParse()) {
+ reuse.flagWaitingParse();
+ return new ParseResponse(this.query);
}
// Already waiting on parse
diff --git a/src/main/java/org/postgresql/sql2/communication/network/ParseResponse.java b/src/main/java/org/postgresql/sql2/communication/network/ParseResponse.java
index 1a9b019..007e2ad 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/ParseResponse.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/ParseResponse.java
@@ -2,7 +2,7 @@
import java.io.IOException;
-import org.postgresql.sql2.communication.BeFrame;
+import org.postgresql.sql2.communication.BEFrameParser;
import org.postgresql.sql2.communication.NetworkReadContext;
import org.postgresql.sql2.communication.NetworkResponse;
@@ -11,23 +11,23 @@
*
* @author Daniel Sagenschneider
*/
-public class ParseResponse extends AbstractPortalResponse {
+public class ParseResponse extends AbstractQueryResponse {
- public ParseResponse(Portal portal) {
- super(portal);
+ public ParseResponse(Query query) {
+ super(query);
}
@Override
public NetworkResponse read(NetworkReadContext context) throws IOException {
- BeFrame frame = context.getBeFrame();
- switch (frame.getTag()) {
+ switch (context.getFrameTag()) {
- case PARSE_COMPLETE:
- this.portal.getQuery().flagParsed();
- return null; // nothing further
+ case BEFrameParser.PARSE_COMPLETE:
+ this.query.getReuse().flagParsed();
+ return null; // nothing further
- default:
- throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName());
+ default:
+ throw new IllegalStateException(
+ "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName());
}
}
diff --git a/src/main/java/org/postgresql/sql2/communication/network/Portal.java b/src/main/java/org/postgresql/sql2/communication/network/Portal.java
index d40353d..8623f20 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/Portal.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/Portal.java
@@ -1,21 +1,6 @@
package org.postgresql.sql2.communication.network;
-import static org.postgresql.sql2.PgSubmission.Types.ARRAY_COUNT;
-
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-import jdk.incubator.sql2.SqlException;
-import org.postgresql.sql2.PgSubmission;
-import org.postgresql.sql2.communication.packets.CommandComplete;
-import org.postgresql.sql2.communication.packets.DataRow;
-import org.postgresql.sql2.communication.packets.ErrorPacket;
-import org.postgresql.sql2.communication.packets.parts.ErrorResponseField;
-import org.postgresql.sql2.operations.helpers.ParameterHolder;
-import org.postgresql.sql2.submissions.ArrayCountSubmission;
-import org.postgresql.sql2.util.PgCount;
/**
* Portal.
@@ -24,78 +9,15 @@
*/
public class Portal {
- /**
- * Handle exception that occurred.
- *
- * @param submission the submission that was active when the exception happened
- * @param ex the exception
- */
- public static void doHandleException(PgSubmission> submission, Throwable ex) {
- if (ex instanceof ErrorPacket) {
- ErrorPacket e = (ErrorPacket)ex;
- int code = 0;
- if (e.getField(ErrorResponseField.Types.SQLSTATE_CODE) != null) {
- try {
- code = Integer.parseInt(e.getField(ErrorResponseField.Types.SQLSTATE_CODE));
- } catch (NumberFormatException ignore) {
- // ignored for now
- }
- }
- int position = 0;
- if (e.getField(ErrorResponseField.Types.POSITION) != null) {
- position = Integer.parseInt(e.getField(ErrorResponseField.Types.POSITION));
- }
- ex = new SqlException(e.getMessage(), e, e.getField(ErrorResponseField.Types.SEVERITY), code, null, position);
- }
- if (!(ex instanceof SqlException)) {
- ex = new SqlException(ex.getMessage(), ex, null, 0, null, 0);
- }
- Consumer errorHandler = submission.getErrorHandler();
- if (errorHandler != null) {
- errorHandler.accept(ex);
- }
- ((CompletableFuture) submission.getCompletionStage()).completeExceptionally(ex);
- }
-
private static AtomicLong nameIndex = new AtomicLong(0);
- private final PgSubmission> submission;
-
- private String name;
-
- private long nextRowNumber = 0;
-
- /**
- * Thread safe as only accessed via network thread.
- */
- private Query query = null;
+ private final String name;
/**
* Instantiate.
- *
- * @param submission {@link PgSubmission}.
*/
- public Portal(PgSubmission> submission) {
+ public Portal() {
this.name = "p" + nameIndex.incrementAndGet();
- this.submission = submission;
- }
-
- /**
- * Obtains the SQL.
- *
- * @return SQL.
- */
- public String getSql() {
- return this.submission.getSql();
- }
-
- /**
- * Obtains the {@link ParameterHolder}.
- *
- * @return {@link ParameterHolder}.
- */
- public ParameterHolder getParameterHolder() {
- return this.submission.getHolder();
}
/**
@@ -107,105 +29,4 @@ public String getPortalName() {
return this.name;
}
- /**
- * Handles the {@link Throwable}.
- *
- * @param ex {@link Throwable}.
- */
- public void handleException(Throwable ex) {
- doHandleException(this.submission, ex);
- }
-
- /**
- * Obtains the possibly associated {@link Query}.
- *
- * @return {@link Query}. May be null
.
- */
- Query getQuery() {
- return this.query;
- }
-
- /**
- * Specifies the {@link Query}.
- *
- * @param query {@link Query}.
- */
- void setQuery(Query query) {
- this.query = query;
- }
-
- /**
- * Obtains the next row number.
- *
- * @return Next row number.
- */
- long nextRowNumber() {
- return this.nextRowNumber++;
- }
-
- /**
- * Adds a data row.
- *
- * @param dataRow {@link DataRow}.
- */
- void addDataRow(DataRow dataRow) {
- this.submission.addRow(dataRow);
- }
-
- /**
- * Flags the command is complete.
- *
- * @param complete Command is complete.
- * @param socketChannel {@link SocketChannel}.
- */
- void commandComplete(CommandComplete complete, SocketChannel socketChannel) {
- try {
- switch (submission.getCompletionType()) {
- case COUNT:
- submission.finish(new PgCount(complete.getNumberOfRowsAffected()));
- break;
- case ROW:
- submission.finish(null);
- break;
- case CLOSE:
- submission.finish(socketChannel);
- break;
- case TRANSACTION:
- submission.finish(complete.getType());
- break;
- case ARRAY_COUNT:
- submission.finish(complete.getNumberOfRowsAffected());
- break;
- case VOID:
- ((CompletableFuture) submission.getCompletionStage()).complete(null);
- break;
- case PROCESSOR:
- submission.finish(null);
- break;
- case OUT_PARAMETER:
- submission.finish(null);
- break;
- default:
- throw new IllegalStateException("Invalid completion type '" + submission.getCompletionType() + "' for "
- + this.getClass().getSimpleName());
- }
- } catch (Throwable t) {
- ((CompletableFuture>)submission.getCompletionStage()).completeExceptionally(t);
- }
- }
-
- /**
- * Some submission types needs multiple rounds of queries before the operation is finished. This function
- * returns true if more is needed.
- *
- * @return true if another query should be sent to the database
- * @throws ExecutionException if the bound variables are a future that fails
- * @throws InterruptedException if the bound variables are a future that fails
- */
- public boolean hasMoreToExecute() throws ExecutionException, InterruptedException {
- if (submission.getCompletionType() == ARRAY_COUNT) {
- return ((ArrayCountSubmission)submission).hasMoreToExecute();
- }
- return false;
- }
}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/network/Query.java b/src/main/java/org/postgresql/sql2/communication/network/Query.java
index 1e55623..c7cb367 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/Query.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/Query.java
@@ -1,8 +1,17 @@
package org.postgresql.sql2.communication.network;
+import static org.postgresql.sql2.PgSubmission.Types.ARRAY_COUNT;
+
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
-import org.postgresql.sql2.communication.packets.RowDescription;
+import org.postgresql.sql2.PgSubmission;
+import org.postgresql.sql2.communication.packets.CommandComplete;
+import org.postgresql.sql2.communication.packets.DataRow;
+import org.postgresql.sql2.submissions.ArrayCountSubmission;
+import org.postgresql.sql2.util.PgCount;
/**
* Query.
@@ -17,29 +26,34 @@ public class Query {
private static AtomicInteger nameIndex = new AtomicInteger(0);
/**
- * Name for the {@link Query}.
+ * {@link PgSubmission}.
*/
- private final String name;
+ private final PgSubmission> submission;
/**
- * Indicates whether parsed.
+ * {@link QueryReuse}.
*/
- private boolean isParsed = false;
+ private final QueryReuse reuse;
/**
- * Indicates if waiting parse.
+ * Name for the {@link Query}.
*/
- private boolean isAwaitingParse = false;
+ private final String name;
/**
- * {@link RowDescription}.
+ * Next row number.
*/
- private RowDescription rowDescription = null;
+ private long nextRowNumber = 0;
/**
* Instantiate.
+ *
+ * @param submission {@link PgSubmission}.
+ * @param reuse {@link QueryReuse}.
*/
- public Query() {
+ public Query(PgSubmission> submission, QueryReuse reuse) {
+ this.submission = submission;
+ this.reuse = reuse;
this.name = "q" + nameIndex.incrementAndGet();
}
@@ -53,53 +67,95 @@ public String getQueryName() {
}
/**
- * Indicates if parsed.
+ * Obtains the {@link QueryReuse}.
*
- * @return Parsed.
+ * @return {@link QueryReuse}.
*/
- public boolean isParsed() {
- return this.isParsed;
+ public QueryReuse getReuse() {
+ return this.reuse;
}
/**
- * Flags that the query has parsed.
+ * Obtains the {@link PgSubmission}.
+ *
+ * @return {@link PgSubmission}.
*/
- void flagParsed() {
- this.isParsed = true;
+ public PgSubmission> getSubmission() {
+ return this.submission;
}
/**
- * Indicates if waiting on parse.
+ * Obtains the next row number.
*
- * @return Waiting on parse.
+ * @return Next row number.
*/
- public boolean isWaitingParse() {
- return this.isAwaitingParse;
+ long nextRowNumber() {
+ return this.nextRowNumber++;
}
-
+
/**
- * Flags that waiting on parse.
+ * Adds a data row.
+ *
+ * @param dataRow {@link DataRow}.
*/
- void flagWaitingParse() {
- this.isAwaitingParse = true;
+ void addDataRow(DataRow dataRow) {
+ this.submission.addRow(dataRow);
}
/**
- * Obtains the {@link RowDescription}.
+ * Flags the command is complete.
*
- * @return {@link RowDescription}.
+ * @param complete Command is complete.
+ * @param socketChannel {@link SocketChannel}.
*/
- RowDescription getRowDescription() {
- return this.rowDescription;
+ void commandComplete(CommandComplete complete, SocketChannel socketChannel) {
+ try {
+ switch (submission.getCompletionType()) {
+ case COUNT:
+ submission.finish(new PgCount(complete.getNumberOfRowsAffected()));
+ break;
+ case ROW:
+ submission.finish(null);
+ break;
+ case CLOSE:
+ submission.finish(socketChannel);
+ break;
+ case TRANSACTION:
+ submission.finish(complete.getType());
+ break;
+ case ARRAY_COUNT:
+ submission.finish(complete.getNumberOfRowsAffected());
+ break;
+ case VOID:
+ ((CompletableFuture) submission.getCompletionStage()).complete(null);
+ break;
+ case PROCESSOR:
+ submission.finish(null);
+ break;
+ case OUT_PARAMETER:
+ submission.finish(null);
+ break;
+ default:
+ throw new IllegalStateException(
+ "Invalid completion type '" + submission.getCompletionType() + "' for " + this.getClass().getSimpleName());
+ }
+ } catch (Throwable t) {
+ ((CompletableFuture>) submission.getCompletionStage()).completeExceptionally(t);
+ }
}
/**
- * Specifies the {@link RowDescription}.
- *
- * @param rowDescription {@link RowDescription}.
+ * Some submission types needs multiple rounds of queries before the operation
+ * is finished. This function returns true if more is needed.
+ *
+ * @return true if another query should be sent to the database
+ * @throws ExecutionException if the bound variables are a future that fails
+ * @throws InterruptedException if the bound variables are a future that fails
*/
- void setRowDescription(RowDescription rowDescription) {
- this.rowDescription = rowDescription;
+ public boolean hasMoreToExecute() throws ExecutionException, InterruptedException {
+ if (submission.getCompletionType() == ARRAY_COUNT) {
+ return ((ArrayCountSubmission) submission).hasMoreToExecute();
+ }
+ return false;
}
-
}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/network/QueryReuse.java b/src/main/java/org/postgresql/sql2/communication/network/QueryReuse.java
new file mode 100644
index 0000000..9201d05
--- /dev/null
+++ b/src/main/java/org/postgresql/sql2/communication/network/QueryReuse.java
@@ -0,0 +1,140 @@
+package org.postgresql.sql2.communication.network;
+
+import org.postgresql.sql2.communication.packets.RowDescription;
+
+/**
+ * Re-use of an SQL query.
+ *
+ * @author Daniel Sagenschneider
+ */
+public class QueryReuse {
+
+ /**
+ * {@link RowDescription}.
+ */
+ private RowDescription rowDescription = null;
+
+ /**
+ * Number of times this {@link Query} has been executed.
+ */
+ private int executeCount = 10; // TODO reset to start at 0
+
+ /**
+ * {@link Portal}.
+ */
+ private Portal portal = new Portal(); // TODO reset to create
+
+ /**
+ * Indicates whether parsed.
+ */
+ private boolean isParsed = false;
+
+ /**
+ * Indicates if waiting parse.
+ */
+ private boolean isAwaitingParse = false;
+
+ /**
+ * Indicates whether described.
+ */
+ private boolean isDescribed = false;
+
+ /**
+ * Indicates if waiting describe.
+ */
+ private boolean isAwaitingDescribe = false;
+
+ /**
+ * Indicates if execute as simple query.
+ *
+ * @return true
to execute as simple query.
+ */
+ public boolean isSimpleQuery() {
+ return this.executeCount < 5; // TODO allow configuring
+ }
+
+ /**
+ * Specifies the {@link RowDescription}.
+ *
+ * @param rowDescription {@link RowDescription}.
+ */
+ public void setRowDescription(RowDescription rowDescription) {
+ this.rowDescription = rowDescription;
+ }
+
+ /**
+ * Obtains the {@link RowDescription}.
+ *
+ * @return {@link RowDescription}.
+ */
+ public RowDescription getRowDescription() {
+ return this.rowDescription;
+ }
+
+ /**
+ * Indicates if parsed.
+ *
+ * @return Parsed.
+ */
+ public boolean isParsed() {
+ return this.isParsed;
+ }
+
+ /**
+ * Flags that the query has parsed.
+ */
+ void flagParsed() {
+ this.isParsed = true;
+ }
+
+ /**
+ * Indicates if waiting on parse.
+ *
+ * @return Waiting on parse.
+ */
+ public boolean isWaitingParse() {
+ return this.isAwaitingParse;
+ }
+
+ /**
+ * Flags that waiting on parse.
+ */
+ void flagWaitingParse() {
+ this.isAwaitingParse = true;
+ }
+
+ /**
+ * Indicates if waiting on describe.
+ *
+ * @return Waiting on describe.
+ */
+ public boolean isWaitingDescribe() {
+ return this.isAwaitingDescribe;
+ }
+
+ /**
+ * Flags that waiting on describe.
+ */
+ void flagWaitingDescribe() {
+ this.isAwaitingDescribe = true;
+ }
+
+ /**
+ * Obtains the {@link Portal}.
+ *
+ * @return {@link Portal}.
+ */
+ public Portal getPortal() {
+ return this.portal;
+ }
+
+ /**
+ * Convenience method to obtain the {@link Portal} name.
+ *
+ * @return {@link Portal} name or unnamed name if no {@link Portal}.
+ */
+ public String getPortalNameOrUnnamed() {
+ return this.portal == null ? "" : this.portal.getPortalName();
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/network/ReadyForQueryResponse.java b/src/main/java/org/postgresql/sql2/communication/network/ReadyForQueryResponse.java
index 029753c..3144214 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/ReadyForQueryResponse.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/ReadyForQueryResponse.java
@@ -1,11 +1,11 @@
package org.postgresql.sql2.communication.network;
-import org.postgresql.sql2.communication.BeFrame;
+import java.io.IOException;
+
+import org.postgresql.sql2.communication.BEFrameParser;
import org.postgresql.sql2.communication.NetworkReadContext;
import org.postgresql.sql2.communication.NetworkResponse;
-import java.io.IOException;
-
/**
* Ready for query {@link NetworkResponse}.
*
@@ -20,14 +20,14 @@ public NetworkResponse handleException(Throwable ex) {
@Override
public NetworkResponse read(NetworkReadContext context) throws IOException {
- BeFrame frame = context.getBeFrame();
- switch (frame.getTag()) {
+ switch (context.getFrameTag()) {
- case READY_FOR_QUERY:
- return null; // Nothing further
+ case BEFrameParser.READY_FOR_QUERY:
+ return null; // Nothing further
- default:
- throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName());
+ default:
+ throw new IllegalStateException(
+ "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName());
}
}
}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/network/SyncRequest.java b/src/main/java/org/postgresql/sql2/communication/network/SyncRequest.java
index 4fb3de3..85b3665 100644
--- a/src/main/java/org/postgresql/sql2/communication/network/SyncRequest.java
+++ b/src/main/java/org/postgresql/sql2/communication/network/SyncRequest.java
@@ -11,10 +11,11 @@
* @author Daniel Sagenschneider
*/
public class SyncRequest implements NetworkRequest {
- private final Portal portal;
+
+ private final Query query;
- public SyncRequest(Portal portal) {
- this.portal = portal;
+ public SyncRequest(Query query) {
+ this.query = query;
}
/*
@@ -29,8 +30,8 @@ public NetworkRequest write(NetworkWriteContext context) throws Exception {
wire.initPacket();
wire.completePacket();
- if (portal.hasMoreToExecute()) {
- return new BindRequest<>(portal);
+ if (this.query.hasMoreToExecute()) {
+ return new BindRequest<>(this.query);
}
// Nothing further
diff --git a/src/main/java/org/postgresql/sql2/communication/packets/AuthenticationRequest.java b/src/main/java/org/postgresql/sql2/communication/packets/AuthenticationRequest.java
index c71ff0d..e06add0 100644
--- a/src/main/java/org/postgresql/sql2/communication/packets/AuthenticationRequest.java
+++ b/src/main/java/org/postgresql/sql2/communication/packets/AuthenticationRequest.java
@@ -1,20 +1,14 @@
package org.postgresql.sql2.communication.packets;
-import org.postgresql.sql2.util.BinaryHelper;
+import java.io.IOException;
+
+import org.postgresql.sql2.communication.NetworkInputStream;
+import org.postgresql.sql2.communication.NetworkReadContext;
public class AuthenticationRequest {
public enum Types {
- SUCCESS(0),
- KERBEROS_V5(2),
- CLEAR_TEXT(3),
- MD5(5),
- SCM_CREDENTIAL(6),
- GSS(7),
- GSS_CONTINUE(8),
- SSPI(9),
- SASL(10),
- SASL_CONTINUE(11),
- SASL_FINAL(12);
+ SUCCESS(0), KERBEROS_V5(2), CLEAR_TEXT(3), MD5(5), SCM_CREDENTIAL(6), GSS(7), GSS_CONTINUE(8), SSPI(9), SASL(10),
+ SASL_CONTINUE(11), SASL_FINAL(12);
private int value;
@@ -24,6 +18,7 @@ public enum Types {
/**
* find the corresponding type for the incoming integer value.
+ *
* @param input integer value to search for
* @return the corresponding type
*/
@@ -41,17 +36,14 @@ public static Types lookup(int input) {
private Types type;
private byte[] salt = new byte[4];
- /**
- * describes the authentication type and salt.
- * @param bytes incoming bytes
- */
- public AuthenticationRequest(byte[] bytes) {
- type = Types.lookup(BinaryHelper.readInt(bytes[0], bytes[1], bytes[2], bytes[3]));
+ public AuthenticationRequest(NetworkReadContext context) throws IOException {
+ NetworkInputStream input = context.getPayload();
+ type = Types.lookup(input.readInteger());
if (type == Types.MD5) {
- salt[0] = bytes[4];
- salt[1] = bytes[5];
- salt[2] = bytes[6];
- salt[3] = bytes[7];
+ salt[0] = (byte) input.read();
+ salt[1] = (byte) input.read();
+ salt[2] = (byte) input.read();
+ salt[3] = (byte) input.read();
}
}
diff --git a/src/main/java/org/postgresql/sql2/communication/packets/CommandComplete.java b/src/main/java/org/postgresql/sql2/communication/packets/CommandComplete.java
index a963015..3c1b9d8 100644
--- a/src/main/java/org/postgresql/sql2/communication/packets/CommandComplete.java
+++ b/src/main/java/org/postgresql/sql2/communication/packets/CommandComplete.java
@@ -1,39 +1,26 @@
package org.postgresql.sql2.communication.packets;
-import java.nio.charset.StandardCharsets;
+import java.io.IOException;
+
+import org.postgresql.sql2.communication.NetworkReadContext;
public class CommandComplete {
public enum Types {
- INSERT,
- DELETE,
- CREATE_TABLE,
- CREATE_TYPE,
- START_TRANSACTION,
- ROLLBACK,
- COMMIT,
- UPDATE,
- SELECT,
- MOVE,
- FETCH,
- COPY
+ INSERT, DELETE, CREATE_TABLE, CREATE_TYPE, START_TRANSACTION, ROLLBACK, COMMIT, UPDATE, SELECT, MOVE, FETCH, COPY
}
private int numberOfRowsAffected;
private Types type;
- /**
- * parses a command complete package from the server.
- * @param payload the bytes to parse
- */
- public CommandComplete(byte[] payload) {
- String message = new String(payload, StandardCharsets.UTF_8);
+ public CommandComplete(NetworkReadContext context) throws IOException {
+ String message = context.getPayload().readString();
if (message.startsWith("INSERT")) {
type = Types.INSERT;
- numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1));
+ numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length()));
} else if (message.startsWith("DELETE")) {
type = Types.DELETE;
- numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1));
+ numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length()));
} else if (message.startsWith("CREATE TABLE")) {
type = Types.CREATE_TABLE;
numberOfRowsAffected = 0;
@@ -51,19 +38,19 @@ public CommandComplete(byte[] payload) {
numberOfRowsAffected = 0;
} else if (message.startsWith("UPDATE")) {
type = Types.UPDATE;
- numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1));
+ numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length()));
} else if (message.startsWith("SELECT")) {
type = Types.SELECT;
- numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1));
+ numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length()));
} else if (message.startsWith("MOVE")) {
type = Types.MOVE;
- numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1));
+ numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length()));
} else if (message.startsWith("FETCH")) {
type = Types.FETCH;
- numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1));
+ numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length()));
} else if (message.startsWith("COPY")) {
type = Types.COPY;
- numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1));
+ numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length()));
}
}
diff --git a/src/main/java/org/postgresql/sql2/communication/packets/DataRow.java b/src/main/java/org/postgresql/sql2/communication/packets/DataRow.java
index bbcb3f7..3aa03ec 100644
--- a/src/main/java/org/postgresql/sql2/communication/packets/DataRow.java
+++ b/src/main/java/org/postgresql/sql2/communication/packets/DataRow.java
@@ -1,42 +1,37 @@
package org.postgresql.sql2.communication.packets;
-import jdk.incubator.sql2.Result;
-import jdk.incubator.sql2.SqlType;
-import org.postgresql.sql2.communication.TableCell;
-import org.postgresql.sql2.communication.packets.parts.ColumnDescription;
-import org.postgresql.sql2.util.BinaryHelper;
-
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
+import org.postgresql.sql2.communication.NetworkInputStream;
+import org.postgresql.sql2.communication.NetworkReadContext;
+import org.postgresql.sql2.communication.TableCell;
+import org.postgresql.sql2.communication.packets.parts.ColumnDescription;
+
+import jdk.incubator.sql2.Result;
+import jdk.incubator.sql2.SqlType;
+
public class DataRow implements Result.RowColumn, Result.OutColumn {
private Map columnNames;
private Map columns;
private long rowNumber;
private int currentPos = 0;
- /**
- * parses the bytes that describe one data row in a result set.
- * @param bytes bytes to parse
- * @param description the descriptions of the columns
- * @param rowNumber current row number in the result set
- */
- public DataRow(byte[] bytes, ColumnDescription[] description, long rowNumber) {
+ public DataRow(NetworkReadContext context, ColumnDescription[] description, long rowNumber) throws IOException {
this.rowNumber = rowNumber;
+ NetworkInputStream input = context.getPayload();
- short numOfColumns = BinaryHelper.readShort(bytes[0], bytes[1]);
- int pos = 2;
- int columnPos = 1;
+ short numOfColumns = input.readShort();
columns = new HashMap<>(numOfColumns);
columnNames = new HashMap<>(numOfColumns);
for (int i = 0; i < numOfColumns; i++) {
- int length = BinaryHelper.readInt(bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]);
- pos += 4;
- columnNames.put(description[i].getName().toLowerCase(), columnPos);
- columns.put(columnPos, new TableCell(bytes, pos, pos + length, description[i]));
- pos += length;
- columnPos++;
+ int length = input.readInteger();
+ columnNames.put(description[i].getName().toLowerCase(), i);
+ byte[] cellBytes = new byte[length];
+ input.read(cellBytes);
+ columns.put(i, new TableCell(cellBytes, description[i]));
}
}
@@ -58,19 +53,17 @@ public T get(Class type) {
throw new IllegalArgumentException("no column with position " + currentPos);
}
- if (tc.getStart() > tc.getStop()) { // handle the null special case
+ // handle the null special case
+ if (tc.getBytes().length == 0) {
return null;
}
switch (tc.getColumnDescription().getFormatCode()) {
- case TEXT:
- String data = new String(BinaryHelper.subBytes(tc.getBytes(), tc.getStart(), tc.getStop()), StandardCharsets.UTF_8);
- return (T)tc.getColumnDescription().getColumnType().getTextParser().apply(data, type);
- case BINARY:
- return (T)tc.getColumnDescription().getColumnType().getBinaryParser().apply(tc.getBytes(), tc.getStart(),
- tc.getStop(), type);
- default:
- throw new IllegalStateException("unimplemented switch case");
+ case TEXT:
+ String data = new String(tc.getBytes(), StandardCharsets.UTF_8);
+ return (T) tc.getColumnDescription().getColumnType().getTextParser().apply(data, type);
+ default:
+ return (T) tc.getColumnDescription().getColumnType().getBinaryParser().apply(tc.getBytes(), type);
}
}
@@ -110,7 +103,7 @@ public Class javaType() {
@Override
public long length() {
TableCell tc = columns.get(currentPos);
- return tc.getStop() - tc.getStart();
+ return tc.getBytes().length;
}
@Override
@@ -162,4 +155,5 @@ public Column clone() {
return row;
}
-}
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/postgresql/sql2/communication/packets/ErrorPacket.java b/src/main/java/org/postgresql/sql2/communication/packets/ErrorPacket.java
index f2b8437..7f2ef3c 100644
--- a/src/main/java/org/postgresql/sql2/communication/packets/ErrorPacket.java
+++ b/src/main/java/org/postgresql/sql2/communication/packets/ErrorPacket.java
@@ -1,30 +1,31 @@
package org.postgresql.sql2.communication.packets;
-import org.postgresql.sql2.communication.packets.parts.ErrorResponseField;
-import org.postgresql.sql2.util.BinaryHelper;
-
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.postgresql.sql2.communication.NetworkInputStream;
+import org.postgresql.sql2.communication.NetworkReadContext;
+import org.postgresql.sql2.communication.packets.parts.ErrorResponseField;
+
public class ErrorPacket extends Exception {
-
- private static List parseFields(byte[] payload) {
+
+ private static List parseFields(NetworkReadContext context) throws IOException {
+
List fields = new ArrayList<>();
- List nullPositions = new ArrayList<>();
- for (int i = 0; i < payload.length; i++) {
- if (payload[i] == 0) {
- nullPositions.add(i);
- }
+ // Parse out the fields
+ NetworkInputStream input = context.getPayload();
+ int errorType;
+ while ((errorType = input.read()) != -1) {
+ String message = input.readString();
+ fields.add(new ErrorResponseField(ErrorResponseField.Types.lookup(errorType), message));
}
- for (int i = 0; i < nullPositions.size() - 2; i++) {
- fields.add(new ErrorResponseField(ErrorResponseField.Types.lookup(payload[nullPositions.get(i) + 1]),
- new String(BinaryHelper.subBytes(payload, nullPositions.get(i) + 2, nullPositions.get(i + 1)))));
- }
+ // Return the fields
return fields;
}
-
+
private static String getField(ErrorResponseField.Types type, List fields) {
for (ErrorResponseField field : fields) {
if (type == field.getType()) {
@@ -34,13 +35,13 @@ private static String getField(ErrorResponseField.Types type, List fields;
- public ErrorPacket(byte[] payload) {
- this(parseFields(payload));
+ public ErrorPacket(NetworkReadContext context) throws IOException {
+ this(parseFields(context));
}
-
+
private ErrorPacket(List fields) {
super(getField(ErrorResponseField.Types.MESSAGE, fields));
this.fields = fields;
@@ -52,6 +53,7 @@ public List getFields() {
/**
* returns the message of the field that matches the type.
+ *
* @param type type to search for
* @return message of field
*/
diff --git a/src/main/java/org/postgresql/sql2/communication/packets/ParameterStatus.java b/src/main/java/org/postgresql/sql2/communication/packets/ParameterStatus.java
index bb64c6c..34325bb 100644
--- a/src/main/java/org/postgresql/sql2/communication/packets/ParameterStatus.java
+++ b/src/main/java/org/postgresql/sql2/communication/packets/ParameterStatus.java
@@ -1,35 +1,18 @@
package org.postgresql.sql2.communication.packets;
-import org.postgresql.sql2.util.BinaryHelper;
+import java.io.IOException;
+
+import org.postgresql.sql2.communication.NetworkInputStream;
+import org.postgresql.sql2.communication.NetworkReadContext;
public class ParameterStatus {
private String name;
private String value;
- /**
- * parses the parameter status.
- * @param payload bytes from the server to parse
- */
- public ParameterStatus(byte[] payload) {
- int firstNullPos = 0;
- int secondNullPos = 0;
-
- for (int i = 0; i < payload.length; i++) {
- if (payload[i] == 0) {
- firstNullPos = i;
- break;
- }
- }
-
- for (int i = firstNullPos; i < payload.length; i++) {
- if (payload[i] == 0) {
- secondNullPos = i;
- break;
- }
- }
-
- name = new String(BinaryHelper.subBytes(payload, 0, firstNullPos));
- value = new String(BinaryHelper.subBytes(payload, firstNullPos, secondNullPos));
+ public ParameterStatus(NetworkReadContext context) throws IOException {
+ NetworkInputStream input = context.getPayload();
+ name = input.readString();
+ value = input.readString();
}
public String getName() {
diff --git a/src/main/java/org/postgresql/sql2/communication/packets/RowDescription.java b/src/main/java/org/postgresql/sql2/communication/packets/RowDescription.java
index e53deb7..22daa88 100644
--- a/src/main/java/org/postgresql/sql2/communication/packets/RowDescription.java
+++ b/src/main/java/org/postgresql/sql2/communication/packets/RowDescription.java
@@ -1,41 +1,28 @@
package org.postgresql.sql2.communication.packets;
-import org.postgresql.sql2.communication.packets.parts.ColumnDescription;
-import org.postgresql.sql2.util.BinaryHelper;
+import java.io.IOException;
-import java.nio.charset.StandardCharsets;
+import org.postgresql.sql2.communication.NetworkInputStream;
+import org.postgresql.sql2.communication.NetworkReadContext;
+import org.postgresql.sql2.communication.packets.parts.ColumnDescription;
public class RowDescription {
private ColumnDescription[] descriptions;
- /**
- * parses a sequence of bytes in to a RowDescription object.
- *
- * @param bytes bytes to parse
- */
- public RowDescription(byte[] bytes) {
- short numOfColumns = BinaryHelper.readShort(bytes[0], bytes[1]);
- int pos = 2;
+ public RowDescription(NetworkInputStream input) throws IOException {
+ short numOfColumns = input.readShort();
descriptions = new ColumnDescription[numOfColumns];
for (int i = 0; i < numOfColumns; i++) {
- int nameEnd = BinaryHelper.nextNullBytePos(bytes, pos);
- String name = new String(BinaryHelper.subBytes(bytes, pos, nameEnd), StandardCharsets.UTF_8);
- pos = nameEnd + 1;
- int objectIdOfTable = BinaryHelper.readInt(bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]);
- pos += 4;
- short attributeNumberOfColumn = BinaryHelper.readShort(bytes[pos], bytes[pos + 1]);
- pos += 2;
- int fieldOId = BinaryHelper.readInt(bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]);
- pos += 4;
- short dataTypeSize = BinaryHelper.readShort(bytes[pos], bytes[pos + 1]);
- pos += 2;
- int typeModifier = BinaryHelper.readInt(bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]);
- pos += 4;
- short formatCode = BinaryHelper.readShort(bytes[pos], bytes[pos + 1]);
- pos += 2;
+ String name = input.readString();
+ int objectIdOfTable = input.readInteger();
+ short attributeNumberOfColumn = input.readShort();
+ int fieldOId = input.readInteger();
+ short dataTypeSize = input.readShort();
+ int typeModifier = input.readInteger();
+ short formatCode = input.readShort();
- descriptions[i] = new ColumnDescription(name, objectIdOfTable, attributeNumberOfColumn, fieldOId,
- dataTypeSize, typeModifier, formatCode);
+ descriptions[i] = new ColumnDescription(name, objectIdOfTable, attributeNumberOfColumn, fieldOId, dataTypeSize,
+ typeModifier, formatCode);
}
}
diff --git a/src/main/java/org/postgresql/sql2/communication/packets/parsers/BinaryParser.java b/src/main/java/org/postgresql/sql2/communication/packets/parsers/BinaryParser.java
index 1feec04..65dfdbc 100644
--- a/src/main/java/org/postgresql/sql2/communication/packets/parsers/BinaryParser.java
+++ b/src/main/java/org/postgresql/sql2/communication/packets/parsers/BinaryParser.java
@@ -3,275 +3,275 @@
import org.postgresql.sql2.util.BinaryHelper;
public class BinaryParser {
- public static Object boolsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object boolsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object byteasend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object byteasend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object charsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object charsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object namesend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object namesend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object int8send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object int8send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object int2send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object int2send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object int2vectorsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object int2vectorsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object int4send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
- return BinaryHelper.readInt(bytes[start], bytes[start + 1], bytes[start + 2], bytes[start + 3]);
+ public static Object int4send(byte[] bytes, Class> requestedClass) {
+ return BinaryHelper.readInt(bytes[0], bytes[1], bytes[2], bytes[3]);
}
- public static Object regprocsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object regprocsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object oidsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object oidsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object tidsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object tidsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object xidsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object xidsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object cidsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object cidsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object oidvectorsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object oidvectorsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object pg_ddl_command_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object pg_ddl_command_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object json_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object json_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object xml_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object xml_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object pg_node_tree_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object pg_node_tree_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object point_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object point_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object lseg_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object lseg_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object path_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object path_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object box_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object box_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object poly_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object poly_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object line_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object line_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object cidr_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object cidr_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object float4send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object float4send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object float8send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object float8send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object abstimesend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object abstimesend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object reltimesend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object reltimesend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object tintervalsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object tintervalsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object unknownsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object unknownsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object circle_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object circle_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object cash_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object cash_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object macaddr_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object macaddr_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object inet_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object inet_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object bpcharsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object bpcharsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object varcharsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object varcharsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object date_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object date_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object time_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object time_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object timestamp_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object timestamp_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object timestamptz_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object timestamptz_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object interval_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object interval_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object timetz_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object timetz_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object bit_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object bit_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object varbit_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object varbit_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object numeric_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object numeric_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object textsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object textsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object regproceduresend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object regproceduresend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object regopersend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object regopersend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object regoperatorsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object regoperatorsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object regclasssend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object regclasssend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object regtypesend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object regtypesend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object cstring_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object cstring_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object anyarray_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object anyarray_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object void_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object void_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object uuid_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object uuid_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object txid_snapshot_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object txid_snapshot_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object pg_lsn_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object pg_lsn_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object tsvectorsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object tsvectorsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object tsquerysend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object tsquerysend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object regconfigsend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object regconfigsend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object regdictionarysend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object regdictionarysend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object jsonb_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object jsonb_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object range_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object range_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object regnamespacesend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object regnamespacesend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object regrolesend(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object regrolesend(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object array_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object array_send(byte[] bytes, Class> requestedClass) {
return null;
}
- public static Object record_send(byte[] bytes, Integer start, Integer end, Class> requestedClass) {
+ public static Object record_send(byte[] bytes, Class> requestedClass) {
return null;
}
}
diff --git a/src/main/java/org/postgresql/sql2/communication/packets/parts/ColumnTypes.java b/src/main/java/org/postgresql/sql2/communication/packets/parts/ColumnTypes.java
index bd811aa..3bb66b1 100644
--- a/src/main/java/org/postgresql/sql2/communication/packets/parts/ColumnTypes.java
+++ b/src/main/java/org/postgresql/sql2/communication/packets/parts/ColumnTypes.java
@@ -7,9 +7,9 @@
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.util.function.BiFunction;
+
import org.postgresql.sql2.communication.packets.parsers.BinaryParser;
import org.postgresql.sql2.communication.packets.parsers.TextParser;
-import org.postgresql.sql2.util.QuadFunction;
public enum ColumnTypes {
BOOL(16, TextParser::boolOut, BinaryParser::boolsend, Boolean.class, PgAdbaType.BOOLEAN),
@@ -132,8 +132,7 @@ public enum ColumnTypes {
VOID(2278, TextParser::void_out, BinaryParser::void_send, null, null),
TRIGGER(2279, TextParser::trigger_out, null, null, null),
LANGUAGE_HANDLER(2280, TextParser::language_handler_out, null, null, null),
- INTERNAL(2281, TextParser::internal_out, null, null, null),
- OPAQUE(2282, TextParser::opaque_out, null, null, null),
+ INTERNAL(2281, TextParser::internal_out, null, null, null), OPAQUE(2282, TextParser::opaque_out, null, null, null),
ANYELEMENT(2283, TextParser::anyelement_out, null, null, null),
_RECORD(2287, TextParser::array_out, BinaryParser::array_send, null, null),
ANYNONARRAY(2776, TextParser::anynonarray_out, null, null, null),
@@ -182,22 +181,22 @@ public enum ColumnTypes {
private final int oid;
private final BiFunction, Object> textParser;
- private final QuadFunction, Object> binaryParser;
- private final Class clazz;
+ private final BiFunction, Object> binaryParser;
+ private final Class c;
private final PgAdbaType type;
ColumnTypes(int oid, BiFunction, Object> textParser,
- QuadFunction, Object> binaryParser,
- Class c, PgAdbaType type) {
+ BiFunction, Object> binaryParser, Class c, PgAdbaType type) {
this.oid = oid;
this.textParser = textParser;
this.binaryParser = binaryParser;
- this.clazz = c;
+ this.c = c;
this.type = type;
}
/**
* Finds the correct type from it's oid.
+ *
* @param oid oid to search from
* @return the ColumnTypes object
*/
@@ -209,18 +208,19 @@ public static ColumnTypes lookup(int oid) {
}
return OTHER;
+
}
public BiFunction, Object> getTextParser() {
return textParser;
}
- public QuadFunction, Object> getBinaryParser() {
+ public BiFunction, Object> getBinaryParser() {
return binaryParser;
}
public Class javaType() {
- return clazz;
+ return c;
}
public PgAdbaType sqlType() {
diff --git a/src/test/java/org/postgresql/sql2/communication/BEFrameReaderTest.java b/src/test/java/org/postgresql/sql2/communication/BEFrameReaderTest.java
new file mode 100644
index 0000000..ca57b61
--- /dev/null
+++ b/src/test/java/org/postgresql/sql2/communication/BEFrameReaderTest.java
@@ -0,0 +1,61 @@
+package org.postgresql.sql2.communication;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.postgresql.sql2.buffer.PooledByteBuffer;
+
+public class BEFrameReaderTest {
+
+ public static Collection