Akka-CQRS(16)- Permission management for gRPC with JWT
Source: cnblogs  Author: 雪川大虫  Date: 2019/7/16 8:56:18

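An earlier post covered gRPC's SSL/TLS security mechanism and found the setup rather involved: certificates have to be signed, and both the server and the client have to be configured. In practice JWT is more convenient and also more capable, because besides the token's cryptographic signature, user information can be carried inside the JWT between server and client. (Note that the payload of a signed JWT is only Base64URL-encoded, not encrypted, so genuinely private data needs additional encryption.) Most fundamentally, verifying the JWT lets the server control which functions a client is allowed to use.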

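The principle behind JWT-based permission management for gRPC calls is simple: the client first authenticates with the server and obtains a JWT, then passes that JWT along with every service call so the server can verify it. The authentication request that returns the JWT can be an ordinary service function of its own, such as GetAuthToken in the .proto file below: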

    // Credentials the client submits when it asks for a token
    message PBPOSCredential {
      string userid = 1;
      string password = 2;
    }
    // The token returned after successful authentication
    message PBPOSToken {
      string jwt = 1;
    }
    service SendCommand {
      rpc SingleResponse(PBPOSCommand) returns (PBPOSResponse) {};
      rpc GetTxnItems(PBPOSCommand) returns (stream PBTxnItem) {};
      rpc GetAuthToken(PBPOSCredential) returns (PBPOSToken) {};
    }

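The tricky part is getting the JWT from the client to the server, because gRPC essentially takes over the request and response. One approach is to use an interceptor to get at the request headers, i.e. the metadata: the client writes the JWT into the metadata and the server reads it back out.

Let's first look at how the client-side interceptor is set up and used: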

    // Adds the JWT to the request metadata (headers) of every call made through it
    class AuthClientInterceptor(jwt: String) extends ClientInterceptor {
      def interceptCall[ReqT, RespT](
          methodDescriptor: MethodDescriptor[ReqT, RespT],
          callOptions: CallOptions,
          channel: io.grpc.Channel): ClientCall[ReqT, RespT] =
        new ForwardingClientCall.SimpleForwardingClientCall[ReqT, RespT](
            channel.newCall(methodDescriptor, callOptions)) {
          override def start(responseListener: ClientCall.Listener[RespT], headers: Metadata): Unit = {
            headers.put(Key.of("jwt", Metadata.ASCII_STRING_MARSHALLER), jwt)
            super.start(responseListener, headers)
          }
        }
    }
    ...
    val unsafeChannel = NettyChannelBuilder
      .forAddress("192.168.0.189", 50051)
      .negotiationType(NegotiationType.PLAINTEXT)
      .build()
    // Wrap the plain channel so that every call carries the JWT
    val securedChannel = ClientInterceptors.intercept(unsafeChannel, new AuthClientInterceptor(jwt))
    val securedClient = SendCommandGrpc.blockingStub(securedChannel)
    val resp = securedClient.singleResponse(PBPOSCommand())

The authentication request, i.e. obtaining the JWT, needs no interceptor, so it goes through unsafeChannel, which has none:

    // build connection channel
    val unsafeChannel = NettyChannelBuilder
      .forAddress("192.168.0.189", 50051)
      .negotiationType(NegotiationType.PLAINTEXT)
      .build()
    val authClient = SendCommandGrpc.blockingStub(unsafeChannel)
    val jwt = authClient.getAuthToken(PBPOSCredential(userid="johnny", password="p4ssw0rd")).jwt
    println(s"got jwt: $jwt")
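The token printed by the client is three Base64URL segments joined by dots (header, payload, signature). As a minimal sketch using only the JDK, the payload can be read without knowing the secret, which is why it should be treated as signed rather than secret. `JwtPeek`, `payloadJson` and `sampleToken` are hypothetical names introduced for this illustration; the sample token carries a dummy signature and is not what the server in this post issues.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64

object JwtPeek {
  // Returns the JSON text of the payload (second segment),
  // or None when the token does not have exactly three segments
  // or the payload is not valid Base64URL.
  def payloadJson(token: String): Option[String] =
    token.split('.') match {
      case Array(_, payload, _) =>
        try Some(new String(Base64.getUrlDecoder.decode(payload), UTF_8))
        catch { case _: IllegalArgumentException => None }
      case _ => None
    }

  // Demo helper (assumption): builds a token-shaped string with a dummy signature.
  def sampleToken(payload: String): String = {
    val enc = Base64.getUrlEncoder.withoutPadding
    val header = enc.encodeToString("""{"typ":"JWT","alg":"HS256"}""".getBytes(UTF_8))
    val body = enc.encodeToString(payload.getBytes(UTF_8))
    header + "." + body + ".c2ln"
  }
}
```

Reading the payload this way proves nothing about authenticity; only the signature check on the server does that.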

Building and using JWTs was covered in the previous few posts:

    package com.datatech.auth
    import pdi.jwt._
    import org.json4s.native.Json
    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    import pdi.jwt.algorithms._
    import scala.util._
    object AuthBase {
      type UserInfo = Map[String, Any]
      case class AuthBase(
          algorithm: JwtAlgorithm = JwtAlgorithm.HMD5,
          secret: String = "OpenSesame",
          getUserInfo: (String,String) => Option[UserInfo] = null) {
        ctx =>
        // builder-style setters, each returning a modified copy
        def withAlgorithm(algo: JwtAlgorithm): AuthBase = ctx.copy(algorithm = algo)
        def withSecretKey(key: String): AuthBase = ctx.copy(secret = key)
        def withUserFunc(f: (String, String) => Option[UserInfo]): AuthBase = ctx.copy(getUserInfo = f)
        // Some(token) when the signature verifies with the configured algorithm and secret
        def authenticateToken(token: String): Option[String] =
          algorithm match {
            case algo: JwtAsymmetricAlgorithm =>
              Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtAsymmetricAlgorithm]))) match {
                case true => Some(token)
                case _ => None
              }
            case _ =>
              Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtHmacAlgorithm]))) match {
                case true => Some(token)
                case _ => None
              }
          }
        // decodes the token and extracts the "userinfo" claim; None when decoding fails
        def getUserInfo(token: String): Option[UserInfo] = {
          algorithm match {
            case algo: JwtAsymmetricAlgorithm =>
              Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtAsymmetricAlgorithm])) match {
                case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
                case Failure(err) => None
              }
            case _ =>
              Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtHmacAlgorithm])) match {
                case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
                case Failure(err) => None
              }
          }
        }
        // signs a token whose claims contain the given user info under "userinfo"
        def issueJwt(userinfo: UserInfo): String = {
          val claims = JwtClaim() + Json(DefaultFormats).write(("userinfo", userinfo))
          Jwt.encode(claims, secret, algorithm)
        }
      }
    }
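To make the signing and verification steps concrete, here is a minimal sketch of an HS256-style token built with JDK classes only. It is not the jwt-scala implementation used above: `Hs256Sketch`, `sign` and `verify` are hypothetical names, the header is fixed, and the check covers the signature only (no expiry or `alg` validation).

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.security.MessageDigest
import java.util.Base64
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

object Hs256Sketch {
  private val enc = Base64.getUrlEncoder.withoutPadding
  private val dec = Base64.getUrlDecoder

  // HMAC-SHA256 of the data, keyed with the shared secret
  private def hmac(data: String, secret: String): Array[Byte] = {
    val mac = Mac.getInstance("HmacSHA256")
    mac.init(new SecretKeySpec(secret.getBytes(UTF_8), "HmacSHA256"))
    mac.doFinal(data.getBytes(UTF_8))
  }

  // Produces header.payload.signature, each part Base64URL-encoded without padding
  def sign(payloadJson: String, secret: String): String = {
    val header = enc.encodeToString("""{"typ":"JWT","alg":"HS256"}""".getBytes(UTF_8))
    val payload = enc.encodeToString(payloadJson.getBytes(UTF_8))
    val signingInput = header + "." + payload
    signingInput + "." + enc.encodeToString(hmac(signingInput, secret))
  }

  // Some(payload JSON) when the signature matches the secret, None otherwise
  def verify(token: String, secret: String): Option[String] =
    token.split('.') match {
      case Array(header, payload, sig) =>
        try {
          val expected = hmac(header + "." + payload, secret)
          // constant-time comparison of the two signatures
          if (MessageDigest.isEqual(expected, dec.decode(sig)))
            Some(new String(dec.decode(payload), UTF_8))
          else None
        } catch { case _: IllegalArgumentException => None }
      case _ => None
    }
}
```

A token signed with one secret fails verification with any other, and so does a token whose signature segment has been altered.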

The server-side interceptor is built and installed as follows:

    // Forwards each listener callback to the delegate once the Future has completed
    abstract class FutureListener[Q](implicit ec: ExecutionContext) extends Listener[Q] {
      protected val delegate: Future[Listener[Q]]
      private val eventually = delegate.foreach _
      override def onComplete(): Unit = eventually { _.onComplete() }
      override def onCancel(): Unit = eventually { _.onCancel() }
      override def onMessage(message: Q): Unit = eventually { _ onMessage message }
      override def onHalfClose(): Unit = eventually { _.onHalfClose() }
      override def onReady(): Unit = eventually { _.onReady() }
    }
    object Keys {
      // key of the request header that carries the JWT
      val AUTH_META_KEY: Metadata.Key[String] = of("jwt", Metadata.ASCII_STRING_MARSHALLER)
      // key under which the JWT is stored in the gRPC Context
      val AUTH_CTX_KEY: Context.Key[String] = key("jwt")
    }
    class AuthorizationInterceptor(implicit ec: ExecutionContext) extends ServerInterceptor {
      override def interceptCall[Q, R](
        call: ServerCall[Q, R],
        headers: Metadata,
        next: ServerCallHandler[Q, R]
      ): Listener[Q] = {
        val prevCtx = Context.current
        val jwt = headers.get(Keys.AUTH_META_KEY)
        println(s"!!!!!!!!!!! $jwt !!!!!!!!!!")
        new FutureListener[Q] {
          protected val delegate = Future {
            // copy the JWT from the headers into a new Context for the rest of the call
            val nextCtx = prevCtx withValue (Keys.AUTH_CTX_KEY, jwt)
            Contexts.interceptCall(nextCtx, call, headers, next)
          }
        }
      }
    }
    trait gRPCServer {
      def runServer(service: ServerServiceDefinition)(implicit actorSys: ActorSystem): Unit = {
        import actorSys.dispatcher
        val server = NettyServerBuilder
          .forPort(50051)
          .addService(ServerInterceptors.intercept(service,
            new AuthorizationInterceptor))
          .build
          .start
        // make sure our server is stopped when jvm is shut down
        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run(): Unit = {
            server.shutdown()
            server.awaitTermination()
          }
        })
      }
    }

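Note: the request headers sent by the client can only be reached in the interceptor that is installed when the server is built; a service function cannot read them. gRPC does, however, have a structure, Context, that is accessible in both places, so the interceptor can move the JWT from the headers into the Context. Be very careful, though: the Context must be read on the same thread it was written on. In the server-side interceptor we read the JWT from the metadata and write it into the Context; a service function that needs permission checks then reads the JWT back from the Context and verifies it: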

    override def singleResponse(request: PBPOSCommand): Future[PBPOSResponse] = {
      // read the JWT that the interceptor stored in the Context
      val jwt = AUTH_CTX_KEY.get
      println(s"***********$jwt**************")
      val optUserInfo = authenticator.getUserInfo(jwt)
      val shopid = optUserInfo match {
        case Some(m) => m("shopid")
        case None => "invalid token!"
      }
      FastFuture.successful(PBPOSResponse(msg=s"shopid:$shopid"))
    }
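The same-thread requirement follows from the Context being bound to the current thread (gRPC's default Context storage is thread-local). The following sketch reproduces the effect with a plain `java.lang.ThreadLocal` instead of `io.grpc.Context`; `ThreadLocalSketch`, `slot` and `demo` are hypothetical names used only for this illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ThreadLocalSketch {
  // Stand-in for a thread-bound context slot (assumption: not io.grpc.Context itself)
  val slot = new ThreadLocal[String]

  // Returns what a worker thread sees (a) without and (b) with explicit propagation
  def demo(): (Option[String], Option[String]) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      slot.set("token-from-header")
      // (a) read on another thread: the value set above is not visible there
      val naive = Future(Option(slot.get))
      // (b) capture on the writing thread, re-bind on the worker thread
      val captured = slot.get
      val carried = Future {
        slot.set(captured)
        try Option(slot.get) finally slot.remove()
      }
      (Await.result(naive, 5.seconds), Await.result(carried, 5.seconds))
    } finally {
      slot.remove()
      pool.shutdown()
    }
  }
}
```

With gRPC itself, work handed to another thread can be wrapped with the Context's own helpers, such as `Context.wrap` or `Context.currentContextExecutor`, rather than re-binding by hand.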

Issuing the JWT is also a service function:

    val authenticator = new AuthBase()
      .withAlgorithm(JwtAlgorithm.HS256)
      .withSecretKey("OpenSesame")
      .withUserFunc(getValidUser)
    override def getAuthToken(request: PBPOSCredential): Future[PBPOSToken] = {
      getValidUser(request.userid, request.password) match {
        case Some(userinfo) => FastFuture.successful(PBPOSToken(authenticator.issueJwt(userinfo)))
        case None => FastFuture.successful(PBPOSToken("Invalid Token!"))
      }
    }
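Because a failed login above still returns a PBPOSToken, the caller has to compare the string with "Invalid Token!" to detect failure. One possible alternative, sketched here with plain Scala types only, is to model the outcome as an `Either`. `TokenIssueSketch`, `AuthError`, `BadCredentials` and `issueToken` are hypothetical names; `lookup` and `issue` stand in for getValidUser and issueJwt.

```scala
object TokenIssueSketch {
  type UserInfo = Map[String, Any]

  sealed trait AuthError
  case object BadCredentials extends AuthError

  // lookup and issue are passed in so the sketch does not depend on any library
  def issueToken(
      userid: String,
      password: String,
      lookup: (String, String) => Option[UserInfo],
      issue: UserInfo => String
  ): Either[AuthError, String] =
    lookup(userid, password) match {
      case Some(info) => Right(issue(info)) // credentials accepted: return the signed token
      case None       => Left(BadCredentials) // rejected: no token-shaped value is produced
    }
}
```

In a gRPC service the Left branch would typically be turned into a failed Future carrying a status such as `Status.UNAUTHENTICATED`, so the client sees an error instead of a token-shaped string.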

We also need a mock authentication service:

    package com.datatech.auth
    object MockUserAuthService {
      type UserInfo = Map[String,Any]
      case class User(username: String, password: String, userInfo: UserInfo)
      val validUsers = Seq(User("johnny", "p4ssw0rd", Map("shopid" -> "1101", "userid" -> "101"))
        ,User("tiger", "secret", Map("shopid" -> "1101" , "userid" -> "102")))
      def getValidUser(userid: String, pswd: String): Option[UserInfo] =
        validUsers.find(user => user.username == userid && user.password == pswd) match {
          case Some(user) => Some(user.userInfo)
          case _ => None
        }
    }
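Once user info such as the shopid above travels inside the token, a service function can decide per call whether the caller is allowed in. A minimal sketch under stated assumptions: the policy table `allowedShops`, the command name "VoidSale" and the function `authorize` are invented for illustration and are not part of the original demo.

```scala
object PermissionSketch {
  type UserInfo = Map[String, Any]

  // Hypothetical policy: which shop ids may invoke which command
  val allowedShops: Map[String, Set[String]] = Map(
    "GetTxnItems" -> Set("1101"),
    "VoidSale"    -> Set("9999")
  )

  // Right(shopid) when the caller may run the command, Left(reason) otherwise
  def authorize(command: String, userInfo: Option[UserInfo]): Either[String, String] =
    userInfo match {
      case None => Left("invalid token")
      case Some(info) =>
        info.get("shopid").map(_.toString) match {
          case Some(shop) if allowedShops.getOrElse(command, Set.empty[String]).contains(shop) =>
            Right(shop)
          case Some(shop) => Left(s"shop $shop may not call $command")
          case None       => Left("token carries no shopid")
        }
    }
}
```

The `Option[UserInfo]` argument has the same shape as the result of getUserInfo, so a check like this could sit at the top of a service function.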

The complete source code for this demonstration follows:

project/plugins.sbt

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
    addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")
    addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15")
    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.21")
    libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.9.0-M6"

build.sbt

    name := "grpc-jwt"

    version := "0.1"

    scalaVersion := "2.12.8"

    scalacOptions += "-Ypartial-unification"

    val akkaversion = "2.5.23"

    libraryDependencies := Seq(
      "com.typesafe.akka" %% "akka-cluster-metrics" % akkaversion,
      "com.typesafe.akka" %% "akka-cluster-sharding" % akkaversion,
      "com.typesafe.akka" %% "akka-persistence" % akkaversion,
      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.0.1",
      "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
      "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.0.1",
      "com.typesafe.akka" %% "akka-persistence-query" % akkaversion,
      "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.97",
      "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
      "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",
      "ch.qos.logback" % "logback-classic" % "1.2.3",
      "io.monix" %% "monix" % "3.0.0-RC2",
      "org.typelevel" %% "cats-core" % "2.0.0-M1",
      "io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
      "io.netty" % "netty-tcnative-boringssl-static" % "2.0.22.Final",
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
      "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
      "com.pauldijou" %% "jwt-core" % "3.0.1",
      "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0",
      "org.json4s" %% "json4s-native" % "3.6.1",
      "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
      "org.json4s" %% "json4s-jackson" % "3.6.7",
      "org.json4s" %% "json4s-ext" % "3.6.7"
    )

    // (optional) If you need scalapb/scalapb.proto or anything from
    // google/protobuf/*.proto
    //libraryDependencies += "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"

    PB.targets in Compile := Seq(
      scalapb.gen() -> (sourceManaged in Compile).value
    )

    enablePlugins(JavaAppPackaging)

main/protobuf/posmessages.proto

    syntax = "proto3";

    import "google/protobuf/wrappers.proto";
    import "google/protobuf/any.proto";
    import "scalapb/scalapb.proto";

    option (scalapb.options) = {
      // use a custom Scala package name
      // package_name: "io.ontherocks.introgrpc.demo"

      // don't append file name to package
      flat_package: true

      // generate one Scala file for all messages (services still get their own file)
      single_file: true

      // add imports to generated file
      // useful when extending traits or using custom types
      // import: "io.ontherocks.hellogrpc.RockingMessage"

      // code to put at the top of generated file
      // works only with `single_file: true`
      //preamble: "sealed trait SomeSealedTrait"
    };

    package com.datatech.pos.messages;

    message PBVchState {      //voucher state
      string opr = 1;         //cashier
      int64 jseq = 2;         //begin journal sequence for read-side replay
      int32 num = 3;          //current voucher number
      int32 seq = 4;          //current sequence number
      bool void = 5;          //void mode
      bool refd = 6;          //refund mode
      bool susp = 7;          //suspended voucher
      bool canc = 8;          //cancelled voucher
      bool due = 9;           //current balance
      string su = 10;         //supervisor ID
      string mbr = 11;        //member number
      int32 mode = 12;        //current operation mode: 0=logOff, 1=LogOn, 2=Payment
    }

    message PBTxnItem {       //transaction record
      string txndate = 1;     //transaction date
      string txntime = 2;     //entry time
      string opr = 3;         //operator
      int32 num = 4;          //sales voucher number
      int32 seq = 5;          //transaction sequence number
      int32 txntype = 6;      //transaction type
      int32 salestype = 7;    //sales type
      int32 qty = 8;          //quantity
      int32 price = 9;        //unit price (in fen)
      int32 amount = 10;      //list-price amount (in fen)
      int32 disc = 11;        //discount rate (%)
      int32 dscamt = 12;      //discount amount (negative); net amount = amount + dscamt
      string member = 13;     //member card number
      string code = 14;       //code (item, card number, ...)
      string acct = 15;       //account
      string dpt = 16;        //department
    }

    message PBPOSResponse {
      int32 sts = 1;
      string msg = 2;
      PBVchState voucher = 3;
      repeated PBTxnItem txnitems = 4;
    }

    message PBPOSCommand {
      string commandname = 1;
      string delimitedparams = 2;
    }

    message PBPOSCredential {
      string userid = 1;
      string password = 2;
    }
    message PBPOSToken {
      string jwt = 1;
    }

    service SendCommand {
      rpc SingleResponse(PBPOSCommand) returns (PBPOSResponse) {};
      rpc GetTxnItems(PBPOSCommand) returns (stream PBTxnItem) {};
      rpc GetAuthToken(PBPOSCredential) returns (PBPOSToken) {};
    }

gRPCServer.scala

    package com.datatech.grpc.server

    import io.grpc.ServerServiceDefinition
    import io.grpc.netty.NettyServerBuilder
    import io.grpc.ServerInterceptors
    import scala.concurrent._
    import io.grpc.Context
    import io.grpc.Contexts
    import io.grpc.ServerCall
    import io.grpc.ServerCallHandler
    import io.grpc.ServerInterceptor
    import io.grpc.Metadata
    import io.grpc.Metadata.Key.of
    import io.grpc.Context.key
    import io.grpc.ServerCall.Listener
    import akka.actor._


    abstract class FutureListener[Q](implicit ec: ExecutionContext) extends Listener[Q] {

      protected val delegate: Future[Listener[Q]]

      private val eventually = delegate.foreach _

      override def onComplete(): Unit = eventually { _.onComplete() }
      override def onCancel(): Unit = eventually { _.onCancel() }
      override def onMessage(message: Q): Unit = eventually { _ onMessage message }
      override def onHalfClose(): Unit = eventually { _.onHalfClose() }
      override def onReady(): Unit = eventually { _.onReady() }

    }

    object Keys {
      val AUTH_META_KEY: Metadata.Key[String] = of("jwt", Metadata.ASCII_STRING_MARSHALLER)
      val AUTH_CTX_KEY: Context.Key[String] = key("jwt")
    }

    class AuthorizationInterceptor(implicit ec: ExecutionContext) extends ServerInterceptor {
      override def interceptCall[Q, R](
        call: ServerCall[Q, R],
        headers: Metadata,
        next: ServerCallHandler[Q, R]
      ): Listener[Q] = {

        val prevCtx = Context.current
        val jwt = headers.get(Keys.AUTH_META_KEY)

        println(s"!!!!!!!!!!! $jwt !!!!!!!!!!")

        new FutureListener[Q] {
          protected val delegate = Future {
            val nextCtx = prevCtx withValue (Keys.AUTH_CTX_KEY, jwt)
            Contexts.interceptCall(nextCtx, call, headers, next)
          }
        }
      }
    }

    trait gRPCServer {

      def runServer(service: ServerServiceDefinition)(implicit actorSys: ActorSystem): Unit = {
        import actorSys.dispatcher
        val server = NettyServerBuilder
          .forPort(50051)
          .addService(ServerInterceptors.intercept(service,
            new AuthorizationInterceptor))
          .build
          .start
        // make sure our server is stopped when jvm is shut down
        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run(): Unit = {
            server.shutdown()
            server.awaitTermination()
          }
        })
      }

    }

POSServices.scala

    package com.datatech.pos.service
    import com.datatech.grpc.server.Keys._
    import akka.http.scaladsl.util.FastFuture
    import com.datatech.pos.messages._
    import com.datatech.grpc.server._
    import com.datatech.auth.MockUserAuthService._

    import scala.concurrent.Future
    import com.datatech.auth.AuthBase._
    import pdi.jwt._
    import akka.actor._
    import io.grpc.stub.StreamObserver


    object POSServices extends gRPCServer {
      type UserInfo = Map[String, Any]

      class POSServices extends SendCommandGrpc.SendCommand {

        val authenticator = new AuthBase()
          .withAlgorithm(JwtAlgorithm.HS256)
          .withSecretKey("OpenSesame")
          .withUserFunc(getValidUser)

        override def getTxnItems(request: PBPOSCommand, responseObserver: StreamObserver[PBTxnItem]): Unit = ???

        override def singleResponse(request: PBPOSCommand): Future[PBPOSResponse] = {
          val jwt = AUTH_CTX_KEY.get
          println(s"***********$jwt**************")
          val optUserInfo = authenticator.getUserInfo(jwt)
          val shopid = optUserInfo match {
            case Some(m) => m("shopid")
            case None => "invalid token!"
          }
          FastFuture.successful(PBPOSResponse(msg=s"shopid:$shopid"))
        }

        override def getAuthToken(request: PBPOSCredential): Future[PBPOSToken] = {
          getValidUser(request.userid, request.password) match {
            case Some(userinfo) => FastFuture.successful(PBPOSToken(authenticator.issueJwt(userinfo)))
            case None => FastFuture.successful(PBPOSToken("Invalid Token!"))
          }
        }
      }

      def main(args: Array[String]) = {
        implicit val system = ActorSystem("grpc-system")
        val svc = SendCommandGrpc.bindService(new POSServices, system.dispatcher)
        runServer(svc)
      }
    }

AuthBase.scala

    package com.datatech.auth

    import pdi.jwt._
    import org.json4s.native.Json
    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    import pdi.jwt.algorithms._
    import scala.util._

    object AuthBase {
      type UserInfo = Map[String, Any]
      case class AuthBase(
          algorithm: JwtAlgorithm = JwtAlgorithm.HMD5,
          secret: String = "OpenSesame",
          getUserInfo: (String,String) => Option[UserInfo] = null) {
        ctx =>

        def withAlgorithm(algo: JwtAlgorithm): AuthBase = ctx.copy(algorithm = algo)

        def withSecretKey(key: String): AuthBase = ctx.copy(secret = key)

        def withUserFunc(f: (String, String) => Option[UserInfo]): AuthBase = ctx.copy(getUserInfo = f)

        def authenticateToken(token: String): Option[String] =
          algorithm match {
            case algo: JwtAsymmetricAlgorithm =>
              Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtAsymmetricAlgorithm]))) match {
                case true => Some(token)
                case _ => None
              }
            case _ =>
              Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtHmacAlgorithm]))) match {
                case true => Some(token)
                case _ => None
              }
          }

        def getUserInfo(token: String): Option[UserInfo] = {
          algorithm match {
            case algo: JwtAsymmetricAlgorithm =>
              Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtAsymmetricAlgorithm])) match {
                case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
                case Failure(err) => None
              }
            case _ =>
              Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtHmacAlgorithm])) match {
                case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
                case Failure(err) => None
              }
          }
        }

        def issueJwt(userinfo: UserInfo): String = {
          val claims = JwtClaim() + Json(DefaultFormats).write(("userinfo", userinfo))
          Jwt.encode(claims, secret, algorithm)
        }
      }

    }

POSClient.scala

    package com.datatech.pos.client

    import com.datatech.pos.messages.{PBPOSCommand, PBPOSCredential, SendCommandGrpc}
    import io.grpc.stub.StreamObserver
    import io.grpc.netty.{ NegotiationType, NettyChannelBuilder}
    import io.grpc.CallOptions
    import io.grpc.ClientCall
    import io.grpc.ClientInterceptor
    import io.grpc.ForwardingClientCall
    import io.grpc.Metadata
    import io.grpc.Metadata.Key
    import io.grpc.MethodDescriptor
    import io.grpc.ClientInterceptors

    object POSClient {
      class AuthClientInterceptor(jwt: String) extends ClientInterceptor {
        def interceptCall[ReqT, RespT](methodDescriptor: MethodDescriptor[ReqT, RespT], callOptions: CallOptions, channel: io.grpc.Channel): ClientCall[ReqT, RespT] =
          new ForwardingClientCall.SimpleForwardingClientCall[ReqT, RespT](channel.newCall(methodDescriptor, callOptions)) {
            override def start(responseListener: ClientCall.Listener[RespT], headers: Metadata): Unit = {
              headers.put(Key.of("jwt", Metadata.ASCII_STRING_MARSHALLER), jwt)
              super.start(responseListener, headers)
            }
          }
      }

      def main(args: Array[String]): Unit = {

        //build connection channel
        val unsafeChannel = NettyChannelBuilder
          .forAddress("192.168.0.189",50051)
          .negotiationType(NegotiationType.PLAINTEXT)
          .build()


        val authClient = SendCommandGrpc.blockingStub(unsafeChannel)
        val jwt = authClient.getAuthToken(PBPOSCredential(userid="johnny",password="p4ssw0rd")).jwt
        println(s"got jwt: $jwt")


        val securedChannel = ClientInterceptors.intercept(unsafeChannel, new AuthClientInterceptor(jwt))

        val securedClient = SendCommandGrpc.blockingStub(securedChannel)

        val resp = securedClient.singleResponse(PBPOSCommand())

        println(s"secured response: $resp")

        // wait for async execution
        scala.io.StdIn.readLine()
      }


    }

 

Original article: http://www.cnblogs.com/tiger-xc/p/11188900.html
