1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package twcsckernel.projectbase.rmi.socketfactory;
16
17 import java.rmi.server.*;
18 import java.net.*;
19 import java.io.*;
20 import java.util.*;
21
22 /***
23 * Server side socket factory which allows the server to make callbacks to a
24 * client whose address is unreachable from the server (as long as the server is
25 * reachable from the client).
26 *
27 * @author Tim Taylor
28 */
29 public class ServerTwoWaySocketFactory extends RMISocketFactory {
30 /*** Socket pools where sockets requested from client are returned. */
31 private Map<String, SocketPool> socketPools = Collections
32 .synchronizedMap(new HashMap<String, SocketPool>());
33
34 /*** Map of request streams to clients. */
35 private Map<String, DataOutputStream> requestStreams = Collections
36 .synchronizedMap(new HashMap<String, DataOutputStream>());
37
38 private int timeout;
39
40 /***
41 * Dla inicjacji połączenia do klienta będzie stosowany wskazany timeout.
42 * Jeśli połączenie zostanie ustawione poprawnie timeout jest ustawiany na
43 * zero.
44 *
45 * @param timeout -
46 * timeout łączenia z klientem
47 */
48 public ServerTwoWaySocketFactory(int timeout) {
49 if (timeout < 0)
50 timeout = 0;
51 this.timeout = timeout;
52 }
53
54 /***
55 * Add a request <code>socket</code> to the factory. The factory will use
56 * this <code>socket</code> to request a callback socket from the host of
57 * the given <code>address</code>.
58 * <p>
59 *
60 * @param address
61 * Address of the client
62 * @param socket
63 * Socket on which to request callback sockets from the client.
64 */
65 public void addRequestSocket(final byte[] address, Socket socket)
66 throws IOException {
67
68 final DataOutputStream requestOutStream = new DataOutputStream(socket
69 .getOutputStream());
70 final DataInputStream requestInStream = new DataInputStream(socket
71 .getInputStream());
72
73
74
75
76
77 new Thread() {
78 public void run() {
79 try {
80 for (;;) {
81
82 int port = requestInStream.readInt();
83 String endpoint = EndpointInfo.getEndpointString(
84 address, port);
85
86 System.out.println("&&& " + endpoint);
87
88 requestStreams.put(endpoint, requestOutStream);
89 }
90 } catch (IOException e) {
91 }
92 }
93 }.start();
94 }
95
96 /***
97 * Create a socket connectoin to the host of the given <code>address
98 * </code>
99 * and <code>port</code>. If the <code>address</code> is for a host
100 * that has provided a callback socket request signalling channel, then that
101 * channel is used to request that the client create the socket. Otherwise a
102 * socket is created directly to the client from the server.
103 *
104 * @param address
105 * Address for the socket.
106 * @param port
107 * Port for the socket.
108 */
109 public Socket createSocket(String address, int port) throws IOException {
110 String endpoint = EndpointInfo.getEndpointString(address, port);
111
112 DataOutputStream requestStream = (DataOutputStream) requestStreams
113 .get(endpoint);
114
115 if (requestStream == null) {
116 try {
117
118 Socket sock = new Socket();
119 sock.connect(new InetSocketAddress(address, port), timeout);
120
121 if (timeout > 0)
122 sock.setSoTimeout(0);
123 return sock;
124 } catch (IOException ioe) {
125 }
126 }
127
128 SocketPool requestPool;
129
130 synchronized (socketPools) {
131 requestPool = (SocketPool) socketPools.get(endpoint);
132
133 if (requestPool == null) {
134 requestPool = new SocketPool();
135 socketPools.put(endpoint, requestPool);
136 }
137 }
138
139 requestStream.writeInt(TwoWay.PROTOCOL_MAGIC);
140 requestStream.writeInt(TwoWay.REQUEST_CALLBACK_SOCKET);
141 requestStream.writeInt(port);
142 requestStream.flush();
143
144 return requestPool.getSocket();
145 }
146
147 /***
148 * Private socket class with a buffered input stream. This allows us to use
149 * <code>mark</code> and <code>reset</code> in order to run signalling,
150 * RMI client sockets, and callback sockets through the same sets of
151 * listening ports.
152 */
153 private static class BufferedSocket extends Socket {
154 Socket sock;
155 InputStream in;
156
157 BufferedSocket() throws IOException {
158 super();
159 }
160
161 public InputStream getInputStream() throws IOException {
162 if (in == null) {
163 in = new BufferedInputStream(super.getInputStream());
164 }
165
166 return in;
167 }
168
169 public synchronized void close() throws IOException {
170 super.close();
171 }
172 }
173
174 /***
175 * Server socket that handles both signalling and RMI calls. If a call comes
176 * in with protocol magic in its first bytes, that call is taken to be a
177 * protocol signal and is handled appropriately.
178 * <p>
179 *
180 * Otherwise, the call is assumed to be an RMI call, and it is passed
181 * through to the "accepting" RMI service.
182 */
183 class TwoWayServerSocket extends ServerSocket {
184 public TwoWayServerSocket(int port) throws IOException {
185 super(port);
186 }
187
188 public Socket accept() throws IOException {
189 Socket sock = null;
190
191 for (;;) {
192 sock = new BufferedSocket();
193 implAccept(sock);
194
195 InputStream in = sock.getInputStream();
196 in.mark(TwoWay.MAX_MESSAGE_LENGTH);
197
198 DataInputStream dis = new DataInputStream(in);
199 DataOutputStream dos = new DataOutputStream(sock
200 .getOutputStream());
201
202 int magic = dis.readInt();
203 if (magic != TwoWay.PROTOCOL_MAGIC) {
204 in.reset();
205 break;
206 }
207
208 int opcode = dis.readInt();
209
210 if (opcode == TwoWay.REGISTER_CALLBACK_SOCKET_SOURCE) {
211 byte[] address = new byte[4];
212 dis.read(address, 0, 4);
213
214 addRequestSocket(address, sock);
215
216
217
218
219
220 dos.writeInt(TwoWay.RETURN_SERVER_ENDPOINT_INFO);
221 dos.write(InetAddress.getLocalHost().getAddress());
222 dos.writeInt(sock.getLocalPort());
223 dos.flush();
224 continue;
225 } else if (opcode == TwoWay.RETURN_CALLBACK_SOCKET) {
226 byte[] address = new byte[4];
227 dis.read(address, 0, 4);
228 int port = dis.readInt();
229
230 SocketPool pool = (SocketPool) socketPools.get(EndpointInfo
231 .getEndpointString(address, port));
232 pool.addSocket(sock);
233
234 continue;
235 }
236
237 }
238
239 return sock;
240 }
241 }
242 /***
243 * Returns a server socket for handling the two-way protocol.
244 *
245 * @param port
246 * Port on which the socket will listen.
247 */
248 public ServerSocket createServerSocket(int port) throws IOException {
249 return new TwoWayServerSocket(port);
250 }
251 }