Jadi, Anda ingin mengoptimalkan gRPC. Bagian 1

Pertanyaan tentang cara mempercepat gRPC sering muncul. gRPC memungkinkan RPC berperforma tinggi, tetapi cara mencapai performa tersebut tidak selalu jelas. Karena itu, saya memutuskan untuk menunjukkan alur pemikiran saya saat mengoptimalkan sebuah program.





"-", . . . , , . gRPC . , .





Java. protobuf-, API:





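  • KvClient — simulasi pengguna sistem "kunci-nilai". Klien ini secara acak membuat, mengambil, memperbarui, dan menghapus kunci beserta nilainya. Ukuran kunci dan nilai juga ditentukan secara acak.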





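  • KvService — implementasi layanan "kunci-nilai". Layanan ini dipasang pada server gRPC untuk menangani permintaan dari klien. Operasi baca dan tulis masing-masing diberi jeda 10 dan 50 milidetik untuk mensimulasikan penyimpanan ke disk, sehingga contoh ini berperilaku lebih mirip basis data persisten.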





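  • KvRunner — mengatur interaksi antara klien dan server. Ini adalah titik masuk utama, yang menjalankan klien dan server dalam satu proses, lalu menunggu klien menyelesaikan pekerjaannya. Runner bekerja selama 60 detik, kemudian mencatat berapa banyak RPC yang berhasil diselesaikan.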





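  • kvstore.proto — definisi Protocol Buffers untuk layanan kita. Berkas ini menjelaskan apa yang dapat diharapkan klien dari layanan, dan apa saja operasinya. Agar sederhana, kita memakai operasi Create, Retrieve, Update, dan Delete (dikenal sebagai CRUD). Operasi ini bekerja dengan kunci dan nilai yang berupa byte sembarang. Meskipun mirip REST, kita tetap bebas menambahkan operasi yang lebih kompleks di kemudian hari.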





Protocol Buffers gRPC — , . gRPC. gRPC-, , (stub, ).





, , , , . , RPC. , create:





/**
 * Sends a single blocking Create RPC for a newly generated random key.
 *
 * <p>The reply must equal the default {@code CreateResponse}; anything else is reported
 * as a {@link RuntimeException}. An {@code ALREADY_EXISTS} status is tolerated: the key
 * is dropped from {@code knownKeys} and the event is logged. Every other status is
 * rethrown to the caller unchanged.
 *
 * @param stub blocking stub used to issue the call
 */
private void doCreate(KeyValueServiceBlockingStub stub) {
  ByteString newKey = createRandomKey();
  try {
    CreateRequest request =
        CreateRequest.newBuilder()
            .setKey(newKey)
            .setValue(randomBytes(MEAN_VALUE_SIZE))
            .build();
    CreateResponse reply = stub.create(request);
    CreateResponse expected = CreateResponse.getDefaultInstance();
    if (!reply.equals(expected)) {
      throw new RuntimeException("Invalid response");
    }
  } catch (StatusRuntimeException failure) {
    // Only a key collision is recoverable here; propagate everything else untouched.
    if (e(failure) != Code.ALREADY_EXISTS) {
      throw failure;
    }
    knownKeys.remove(newKey);
    logger.log(Level.INFO, "Key already existed", failure);
  }
}

/** Extracts the status code carried by a failed call. */
private static Code e(StatusRuntimeException failure) {
  return failure.getStatus().getCode();
}

      
      



. , . , , , , , . , , , . , . , - , . .





gRPC API, . gRPC-, . , RPC.





:





// In-memory key/value storage, keyed and valued by ByteBuffer. HashMap does no locking of
// its own; the create() handler that writes to it is declared synchronized.
private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();

/**
 * Handles a Create RPC: after the simulated write delay, stores the request's value under
 * its key if that key has no mapping yet.
 *
 * <p>On success the default {@code CreateResponse} is sent and the call is completed. If
 * the key is already mapped, the call is failed with {@code ALREADY_EXISTS} instead. The
 * method is {@code synchronized}, so the delay and the map update both happen while the
 * service's monitor is held.
 *
 * @param request carries the key and value to store
 * @param responseObserver receives either the response and completion, or the error
 */
@Override
public synchronized void create(
    CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
  ByteBuffer newKey = request.getKey().asReadOnlyByteBuffer();
  ByteBuffer newValue = request.getValue().asReadOnlyByteBuffer();
  simulateWork(WRITE_DELAY_MILLIS);

  ByteBuffer previous = store.putIfAbsent(newKey, newValue);
  if (previous != null) {
    // Key was already mapped: the store is left as it was and the caller is told so.
    responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
    return;
  }

  responseObserver.onNext(CreateResponse.getDefaultInstance());
  responseObserver.onCompleted();
}

      
      



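Layanan mengambil kunci dan nilai dari permintaan sebagai ByteBuffer. Agar permintaan yang berjalan bersamaan tidak merusak penyimpanan, metode ini ditandai synchronized. Setelah mensimulasikan penulisan ke disk, pasangan kunci-nilai disimpan ke dalam Map.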





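Berbeda dengan kode klien, handler layanan tidak mengembalikan nilai. Sebagai gantinya, ia memanggil onNext() pada responseObserver untuk mengirim respons ke klien. Untuk menandai bahwa respons sudah selesai, dipanggil onCompleted().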





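Langkah pertama adalah menjalankan kode dan melihat seberapa cepat ia bekerja. Saya menggunakan sistem Ubuntu dengan prosesor 12-inti dan memori 32 GB. Mari kita bangun dan jalankan kodenya: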





$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 1:10:07 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 1:11:07 PM io.grpc.examples.KvRunner runClient
INFO: Did 16.55 RPCs/s

real	1m0.927s
user	0m10.688s
sys	0m1.456s
      
      



! 16 RPC . , , . , .





- , , . , , . , .





RPC . , :





/**
 * Client work loop: keeps issuing randomly chosen commands through a blocking stub until
 * {@code done} reads true.
 *
 * <p>Each iteration draws a command number in the range [0, 4); command 0 is a create.
 * The handling of the remaining commands is elided in this excerpt.
 *
 * @param done flag polled at the top of every iteration; the loop exits once it is true
 */
void doClientWork(AtomicBoolean done) {
  Random random = new Random();
  // One blocking stub is created on the channel and reused for every call in the loop.
  KeyValueServiceBlockingStub stub = KeyValueServiceGrpc.newBlockingStub(channel);

  while (!done.get()) {
    // Pick a random CRUD action to take.
    int command = random.nextInt(4);
    if (command == 0) {
      doCreate(stub);
      // NOTE(review): this continue jumps past the rpcCount++ at the bottom of the loop,
      // so in the code visible here create calls are not counted — confirm intended.
      continue;
    }
    /* ... */
    rpcCount++;
  }
}

      
      



, RPC. . RPC? , 50 . 20 :





20 kueri/detik = 1000 ms / (50 ms/kueri)





16 , . , time



, . simulateWork (sleep). , , RPC.





, (real) (user). , 10 . 16% . , , , , RPC.





. — , . .





gRPC- Java : , ListenableFuture



. . ListenableFuture API — , , . , , , RPC, . 





ListenableFuture



. , . , , . , RPC ( ). , RPC , RPC .





. , . . - . doCreate():





/**
 * Starts an asynchronous Create RPC for a newly generated random key.
 *
 * <p>The call's outcome is handled in callbacks attached to the returned future:
 * <ul>
 *   <li>{@code rpcCount} is incremented once the future completes, whatever the outcome;
 *   <li>on success the key is added to {@code knownKeys}, and a reply that differs from
 *       the default {@code CreateResponse} is recorded in {@code error};
 *   <li>on {@code ALREADY_EXISTS} the key is removed from {@code knownKeys} and the event
 *       is logged;
 *   <li>any other failure is recorded in {@code error}.
 * </ul>
 * Failures are recorded with {@code compareAndSet(null, ...)}, so only the first one is
 * kept. Nothing is thrown from the callbacks.
 *
 * @param stub future stub used to issue the call
 * @param error holder for the first failure observed by the callbacks
 */
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) {
  ByteString key = createRandomKey();
  ListenableFuture<CreateResponse> res = stub.create(
      CreateRequest.newBuilder()
          .setKey(key)
          .setValue(randomBytes(MEAN_VALUE_SIZE))
          .build());
  res.addListener(() -> rpcCount.incrementAndGet(), MoreExecutors.directExecutor());
  Futures.addCallback(res, new FutureCallback<CreateResponse>() {
    @Override
    public void onSuccess(CreateResponse result) {
      if (!result.equals(CreateResponse.getDefaultInstance())) {
        error.compareAndSet(null, new RuntimeException("Invalid response"));
      }
      // knownKeys is guarded by its own monitor because callbacks run off the caller's thread.
      synchronized (knownKeys) {
        knownKeys.add(key);
      }
    }

    @Override
    public void onFailure(Throwable t) {
      Status status = Status.fromThrowable(t);
      if (status.getCode() == Code.ALREADY_EXISTS) {
        synchronized (knownKeys) {
          knownKeys.remove(key);
        }
        logger.log(Level.INFO, "Key already existed", t);
      } else {
        error.compareAndSet(null, t);
      }
    }
  // The executor is passed explicitly: the two-argument addCallback overload was deprecated
  // and later removed from Guava. directExecutor() keeps the behavior that overload had.
  }, MoreExecutors.directExecutor());
}

      
      



KeyValueServiceFutureStub



, Future



. gRPC Java ListenableFuture



, Future



. . , RPC . , .





RPC. , RPC.





RPC , , . doCreate()



RPC, , throw. . , , .





, , knownKeys



, RPC , , , . , , . : knownKeys



, . knownKeys



, knownKeys



, RPC, . , . , . , .





, , :





WARNING: An exception was thrown by io.grpc.netty.NettyClientStream$Sink$1.operationComplete()
java.lang.OutOfMemoryError: unable to create new native thread
	at java.lang.Thread.start0(Native Method)
	at java.lang.Thread.start(Thread.java:714)
	...
      
      



?! , ? . . , . , . : "unable to create new native thread" ( ), . , . OOM , Java , . , , .





, ? RPC , . , RPC, . ListenableFuture



.





RPC. RPC . , RPC. , , . RPC ( ), . (Semaphore):





// Caps the number of create RPCs in flight: one permit is taken before each call is
// started and handed back when that call's future completes. 100 permits in total.
private final Semaphore limiter = new Semaphore(100);

/**
 * Starts an asynchronous Create RPC for a newly generated random key, first waiting for a
 * permit from {@code limiter}.
 *
 * <p>The rest of the result handling is elided in this excerpt.
 *
 * @param stub future stub used to issue the call
 * @param error holder for failures; not used in the visible part of the method
 * @throws InterruptedException if the thread is interrupted while waiting for a permit
 */
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error)
    throws InterruptedException {
  // Blocks while all permits are taken, i.e. while 100 calls are still outstanding.
  limiter.acquire();
  // NOTE(review): if anything between acquire() and addListener() throws, the permit is
  // never released — confirm whether that can happen in practice.
  ByteString key = createRandomKey();
  ListenableFuture<CreateResponse> res = stub.create(
      CreateRequest.newBuilder()
          .setKey(key)
          .setValue(randomBytes(MEAN_VALUE_SIZE))
          .build());
  // Runs once the future completes: count the RPC, then free the permit for the next call.
  res.addListener(() ->  {
    rpcCount.incrementAndGet();
    limiter.release();
  }, MoreExecutors.directExecutor());
  /* ... */
}

      
      



.





:





$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 2:40:47 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 2:41:47 PM io.grpc.examples.KvRunner runClient
INFO: Did 24.283 RPCs/s

real	1m0.923s
user	0m12.772s
sys	0m1.572s
      
      



46% RPC , . , 20% . , . . - .





? , 1/4 (create, update delete). 1/4 . RPC :





.25 * 50ms (create)
  .25 * 10ms (retrieve)
  .25 * 50ms (update)
 +.25 * 50ms (delete)
------------
        40ms
      
      



40 RPC , RPC :





25 kueri/detik = 1000 ms / (40 ms/kueri)





, . , . , , .





gRPC-. , . , . .






Java Developer. Basic. , , .





.








All Articles