This repository has been archived by the owner on Aug 3, 2019. It is now read-only.

SocketAddress Supplier API

Mark Paluch edited this page Oct 22, 2015 · 1 revision

The SocketAddressSupplier API lets you control which connection points (SocketAddresses) are used for the initial connection and for reconnects. Controlling the source and sequence of connection points is useful for failover and for cluster node discovery.

The API exposes a factory for SocketAddressSuppliers which accepts a DisqueURI. By default, multiple addresses are used in round-robin order.

The predefined methods can be found in the SocketAddressSupplierFactory.Factories enum:

  • ROUND_ROBIN: Cyclic use of multiple addresses specified by the RedisURI
  • HELLO_CLUSTER: Uses ROUND_ROBIN for the initial connect. Once a connection is established, the mechanism obtains the cluster nodes using the HELLO command. This happens at startup only; periodic updating is currently not implemented. It can, however, be achieved by implementing a custom factory that calls the reloadNodes() method periodically or on demand.

New mechanisms can be added by implementing the SocketAddressSupplier and SocketAddressSupplierFactory interfaces.

If you want to contribute your mechanism to spinach, submit it by opening a pull request.

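// disqueURI is assumed to be an existing DisqueURI that lists one or more connection points
DisqueClient client = new DisqueClient();

// Cycle through the addresses of the DisqueURI
DisqueConnection<String, String> roundRobinConnection = client.connect(new Utf8StringCodec(), disqueURI,
                SocketAddressSupplierFactory.Factories.ROUND_ROBIN);

// Alternatively, discover the cluster nodes with HELLO after the first connect
DisqueConnection<String, String> helloClusterConnection = client.connect(new Utf8StringCodec(), disqueURI,
                SocketAddressSupplierFactory.Factories.HELLO_CLUSTER);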

Interfaces

  • SocketAddressSupplierFactory: Factory that produces a SocketAddressSupplier for every logical connection by accepting a DisqueURI.
  • SocketAddressSupplier: Supplies SocketAddresses for the initial connect and for future reconnects. The client requests a socket address from the supplier whenever it establishes a connection initially or reconnects. The supplier is required to supply an unlimited number of elements. The sequence and ordering of elements are details of the particular implementation. SocketAddressSupplier instances should not be shared between connections, although sharing is possible.
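
The requirement to supply an unlimited number of elements can be met by cycling over a fixed list of addresses. The following is a minimal sketch of that idea using JDK types only. The class name CyclingAddressSupplier is hypothetical, and java.util.function.Supplier is used as a stand-in for the spinach SocketAddressSupplier interface, whose exact declaration is not shown on this page.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Hypothetical round-robin supplier. ASSUMPTION: java.util.function.Supplier
 * stands in for the spinach SocketAddressSupplier interface.
 */
final class CyclingAddressSupplier implements Supplier<SocketAddress> {

    private final List<SocketAddress> addresses;
    private final AtomicInteger position = new AtomicInteger();

    CyclingAddressSupplier(List<? extends SocketAddress> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("At least one address is required");
        }
        // Defensive copy so later changes to the caller's list have no effect
        this.addresses = new ArrayList<>(addresses);
    }

    @Override
    public SocketAddress get() {
        // floorMod keeps the index non-negative even after the counter overflows,
        // so the supplier never runs out of elements
        int index = Math.floorMod(position.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }

    /** Convenience helper that avoids a DNS lookup when the address is created. */
    static SocketAddress unresolved(String host, int port) {
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```

With two addresses, successive calls to get() alternate between them indefinitely. Because the supplier keeps its own position, one instance per connection keeps the sequence of each connection independent of the others.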

Callback interfaces

A SocketAddressSupplier can optionally implement further interfaces to benefit from callback methods which provide infrastructure components:

  • ConnectionAware: Classes implementing this interface are notified with the DisqueConnection that was created by using the SocketAddressSupplier (call to setConnection(...)). The call is made only after a connection has been established.
  • EventExecutorAware: Classes implementing this interface are notified with the EventExecutor that is used for the client (call to setEventExecutor(...)). The call is made only after a connection has been established.

Examples

Periodically updating cluster topology-aware SocketAddressSupplierFactory

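import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;

// Imports of the spinach types (DisqueURI, DisqueConnection, SocketAddressSupplier, ...) are omitted here.

public class PeriodicallyUpdatingSocketAddressSupplierFactory implements SocketAddressSupplierFactory {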

    private final ScheduledExecutorService scheduledExecutorService;
    private final Set<ScheduledFuture<?>> futures = Sets.newConcurrentHashSet();

    public PeriodicallyUpdatingSocketAddressSupplierFactory(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = scheduledExecutorService;
    }

    @Override
    public SocketAddressSupplier newSupplier(DisqueURI disqueURI) {

        RoundRobinSocketAddressSupplier bootstrap = new RoundRobinSocketAddressSupplier(disqueURI.getConnectionPoints());

        HelloClusterSocketAddressSupplier helloCluster = new HelloClusterSocketAddressSupplier(bootstrap) {

            /**
             * This method is called only once when the connection is established.
             */
            @Override
            public <K, V> void setConnection(DisqueConnection<K, V> disqueConnection) {

                Runnable command = new Runnable() {
                    @Override
                    public void run() {
                        reloadNodes();
                    }
                };

                ScheduledFuture<?> scheduledFuture = scheduledExecutorService
                        .scheduleAtFixedRate(command, 1, 1, TimeUnit.HOURS);

                futures.add(scheduledFuture);
                super.setConnection(disqueConnection);
            }

        };

        return helloCluster;
    }

    /**
     * Shutdown the scheduling.
     */
    public void shutdown() {
        for (ScheduledFuture<?> future : futures) {
            future.cancel(false);
        }
    }
}
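
The scheduling and cancellation part of the example above can be tried in isolation, without spinach or Guava on the classpath. The following is a rough sketch under that assumption: the class name PeriodicTasks is hypothetical, and the Runnable passed to schedule(...) stands in for the reloadNodes() call.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that tracks periodic tasks so they can be cancelled together. */
final class PeriodicTasks {

    private final ScheduledExecutorService executor;
    // JDK-only replacement for Guava's Sets.newConcurrentHashSet()
    private final Set<ScheduledFuture<?>> futures = ConcurrentHashMap.newKeySet();

    PeriodicTasks(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    /** Runs the task repeatedly; the first run happens after one full period. */
    ScheduledFuture<?> schedule(Runnable task, long period, TimeUnit unit) {
        ScheduledFuture<?> future = executor.scheduleAtFixedRate(task, period, period, unit);
        futures.add(future);
        return future;
    }

    /** Cancels all tracked tasks; a run that is already in progress is not interrupted. */
    void shutdown() {
        for (ScheduledFuture<?> future : futures) {
            future.cancel(false);
        }
        futures.clear();
    }

    /** Creates a single-threaded scheduler suitable for trying out the helper. */
    static ScheduledExecutorService newExecutor() {
        return Executors.newSingleThreadScheduledExecutor();
    }
}
```

Cancelling with cancel(false) lets a reload that is currently running finish instead of interrupting it. The executor itself is owned by the caller and has to be shut down separately.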