Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for ssl connection #10

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
10 changes: 5 additions & 5 deletions META.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"name": "pg_amqp",
"abstract": "AMQP protocol support for PostgreSQL",
"version": "0.4.1",
"version": "0.4.2",
"description": "The pg_amqp package provides the ability for postgres statements to directly publish messages to an AMQP broker.",
"maintainer":
[ "Theo Schlossnagle <[email protected]>", "Keith Fiske <[email protected]" ]
[ "Theo Schlossnagle <[email protected]>", "Keith Fiske <[email protected]", "Marcello Ceschia <[email protected]>" ]
"license": [ "bsd", "mozilla_1_0" ],
"generated_by": "Keith Fiske",
"status": "stable",
Expand All @@ -17,9 +17,9 @@
},
"provides": {
"amqp": {
"file": "sql/amqp--0.4.1.sql",
"file": "sql/amqp--0.4.2.sql",
"docfile": "doc/amqp.md",
"version": "0.4.1",
"version": "0.4.2",
"abstract": "AMQP protocol support for PostgreSQL"
}
},
Expand All @@ -43,4 +43,4 @@
"rabbitmq",
"queue"
]
}
}
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
pg_amqp
=============

The pg_amqp package provides the ability for postgres statements to directly
publish messages to an [AMQP](http://www.amqp.org/) broker.
publish messages to an [AMQP](http://www.amqp.org/) broker. Version 0.4.2 also provides an implementation
for secured connections using open ssl. To build v0.4.2 openssl dev package is required.

All bug reports, feature requests and general questions can be directed to the Issues section on Github. - http://github.com/omniti-labs/pg_amqp

Expand Down
2 changes: 1 addition & 1 deletion amqp.control
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# amqp extension
comment = 'AMQP protocol support for PostgreSQL'
default_version = '0.4.1'
default_version = '0.4.2'
module_pathname = '$libdir/pg_amqp'
relocatable = false
schema = amqp
91 changes: 91 additions & 0 deletions sql/amqp--0.4.2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
CREATE TABLE @[email protected] (
broker_id serial NOT NULL,
host text NOT NULL,
port integer NOT NULL DEFAULT 5672,
vhost text,
username text,
password text,
requiressl boolean DEFAULT false,
verify_cert boolean DEFAULT true,
verify_cn boolean DEFAULT true,
cert text,
key text,
key_password character varying,
ca text,
PRIMARY KEY (broker_id, host, port)
);


CREATE FUNCTION @[email protected]_publish(
broker_id integer
, exchange varchar
, routing_key varchar
, message varchar
, delivery_mode integer default null
, content_type varchar default null
, reply_to varchar default null
, correlation_id varchar default null
)

RETURNS boolean AS 'pg_amqp.so', 'pg_amqp_autonomous_publish'
LANGUAGE C IMMUTABLE;

COMMENT ON FUNCTION @[email protected]_publish(integer, varchar, varchar, varchar, integer, varchar, varchar, varchar) IS
'Works as amqp.publish does, but the message is published immediately irrespective of the
current transaction state. PostgreSQL commit and rollback at a later point will have no
effect on this message being sent to AMQP.';


CREATE FUNCTION amqp.disconnect(broker_id integer)
RETURNS void AS 'pg_amqp.so', 'pg_amqp_disconnect'
LANGUAGE C IMMUTABLE STRICT;

COMMENT ON FUNCTION amqp.disconnect(integer) IS
'Explicitly disconnect the specified (broker_id) if it is current connected. Broker
connections, once established, live until the PostgreSQL backend terminated. This
allows for more precise control over that.
select amqp.disconnect(broker_id) from amqp.broker
will disconnect any brokers that may be connected.';


CREATE FUNCTION amqp.exchange_declare(
broker_id integer
, exchange varchar
, exchange_type varchar
, passive boolean
, durable boolean
, auto_delete boolean DEFAULT false
)
RETURNS boolean AS 'pg_amqp.so', 'pg_amqp_exchange_declare'
LANGUAGE C IMMUTABLE;

COMMENT ON FUNCTION amqp.exchange_declare(integer, varchar, varchar, boolean, boolean, boolean) IS
'Declares a exchange (broker_id, exchange_name, exchange_type, passive, durable, auto_delete)
auto_delete should be set to false (default) as unexpected errors can cause disconnect/reconnect which
would trigger the auto deletion of the exchange.';


CREATE FUNCTION @[email protected](
broker_id integer
, exchange varchar
, routing_key varchar
, message varchar
, delivery_mode integer default null
, content_type varchar default null
, reply_to varchar default null
, correlation_id varchar default null
, headers varchar[][2] default null
, properties varchar[][2] default null
)
RETURNS boolean AS 'pg_amqp.so', 'pg_amqp_publish'
LANGUAGE C IMMUTABLE;

COMMENT ON FUNCTION @[email protected](integer, varchar, varchar, varchar, integer, varchar, varchar, varchar, varchar[][2], varchar[][2]) IS
'Publishes a message (broker_id, exchange, routing_key, message, delivery_mode, content_type, reply_to, correlation_id, headers, properties).
The message will only be published if the containing PostgreSQL transaction successfully commits.
Under certain circumstances, the AMQP commit might fail. In this case, a WARNING is emitted.
The last four parameters are optional and set the following message properties:
delivery_mode (either 1 or 2), content_type, reply_to and correlation_id.

Publish returns a boolean indicating if the publish command was successful. Note that as
AMQP publish is asynchronous, you may find out later it was unsuccessful.';
7 changes: 4 additions & 3 deletions sql/functions/functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,18 @@ CREATE FUNCTION @[email protected](
, content_type varchar default null
, reply_to varchar default null
, correlation_id varchar default null
, headers varchar[][2] default null
, properties varchar[][2] default null
)
RETURNS boolean AS 'pg_amqp.so', 'pg_amqp_publish'
LANGUAGE C IMMUTABLE;

COMMENT ON FUNCTION @[email protected](integer, varchar, varchar, varchar, integer, varchar, varchar, varchar) IS
'Publishes a message (broker_id, exchange, routing_key, message).
COMMENT ON FUNCTION @[email protected](integer, varchar, varchar, varchar, integer, varchar, varchar, varchar, varchar[][2], varchar[][2]) IS
'Publishes a message (broker_id, exchange, routing_key, message, delivery_mode, content_type, reply_to, correlation_id, headers, properties).
The message will only be published if the containing PostgreSQL transaction successfully commits.
Under certain circumstances, the AMQP commit might fail. In this case, a WARNING is emitted.
The last four parameters are optional and set the following message properties:
delivery_mode (either 1 or 2), content_type, reply_to and correlation_id.

Publish returns a boolean indicating if the publish command was successful. Note that as
AMQP publish is asynchronous, you may find out later it was unsuccessful.';

7 changes: 7 additions & 0 deletions sql/tables/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ CREATE TABLE @[email protected] (
vhost text,
username text,
password text,
requiressl boolean DEFAULT false,
verify_cert boolean DEFAULT true,
verify_cn boolean DEFAULT true,
cert text,
key text,
key_password character varying,
ca text,
PRIMARY KEY (broker_id, host, port)
);

Expand Down
13 changes: 9 additions & 4 deletions src/librabbitmq/amqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ typedef enum amqp_response_type_enum_ {
AMQP_RESPONSE_SERVER_EXCEPTION
} amqp_response_type_enum;

typedef enum amqp_ssl_flags_type_enum_ {
AMQP_SSL_FLAG_VERIFY = 1 << 0,
AMQP_SSL_FLAG_CHECK_CN = 1 << 2,
} amqp_ssl_flags_type_enum;

typedef struct amqp_rpc_reply_t_ {
amqp_response_type_enum reply_type;
amqp_method_t reply;
Expand All @@ -113,8 +118,7 @@ typedef enum amqp_sasl_method_enum_ {

typedef struct amqp_basic_return_t_ amqp_basic_return_t;
typedef int (*amqp_output_fn_t)(void *context, void *buffer, size_t count);
typedef void (*amqp_basic_return_fn_t)(amqp_channel_t, amqp_basic_return_t *,
void *);
typedef void (*amqp_basic_return_fn_t)(amqp_channel_t, amqp_basic_return_t *, void *);


/* Opaque struct. */
Expand All @@ -140,6 +144,7 @@ extern amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src);
} \
})

