Python

Exploring Security, Metrics, and Error-handling with gRPC in Python

In my post “Using gRPC in Python,” we wrote a basic gRPC server implementing a users service. We are going to expand on it and explore more gRPC concepts, such as secure client-server communication via self-signed SSL certificates, implementing gRPC middleware (or interceptors), and error handling.

We will be using Python 3.6 for our demos in this article. The git repository has all the code listings we will be discussing in this article in the subdirectory demo2.

Securing Client-server Communication

We initialized the server in the first article as follows:

..
server.add_insecure_port('127.0.0.1:50051')
..

The add_insecure_port() method initializes an insecure server and client-server communication is unencrypted. It is however recommended that a gRPC client-server communication takes place over a secure channel via SSL/TLS.

First, we will create a self-signed certificate using openssl:

$ # Generate a private key

$ openssl genrsa -out server.key 2048
Generating RSA private key, 2048 bit long modulus
.........+++
.......................................................................+++
e is 65537 (0x10001)

$ openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:AU
State or Province Name (full name) [Some-State]:NSW
Locality Name (eg, city) []:Sydney
Organization Name (eg, company) [Internet Widgits Pty Ltd]:gRPC Demo
Organizational Unit Name (eg, section) []:gRPC
Common Name (e.g. server FQDN or YOUR name) []:localhost
Email Address []:a@a.com
(

At this stage, we will have two files: server.key and server.crt. On the server side, we will need both these files; on the client side, we will only need the server.crt file.

Using the above certificate, we will now modify our server code as follows:

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
users_service.add_UsersServicer_to_server(UsersService(), server)
# read in key and certificate
with open(os.path.join(os.path.split(__file__)[0], 'server.key')) as f:
    private_key = f.read().encode()
with open(os.path.join(os.path.split(__file__)[0], 'server.crt')) as f:
    certificate_chain = f.read().encode()
# create server credentials
server_creds = grpc.ssl_server_credentials(((private_key, certificate_chain,),))
server.add_secure_port('localhost:50051', server_creds)

Comparing it to the earlier version of our server, add_secure_port has replaced add_insecure_port(). We have kept things simple here and have copied the server.key and server.crt file to the same directory as the server.py file. For all purposes beyond demonstration, these locations should be passed as configuration values (environment variables, for example).

On the client side, we will do a similar modification. Earlier, we created a channel as follows:

channel = grpc.insecure_channel('localhost:50051')

Now to communicate with our server over TLS, we will do the following:

with open('server.crt') as f:
    trusted_certs = f.read().encode()
# create credentials
credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
channel = grpc.secure_channel('localhost:50051', credentials)
...

Implementing Server-side gRPC Interceptors

Interceptors are custom code that run on servers and clients during a request/response lifecycle. If you are familiar with the concept of middleware in HTTP, interceptors are middleware for gRPC. Support for interceptors in Python is still under discussion, but here we will play with the current proposed implementation.

The demo2/grpc-services/grpc_interceptors sub-directory contains a copy of the proposed implementation (commit hash).

Using the above implementation, we will be writing our first server interceptor.

Exporting server metrics

The metric interceptor we will implement will push request latency in seconds (for every request) to statsd, which we will then convert to prometheus metrics via a statsd bridge. I recommend my earlier post to understand more about my choice of the approach.

To write an interceptor, we first subclass the appropriate classes from the grpc_interceptors package:

...
from grpc_interceptors import UnaryUnaryServerInterceptor, UnaryStreamServerInterceptor
class MetricInterceptor(UnaryUnaryServerInterceptor, UnaryStreamServerInterceptor):

    def __init__(self):
        print("Initializing metric interceptor")

    @send_metrics
    def intercept_unary_unary_handler(self, handler, method, request, servicer_context):
        return handler(request, servicer_context)

    @send_metrics
    def intercept_unary_stream_handler(self, handler, method, request, servicer_context):
        result = handler(request, servicer_context)
        for response in result:
            yield response
...

Our class, MetricInterceptor, derives from UnaryUnaryServerInterceptor and UnaryStreamServerInterceptor and then overrides the intercept_unary_unary_handler and intercept_unary_stream_handler methods respectively. This is because our service implements only two kinds of RPC methods:

  • unary request and response (CreateUser)
  • unary request and streaming response (UsersGet)

If we implemented other kinds of RPC methods, we would be deriving from the corresponding interceptor base class and implementing the corresponding method as well.

The send_metrics decorator wraps around the interceptor method to implement the metric calculation and push it to the configured statsd server (the entire function is in the module metric_interceptor in the server sub-directory).

Let’s first go through the following snippet from the function:

@wraps(func)
def wrapper(*args, **kw):
    service_method = None
    service_name = None
    if isinstance(args[4], grpc._server._Context):
        servicer_context = args[4]
        # This gives us <service>/<method name>
        service_method = servicer_context._rpc_event.request_call_details.method
        service_name, method_name = str(service_method).rsplit('/')[1::]
    else:
        logging.warning('Cannot derive the service name and method')
...

The func function here refers to the original interceptor method that we wrapped above. We extract two important pieces of data here to attach as labels to the metric:

  • The gRPC service name
  • The method name

Both of these are available via the _rpc_event.request_call_details.method attribute of the context object, which is an instance of the grpc._server._Context class. I have the isinstance check here since I am working with the experimental implementation and probably will not be necessary.

Next, we store the starting time and call the actual RPC method:

try:
    start_time = time.time()
    result = func(*args, **kw)
    result_status = 'success'
except Exception:
    result_status = 'error'
    raise
finally:
    resp_time = time.time() - start_time
    push_to_statsd_histogram(
        REQUEST_LATENCY_METRIC_NAME,
        resp_time, [
            'service:{0}'.format(service_name),
            'method:{0}'.format(method_name),
            'status:{0}'.format(result_status),
        ])
return result

We add three statsd tags here:

  • the service name
  • the method name
  • the status of the call when we push the time taken to complete the call

To register the interceptor with the server, we call create a new object of type MetricInterceptor() and then call the intercept_server() function (part of the proposed grpc_interceptors package):

..
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
metric_interceptor = MetricInterceptor()
server = intercept_server(server, metric_interceptor)
...

!Sign up for a free Codeship Account

Error Handling and Logging

If there is an unhandled exception that happens when serving a request, we will see a traceback on the server corresponding to the runtime error that occured. On the client, we will see a traceback of the form:

...
raise _Rendezvous(state, None, None, deadline)
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNKNOWN, Exception calling application: division by zero)>

