分布式系统常用技术及案例分析(第2版)
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

1.3 通信

进程间的通信是一切分布式系统的核心。如果没有通信机制,分布式系统的各个子系统将是“一盘散沙”,毫无作用。本节介绍常用的通信方式。

1.3.1 网络I/O模型的演进

1.同步和异步

同步和异步描述的是用户线程与内核的交互方式:

同步是指用户线程发起I/O请求后需要等待,或者轮询内核I/O操作完成后才能继续执行;

异步是指用户线程发起I/O请求后仍继续执行,当内核I/O操作完成后会通知用户线程,或者调用用户线程注册的回调函数。

2.阻塞和非阻塞

阻塞和非阻塞描述的是用户线程调用内核I/O操作的方式:

阻塞是指I/O操作需要彻底完成后才返回用户空间;

非阻塞是指I/O操作被调用后立即返回给用户一个状态值,无须等到I/O操作彻底完成。

一个I/O操作其实分成了两个步骤:发起I/O请求和实际的I/O操作。

阻塞I/O和非阻塞I/O的区别在于第一步,也就是发起I/O请求是否会被阻塞。如果阻塞直到完成,就是传统的阻塞I/O,如果不阻塞,就是非阻塞I/O。

同步I/O和异步I/O的区别在于第二个步骤是否阻塞,如果实际的I/O读写阻塞请求进程,就是同步I/O。

3.UNIX I/O模型

UNIX下共有5种I/O模型:

● 阻塞I/O;

● 非阻塞I/O;

● I/O复用(select和poll);

● 信号驱动I/O(SIGIO);

● 异步I/O(Posix.1的aio_系列函数)。

注:读者若想深入了解UNIX的网络知识,推荐阅读W.Richard Stevens的UNIX Network Programming, Volume 1, Second Edition: Networking APIs: Sockets and XTI。本节只简单介绍了这5种模型,文中的图例也引用自该书。

阻塞I/O模型

请求无法立即完成则保持阻塞。

阶段1:等待数据就绪。网络I/O的情况就是等待远端数据陆续抵达;磁盘I/O的情况就是等待磁盘数据从磁盘上读取到内核态内存中。

阶段2:数据复制。出于系统安全考虑,用户态的程序没有权限直接读取内核态内存,因此内核负责把内核态内存中的数据复制一份到用户态内存中。

阻塞I/O模型如图1-4所示。

图1-4 阻塞I/O模型

本节中将recvfrom函数视为系统调用。一般recvfrom函数的实现都有一个从应用程序进程中运行到内核中运行的切换,一段时间后再跟一个返回应用进程的切换。

在图1-4中,进程阻塞的整段时间是指从调用recvfrom函数开始到它返回的这段时间,当进程返回成功提示时,应用进程开始处理数据报。

非阻塞I/O模型

● socket设置为NONBLOCK(非阻塞)就是告诉内核,当所请求的I/O操作无法完成时,不要让进程进入睡眠状态,而是立刻返回一个错误码(EWOULDBLOCK),这样请求就不会阻塞;

● I/O操作函数将不断地测试数据是否已经准备好,如果没有准备好,则继续测试,直到数据准备好为止。在整个I/O请求的过程中,虽然用户线程每次发起I/O请求后可以立即返回,但是为了等到数据,仍需轮询、重复请求,而这是对CPU时间的极大浪费。

● 数据准备好了,从内核复制到用户空间。

非阻塞I/O模型如图1-5所示。

图1-5 非阻塞I/O模型

一般很少直接使用这种模型,而是在其他I/O模型中使用非阻塞I/O这一特性。这种方式对单个I/O请求的意义不大,但给I/O复用铺平了道路。

I/O复用模型

I/O复用会用到select或poll函数,在这两个系统调用中的某一个上阻塞,而不是阻塞于真正的I/O系统调用。函数也会使进程阻塞,但和阻塞I/O不同的是,这两个函数可以同时阻塞多个I/O操作。而且可以同时对多个读操作、多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操作函数。

I/O复用模型如图1-6所示。

图1-6 I/O复用模型

从流程上看,使用select函数进行I/O请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select函数最大的优势是用户可以在一个线程内同时处理多个socket的I/O请求。用户可以注册多个socket,然后不断地调用select来读取被激活的socket,达到在同一个线程内同时处理多个I/O请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

I/O复用模型使用Reactor设计模式实现了这一机制。

调用select或poll函数的方法由一个用户态线程负责轮询多个socket,直到阶段1的数据就绪,再通知实际的用户线程执行阶段2的复制操作。通过一个专职的用户态线程执行非阻塞I/O轮询,模拟实现阶段1的异步化。

