configuration_manager.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package pogs
  2. import (
  3. "context"
  4. "fmt"
  5. capnp "zombiezen.com/go/capnproto2"
  6. "zombiezen.com/go/capnproto2/rpc"
  7. "zombiezen.com/go/capnproto2/server"
  8. "github.com/cloudflare/cloudflared/tunnelrpc/metrics"
  9. "github.com/cloudflare/cloudflared/tunnelrpc/proto"
  10. )
  11. type ConfigurationManager interface {
  12. // UpdateConfiguration is the call provided to cloudflared to load the latest remote configuration.
  13. UpdateConfiguration(ctx context.Context, version int32, config []byte) *UpdateConfigurationResponse
  14. }
  15. type ConfigurationManager_PogsImpl struct {
  16. impl ConfigurationManager
  17. }
  18. func ConfigurationManager_ServerToClient(c ConfigurationManager) proto.ConfigurationManager {
  19. return proto.ConfigurationManager_ServerToClient(ConfigurationManager_PogsImpl{c})
  20. }
  21. func (i ConfigurationManager_PogsImpl) UpdateConfiguration(p proto.ConfigurationManager_updateConfiguration) error {
  22. return metrics.ObserveServerHandler(func() error { return i.updateConfiguration(p) }, metrics.ConfigurationManager, metrics.OperationUpdateConfiguration)
  23. }
  24. func (i ConfigurationManager_PogsImpl) updateConfiguration(p proto.ConfigurationManager_updateConfiguration) error {
  25. server.Ack(p.Options)
  26. version := p.Params.Version()
  27. config, err := p.Params.Config()
  28. if err != nil {
  29. return err
  30. }
  31. result, err := p.Results.NewResult()
  32. if err != nil {
  33. return err
  34. }
  35. updateResp := i.impl.UpdateConfiguration(p.Ctx, version, config)
  36. return updateResp.Marshal(result)
  37. }
  38. type ConfigurationManager_PogsClient struct {
  39. Client capnp.Client
  40. Conn *rpc.Conn
  41. }
  42. func (c ConfigurationManager_PogsClient) Close() error {
  43. c.Client.Close()
  44. return c.Conn.Close()
  45. }
  46. func (c ConfigurationManager_PogsClient) UpdateConfiguration(ctx context.Context, version int32, config []byte) (*UpdateConfigurationResponse, error) {
  47. client := proto.ConfigurationManager{Client: c.Client}
  48. promise := client.UpdateConfiguration(ctx, func(p proto.ConfigurationManager_updateConfiguration_Params) error {
  49. p.SetVersion(version)
  50. return p.SetConfig(config)
  51. })
  52. result, err := promise.Result().Struct()
  53. if err != nil {
  54. return nil, wrapRPCError(err)
  55. }
  56. response := new(UpdateConfigurationResponse)
  57. err = response.Unmarshal(result)
  58. if err != nil {
  59. return nil, err
  60. }
  61. return response, nil
  62. }
  63. type UpdateConfigurationResponse struct {
  64. LastAppliedVersion int32 `json:"lastAppliedVersion"`
  65. Err error `json:"err"`
  66. }
  67. func (p *UpdateConfigurationResponse) Marshal(s proto.UpdateConfigurationResponse) error {
  68. s.SetLatestAppliedVersion(p.LastAppliedVersion)
  69. if p.Err != nil {
  70. return s.SetErr(p.Err.Error())
  71. }
  72. return nil
  73. }
  74. func (p *UpdateConfigurationResponse) Unmarshal(s proto.UpdateConfigurationResponse) error {
  75. p.LastAppliedVersion = s.LatestAppliedVersion()
  76. respErr, err := s.Err()
  77. if err != nil {
  78. return err
  79. }
  80. if respErr != "" {
  81. p.Err = fmt.Errorf(respErr)
  82. }
  83. return nil
  84. }