[gRPC] Lecture 12 – P2: Upload file in chunks with client-streaming gRPC – Java

Hello and welcome back to the gRPC course. In this lecture, we will learn how to implement
client-streaming RPC with Java We’re gonna implement an API that allows clients to upload a laptop image
file in multiple chunks. OK let’s start! This is the pcbook-java project that we’ve
been working on. The first thing we need to do is to define
the new upload image RPC. As we’ve already done that in the previous
video with Golang, I will just open the pcbook golang project, And copy-paste the content of the laptop_service.proto
file. Here we have the UploadImageRequest message. It has a oneof data field, which can either be image info, or a chunk
of image data. The ImageInfo contains the laptop ID and image
type such as .jpg or .png The chunk_data is a sequence of bytes. The idea is that we will divide the image
into multiple chunks of 1 kilobyte And send them to the server sequentially via
the stream. Then the server will send back 1 single response, Which contains the ID of the uploaded image And the total size of that image. So the UploadImage RPC will take a stream
of UploadImageRequest as input And return a UploadImageResponse. Alright, let’s build the project to generate
Java codes. The build is successful. Now before we implement the RPC, We will need to add a new store to save the
uploaded image. I will create a new ImageStore interface. It has 1 function: Save Which takes the laptopID, the imageType, and
the imageData as input And returns the imageID, or throws out an
IOException. Let’s say we want to store the image on
disk, and its metadata on memory. So I will create a new DiskImageStore class
to implement this interface. In this class, we need a field to tell us
where to store the images. We also need a concurrent map to store the
metadata of the images. The key of the map is the image ID, and its
value is the metadata. I will create a new class for the ImageMetadata. In this class, we will store the laptop ID, The type of the image, And the path to the image on disk. Let’s write a constructor to initialize
the object. And also create some getter functions for
each of the fields. OK, now go back to our DiskImageStore. First we create a new constructor that takes
only the imageFolder as input. We initialize the data map with a new ConcurrentHashMap. Then in the Save function, We generate a random UUID that will be used
as the image ID. We make the path to store the image by joining the imageFolder, imageID, and imageType
together. Then we create a new FileOutputStream with
the image path. We call imageData.writeTo() to write the image
data to that file output stream. And close the output stream. Once the file is successfully written to disk, We create a new metadata object, And put it to the data map with the imageID
key. Finally, we return the imageID. And we’re done with the DiskImageStore. Now let’s implement the uploadImage RPC
in the LaptopService class. First I will change this store field to laptopStore. Then we will add a new field for the imageStore. Also add it to this constructor. OK, now we need to override the uploadImage()
method. As you can see, this method has a responseObserver
parameter That will be used to send the response to
the client, Just like the way it works in the searchLaptop
RPC. How about the stream of requests? This is very different from server-streaming
RPC, Because it’s not an input parameter, but
the return value of this function instead. Here we can see that the uploadImage function must return a StreamObserver
of UploadImageRequest And this StreamObserver is just an interface
with 3 functions: onNext, onError, and onCompleted. What we need to do is to return an implementation
of this interface. So let’s do that. First we define 3 fields: laptopID, imageType,
and imageData. Now in the onNext() function, We check the data case. If it is image info, We write a simple log saying that we have
received the image info. Then we get the laptopID and imageType from
that info. We also initialize the imageData as a new
ByteArrayOutputStream. And return. Else, it must be a new data chunk. So we get the chunk from the request. Write a log here saying that we’ve received
a chunk with this size. Then we check if the imageData is null or
not. If it is null, it means that the client hasn’t
sent the image info So we just send an error with INVALID_ARGUMENT
status, And return immediately. Otherwise, we just call chunkData.writeTo()
function To add this chunk to the image data. If we catch an exception, just send an INTERNAL error to the client and return. That’s it for the onNext() function. The onError() function is called whenever an error occurs while the server
is receiving stream data. So here we just need to write a warning log. OK, now let’s implement the onCompleted()
function. When this function is called, it means that the server has received all
image chunk data. So we just call imageStore.Save() to save
the image data to the store. Surround this call with a try-catch. If an error is caught, We call responseObserver.onError() to send
it to the client. We save the output imageID to a variable, And also get the total image size. Then we build a new UploadImageResponse object With the imageID and imageSize. We call responseObserver.onNext() to send
the response to the client, And finally call responseObserver.onCompleted()
to finish it. OK the uploadImage RPC is ready. Now we need to update the LaptopServer a bit. First change this store to laptopStore. Add a new imageStore to this constructor. Then pass it into this LaptopService. Do the same for this constructor. In the main function, We also change the store variable to laptopStore. And create a new DiskImageStore with the image
folder is “img” Then pass it into the new LaptopServer constructor. The img folder is already here, So we’re all set. Let’s run the server. Now let’s try to call this server using the Golang client that we wrote in the
previous video. The laptop image is successfully uploaded. We can see it in the img folder. So it works! Great! Now we will implement the Java client. We cannot use the blockingStub to call the
client-streaming RPC, Instead, we will need an asynchronous stub. So let’s define it here. And initialize it inside this constructor By calling LaptopServiceGrpc.newStub(). Alright, Now define a uploadImage() function with 2
input parameters: A laptop ID, and an image path. In the main function, I’m gonna comment out this block of codes
to test create and search laptop That we wrote in the previous lectures. Add add new codes to test upload image here. First we generate a new random laptop. We call client.createLaptop() to create this
laptop on the server. Then we call client.uploadImage() with the
laptop ID and a laptop.jpg file inside the tmp folder. Let’s create that tmp folder, And copy the laptop.jpg file from the golang
project to that folder. Alright, it’s here. Now in the uploadImage() function, We call asyncStub.withDeadlineAfter 5 seconds Then .uploadImage() We create a new StreamObserver of UploadImageResponse
here. The output of this call will be another StreamObserver
of UploadImageRequest. In the onNext() function, we just write a
simple log Saying we’ve received this response from
the server. In the onError() function, we write a SEVERE
log: upload failed. Note that the stub is asynchronous, Which means that the send request part and the receive response part are run asynchronously. Because of this, we need to use a CountDownLatch() To wait until the whole process is completed. Here we just use a count of 1 because we only
need to wait for the response thread. OK, now if an error occurs, we will call countDown() inside the onError()
function. Similarly, in the onCompleted() function, We also write a log, And call finishLatch.countDown() At the end of the uploadImage() function, We call finishLatch.await() to wait for the
response thread to finish. Here we only wait for at most 1 minute, Which is more than enough Because above we set the deadline of the call
to be 5 seconds. Next we will create a new FileInputStream
to read the image file. If we catch an exception, Just write a SEVERE log and return. Else we get the image type from the image
file extension. We build a new image info with the laptop
ID and image type. We create a new UploadImageRequest with the
image info. And call requestObserver.onNext() to send
the request to the server. Surround this with a try-catch. If there’s an exception, we write a SEVERE
log, Call requestObserver.onError() to report it
to the server. And return. Finally we call requestObserver.onCompleted() Inside the try catch block, After we’ve sent the image info, We will start sending the image data in chunks. Each chunk will be 1 kilobyte, So we create a new byte buffer with the size
of 1024. We use a while loop here to read and send
data multiple times. I will need to pull this fileInputStream variable
out. Then here we can call fileInputStream.read()
to read more data into the buffer. It will return the number of bytes read. Assign it to n. If n is less than or equal to 0, then it’s
the end of file We can safely break the loop. Now we check if the latch has already finished
because of some unexpected error, Then we don’t need to send more data, so
just return. Otherwise, we make a new request with the
chunk data. Here we just copy the first n bytes from the
buffer. Similar as before, We call requestObserver.onNext() to send the
request to the server. And write a log saying that the chunk with
this size was sent. That’s it! We’re done with the client. Now let’s run the server. And run the client. The image is successfully uploaded. And we got this response with image ID and
image size. The logs on the server side look good. And we can see the laptop image inside the
img folder. Now let’s say, we want to put a constraint
on the maximum size of the image. For example, only allow upload images with
size of at most 1 kilobyte. Then, in the onNext() function, Before writing the chunk to the image data, We compute the current size of the image. If it is greater than the maximum allowed
size, Then we write a log “image is too large” We report the error to the client with INVALID_ARGUMENT
status. And return right away. OK let’s try it. Run the server. Then run the client. As you can see, some chunks are sent to the
server And we got an INVALID_ARGUMENT error: image
is too large. So it works. Note that the send part and receive part are
parallel, So it’s totally possible that the client
will send more than 2 chunks Before it receives the error from the server
and stops sending more. As a result, we will see a warning like this
on the server side Because the server has already closed the
stream when it sent the error to the client. OK the last thing before we finish, When we receive the image info, We need to check that the laptop ID exists
in the store. To do so, we just call laptopStore.Find(laptopID) If the laptop is not found, We simply call responseObserver.onError()
with Status NOT_FOUND. On the client side, we can comment out this
command So that the laptop is not created on the server. Alright, now let’s run the server. And run the client. We got the not found error. So it’s working as expected. And that’s it for today’s video about
client-streaming RPC. In the next lecture, we will learn how to implement the last type
of gRPC, Which is bidirectional streaming. I hope the course is useful for you so far. Thank you for watching, and see you later!

Tags: , , , , , , , , , , , , , , , , , , , , , , , , ,

There are no comments yet

Why not be the first

Leave a Reply

Your email address will not be published. Required fields are marked *