extern amqp_connection_state_t amqp_new_ssl_connection(const char *certificate, const char *key, const char *password, const char *ca, unsigned short flags);
extern amqp_connection_state_t amqp_new_connection(void);
extern int amqp_get_sockfd(amqp_connection_state_t state);
extern void amqp_set_sockfd(amqp_connection_state_t state,
Expand Down Expand Up @@ -173,8 +178,7 @@ extern int amqp_send_frame_to(amqp_connection_state_t state,

extern int amqp_table_entry_cmp(void const *entry1, void const *entry2);

extern int amqp_open_socket(char const *hostname, int portnumber,
struct timeval *timeout);
extern int amqp_connect(amqp_connection_state_t state, char const *hostname, int portnumber, struct timeval *timeout);

extern int amqp_send_header(amqp_connection_state_t state);
extern int amqp_send_header_to(amqp_connection_state_t state,
Expand Down Expand Up @@ -343,6 +347,7 @@ extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state);
*/
extern amqp_rpc_reply_t *amqp_get_rpc_reply(void);


#ifdef __cplusplus
}
#endif
Expand Down
11 changes: 4 additions & 7 deletions src/librabbitmq/amqp_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <stdint.h>
#include <errno.h>

#define DISABLE_THREADS

#ifndef DISABLE_THREADS
#include <pthread.h>
#endif
Expand Down Expand Up @@ -103,14 +105,9 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
code, amqp_cstring_bytes(codestr), 0, 0);
}

amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
int code)
amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code)
{
char codestr[13];
snprintf(codestr, sizeof(codestr), "%d", code);
return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK,
amqp_connection_close_t,
code, amqp_cstring_bytes(codestr), 0, 0);
return state->close_connection(state, code);
}

amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
Expand Down
Loading