View Javadoc

1   /*
2    * Copyright 2000 Computer System Services, Inc.
3    *
4    * Permission to use this software for any purpose is granted provided that
5    * this copyright notice is preserved.
6    *
7    * This software is provided as-is and without warranty as to its
8    * fitness for any purpose.  In other words, Computer System Services,
9    * Inc. does not guarantee that this software works.  It is provided
10   * only in the hope that it may be found useful by someone.
11   *
12   * Please e-mail tttaylor@cssassociates.com if you find any errors
13   * or want to request changes/enhancements.
14   */
15  package twcsckernel.projectbase.rmi.socketfactory;
16  
17  import java.rmi.server.*;
18  import java.net.*;
19  import java.io.*;
20  import java.util.*;
21  
22  /***
23   * Server side socket factory which allows the server to make callbacks to a
24   * client whose address is unreachable from the server (as long as the server is
25   * reachable from the client).
26   * 
27   * @author Tim Taylor
28   */
29  public class ServerTwoWaySocketFactory extends RMISocketFactory {
30  	/*** Socket pools where sockets requested from client are returned. */
31  	private Map<String, SocketPool> socketPools = Collections
32  			.synchronizedMap(new HashMap<String, SocketPool>());
33  
34  	/*** Map of request streams to clients. */
35  	private Map<String, DataOutputStream> requestStreams = Collections
36  			.synchronizedMap(new HashMap<String, DataOutputStream>());
37  
38  	private int timeout;
39  
40  	/***
41  	 * Dla inicjacji połączenia do klienta będzie stosowany wskazany timeout.
42  	 * Jeśli połączenie zostanie ustawione poprawnie timeout jest ustawiany na
43  	 * zero.
44  	 * 
45  	 * @param timeout -
46  	 *            timeout łączenia z klientem
47  	 */
48  	public ServerTwoWaySocketFactory(int timeout) {
49  		if (timeout < 0)
50  			timeout = 0;
51  		this.timeout = timeout;
52  	}
53  
54  	/***
55  	 * Add a request <code>socket</code> to the factory. The factory will use
56  	 * this <code>socket</code> to request a callback socket from the host of
57  	 * the given <code>address</code>.
58  	 * <p>
59  	 * 
60  	 * @param address
61  	 *            Address of the client
62  	 * @param socket
63  	 *            Socket on which to request callback sockets from the client.
64  	 */
65  	public void addRequestSocket(final byte[] address, Socket socket)
66  			throws IOException {
67  
68  		final DataOutputStream requestOutStream = new DataOutputStream(socket
69  				.getOutputStream());
70  		final DataInputStream requestInStream = new DataInputStream(socket
71  				.getInputStream());
72  
73  		// Thread for receiving notifications of client exports.
74  		// We need the port to know which request stream to use
75  		// in the event of multiple client VMs calling from
76  		// a single host.
77  		new Thread() {
78  			public void run() {
79  				try {
80  					for (;;) {
81  						// Read port from data input stream
82  						int port = requestInStream.readInt();
83  						String endpoint = EndpointInfo.getEndpointString(
84  								address, port);
85  
86  						System.out.println("&&& " + endpoint);
87  
88  						requestStreams.put(endpoint, requestOutStream);
89  					}
90  				} catch (IOException e) {
91  				}
92  			}
93  		}.start();
94  	}
95  
96  	/***
97  	 * Create a socket connectoin to the host of the given <code>address
98  	 * </code>
99  	 * and <code>port</code>. If the <code>address</code> is for a host
100 	 * that has provided a callback socket request signalling channel, then that
101 	 * channel is used to request that the client create the socket. Otherwise a
102 	 * socket is created directly to the client from the server.
103 	 * 
104 	 * @param address
105 	 *            Address for the socket.
106 	 * @param port
107 	 *            Port for the socket.
108 	 */
109 	public Socket createSocket(String address, int port) throws IOException {
110 		String endpoint = EndpointInfo.getEndpointString(address, port);
111 
112 		DataOutputStream requestStream = (DataOutputStream) requestStreams
113 				.get(endpoint);
114 
115 		if (requestStream == null) {
116 			try {
117 				// System.out.println("Jednak tworze socket");
118 				Socket sock = new Socket();
119 				sock.connect(new InetSocketAddress(address, port), timeout);
120 				// return new Socket(address, port);
121 				if (timeout > 0)
122 					sock.setSoTimeout(0);
123 				return sock;
124 			} catch (IOException ioe) {
125 			}
126 		}
127 
128 		SocketPool requestPool;
129 
130 		synchronized (socketPools) {
131 			requestPool = (SocketPool) socketPools.get(endpoint);
132 
133 			if (requestPool == null) {
134 				requestPool = new SocketPool();
135 				socketPools.put(endpoint, requestPool);
136 			}
137 		}
138 
139 		requestStream.writeInt(TwoWay.PROTOCOL_MAGIC);
140 		requestStream.writeInt(TwoWay.REQUEST_CALLBACK_SOCKET);
141 		requestStream.writeInt(port);
142 		requestStream.flush();
143 
144 		return requestPool.getSocket();
145 	}
146 
147 	/***
148 	 * Private socket class with a buffered input stream. This allows us to use
149 	 * <code>mark</code> and <code>reset</code> in order to run signalling,
150 	 * RMI client sockets, and callback sockets through the same sets of
151 	 * listening ports.
152 	 */
153 	private static class BufferedSocket extends Socket {
154 		Socket sock;
155 		InputStream in;
156 
157 		BufferedSocket() throws IOException {
158 			super();
159 		}
160 
161 		public InputStream getInputStream() throws IOException {
162 			if (in == null) {
163 				in = new BufferedInputStream(super.getInputStream());
164 			}
165 
166 			return in;
167 		}
168 
169 		public synchronized void close() throws IOException {
170 			super.close();
171 		}
172 	}
173 
174 	/***
175 	 * Server socket that handles both signalling and RMI calls. If a call comes
176 	 * in with protocol magic in its first bytes, that call is taken to be a
177 	 * protocol signal and is handled appropriately.
178 	 * <p>
179 	 * 
180 	 * Otherwise, the call is assumed to be an RMI call, and it is passed
181 	 * through to the "accepting" RMI service.
182 	 */
183 	class TwoWayServerSocket extends ServerSocket {
184 		public TwoWayServerSocket(int port) throws IOException {
185 			super(port);
186 		}
187 
188 		public Socket accept() throws IOException {
189 			Socket sock = null;
190 
191 			for (;;) {
192 				sock = new BufferedSocket();
193 				implAccept(sock);
194 
195 				InputStream in = sock.getInputStream();
196 				in.mark(TwoWay.MAX_MESSAGE_LENGTH);
197 
198 				DataInputStream dis = new DataInputStream(in);
199 				DataOutputStream dos = new DataOutputStream(sock
200 						.getOutputStream());
201 
202 				int magic = dis.readInt();
203 				if (magic != TwoWay.PROTOCOL_MAGIC) {
204 					in.reset();
205 					break;
206 				}
207 
208 				int opcode = dis.readInt();
209 
210 				if (opcode == TwoWay.REGISTER_CALLBACK_SOCKET_SOURCE) {
211 					byte[] address = new byte[4];
212 					dis.read(address, 0, 4);
213 
214 					addRequestSocket(address, sock);
215 
216 					// Tell the client our real address and port
217 					// which may be different from the address
218 					// and port used to contact us if we are
219 					// on the other side of a firewall.
220 					dos.writeInt(TwoWay.RETURN_SERVER_ENDPOINT_INFO);
221 					dos.write(InetAddress.getLocalHost().getAddress());
222 					dos.writeInt(sock.getLocalPort());
223 					dos.flush();
224 					continue;
225 				} else if (opcode == TwoWay.RETURN_CALLBACK_SOCKET) {
226 					byte[] address = new byte[4];
227 					dis.read(address, 0, 4);
228 					int port = dis.readInt();
229 
230 					SocketPool pool = (SocketPool) socketPools.get(EndpointInfo
231 							.getEndpointString(address, port));
232 					pool.addSocket(sock);
233 
234 					continue;
235 				}
236 
237 			}
238 
239 			return sock;
240 		}
241 	}
242 	/***
243 	 * Returns a server socket for handling the two-way protocol.
244 	 * 
245 	 * @param port
246 	 *            Port on which the socket will listen.
247 	 */
248 	public ServerSocket createServerSocket(int port) throws IOException {
249 		return new TwoWayServerSocket(port);
250 	}
251 }