Commit 52efe794 authored by shiyubin's avatar shiyubin 🌼

山西数据推送程序

parents
Pipeline #127 failed with stages
# Getting Started
### Reference Documentation
For further reference, please consider the following sections:
* [Official Apache Maven documentation](https://maven.apache.org/guides/index.html)
* [Spring Boot Maven Plugin Reference Guide](https://docs.spring.io/spring-boot/docs/2.3.2.RELEASE/maven-plugin/reference/html/)
* [Create an OCI image](https://docs.spring.io/spring-boot/docs/2.3.2.RELEASE/maven-plugin/reference/html/#build-image)
* [Spring for RabbitMQ](https://docs.spring.io/spring-boot/docs/2.3.2.RELEASE/reference/htmlsingle/#boot-features-amqp)
* [JDBC API](https://docs.spring.io/spring-boot/docs/2.3.2.RELEASE/reference/htmlsingle/#boot-features-sql)
### Guides
The following guides illustrate how to use some features concretely:
* [Messaging with RabbitMQ](https://spring.io/guides/gs/messaging-rabbitmq/)
* [Accessing Relational Data using JDBC with Spring](https://spring.io/guides/gs/relational-data-access/)
* [Managing Transactions](https://spring.io/guides/gs/managing-transactions/)
This diff is collapsed.
This diff is collapsed.
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM https://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Maven Start Up Batch script
@REM
@REM Required ENV vars:
@REM JAVA_HOME - location of a JDK home dir
@REM
@REM Optional ENV vars
@REM M2_HOME - location of maven2's installed home dir
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
@REM e.g. to debug Maven itself, use
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
@REM ----------------------------------------------------------------------------
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
@echo off
@REM set title of command window
title %0
@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
@REM set %HOME% to equivalent of $HOME
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
@REM Execute a user defined script before this one
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
:skipRcPre
@setlocal
set ERROR_CODE=0
@REM To isolate internal variables from possible post scripts, we use another setlocal
@setlocal
@REM ==== START VALIDATION ====
if not "%JAVA_HOME%" == "" goto OkJHome
echo.
echo Error: JAVA_HOME not found in your environment. >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
:OkJHome
if exist "%JAVA_HOME%\bin\java.exe" goto init
echo.
echo Error: JAVA_HOME is set to an invalid directory. >&2
echo JAVA_HOME = "%JAVA_HOME%" >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
@REM ==== END VALIDATION ====
:init
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
@REM Fallback to current working directory if not found.
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
set EXEC_DIR=%CD%
set WDIR=%EXEC_DIR%
:findBaseDir
IF EXIST "%WDIR%"\.mvn goto baseDirFound
cd ..
IF "%WDIR%"=="%CD%" goto baseDirNotFound
set WDIR=%CD%
goto findBaseDir
:baseDirFound
set MAVEN_PROJECTBASEDIR=%WDIR%
cd "%EXEC_DIR%"
goto endDetectBaseDir
:baseDirNotFound
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
cd "%EXEC_DIR%"
:endDetectBaseDir
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
@setlocal EnableExtensions EnableDelayedExpansion
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
:endReadAdditionalConfig
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
)
@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
if exist %WRAPPER_JAR% (
if "%MVNW_VERBOSE%" == "true" (
echo Found %WRAPPER_JAR%
)
) else (
if not "%MVNW_REPOURL%" == "" (
SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
)
if "%MVNW_VERBOSE%" == "true" (
echo Couldn't find %WRAPPER_JAR%, downloading it ...
echo Downloading from: %DOWNLOAD_URL%
)
powershell -Command "&{"^
"$webclient = new-object System.Net.WebClient;"^
"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
"}"^
"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
"}"
if "%MVNW_VERBOSE%" == "true" (
echo Finished downloading %WRAPPER_JAR%
)
)
@REM End of extension
@REM Provide a "standardized" way to retrieve the CLI args that will
@REM work with both Windows and non-Windows executions.
set MAVEN_CMD_LINE_ARGS=%*
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
if ERRORLEVEL 1 goto error
goto end
:error
set ERROR_CODE=1
:end
@endlocal & set ERROR_CODE=%ERROR_CODE%
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
@REM check for post script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
:skipRcPost
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
if "%MAVEN_BATCH_PAUSE%" == "on" pause
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
exit /B %ERROR_CODE%
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.amqp</groupId>
<artifactId>shanxi</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>shanxi</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.10</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="FacetManager">
<facet type="Spring" name="Spring">
<configuration />
</facet>
</component>
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="jdk" jdkName="1.8" jdkType="JavaSDK" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-amqp:2.3.2.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter:2.3.2.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot:2.3.2.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-autoconfigure:2.3.2.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-logging:2.3.2.RELEASE" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.2.3" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.2.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-to-slf4j:2.13.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-api:2.13.3" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:jul-to-slf4j:1.7.30" level="project" />
<orderEntry type="library" name="Maven: jakarta.annotation:jakarta.annotation-api:1.3.5" level="project" />
<orderEntry type="library" name="Maven: org.yaml:snakeyaml:1.26" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-messaging:5.2.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-beans:5.2.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework.amqp:spring-rabbit:2.2.9.RELEASE" level="project" />
<orderEntry type="library" name="Maven: com.rabbitmq:amqp-client:5.9.0" level="project" />
<orderEntry type="library" name="Maven: org.springframework.amqp:spring-amqp:2.2.9.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework.retry:spring-retry:1.2.5.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-context:5.2.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-aop:5.2.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-expression:5.2.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-tx:5.2.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-jdbc:2.3.2.RELEASE" level="project" />
<orderEntry type="library" name="Maven: com.zaxxer:HikariCP:3.4.5" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.30" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-jdbc:5.2.8.RELEASE" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: org.postgresql:postgresql:42.2.14" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.springframework.boot:spring-boot-starter-test:2.3.2.RELEASE" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.springframework.boot:spring-boot-test:2.3.2.RELEASE" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.springframework.boot:spring-boot-test-autoconfigure:2.3.2.RELEASE" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.jayway.jsonpath:json-path:2.4.0" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: net.minidev:json-smart:2.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: net.minidev:accessors-smart:1.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.ow2.asm:asm:5.0.4" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: jakarta.xml.bind:jakarta.xml.bind-api:2.3.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: jakarta.activation:jakarta.activation-api:1.2.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.assertj:assertj-core:3.16.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest:2.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.junit.jupiter:junit-jupiter:5.6.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.junit.jupiter:junit-jupiter-api:5.6.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.apiguardian:apiguardian-api:1.1.0" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.opentest4j:opentest4j:1.2.0" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.junit.platform:junit-platform-commons:1.6.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.junit.jupiter:junit-jupiter-params:5.6.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.junit.jupiter:junit-jupiter-engine:5.6.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.junit.platform:junit-platform-engine:1.6.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.mockito:mockito-core:3.3.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: net.bytebuddy:byte-buddy:1.10.13" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: net.bytebuddy:byte-buddy-agent:1.10.13" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.objenesis:objenesis:2.6" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.mockito:mockito-junit-jupiter:3.3.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.skyscreamer:jsonassert:1.5.0" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.vaadin.external.google:android-json:0.0.20131108.vaadin1" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-core:5.2.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-jcl:5.2.8.RELEASE" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.springframework:spring-test:5.2.8.RELEASE" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.xmlunit:xmlunit-core:2.7.0" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.springframework.amqp:spring-rabbit-test:2.2.9.RELEASE" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:2.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-library:2.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.springframework.amqp:spring-rabbit-junit:2.2.9.RELEASE" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.rabbitmq:http-client:3.2.0.RELEASE" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.11.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.11.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.fasterxml.jackson.core:jackson-core:2.11.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.springframework:spring-web:5.2.8.RELEASE" level="project" />
<orderEntry type="library" name="Maven: org.mybatis.spring.boot:mybatis-spring-boot-starter:1.3.1" level="project" />
<orderEntry type="library" name="Maven: org.mybatis.spring.boot:mybatis-spring-boot-autoconfigure:1.3.1" level="project" />
<orderEntry type="library" name="Maven: org.mybatis:mybatis:3.4.5" level="project" />
<orderEntry type="library" name="Maven: org.mybatis:mybatis-spring:1.3.1" level="project" />
<orderEntry type="library" scope="PROVIDED" name="Maven: org.projectlombok:lombok:1.18.12" level="project" />
<orderEntry type="library" name="Maven: cn.hutool:hutool-all:5.3.10" level="project" />
</component>
</module>
\ No newline at end of file
package com.amqp.shanxi;
import cn.hutool.json.JSONUtil;
import com.amqp.shanxi.model.DBDeviceInfo;
import com.amqp.shanxi.util.AESUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/*
* 生产者消费确认
*/
public class Sender {
public static final String QUEUE = "test_confirm_queue";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//ip
factory.setHost("114.115.212.187");
//端口
factory.setPort(5672);
//用户名
factory.setUsername("bhsoft");
//密码
factory.setPassword("bhsoft");
factory.setVirtualHost("/");// rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
// 创建与RabbitMQ服务的TCP连接
connection = factory.newConnection();
// 创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
channel = connection.createChannel();
// 通道连接交换机
// 通过连接队列 声明队列
channel.queueDeclare(QUEUE, true, false, false, null);
String jsonString = "";
List<DBDeviceInfo> resultList = new ArrayList<DBDeviceInfo>();
DBDeviceInfo dbDeviceInfo = new DBDeviceInfo();
//给DBDeviceInfo实体赋值
dbDeviceInfo.setEquipname("雨量计");
resultList.add(dbDeviceInfo);
//对数据加密
String result = JSONUtil.toJsonStr(resultList);
String encrypt = AESUtil.encrypt(result, "BRlp%IYyTYFntG!g7oPo$N==" , "wP!n$!ZD%0k2g6Gahpdk6l==");
Map<String, Object> resultMap = new HashMap<String, Object>();
//荷载消息内容
while(true) {
resultMap.put("tailingNo","1409240052");
resultMap.put("departmentId","" );
resultMap.put("provinceId", "");
resultMap.put("type","0101");
resultMap.put("sendTime", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
resultMap.put("datas", encrypt);
jsonString = JSONUtil.toJsonStr(resultMap);
//发送消息
//第一个参数是交换机,第二个参数是routingkey(主题格式)
channel.basicPublish("basicData", "equipInfo.01", null, jsonString.getBytes());
System.out.println("[send] msg " + jsonString);
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("发送成功!");
} else {
System.out.println("发送失败!");
}
Thread.sleep(1000 * 10);
}
} catch (Exception e) {
e.getStackTrace();
} finally {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.getStackTrace();
}
}
}
}
package com.amqp.shanxi;
import com.amqp.shanxi.service.AlarmDataAMQPService;
import com.amqp.shanxi.service.DeviceStaticDataAMQPService;
import com.amqp.shanxi.service.RealTimeDataAMQPService;
import com.amqp.shanxi.service.ThreadService;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class ShanxiApplication {
public static void main(String[] args) {
ConfigurableApplicationContext run = SpringApplication.run(ShanxiApplication.class, args);
DeviceStaticDataAMQPService deviceStaticDataAMQPService = run.getBean(DeviceStaticDataAMQPService.class);
RealTimeDataAMQPService realTimeDataAMQPService = run.getBean(RealTimeDataAMQPService.class);
AlarmDataAMQPService alarmDataAMQPService = run.getBean(AlarmDataAMQPService.class);
try{
ConnectionFactory factory = new ConnectionFactory();
//rabbitmq监听IP
factory.setHost("114.115.212.187");
//rabbitmq监听默认端口
factory.setPort(5672);
//设置访问的用户
factory.setUsername("bhsoft");
factory.setPassword("bhsoft");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
ThreadService threadService1 = new ThreadService("staticData", connection, deviceStaticDataAMQPService, realTimeDataAMQPService, alarmDataAMQPService);
ThreadService threadService2 = new ThreadService("realTimeData", connection, deviceStaticDataAMQPService, realTimeDataAMQPService,alarmDataAMQPService);
ThreadService threadService3 = new ThreadService("alarmData", connection, deviceStaticDataAMQPService, realTimeDataAMQPService,alarmDataAMQPService);
threadService1.start();
threadService2.start();
threadService3.start();
} catch(Exception e) {
e.printStackTrace();
}
}
}
package com.amqp.shanxi.dao;
import com.amqp.shanxi.model.*;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.util.List;
@Mapper
public interface AlarmInfoDao {
/**
* 干滩实时数据
*/
@Select("SELECT c.gantanid AS sensorno, c.jctime AS warningdata, c.gantan AS value, c.ifbj AS warninglevel, b.id AS recordid FROM (" +
" SELECT a.id, a.gantanid, a.jctime, a.gantan, a.bagao, a.podu, a.ifbj from (" +
" SELECT id, gantanid, jctime, gantan, bagao, podu, ifbj, rank() OVER (PARTITION BY gantanid ORDER BY jctime DESC)" +
" FROM tab_gantanweiyi where jctime > now() - INTERVAL '6 hour'" +
" ) a WHERE rank = 1 and ifbj = '1'" +
" ) c INNER JOIN (SELECT id, name FROM tab_sensor) b on c.gantanid = b.name")
List<GantanAlarmInfoModel> findGantanAlarmData();
@Select("SELECT c.sensorname AS sensorno, c.jctime AS warningdata, c.value2 as value, c.ifbj AS warninglevel, b.id AS recordid FROM (" +
" SELECT a.id, a.sensorname, a.jctime, a.value2, a.ifbj FROM (" +
" SELECT id, sensorname, jctime, value2, ifbj, rank() OVER (PARTITION BY sensorname ORDER BY jctime DESC)"+
" FROM tab_kushuiwei WHERE jctime > now() - INTERVAL '6 hour'" +
" ) a WHERE rank = 1 and ifbj = '1'" +
" ) c INNER JOIN (SELECT id, name FROM tab_sensor) b ON c.sensorname = b.name")
List<KushuiweiAlarmInfoModel> findKushuiweiAlarmData();
@Select("SELECT c.sensorname AS sensorno, c.jctime AS warningdata, c.disx AS value, c.ifbj AS warninglevel, b.id AS recordid FROM (" +
" SELECT a.id, a.sensorname, a.jctime, a.disx, a.disy, a.disz, a.ifbj from (" +
" SELECT id, sensorname, jctime, disx, disy, disz, ifbj, rank() OVER (PARTITION BY sensorname ORDER BY jctime DESC)" +
" FROM tab_batidibiaoweiyi where jctime > now() - INTERVAL '6 hour'" +
" ) a WHERE rank = 1 and ifbj = '1'" +
" ) c INNER JOIN (SELECT id, name FROM tab_sensor) b on c.sensorname = b.name")
List<BiaomianweiyiAlarmInfoModel> findBiaomianweiyiAlarmData();
@Select("select c.sensorname AS sensorno, c.jctime AS warningdata, c.value2 as value, c.ifbj AS warninglevel, b.id AS recordid from ("+
" select a.id, a.sensorname, a.jctime, a.value2, a.ifbj from (" +
" select id, sensorname, jctime, value2, ifbj, rank() over (partition by sensorname order by jctime desc)" +
" from tab_jinrunxian where jctime > now() - interval '6 hour'" +
" ) a where rank = 1 and ifbj = '1'" +
" ) c inner join (select id, name from tab_sensor) b on c.sensorname = b.name")
List<JinrunxianAlarmInfoModel> findJinrunxianAlarmData();
@Select("select c.sensorname AS sensorno, c.jctime AS warningdata, c.value2 as value, c.ifbj AS warninglevel, b.id AS recordid from (" +
" select a.id, a.sensorname, a.jctime, a.value2, a.ifbj from (" +
" select id, sensorname, jctime, value2, ifbj, rank() over (partition by sensorname order by jctime desc)" +
" from tab_jiangyuliang where jctime > now() - interval '6 hour'" +
" ) a where rank = 1 and ifbj = '1'" +
" ) c inner join (select id, name from tab_sensor) b on c.sensorname = b.name")
List<JiangyuliangAlarmInfoModel> findJiangyuliangAlarmData();
// 监测项报警
@Select("select a.mark AS itemno,b.id AS recordid, b.yjjb AS rwarninglevel, b.jctime AS warningdata from"+
" (select mark from tab_sensor group by mark) a left join" +
" (select id, alarmtype, yjjb, jctime from tab_alarmtype where status = 0) b on a.mark = b.alarmtype where b.jctime is not null")
List<DeviceAlarmInfoModel> findDeviceAlarmData();
@Select("select alarmname as warningcontent, jctime AS collectdate, yjjb AS rwarninglevel from tab_alarmtype where status = 0")
List<WkkAlarmTimeInfoModel> findWkkAlarmData();
@Select("select c.sensorname AS sensorno, c.jctime AS warningdata, c.value2 as value, c.ifbj AS warninglevel, b.id AS recordid from (" +
" select a.id, a.sensorname, a.jctime, a.value2, a.ifbj from ("+
" select id, sensorname, jctime, value2, ifbj, rank() over (partition by sensorname order by jctime desc)"+
" from tab_shuipingweiyi where jctime > now() - interval '6 hour'" +
" ) a where rank = 1 and ifbj = '1'" +
" ) c inner join (select id, name from tab_sensor) b on c.sensorname = b.name")
List<NeibuweiyiAlarmInfoModel> findNeibushuipingAlarmData();
@Select("select c.sensorname AS sensorno, c.jctime AS warningdata, c.value2 as value, c.ifbj AS warninglevel, b.id AS recordid from (" +
" select a.id, a.sensorname, a.jctime, a.value2, a.ifbj from (" +
" select id, sensorname, jctime, value2, ifbj, rank() over (partition by sensorname order by jctime desc)" +
" from tab_neibuchenjiang where jctime > now() - interval '6 hour'" +
" ) a where rank = 1 and ifbj = '1'" +
" ) c inner join (select id, name from tab_sensor) b on c.sensorname = b.name")
List<NeibuweiyiAlarmInfoModel> findNeibuchenjiangAlarmData();
}
package com.amqp.shanxi.dao;
import com.amqp.shanxi.model.*;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import org.springframework.stereotype.Repository;
import java.util.List;
@Mapper
public interface DeviceStaticDataInfoDao {
@Select("SELECT id AS equipno, name AS equipname FROM tab_sensor WHERE mark = '干滩2'")
List<GantanDeviceInfoModel> findGantanDeviceInfo();
@Select("SELECT id AS equipno, name AS equipname FROM tab_sensor WHERE mark = '库水位'")
List<KushuiweiDeviceInfoModel> findKushuiweiDeviceInfo();
@Select("SELECT id AS equipno, name AS equipname FROM tab_sensor WHERE mark = '坝体表面位移'")
List<BiaomianweiyiDeviceInfoModel> findBiaomianweiyiDeviceInfo();
@Select("SELECT id AS equipno, name AS equipname FROM tab_sensor WHERE mark = '浸润线'")
List<JinrunxianDeviceInfoModel> findJinrunxianDeviceInfo();
@Select("SELECT id AS equipno, name AS equipname FROM tab_sensor WHERE mark = '雨量计'")
List<JinrunxianDeviceInfoModel> findJiangyuliangDeviceInfo();
@Select("SELECT id AS equipno, name AS equipname FROM tab_sensor WHERE mark = '坝体内部水平位移' or mark = '坝体内部沉降'")
List<NeibuweiyiDeviceInfoModel> findNeibuweiyiDeviceInfo();
}
package com.amqp.shanxi.dao;
import com.amqp.shanxi.model.*;
import lombok.Data;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.util.List;
@Mapper
public interface RealTimeInfoDao {
/**
* 干滩实时数据
*/
@Select("SELECT c.gantanid AS sensorno, c.jctime AS collectdate, c.gantan AS value, c.bagao AS beachTopHeight, c.podu AS beachLongSlope, c.ifbj AS valuestate, b.id AS recordid FROM (" +
" SELECT a.id, a.gantanid, a.jctime, a.gantan, a.bagao, a.podu, a.ifbj from (" +
" SELECT id, gantanid, jctime, gantan, bagao, podu, ifbj, rank() OVER (PARTITION BY gantanid ORDER BY jctime DESC)" +
" FROM tab_gantanweiyi where jctime > now() - INTERVAL '6 hour'" +
" ) a WHERE rank = 1" +
" ) c INNER JOIN (SELECT id, name FROM tab_sensor) b on c.gantanid = b.name")
List<GantanRealTimeInfoModel> findGantanRealTimeData();
@Select("SELECT c.sensorname AS sensorno, c.jctime AS collectdate, c.value2 as value, c.ifbj AS valuestate, b.id AS recordid FROM (" +
" SELECT a.id, a.sensorname, a.jctime, a.value2, a.ifbj FROM (" +
" SELECT id, sensorname, jctime, value2, ifbj, rank() OVER (PARTITION BY sensorname ORDER BY jctime DESC)"+
" FROM tab_kushuiwei WHERE jctime > now() - INTERVAL '6 hour'" +
" ) a WHERE rank = 1" +
" ) c INNER JOIN (SELECT id, name FROM tab_sensor) b ON c.sensorname = b.name")
List<KushuiweiRealTimeInfoModel> findKushuiweiRealTimeData();
@Select("SELECT c.sensorname AS sensorno, c.jctime AS collectdate, c.disx AS xvalue, c.disy AS yvalue, c.disz AS zvalue, c.ifbj AS displacementstate, b.id AS recordid FROM (" +
" SELECT a.id, a.sensorname, a.jctime, a.disx, a.disy, a.disz, a.ifbj from (" +
" SELECT id, sensorname, jctime, disx, disy, disz, ifbj, rank() OVER (PARTITION BY sensorname ORDER BY jctime DESC)" +
" FROM tab_batidibiaoweiyi where jctime > now() - INTERVAL '6 hour'" +
" ) a WHERE rank = 1" +
" ) c INNER JOIN (SELECT id, name FROM tab_sensor) b on c.sensorname = b.name")
List<BiaomianweiyiRealTimeInfoModel> findBiaomianweiyiRealTimeData();
@Select("select c.sensorname AS sensorno, c.jctime AS collectdate, c.value2 as value, c.ifbj AS valuestate, b.id AS recordid from ("+
" select a.id, a.sensorname, a.jctime, a.value2, a.ifbj from (" +
" select id, sensorname, jctime, value2, ifbj, rank() over (partition by sensorname order by jctime desc)" +
" from tab_jinrunxian where jctime > now() - interval '6 hour'" +
" ) a where rank = 1" +
" ) c inner join (select id, name from tab_sensor) b on c.sensorname = b.name")
List<JinrunxianRealTimeInfoModel> findJinrunxianRealTimeData();
@Select("select c.sensorname AS sensorno, c.jctime AS collectdate, c.value2 as value, c.ifbj AS onehourvaluestate, b.id AS recordid from (" +
" select a.id, a.sensorname, a.jctime, a.value2, a.ifbj from (" +
" select id, sensorname, jctime, value2, ifbj, rank() over (partition by sensorname order by jctime desc)" +
" from tab_jiangyuliang where jctime > now() - interval '6 hour'" +
" ) a where rank = 1" +
" ) c inner join (select id, name from tab_sensor) b on c.sensorname = b.name")
List<JiangyuliangRealTimeInfoModel> findJiangyuliangRealTimeData();
// 监测项报警
@Select("select a.mark AS itemno,b.id AS recordid, b.yjjb AS state, b.jctime AS collectdate from"+
" (select mark from tab_sensor group by mark) a left join" +
" (select id, alarmtype, yjjb, jctime from tab_alarmtype where status = 0) b on a.mark = b.alarmtype")
List<DeviceRealTimeInfoModel> findDeviceRealTimeData();
@Select("select jctime AS collectdate, yjjb AS state from tab_alarmtype where status = 0")
List<WkkRealTimeInfoModel> findWkkRealTimeData();
/**
* 设备离线
* @return
*/
@Select("select b.id AS recordid, b.name, a.jctime from" +
" (select jctime, gantanid from tab_gantanweiyi where jctime > now() - interval '6 hour') a right join" +
" (select id, name from tab_sensor where mark = '干滩2') b on a.gantanid = b.name where jctime is null" +
" union all" +
" select b.id AS recordid, b.name, a.jctime from" +
" (select sensorname, jctime from tab_kushuiwei where jctime > now() - interval '6 hour') a right join" +
" (select id, name from tab_sensor where mark = '库水位') b on a.sensorname = b.name where a.jctime is null" +
" union all" +
" select b.id AS recordid, b.name, a.jctime from" +
" (select sensorname, jctime from tab_batidibiaoweiyi where jctime > now() - interval '6 hour') a right join" +
" (select id, name from tab_sensor where mark = '坝体表面位移') b on a.sensorname = b.name where a.jctime is null" +
" union all"+
" select b.id AS recordid, b.name, a.jctime from" +
" (select sensorname, jctime from tab_jinrunxian where jctime > now() - interval '6 hour') a right join" +
" (select id, name from tab_sensor where mark = '浸润线') b on a.sensorname = b.name where a.jctime is null" +
" union all" +
" select b.id AS recordid, b.name, a.jctime from" +
" (select sensorname, jctime from tab_jiangyuliang where jctime > now() - interval '6 hour') a right join" +
" (select id, name from tab_sensor where mark = '雨量计') b on a.sensorname = b.name where a.jctime is null")
List<DeviceOffLineInfoModel> findDeviceOffLineData();
/**
* 内部位移(水平)
* @return
*/
@Select("select c.sensorname AS sensorno, c.jctime AS collectdate, c.value2 as xvalue, 0 as yvalue, c.ifbj AS valuestate, b.id AS recordid from (" +
" select a.id, a.sensorname, a.jctime, a.value2, a.ifbj from ("+
" select id, sensorname, jctime, value2, ifbj, rank() over (partition by sensorname order by jctime desc)" +
" from tab_shuipingweiyi where jctime > now() - interval '6 hour'" +
" ) a where rank = 1" +
" ) c inner join (select id, name from tab_sensor) b on c.sensorname = b.name")
List<NeibuweiyiRealTimeInfoModel> findNeibuShuipingRealTimeData();
/**
* 内部位移(沉降)
* @return
*/
@Select("select c.sensorname AS sensorno, c.jctime AS collectdate, c.value2 as yvalue, 0 as xvalue, c.ifbj AS valuestate, b.id AS recordid from (" +
" select a.id, a.sensorname, a.jctime, a.value2, a.ifbj from (" +
" select id, sensorname, jctime, value2, ifbj, rank() over (partition by sensorname order by jctime desc)" +
" from tab_neibuchenjiang where jctime > now() - interval '6 hour'" +
" ) a where rank = 1" +
" ) c inner join (select id, name from tab_sensor) b on c.sensorname = b.name")
List<NeibuweiyiRealTimeInfoModel> findNeibuChenjiangRealTimeData();
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class BiaomianweiyiAlarmInfoModel {
private Integer recordid;
private String sensorno;
private String collectdate;
private String value;
private String warninglevel;
private String rvalue;
private String rwarninglevel;
private String warningcontent;
private String warningdata;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class BiaomianweiyiDeviceInfoModel {
private String equipno;
private String equipname;
private String installationdate;
private String installationlocate;
private String manufacture;
private Double longitude;
private Double latitude;
private Double altitude;
private Double onelevelalarm;
private Double twolevelalarm;
private Double threelevelalarm;
private Double rateonelevelalarm;
private Double ratetwolevelalarm;
private Double ratethreelevelalarm;
private String is_used;
private String is_sync;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class BiaomianweiyiRealTimeInfoModel {
private Integer recordid;
private String sensorno;
private String collectdate;
private Double xvalue;
private Double yvalue;
private Double zvalue;
private Double displacement;
private String displacementstate;
private Double displacementrate;
private String displacementratestate;
}
package com.amqp.shanxi.model;
/**
* 干滩设备信息实体类
*/
public class DBDeviceInfo {
private String equipno;//设备编号
private String equipname;//设备名称
private String installationdate;//安装日期
private String manufacture;//安装厂家
private Double installlocation;//安装高度
private String installationlocate;//安装位置
private Double longitude;//经度
private Double latitude;//纬度
private Double altitude;//高程
private Double onelevelalarm;//一级预警阈值
private Double twolevelalarm;//二级预警阈值
private Double threelevelalarm;//三级预警阈值
private Double honelevelalarm;//安全超高一级预警阈值
private Double htwolevelalarm;//安全超高二级预警阈值
private Double hthreelevelalarm;//安全超高三级预警阈值
private String is_used;//设备在用状态
private String is_sync;//同步状态
public String getEquipno() {
return equipno;
}
public void setEquipno(String equipno) {
this.equipno = equipno;
}
public String getEquipname() {
return equipname;
}
public void setEquipname(String equipname) {
this.equipname = equipname;
}
public String getInstallationdate() {
return installationdate;
}
public void setInstallationdate(String installationdate) {
this.installationdate = installationdate;
}
public String getManufacture() {
return manufacture;
}
public void setManufacture(String manufacture) {
this.manufacture = manufacture;
}
public Double getInstalllocation() {
return installlocation;
}
public void setInstalllocation(Double installlocation) {
this.installlocation = installlocation;
}
public String getInstallationlocate() {
return installationlocate;
}
public void setInstallationlocate(String installationlocate) {
this.installationlocate = installationlocate;
}
public Double getLongitude() {
return longitude;
}
public void setLongitude(Double longitude) {
this.longitude = longitude;
}
public Double getLatitude() {
return latitude;
}
public void setLatitude(Double latitude) {
this.latitude = latitude;
}
public Double getAltitude() {
return altitude;
}
public void setAltitude(Double altitude) {
this.altitude = altitude;
}
public Double getOnelevelalarm() {
return onelevelalarm;
}
public void setOnelevelalarm(Double onelevelalarm) {
this.onelevelalarm = onelevelalarm;
}
public Double getTwolevelalarm() {
return twolevelalarm;
}
public void setTwolevelalarm(Double twolevelalarm) {
this.twolevelalarm = twolevelalarm;
}
public Double getThreelevelalarm() {
return threelevelalarm;
}
public void setThreelevelalarm(Double threelevelalarm) {
this.threelevelalarm = threelevelalarm;
}
public Double getHonelevelalarm() {
return honelevelalarm;
}
public void setHonelevelalarm(Double honelevelalarm) {
this.honelevelalarm = honelevelalarm;
}
public Double getHtwolevelalarm() {
return htwolevelalarm;
}
public void setHtwolevelalarm(Double htwolevelalarm) {
this.htwolevelalarm = htwolevelalarm;
}
public Double getHthreelevelalarm() {
return hthreelevelalarm;
}
public void setHthreelevelalarm(Double hthreelevelalarm) {
this.hthreelevelalarm = hthreelevelalarm;
}
public String getIs_used() {
return is_used;
}
public void setIs_used(String is_used) {
this.is_used = is_used;
}
public String getIs_sync() {
return is_sync;
}
public void setIs_sync(String is_sync) {
this.is_sync = is_sync;
}
@Override
public String toString() {
return "DBDeviceInfo [equipno=" + equipno + ", equipname=" + equipname + ", installationdate="
+ installationdate + ", manufacture=" + manufacture + ", installlocation=" + installlocation
+ ", installationlocate=" + installationlocate + ", longitude=" + longitude + ", latitude=" + latitude
+ ", altitude=" + altitude + ", onelevelalarm=" + onelevelalarm + ", twolevelalarm=" + twolevelalarm
+ ", threelevelalarm=" + threelevelalarm + ", honelevelalarm=" + honelevelalarm + ", htwolevelalarm="
+ htwolevelalarm + ", hthreelevelalarm=" + hthreelevelalarm + ", is_used=" + is_used + ", is_sync="
+ is_sync + "]";
}
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class DeviceAlarmInfoModel {
private Integer recordid;
private String itemno;
private String rwarninglevel;
private String warningcontent;
private String warningdata;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class DeviceOffLineInfoModel {
private Integer recordid;
private String sensorno;
private String collectdate;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class DeviceRealTimeInfoModel {
private Integer recordid;
private String itemno;
private String collectdate;
private String state;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class GantanAlarmInfoModel {
private Integer recordid;
private String sensorno;
private String value;
private String warninglevel;
private String hvalue;
private String hwarninglevel;
private String warningcontent;
private String warningdata;
}
package com.amqp.shanxi.model;
import lombok.Data;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
@Data
public class GantanDeviceInfoModel {
private String equipno;
private String equipname;
private String installationdate;
private String installationlocate;
private Double installlocation;
private String manufacture;
private Double longitude;
private Double latitude;
private Double altitude;
private Double onelevelalarm;
private Double twolevelalarm;
private Double threelevelalarm;
private Double honelevelalarm;
private Double htwolevelalarm;
private Double hthreelevelalarm;
private String is_used;
private String is_sync;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class GantanRealTimeInfoModel {
private Integer recordid;
private String sensorno;
private String collectdate;
private Double value;
private String valuestate;
private Double freeBoard;
private String freeBoardState;
private String beachLongSlope;
private Double beachTopHeight;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class JiangyuliangAlarmInfoModel {
private Integer recordid;
private String sensorno;
private String value;
private String warninglevel;
private String threevalue;
private String threewarninglevel;
private String sixvalue;
private String sixwarninglevel;
private String twelvevalue;
private String twelvewarninglevel;
private String twentyvalue;
private String twentywarninglevel;
private String warningcontent;
private String warningdata;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class JiangyuliangDeviceInfoModel {
private String equipno;
private String equipname;
private String installationdate;
private String installationlocate;
private String manufacture;
private Double longitude;
private Double latitude;
private Double altitude;
private Double onelevelalarm;
private Double twolevelalarm;
private Double threelevelalarm;
private Double threehoursonelevelalarm;
private Double threehourstwolevelalarm;
private Double threehoursthreelevelalarm;
private Double sixhoursonelevelalarm;
private Double sixhourstwolevelalarm;
private Double sixhoursthreelevelalarm;
private Double twelvehoursonelevelalarm;
private Double twolvehourstwolevelalarm;
private Double twolvehoursthreelevelalarm;
private Double tfhoursonelevelalarm;
private Double tfhourstwolevelalarm;
private Double tfhoursthreelevelalarm;
private String is_used;
private String is_sync;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class JiangyuliangRealTimeInfoModel {
private Integer recordid;
private String sensorno;
private String collectdate;
private Double value;
private Double onehourvalue;
private String onehourvaluestate;
private Double threehoursvalue;
private String threehoursvaluestate;
private Double sixhoursvalue;
private String sixhoursvaluestate;
private Double twelvehoursvalue;
private String twelvehoursvaluestate;
private Double twentyhoursvalue;
private String twentyhoursvaluestate;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class JinrunxianAlarmInfoModel {
private Integer recordid;
private String sensorno;
private String value;
private String warninglevel;
private String warningcontent;
private String warningdata;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class JinrunxianDeviceInfoModel {
private String equipno;
private String equipname;
private String installationdate;
private String installationlocate;
private String manufacture;
private Double holedepth;
private Double burialdepath;
private Double longitude;
private Double latitude;
private Double altitude;
private Double onelevelalarm;
private Double twolevelalarm;
private Double threelevelalarm;
private String is_used;
private String is_sync;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class JinrunxianRealTimeInfoModel {
private Integer recordid;
private String sensorno;
private String collectdate;
private Double value;
private String valuestate;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class KushuiweiAlarmInfoModel {
private Integer recordid;
private String sensorno;
private String value;
private String warninglevel;
private String warningcontent;
private String warningdata;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class KushuiweiDeviceInfoModel {
private String equipno;
private String equipname;
private String installationdate;
private String installationlocate;
private Double installlocation;
private String manufacture;
private Double longitude;
private Double latitude;
private Double altitude;
private Double onelevelalarm;
private Double twolevelalarm;
private Double threelevelalarm;
private String is_used;
private String is_sync;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class KushuiweiRealTimeInfoModel {
private Integer recordid;
private String sensorno;
private String collectdate;
private Double value;
private String valuestate;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class NeibuweiyiAlarmInfoModel {
private Integer recordid;
private String sensorno;
private String value;
private String warninglevel;
private String rvalue;
private String rwarninglevel;
private String warningcontent;
private String warningdata;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class NeibuweiyiDeviceInfoModel {
private String equipno;
private String equipname;
private String installationdate;
private String installationlocate;
private String manufacture;
private Double longitude;
private Double latitude;
private Double altitude;
private Double onelevelalarm;
private Double twolevelalarm;
private Double threelevelalarm;
private Double rateonelevelalarm;
private Double ratetwolevelalarm;
private Double ratethreelevelalarm;
private String is_used;
private String is_sync;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class NeibuweiyiRealTimeInfoModel {
private Integer recordid;
private String sensorno;
private String collectdate;
private Double xvalue;
private Double yvalue;
private Double displacement;
private String displacementstate;
private Double displacementrate;
private String displacementratestate;
}
package com.amqp.shanxi.model;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* 荷载 (消息内容)
*/
@Data
public class StaticDataInfoModel implements Serializable {
private String tailingNo;
private String departmentId;
private String provinceId;
private String type;
private String sendTime;
private String datas;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class TabSensorModel {
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class WkkAlarmTimeInfoModel {
private Integer recordid;
private String rwarninglevel;
private String warningcontent;
private String warningdata;
}
package com.amqp.shanxi.model;
import lombok.Data;
@Data
public class WkkRealTimeInfoModel {
private Integer recordid;
private String collectdat;
private String state;
}
package com.amqp.shanxi.service;
import com.amqp.shanxi.util.AMQPTopicStaticDataConfig;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class AlarmDataAMQPService {
@Autowired
private AlarmDataService alarmDataService;
/**
* 实时数据推送入口
*/
public void run(Connection connection) {
publishAlarm("gt", connection);
publishAlarm("ksw", connection);
publishAlarm("bmwy", connection);
publishAlarm("nbwy", connection);
publishAlarm("jrx", connection);
publishAlarm("jyl", connection);
publishAlarm("device", connection);
publishAlarm("wkk", connection);
}
public void publishAlarm(String type, Connection connection) {
try {
if ("gt".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEGANtANALARM
, null
, alarmDataService.gantanAlarm().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("干滩报警信息发送成功!");
} else {
System.out.println("干滩报警信息发送失败!");
}
channel.close();
} else if ("ksw".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEKUSHUIWEIALARM
, null
, alarmDataService.kushuiweiAlarm().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("库水位报警信息发送成功!");
} else {
System.out.println("库水位报警信息发送失败!");
}
channel.close();
} else if ("bmwy".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEBIAOMIANWEIYIALARM
, null
, alarmDataService.biaomianweiyiAlarm().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("表面位移报警信息发送成功!");
} else {
System.out.println("表面位移报警信息发送失败!");
}
channel.close();
} else if ("nbwy".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUENEIBUWEIYIALARM
, null
, alarmDataService.neibushuipingAlarm().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("内部水平报警信息发送成功!");
} else {
System.out.println("内部水平报警信息发送失败!");
}
// ====================内部沉降
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUENEIBUWEIYIALARM
, null
, alarmDataService.neibuchenjiangAlarm().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("内部沉降报警信息发送成功!");
} else {
System.out.println("内部沉降报警信息发送失败!");
}
channel.close();
} else if ("jrx".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEJINRUNXIANALARM
, null
, alarmDataService.jinrunxianAlarm().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("浸润线报警信息发送成功!");
} else {
System.out.println("浸润线报警信息发送失败!");
}
channel.close();
} else if ("jyl".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEJIANGYULIANGALARM
, null
, alarmDataService.jiangyuliangAlarm().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("降雨量报警信息发送成功!");
} else {
System.out.println("降雨量报警信息发送失败!");
}
channel.close();
} else if ("sll".equals(type)) {
} else if ("device".equals(type)) {
// 监测项数据信息
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEDEVICEALARM
, null
, alarmDataService.deviceAlarm().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("检测项报警信息发送成功!");
} else {
System.out.println("检测项报警信息发送失败!");
}
channel.close();
} else if ("wkk".equals(type)) {
// 尾矿库数据信息
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEWKKALARM
, null
, alarmDataService.wkkAlarm().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("尾矿库报警信息发送成功!");
} else {
System.out.println("尾矿库报警信息发送失败!");
}
channel.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/*
@RabbitListener(queues ="equipOfflineInfo" )
public void receiveMessage1(String str){
System.out.println("我是监听干滩topic.message的, dataInfo.01: " + str);
}
*/
}
package com.amqp.shanxi.service;
import org.springframework.stereotype.Service;
@Service
public interface AlarmDataService {
String gantanAlarm();
String kushuiweiAlarm();
String biaomianweiyiAlarm();
String jinrunxianAlarm();
String jiangyuliangAlarm();
String deviceAlarm();
String wkkAlarm();
String neibuchenjiangAlarm();
String neibushuipingAlarm();
}
package com.amqp.shanxi.service;
import cn.hutool.json.JSONUtil;
import com.amqp.shanxi.ShanxiApplication;
import com.amqp.shanxi.model.StaticDataInfoModel;
import com.amqp.shanxi.service.impl.DeviceStaticDataInfoServiceImpl;
import com.amqp.shanxi.util.AMQPTopicStaticDataConfig;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class DeviceStaticDataAMQPService {
@Autowired
private RabbitTemplate amqpTemplate;
@Autowired
private DeviceStaticDataInfoService deviceStaticDataInfoService;
public void run(Connection connection) {
try{
// 设备信息
this.publish("gt", connection);
this.publish("ksw",connection);
this.publish("bmwy", connection);
this.publish("jrx", connection);
this.publish("jyl", connection);
this.publish("nbwy", connection);
}catch(Exception e) {
e.printStackTrace();
}
}
public void publish(String type, Connection connection) {
try {
if ("gt".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.TOPIC_EXCHANGE, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.TOPIC_EXCHANGE
, AMQPTopicStaticDataConfig.TOPIC_ROUTINGKEY_GANTAN
, null
, JSONUtil.toJsonStr(deviceStaticDataInfoService.gantanDeviceInfo()).getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("干滩设备信息发送成功!");
} else {
System.out.println("干滩设备信息发送失败!");
}
channel.close();
} else if ("ksw".equals(type)) {
Channel channel = connection.createChannel();
channel.basicPublish(AMQPTopicStaticDataConfig.TOPIC_EXCHANGE
, AMQPTopicStaticDataConfig.TOPIC_ROUTINGKEY_KUSHUIWEI
, null
, JSONUtil.toJsonStr(deviceStaticDataInfoService.kushuiweiDeviceInfo()).getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("库水位设备信息发送成功!");
} else {
System.out.println("库水位设备信息发送失败!");
}
channel.close();
} else if ("bmwy".equals(type)) {
Channel channel = connection.createChannel();
channel.basicPublish(AMQPTopicStaticDataConfig.TOPIC_EXCHANGE
, AMQPTopicStaticDataConfig.TOPIC_ROUTINGKEY_BIAOMIANWEIYI
, null
, JSONUtil.toJsonStr(deviceStaticDataInfoService.biaomianweiyiDeviceInfo()).getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("表面位移设备信息发送成功!");
} else {
System.out.println("表面位移设备信息发送失败!");
}
channel.close();
} else if ("nbwy".equals(type)) {
Channel channel = connection.createChannel();
channel.basicPublish(AMQPTopicStaticDataConfig.TOPIC_EXCHANGE
, AMQPTopicStaticDataConfig.TOPIC_ROUTINGKEY_NEIBUWEIYI
, null
, JSONUtil.toJsonStr(deviceStaticDataInfoService.neibuweiyiDeviceInfo()).getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("内部位移设备信息发送成功!");
} else {
System.out.println("内部位移设备信息发送失败!");
}
channel.close();
} else if ("jrx".equals(type)) {
Channel channel = connection.createChannel();
channel.basicPublish(AMQPTopicStaticDataConfig.TOPIC_EXCHANGE
, AMQPTopicStaticDataConfig.TOPIC_ROUTINGKEY_JINRUNXIAN
, null
, JSONUtil.toJsonStr(deviceStaticDataInfoService.jinrunxianDeviceInfo()).getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("浸润线设备信息发送成功!");
} else {
System.out.println("浸润线设备信息发送失败!");
}
channel.close();
} else if ("jyl".equals(type)) {
Channel channel = connection.createChannel();
channel.basicPublish(AMQPTopicStaticDataConfig.TOPIC_EXCHANGE
, AMQPTopicStaticDataConfig.TOPIC_ROUTINGKEY_JIANGYULIANG
, null
, JSONUtil.toJsonStr(deviceStaticDataInfoService.jiangyuliangDeviceInfo()).getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("降雨量设备信息发送成功!");
} else {
System.out.println("降雨量设备信息发送失败!");
}
channel.close();
} else if ("sll".equals(type)) {
}
}catch(Exception e) {
e.printStackTrace();
}
}
/**
* 以下是监听,接收队列中消息
*
*/
/* @RabbitListener(queues ="equipInfo.01" )
public void receiveMessage1(String str){
System.out.println("我是监听干滩topic.message的, 消息队列equipInfo.01: " + str);
}
@RabbitListener(queues ="equipInfo.02" )
public void receiveMessage2(String str){
System.out.println("我是监听库水位topic.message的, 消息队列equipInfo.02: " + str);
}
@RabbitListener(queues ="equipInfo.03" )
public void receiveMessage3(String str){
System.out.println("我是监听表面位移topic.message的, 消息队列equipInfo.03: " + str);
}
@RabbitListener(queues ="equipInfo.05" )
public void receiveMessage5(String str){
System.out.println("我是监听浸润线topic.message的, 消息队列equipInfo.05: " + str);
}
@RabbitListener(queues ="equipInfo.06" )
public void receiveMessage6(String str){
System.out.println("我是监听降雨量topic.message的, 消息队列equipInfo.06: " + str);
}*/
}
package com.amqp.shanxi.service;
import com.amqp.shanxi.model.StaticDataInfoModel;
import org.springframework.stereotype.Service;
@Service
public interface DeviceStaticDataInfoService {
StaticDataInfoModel gantanDeviceInfo();
StaticDataInfoModel kushuiweiDeviceInfo();
StaticDataInfoModel biaomianweiyiDeviceInfo();
StaticDataInfoModel jinrunxianDeviceInfo();
StaticDataInfoModel jiangyuliangDeviceInfo();
StaticDataInfoModel neibuweiyiDeviceInfo();
}
package com.amqp.shanxi.service;
import cn.hutool.json.JSONUtil;
import com.amqp.shanxi.util.AMQPTopicStaticDataConfig;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RealTimeDataAMQPService {
@Autowired
private RealTimeInfoService realTimeInfoService;
/**
* 实时数据推送入口
*/
public void run(Connection connection) {
publishRealTime("gt", connection);
publishRealTime("ksw", connection);
publishRealTime("bmwy", connection);
publishRealTime("nbwy", connection);
publishRealTime("jrx", connection);
publishRealTime("jyl", connection);
publishRealTime("device", connection);
publishRealTime("wkk", connection);
publishRealTime("offline", connection);
}
public void publishRealTime(String type, Connection connection) {
try {
if ("gt".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEGANTANREALTIME
, null
, realTimeInfoService.gantanRealTimeData().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("干滩实时数据发送成功!");
} else {
System.out.println("干滩实时数据发送失败!");
}
channel.close();
} else if ("ksw".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEKUSHUIWEIREALTIME
, null
, realTimeInfoService.kushuiweiRealTimeData().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("库水位实时数据发送成功!");
} else {
System.out.println("库水位实时数据发送失败!");
}
channel.close();
} else if ("bmwy".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEBIAOMIANWEIYIREALTIME
, null
, realTimeInfoService.biaomianweiyiRealTimeData().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("表面位移实时数据发送成功!");
} else {
System.out.println("表面位移实时数据发送失败!");
}
channel.close();
} else if ("nbwy".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUENEIBUWEIYIREALTIME
, null
, realTimeInfoService.neibuChenJiangRealTimeData().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("内部沉降实时数据发送成功!");
} else {
System.out.println("内部沉降实时数据发送失败!");
}
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUENEIBUWEIYIREALTIME
, null
, realTimeInfoService.neibuShuiPingRealTimeData().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("内部水平实时数据发送成功!");
} else {
System.out.println("内部水平实时数据发送失败!");
}
channel.close();
} else if ("jrx".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEJINRUNXIANREALTIME
, null
, realTimeInfoService.jinrunxianRealTimeData().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("浸润线实时数据发送成功!");
} else {
System.out.println("浸润线实时数据发送失败!");
}
channel.close();
} else if ("jyl".equals(type)) {
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEJIANGYULIANGREALTIME
, null
, realTimeInfoService.jiangyuliangRealTimeData().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("降雨量实时数据发送成功!");
} else {
System.out.println("降雨量实时数据发送失败!");
}
channel.close();
} else if ("sll".equals(type)) {
} else if ("device".equals(type)) {
// 监测项数据信息
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEDEVICEREALTIME
, null
, realTimeInfoService.deviceRealTimeData().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("检测项实时数据发送成功!");
} else {
System.out.println("检测项实时数据发送失败!");
}
channel.close();
} else if ("wkk".equals(type)) {
// 尾矿库数据信息
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEWKKREALTIME
, null
, realTimeInfoService.wkkRealTimeData().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("尾矿库实时数据发送成功!");
} else {
System.out.println("尾矿库实时数据发送失败!");
}
channel.close();
} else if ("offline".equals(type)) {
// 设备离线
Channel channel = connection.createChannel();
// channel.exchangeDeclare(AMQPTopicStaticDataConfig.topicExchangeRealTime, "topic", false, true, null);
channel.basicPublish(AMQPTopicStaticDataConfig.topicExchangeRealTime
, AMQPTopicStaticDataConfig.TOPICQUEUEOFFLINEINFO
, null
, realTimeInfoService.deviceOffLineData().getBytes());
//消息确认
channel.confirmSelect();
if (channel.waitForConfirms()) {
System.out.println("设备离线实时数据发送成功!");
} else {
System.out.println("设备离线实时数据发送失败!");
}
channel.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/*
@RabbitListener(queues ="equipOfflineInfo" )
public void receiveMessage1(String str){
System.out.println("我是监听干滩topic.message的, dataInfo.01: " + str);
}
*/
}
package com.amqp.shanxi.service;
import org.springframework.stereotype.Service;
@Service
public interface RealTimeInfoService {
String gantanRealTimeData();
String kushuiweiRealTimeData();
String biaomianweiyiRealTimeData();
String jinrunxianRealTimeData();
String jiangyuliangRealTimeData();
String deviceRealTimeData();
String wkkRealTimeData();
String deviceOffLineData();
String neibuShuiPingRealTimeData();
String neibuChenJiangRealTimeData();
}
package com.amqp.shanxi.service;
import com.amqp.shanxi.model.TabSensorModel;
import java.util.List;
public interface SensorDataService {
List<TabSensorModel> findSensorKind();
}
package com.amqp.shanxi.service;
import com.rabbitmq.client.Connection;
import org.springframework.stereotype.Component;
@Component
public class ThreadService extends Thread{
private String type;
private Connection connection;
private DeviceStaticDataAMQPService deviceStaticDataAMQPService;
private RealTimeDataAMQPService realTimeDataAMQPService;
private AlarmDataAMQPService alarmDataAMQPService;
public ThreadService() {}
public ThreadService(String type, Connection connection, DeviceStaticDataAMQPService deviceStaticDataAMQPService, RealTimeDataAMQPService realTimeDataAMQPService, AlarmDataAMQPService alarmDataAMQPService) {
this.type = type;
this.connection = connection;
this.deviceStaticDataAMQPService = deviceStaticDataAMQPService;
this.realTimeDataAMQPService = realTimeDataAMQPService;
this.alarmDataAMQPService = alarmDataAMQPService;
}
@Override
public void run() {
try{
if("staticData".equals(type)) {
while(true) {
deviceStaticDataAMQPService.run(connection);
System.out.println("设备信息发送完成=====================================================================================");
Thread.sleep(360000 * 24);
}
}else if ("realTimeData".equals(type)) {
while(true) {
realTimeDataAMQPService.run(connection);
System.out.println("实时数据发送完成======================================================================================");
Thread.sleep(60000 * 2);
}
}else if("alarmData".equals(type)){
while(true) {
alarmDataAMQPService.run(connection);
System.out.println("设备报警信息发送完成===================================================================================");
Thread.sleep(60000);
}
}
}catch(Exception e) {
e.printStackTrace();
}
}
}
package com.amqp.shanxi.util;
import sun.misc.BASE64Decoder;
import sun.misc.BASE64Encoder;
import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 解密工具类
* @author z0990
*/
public class AESUtil {
/**
* 用于存储已生成的Cipher
*/
private static Map<String, Cipher> ciperCache = new ConcurrentHashMap<>();
/**
* AES解密
* @author z0990
* @param content 待解密的内容
* @param decryptKey 解密秘钥
* @param iv 向量
* @return 解密数据
*/
public static byte[] decrypt(String content, String decryptKey, String iv) throws NoSuchAlgorithmException, NoSuchPaddingException, InvalidAlgorithmParameterException, InvalidKeyException, IOException, BadPaddingException, IllegalBlockSizeException {
Cipher cipher = fetchDecryptCipher(decryptKey, iv);
BASE64Decoder decoder = new BASE64Decoder();
return cipher.doFinal(decoder.decodeBuffer(content));
}
/**
* AES加密
* @param data 待加密的内容
* @param key 解密秘钥
* @param iv 向量
* @return 加密并经base64编码后的数据
*/
public static String encrypt(String data, String key, String iv) throws InvalidAlgorithmParameterException, NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException, IOException, BadPaddingException, IllegalBlockSizeException {
Cipher cipher = getEncryptCipher(key, iv);
byte[] bytes = cipher.doFinal(data.getBytes(Constants.CHARSET_UTF8));
BASE64Encoder encoder = new BASE64Encoder();
return encoder.encode(bytes);
}
private static Cipher getEncryptCipher(String key, String iv) throws NoSuchAlgorithmException, NoSuchPaddingException, InvalidAlgorithmParameterException, InvalidKeyException, IOException {
Cipher cipher = ciperCache.get(key);
if (cipher == null) {
cipher = fetchEncryptCipher(key, iv);
ciperCache.put(key, cipher);
}
return cipher;
}
private static Cipher fetchDecryptCipher(String decryptKey, String iv) throws InvalidAlgorithmParameterException, NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException, IOException {
return createCipher(decryptKey, iv, Cipher.DECRYPT_MODE);
}
private static Cipher fetchEncryptCipher(String encryptKey, String iv) throws InvalidAlgorithmParameterException, NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException, IOException {
return createCipher(encryptKey, iv, Cipher.ENCRYPT_MODE);
}
private static Cipher createCipher(String key, String iv, int mode) throws NoSuchPaddingException, NoSuchAlgorithmException, IOException, InvalidAlgorithmParameterException, InvalidKeyException {
Cipher cipher = Cipher.getInstance(Constants.DECRYPT_CIPHER_ALGORITHM);
BASE64Decoder decoder = new BASE64Decoder();
SecretKeySpec keySpec = new SecretKeySpec(decoder.decodeBuffer(key), Constants.DECRYPT_ALGORITHM);
IvParameterSpec ivSpec = new IvParameterSpec(decoder.decodeBuffer(iv));
//初始化
cipher.init(mode, keySpec, ivSpec);
return cipher;
}
}
package com.amqp.shanxi.util;/*
* Created by z0990 on 2019/8/1.
*/
public class Constants {
/**
* 加密/解密算法
*/
public static final String DECRYPT_CIPHER_ALGORITHM = "AES/CBC/PKCS5Padding";
/**
* 加密/解密算法
*/
public static final String DECRYPT_ALGORITHM = "AES";
/**
* 编码
*/
public static final String CHARSET_UTF8 = "UTF-8";
/**
* 数据包分隔符
*/
public static final String MESSAGE_SPLITE = "@@";
}
package com.amqp.shanxi.util;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
* 生产者消费确认
*/
public class Sender {
private static final String EXCHANGE_NAME = "basicData";
public static void main(String[] argv) throws IOException, TimeoutException {
new ExchangeTopic("equipInfo.#");
}
static class ExchangeTopic{
public ExchangeTopic(final String routingKey) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//rabbitmq监听IP
factory.setHost("192.168.100.252");
//rabbitmq监听默认端口
factory.setPort(5672);
//设置访问的用户
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明路由名字和类型
// channel.exchangeDeclare(EXCHANGE_NAME, "topic", false, true, null);
//队列名称
String queueName = routingKey + ".queue";
//创建队列
channel.queueDeclare(queueName, false, false, true, null);
//把队列绑定到路由上
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println(" [routingKey = "+ routingKey +"] Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("[routingKey = "+ routingKey +"] Received msg is '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
}
package com.amqp.shanxi.util;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class StaticParamUtils {
// 基础信息
public static String PROVINCENO;
public static String WKKNO;
// 尾矿库设备编码
public static String GANTANNO;
public static String KUSHUIWEINO;
public static String BIAOMIANWEIYINO;
public static String NEIBUWEIYINO;
public static String JINRUNXIANNO;
public static String JIANGYULIANGNO;
public static String SHENLIULIANGNO;
// 数据类型
public static String BASICINFO;
public static String GANTANDEVICE;
public static String KUSHUIWEIDEVICE;
public static String BIAOMIANWEIYIDEVICE;
public static String NEIBUWEIYIDEVICE;
public static String JINRUNXIANDEVICE;
public static String JIANGYULIANGEDEVICE;
public static String SHENLIULIANGDEVICE;
public static String GANTANDATA;
public static String KUSHUIWEIDATA;
public static String BIAOMIANWEIYIDATA;
public static String NEIBUWEIYIDATA;
public static String JINRUNXIANDATA;
public static String JIANGYULIANGDATA;
public static String SHENLIULIANGDATA;
public static String DEVICEDATA;
public static String WKKDATA;
public static String GANTANALARM;
public static String KUSHUIWEIALARM;
public static String BIAOMIANWEIYIALARM;
public static String NEIBUWEIYIALARM;
public static String JINRUNXIANALARM;
public static String JIANGYULIANGALARM;
public static String SHENLIULIANGALARM;
public static String DEVICEALARM;
public static String WKKALARM;
public static String DEVICEOFFLINE;
@Value("${basic.provinceno}")
public void setPROVINCENO(String PROVINCENO) {
StaticParamUtils.PROVINCENO = PROVINCENO;
}
@Value("${basic.wkkno}")
public void setWKKNO(String WKKNO) {
StaticParamUtils.WKKNO = WKKNO;
}
@Value("${device.gantanno}")
public void setGANTANNO(String GANTANNO) {
StaticParamUtils.GANTANNO = GANTANNO;
}
@Value("${device.kushuiweino}")
public void setKUSHUIWEINO(String KUSHUIWEINO) {
StaticParamUtils.KUSHUIWEINO = KUSHUIWEINO;
}
@Value("${device.biaomianweiyino}")
public void setBIAOMIANWEIYINO(String BIAOMIANWEIYINO) {
StaticParamUtils.BIAOMIANWEIYINO = BIAOMIANWEIYINO;
}
@Value("${device.neibuweiyino}")
public void setNEIBUWEIYINO(String NEIBUWEIYINO) {
StaticParamUtils.NEIBUWEIYINO = NEIBUWEIYINO;
}
@Value("${device.jinrunxianno}")
public void setJINRUNXIANNO(String JINRUNXIANNO) {
StaticParamUtils.JINRUNXIANNO = JINRUNXIANNO;
}
@Value("${device.jiangyuliangno}")
public void setJIANGYULIANGNO(String JIANGYULIANGNO) {
StaticParamUtils.JIANGYULIANGNO = JIANGYULIANGNO;
}
@Value("${device.shenliuliangno}")
public void setSHENLIULIANGNO(String SHENLIULIANGNO) {
StaticParamUtils.SHENLIULIANGNO = SHENLIULIANGNO;
}
@Value("${datatype.basicinfo}")
public void setBASICINFO(String BASICINFO) {
StaticParamUtils.BASICINFO = BASICINFO;
}
@Value("${datatype.gantandevice}")
public void setGANTANDEVICE(String GANTANDEVICE) {
StaticParamUtils.GANTANDEVICE = GANTANDEVICE;
}
@Value("${datatype.kushuiweidevice}")
public void setKUSHUIWEIDEVICE(String KUSHUIWEIDEVICE) {
StaticParamUtils.KUSHUIWEIDEVICE = KUSHUIWEIDEVICE;
}
@Value("${datatype.biaomianweiyidevice}")
public void setBIAOMIANWEIYIDEVICE(String BIAOMIANWEIYIDEVICE) {
StaticParamUtils.BIAOMIANWEIYIDEVICE = BIAOMIANWEIYIDEVICE;
}
@Value("${datatype.neibuweiyidevice}")
public void setNEIBUWEIYIDEVICE(String NEIBUWEIYIDEVICE) {
StaticParamUtils.NEIBUWEIYIDEVICE = NEIBUWEIYIDEVICE;
}
@Value("${datatype.jinrunxiandevice}")
public void setJINRUNXIANDEVICE(String JINRUNXIANDEVICE) {
StaticParamUtils.JINRUNXIANDEVICE = JINRUNXIANDEVICE;
}
@Value("${datatype.jiangyuliangdevice}")
public void setJIANGYULIANGEDEVICE(String JIANGYULIANGEDEVICE) {
StaticParamUtils.JIANGYULIANGEDEVICE = JIANGYULIANGEDEVICE;
}
@Value("${datatype.shenliuliangdevice}")
public void setSHENLIULIANGDEVICE(String SHENLIULIANGDEVICE) {
StaticParamUtils.SHENLIULIANGDEVICE = SHENLIULIANGDEVICE;
}
@Value("${datatype.gantandata}")
public void setGANTANDATA(String GANTANDATA) {
StaticParamUtils.GANTANDATA = GANTANDATA;
}
@Value("${datatype.kushuiweidata}")
public void setKUSHUIWEIDATA(String KUSHUIWEIDATA) {
StaticParamUtils.KUSHUIWEIDATA = KUSHUIWEIDATA;
}
@Value("${datatype.biaomianweiyidata}")
public void setBIAOMIANWEIYIDATA(String BIAOMIANWEIYIDATA) {
StaticParamUtils.BIAOMIANWEIYIDATA = BIAOMIANWEIYIDATA;
}
@Value("${datatype.neibuweiyidata}")
public void setNEIBUWEIYIDATA(String NEIBUWEIYIDATA) {
StaticParamUtils.NEIBUWEIYIDATA = NEIBUWEIYIDATA;
}
@Value("${datatype.jinrunxiandata}")
public void setJINRUNXIANDATA(String JINRUNXIANDATA) {
StaticParamUtils.JINRUNXIANDATA = JINRUNXIANDATA;
}
@Value("${datatype.jiangyuliangdata}")
public static void setJIANGYULIANGDATA(String JIANGYULIANGDATA) {
StaticParamUtils.JIANGYULIANGDATA = JIANGYULIANGDATA;
}
@Value("${datatype.shenliuliangdata}")
public void setSHENLIULIANGDATA(String SHENLIULIANGDATA) {
StaticParamUtils.SHENLIULIANGDATA = SHENLIULIANGDATA;
}
@Value("${datatype.devicedata}")
public void setDEVICEDATA(String DEVICEDATA) {
StaticParamUtils.DEVICEDATA = DEVICEDATA;
}
@Value("${datatype.wkkdata}")
public void setWKKDATA(String WKKDATA) {
StaticParamUtils.WKKDATA = WKKDATA;
}
@Value("${datatype.gantanalarm}")
public void setGANTANALARM(String GANTANALARM) {
StaticParamUtils.GANTANALARM = GANTANALARM;
}
@Value("${datatype.kushuiweialarm}")
public void setKUSHUIWEIALARM(String KUSHUIWEIALARM) {
StaticParamUtils.KUSHUIWEIALARM = KUSHUIWEIALARM;
}
@Value("${datatype.biaomianweiyialarm}")
public void setBIAOMIANWEIYIALARM(String BIAOMIANWEIYIALARM) {
StaticParamUtils.BIAOMIANWEIYIALARM = BIAOMIANWEIYIALARM;
}
@Value("${datatype.neibuweiyialarm}")
public void setNEIBUWEIYIALARM(String NEIBUWEIYIALARM) {
StaticParamUtils.NEIBUWEIYIALARM = NEIBUWEIYIALARM;
}
@Value("${datatype.jinrunxianalarm}")
public void setJINRUNXIANALARM(String JINRUNXIANALARM) {
StaticParamUtils.JINRUNXIANALARM = JINRUNXIANALARM;
}
@Value("${datatype.jiangyuliangalarm}")
public static void setJIANGYULIANGALARM(String JIANGYULIANGALARM) {
StaticParamUtils.JIANGYULIANGALARM = JIANGYULIANGALARM;
}
@Value("${datatype.shenliuliangalarm}")
public static void setSHENLIULIANGALARM(String SHENLIULIANGALARM) {
StaticParamUtils.SHENLIULIANGALARM = SHENLIULIANGALARM;
}
@Value("${datatype.devicealarm}")
public static void setDEVICEALARM(String DEVICEALARM) {
StaticParamUtils.DEVICEALARM = DEVICEALARM;
}
@Value("${datatype.wkkalarm}")
public static void setWKKALARM(String WKKALARM) {
StaticParamUtils.WKKALARM = WKKALARM;
}
@Value("${datatype.deviceoffline}")
public static void setDEVICEOFFLINE(String DEVICEOFFLINE) {
StaticParamUtils.DEVICEOFFLINE = DEVICEOFFLINE;
}
}
server:
port: 9999
spring:
datasource:
driver-class-name: org.postgresql.Driver
url: jdbc:postgresql://118.190.98.56:5432/MineTRS_fengshan
username: postgres
password: red___
rabbitmq:
host: 114.115.212.187
port: 5672
username: bhsoft
password: bhsoft
template:
retry:
enabled: true #代理链接丢失, 启动重试AMQPTemplate
initial-interval: 2s
virtual-host: /
publisher-confirms: true
publisher-returns: true
device: # 设备编码
gantanno: '140924005201'
kushuiweino: '140924005202'
biaomianweiyino: '140924005203'
neibuweiyino: '140924005204'
jinrunxianno: '140924005205'
jiangyuliangno: '140924005206'
shenliuliangno: '140924005207'
basic: # 基础信息
provinceno: '14'
wkkno: '1409240052'
datatype:
basicinfo: '0001'
gantandevice: '0101'
kushuiweidevice: '0201'
biaomianweiyidevice: '0301'
neibuweiyidevice: '0401'
jinrunxiandevice: '0501'
jiangyuliangdevice: '0601'
shenliuliangdevice: '0701'
gantandata: '0102'
kushuiweidata: '0202'
biaomianweiyidata: '0302'
neibuweiyidata: '0402'
jinrunxiandata: '0502'
jiangyuliangdata: '0602'
shenliuliangdata: '0702'
devicedata: '2002' # 检测项数据信息
wkkdata: '2102' # 尾矿库数据信息
gantanalarm: '0103'
kushuiweialarm: '0203'
biaomianweiyialarm: '0303'
neibuweiyialarm: '0403'
jinrunxianalarm: '0503'
jiangyuliangalarm: '0603' # 缺失
shenliuliangalarm: '0703'
devicealarm: '2003'
wkkalarm: '2103'
deviceoffline: '2202'
package com.amqp.shanxi;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class ShanxiApplicationTests {
@Test
void contextLoads() {
}
}
server:
port: 9999
spring:
datasource:
driver-class-name: org.postgresql.Driver
url: jdbc:postgresql://118.190.98.56:5432/MineTRS_fengshan
username: postgres
password: red___
rabbitmq:
host: 114.115.212.187
port: 5672
username: bhsoft
password: bhsoft
template:
retry:
enabled: true #代理链接丢失, 启动重试AMQPTemplate
initial-interval: 2s
virtual-host: /
publisher-confirms: true
publisher-returns: true
device: # 设备编码
gantanno: '140924005201'
kushuiweino: '140924005202'
biaomianweiyino: '140924005203'
neibuweiyino: '140924005204'
jinrunxianno: '140924005205'
jiangyuliangno: '140924005206'
shenliuliangno: '140924005207'
basic: # 基础信息
provinceno: '14'
wkkno: '1409240052'
datatype:
basicinfo: '0001'
gantandevice: '0101'
kushuiweidevice: '0201'
biaomianweiyidevice: '0301'
neibuweiyidevice: '0401'
jinrunxiandevice: '0501'
jiangyuliangdevice: '0601'
shenliuliangdevice: '0701'
gantandata: '0102'
kushuiweidata: '0202'
biaomianweiyidata: '0302'
neibuweiyidata: '0402'
jinrunxiandata: '0502'
jiangyuliangdata: '0602'
shenliuliangdata: '0702'
devicedata: '2002' # 检测项数据信息
wkkdata: '2102' # 尾矿库数据信息
gantanalarm: '0103'
kushuiweialarm: '0203'
biaomianweiyialarm: '0303'
neibuweiyialarm: '0403'
jinrunxianalarm: '0503'
jiangyuliangalarm: '0603' # 缺失
shenliuliangalarm: '0703'
devicealarm: '2003'
wkkalarm: '2103'
deviceoffline: '2202'
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment