Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement IPC with java 16 unix domain sockets #967

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
41 changes: 41 additions & 0 deletions jeromq-ipc/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.zeromq</groupId>
<artifactId>jeromq-pom</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>

<artifactId>jeromq-ipc</artifactId>

<properties>
<rootDir>${project.basedir}/..</rootDir>
<maven.compiler.release>16</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>${project.parent.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

</project>
73 changes: 73 additions & 0 deletions jeromq-ipc/src/main/java/zmq/io/net/ipc/IpcAddress.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package zmq.io.net.ipc;

import java.io.IOException;
import java.net.ProtocolFamily;
import java.net.StandardProtocolFamily;
import java.net.UnixDomainSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;

import org.zeromq.ZMQException;

import zmq.ZError;
import zmq.io.net.Address;

public class IpcAddress implements Address.IZAddress
{
private String name;
private final UnixDomainSocketAddress address;

public IpcAddress(String addr)
{
// TODO inline?
this.address = this.resolve(addr);
}

@Override
public String toString()
{
// TODO possible?
if (name == null) {
return "";
}

return "ipc://" + this.address.toString();
}

@Override
public String toString(int port)
{
// TODO why is port in the interface?
return toString();
}

private UnixDomainSocketAddress resolve(String name)
{
this.name = name;

if (!"*".equals(name)) {
return UnixDomainSocketAddress.of(name);
}

try {
Path temp = Files.createTempFile("zmq-", ".sock");
Files.delete(temp);
return UnixDomainSocketAddress.of(temp);
}
catch (IOException e) {
throw new ZMQException(e.getMessage(), ZError.EADDRNOTAVAIL, e);
}
}

@Override
public UnixDomainSocketAddress address()
{
return address;
}

@Override
public ProtocolFamily family()
{
return StandardProtocolFamily.UNIX;
}
}
33 changes: 33 additions & 0 deletions jeromq-ipc/src/main/java/zmq/io/net/ipc/IpcConnecter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package zmq.io.net.ipc;

import java.io.IOException;
import java.net.StandardProtocolFamily;
import java.nio.channels.SocketChannel;

import zmq.Options;
import zmq.io.IOThread;
import zmq.io.SessionBase;
import zmq.io.net.AbstractSocketConnecter;
import zmq.io.net.Address;

public class IpcConnecter extends AbstractSocketConnecter
{
public IpcConnecter(IOThread ioThread, SessionBase session, final Options options, final Address addr, boolean wait)
{
super(ioThread, session, options, addr, wait);
}

@Override
protected SocketChannel openClient(Address.IZAddress address) throws IOException
{
SocketChannel fd = SocketChannel.open(StandardProtocolFamily.UNIX);
fd.configureBlocking(false);
return fd;
}

@Override
protected void tuneConnectedChannel(SocketChannel channel) throws IOException
{
// no-op
}
}
89 changes: 89 additions & 0 deletions jeromq-ipc/src/main/java/zmq/io/net/ipc/IpcListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package zmq.io.net.ipc;

import java.io.IOException;
import java.net.UnixDomainSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;

import zmq.Options;
import zmq.SocketBase;
import zmq.io.IOThread;
import zmq.io.net.AbstractSocketListener;

public class IpcListener extends AbstractSocketListener<IpcAddress>
{
// bind will create this socket file but close will not remove it, so we need to do that ourselves on close.
private Path boundSocketPath;

public IpcListener(IOThread ioThread, SocketBase socket, final Options options)
{
super(ioThread, socket, options);
}

// Get the bound address for use with wildcards
@Override
public String getAddress()
{
// TODO
return super.getZAddress().toString(-1);
}

// Set address to listen on.
@Override
public boolean setAddress(String addr)
{
return super.setZAddress(new IpcAddress(addr));
}

@Override
protected ServerSocketChannel openServer(IpcAddress address) throws IOException
{
if (options.selectorChooser == null) {
return ServerSocketChannel.open(address.family());
}
else {
return options.selectorChooser.choose(address, options).openServerSocketChannel(address.family());
}
}

@Override
protected void bindServer(ServerSocketChannel fd, IpcAddress address) throws IOException
{
fd.configureBlocking(false);

UnixDomainSocketAddress socketAddress = address.address();
fd.bind(socketAddress, options.backlog);

assert (this.boundSocketPath == null);
this.boundSocketPath = socketAddress.getPath();
}

@Override
protected SocketChannel accept(ServerSocketChannel fd) throws IOException
{
return fd.accept();
}

@Override
protected void tuneAcceptedChannel(SocketChannel channel) throws IOException
{
// no-op
}

@Override
protected void closeServerChannel(ServerSocketChannel fd) throws IOException
{
try {
fd.close();
}
finally {
Path socketPath = this.boundSocketPath;
this.boundSocketPath = null;
if (socketPath != null) {
Files.deleteIfExists(socketPath);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package zmq.io.net.ipc;

import java.net.SocketAddress;
import java.net.UnixDomainSocketAddress;

import zmq.Options;
import zmq.Own;
import zmq.SocketBase;
import zmq.io.IOThread;
import zmq.io.SessionBase;
import zmq.io.net.Address;
import zmq.io.net.Listener;

public class UnixDomainSocketIpcImpl implements IpcImpl
{
@Override
public boolean isImplementedViaTcpLoopback()
{
return false;
}

@Override
public Address.IZAddress createAddress(String addr)
{
return new IpcAddress(addr);
}

@Override
public boolean isIpcAddress(SocketAddress address)
{
return address instanceof UnixDomainSocketAddress;
}

@Override
public String getAddress(SocketAddress address)
{
assert (isIpcAddress(address));
var unixDomainSocketAddress = (UnixDomainSocketAddress) address;
return unixDomainSocketAddress.getPath().toString();
}

@Override
public Own createConnector(IOThread ioThread, SessionBase session, Options options, Address addr, boolean wait)
{
return new IpcConnecter(ioThread, session, options, addr, wait);
}

@Override
public Listener createListener(IOThread ioThread, SocketBase socket, Options options)
{
return new IpcListener(ioThread, socket, options);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
zmq.io.net.ipc.UnixDomainSocketIpcImpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package zmq;

public class TermEndpointUnixDomainSocketIpcTest extends TermEndpointIpcTest
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package zmq.io.net.ipc;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

import org.junit.Test;

public class UnixDomainSocketIpcImplTest
{
@Test
public void testUnixDomainSocketImplIsSelected()
{
assertThat(IpcImpl.get(), instanceOf(UnixDomainSocketIpcImpl.class));
}

@Test
public void testIsImplementedViaTcpLoopback()
{
assertThat(IpcImpl.get().isImplementedViaTcpLoopback(), is(false));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package zmq.socket.pair;

public class TestPairUnixDomainSocketIpc extends TestPairIpc
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package zmq.socket.reqrep;

public class TestReqrepUnixDomainSocketIpc extends TestReqrepIpc
{
}
Loading