LaJiFenLei/Waste.MessageHandler/Program.cs

136 lines
5.1 KiB
C#

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Linq;
using System.Net.Http;
using System.Text;
namespace Waste.MessageHandler
{
/// <summary>
/// Console entry point: builds a generic host, resolves <see cref="IMyService"/>
/// from it and runs the message handling loop.
/// </summary>
class Program
{
    /// <summary>
    /// Registers the HTTP client factory and <see cref="IMyService"/>, then calls
    /// <see cref="IMyService.GetMessage"/>. Any exception escaping that call is
    /// logged and written to standard error instead of crashing the process.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        var builder = new HostBuilder().ConfigureServices((hostContext, services) =>
        {
            services.AddHttpClient();
            services.AddTransient<IMyService, MyService>();
        }).UseConsoleLifetime();

        // The host owns the service provider; dispose it so registered services
        // are released when Main returns.
        using var host = builder.Build();
        try
        {
            var myService = host.Services.GetRequiredService<IMyService>();
            myService.GetMessage();
        }
        catch (Exception ex)
        {
            var logger = host.Services.GetRequiredService<ILogger<Program>>();
            logger.LogError(ex, "系统发生异常");

            // No logging provider is configured on this builder (there is no
            // ConfigureLogging call), so the logger may have nowhere to write.
            // Echo the failure to stderr so it is never lost silently.
            Console.Error.WriteLine($"{DateTime.Now},系统发生异常: {ex}");
        }
    }
}
/// <summary>
/// Contract for the message handling service resolved by the program entry point.
/// </summary>
public interface IMyService
{
    /// <summary>
    /// Starts receiving and processing messages. Takes no arguments and returns
    /// nothing; how long the call blocks is up to the implementation.
    /// </summary>
    void GetMessage();
}
/// <summary>
/// Consumes messages from the "wastequeue" RabbitMQ queue and forwards each one,
/// serialized as JSON, to the result-insertion HTTP endpoint.
/// </summary>
public class MyService : IMyService
{
    /// <summary>Endpoint every received message is POSTed to.</summary>
    private const string InsertResultUrl = "http://waste.ybhdmob.com/api/result/insertresult";

    private readonly RabbitMqService _rabbitMqProxy;
    private readonly IHttpClientFactory _clientFactory;

    /// <summary>
    /// Creates the service and its RabbitMQ proxy.
    /// </summary>
    /// <param name="clientFactory">Factory used to create the HTTP client for each forwarded message.</param>
    public MyService(IHttpClientFactory clientFactory)
    {
        _clientFactory = clientFactory;

        // NOTE(review): host and credentials are hard-coded in source; consider
        // moving them to configuration or environment variables.
        _rabbitMqProxy = new RabbitMqService(new MqConfig
        {
            AutomaticRecoveryEnabled = true,
            Host = "localhost",
            // TimeSpan(long) takes ticks (100 ns each), so the previous
            // `new TimeSpan(60)` was 6 microseconds rather than 60 seconds.
            HeartBeat = TimeSpan.FromSeconds(60),
            UserName = "liuzl",
            Password = "liuzl"
        });
    }

    /// <summary>
    /// Subscribes to the queue, then blocks until a key is pressed on the console.
    /// The RabbitMQ proxy is disposed before returning, including when subscribing
    /// or waiting for the key press throws.
    /// </summary>
    public void GetMessage()
    {
        try
        {
            _rabbitMqProxy.Subscribe<MyPackage>("wastequeue", false, msg =>
            {
                var message = JsonConvert.SerializeObject(msg);
                Console.WriteLine($"{DateTime.Now},收到消息: {message}");
                PostResult(message);
            }, false);

            Console.WriteLine($"{DateTime.Now},消费者已启动");
            Console.ReadKey();
        }
        finally
        {
            _rabbitMqProxy.Dispose();
        }
    }

    /// <summary>
    /// POSTs one serialized message to <see cref="InsertResultUrl"/> and writes the
    /// response body (on success) or the status code (on failure) to the console.
    /// </summary>
    /// <param name="json">JSON payload to send as the request body.</param>
    private void PostResult(string json)
    {
        // Request and response both hold disposable resources (content buffers,
        // the underlying connection), so release them deterministically.
        using var request = new HttpRequestMessage(HttpMethod.Post, InsertResultUrl)
        {
            Content = new StringContent(json, Encoding.UTF8, "application/json")
        };
        var client = _clientFactory.CreateClient();
        using var response = client.Send(request);

        if (response.IsSuccessStatusCode)
        {
            // Read to the end through a reader instead of a single Stream.Read
            // sized by Stream.Length: one Read call may return fewer bytes than
            // requested, and Length is unsupported on non-seekable streams.
            using var reader = new System.IO.StreamReader(response.Content.ReadAsStream(), Encoding.UTF8);
            var result = reader.ReadToEnd();
            Console.WriteLine($"{DateTime.Now},返回结果:{result}");
        }
        else
        {
            Console.WriteLine($"{DateTime.Now},发送失败:{response.StatusCode}");
        }
    }
}
/// <summary>
/// Simple result container with a numeric code, a text message and an arbitrary payload.
/// NOTE(review): not referenced elsewhere in the visible file; presumably models the
/// JSON envelope returned by the result API — confirm against the server contract.
/// Member names are lowercase and must stay as they are, since serializers map by name.
/// </summary>
public class ResultInfo
{
    /// <summary>Numeric result code.</summary>
    public int code { get; set; }

    /// <summary>Text accompanying the result code.</summary>
    public string message { get; set; }

    /// <summary>Untyped payload attached to the result.</summary>
    public object data { get; set; }
}
}