The unhandled error here is an 1/0 statement that was injected into the CreateUser function implementation.

With a couple of additional steps — one on the server and one on the client side — we can have more control over the traceback logging for unhandled runtime errors. Let’s modify the sample_client_demo.py client so that we have try..except block in which we make the call to the RPC method:

...
try:
    response = stub.CreateUser(
          users_messages.CreateUserRequest(username='tom'),
          metadata=metadata,
     )
except grpc.RpcError as e:
    print('CreateUser failed with {0}: {1}'.format(e.code(), e.details()))
else:
    print("User created:", response.user.username)
...

Now, if we run the client again:

CreateUser failed with StatusCode.UNKNOWN: Exception calling application: division by zero

The code() method of the returned exception gives us access to the status code returned by the server. It’s value is one of the gRPC status codes. The details() method gives us access to additional details sent by the server. Here it gives us the exception message associated with the runtime error on the server.

Ideally, we should write a client interceptor to always make the call in a try..except block as above and hence not have to do so individually for all method calls.

On the server side, we will write a logging interceptor (similar to the metric interceptor above) so that it looks for any unhandled errors and logs it with any metadata associated with the request. This will be useful to identify and correlate the failure to the request. In addition, we can also perform a translation of the error message to a more user friendly error if we so desired.

The module, logging_interceptor.py in the server/ sub-directory implements a logging interceptor. The most important snippet is as follows:

from pythonjsonlogger import jsonlogger

from grpc_interceptors import UnaryUnaryServerInterceptor, UnaryStreamServerInterceptor

logger = logging.getLogger()
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)

...
...
metadata = {}
metadata['timestamp'] = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
servicer_context = None
if isinstance(args[4], grpc._server._Context):
    servicer_context = args[4]
    metadata.update(dict(servicer_context.invocation_metadata()))
try:
    result = func(*args, **kw)
except Exception as e:
    logger.error(e, exc_info=True, extra=metadata)
    if servicer_context:
        servicer_context.set_details(str(e))
        servicer_context.set_code(grpc.StatusCode.UNKNOWN)
    # TODO: need to return an appropriate response type here
    # Currently this will raise a serialization error on the server
    # side
    # see https://github.com/grpc/grpc/issues/12824
    return None
else:
    return result

We use the set_details() method on the servicer context object to send back unhandled exception message. This is where we can customize what the client’s get_details() method returns. Then we set the grpc status code to UNKNOWN. This tells the client that this is a non-OK response and will be manifested as an exception of type grpc.RpcError. Since this is an error, we don’t have anything meaningful to return here, so we return None. This however, results in a serialization here on the server side. Please see issue to learn more.

We then modify our server code to register the logging interceptor:

...
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
metric_interceptor = MetricInterceptor()
logging_interceptor = LoggingInterceptor()
server = intercept_server(server, metric_interceptor, logging_interceptor)
...

The order of registration of the interceptors is important here. As we have it above, the call flow is as follows:

  1. Logging interceptor invoked
  2. Metric interceptor invoked
  3. Service method called
  4. Metric interceptor returns
  5. Logging interceptor returns

Running the server

In addition to Python 3.6, we will be using docker and docker-compose to run the gRPC server as the other software we will be using (promethus, statsd exporter and grafana). If you don’t have these installed, please follow the official install guide to install these on your operating system.

Starting with a clone of the repo:

$ cd demo2/grpc-services

We will first build the docker image for the gRPC server:

$ docker build -t amitsaha/grpc-users -f Dockerfile.users .

Start the server along with statsd-exporter, Prometheus, and Grafana:

$ docker-compose -f docker-compse.yml -f docker-compose-infra.yml up

Run the client:

$ docker exect -ti users bash
# cd /client
# # run the sample_client_demo.py file 10 times
#  ./run-client.sh 10

The server.py file (in users/server/server.py) has a random runtime error injected into the CreateUser method which happens 50 percent of the time.

On the server console, you will see that the exception is being logged as a JSON object with a timestamp and additional metadata added.

Then open http://127.0.0.1:9090/graph?g0.range_input=5m&g0.expr=request_latency_seconds_timer&g0.tab=0 in your browser. You will see the Prometheus expression browser showing the latency of the requests and you will see different metrics for each unique combination of the labels we added.

Server metrics

Conclusion

We extended our gRPC server in this article to be secure, export metrics, and log errors. We also learned about the in-progress work on bringing support for interceptors to gRPC Python. Next, here are a few resources to learn more about these and other gRPC features:

Published on Web Code Geeks with permission by Amit Saha, partner at our WCG program. See the original article here: Exploring Security, Metrics, and Error-handling with gRPC in Python

Opinions expressed by Web Code Geeks contributors are their own.

Amit Saha

Amit Saha is a software engineer and author of Doing Math with Python. He's written for Linux Journal, Linux Voice, and Linux Magazine.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button