Skip to content

Commit

Permalink
Replace std.socketstream with the blocking sockets
Browse files Browse the repository at this point in the history
Fix #58.
  • Loading branch information
belka-ew committed Nov 30, 2016
1 parent c247412 commit c41bf70
Showing 1 changed file with 76 additions and 40 deletions.
116 changes: 76 additions & 40 deletions source/ddb/postgres.d
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,14 @@ OR MODIFICATIONS.
*/
module ddb.postgres;

version (Have_vibe_d_core) {
import vibe.core.net;
import vibe.core.stream;
} else {
import std.socket;
import std.socketstream;
version (Have_vibe_d_core)
{
import vibe.core.net;
import vibe.core.stream;
}
else
{
import std.socket;
}
import std.bitmanip;
import std.exception;
Expand All @@ -184,32 +186,64 @@ const PGEpochDateTime = DateTime(2000, 1, 1, 0, 0, 0);

class PGStream
{
private {
version (Have_vibe_d_core) TCPConnection m_socket;
else SocketStream m_socket;
}
version (Have_vibe_d_core){
@property TCPConnection socket() { return m_socket; }
this(TCPConnection socket)
{
m_socket = socket;
}
}else{
@property SocketStream socket() { return m_socket; }
this(SocketStream socket){
m_socket = socket;
}
}
version (Have_vibe_d_core)
{
private TCPConnection m_socket;

@property TCPConnection socket()
{
return m_socket;
}

/*
* I'm not too sure about this function
* Should I keep the length?
*/
void write(ubyte[] x)
{
m_socket.write(x);
}
this(TCPConnection socket)
{
m_socket = socket;
}
}
else
{
private Socket m_socket;

@property Socket socket()
{
return m_socket;
}

this(Socket socket)
{
m_socket = socket;
}
}

protected void read(ubyte[] buffer)
{
version(Have_vibe_d_core)
{
m_socket.read(buffer);
}
else
{
if (buffer.length > 0)
{
m_socket.receive(buffer);
}
}
}

void write(ubyte[] x)
{
version(Have_vibe_d_core)
{
m_socket.write(x);
}
else
{
if (x.length > 0)
{
m_socket.send(x);
}
}
}

void write(ubyte x)
{
Expand Down Expand Up @@ -1001,19 +1035,19 @@ class PGConnection
char type;
int len;
ubyte[1] ub;
stream.socket.read(ub); // message type
stream.read(ub); // message type

type = bigEndianToNative!char(ub);
ubyte[4] ubi;
stream.socket.read(ubi); // message length, doesn't include type byte
stream.read(ubi); // message length, doesn't include type byte

len = bigEndianToNative!int(ubi) - 4;

ubyte[] msg;
if (len > 0)
{
msg = new ubyte[len];
stream.socket.read(msg);
stream.read(msg);
}

return Message(this, type, msg);
Expand Down Expand Up @@ -1610,13 +1644,15 @@ class PGConnection

ushort port = "port" in params? parse!ushort(p["port"]) : 5432;

version(Have_vibe_d_core){
stream = new PGStream(connectTCP(params["host"], port));
} else {
stream = new PGStream(new SocketStream(new TcpSocket));
stream.socket.socket.connect(new InternetAddress(params["host"], port));
}

version(Have_vibe_d_core)
{
stream = new PGStream(connectTCP(params["host"], port));
}
else
{
stream = new PGStream(new TcpSocket);
stream.socket.connect(new InternetAddress(params["host"], port));
}
sendStartupMessage(params);

receive:
Expand Down

0 comments on commit c41bf70

Please sign in to comment.