信号驱动I/O(SIGIO)模型

首先,我们允许socket进行信号驱动I/O,并通过调用sigaction来安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好后,进程会收到一个SIGIO信号,可以在信号处理函数中调用recvfrom来读取数据报,并通知主循环数据已准备好被处理,也可以通知主循环,让它来读取数据报。

信号驱动I/O(SIGIO)模型如图1-7所示。

图1-7 信号驱动I/O(SIGIO)模型

异步I/O模型

异步I/O是POSIX规范定义的。通常,这些函数会通知内核来启动操作并在整个操作(包括从内核复制数据到我们的缓存中)完成时通知我们。

该模型与信号驱动I/O(SIGIO)模型的不同点在于,信号驱动I/O(SIGIO)模型告诉我们I/O操作何时可以启动,而异步I/O模型告诉我们I/O操作何时完成。

调用aio_read函数,告诉内核传递描述字、缓存区指针、缓存区大小和文件偏移,然后立即返回,我们的进程不阻塞于等待I/O操作的完成。当内核将数据复制到缓存区后,才会生成一个信号来通知应用程序。

异步I/O模型如图1-8所示。

图1-8 异步I/O模型

异步I/O模型使用Proactor设计模式实现了这一机制。有关“Proactor设计模式”可以参阅https://en.wikipedia.org/wiki/Proactor_pattern。

异步I/O模型告知内核:当整个过程(包括阶段1和阶段2)全部完成时,通知应用程序来读数据。

几种I/O模型的比较

前4种模型的区别是阶段1不相同,阶段2基本相同,都是将数据从内核复制到调用者的缓存区。而异步I/O的两个阶段都不同于前4个模型。几种I/O模型的比较如图1-9所示。

图1-9 几种I/O模型的比较

同步I/O操作引起请求进程阻塞,直到I/O操作完成。异步I/O操作不引起请求进程阻塞。上面前4个模型——阻塞I/O模型、非阻塞I/O模型、I/O复用模型和信号驱动I/O模型都是同步I/O模型,而异步I/O模型才是真正的异步I/O。

4.常见Java I/O模型

在了解了UNIX的I/O模型之后,就能明白其实Java的I/O模型也是类似的。

“阻塞I/O”模型

下面的EchoServer是一个简单的阻塞I/O例子,服务器启动后,等待客户端连接。在客户端连接服务器后,服务器就阻塞读写数据流。

EchoServer代码:

public class EchoServer {
    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws IOException {

      int port;

      try {
          port = Integer.parseInt(args[0]);
      } catch (RuntimeException ex) {
          port = DEFAULT_PORT;
      }

      try (
          ServerSocket serverSocket =
              new ServerSocket(port);
          Socket clientSocket = serverSocket.accept();
          PrintWriter out =
              new PrintWriter(clientSocket.getOutputStream(), true);
          BufferedReader in = new BufferedReader(
              new InputStreamReader(clientSocket.getInputStream()));
      ) {
          String inputLine;
          while ((inputLine = in.readLine()) ! = null) {
              out.println(inputLine);
          }
      } catch (IOException e) {
            System.out.println("Exception caught when trying to listen
            on port "+ port + " or listening for a connection");
            System.out.println(e.getMessage());
      }
  }
}

改进为“阻塞I/O+多线程”模型

使用多线程来支持多个客户端访问服务器。

主线程MultiThreadEchoServer.java:

public class MultiThreadEchoServer {
    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws IOException {

      int port;

      try {
          port = Integer.parseInt(args[0]);
      } catch (RuntimeException ex) {
          port = DEFAULT_PORT;
      }
      Socket clientSocket = null;
      try (ServerSocket serverSocket = new ServerSocket(port); ) {
          while (true) {
              clientSocket = serverSocket.accept();

              // 多线程
              new Thread(new EchoServerHandler(clientSocket)).start();
          }
      } catch (IOException e) {
          System.out.println(
                  "Exception caught when trying to listen
                    on port " + port + " or listening for a connection");
          System.out.println(e.getMessage());
      }
    }
}

处理器类EchoServerHandler.java:

public class EchoServerHandler implements Runnable {
    private Socket clientSocket;

    public EchoServerHandler(Socket clientSocket) {
      this.clientSocket = clientSocket;
    }

    @Override
    public void run() {
      try(PrintWriter out=new PrintWriter(clientSocket.getOutputStream(), true);
              BufferedReader in = new BufferedReader(new InputStreamReader (clientSocket.getInputStream())); ) {

          String inputLine;
          while ((inputLine = in.readLine()) ! = null) {
              out.println(inputLine);
          }
      } catch (IOException e) {
          System.out.println(e.getMessage());
      }
    }
}

存在问题:每次收到新的连接都要新建一个线程,处理完后销毁线程,代价大。当有大量的短连接出现时,性能比较低。

改进为“阻塞I/O+线程池”模型

针对上面多线程的模型中出现的线程重复创建、销毁带来的开销问题,可以采用线程池来优化。每次收到新连接后从池中取一个空闲线程进行处理,处理完后再放回池中,重用线程避免了频繁地创建和销毁线程带来的开销。

主线程ThreadPoolEchoServer.java:

public class ThreadPoolEchoServer {
    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws IOException {
      int port;

      try {
          port = Integer.parseInt(args[0]);
      } catch (RuntimeException ex) {
          port = DEFAULT_PORT;
      }
      ExecutorService threadPool = Executors.newFixedThreadPool(5);
      Socket clientSocket = null;
      try (ServerSocket serverSocket = new ServerSocket(port); ) {
          while (true) {
            clientSocket = serverSocket.accept();

            // 线程池
            threadPool.submit(new Thread(new EchoServerHandler(clientSocket)));
          }
      } catch (IOException e) {
          System.out.println(
                "Exception caught when trying to listen
                    on port " + port + " or listening for a connection");
          System.out.println(e.getMessage());
      }
  }
}

存在问题:在大量短连接的场景中性能会提升,因为不用每次都创建和销毁线程,而是重用连接池中的线程。但在大量长连接的场景中,因为线程被连接长期占用,不需要频繁地创建和销毁线程,所以没有什么优势。

虽然这种方法适用于小到中等规模的客户端的并发数,但是如果连接数超过100000,那么性能将很不理想。

改进为“非阻塞I/O”模型

“阻塞I/O+线程池”模型虽然比“阻塞I/O+多线程”模型在性能方面有所提升,但这两种模型存在一个共同的问题:读和写操作都是同步阻塞的,面对高并发(持续大量连接同时请求)的场景,需要消耗大量的线程来维持连接。CPU在大量的线程之间频繁切换,性能损耗很大。一旦单机的连接数超过1万,甚至达到几万,服务器的性能会急剧下降。

而NIO的Selector却很好地解决了这个问题,用主线程(一个线程或CPU个数的线程)保持所有的连接,管理和读取客户端连接的数据,将读取的数据交给后面的线程池处理,线程池处理完业务逻辑后,将结果交给主线程发送响应给客户端,少量的线程就可以处理大量连接的请求。

Java NIO由以下几个核心部分组成:

● Channel;

● Buffer;

● Selector。

要使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,比如新连接进来、数据接收等事件。

主线程NonBlokingEchoServer.java:

public class NonBlokingEchoServer {
    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws IOException {

      int port;

      try {
          port = Integer.parseInt(args[0]);
      } catch (RuntimeException ex) {
          port = DEFAULT_PORT;
      }
      System.out.println("Listening for connections on port " + port);

      ServerSocketChannel serverChannel;
      Selector selector;
      try {
          serverChannel = ServerSocketChannel.open();
          InetSocketAddress address = new InetSocketAddress(port);
          serverChannel.bind(address);
          serverChannel.configureBlocking(false);
          selector = Selector.open();
          serverChannel.register(selector, SelectionKey.OP_ACCEPT);
      } catch (IOException ex) {
          ex.printStackTrace();
          return;
      }

      while (true) {
          try {
              selector.select();
          } catch (IOException ex) {
              ex.printStackTrace();
              break;
          }
          Set<SelectionKey> readyKeys = selector.selectedKeys();
          Iterator<SelectionKey> iterator = readyKeys.iterator();
          while (iterator.hasNext()) {
              SelectionKey key = iterator.next();
              iterator.remove();
              try {
                if (key.isAcceptable()) {
                    ServerSocketChannel  server  =  (ServerSocketChannel) key.channel();
                    SocketChannel client = server.accept();
                    System.out.println("Accepted connection from " + client);
                    client.configureBlocking(false);
                    SelectionKey clientKey = client.register(selector,
                            SelectionKey.OP_WRITE | SelectionKey.OP_READ);
                    ByteBuffer buffer = ByteBuffer.allocate(100);
                    clientKey.attach(buffer);
                }
                if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer output = (ByteBuffer) key.attachment();
                    client.read(output);
                }
                if (key.isWritable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer output = (ByteBuffer) key.attachment();
                    output.flip();
                    client.write(output);
                    output.compact();
                }
              } catch (IOException ex) {
                key.cancel();
                try {
                    key.channel().close();
                } catch (IOException cex) {
                }
              }
          }
      }

  }
}

改进为“异步I/O”模式

Java SE 7之后的版本,引入了对异步I/O(NIO.2)的支持,为构建高性能的网络应用提供了一个利器。

主线程AsyncEchoServer.java:

public class AsyncEchoServer {

    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws IOException {
      int port;

      try {
          port = Integer.parseInt(args[0]);
      } catch (RuntimeException ex) {
          port = DEFAULT_PORT;
      }

      ExecutorService taskExecutor = Executors.newCachedThreadPool Executors.defaultThreadFactory());

