博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Active MQ C#实现
阅读量:6759 次
发布时间:2019-06-26

本文共 9540 字,大约阅读时间需要 31 分钟。

原文链接:

 

内容概要

主要以源码的形式介绍如何用C#实现同Active MQ 的通讯。本文假设你已经正确安装JDK1.6.x,了解Active MQ并有一定的编程基础。

正文

    JMS 程序的最终目的是生产和消费的消息能被其他程序使用,JMS 的 Message 是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非JMS 程序格式的消息。

Message 由消息头,属性和消息体三部份组成。

    Active MQ支持过滤机制,即生产者可以设置消息的属性(Properties),该属性与消费者端的Selector对应,只有消费者设置的selector与消息的Properties匹配,消息才会发给该消费者。Topic和Queue都支持Selector。

示例代码

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Windows;using System.Windows.Controls;using System.Windows.Data;using System.Windows.Documents;using System.Windows.Input;using System.Windows.Media;using System.Windows.Media.Imaging;using System.Windows.Navigation;using System.Windows.Shapes;using Apache.NMS;using System.Diagnostics;using Apache.NMS.Util;using System.Windows.Threading;/* * 功能描述:C#使用ActiveMQ示例 * 修改次数:2 * 最后更新: by Kagula,2012-07-31 *  * 前提条件: * [1]apache-activemq-5.4.2 * [2]Apache.NMS.ActiveMQ-1.5.6-bin * [3]WinXP SP3 * [4]VS2008 SP1 * [5]WPF工程 With .NET Framework 3.5 *  * 启动 *   * 不带安全控制方式启动 * [你的解压路径]\apache-activemq-5.4.2\bin\activemq.bat *  * 安全方式启动 * 添加环境变量:            ACTIVEMQ_ENCRYPTION_PASSWORD=activemq * [你的解压路径]\apache-activemq-5.4.2\bin>activemq xbean:file:../conf/activemq-security.xml *  * Active MQ 管理地址 * http://127.0.0.1:8161/admin/ * 添加访问"http://127.0.0.1:8161/admin/"的限制 *  * 第一步:添加访问限制 * 修改D:\apache\apache-activemq-5.4.2\conf\jetty.xml文件 * 下面这行编码,原 * 
* 修改为 *
* * 第二步:修改登录用户名密码,缺省分别为admin,admin * D:\apache\apache-activemq-5.4.2\conf\jetty-realm.properties * * 用户管理(前提:以安全方式启动ActiveMQ) * * 在[你的解压路径]\apache-activemq-5.4.2\conf\credentials.properties文件中修改默认的用户名密码 * 在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中可以添加新的用户名 * e.g. 添加oa用户,密码同用户名。 *
* * 在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中你还可以设置指定的Topic或Queue * 只能被哪些用户组read 或 write。 * * * 配置C# with WPF项目 * 项目的[Application]->[TargetFramework]属性设置为[.NETFramework 3.5](这是VS2008WPF工程的默认设置) * 添加[你的解压路径]\Apache.NMS.ActiveMQ-1.5.6-bin\lib\Apache.NMS\net-3.5\Apache.NMS.dll的引用 * Apache.NMS.dll相当于接口 * * 如果是以Debug方式调试 * 把[你的解压路径]\Apache.NMS.ActiveMQ-1.5.6-bin\build\net-3.5\debug\目录下的 * Apache.NMS.ActiveMQ.dll文件复制到你项目的Debug目录下 * Apache.NMS.ActiveMQ.dll相当于实现 * * 如果是以Release方式调试 * 参考上文,去取Apache.NMS,Release目录下相应的DLL文件,并复制到你项目的Release目录下。 * * * 参考资料 * [1]《C#调用ActiveMQ官方示例》 http://activemq.apache.org/nms/examples.html * [2]《ActiveMQ NMS下载地址》http://activemq.apache.org/nms/activemq-downloads.html * [3]《Active MQ在C#中的应用》http://www.cnblogs.com/guthing/archive/2010/06/17/1759333.html * [4]《NMS API Reference》http://activemq.apache.org/nms/nms-api.html */namespace testActiveMQSubscriber{ /// /// Interaction logic for Window1.xaml /// public partial class Window1 : Window { private static IConnectionFactory connFac; private static IConnection connection; private static ISession session; private static IDestination destination; private static IMessageProducer producer; private static IMessageConsumer consumer; protected static ITextMessage message = null; public Window1() { InitializeComponent(); initAMQ("MyFirstTopic"); } private void initAMQ(String strTopicName) { try { connFac = new NMSConnectionFactory(new Uri("activemq:failover:(tcp://localhost:61616)")); //新建连接 //connection = connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码 //如果你要持久“订阅”,则需要设置ClientId,这样程序运行当中被停止,恢复运行时,能拿到没接收到的消息! connection.ClientId = "testing listener"; connection = connFac.CreateConnection();//如果你是缺省方式启动Active MQ服务,则不需填用户名、密码 //创建Session session = connection.CreateSession(); //发布/订阅模式,适合一对多的情况 destination = SessionUtil.GetDestination(session, "topic://" + strTopicName); //新建生产者对象 producer = session.CreateProducer(destination); producer.DeliveryMode = MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留 //新建消费者对象:普通“订阅”模式 //consumer = session.CreateConsumer(destination);//不需要持久“订阅” //新建消费者对象:持久"订阅"模式: // 持久“订阅”后,如果你的程序被停止工作后,恢复运行, //从第一次持久订阅开始,没收到的消息还可以继续收 consumer = session.CreateDurableConsumer( session.GetTopic(strTopicName) , connection.ClientId, null, false); //设置消息接收事件 consumer.Listener += new MessageListener(OnMessage); //启动来自Active MQ的消息侦听 connection.Start(); } catch (Exception e) { //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息! Debug.WriteLine(e.Message); } } private void SendMsg2Topic_Click(object sender, RoutedEventArgs e) { //发送消息 ITextMessage request = session.CreateTextMessage(DateTime.Now.ToLocalTime()+" "+tbMsg.Text); producer.Send(request); } protected void OnMessage(IMessage receivedMsg) { //接收消息 message = receivedMsg as ITextMessage; //UI线程,显示收到的消息 Dispatcher.Invoke(DispatcherPriority.Normal, new Action(() => { DateTime dt = new DateTime(); ListBoxItem lbi = new ListBoxItem(); lbi.Content = DateTime.Now.ToLocalTime() + " " + message.Text; lbR.Items.Add(lbi); })); } }}

队列通讯方式,消费者例子

using System;using System.Collections.Generic;using System.Linq;using System.Text;using Apache.NMS;using System.Diagnostics;using log4net;using Apache.NMS.Util;using System.Collections;namespace Cat8637AutoCallServer{    public class SMTask    {        public String Callee { get; set; }        public String CheckNumber { get; set; }        public int Deadline { get; set; }        public override String ToString()         {            return String.Format("Callee={0},CheckNumber={1},Deadline={2}",                Callee,CheckNumber,Deadline);        }    }    /*     * 负责接收任务,并把任务放在任务等待队列中。     */    public class MQClient    {        private static readonly ILog logger = LogManager.GetLogger(typeof(MQClient));        private static IConnection connection = null;        private static ISession session = null;        Queue _voiceSMTasks = new Queue();         public MQClient()        {            try            {                IConnectionFactory factory = new NMSConnectionFactory(new Uri("activemq:failover:(tcp://localhost:61616)"));                //新建连接                  //connection = connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码                  connection = factory.CreateConnection();                session = connection.CreateSession();                IMessageConsumer consumer = session.CreateConsumer(session.GetQueue("TaskIssue_VoiceSM"));                  consumer.Listener += new MessageListener(OnMessage);                connection.Start();            }            catch (Exception ex)            {                Debug.WriteLine(ex.Message);            }        }        protected void OnMessage(IMessage receivedMsg)        {            IMessage message = receivedMsg as ITextMessage;            SMTask smTask = new SMTask();            smTask.Callee = message.Properties["Callee"] as String;            smTask.CheckNumber = message.Properties["Message"] as String;            smTask.Deadline = Convert.ToInt32(message.Properties["deadline"] as String);            logger.Info("Received: "+smTask.ToString());            lock (_voiceSMTasks)            {                _voiceSMTasks.Enqueue(smTask);            }        }        public SMTask GetVoiceSMTask()        {            SMTask result = null;            lock (_voiceSMTasks)            {                if (_voiceSMTasks.Count > 0)                {                    result = _voiceSMTasks.Dequeue() as SMTask;                }            }            return result;        }    }}

队列通讯方式,生产者例子

private void Send_Click(object sender, RoutedEventArgs e)        {            try            {                IDestination destination = SessionUtil.GetDestination(session, "queue://TaskIssue_VoiceSM");                //新建生产者对象                  IMessageProducer producer = session.CreateProducer(destination);                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留                  ITextMessage request = session.CreateTextMessage();                request.NMSCorrelationID = "TestVoiceSM";//这里我填了应用程序的名称。                request.Properties["Callee"] = tbCallee.Text;                request.Properties["Message"] = tbCheckNumber.Text;                request.Properties["deadline"] = tbValidDuration.Text;                producer.Send(request);            }            catch (Exception ex)            {                //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!                  Debug.WriteLine(ex.Message);            }          }        private void Window_Closed(object sender, EventArgs e)        {            try            {                if (session == null)                    return;                //if (connection == null)                //    return;                session.Close();                //connection.Close();            }            catch (Exception ex)            {                Debug.WriteLine(ex.Message);            }        }

 

转载于:https://www.cnblogs.com/wuling129/p/5150681.html

你可能感兴趣的文章
经典算法-链表(golang)
查看>>
leetcode — search-a-2d-matrix
查看>>
魔板 bfs() 预处理,记录每种状态。然后状态置换,(重点要用到全排列的hash记录状态)...
查看>>
构建之法课后作业第一次作业(15个题选一个)
查看>>
操作redis方法
查看>>
C语言函数
查看>>
Python3-异常处理
查看>>
Python-简单打印进度条
查看>>
【02】天气查询应用(第二课)
查看>>
监听微信返回按钮
查看>>
第二次实验报告
查看>>
HDU ACM 3790 最短路径问题
查看>>
python生成器
查看>>
linux 安装 ftp
查看>>
python 监控FTP目录下的文件个数
查看>>
MapInfo格式转arggis格式
查看>>
Network - SSL/TLS的基本概念
查看>>
python学习之老男孩python全栈第九期_day012知识点总结
查看>>
pandas学习(数据分组与分组运算、离散化处理、数据合并)
查看>>
geeksforgeeks-Array-Rotate and delete
查看>>