From cde00ab41e02f1dfc376e17ccad2451bbf7a936f Mon Sep 17 00:00:00 2001 From: "marcello.ceschia" Date: Mon, 28 May 2012 12:05:24 +0200 Subject: [PATCH 01/12] start with ssl implementation --- META.json | 6 +++--- amqp.control | 2 +- sql/amqp.sql | 7 +++++++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/META.json b/META.json index 1e62976..36e9fd4 100644 --- a/META.json +++ b/META.json @@ -2,15 +2,15 @@ "name": "pg_amqp", "abstract": "AMQP protocol support for PostgreSQL", "description": "The pg_amqp package provides the ability for postgres statements to directly publish messages to an AMQP broker.", - "version": "0.3.0", - "maintainer": "Theo Schlossnagle ", + "version": "0.4.0", + "maintainer": ["Theo Schlossnagle ", "Marcello Ceschia "], "license": [ "bsd", "mozilla_1_0" ], "status": "stable", "provides": { "amqp": { "abstract": "AMQP protocol support for PostgreSQL", "file": "sql/amqp.sql", - "version": "0.3.0" + "version": "0.4.0" } }, "meta-spec": { diff --git a/amqp.control b/amqp.control index fe86c62..392ce5a 100644 --- a/amqp.control +++ b/amqp.control @@ -1,6 +1,6 @@ # amqp extension comment = 'AMQP protocol support for PostgreSQL' -default_version = '0.3.0' +default_version = '0.4.0' module_pathname = '$libdir/pg_amqp' relocatable = false schema = amqp diff --git a/sql/amqp.sql b/sql/amqp.sql index 676b2c9..601739f 100644 --- a/sql/amqp.sql +++ b/sql/amqp.sql @@ -49,6 +49,13 @@ create table amqp.broker ( vhost text, username text, password text, + requiressl boolean DEFAULT false, + verify_cert boolean DEFAULT true, + verify_cn boolean DEFAULT true, + cert text, + key text, + key_password character varying, + ca text, primary key(broker_id, host, port) ); From 35d07ed130761bf4ab3da6a174ff80b1b61375d2 Mon Sep 17 00:00:00 2001 From: "marcello.ceschia" Date: Mon, 28 May 2012 12:13:20 +0200 Subject: [PATCH 02/12] add ssl notice to README --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b7f051f..c07034e 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,9 @@ -pg_amqp 0.3.0 +pg_amqp 0.4.0 ============= The pg_amqp package provides the ability for postgres statements to directly -publish messages to an [AMQP](http://www.amqp.org/) broker. +publish messages to an [AMQP](http://www.amqp.org/) broker. Version 0.4.0 also provides an implementation +for secured connections using open ssl. To build v0.4.0 openssl dev package is required. Building -------- From 7cc5d46bed0634ac7324f50426b1b78ffcf42f32 Mon Sep 17 00:00:00 2001 From: "marcello.ceschia" Date: Mon, 28 May 2012 13:05:06 +0200 Subject: [PATCH 03/12] first working version --- src/librabbitmq/amqp.h | 13 +- src/librabbitmq/amqp_api.c | 9 +- src/librabbitmq/amqp_connection.c | 372 ++++++++++++++++++++++++++++-- src/librabbitmq/amqp_private.h | 30 +++ src/librabbitmq/amqp_socket.c | 9 +- src/pg_amqp.c | 197 +++++++++++----- 6 files changed, 526 insertions(+), 104 deletions(-) diff --git a/src/librabbitmq/amqp.h b/src/librabbitmq/amqp.h index 5cc43a8..d8fc01a 100644 --- a/src/librabbitmq/amqp.h +++ b/src/librabbitmq/amqp.h @@ -98,6 +98,11 @@ typedef enum amqp_response_type_enum_ { AMQP_RESPONSE_SERVER_EXCEPTION } amqp_response_type_enum; +typedef enum amqp_ssl_flags_type_enum_ { + AMQP_SSL_FLAG_VERIFY = 1 << 0, + AMQP_SSL_FLAG_CHECK_CN = 1 << 2, +} amqp_ssl_flags_type_enum; + typedef struct amqp_rpc_reply_t_ { amqp_response_type_enum reply_type; amqp_method_t reply; @@ -113,8 +118,7 @@ typedef enum amqp_sasl_method_enum_ { typedef struct amqp_basic_return_t_ amqp_basic_return_t; typedef int (*amqp_output_fn_t)(void *context, void *buffer, size_t count); -typedef void (*amqp_basic_return_fn_t)(amqp_channel_t, amqp_basic_return_t *, - void *); +typedef void (*amqp_basic_return_fn_t)(amqp_channel_t, amqp_basic_return_t *, void *); /* Opaque struct. */ @@ -140,6 +144,7 @@ extern amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src); } \ }) +extern amqp_connection_state_t amqp_new_ssl_connection(const char *certificate, const char *key, const char *password, const char *ca, unsigned short flags); extern amqp_connection_state_t amqp_new_connection(void); extern int amqp_get_sockfd(amqp_connection_state_t state); extern void amqp_set_sockfd(amqp_connection_state_t state, @@ -173,8 +178,7 @@ extern int amqp_send_frame_to(amqp_connection_state_t state, extern int amqp_table_entry_cmp(void const *entry1, void const *entry2); -extern int amqp_open_socket(char const *hostname, int portnumber, - struct timeval *timeout); +extern int amqp_connect(amqp_connection_state_t state, char const *hostname, int portnumber, struct timeval *timeout); extern int amqp_send_header(amqp_connection_state_t state); extern int amqp_send_header_to(amqp_connection_state_t state, @@ -343,6 +347,7 @@ extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state); */ extern amqp_rpc_reply_t *amqp_get_rpc_reply(void); + #ifdef __cplusplus } #endif diff --git a/src/librabbitmq/amqp_api.c b/src/librabbitmq/amqp_api.c index 033f84a..56728f0 100644 --- a/src/librabbitmq/amqp_api.c +++ b/src/librabbitmq/amqp_api.c @@ -103,14 +103,9 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, code, amqp_cstring_bytes(codestr), 0, 0); } -amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, - int code) +amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) { - char codestr[13]; - snprintf(codestr, sizeof(codestr), "%d", code); - return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK, - amqp_connection_close_t, - code, amqp_cstring_bytes(codestr), 0, 0); + return state->close_connection(state, code); } amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, diff --git a/src/librabbitmq/amqp_connection.c b/src/librabbitmq/amqp_connection.c index a1ec1cd..8bcc8aa 100644 --- a/src/librabbitmq/amqp_connection.c +++ b/src/librabbitmq/amqp_connection.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -13,6 +14,7 @@ #include + #define INITIAL_FRAME_POOL_PAGE_SIZE 65536 #define INITIAL_DECODING_POOL_PAGE_SIZE 131072 #define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072 @@ -27,13 +29,279 @@ _check_state->state); \ } +BIO *bio_err = NULL; + + +static int ssl_password_cb(char *buf, int num, int rwflag, void *data) { + if (num < strlen((char *) data) + 1){ + return 0; + } + return (strlen(strcpy(buf, (char *) data))); +} + +static amqp_rpc_reply_t _amqp_connection_close(amqp_connection_state_t state, int code) +{ + amqp_rpc_reply_t result; + char codestr[13]; + snprintf(codestr, sizeof(codestr), "%d", code); + + /* close socket */ + if(state->sockfd >= 0){ + result = AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK, amqp_connection_close_t, code, amqp_cstring_bytes(codestr), 0, 0); + close(state->sockfd); + state->sockfd = -1; + } + fprintf(stderr, "_amqp_connection_closed"); + return result; +} + +static amqp_rpc_reply_t _amqp_ssl_connection_close(amqp_connection_state_t state, int code) +{ + + amqp_rpc_reply_t result = _amqp_connection_close(state, code); + + if(state->ssl){ + SSL_shutdown(state->ssl); + SSL_free(state->ssl); + state->ssl = NULL; + } + + return result; +} + +int amqp_connect(amqp_connection_state_t state, char const *hostname, int portnumber, struct timeval *timeout){ + return state->connect(state, hostname, portnumber, timeout); +} + +/** + * crate native connection + * \remark also used by ssl implementation + * \return 1 if successful done, otherwise 0 + */ +static int _amqp_connect(amqp_connection_state_t state, const char *host, int port, struct timeval *timeout) { + int sockfd = -1; + + sockfd = amqp_open_socket(host, port, timeout); + amqp_set_sockfd(state, sockfd); + + return (sockfd >= 0) ? 1 : 0; +} + +/** + * initialize ssl connection and verify certificate using ssl_flags + * \return 1 if successful done, otherwise 0 + */ +static int _amqp_ssl_connect(amqp_connection_state_t state, const char *host, int port, struct timeval *timeout) { + int success = -1; + int rv = 0; + int verified = -1; + char cert_cn[256] = "\0"; + char error[256] = "\0"; + X509 *peerCert = NULL; + + /* create socket */ + success = _amqp_connect(state, host, port, timeout); + if(!success) + return 0; + + + state->ssl = SSL_new(state->ctx); + state->bio = BIO_new_socket(state->sockfd, BIO_NOCLOSE); + SSL_set_bio(state->ssl, state->bio, state->bio); + + + + + do{ + + /* do connect */ + if ((rv = SSL_connect(state->ssl)) <= 0) { + sprintf(error, "Error during setup SSL context, rv=%d", rv); + break; + } + + /* verify the server certificate */ + if ( (state->ssl_flags & AMQP_SSL_FLAG_VERIFY) && (verified = SSL_get_verify_result(state->ssl)) != X509_V_OK) { + sprintf(error, "SSL certificate presented by peer cannot be verified: %d\n",verified); + break; + } + + /* verify common name */ + if ( state->ssl_flags & AMQP_SSL_FLAG_CHECK_CN ) { + + /* get peer certificate */ + if (!(peerCert = SSL_get_peer_certificate(state->ssl))) { + sprintf(error, "No SSL certificate was presented by peer"); + break; + } + + X509_NAME_get_text_by_NID(X509_get_subject_name(peerCert), NID_commonName, cert_cn, sizeof(cert_cn)); + X509_free(peerCert); + + //TODO add wildcard support + if (strcasecmp(cert_cn, host)) { + sprintf(error, "common name '%s' doesn't match host name '%s'", cert_cn, host); + break; + } + } + }while(0); + + /* if an error occured, close connection */ + if(strlen(error)>0){ + fprintf(stderr, "%s\n", error); + _amqp_ssl_connection_close(state, AMQP_REPLY_SUCCESS); + return 0; + } + + return 1; +} + +static void _amqp_destroy_connection(amqp_connection_state_t state){ + empty_amqp_pool(&state->frame_pool); + empty_amqp_pool(&state->decoding_pool); + free(state->outbound_buffer.bytes); + free(state->sock_inbound_buffer.bytes); +} + +/** + * destroy ssl connection and free ssl context + */ +static void _amqp_destroy_ssl_connection(amqp_connection_state_t state){ + _amqp_destroy_connection(state); + + if(state->ssl_key_password) + free(state->ssl_key_password); + + /* destroy ssl context */ + if(state->ctx){ + SSL_CTX_free(state->ctx); + state->ctx = NULL; + } +} + +amqp_connection_state_t amqp_new_ssl_connection(const char *certificate, const char *key, const char *password, const char *ca, unsigned short flags) { + amqp_connection_state_t state; + X509 *cert = NULL, *cacert = NULL; + RSA *rsa = NULL; + BIO *cbio, *kbio; + X509_STORE *store = NULL; + char error[254] = "\0"; + + + /** initialize connection_state */ + state = amqp_new_connection(); + if(!state){ + return NULL; + } + + /* reset callbacks to ssl */ + state->connect = _amqp_ssl_connect; + state->write = amqp_ssl_write; + state->read = amqp_ssl_read; + state->close_connection = _amqp_ssl_connection_close; + state->destroy_connection = _amqp_destroy_ssl_connection; + + /* save flags for ssl */ + state->ssl_flags = flags; + + /**S initialize ssl */ + if(!bio_err){ + SSL_library_init(); + OpenSSL_add_all_algorithms(); +// SSL_load_error_strings(); + + /* An error write context */ + bio_err = BIO_new_fp(stderr, BIO_NOCLOSE); + } + + + do{ + if (! (state->ctx = SSL_CTX_new(SSLv23_method()))) { + sprintf(error, "Error during setup SSL context"); + break; + } + + /* read certificate */ + if(!SSL_CTX_use_certificate_chain_file(state->ctx, certificate)){ + cbio = BIO_new_mem_buf((void*)certificate, -1); + PEM_read_bio_X509(cbio, &cert, 0, NULL); + BIO_free(cbio); + + if (!SSL_CTX_use_certificate(state->ctx, cert)) { + sprintf(error, "Can't read certificate file"); + X509_free(cert); + break; + } + X509_free(cert); + } + + + if (password != NULL) { + state->ssl_key_password = strdup(password); + SSL_CTX_set_default_passwd_cb_userdata(state->ctx, (void *) state->ssl_key_password); + SSL_CTX_set_default_passwd_cb(state->ctx, ssl_password_cb); + } + + /* try to read as file */ + if (!SSL_CTX_use_PrivateKey_file(state->ctx, key, SSL_FILETYPE_PEM)) { + + /* try reading from memory */ + kbio = BIO_new_mem_buf((void*)key, -1); + PEM_read_bio_RSAPrivateKey(kbio, &rsa, (password) ? ssl_password_cb : NULL, state->ssl_key_password); + BIO_free(kbio); + if (!SSL_CTX_use_RSAPrivateKey(state->ctx, rsa)) { + sprintf(error, "Can't read key file"); + RSA_free(rsa); + break; + } + RSA_free(rsa); + + } + + /* load ca certificate */ + if(ca){ + if (!SSL_CTX_load_verify_locations(state->ctx, ca, 0)) { + /* ca is not a ca file, try reading mem buffer */ + cbio = BIO_new_mem_buf((void*)ca, -1); + PEM_read_bio_X509(cbio, &cacert, 0, NULL); + BIO_free(cbio); + store = SSL_CTX_get_cert_store(state->ctx); + if(!X509_STORE_add_cert(store, cacert)){ + fprintf(stderr, "Can't add ca file to ca store\n"); + } + X509_free(cacert); + } + } + + /* everything was fine, return state */ + return state; + }while(0); + + /* because we reached this point, an error muss be occured */ + empty_amqp_pool(&state->frame_pool); + empty_amqp_pool(&state->decoding_pool); + free(state); + fprintf(stderr, "%s\n", error); + return NULL; + +} + +/** + * initialize amqp connection_state + */ amqp_connection_state_t amqp_new_connection(void) { - amqp_connection_state_t state = - (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); + amqp_connection_state_t state = (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); if (state == NULL) { return NULL; } + + /* initialize with default connect method */ + state->connect = _amqp_connect; + state->write = amqp_write; + state->read = amqp_read; + state->close_connection = _amqp_connection_close; + state->destroy_connection = _amqp_destroy_connection; init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE); init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE); @@ -69,13 +337,13 @@ amqp_connection_state_t amqp_new_connection(void) { return state; } + + int amqp_get_sockfd(amqp_connection_state_t state) { return state->sockfd; } -void amqp_set_sockfd(amqp_connection_state_t state, - int sockfd) -{ +void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) { state->sockfd = sockfd; } @@ -112,10 +380,7 @@ int amqp_get_channel_max(amqp_connection_state_t state) { } void amqp_destroy_connection(amqp_connection_state_t state) { - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); - free(state->outbound_buffer.bytes); - free(state->sock_inbound_buffer.bytes); + state->destroy_connection(state); free(state); } @@ -126,8 +391,7 @@ static void return_to_idle(amqp_connection_state_t state) { state->state = CONNECTION_STATE_IDLE; } -void amqp_set_basic_return_cb(amqp_connection_state_t state, - amqp_basic_return_fn_t f, void *data) { +void amqp_set_basic_return_cb(amqp_connection_state_t state, amqp_basic_return_fn_t f, void *data) { state->basic_return_callback = f; state->basic_return_callback_data = data; } @@ -238,8 +502,7 @@ int amqp_handle_input(amqp_connection_state_t state, decoded_frame->frame_type = AMQP_FRAME_BODY; decoded_frame->payload.body_fragment.len = fragment_len; - decoded_frame->payload.body_fragment.bytes = - D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len); + decoded_frame->payload.body_fragment.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len); break; } @@ -256,10 +519,10 @@ int amqp_handle_input(amqp_connection_state_t state, if(decoded_frame->frame_type == AMQP_FRAME_METHOD && decoded_frame->payload.method.id == AMQP_BASIC_RETURN_METHOD) { + amqp_basic_return_t *m = decoded_frame->payload.method.decoded; if(state->basic_return_callback) - state->basic_return_callback(decoded_frame->channel, m, - state->basic_return_callback_data); + state->basic_return_callback(decoded_frame->channel, m, state->basic_return_callback_data); } return total_bytes_consumed; @@ -268,8 +531,7 @@ int amqp_handle_input(amqp_connection_state_t state, case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER; decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL; - amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P', - "Invalid protocol header received"); + amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P', "Invalid protocol header received"); decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4); decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5); decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6); @@ -381,18 +643,16 @@ int amqp_send_frame(amqp_connection_state_t state, separate_body = inner_send_frame(state, frame, &encoded, &payload_len); switch (separate_body) { case 0: - AMQP_CHECK_RESULT(write(state->sockfd, - state->outbound_buffer.bytes, - payload_len + (HEADER_SIZE + FOOTER_SIZE))); + AMQP_CHECK_RESULT(state->write(state, state->outbound_buffer.bytes, payload_len + (HEADER_SIZE + FOOTER_SIZE))); return 0; case 1: - AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE)); - AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len)); + AMQP_CHECK_RESULT(state->write(state, state->outbound_buffer.bytes, HEADER_SIZE)); + AMQP_CHECK_RESULT(state->write(state, encoded.bytes, payload_len)); { unsigned char frame_end_byte = AMQP_FRAME_END; assert(FOOTER_SIZE == 1); - AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE)); + AMQP_CHECK_RESULT(state->write(state, &frame_end_byte, FOOTER_SIZE)); } return 0; @@ -432,3 +692,69 @@ int amqp_send_frame_to(amqp_connection_state_t state, return separate_body; } } + +int amqp_write(amqp_connection_state_t state, void *buffer, size_t size) { + return write(state->sockfd, buffer, size); +} +int amqp_ssl_write(amqp_connection_state_t state, void *buffer, size_t size) { + int err; + int len; + + len = SSL_write(state->ssl, buffer, size); + + if (! len) { + switch ((err = SSL_get_error(state->ssl, len))) { + case SSL_ERROR_SYSCALL: + if (errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR) { + case SSL_ERROR_WANT_WRITE: + case SSL_ERROR_WANT_READ: + errno = EWOULDBLOCK; + return (0); + } + + case SSL_ERROR_SSL: + if (errno == EAGAIN) { + return 0; + } + + default: + return 0; + } + } + + + return len; +} +int amqp_read(amqp_connection_state_t state, void *buffer, size_t size) { + return read(state->sockfd, buffer, size); +} + +int amqp_ssl_read(amqp_connection_state_t state, void *buffer, size_t size) { + int err; + int len; + + len = SSL_read(state->ssl, buffer, size); + + if (!len) { + switch ((err = SSL_get_error(state->ssl, len))) { + case SSL_ERROR_SYSCALL: + if ((errno == EWOULDBLOCK) || (errno == EAGAIN) || (errno == EINTR)) { + case SSL_ERROR_WANT_READ: + errno = EWOULDBLOCK; + return 0; + } + + case SSL_ERROR_SSL: + if (errno == EAGAIN) { + return 0; + } + + default: + return 0; + } + } + + + return len; +} +// kate: indent-width 2; replace-tabs off; indent-mode cstyle; auto-insert-doxygen on; line-numbers on; tab-indents on; keep-extra-spaces off; auto-brackets on; diff --git a/src/librabbitmq/amqp_private.h b/src/librabbitmq/amqp_private.h index ce12f9d..3d8b1fd 100644 --- a/src/librabbitmq/amqp_private.h +++ b/src/librabbitmq/amqp_private.h @@ -6,6 +6,8 @@ extern "C" { #endif #include /* ntohl, htonl, ntohs, htons */ +#include +#include /* * Connection states: @@ -48,6 +50,12 @@ typedef struct amqp_link_t_ { void *data; } amqp_link_t; +typedef int (amqp_connect_fn_t)(amqp_connection_state_t state, const char *host, int port, struct timeval *timeout); +typedef int (amqp_write_fn_t)(amqp_connection_state_t state, void *buffer, size_t nb); +typedef int (amqp_read_fn_t)(amqp_connection_state_t state, void *buffer, size_t nb); +typedef amqp_rpc_reply_t (amqp_close_connection_fn_t)(amqp_connection_state_t state, int code); +typedef void (amqp_destroy_connection_fn_t)(amqp_connection_state_t state); + struct amqp_connection_state_t_ { amqp_pool_t frame_pool; amqp_pool_t decoding_pool; @@ -65,6 +73,20 @@ struct amqp_connection_state_t_ { amqp_bytes_t outbound_buffer; int sockfd; + + /** ssl support */ + SSL_CTX *ctx; + SSL *ssl; + BIO *bio; + unsigned short ssl_flags; + amqp_connect_fn_t *connect; + amqp_write_fn_t *write; + amqp_read_fn_t *read; + char *ssl_key_password; + amqp_close_connection_fn_t *close_connection; + amqp_destroy_connection_fn_t *destroy_connection; + /** */ + amqp_bytes_t sock_inbound_buffer; size_t sock_inbound_offset; size_t sock_inbound_limit; @@ -74,6 +96,8 @@ struct amqp_connection_state_t_ { amqp_basic_return_fn_t basic_return_callback; void *basic_return_callback_data; + + }; #define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); }) @@ -109,6 +133,12 @@ extern int amqp_encode_table(amqp_bytes_t encoded, amqp_table_t *input, int *offsetptr); +extern int amqp_open_socket(char const *hostname, int portnumber, struct timeval *timeout); +int amqp_write(amqp_connection_state_t state, void *buffer, size_t size); +int amqp_ssl_write(amqp_connection_state_t state, void *buffer, size_t size); +int amqp_read(amqp_connection_state_t state, void *buffer, size_t size); +int amqp_ssl_read(amqp_connection_state_t state, void* buffer, size_t size); + #define amqp_assert(condition, ...) \ ({ \ if (!(condition)) { \ diff --git a/src/librabbitmq/amqp_socket.c b/src/librabbitmq/amqp_socket.c index a9aa8fc..d79a55c 100644 --- a/src/librabbitmq/amqp_socket.c +++ b/src/librabbitmq/amqp_socket.c @@ -20,8 +20,7 @@ #include -int amqp_open_socket(char const *hostname, - int portnumber, struct timeval *timeout) +int amqp_open_socket(char const *hostname, int portnumber, struct timeval *timeout) { int result = -1; int sockfd; @@ -97,7 +96,7 @@ static char *header() { } int amqp_send_header(amqp_connection_state_t state) { - return write(state->sockfd, header(), 8); + return state->write(state, header(), 8); } int amqp_send_header_to(amqp_connection_state_t state, @@ -176,9 +175,7 @@ static int wait_frame_inner(amqp_connection_state_t state, assert(result != 0); } - result = read(state->sockfd, - state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len); + result = state->read(state, state->sock_inbound_buffer.bytes, state->sock_inbound_buffer.len); if (result < 0) { return -errno; } diff --git a/src/pg_amqp.c b/src/pg_amqp.c index 4ed70a3..f086e10 100644 --- a/src/pg_amqp.c +++ b/src/pg_amqp.c @@ -73,7 +73,6 @@ Datum pg_amqp_disconnect(PG_FUNCTION_ARGS); struct brokerstate { int broker_id; amqp_connection_state_t conn; - int sockfd; int uncommitted; int inerror; int idx; @@ -86,9 +85,14 @@ static void local_amqp_disconnect_bs(struct brokerstate *bs) { if(bs && bs->conn) { int errorstate = bs->inerror; - amqp_connection_close(bs->conn, AMQP_REPLY_SUCCESS); - if(bs->sockfd >= 0) close(bs->sockfd); - amqp_destroy_connection(bs->conn); + + if(bs->conn) { + amqp_connection_close(bs->conn, AMQP_REPLY_SUCCESS); + } + + if(bs->conn) { + amqp_destroy_connection(bs->conn); + } memset(bs, 0, sizeof(*bs)); bs->inerror = errorstate; } @@ -99,30 +103,42 @@ static void amqp_local_phase2(XactEvent event, void *arg) { switch(event) { case XACT_EVENT_COMMIT: for(bs = HEAD_BS; bs; bs = bs->next) { - if(bs->inerror) local_amqp_disconnect_bs(bs); - bs->inerror = 0; - if(!bs->uncommitted) continue; - if(bs->conn) amqp_tx_commit(bs->conn, 2, AMQP_EMPTY_TABLE); - reply = amqp_get_rpc_reply(); - if(reply->reply_type != AMQP_RESPONSE_NORMAL) { + if(bs->inerror) + local_amqp_disconnect_bs(bs); + + bs->inerror = 0; + if(!bs->uncommitted) + continue; + + if(bs->conn) + amqp_tx_commit(bs->conn, 2, AMQP_EMPTY_TABLE); + + reply = amqp_get_rpc_reply(); + if(reply->reply_type != AMQP_RESPONSE_NORMAL) { elog(WARNING, "amqp could not commit tx mode on broker %d", bs->broker_id); local_amqp_disconnect_bs(bs); - } - bs->uncommitted = 0; + } + bs->uncommitted = 0; } break; case XACT_EVENT_ABORT: for(bs = HEAD_BS; bs; bs = bs->next) { - if(bs->inerror) local_amqp_disconnect_bs(bs); - bs->inerror = 0; - if(!bs->uncommitted) continue; - if(bs->conn) amqp_tx_rollback(bs->conn, 2, AMQP_EMPTY_TABLE); - reply = amqp_get_rpc_reply(); - if(reply->reply_type != AMQP_RESPONSE_NORMAL) { + if(bs->inerror) + local_amqp_disconnect_bs(bs); + + bs->inerror = 0; + if(!bs->uncommitted) + continue; + + if(bs->conn) + amqp_tx_rollback(bs->conn, 2, AMQP_EMPTY_TABLE); + + reply = amqp_get_rpc_reply(); + if(reply->reply_type != AMQP_RESPONSE_NORMAL) { elog(WARNING, "amqp could not commit tx mode on broker %d", bs->broker_id); local_amqp_disconnect_bs(bs); - } - bs->uncommitted = 0; + } + bs->uncommitted = 0; } break; case XACT_EVENT_PREPARE: @@ -136,8 +152,7 @@ void _PG_init() { RegisterXactCallback(amqp_local_phase2, NULL); } -static struct brokerstate * -local_amqp_get_a_bs(broker_id) { +static struct brokerstate *local_amqp_get_a_bs(broker_id) { struct brokerstate *bs; for(bs = HEAD_BS; bs; bs = bs->next) { if(bs->broker_id == broker_id) return bs; @@ -148,15 +163,14 @@ local_amqp_get_a_bs(broker_id) { HEAD_BS = bs; return bs; } -static struct brokerstate * -local_amqp_get_bs(broker_id) { +static struct brokerstate *local_amqp_get_bs(broker_id) { char sql[1024]; char host_copy[300] = ""; int tries = 0; struct brokerstate *bs = local_amqp_get_a_bs(broker_id); if(bs->conn) return bs; if(SPI_connect() == SPI_ERROR_CONNECT) return NULL; - snprintf(sql, sizeof(sql), "SELECT host, port, vhost, username, password " + snprintf(sql, sizeof(sql), "SELECT host, port, vhost, username, password, requiressl, verify_cert, verify_cn, cert, key, key_password, ca " " FROM amqp.broker " " WHERE broker_id = %d " " ORDER BY host DESC, port", broker_id); @@ -167,56 +181,117 @@ local_amqp_get_bs(broker_id) { if(SPI_processed > 0) { struct timeval hb = { .tv_sec = 2UL, .tv_usec = 0UL }; amqp_rpc_reply_t *reply, s_reply; - char *host, *vhost, *user, *pass; - Datum port_datum; + char *host, *vhost, *user, *pass, *cert = NULL, *key = NULL, *ca = NULL, *key_pass = NULL; + Datum port_datum, requiressl_datum; bool is_null; + bool requiressl= false, verifyCert = false, verifyCN = false; int port = 5672; + int ssl_flags = 0; + int success = 0; + bs->idx = (bs->idx + 1) % SPI_processed; + host = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 1); - if(!host) host = "localhost"; + if(!host) + host = "localhost"; + port_datum = SPI_getbinval(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 2, &is_null); - if(!is_null) port = DatumGetInt32(port_datum); + if(!is_null) + port = DatumGetInt32(port_datum); + vhost = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 3); - if(!vhost) vhost = "/"; + if(!vhost) + vhost = "/"; + user = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 4); - if(!user) user = "guest"; + if(!user) + user = "guest"; + pass = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 5); - if(!pass) pass = "guest"; + if(!pass) + pass = "guest"; + + /* ssl data */ + requiressl_datum = SPI_getbinval(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 6, &is_null); + if(!is_null) + requiressl = DatumGetBool(requiressl_datum); + + requiressl_datum = SPI_getbinval(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 7, &is_null); + if(!is_null) + verifyCert = DatumGetBool(requiressl_datum); + + requiressl_datum = SPI_getbinval(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 8, &is_null); + if(!is_null) + verifyCN = DatumGetBool(requiressl_datum); + + /* get ssl values */ + cert = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 9); + key = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 10); + key_pass = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 11); + ca = SPI_getvalue(SPI_tuptable->vals[bs->idx], SPI_tuptable->tupdesc, 12); + + /* set ssl flags */ + if(verifyCert) + ssl_flags |= AMQP_SSL_FLAG_VERIFY; + + if(verifyCN) + ssl_flags |= AMQP_SSL_FLAG_CHECK_CN; + + snprintf(host_copy, sizeof(host_copy), "%s:%d", host, port); - bs->conn = amqp_new_connection(); - if(!bs->conn) { SPI_finish(); return NULL; } - bs->sockfd = amqp_open_socket(host, port, &hb); - if(bs->sockfd < 0) { - elog(WARNING, "amqp[%s] login socket/connect failed: %s", - host_copy, strerror(-bs->sockfd)); - goto busted; + do{ + bs->conn = amqp_new_ssl_connection(cert, key, key_pass, ca, ssl_flags); + if( NULL != bs->conn) { + elog(INFO, "amqp[%s] successfully created ssl connection for broker %d", host_copy, broker_id); + break; + } + + /* ssl is not required, try create unsecure connection */ + if( !requiressl ) { + bs->conn = amqp_new_connection(); + } else { + elog(WARNING, "amqp[%s] unable to create ssl connetion, but ssl is required", host_copy); + } + + + if(!bs->conn) { + SPI_finish(); + return NULL; + } + }while(0); + +// bs->sockfd = amqp_open_socket(host, port, &hb); + success = amqp_connect(bs->conn, host, port, &hb); + if(!success) { +// elog(WARNING, "amqp[%s] login socket/connect failed: %s", host_copy, strerror(-bs->sockfd)); + elog(WARNING, "amqp[%s] login socket/connect failed!", host_copy); + goto busted; } - amqp_set_sockfd(bs->conn, bs->sockfd); - s_reply = amqp_login(bs->conn, vhost, 0, 131072, - 0, AMQP_SASL_METHOD_PLAIN, +// amqp_set_sockfd(bs->conn, bs->sockfd); + s_reply = amqp_login(bs->conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user, pass); if(s_reply.reply_type != AMQP_RESPONSE_NORMAL) { - elog(WARNING, "amqp[%s] login failed on broker %d", host_copy, broker_id); - goto busted; + elog(WARNING, "amqp[%s] login failed on broker %d", host_copy, broker_id); + goto busted; } amqp_channel_open(bs->conn, 1); reply = amqp_get_rpc_reply(); if(reply->reply_type != AMQP_RESPONSE_NORMAL) { - elog(WARNING, "amqp[%s] channel open failed on broker %d", host_copy, broker_id); - goto busted; + elog(WARNING, "amqp[%s] channel open failed on broker %d", host_copy, broker_id); + goto busted; } amqp_channel_open(bs->conn, 2); reply = amqp_get_rpc_reply(); if(reply->reply_type != AMQP_RESPONSE_NORMAL) { - elog(WARNING, "amqp[%s] channel open failed on broker %d", host_copy, broker_id); - goto busted; + elog(WARNING, "amqp[%s] channel open failed on broker %d", host_copy, broker_id); + goto busted; } amqp_tx_select(bs->conn, 2, AMQP_EMPTY_TABLE); reply = amqp_get_rpc_reply(); if(reply->reply_type != AMQP_RESPONSE_NORMAL) { - elog(WARNING, "amqp[%s] could not start tx mode on broker %d", host_copy, broker_id); - goto busted; + elog(WARNING, "amqp[%s] could not start tx mode on broker %d", host_copy, broker_id); + goto busted; } } else { elog(WARNING, "amqp can't find broker %d", broker_id); @@ -235,15 +310,14 @@ local_amqp_get_bs(broker_id) { local_amqp_disconnect_bs(bs); return bs; } -static void -local_amqp_disconnect(broker_id) { + +static void local_amqp_disconnect(broker_id) { struct brokerstate *bs = local_amqp_get_a_bs(broker_id); local_amqp_disconnect_bs(bs); } PG_FUNCTION_INFO_V1(pg_amqp_exchange_declare); -Datum -pg_amqp_exchange_declare(PG_FUNCTION_ARGS) { +Datum pg_amqp_exchange_declare(PG_FUNCTION_ARGS) { struct brokerstate *bs; if(!PG_ARGISNULL(0)) { int broker_id; @@ -273,8 +347,7 @@ pg_amqp_exchange_declare(PG_FUNCTION_ARGS) { } PG_RETURN_BOOL(0 != 0); } -static Datum -pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { +static Datum pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { struct brokerstate *bs; if(!PG_ARGISNULL(0)) { int broker_id; @@ -294,8 +367,7 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { set_bytes_from_text(exchange_b,1); set_bytes_from_text(routing_key_b,2); set_bytes_from_text(body_b,3); - rv = amqp_basic_publish(bs->conn, channel, exchange_b, routing_key_b, - mandatory, immediate, NULL, body_b); + rv = amqp_basic_publish(bs->conn, channel, exchange_b, routing_key_b, mandatory, immediate, NULL, body_b); reply = amqp_get_rpc_reply(); if(rv || reply->reply_type != AMQP_RESPONSE_NORMAL) { if(once_more && (channel == 1 || bs->uncommitted == 0)) { @@ -315,20 +387,17 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { } PG_FUNCTION_INFO_V1(pg_amqp_publish); -Datum -pg_amqp_publish(PG_FUNCTION_ARGS) { +Datum pg_amqp_publish(PG_FUNCTION_ARGS) { return pg_amqp_publish_opt(fcinfo, 2); } PG_FUNCTION_INFO_V1(pg_amqp_autonomous_publish); -Datum -pg_amqp_autonomous_publish(PG_FUNCTION_ARGS) { +Datum pg_amqp_autonomous_publish(PG_FUNCTION_ARGS) { return pg_amqp_publish_opt(fcinfo, 1); } PG_FUNCTION_INFO_V1(pg_amqp_disconnect); -Datum -pg_amqp_disconnect(PG_FUNCTION_ARGS) { +Datum pg_amqp_disconnect(PG_FUNCTION_ARGS) { if(!PG_ARGISNULL(0)) { int broker_id; broker_id = PG_GETARG_INT32(0); From ac7e078bcff7c8cc1db526aa9e1a63eb627728ea Mon Sep 17 00:00:00 2001 From: Duncan Burke Date: Tue, 3 Jul 2012 05:26:45 +1000 Subject: [PATCH 04/12] Added support for arbitrary headers and setting content_type Signed-off-by: Duncan Burke --- sql/amqp.sql | 8 ++-- src/librabbitmq/amqp_api.c | 2 + src/pg_amqp.c | 89 +++++++++++++++++++++++++++++++++++++- 3 files changed, 94 insertions(+), 5 deletions(-) diff --git a/sql/amqp.sql b/sql/amqp.sql index 676b2c9..a389718 100644 --- a/sql/amqp.sql +++ b/sql/amqp.sql @@ -10,11 +10,11 @@ comment on function amqp.exchange_declare(integer, varchar, varchar, boolean, bo auto_delete should be set to false as unexpected errors can cause disconnect/reconnect which would trigger the auto deletion of the exchange.'; -create function amqp.publish(integer, varchar, varchar, varchar) +create function amqp.publish(integer, varchar, varchar, varchar, varchar[][2], varchar) returns boolean as 'pg_amqp.so', 'pg_amqp_publish' language C immutable; -comment on function amqp.publish(integer, varchar, varchar, varchar) is +comment on function amqp.publish(integer, varchar, varchar, varchar, varchar[][2], varchar) is 'Publishes a message (broker_id, exchange, routing_key, message). The message will only be published if the containing PostgreSQL transaction successfully commits. Under certain circumstances, the AMQP commit might fail. In this case, a WARNING is emitted. @@ -22,11 +22,11 @@ circumstances, the AMQP commit might fail. In this case, a WARNING is emitted. Publish returns a boolean indicating if the publish command was successful. Note that as AMQP publish is asynchronous, you may find out later it was unsuccessful.'; -create function amqp.autonomous_publish(integer, varchar, varchar, varchar) +create function amqp.autonomous_publish(integer, varchar, varchar, varchar, varchar[][2], varchar) returns boolean as 'pg_amqp.so', 'pg_amqp_autonomous_publish' language C immutable; -comment on function amqp.autonomous_publish(integer, varchar, varchar, varchar) is +comment on function amqp.autonomous_publish(integer, varchar, varchar, varchar, varchar[][2], varchar) is 'Works as amqp.publish does, but the message is published immediately irrespective of the current transaction state. PostgreSQL commit and rollback at a later point will have no effect on this message being sent to AMQP.'; diff --git a/src/librabbitmq/amqp_api.c b/src/librabbitmq/amqp_api.c index 033f84a..f24b09d 100644 --- a/src/librabbitmq/amqp_api.c +++ b/src/librabbitmq/amqp_api.c @@ -4,6 +4,8 @@ #include #include +#define DISABLE_THREADS + #ifndef DISABLE_THREADS #include #endif diff --git a/src/pg_amqp.c b/src/pg_amqp.c index 4ed70a3..9464518 100644 --- a/src/pg_amqp.c +++ b/src/pg_amqp.c @@ -37,6 +37,7 @@ #include #include #include +#include #include "postgres.h" #include "funcapi.h" @@ -50,6 +51,8 @@ #include "access/xact.h" #include "utils/memutils.h" #include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/array.h" #include "librabbitmq/amqp.h" #include "librabbitmq/amqp_framing.h" @@ -61,6 +64,8 @@ } \ } while(0) +#define safe_free(x) if (x){ free(x); x = NULL; } + #ifdef PG_MODULE_MAGIC PG_MODULE_MAGIC; #endif @@ -291,23 +296,105 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { amqp_bytes_t routing_key_b = amqp_cstring_bytes(""); amqp_bytes_t body_b = amqp_cstring_bytes(""); + amqp_basic_properties_t properties; + + memset(&properties, 0, sizeof(properties)); + set_bytes_from_text(exchange_b,1); set_bytes_from_text(routing_key_b,2); set_bytes_from_text(body_b,3); + + if (!PG_ARGISNULL(4)) { + ArrayType *v; + int *dims, ndims; + Oid element_type; + int typlen; + bool typbyval; + char typalign; + char typdelim; + Oid typelem; + Oid typiofunc; + FmgrInfo proc; + + ArrayIterator iter; + Datum value; + bool isnull; + int i; + + v = PG_GETARG_ARRAYTYPE_P(4); + + ndims = ARR_NDIM(v); + dims = ARR_DIMS(v); + + if (ndims != 2 || dims[1] != 2){ + elog(ERROR, "headers must be n x 2 dimensional array"); + PG_RETURN_BOOL(0 != 0); + } + + if (array_contains_nulls(v)){ + elog(ERROR, "headers may not contain NULLs"); + PG_RETURN_BOOL(0 != 0); + } + + element_type = ARR_ELEMTYPE(v); + + get_type_io_data(element_type, IOFunc_output, + (int16*)&typlen, &typbyval, + &typalign, &typdelim, + &typelem, &typiofunc); + + fmgr_info_cxt(typiofunc, &proc, fcinfo->flinfo->fn_mcxt); + + properties.headers.num_entries = dims[0]; + properties._flags |= AMQP_BASIC_HEADERS_FLAG; + properties.headers.entries = malloc(dims[0] * sizeof(amqp_table_entry_t)); + if (!properties.headers.entries){ + elog(ERROR, "out of memory"); + PG_RETURN_BOOL(0 != 0); + } + + iter = array_create_iterator(v, 0); + i = 0; + while (array_iterate(iter, &value, &isnull)){ + char* key; + char* bytes; + key = DatumGetCString(FunctionCall3(&proc, value, ObjectIdGetDatum(typelem), Int32GetDatum(-1))); + if (!array_iterate(iter, &value, &isnull)){ + elog(ERROR, "array is fucked"); + safe_free(properties.headers.entries); + PG_RETURN_BOOL(0 != 0); + } + bytes = DatumGetCString(FunctionCall3(&proc, value, ObjectIdGetDatum(typelem), Int32GetDatum(-1))); + properties.headers.entries[i].key.len = strlen(key); + properties.headers.entries[i].key.bytes = key; + properties.headers.entries[i].kind = 'S'; + properties.headers.entries[i].value.bytes.len = strlen(bytes); + properties.headers.entries[i].value.bytes.bytes = bytes; + i++; + } + + } + if (!PG_ARGISNULL(5)){ + set_bytes_from_text(properties.content_type,5); + properties._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; + } rv = amqp_basic_publish(bs->conn, channel, exchange_b, routing_key_b, - mandatory, immediate, NULL, body_b); + mandatory, immediate, &properties, body_b); reply = amqp_get_rpc_reply(); if(rv || reply->reply_type != AMQP_RESPONSE_NORMAL) { if(once_more && (channel == 1 || bs->uncommitted == 0)) { once_more = 0; local_amqp_disconnect_bs(bs); + safe_free(properties.headers.entries); goto redo; } bs->inerror = 1; + safe_free(properties.headers.entries); PG_RETURN_BOOL(0 != 0); } /* channel two is transactional */ if(channel == 2) bs->uncommitted++; + safe_free(properties.headers.entries); PG_RETURN_BOOL(rv == 0); } } From 3fd438ecc00f0dc17a7f54049eaa1ce0cdb27f79 Mon Sep 17 00:00:00 2001 From: Duncan Burke Date: Mon, 30 Jul 2012 22:58:43 +1000 Subject: [PATCH 05/12] Add support for setting any amqp or application headers Signed-off-by: Duncan Burke --- src/pg_amqp.c | 220 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 165 insertions(+), 55 deletions(-) diff --git a/src/pg_amqp.c b/src/pg_amqp.c index 9464518..57bc459 100644 --- a/src/pg_amqp.c +++ b/src/pg_amqp.c @@ -66,15 +66,60 @@ #define safe_free(x) if (x){ free(x); x = NULL; } +#define _BYTES_T 0 +#define _UINT64_T 1 +#define _UINT8_T 2 + #ifdef PG_MODULE_MAGIC PG_MODULE_MAGIC; #endif + +typedef struct { + FmgrInfo proc; + int elems; + Oid typelem; + ArrayIterator iter; +} pg_array_foo; + +typedef struct { + char* key; + char* value; +} pg_array_elem; + +typedef struct { + char* name; + size_t offset; + int type; + int flag; +} amqp_field; + +/* TODO: make this a trie or some other optimised datastructure */ +static amqp_field amqp_fields[] = { { "content_type", offsetof(amqp_basic_properties_t,content_type), _BYTES_T, AMQP_BASIC_CONTENT_TYPE_FLAG}, + { "content_encoding", offsetof(amqp_basic_properties_t,content_encoding), _BYTES_T, AMQP_BASIC_CONTENT_ENCODING_FLAG}, + { "delivery_mode", offsetof(amqp_basic_properties_t,delivery_mode), _UINT8_T, AMQP_BASIC_DELIVERY_MODE_FLAG}, + { "priority", offsetof(amqp_basic_properties_t,priority), _UINT8_T, AMQP_BASIC_PRIORITY_FLAG}, + { "correlation_id", offsetof(amqp_basic_properties_t,correlation_id), _BYTES_T, AMQP_BASIC_CORRELATION_ID_FLAG}, + { "reply_to", offsetof(amqp_basic_properties_t,reply_to), _BYTES_T, AMQP_BASIC_REPLY_TO_FLAG}, + { "expiration", offsetof(amqp_basic_properties_t,expiration), _BYTES_T, AMQP_BASIC_EXPIRATION_FLAG}, + { "message_id", offsetof(amqp_basic_properties_t,message_id), _BYTES_T, AMQP_BASIC_MESSAGE_ID_FLAG}, + { "timestamp", offsetof(amqp_basic_properties_t,timestamp), _UINT64_T, AMQP_BASIC_TIMESTAMP_FLAG}, + { "type", offsetof(amqp_basic_properties_t,type), _BYTES_T, AMQP_BASIC_TYPE_FLAG}, + { "user_id", offsetof(amqp_basic_properties_t,user_id), _BYTES_T, AMQP_BASIC_USER_ID_FLAG}, + { "app_id", offsetof(amqp_basic_properties_t,app_id), _BYTES_T, AMQP_BASIC_APP_ID_FLAG}, + { "cluster_id", offsetof(amqp_basic_properties_t,cluster_id), _BYTES_T, AMQP_BASIC_CLUSTER_ID_FLAG}, + { NULL, 0, 0, 0} +}; + void _PG_init(void); Datum pg_amqp_exchange_declare(PG_FUNCTION_ARGS); Datum pg_amqp_publish(PG_FUNCTION_ARGS); Datum pg_amqp_autonomous_publish(PG_FUNCTION_ARGS); Datum pg_amqp_disconnect(PG_FUNCTION_ARGS); +int pg_process_array(ArrayType* v, FunctionCallInfoData* fcinfo, pg_array_foo* array_foo); +int pg_array_get_elem(pg_array_foo* array_foo, pg_array_elem* elem); +int amqp_set_property(amqp_basic_properties_t* properties, char* key, char* value); + struct brokerstate { int broker_id; amqp_connection_state_t conn; @@ -278,6 +323,87 @@ pg_amqp_exchange_declare(PG_FUNCTION_ARGS) { } PG_RETURN_BOOL(0 != 0); } + +int pg_process_array(ArrayType* v, FunctionCallInfoData* fcinfo, pg_array_foo* array_foo){ + int *dims, ndims; + Oid element_type; + int typlen; + bool typbyval; + char typalign; + char typdelim; + Oid typiofunc; + + ndims = ARR_NDIM(v); + dims = ARR_DIMS(v); + + if (ndims != 2 || dims[1] != 2){ + elog(ERROR, "headers must be n x 2 dimensional array"); + return 1; + } + + if (array_contains_nulls(v)){ + elog(ERROR, "headers may not contain NULLs"); + return 1; + } + + element_type = ARR_ELEMTYPE(v); + + get_type_io_data(element_type, IOFunc_output, + (int16*)&typlen, &typbyval, + &typalign, &typdelim, + &array_foo->typelem, &typiofunc); + + fmgr_info_cxt(typiofunc, &array_foo->proc, fcinfo->flinfo->fn_mcxt); + array_foo->elems = dims[0]; + array_foo->iter = array_create_iterator(v, 0); + return 0; +} + +int pg_array_get_elem(pg_array_foo* array_foo, pg_array_elem* elem){ + bool isnull; + Datum value; + if (array_iterate(array_foo->iter, &value, &isnull)){ + elem->key = DatumGetCString(FunctionCall3(&array_foo->proc, value, ObjectIdGetDatum(array_foo->typelem), Int32GetDatum(-1))); + if (!array_iterate(array_foo->iter, &value, &isnull)){ + elog(ERROR, "array seriously fucked"); + return 2; + } + elem->key = DatumGetCString(FunctionCall3(&array_foo->proc, value, ObjectIdGetDatum(array_foo->typelem), Int32GetDatum(-1))); + } else { + elem->key = NULL; + elem->value = NULL; + return 1; + } + return 0; +} + +int amqp_set_property(amqp_basic_properties_t* properties, char* key, char* value){ + int i; + void* field; + for (i = 0; amqp_fields[i].name; i++) + if (!strcmp(key,amqp_fields[i].name)) + break; + if (!amqp_fields[i].name) + return 1; + + field = (void*)((size_t)&properties->headers + amqp_fields[i].offset); + + switch (amqp_fields[i].type) { + case _BYTES_T: + ((amqp_bytes_t*)field)->len = strlen(value); + ((amqp_bytes_t*)field)->bytes = value; + break; + case _UINT64_T: + *(uint64_t*)field = (uint64_t)atoi(value); + break; + case _UINT8_T: + *(uint8_t*)field = (uint8_t)atoi(value); + break; + } + properties->_flags |= amqp_fields[i].flag; + return 0; +} + static Datum pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { struct brokerstate *bs; @@ -306,78 +432,62 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { if (!PG_ARGISNULL(4)) { ArrayType *v; - int *dims, ndims; - Oid element_type; - int typlen; - bool typbyval; - char typalign; - char typdelim; - Oid typelem; - Oid typiofunc; - FmgrInfo proc; - - ArrayIterator iter; - Datum value; - bool isnull; + pg_array_foo array_foo; + pg_array_elem array_elem; int i; + int ret; v = PG_GETARG_ARRAYTYPE_P(4); - - ndims = ARR_NDIM(v); - dims = ARR_DIMS(v); - - if (ndims != 2 || dims[1] != 2){ - elog(ERROR, "headers must be n x 2 dimensional array"); + if (!pg_process_array(v, fcinfo, &array_foo)) PG_RETURN_BOOL(0 != 0); - } - if (array_contains_nulls(v)){ - elog(ERROR, "headers may not contain NULLs"); - PG_RETURN_BOOL(0 != 0); - } - - element_type = ARR_ELEMTYPE(v); - - get_type_io_data(element_type, IOFunc_output, - (int16*)&typlen, &typbyval, - &typalign, &typdelim, - &typelem, &typiofunc); - - fmgr_info_cxt(typiofunc, &proc, fcinfo->flinfo->fn_mcxt); - - properties.headers.num_entries = dims[0]; + properties.headers.num_entries = array_foo.elems; properties._flags |= AMQP_BASIC_HEADERS_FLAG; - properties.headers.entries = malloc(dims[0] * sizeof(amqp_table_entry_t)); + properties.headers.entries = malloc(array_foo.elems * sizeof(amqp_table_entry_t)); if (!properties.headers.entries){ elog(ERROR, "out of memory"); PG_RETURN_BOOL(0 != 0); } - - iter = array_create_iterator(v, 0); + i = 0; - while (array_iterate(iter, &value, &isnull)){ - char* key; - char* bytes; - key = DatumGetCString(FunctionCall3(&proc, value, ObjectIdGetDatum(typelem), Int32GetDatum(-1))); - if (!array_iterate(iter, &value, &isnull)){ - elog(ERROR, "array is fucked"); - safe_free(properties.headers.entries); - PG_RETURN_BOOL(0 != 0); - } - bytes = DatumGetCString(FunctionCall3(&proc, value, ObjectIdGetDatum(typelem), Int32GetDatum(-1))); - properties.headers.entries[i].key.len = strlen(key); - properties.headers.entries[i].key.bytes = key; + while ((ret = pg_array_get_elem(&array_foo, &array_elem))){ + properties.headers.entries[i].key.len = strlen(array_elem.key); + properties.headers.entries[i].key.bytes = array_elem.key; properties.headers.entries[i].kind = 'S'; - properties.headers.entries[i].value.bytes.len = strlen(bytes); - properties.headers.entries[i].value.bytes.bytes = bytes; + properties.headers.entries[i].value.bytes.len = strlen(array_elem.value); + properties.headers.entries[i].value.bytes.bytes = array_elem.value; i++; } + if (ret == 2) + PG_RETURN_BOOL(0 != 0); } - if (!PG_ARGISNULL(5)){ - set_bytes_from_text(properties.content_type,5); - properties._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; + if (!PG_ARGISNULL(5)) { + ArrayType *v; + pg_array_foo array_foo; + pg_array_elem array_elem; + int ret; + + v = PG_GETARG_ARRAYTYPE_P(5); + if (!pg_process_array(v, fcinfo, &array_foo)) + PG_RETURN_BOOL(0 != 0); + + properties.headers.num_entries = array_foo.elems; + properties._flags |= AMQP_BASIC_HEADERS_FLAG; + properties.headers.entries = malloc(array_foo.elems * sizeof(amqp_table_entry_t)); + if (!properties.headers.entries){ + elog(ERROR, "out of memory"); + PG_RETURN_BOOL(0 != 0); + } + + while ((ret = pg_array_get_elem(&array_foo, &array_elem))) + if (amqp_set_property(&properties, array_elem.key, array_elem.value)) + PG_RETURN_BOOL(0 != 0); + + if (ret == 2) + PG_RETURN_BOOL(0 != 0); } + rv = amqp_basic_publish(bs->conn, channel, exchange_b, routing_key_b, mandatory, immediate, &properties, body_b); reply = amqp_get_rpc_reply(); From 66eb465fd1712776cdb3d6e51c245b64725d308a Mon Sep 17 00:00:00 2001 From: marcelloceschia Date: Thu, 3 Apr 2014 12:03:09 +0200 Subject: [PATCH 06/12] update Makefile for packaging --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index ac8350b..f39fc17 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ sql/$(EXTENSION)--$(EXTVERSION).sql: sql/$(EXTENSION).sql # strip first two lines (BEGIN, CREATE SCHEMA) and last line (COMMIT). sed '1,2d;$$d' $< > $@ -DATA = $(wildcard sql/*--*.sql) sql/$(EXTENSION)--$(EXTVERSION).sql +DATA = sql/$(EXTENSION)--$(EXTVERSION).sql EXTRA_CLEAN = sql/$(EXTENSION)--$(EXTVERSION).sql endif From 7f4f3ece059b6916eeec53c3d329c3eed17279d5 Mon Sep 17 00:00:00 2001 From: Marcello Ceschia Date: Wed, 16 Mar 2016 12:18:41 +0100 Subject: [PATCH 07/12] update function definition as well --- sql/amqp--0.4.2.sql | 5 +++-- sql/functions/functions.sql | 7 ++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/amqp--0.4.2.sql b/sql/amqp--0.4.2.sql index e95f1ab..9fb7f0c 100644 --- a/sql/amqp--0.4.2.sql +++ b/sql/amqp--0.4.2.sql @@ -75,12 +75,13 @@ CREATE FUNCTION @extschema@.publish( , reply_to varchar default null , correlation_id varchar default null , headers varchar[][2] default null + , properties varchar[][2] default null ) RETURNS boolean AS 'pg_amqp.so', 'pg_amqp_publish' LANGUAGE C IMMUTABLE; -COMMENT ON FUNCTION @extschema@.publish(integer, varchar, varchar, varchar, integer, varchar, varchar, varchar, varchar[][2]) IS -'Publishes a message (broker_id, exchange, routing_key, message, delivery_mode, content_type, reply_to, correlation_id, headers). +COMMENT ON FUNCTION @extschema@.publish(integer, varchar, varchar, varchar, integer, varchar, varchar, varchar, varchar[][2], varchar[][2]) IS +'Publishes a message (broker_id, exchange, routing_key, message, delivery_mode, content_type, reply_to, correlation_id, headers, properties). The message will only be published if the containing PostgreSQL transaction successfully commits. Under certain circumstances, the AMQP commit might fail. In this case, a WARNING is emitted. The last four parameters are optional and set the following message properties: diff --git a/sql/functions/functions.sql b/sql/functions/functions.sql index a3339b5..9aabbcf 100644 --- a/sql/functions/functions.sql +++ b/sql/functions/functions.sql @@ -56,12 +56,14 @@ CREATE FUNCTION @extschema@.publish( , content_type varchar default null , reply_to varchar default null , correlation_id varchar default null + , headers varchar[][2] default null + , properties varchar[][2] default null ) RETURNS boolean AS 'pg_amqp.so', 'pg_amqp_publish' LANGUAGE C IMMUTABLE; -COMMENT ON FUNCTION @extschema@.publish(integer, varchar, varchar, varchar, integer, varchar, varchar, varchar) IS -'Publishes a message (broker_id, exchange, routing_key, message). +COMMENT ON FUNCTION @extschema@.publish(integer, varchar, varchar, varchar, integer, varchar, varchar, varchar, varchar[][2], varchar[][2]) IS +'Publishes a message (broker_id, exchange, routing_key, message, delivery_mode, content_type, reply_to, correlation_id, headers, properties). The message will only be published if the containing PostgreSQL transaction successfully commits. Under certain circumstances, the AMQP commit might fail. In this case, a WARNING is emitted. The last four parameters are optional and set the following message properties: @@ -69,4 +71,3 @@ delivery_mode (either 1 or 2), content_type, reply_to and correlation_id. Publish returns a boolean indicating if the publish command was successful. Note that as AMQP publish is asynchronous, you may find out later it was unsuccessful.'; - From c58a71c23fe91a57282a62c11e60643619fe8d3b Mon Sep 17 00:00:00 2001 From: Marcello Ceschia Date: Sun, 20 Mar 2016 12:12:50 +0100 Subject: [PATCH 08/12] initialize properties correctly --- sql/amqp--0.4.2.sql | 1 - src/pg_amqp.c | 7 +++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/amqp--0.4.2.sql b/sql/amqp--0.4.2.sql index 9fb7f0c..a13b8b2 100644 --- a/sql/amqp--0.4.2.sql +++ b/sql/amqp--0.4.2.sql @@ -89,4 +89,3 @@ delivery_mode (either 1 or 2), content_type, reply_to and correlation_id. Publish returns a boolean indicating if the publish command was successful. Note that as AMQP publish is asynchronous, you may find out later it was unsuccessful.'; - diff --git a/src/pg_amqp.c b/src/pg_amqp.c index 8e18987..340fd9c 100644 --- a/src/pg_amqp.c +++ b/src/pg_amqp.c @@ -475,6 +475,7 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { if(!PG_ARGISNULL(0)) { int broker_id; amqp_basic_properties_t properties; + memset(&properties, 0, sizeof(properties)); int once_more = 1; broker_id = PG_GETARG_INT32(0); @@ -565,7 +566,7 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { pg_array_elem array_elem; int ret; - v = PG_GETARG_ARRAYTYPE_P(8); + v = PG_GETARG_ARRAYTYPE_P(9); if (!pg_process_array(v, fcinfo, &array_properties)) PG_RETURN_BOOL(0 != 0); @@ -586,7 +587,9 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { } reply = amqp_get_rpc_reply(); - safe_free(properties.headers.entries); + if(properties.headers.entries) { + safe_free(properties.headers.entries); + } if(rv || reply->reply_type != AMQP_RESPONSE_NORMAL) { if(once_more && (channel == 1 || bs->uncommitted == 0)) { once_more = 0; From 12b34c2e7635e9a8f355a44f5dbd358c5465c86f Mon Sep 17 00:00:00 2001 From: Marcello Ceschia Date: Sun, 19 Feb 2017 13:15:20 +0100 Subject: [PATCH 09/12] fix header an properties processing --- src/pg_amqp.c | 54 ++++++++++++++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/src/pg_amqp.c b/src/pg_amqp.c index 340fd9c..9c2cc45 100644 --- a/src/pg_amqp.c +++ b/src/pg_amqp.c @@ -420,7 +420,7 @@ int pg_process_array(ArrayType* v, FunctionCallInfoData* fcinfo, pg_array_foo* a fmgr_info_cxt(typiofunc, &array_foo->proc, fcinfo->flinfo->fn_mcxt); array_foo->elems = dims[0]; - array_foo->iter = array_create_iterator(v, 0); + array_foo->iter = array_create_iterator(v, 0, NULL); return 0; } @@ -433,7 +433,7 @@ int pg_array_get_elem(pg_array_foo* array_foo, pg_array_elem* elem){ elog(ERROR, "array seriously fucked"); return 2; } - elem->key = DatumGetCString(FunctionCall3(&array_foo->proc, value, ObjectIdGetDatum(array_foo->typelem), Int32GetDatum(-1))); + elem->value = DatumGetCString(FunctionCall3(&array_foo->proc, value, ObjectIdGetDatum(array_foo->typelem), Int32GetDatum(-1))); } else { elem->key = NULL; elem->value = NULL; @@ -528,53 +528,59 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { /* headers */ if (!PG_ARGISNULL(8)) { ArrayType *v; - pg_array_foo array_foo; - pg_array_elem array_elem; + pg_array_foo array_headers; + pg_array_elem header; int i; int ret; v = PG_GETARG_ARRAYTYPE_P(8); - if (!pg_process_array(v, fcinfo, &array_foo)) + if (pg_process_array(v, fcinfo, &array_headers)){ + elog(WARNING, "PG_RETURN_BOOL"); PG_RETURN_BOOL(0 != 0); - - properties.headers.num_entries = array_foo.elems; + } + + properties.headers.num_entries = array_headers.elems; properties._flags |= AMQP_BASIC_HEADERS_FLAG; - properties.headers.entries = malloc(array_foo.elems * sizeof(amqp_table_entry_t)); + properties.headers.entries = malloc(array_headers.elems * sizeof(amqp_table_entry_t)); if (!properties.headers.entries){ elog(ERROR, "out of memory"); PG_RETURN_BOOL(0 != 0); } i = 0; - while ((ret = pg_array_get_elem(&array_foo, &array_elem))){ - properties.headers.entries[i].key.len = strlen(array_elem.key); - properties.headers.entries[i].key.bytes = array_elem.key; + while (!(ret = pg_array_get_elem(&array_headers, &header))){ + properties.headers.entries[i].key = amqp_cstring_bytes(header.key); properties.headers.entries[i].kind = 'S'; - properties.headers.entries[i].value.bytes.len = strlen(array_elem.value); - properties.headers.entries[i].value.bytes.bytes = array_elem.value; + properties.headers.entries[i].value.bytes = amqp_cstring_bytes(header.value); i++; } + + array_free_iterator(array_headers.iter); if (ret == 2) PG_RETURN_BOOL(0 != 0); + } /* generic properties */ if (!PG_ARGISNULL(9)) { - ArrayType *v; - pg_array_foo array_properties; - pg_array_elem array_elem; - int ret; + ArrayType *v; + pg_array_foo array_properties; + pg_array_elem array_elem; + int ret; - v = PG_GETARG_ARRAYTYPE_P(9); - if (!pg_process_array(v, fcinfo, &array_properties)) + v = PG_GETARG_ARRAYTYPE_P(9); + if (pg_process_array(v, fcinfo, &array_properties)) PG_RETURN_BOOL(0 != 0); - while ((ret = pg_array_get_elem(&array_properties, &array_elem))){ - if(amqp_set_property(&properties, array_elem.key, array_elem.value)){ - elog(WARNING, "Unknow property name '%s', ignore value", array_elem.key); - } - } + while (!(ret = pg_array_get_elem(&array_properties, &array_elem))){ + if(amqp_set_property(&properties, array_elem.key, array_elem.value)){ + elog(WARNING, "Unknow property name '%s', ignore value", array_elem.key); + } + } + array_free_iterator(array_properties.iter); + if (ret == 2) + PG_RETURN_BOOL(0 != 0); } //rv = amqp_basic_publish(bs->conn, channel, exchange_b, routing_key_b, mandatory, immediate, &properties, body_b); From 1276544f9db431b4d1afec53b6af9b0cf557f04c Mon Sep 17 00:00:00 2001 From: Marcello Ceschia Date: Sun, 19 Feb 2017 13:16:13 +0100 Subject: [PATCH 10/12] remove debug log --- src/pg_amqp.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pg_amqp.c b/src/pg_amqp.c index 9c2cc45..c5b904e 100644 --- a/src/pg_amqp.c +++ b/src/pg_amqp.c @@ -535,7 +535,6 @@ pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) { v = PG_GETARG_ARRAYTYPE_P(8); if (pg_process_array(v, fcinfo, &array_headers)){ - elog(WARNING, "PG_RETURN_BOOL"); PG_RETURN_BOOL(0 != 0); } From 5aa624e08543e925ebe340a3e3f40e81c2621261 Mon Sep 17 00:00:00 2001 From: Marcello Ceschia Date: Sun, 19 Feb 2017 14:18:15 +0100 Subject: [PATCH 11/12] fixes array_create_iterator for postgres < 9.6 --- src/pg_amqp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pg_amqp.c b/src/pg_amqp.c index c5b904e..c51f540 100644 --- a/src/pg_amqp.c +++ b/src/pg_amqp.c @@ -420,7 +420,7 @@ int pg_process_array(ArrayType* v, FunctionCallInfoData* fcinfo, pg_array_foo* a fmgr_info_cxt(typiofunc, &array_foo->proc, fcinfo->flinfo->fn_mcxt); array_foo->elems = dims[0]; - array_foo->iter = array_create_iterator(v, 0, NULL); + array_foo->iter = array_create_iterator(v, 0,); return 0; } From 3d2f2e9baa6f71bc48892d6c1083d02863d07b7e Mon Sep 17 00:00:00 2001 From: Marcello Ceschia Date: Sun, 19 Feb 2017 14:26:17 +0100 Subject: [PATCH 12/12] fixes array_create_iterator for postgres < 9.6 --- src/pg_amqp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pg_amqp.c b/src/pg_amqp.c index c51f540..0522bee 100644 --- a/src/pg_amqp.c +++ b/src/pg_amqp.c @@ -420,7 +420,7 @@ int pg_process_array(ArrayType* v, FunctionCallInfoData* fcinfo, pg_array_foo* a fmgr_info_cxt(typiofunc, &array_foo->proc, fcinfo->flinfo->fn_mcxt); array_foo->elems = dims[0]; - array_foo->iter = array_create_iterator(v, 0,); + array_foo->iter = array_create_iterator(v, 0); return 0; }