      // 创建异步服务器socket channel并绑定到默认组
      try (AsynchronousServerSocketChannel asynchronousServerSocketChannel= AsynchronousServerSocketChannel.open()) {
          if (asynchronousServerSocketChannel.isOpen()) {

            // 设置一些参数
            asynchronousServerSocketChannel.setOption (StandardSocketOptions.SO_RCVBUF, 4 * 1024);
            asynchronousServerSocketChannel.setOption (StandardSocketOptions.SO_REUSEADDR, true);

            // 绑定服务器socket channel到本地地址
            asynchronousServerSocketChannel.bind(new  InetSocketAddress (port));

            // 显示等待客户端的信息
            System.out.println("Waiting for connections ...");
            while (true) {
                Future<AsynchronousSocketChannel>asynchronousSocketChannelFuture = asynchronousServerSocketChannel
                        .accept();
                try {
                    final AsynchronousSocketChannel asynchronousSocketChannel= asynchronousSocketChannelFuture
                          .get();
                    Callable<String> worker = new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                          String host = asynchronousSocketChannel .getRemoteAddress().toString();
                          System.out.println("Incoming connection from:"+ host);
                          final ByteBuffer buffer = ByteBuffer .allocateDirect(1024);

                          // 发送数据
                          while (asynchronousSocketChannel.read (buffer).get() ! = -1) {
                              buffer.flip();
                              asynchronousSocketChannel.write(buffer).get();
                              if (buffer.hasRemaining()) {
                                  buffer.compact();
                              } else {
                                  buffer.clear();
                              }
                          }
                          asynchronousSocketChannel.close();
                          System.out.println(host+"was successfully served! ");
                          return host;
                        }
                    };
                    taskExecutor.submit(worker);
                } catch (InterruptedException | ExecutionException ex) {
                    System.err.println(ex);
                    System.err.println("\n Server is shutting down ...");

                    // 执行器不再接收新线程并完成队列中所有的线程
                    taskExecutor.shutdown();

                    // 等待所有线程完成
                    while (! taskExecutor.isTerminated()) {
                    }
                    break;
                }
            }
          } else {
            System.out.println("The asynchronous server-socket channel cannot be opened! ");
          }
      } catch (IOException ex) {
          System.err.println(ex);
      }
  }
}

1.3.2 远程过程调用(RPC)

1.进程间通信(IPC)

进程间通信(Inter-Process Communication, IPC)指至少两个进程或线程间传送数据或信号的一些技术或方法。进程是计算机系统分配资源的最小单位。每个进程都有自己的一部分独立的系统资源,彼此是隔离的。为了使不同的进程互相访问资源并协调工作,才有了进程间通信。这些进程可以运行在同一计算机上或有网络连接的不同计算机上。进程间通信技术包括消息传递、同步、共享内存和远程过程调用。IPC是一种标准的UNIX通信机制。

2.过程调用的类型

在讨论客户机/服务器(C/S)模型和过程调用时,主要有三种不同类型的过程调用。

● 本地过程调用(Local Procedure Call, LPC):指被调用的过程(函数)与调用过程处于同一个进程中。典型的情况是,调用者通过执行某条机器指令把控制传给新过程,被调用过程保存机器寄存器的值,并在栈顶分配存放其本地变量的空间。

● 同主机间的远程过程调用(Remote Procedure Call, RPC):指被调用的过程与调用过程处于不同的进程中,但同属于一台主机。

● 不同主机间的远程过程调用:指一台主机上的某个客户调用另外一台主机上的某个服务器的过程。

3.什么是远程过程调用

RPC是Remote Procedure Call(远程过程调用)的缩写形式。Birrell和Nelson在1984发表于ACM Transactions on Computer Systems的论文Implementing Remote Procedure Calls对RPC做了经典的诠释。RPC是指计算机A上的进程,调用计算机B上的进程,其中A上的调用进程被挂起,而B上的被调用进程开始执行,当值返回A时,A上的进程继续执行。调用方可以使用参数将信息传送给被调用方,而后可以通过传回的结果得到信息。这一过程对于开发人员来说是透明的。

远程过程调用采用客户机/服务器(C/S)模式。请求程序就是一个客户机,服务提供程序就是一台服务器。和常规或本地过程调用一样,远程过程调用是同步操作,在远程过程结果返回之前,需要暂时中止请求程序。使用相同地址空间的低权进程或低权线程允许同时运行多个远程过程调用。

图1-10描述了并发环境下RPC的调用过程。

图1-10 并发环境下RPC的调用过程

4.RPC的基本操作

让我们看看本地过程调用是如何实现的。考虑下面的C语言的调用:

count = read(fd, buf, nbytes);

fd为一个整型数,表示一个文件。buf为一个字符数组,用于存储读入的数据。nbytes为另一个整型数,用于记录实际读入的字节数。如果该调用位于主程序中,那么在调用之前堆栈的状态如图1-11(a)所示。为了进行调用,调用方首先把参数反序压入堆栈,即最后一个参数先压入,如图1-11(b)所示。在read操作运行完毕后,它将返回值放在某个寄存器中,移出返回地址,并将控制权交回给调用方。调用方随后将参数从堆栈中移出,使堆栈还原到最初的状态。

图1-11 过程调用中的参数传递

RPC背后的思想是尽量使远程过程调用具有与本地调用相同的形式。假设程序需要从某个文件中读取数据,程序员将在代码中执行read调用来取得数据。在传统的系统中,read例程由链接器从库中提取出来,然后链接器再将它插入目标程序中。read过程是一个短过程,一般通过执行一个等效的read系统调用来实现,即read过程是一个位于用户代码与本地操作系统之间的接口。

虽然read中执行了系统调用,但它本身依然是通过将参数压入堆栈的常规方式实现调用的。如图1-11(b)所示,程序员并不知道read干了什么。

RPC通过类似的途径来获得透明性。当read实际上是一个远程过程时(比如在文件服务器所在的机器上运行的过程),库中就放入read的另外一个版本,称为客户存根(client stub)。这种版本的read过程同样遵循图1-11(b)的调用次序,这点与原来的read过程相同。另一个相同点是其中也执行了本地操作系统调用。唯一不同的是,它不要求操作系统提供数据,而是将参数打包成消息,而后请求将此消息发送到服务器,如图1-12所示。在调用send后,客户存根调用receive过程,随即阻塞自己,直到收到响应消息。

图1-12 客户与服务器之间的RPC原理

当消息到达服务器时,服务器上的操作系统将它传递给服务器存根(server stub)。服务器存根是客户存根在服务器端的等价物,也是一段代码,用来将通过网络输入的请求转换为本地过程调用。服务器存根一般先调用receive,然后被阻塞,等待消息输入。收到消息后,服务器将参数从消息中提取出来,然后以常规方式调用服务器上的相应过程。从服务器的角度看,过程好像是由客户直接调用的一样:参数和返回地址都位于堆栈中,一切都很正常。服务器执行所要求的操作,随后将得到的结果以常规的方式返回给调用方。以read为例,服务器将用数据填充read中第二个参数指向的缓存区,该缓存区是服务器存根内部的。

调用完后,服务器存根要将控制权交回给客户发出调用的过程,它将结果(缓存区)打包成消息,随后调用send将结果返回给客户。事后,服务器存根一般会再次调用receive,等待下一个输入的请求。

客户端接收到消息后,客户操作系统发现该消息属于某个客户进程(实际上该进程是客户存根,只是操作系统无法区分二者)。操作系统将消息复制到相应的缓存区中,随后解除对客户进程的阻塞。客户存根检查该消息,将结果提取出来并复制给调用者,而后以通常的方式返回。当调用者在read调用进行完毕后重新获得控制权时,它所知道的唯一事情就是已经得到了所需的数据。它不知道操作是在本地操作系统中进行的,还是远程完成的。

在整个方法中,客户端可以简单地忽略不关心的内容。客户端涉及的操作只是执行普通的(本地)过程调用来访问远程服务,它并不需要直接调用send和receive。消息传递的所有细节都隐藏在双方的库过程中,就像传统库隐藏了执行实际系统调用的细节一样。

概括来说,远程过程调用包含如下步骤:

(1)客户过程以正常的方式调用客户存根。

(2)客户存根生成一个消息,然后调用本地操作系统。

(3)客户端操作系统将消息发送给远程操作系统。

(4)远程操作系统将消息交给服务器存根。

(5)服务器存根将参数提取出来,而后调用服务器。

(6)服务器执行要求的操作,操作完成后将结果返回服务器存根。

(7)服务器存根将结果打包成一个消息,而后调用本地操作系统。

(8)服务器操作系统将含有结果的消息发送给客户端操作系统。

(9)客户端操作系统将消息交给客户存根。

(10)客户存根将结果从消息中提取出来,返回调用它的客户存根。

以上步骤客户过程将客户存根发出的本地调用转换成对服务器过程的本地调用,而客户端和服务器都不会意识到中间步骤的存在。

RPC的主要好处是双重的。首先,程序员可以使用过程调用语义来调用远程函数并获取响应。其次,简化了编写分布式应用程序的难度,因为RPC隐藏了所有的网络代码存根函数。应用程序不必担心一些细节,比如socket、端口号,以及数据的转换和解析。在OSI参考模型中,RPC跨越了会话层和表示层。

5.实现远程过程调用

要实现远程过程调用,需要考虑以下几个问题。

1)如何传递参数

传递值参数

传递值参数比较简单,图1-13是一个简单的通过RPC进行远程计算的例子。其中,远程过程add(i, j)有两个参数i和j,其结果是返回i和j的算术和。

图1-13 通过RPC进行远程计算的步骤

通过RPC进行远程计算的步骤如下:

(1)将参数放入消息中,并在消息中添加要调用的过程的名称或编码。

(2)消息到达服务器后,服务器存根对该消息进行分析,以判明需要调用哪个过程,随后执行相应的调用。

(3)服务器运行完毕,服务器存根将服务器得到的结果打包成消息返回客户存根,客户存根将结果从消息中提取出来,把结果值返回客户端。

当然,这里只是做了简单的演示,在实际的分布式系统中,还需要考虑其他情况,因为不同的机器对于数字、字符和其他类型的数据项的表示方式常有差异,比如,整数型就有Big Endian和Little Endian之分。

传递引用参数

传递引用参数相对来说比较困难。单纯传递参数的引用(也包含指针)是完全没有意义的,因为引用地址传递给远程计算机,其指向的内存位置可能跟远程系统上的内存位置完全不同。如果你想支持传递引用参数,则必须发送参数的副本,将它们放置在远程系统内存中,向它们传递一个指向服务器函数的指针,然后将对象发送回客户端,复制它的引用。如果远程过程调用必须支持引用复杂的结构,比如树和链表,则它们需要将结构复制到一个无指针的表示里面(比如,一个扁平的树),并传输到远程系统来重建数据结构。

2)如何表示数据

在本地系统中不存在数据不相容的问题,因为数据格式总是相同的。而在分布式系统中,不同的远程机器上可能有不同的字节顺序、不同大小的整数,以及不同的浮点表示。对于RPC,如果想与异构系统通信,就需要一个“标准”来对所有数据类型进行编码,并可以作为参数传递。例如,ONC RPC使用XDR(eXternal Data Representation)格式。这些数据表示格式可以使用隐式或显式类型。隐式类型指只传递值,而不传递变量的名称或类型。常见的例子是ONC RPC的XDR和DCE RPC的NDR。显式类型指需要传递每个字段的类型和值。常见的例子是ISO标准ASN.1(Abstract Syntax Notation)、JSON(JavaScript Object Notation)、Google Protocol Buffers,以及各种基于XML的数据表示格式。

3)如何选用传输协议

有些实现只允许使用一个协议(例如,TCP)。大多数RPC实现支持几个协议,并允许用户选择。

4)出错时会发生什么

相比于本地过程调用,远程过程调用出错的机会更多。由于本地过程调用没有过程调用失败的概念,项目使用远程过程调用必须准备测试远程过程调用的失败或捕获异常。

5)远程调用的语义是什么

调用一个普通的过程语义很简单:当我们调用时,过程被执行。远程过程完全一次性调用成功是非常难以实现的。执行远程过程可以有如下结果:

● 如果服务器崩溃或进程在运行服务器代码之前就死了,那么远程过程会被执行0次。

● 如果一切工作正常,则远程过程会被执行1次。

● 如果服务器返回服务器存根后在发送响应前就崩溃了,则远程过程会被执行1次或多次。客户端收不到返回的响应,可以决定再试一次,因此会出现多次执行函数的情况。如果没有则再试一次,函数执行1次。

● 如果客户机超时和重新传输,那么远程过程会被执行多次。也有可能原始请求延迟了,两者都可能执行或不执行。

RPC系统通常会提供至少一次或最多一次的语义,或者在两者之间选择。如果需要了解应用程序的性质和远程过程的功能是否安全,则可以通过多次调用同一个函数来验证。如果一个函数可以运行任何次而不影响结果,则这是幂等(idempotent)函数,如每天的时间、数学函数、读取静态数据等。否则,它是一个非幂等(nonidempotent)函数,如添加或修改一个文件。

6)远程调用的性能怎么样

毫无疑问,一个远程过程调用将比常规的本地过程调用慢得多,因为产生了额外的步骤及网络传输本身存在延迟。然而,这并不应该阻止我们使用远程过程调用。

7)远程调用安全吗

使用RPC,我们必须关注各种安全问题:

● 客户端发送消息到远程过程,这个过程是可信的吗?

● 客户端发送消息到远程计算机,这个远程机器是可信的吗?

● 服务器如何验证接收的消息来自合法的客户端?服务器如何识别客户端?

● 消息在网络中传播时如何防止被其他进程嗅探?

● 如何防止消息在客户端和服务器的网络传播中被其他进程拦截和修改?

● 协议能防止重播攻击吗?

● 如何防止消息在网络传播中被意外损坏或截断?

6.远程过程调用的优点

远程过程调用有诸多优点:

● 不必担心传输地址问题。服务器可以绑定到任何可用的端口,然后用RPC名称服务来注册端口。客户端将通过该名称服务来找到对应的端口号所需的程序。而这一切对于程序员来说是透明的。

● 系统可以独立于传输提供者。自动生成服务器存根使其可以在系统上的任何一个传输提供者上可用,包括TCP和UDP,而这些,客户端是可以动态选择的。当代码发送以后,接收消息是自动生成的,而不需要额外的代码。

● 应用程序在客户端只需要知道一个传输地址——名称服务,负责告诉应用程序去哪里连接服务器函数集。

● 使用函数调用模型来代替socket的发送/接收(读/写)接口。用户不需要处理参数的解析。

7.远程过程调用API

任何RPC实现都需要提供一组支持库。

名称服务操作:注册和查找绑定信息(端口、机器)。允许一个应用程序使用动态端口(操作系统分配的)。

绑定操作:使用适当的协议建立客户机/服务器通信(建立通信端点)。

终端操作:注册端点信息(协议、端口号、机器名)到名称服务并监听过程调用请求。这些函数通常被自动生成的主程序——服务器存根(骨架)所调用。

安全操作:系统应该提供机制来保证客户端和服务器之间能够相互验证,为两者提供一个安全的通信通道。

国际化操作(可能):目前,有一小部分RPC包支持转换包括时间格式、货币格式和特定语言的字符串的功能。

封送处理/数据转换操作:函数将数据序列化为一个普通的的字节数组,通过网络进行传递,并能够重建。

存根内存管理和垃圾收集:存根可能需要分配内存来存储参数,特别是模拟引用传递语义。RPC包需要分配和清理任何这样的内存。它们也可能需要为创建网络缓存区而分配内存。RPC包支持对象,RPC系统需要跟踪远程客户端是否仍有引用对象或一个对象是否可以删除。

程序标识操作:允许应用程序访问(或处理)RPC接口集的标识符,这样服务器提供的接口集可以被用来交流和使用。

对象和函数的标识操作:允许将远程函数或远程对象的引用传递给其他进程(并不是所有的RPC系统都支持)。

所以,判断一种通信方式是否是RPC,就看它是否提供了上述的API。

8.远程过程调用发展历程

第一代RPC:Sun公司是第一个提供商业化RPC库和RPC编译器的。在20世纪80年代中期Sun计算机提供RPC,并在Sun Network File System(NFS)上得到支持。该协议被以Sun和AT&T为首的Open Network Computing(开放网络计算)作为一个标准来推动。这是一个非常轻量级的RPC系统,可在大多数POSIX和类POSIX操作系统中使用,包括Linux、SunOS、OS X和各种发布版本的BSD。这样的系统被称为Sun RPC或ONC RPC。该阶段的代表产品还有DCE RPC。

第二代RPC支持对象:面向对象的语言在20世纪80年代末兴起,很明显,当时的Sun ONC和DCE RPC系统都没有提供任何支持,诸如从远程类实例化远程对象、跟踪对象的实例或提供支持多态性。现有的RPC机制虽然可以运作,但它们仍然不支持自动、透明的面向对象编程技术。该阶段的主要产品有微软DCOM(COM+)、CORBA和Java RMI。

第三代RPC及Web Services:传统RPC解决方案可以工作在互联网上,但问题是,它们通常严重依赖动态端口分配,往往要进行额外的防火墙配置。Web Services成为一组协议,允许服务被发布、发现,并用于技术无关的形式,即服务不应该依赖于客户的语言、操作系统或机器架构。该阶段的代表产品有XML-RPC、SOAP、Microsoft .NET Remoting和JAX-WS等。

1.3.3 面向消息的通信

远程过程调用有助于隐藏分布式系统中的通信细节,也就是说增强了访问透明性。但这种机制并不一定适合所有场景,特别是当无法保证发出请求时接收端一定正在执行的情况下,就必须有其他的通信服务。同时RPC的同步特性也会造成客户在发出的请求得到处理之前就被阻塞了,因而有时也需要采取其他的办法。而面向消息的通信就解决了上面提到的种种问题。

面向消息的通信一般由消息队列系统(Message-Queuing System, MQ)或面向消息的中间件(Message-Oriented Middleware, MOM)提供高效可靠的消息传递机制来进行平台无关的数据交流,并可基于数据通信进行分布系统的集成。通过提供消息传递和消息排队模型,可在分布环境下扩展进程间的通信,并支持多种通信协议、语言、应用程序、硬件和软件平台。

通过使用MQ或MOM,通信双方的程序(称其为消息客户程序)可以在不同的时间运行,程序不在网络上直接通话,而是间接地将消息放入MQ或MOM服务器的消息机制中。因为程序间没有直接的联系,所以它们不必同时运行:当消息放入适当的队列时,目标程序不需要正在运行;即使目标程序在运行,也不意味着要立即处理该消息。

消息客户程序之间通过将消息放入消息队列或从消息队列中取出消息来进行通信。客户程序不直接与其他程序通信,避免了网络通信的复杂性。消息队列和网络通信的维护工作由MQ或MOM完成。

常见的MQ或MOM产品有Java Message Service、Apache ActiveMQ、Apache RocketMQ、RabbitMQ、Apache Kafka等,这些产品在第3章中详细讲解。

Java Message Service(JMS)

Java Message Service(JMS)API是一个Java面向消息中间件的API,用于两个或多个客户端之间发送消息。

JMS的目标包括:

● 包含实现复杂企业应用所需要的功能特性;

● 定义企业消息概念和功能的一组通用集合;

● 最小化这些Java程序员必须学习以使用企业消息产品的概念集合;

● 最大化消息应用的可移植性。

JMS支持企业消息产品提供两种主要的消息风格:

● 点对点(Point-to-Point, PTP)消息风格——允许客户端通过一个叫“队列(queue)”的中间抽象发送一个消息给另一个客户端。发送消息的客户端将一个消息发送到指定的队列中,接收消息的客户端从这个队列中抽取消息。

● 发布订阅(Publish/Subscribe, Pub/Sub)消息风格——允许一个客户端通过一个叫“主题(topic)”的中间抽象发送一个消息给多个客户端。发送消息的客户端将一个消息发布到指定的主题中,然后这个消息将被投递到所有订阅了这个主题的客户端。

JMS API

由于历史的原因,JMS提供了四组用于发送和接收消息的接口。

● JMS1.0定义了两组特定领域相关的API,一组用于点对点的消息处理(queue),另一组用于发布订阅的消息处理(topic)。尽管由于向后兼容的理由,这些接口一直被保留在JMS中,但是在以后的API中应该考虑被废弃掉。

● JMS1.1引入了新的统一的一组API,可以同时用于点对点和发布订阅消息模式。这也被称作标准(standard)API。

● JMS2.0引入了一组简化API,它拥有标准API的全部特性,同时接口更少、使用更方便。

以上每组API提供一组不同的接口集合,用于连接JMS提供者、发送和接收消息。因此,它们共享一组代表消息、消息目的地和其他各方面功能特性的通用接口。

下面是使用标准API来发送信息的例子:

@Resource(lookup = "jms/connectionFactory ")
ConnectionFactory connectionFactory;

@Resource(lookup="jms/inboundQueue")
Queue inboundQueue;

public void sendMessageOld (String payload) throws JMSException{
    try (Connection connection = connectionFactory.createConnection()) {
      Session session = connection.createSession();
      MessageProducer messageProducer =
      session.createProducer(inboundQueue);
      TextMessage textMessage =
      session.createTextMessage(payload);
      messageProducer.send(textMessage);
    }
}

下面是使用简化API来发送信息的例子:

@Resource(lookup = "jms/connectionFactory")
ConnectionFactory connectionFactory;

@Resource(lookup="jms/inboundQueue")
Queue inboundQueue;

public void sendMessageNew (String payload) {
    try (MessagingContext context = connectionFactory.createMessagingContext(); ){
      context.send(inboundQueue, payload);
    }
}

所有的接口都在javax.jms包中。

更多有关JMS的规范可以在线查阅https://java.net/projects/jms-spec/pages/Home