Skip to content
Snippets Groups Projects
Commit c27ef416 authored by thijsheijden's avatar thijsheijden
Browse files

Producing to ui queue works

Together with Arjan.
parent af55f9cb
No related branches found
No related tags found
No related merge requests found
...@@ -33,3 +33,19 @@ linux: # Build for linux ...@@ -33,3 +33,19 @@ linux: # Build for linux
run: run:
./builds/main ./builds/main
develop:
# RabbitMQ env variables
$(eval export RABBIT_USER := guest)
$(eval export RABBIT_PASSWORD := guest)
$(eval export RABBIT_HOST := localhost)
$(eval export RABBIT_PORT := 5672)
# Whether to log
$(eval export LOG_MESSAGES := true)
# Redis env variables
$(eval export REDIS_ADDRESS := localhost:6379)
@go run cmd/query-service/main.go
package main package main
import ( import (
"fmt" "context"
"log"
"query-service/internal/aql" "query-service/internal/aql"
"query-service/internal/consumer"
"query-service/internal/errorhandler" "query-service/internal/errorhandler"
"query-service/internal/messagequeue"
"query-service/internal/redisclient"
"github.com/streadway/amqp"
"github.com/thijsheijden/alice"
) )
func main() { var producer alice.Producer
func main() {
exchangeID := "query-requests" exchangeID := "query-requests"
routingKey := "aql-user-request" routingKey := "aql-user-request"
consumer.StartConsuming(onMessageReceived, exchangeID, routingKey)
broker := messagequeue.Create()
messagequeue.StartConsumer(broker, &exchangeID, &routingKey, onMessageReceived)
producer = messagequeue.StartProducer(broker)
redisclient.Start()
select {}
} }
func onMessageReceived(jsonMsg *[]byte) { func onMessageReceived(msg amqp.Delivery) {
// Retrieve JSON formatted string payload from msg // Retrieve JSON formatted string payload from msg
// Convert the json byte msg to an aql query string // Convert the json byte msg to an aql query string
aqlQuery, err := aql.ConvertJSONToAQL(jsonMsg) aqlQuery, err := aql.ConvertJSONToAQL(&msg.Body)
errorhandler.FailWithError(err, "failed to parse incoming msg to AQL") // TODO: don't panic on error, send error message to client instead errorhandler.FailWithError(err, "failed to parse incoming msg to AQL") // TODO: don't panic on error, send error message to client instead
fmt.Println(*aqlQuery) // Get the queueID for this sessionID
sessionID, ok := msg.Headers["sessionID"].(string)
if !ok {
log.Println("No sessionID passed in message")
return
}
// Get queueID for this client
queueID := redisclient.Conn.Get(context.Background(), sessionID).Val()
log.Println(queueID)
// producer.PublishMessage(aqlQuery, , &amqp.Table{})
// execute and retrieve result // execute and retrieve result
// convert result to general (node-link (?)) format // convert result to general (node-link (?)) format
// publish converted result // publish converted result
headers := amqp.Table{}
headers["sessionID"] = sessionID
headers["type"] = "schemaResult"
log.Println("publishing message")
producer.PublishMessage(aqlQuery, &queueID, &headers)
msg.Ack(true)
} }
...@@ -28,5 +28,7 @@ spec: ...@@ -28,5 +28,7 @@ spec:
value: I9YPuqNYvN_o4597-LJ6i0sWZTDTV5kk value: I9YPuqNYvN_o4597-LJ6i0sWZTDTV5kk
- name: RABBIT_PASSWORD - name: RABBIT_PASSWORD
value: zBA4m-IzK6ejLtCdr2gxB6kHmURaUvy4 value: zBA4m-IzK6ejLtCdr2gxB6kHmURaUvy4
- name: REDIS_ADDRESS
value: redis.redis.svc.cluster.local:6379
imagePullSecrets: imagePullSecrets:
- name: docker-regcred - name: docker-regcred
\ No newline at end of file
...@@ -3,7 +3,8 @@ module query-service ...@@ -3,7 +3,8 @@ module query-service
go 1.15 go 1.15
require ( require (
github.com/go-redis/redis/v8 v8.8.0
github.com/streadway/amqp v1.0.0 github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.7.0 // indirect github.com/stretchr/testify v1.7.0
github.com/thijsheijden/alice v0.1.5 github.com/thijsheijden/alice v0.1.8
) )
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis/v8 v8.8.0 h1:fDZP58UN/1RD3DjtTXP/fFZ04TFohSYhjZDkcDe2dnw=
github.com/go-redis/redis/v8 v8.8.0/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/thijsheijden/alice v0.1.5 h1:kOZHhLGSHMja77I/Wvd19M7nvxgEWVIJ4vk5antCKdQ= github.com/thijsheijden/alice v0.1.5 h1:kOZHhLGSHMja77I/Wvd19M7nvxgEWVIJ4vk5antCKdQ=
github.com/thijsheijden/alice v0.1.5/go.mod h1:UypS/UTucbp+fmG1JtmXGCKPnGKimAC9AgmJ8iGoLNo= github.com/thijsheijden/alice v0.1.5/go.mod h1:UypS/UTucbp+fmG1JtmXGCKPnGKimAC9AgmJ8iGoLNo=
github.com/thijsheijden/alice v0.1.8 h1:Ts53lxrr5UrCQnQ9wdo6MLth/DkBuIrgDkDoI+hPhpQ=
github.com/thijsheijden/alice v0.1.8/go.mod h1:lYOP30HKhw/7xJa3lLhs+Xsdc5T7MRo7DOb/npzfg9I=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opentelemetry.io/otel v0.19.0 h1:Lenfy7QHRXPZVsw/12CWpxX6d/JkrX8wrx2vO8G80Ng=
go.opentelemetry.io/otel v0.19.0/go.mod h1:j9bF567N9EfomkSidSfmMwIwIBuP37AMAIzVW85OxSg=
go.opentelemetry.io/otel/metric v0.19.0 h1:dtZ1Ju44gkJkYvo+3qGqVXmf88tc+a42edOywypengg=
go.opentelemetry.io/otel/metric v0.19.0/go.mod h1:8f9fglJPRnXuskQmKpnad31lcLJ2VmNNqIsx/uIwBSc=
go.opentelemetry.io/otel/oteltest v0.19.0/go.mod h1:tI4yxwh8U21v7JD6R3BcA/2+RBoTKFexE/PJ/nSO7IA=
go.opentelemetry.io/otel/trace v0.19.0 h1:1ucYlenXIDA1OlHVLDZKX0ObXV5RLaq06DtUKz5e5zc=
go.opentelemetry.io/otel/trace v0.19.0/go.mod h1:4IXiNextNOpPnRlI4ryK69mn5iC84bjBWZQA5DXz/qg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
package messagequeue
import (
"os"
"query-service/internal/errorhandler"
"strconv"
"time"
"github.com/thijsheijden/alice"
)
// Create creates a broker
func Create() *alice.RabbitBroker {
// Create connection config using environment variables
rabbitUser := os.Getenv("RABBIT_USER")
rabbitPassword := os.Getenv("RABBIT_PASSWORD")
rabbitHost := os.Getenv("RABBIT_HOST")
rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT"))
if err != nil {
errorhandler.FailWithError(err, "port should be a number")
}
config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler)
broker := alice.CreateBroker(config)
return broker
}
package consumer package messagequeue
import ( import (
"os"
"query-service/internal/errorhandler" "query-service/internal/errorhandler"
"strconv"
"time"
"github.com/streadway/amqp" "github.com/streadway/amqp"
"github.com/thijsheijden/alice" "github.com/thijsheijden/alice"
) )
// ConsumeMessageFunc is a function type to be called when a message is consumed // ConsumeMessageFunc is a function type to be called when a message is consumed
type ConsumeMessageFunc func(*[]byte) type ConsumeMessageFunc func(amqp.Delivery)
// StartConsuming will start consuming messages // StartConsumer will start a consumer
// When a message is received the consumeMessage function will be called // When a message is received the consumeMessage function will be called
func StartConsuming(consumeMessage ConsumeMessageFunc, exchangeID string, routingKey string) { func StartConsumer(broker *alice.RabbitBroker, exchangeID *string, routingKey *string, consumeMessage ConsumeMessageFunc) {
// Create connection config using environment variables
rabbitUser := os.Getenv("RABBIT_USER")
rabbitPassword := os.Getenv("RABBIT_PASSWORD")
rabbitHost := os.Getenv("RABBIT_HOST")
rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT"))
config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler)
// Open connection to broker
conn := alice.Connect(*config)
// Declare the exchange we want to bind to // Declare the exchange we want to bind to
exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct) exchange, err := alice.CreateDefaultExchange(*exchangeID, alice.Direct)
if err != nil { if err != nil {
errorhandler.FailWithError(err, "failed to create exchange") errorhandler.FailWithError(err, "failed to create exchange")
} }
...@@ -38,11 +23,11 @@ func StartConsuming(consumeMessage ConsumeMessageFunc, exchangeID string, routin ...@@ -38,11 +23,11 @@ func StartConsuming(consumeMessage ConsumeMessageFunc, exchangeID string, routin
queue := alice.CreateQueue(exchange, "", true, false, true, false, nil) queue := alice.CreateQueue(exchange, "", true, false, true, false, nil)
// Create the consumer // Create the consumer
c, err := conn.CreateConsumer(queue, routingKey, alice.DefaultConsumerErrorHandler) c, err := broker.CreateConsumer(queue, *routingKey, alice.DefaultConsumerErrorHandler)
if err != nil { if err != nil {
errorhandler.FailWithError(err, "failed to create consumer") errorhandler.FailWithError(err, "failed to create consumer")
} }
// Start consuming messages // Start consuming messages
c.ConsumeMessages(nil, false, func(msg amqp.Delivery) { consumeMessage(&msg.Body) }) go c.ConsumeMessages(nil, false, func(msg amqp.Delivery) { consumeMessage(msg) })
} }
package messagequeue
import (
"query-service/internal/errorhandler"
"github.com/thijsheijden/alice"
)
// StartProducer starts a producer and returns it
func StartProducer(broker *alice.RabbitBroker) alice.Producer {
exchange, err := alice.CreateDefaultExchange("ui-direct-exchange", alice.Direct)
if err != nil {
errorhandler.FailWithError(err, "failed to create exchange for producer")
}
producer, err := broker.CreateProducer(exchange, alice.DefaultProducerErrorHandler)
if err != nil {
errorhandler.FailWithError(err, "failed to created producer")
}
return producer
}
package redisclient
import (
"context"
"fmt"
"log"
"os"
"github.com/go-redis/redis/v8"
)
// Conn is the redis connection
var Conn *redis.Client
// Start starts the redis client
func Start() {
// Grab the redis host and port from environment vars
redisAddress := os.Getenv("REDIS_ADDRESS")
// redisPassword := os.Getenv("REDIS_PASSWORD")
// Create redis client
Conn = redis.NewClient(&redis.Options{
Addr: redisAddress,
})
pong := Conn.Ping(context.Background())
log.Printf(fmt.Sprintf("%v", pong))
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment